ThreadPoolSvc.cpp
Go to the documentation of this file.
1 #include "ThreadPoolSvc.h"
2 
5 #include "ThreadInitTask.h"
6 
7 #include "tbb/task_scheduler_init.h"
8 #include "tbb/task_scheduler_observer.h"
9 #include "tbb/task.h"
10 #include "tbb/tick_count.h"
11 #include "tbb/tbb_thread.h"
12 
13 using namespace tbb;
14 
16 
17 //=============================================================================
18 
19 
20 ThreadPoolSvc::ThreadPoolSvc( const std::string& name, ISvcLocator* svcLoc ):
21  base_class(name,svcLoc),
22  m_threadInitTools(this),
23  m_init(false),
24  m_threadPoolSize(0),
25  m_tbbSchedInit(nullptr),
26  m_barrier(nullptr)
27 {
28 
29  declareProperty("ThreadInitTools", m_threadInitTools,
30  "ToolHandleArray of IThreadInitTools");
31 
32 }
33 
34 //-----------------------------------------------------------------------------
35 
38 
// NOTE(review): the function signature and the statement that defines `sc`
// are absent from this listing; `sc` presumably holds the status returned by
// the base-class initialisation -- confirm against the original file.
//
// Returns FAILURE if `sc` is not a success or if the ThreadInitTools array
// cannot be retrieved; otherwise logs how many tools were retrieved and
// returns SUCCESS.
39  // Initialise mother class (read properties, ...)
41  if (!sc.isSuccess()) {
// NOTE(review): this path returns FAILURE but only logs at warning level.
42  warning () << "Base class could not be initialized" << endmsg;
43  return StatusCode::FAILURE;
44  }
45 
// Retrieve the configured tool handles; failing to do so aborts initialisation.
46  if (m_threadInitTools.retrieve().isFailure()) {
47  error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
48 
49  return StatusCode::FAILURE;
50  } else {
// Purely informational: report whether any tools are attached.
51  if (m_threadInitTools.size() != 0) {
52  info() << "retrieved " << m_threadInitTools.size() << " thread init tools"
53  << endmsg;
54  } else {
55  info() << "no thread init tools attached" << endmsg;
56  }
57  }
58 
59  return StatusCode::SUCCESS;
60 
61 }
62 
63 //-----------------------------------------------------------------------------
64 
67 
// NOTE(review): the function signature is absent from this listing.
//
// Warns when m_init is still false (it is only set to true at the end of a
// successful initPool()); the returned status is SUCCESS in either case.
68  if (!m_init) {
69  warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
70  << "was never initialized" << endmsg;
71  }
72 
73  return StatusCode::SUCCESS;
74 
75 }
76 
77 //-----------------------------------------------------------------------------
78 
// Initialise the thread pool and run the thread-initialisation tasks.
//
// Holds m_initMutex for the whole call, records poolSize in m_threadPoolSize,
// and -- unless poolSize is the sentinel -100 -- creates the TBB task
// scheduler and the boost::barrier used by launchTasks(). It then calls
// launchTasks(false) and, only if that succeeds, sets m_init to true.
//
// Returns FAILURE if launchTasks() fails, SUCCESS otherwise.
//
// NOTE(review): the return-type line and listing lines 117 and 122 are
// missing from this listing; the statements they contained are unknown here.
80 ThreadPoolSvc::initPool(const int& poolSize) {
81 
// Scoped lock: released automatically on every return path.
82  tbb::spin_mutex::scoped_lock lock( m_initMutex );
83 
84  m_threadPoolSize = poolSize;
85 
86  if (msgLevel(MSG::DEBUG))
87  debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
88  // There is a problem in the piece of code below: if
89  // m_threadPoolSize is set to something negative which is < -1, the
90  // algorithm below might not behave as expected. For the time being
91  // I've chosen to create the barrier with the default number of
92  // threads created by the task scheduler init, assuming that a
93  // negative value will choose automatic thread creation, which will
94  // create the default number of threads.
95  // SK
96 
97  // -100 prevents the creation of the pool and the scheduler directly
98  // executes the tasks.
99  if (-100 != m_threadPoolSize) {
100  if (msgLevel(MSG::DEBUG))
101  debug() << "Initialising a thread pool of size "
102  << m_threadPoolSize << endmsg;
103 
104  // Leave -1 in case selected, increment otherwise
105  // - What?
// NOTE(review): presumably the +1 accounts for the calling thread, which also
// waits on the barrier in launchTasks() -- confirm.
106  int thePoolSize = m_threadPoolSize;
107  if (thePoolSize != -1) thePoolSize += 1;
108 
109 
110  // Create the TBB task scheduler
111  m_tbbSchedInit = std::unique_ptr<tbb::task_scheduler_init>( new tbb::task_scheduler_init(thePoolSize) );
112  // Create the barrier for task synchronization
// For any negative requested size the barrier is sized from the scheduler's
// default thread count instead of the (incremented) requested value.
113  if(m_threadPoolSize<=-1)thePoolSize=m_tbbSchedInit->default_num_threads();
114  if (msgLevel(MSG::DEBUG)){
115  debug() << "creating barrier of size " << thePoolSize << endmsg;
116  }
118 
119  m_barrier = std::unique_ptr<boost::barrier>( new boost::barrier(thePoolSize) );
120 
121  } else {
// NOTE(review): the body of this branch (listing line 122) is missing here.
123  }
124 
125  // Launch the init tool tasks
126  const bool terminate = false;
127  if (launchTasks(terminate).isFailure())
128  return StatusCode::FAILURE;
129 
130  if (msgLevel(MSG::DEBUG))
131  debug() << "Thread Pool initialization complete!" << endmsg;
132 
// Only reached when the init tasks succeeded; finalize-time and
// terminate-time checks read this flag.
133  m_init = true;
134 
135  return StatusCode::SUCCESS;
136 
137 }
138 
139 //-----------------------------------------------------------------------------
140 
// NOTE(review): the function signature is absent from this listing; the debug
// message below identifies this body as ThreadPoolSvc::terminatePool().
//
// Holds m_initMutex for the whole call. Returns FAILURE if the pool was never
// initialised (m_init is false) or if launchTasks(true) fails; SUCCESS
// otherwise. m_init is not modified here.
143  tbb::spin_mutex::scoped_lock lock( m_initMutex );
144  if (msgLevel(MSG::DEBUG))
145  debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
146 
// Refuse to run termination tasks when initPool() never completed.
147  if (!m_init) {
148  error() << "Trying to terminate uninitialized thread pool!" << endmsg;
149  return StatusCode::FAILURE;
150  }
151 
152  // Launch the termination tasks
153  const bool terminate = true;
154  if (launchTasks(terminate).isFailure())
155  return StatusCode::FAILURE;
156 
157  if (msgLevel(MSG::DEBUG))
158  debug() << "Thread pool termination complete!" << endmsg;
159 
160  return StatusCode::SUCCESS;
161 }
162 
163 //-----------------------------------------------------------------------------
164 
166 ThreadPoolSvc::launchTasks(bool terminate) {
167 
168  if (m_threadInitTools.empty()) return StatusCode::SUCCESS;
169 
170  const std::string taskType = terminate ? "termination" : "initialization";
171 
172  // If we have a thread pool (via a scheduler), then we want to queue
173  // the tasks in TBB to execute on each thread.
174  if(m_tbbSchedInit) {
175 
176  // Create one task for each worker thread in the pool
177  for (int i = 0; i < m_threadPoolSize; ++i) {
178  if (msgLevel(MSG::DEBUG))
179  debug() << "creating ThreadInitTask " << i << endmsg;
180  tbb::task* t = new(tbb::task::allocate_root())
181  ThreadInitTask( m_threadInitTools, m_barrier.get(), serviceLocator(), terminate );
182 
183  // Queue the task
184  tbb::task::enqueue( *t );
185  this_tbb_thread::sleep(tbb::tick_count::interval_t(.1));
186  }
187 
188  // Now wait for all the workers to reach the barrier
189  if (msgLevel(MSG::DEBUG))
190  debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
191  m_barrier->wait();
192 
193  // Check to make sure all Tools were invoked.
194  // I'm not sure this mechanism is worthwhile.
195  for (auto& t : m_threadInitTools) {
196  // Number of threads initialized but not terminated.
197  int numInit = t->nInit();
198  // Expected number based on the type of task.
199  int expectedNumInit = terminate? 0 : m_threadPoolSize;
200  if (numInit != expectedNumInit) {
201  error() << "not all threads " << (terminate? "terminated" : "initialized")
202  << " for tool " << t << " : "
203  << t->nInit() << " out of " << m_threadPoolSize
204  << " are currently active" << endmsg;
205  return StatusCode::FAILURE;
206  }
207  }
208 
209  }
210 
211  // In single-threaded mode, there is no scheduler, so we simply call
212  // the task wrapper directly in this thread.
213  else {
214  if (msgLevel(MSG::DEBUG))
215  debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
216  boost::barrier* noBarrier = nullptr;
217  ThreadInitTask theTask(m_threadInitTools, noBarrier, serviceLocator(), terminate);
218  theTask.execute();
219  }
220 
221  // Now, we do some error checking
223  error() << "a ThreadInitTask failed to execute successfully" << endmsg;
224  return StatusCode::FAILURE;
225  }
226 
227  return StatusCode::SUCCESS;
228 }
virtual StatusCode finalize() override final
Finalise.
static GAUDI_API void setNumThreads(const std::size_t &nT)
StatusCode initialize() override
Definition: Service.cpp:64
tbb::task * execute() override
Execute the task.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
virtual StatusCode terminatePool() override final
Terminate the thread pool and launch thread termination tasks.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
Special TBB task used by ThreadPoolSvc to wrap execution of IThreadInitTools.
STL namespace.
STL class.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Gaudi::Details::PropertyBase * declareProperty(const std::string &name, TYPE &value, const std::string &doc="none")
Declare a property (templated)
static bool execFailed()
virtual StatusCode initialize() override final
Initialise.
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:242
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:26
virtual StatusCode initPool(const int &poolSize) override final
Initialize the thread pool and launch the ThreadInitTasks.
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244