The Gaudi Framework  v29r0 (ff2e7097)
ThreadPoolSvc.cpp
Go to the documentation of this file.
1 #include "ThreadPoolSvc.h"
2 
5 #include "ThreadInitTask.h"
6 
7 #include "tbb/task.h"
8 #include "tbb/task_scheduler_init.h"
9 #include "tbb/task_scheduler_observer.h"
10 #include "tbb/tbb_thread.h"
11 #include "tbb/tick_count.h"
12 
13 using namespace tbb;
14 
16 
17 //=============================================================================
18 
// Constructor.
// Forwards `name` and `svcLoc` to base_class and puts the members in their
// "no pool yet" state: m_init false, m_threadPoolSize 0, and no TBB
// scheduler-init object or barrier allocated (both pointers null).
19 ThreadPoolSvc::ThreadPoolSvc( const std::string& name, ISvcLocator* svcLoc )
20  : base_class( name, svcLoc )
21  , m_threadInitTools( this )
22  , m_init( false )
23  , m_threadPoolSize( 0 )
24  , m_tbbSchedInit( nullptr )
25  , m_barrier( nullptr )
26 {
27 
   // Declare m_threadInitTools as the property named "ThreadInitTools".
28  declareProperty( "ThreadInitTools", m_threadInitTools, "ToolHandleArray of IThreadInitTools" );
29 }
30 
31 //-----------------------------------------------------------------------------
32 
// Service initialisation body.
// Returns FAILURE when the status `sc` is not a success or when the
// ThreadInitTools array cannot be retrieved; otherwise logs how many thread
// init tools were retrieved (or that none are attached) and returns SUCCESS.
// NOTE(review): the signature line and the declaration of `sc` are not present
// in this listing (listing numbers 33 and 37 are skipped). Presumably this is
// ThreadPoolSvc::initialize() and `sc` holds the base-class initialize()
// result -- confirm against the original file.
34 {
35 
36  // Initialise mother class (read properties, ...)
38  if ( !sc.isSuccess() ) {
39  warning() << "Base class could not be initialized" << endmsg;
40  return StatusCode::FAILURE;
41  }
42 
    // Retrieve the configured tool array; failure here aborts initialisation.
43  if ( m_threadInitTools.retrieve().isFailure() ) {
44  error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
45 
46  return StatusCode::FAILURE;
47  } else {
    // Purely informational: an empty tool array is not treated as an error.
48  if ( m_threadInitTools.size() != 0 ) {
49  info() << "retrieved " << m_threadInitTools.size() << " thread init tools" << endmsg;
50  } else {
51  info() << "no thread init tools attached" << endmsg;
52  }
53  }
54 
55  return StatusCode::SUCCESS;
56 }
57 
58 //-----------------------------------------------------------------------------
59 
// Service finalisation body.
// Emits a warning if the pool was never initialised (m_init still false) and
// always returns SUCCESS; nothing is released or torn down in the visible code.
// NOTE(review): the signature line is not present in this listing (listing
// number 60 is skipped) -- presumably ThreadPoolSvc::finalize(); confirm.
61 {
62 
63  if ( !m_init ) {
64  warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
65  << "was never initialized" << endmsg;
66  }
67 
68  return StatusCode::SUCCESS;
69 }
70 
71 //-----------------------------------------------------------------------------
72 
// Initialise the thread pool for the requested size and run the thread
// initialisation tasks.
//
// poolSize: requested pool size. The value -100 skips creation of the TBB
//           scheduler-init object and of the barrier altogether.
// Returns FAILURE if launchTasks( false ) fails; otherwise sets m_init to true
// and returns SUCCESS.
//
// m_initMutex is locked through a scoped_lock for the whole call.
73 StatusCode ThreadPoolSvc::initPool( const int& poolSize )
74 {
75 
76  tbb::spin_mutex::scoped_lock lock( m_initMutex );
77 
78  m_threadPoolSize = poolSize;
79 
80  if ( msgLevel( MSG::DEBUG ) ) debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
81  // There is a problem in the piece of the code below. if
82  // m_threadPoolSize is set to something negative which is < -1,
83  // algorithm below might not behave as expected. For the time being
84  // I've chosen to create the barrier with the default number of
85  // threads created by the task scheduler init assuming that a
86  // negative value will choose automatic thread creation which will
87  // create default number of threads.
88  // SK
89 
90  // -100 prevents the creation of the pool and the scheduler directly
91  // executes the tasks.
92  if ( -100 != m_threadPoolSize ) {
93  if ( msgLevel( MSG::DEBUG ) ) debug() << "Initialising a thread pool of size " << m_threadPoolSize << endmsg;
94 
95  // Leave -1 in case selected, increment otherwise
96  // - What?
     // The value handed to task_scheduler_init is poolSize + 1, except that -1
     // is passed through unchanged.
97  int thePoolSize = m_threadPoolSize;
98  if ( thePoolSize != -1 ) thePoolSize += 1;
99 
100  // Create the TBB task scheduler
101  m_tbbSchedInit = std::unique_ptr<tbb::task_scheduler_init>( new tbb::task_scheduler_init( thePoolSize ) );
102  // Create the barrier for task synchronization
      // For any requested size of -1 or lower the barrier is sized from
      // default_num_threads() instead of from the requested value.
103  if ( m_threadPoolSize <= -1 ) thePoolSize = m_tbbSchedInit->default_num_threads();
104  if ( msgLevel( MSG::DEBUG ) ) {
105  debug() << "creating barrier of size " << thePoolSize << endmsg;
106  }
      // NOTE(review): listing number 107 is skipped here, so a statement may be
      // missing from this listing -- confirm against the original file.
108 
109  m_barrier = std::unique_ptr<boost::barrier>( new boost::barrier( thePoolSize ) );
110 
111  } else {
      // NOTE(review): listing number 112 is skipped, so this branch may not be
      // empty in the original file -- confirm.
113  }
114 
115  // Launch the init tool tasks
116  const bool terminate = false;
117  if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
118 
119  if ( msgLevel( MSG::DEBUG ) ) debug() << "Thread Pool initialization complete!" << endmsg;
120 
      // Only reached when the init tasks succeeded.
121  m_init = true;
122 
123  return StatusCode::SUCCESS;
124 }
125 
126 //-----------------------------------------------------------------------------
127 
// Pool termination body.
// Locks m_initMutex, returns FAILURE if the pool was never initialised
// (m_init false), then runs the termination tasks through launchTasks( true )
// and returns FAILURE if that call fails, SUCCESS otherwise.
// m_init is not reset in the visible code.
// NOTE(review): the signature line is not present in this listing (listing
// number 128 is skipped) -- presumably ThreadPoolSvc::terminatePool(); confirm.
129 {
130  tbb::spin_mutex::scoped_lock lock( m_initMutex );
131  if ( msgLevel( MSG::DEBUG ) ) debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
132 
133  if ( !m_init ) {
134  error() << "Trying to terminate uninitialized thread pool!" << endmsg;
135  return StatusCode::FAILURE;
136  }
137 
138  // Launch the termination tasks
139  const bool terminate = true;
140  if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
141 
142  if ( msgLevel( MSG::DEBUG ) ) debug() << "Thread pool termination complete!" << endmsg;
143 
144  return StatusCode::SUCCESS;
145 }
146 
147 //-----------------------------------------------------------------------------
148 
// Task launch body: runs the ThreadInitTools either on the TBB pool or, when
// no scheduler-init object exists, directly in the calling thread.
// `terminate` selects termination (true) or initialisation (false) tasks.
// Returns SUCCESS immediately when no tools are configured; returns FAILURE
// when a tool's nInit() count differs from the expected value or when
// ThreadInitTask::execFailed() reports a failure.
// NOTE(review): the signature line is not present in this listing (listing
// number 149 is skipped) -- presumably ThreadPoolSvc::launchTasks( bool );
// confirm against the original file.
150 {
151 
152  if ( m_threadInitTools.empty() ) return StatusCode::SUCCESS;
153 
154  const std::string taskType = terminate ? "termination" : "initialization";
155 
156  // If we have a thread pool (via a scheduler), then we want to queue
157  // the tasks in TBB to execute on each thread.
158  if ( m_tbbSchedInit ) {
159 
160  // Create one task for each worker thread in the pool
      // NOTE(review): for a negative m_threadPoolSize this loop enqueues no
      // tasks, while initPool sizes the barrier from default_num_threads();
      // the m_barrier->wait() below would then presumably never return --
      // confirm how ThreadInitTask uses the barrier.
161  for ( int i = 0; i < m_threadPoolSize; ++i ) {
162  if ( msgLevel( MSG::DEBUG ) ) debug() << "creating ThreadInitTask " << i << endmsg;
163  tbb::task* t = new ( tbb::task::allocate_root() )
164  ThreadInitTask( m_threadInitTools, m_barrier.get(), serviceLocator(), terminate );
165 
166  // Queue the task
167  tbb::task::enqueue( *t );
      // Pause between enqueues; presumably gives each task time to be picked
      // up by a different worker thread -- TODO confirm the intent.
168  this_tbb_thread::sleep( tbb::tick_count::interval_t( .1 ) );
169  }
170 
171  // Now wait for all the workers to reach the barrier
172  if ( msgLevel( MSG::DEBUG ) ) debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
173  m_barrier->wait();
174 
175  // Check to make sure all Tools were invoked.
176  // I'm not sure this mechanism is worthwhile.
177  for ( auto& t : m_threadInitTools ) {
178  // Number of threads initialized but not terminated.
179  int numInit = t->nInit();
180  // Expected number based on the type of task.
181  int expectedNumInit = terminate ? 0 : m_threadPoolSize;
182  if ( numInit != expectedNumInit ) {
183  error() << "not all threads " << ( terminate ? "terminated" : "initialized" ) << " for tool " << t << " : "
184  << t->nInit() << " out of " << m_threadPoolSize << " are currently active" << endmsg;
185  return StatusCode::FAILURE;
186  }
187  }
188 
189  }
190 
191  // In single-threaded mode, there is no scheduler, so we simply call
192  // the task wrapper directly in this thread.
193  else {
      // NOTE(review): the message below has no space between taskType and
      // "in this thread.", so the two words run together in the log output.
194  if ( msgLevel( MSG::DEBUG ) ) debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
      // No barrier is passed because nothing else runs alongside this call.
195  boost::barrier* noBarrier = nullptr;
196  ThreadInitTask theTask( m_threadInitTools, noBarrier, serviceLocator(), terminate );
197  theTask.execute();
198  }
199 
200  // Now, we do some error checking
201  if ( ThreadInitTask::execFailed() ) {
202  error() << "a ThreadInitTask failed to execute successfully" << endmsg;
203  return StatusCode::FAILURE;
204  }
205 
206  return StatusCode::SUCCESS;
207 }
virtual StatusCode finalize() override final
Finalise.
static GAUDI_API void setNumThreads(const std::size_t &nT)
StatusCode initialize() override
Definition: Service.cpp:64
tbb::task * execute() override
Execute the task.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
virtual StatusCode terminatePool() override final
Terminate the thread pool and launch thread termination tasks.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:75
Special TBB task used by ThreadPoolSvc to wrap execution of IThreadInitTools.
STL namespace.
STL class.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
Gaudi::Details::PropertyBase * declareProperty(const std::string &name, TYPE &value, const std::string &doc="none")
Declare a property (templated)
static bool execFailed()
virtual StatusCode initialize() override final
Initialise.
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:211
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:26
virtual StatusCode initPool(const int &poolSize) override final
Initialize the thread pool and launch the ThreadInitTasks.
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209