The Gaudi Framework  v30r3 (a5ef0a68)
ThreadPoolSvc.cpp
Go to the documentation of this file.
1 #include "ThreadPoolSvc.h"
2 
4 #include "ThreadInitTask.h"
5 
6 #include "tbb/task.h"
7 #include "tbb/task_scheduler_init.h"
8 #include "tbb/task_scheduler_observer.h"
9 #include "tbb/tbb_thread.h"
10 #include "tbb/tick_count.h"
11 
12 using namespace tbb;
13 
15 
16 //=============================================================================
17 
18 ThreadPoolSvc::ThreadPoolSvc( const std::string& name, ISvcLocator* svcLoc ) : extends( name, svcLoc )
19 {
20  declareProperty( "ThreadInitTools", m_threadInitTools, "ToolHandleArray of IThreadInitTools" );
21 }
22 
23 //-----------------------------------------------------------------------------
24 
// Service initialisation (presumably the body of ThreadPoolSvc::initialize();
// the function header is not visible in this listing -- confirm upstream).
//
// NOTE(review): the listing numbering jumps 24 -> 26 and 28 -> 30, so the
// statement that defines `sc` is also missing from this view. Judging by the
// comment and the warning text below it presumably holds the status of the
// base-class initialisation -- TODO confirm.
//
// Returns StatusCode::FAILURE when `sc` is not a success or when the
// m_threadInitTools array cannot be retrieved; otherwise logs how many
// thread init tools were retrieved and returns StatusCode::SUCCESS.
26 {
27 
28  // Initialise mother class (read properties, ...)
30  if ( !sc.isSuccess() ) {
31  warning() << "Base class could not be initialized" << endmsg;
32  return StatusCode::FAILURE;
33  }
34 
    // Retrieve the configured tools; a retrieval failure aborts initialisation.
35  if ( m_threadInitTools.retrieve().isFailure() ) {
36  error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
37 
38  return StatusCode::FAILURE;
39  }
    // Purely informational: report whether any tools are attached.
40  if ( m_threadInitTools.size() != 0 ) {
41  info() << "retrieved " << m_threadInitTools.size() << " thread init tools" << endmsg;
42  } else {
43  info() << "no thread init tools attached" << endmsg;
44  }
45 
46  return StatusCode::SUCCESS;
47 }
48 
49 //-----------------------------------------------------------------------------
50 
// Service finalisation (presumably the body of ThreadPoolSvc::finalize();
// the function header, listing line 51, is not visible here -- confirm upstream).
//
// Emits a warning when m_init is still false, i.e. initPool() never ran to
// successful completion (initPool() is the only visible place that sets
// m_init). Always returns StatusCode::SUCCESS.
52 {
53 
54  if ( !m_init ) {
55  warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
56  << "was never initialized" << endmsg;
57  }
58 
59  return StatusCode::SUCCESS;
60 }
61 
62 //-----------------------------------------------------------------------------
63 
// Initialize the thread pool and run the thread-initialization tasks.
//
// Holds m_initMutex for the whole call, stores poolSize in m_threadPoolSize
// and -- unless poolSize is -100 -- creates the TBB task_scheduler_init and a
// boost::barrier. Afterwards launchTasks( false ) is invoked; m_init is set
// to true only when that call succeeds.
//
// @param poolSize  requested pool size. -100 means "do not create a pool";
//                  -1 is handed to the scheduler unchanged; any other value
//                  is incremented by one before it is used.
// @return StatusCode::FAILURE if launchTasks() fails, StatusCode::SUCCESS
//         otherwise.
//
// NOTE(review): the listing numbering skips lines 98 and 103 (marked below),
// so statements may be missing from this view -- the `else` branch appears
// empty here. Confirm against the upstream file before editing this function.
64 StatusCode ThreadPoolSvc::initPool( const int& poolSize )
65 {
66 
    // Serialises initPool() against terminatePool(), which takes the same mutex.
67  tbb::spin_mutex::scoped_lock lock( m_initMutex );
68 
69  m_threadPoolSize = poolSize;
70 
71  if ( msgLevel( MSG::DEBUG ) ) debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
72  // There is a problem in the piece of the code below. If
73  // m_threadPoolSize is set to something negative which is < -1,
74  // the algorithm below might not behave as expected. For the time being
75  // I've chosen to create the barrier with the default number of
76  // threads created by the task scheduler init, assuming that a
77  // negative value will choose automatic thread creation, which will
78  // create the default number of threads.
79  // SK
80 
81  // -100 prevents the creation of the pool and the scheduler directly
82  // executes the tasks.
83  if ( -100 != m_threadPoolSize ) {
84  if ( msgLevel( MSG::DEBUG ) ) debug() << "Initialising a thread pool of size " << m_threadPoolSize << endmsg;
85 
86  // Leave -1 in case selected, increment otherwise
87  // - What?
    // NOTE(review): presumably the +1 accounts for the calling thread, which
    // also waits on the barrier in launchTasks() -- TODO confirm.
88  int thePoolSize = m_threadPoolSize;
89  if ( thePoolSize != -1 ) thePoolSize += 1;
90 
91  // Create the TBB task scheduler
92  m_tbbSchedInit = std::make_unique<tbb::task_scheduler_init>( thePoolSize );
93  // Create the barrier for task synchronization
    // For any negative request the barrier is sized from
    // default_num_threads() rather than from the (incremented) request.
94  if ( m_threadPoolSize <= -1 ) thePoolSize = m_tbbSchedInit->default_num_threads();
95  if ( msgLevel( MSG::DEBUG ) ) {
96  debug() << "creating barrier of size " << thePoolSize << endmsg;
97  }
    // NOTE(review): listing line 98 is absent from this view.
99 
100  m_barrier = std::make_unique<boost::barrier>( thePoolSize );
101 
102  } else {
    // NOTE(review): listing line 103 is absent from this view; as shown,
    // nothing is created for poolSize == -100 (m_tbbSchedInit and m_barrier
    // are not assigned in this branch).
104  }
105 
106  // Launch the init tool tasks
107  const bool terminate = false;
108  if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
109 
110  if ( msgLevel( MSG::DEBUG ) ) debug() << "Thread Pool initialization complete!" << endmsg;
111 
    // Only reached when the init tasks succeeded; finalize() and
    // terminatePool() both test this flag.
112  m_init = true;
113 
114  return StatusCode::SUCCESS;
115 }
116 
117 //-----------------------------------------------------------------------------
118 
// Thread-pool termination (presumably the body of
// ThreadPoolSvc::terminatePool(); the function header, listing line 119, is
// not visible here -- confirm upstream).
//
// Holds m_initMutex for the whole call. Fails with an error message when the
// pool was never initialised (m_init is false); otherwise runs
// launchTasks( true ) and propagates its failure.
//
// Returns StatusCode::FAILURE if the pool is uninitialised or the
// termination tasks fail, StatusCode::SUCCESS otherwise.
//
// NOTE(review): nothing visible in this body resets m_init or releases
// m_tbbSchedInit / m_barrier -- verify whether that is intentional.
120 {
121  tbb::spin_mutex::scoped_lock lock( m_initMutex );
122  if ( msgLevel( MSG::DEBUG ) ) debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
123 
124  if ( !m_init ) {
125  error() << "Trying to terminate uninitialized thread pool!" << endmsg;
126  return StatusCode::FAILURE;
127  }
128 
129  // Launch the termination tasks
130  const bool terminate = true;
131  if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
132 
133  if ( msgLevel( MSG::DEBUG ) ) debug() << "Thread pool termination complete!" << endmsg;
134 
135  return StatusCode::SUCCESS;
136 }
137 
138 //-----------------------------------------------------------------------------
139 
// Run the ThreadInitTools, either on every pool thread or in the calling
// thread (presumably the body of ThreadPoolSvc::launchTasks(); the function
// header, listing line 140, is not visible here, and `terminate` is
// presumably its bool parameter -- confirm upstream).
//
// terminate == false runs the tools in "initialization" mode,
// terminate == true in "termination" mode.
//
// Returns StatusCode::SUCCESS immediately when no tools are configured.
// Returns StatusCode::FAILURE when a tool's nInit() count does not match the
// expected value after the tasks ran, or when ThreadInitTask::execFailed()
// reports a failure; StatusCode::SUCCESS otherwise.
141 {
142 
143  if ( m_threadInitTools.empty() ) return StatusCode::SUCCESS;
144 
    // Only used in the single-threaded debug message further down.
145  const std::string taskType = terminate ? "termination" : "initialization";
146 
147  // If we have a thread pool (via a scheduler), then we want to queue
148  // the tasks in TBB to execute on each thread.
149  if ( m_tbbSchedInit ) {
150 
151  // Create one task for each worker thread in the pool
    // NOTE(review): the loop bound is m_threadPoolSize. When initPool() was
    // called with a negative size other than -100 no task is created here,
    // yet the barrier below is still waited on -- verify this cannot block.
152  for ( int i = 0; i < m_threadPoolSize; ++i ) {
153  if ( msgLevel( MSG::DEBUG ) ) debug() << "creating ThreadInitTask " << i << endmsg;
    // Allocated through TBB's root-task allocator and handed to enqueue()
    // below; there is no matching delete in this function.
154  tbb::task* t = new ( tbb::task::allocate_root() )
155  ThreadInitTask( m_threadInitTools, m_barrier.get(), serviceLocator(), terminate );
156 
157  // Queue the task
158  tbb::task::enqueue( *t );
    // Pause 0.1 s after each enqueue (presumably to give the task time to be
    // picked up by a distinct worker -- TODO confirm intent).
159  this_tbb_thread::sleep( tbb::tick_count::interval_t( .1 ) );
160  }
161 
162  // Now wait for all the workers to reach the barrier
163  if ( msgLevel( MSG::DEBUG ) ) debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
164  m_barrier->wait();
165 
166  // Check to make sure all Tools were invoked.
167  // I'm not sure this mechanism is worthwhile.
168  for ( auto& t : m_threadInitTools ) {
169  // Number of threads initialized but not terminated.
170  int numInit = t->nInit();
171  // Expected number based on the type of task.
172  int expectedNumInit = terminate ? 0 : m_threadPoolSize;
173  if ( numInit != expectedNumInit ) {
174  error() << "not all threads " << ( terminate ? "terminated" : "initialized" ) << " for tool " << t << " : "
175  << t->nInit() << " out of " << m_threadPoolSize << " are currently active" << endmsg;
176  return StatusCode::FAILURE;
177  }
178  }
179 
180  }
181 
182  // In single-threaded mode, there is no scheduler, so we simply call
183  // the task wrapper directly in this thread.
184  else {
    // NOTE(review): the literal "in this thread." has no leading space, so the
    // message reads e.g. "...initializationin this thread." -- likely a typo.
185  if ( msgLevel( MSG::DEBUG ) ) debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
    // No barrier is passed because only this thread executes the task.
186  boost::barrier* noBarrier = nullptr;
187  ThreadInitTask theTask( m_threadInitTools, noBarrier, serviceLocator(), terminate );
188  theTask.execute();
189  }
190 
191  // Now, we do some error checking
    // execFailed() is a static query on ThreadInitTask, so it covers both the
    // pooled and the single-threaded path above.
192  if ( ThreadInitTask::execFailed() ) {
193  error() << "a ThreadInitTask failed to execute successfully" << endmsg;
194  return StatusCode::FAILURE;
195  }
196 
197  return StatusCode::SUCCESS;
198 }
StatusCode finalize() override final
Finalise.
static GAUDI_API void setNumThreads(const std::size_t &nT)
constexpr static const auto FAILURE
Definition: StatusCode.h:88
StatusCode initialize() override
Definition: Service.cpp:63
tbb::task * execute() override
Execute the task.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
StatusCode terminatePool() override final
Terminate the thread pool and launch thread termination tasks.
bool isSuccess() const
Definition: StatusCode.h:287
Special TBB task used by ThreadPoolSvc to wrap execution of IThreadInitTools.
STL namespace.
STL class.
#define DECLARE_COMPONENT(type)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
Gaudi::Details::PropertyBase * declareProperty(const std::string &name, TYPE &value, const std::string &doc="none")
Declare a property (templated)
static bool execFailed()
StatusCode initialize() override final
Initialise.
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:26
StatusCode initPool(const int &poolSize) override final
Initialize the thread pool and launch the ThreadInitTasks.
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209