The Gaudi Framework  v32r2 (46d42edc)
ThreadPoolSvc.cpp
Go to the documentation of this file.
1 #include "ThreadPoolSvc.h"
2 
4 #include "ThreadInitTask.h"
5 
6 #include "tbb/task.h"
7 #include "tbb/task_scheduler_init.h"
8 #include "tbb/task_scheduler_observer.h"
9 #include "tbb/tbb_thread.h"
10 #include "tbb/tick_count.h"
11 
12 using namespace tbb;
13 
14 namespace Gaudi {
15  namespace Concurrency {
16  extern thread_local bool ThreadInitDone;
17  }
18 } // namespace Gaudi
19 
21 
22 //=============================================================================
23 
// Constructor.
//
// Forwards `name` and `svcLoc` unchanged to the `extends` base and declares
// the "ThreadInitTools" property, bound to the member m_threadInitTools.
//
// @param name    forwarded to the base-class constructor
// @param svcLoc  forwarded to the base-class constructor
24 ThreadPoolSvc::ThreadPoolSvc( const std::string& name, ISvcLocator* svcLoc ) : extends( name, svcLoc ) {
  // Property description string: "ToolHandleArray of IThreadInitTools".
25  declareProperty( "ThreadInitTools", m_threadInitTools, "ToolHandleArray of IThreadInitTools" );
26 }
27 
28 //-----------------------------------------------------------------------------
29 
31 
32  // Initialise mother class (read properties, ...)
34  if ( !sc.isSuccess() ) {
35  warning() << "Base class could not be initialized" << endmsg;
36  return StatusCode::FAILURE;
37  }
38 
39  if ( m_threadInitTools.retrieve().isFailure() ) {
40  error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
41 
42  return StatusCode::FAILURE;
43  }
44  if ( m_threadInitTools.size() != 0 ) {
45  info() << "retrieved " << m_threadInitTools.size() << " thread init tools" << endmsg;
46  } else {
47  info() << "no thread init tools attached" << endmsg;
48  }
49 
50  return StatusCode::SUCCESS;
51 }
52 
53 //-----------------------------------------------------------------------------
54 
56 
57  if ( !m_init ) {
58  warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
59  << "was never initialized" << endmsg;
60  }
61 
62  return StatusCode::SUCCESS;
63 }
64 
65 //-----------------------------------------------------------------------------
66 
// Set up the TBB scheduler objects and the synchronisation barrier for the
// requested pool size.
//
// Behaviour by value of `poolSize` (as implemented below):
//   -100  : no task_scheduler_init and no barrier are created; only a
//           tbb::global_control with max_allowed_parallelism = 0 is installed.
//   -1    : a default-constructed task_scheduler_init is created and the pool
//           size is taken from default_num_threads(); m_threadPoolSize is
//           overwritten with that value.
//   >= 0  : global_control and task_scheduler_init are created with
//           poolSize + 1; m_threadPoolSize keeps the requested value.
//   other : a fatal message is printed and StatusCode::FAILURE is returned.
//
// On every non-failing path m_init is set to true and StatusCode::SUCCESS is
// returned.
//
// @param poolSize  requested pool size, see the table above
// @return StatusCode::SUCCESS, or StatusCode::FAILURE for an unsupported
//         negative pool size
//
// NOTE(review): listing lines 110 and 117 of the original file are absent
// from this extract; any statements they contain are not described here.
67 StatusCode ThreadPoolSvc::initPool( const int& poolSize ) {
68 
  // Hold m_initMutex for the whole function (released when `lock` goes out of scope).
69  tbb::spin_mutex::scoped_lock lock( m_initMutex );
70 
71  m_threadPoolSize = poolSize;
72 
73  if ( msgLevel( MSG::DEBUG ) ) debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
74 // There is a problem in the piece of the code below. if
75  // m_threadPoolSize is set to something negative which is < -1,
76  // algorithm below might not behave as expected. For the time being
77  // I've chosen to create the barrier with the default number of
78  // threads created by the task scheduler init assuming that a
79  // negative value will choose automatic thread creation which will
80  // create default number of threads.
81  // SK
  // NOTE(review): the paragraph above looks outdated -- the code below
  // returns FAILURE for any negative value other than -1 and -100.
82 
83 // -100 prevents the creation of the pool and the scheduler directly
84  // executes the tasks.
85  // -1 means use all available cores
86 
87 if ( -100 != m_threadPoolSize ) {
88  if ( msgLevel( MSG::DEBUG ) ) debug() << "Initialising a thread pool of size " << m_threadPoolSize << endmsg;
89 
90 // Leave -1 in case selected, increment otherwise
91  // - What?
  // i.e. for a non-negative request the value handed to TBB and to the
  // barrier is the requested size plus one.
92 int thePoolSize = m_threadPoolSize;
93 if ( thePoolSize >= 0 ) thePoolSize += 1;
94 
95  if ( m_threadPoolSize == -1 ) {
96  // if requested pool size == -1, use number of available cores
97  m_tbbSchedInit = std::make_unique<tbb::task_scheduler_init>();
98  thePoolSize = m_tbbSchedInit->default_num_threads();
  // From here on m_threadPoolSize holds the resolved size, not -1.
99  m_threadPoolSize = thePoolSize;
100  } else if ( m_threadPoolSize >= 0 ) {
101  // Limit the number of threads to requested pool size plus 1
102  m_tbbgc = std::make_unique<tbb::global_control>( global_control::max_allowed_parallelism, thePoolSize );
103  m_tbbSchedInit = std::make_unique<tbb::task_scheduler_init>( thePoolSize );
104  } else {
  // Any other negative value is rejected.
105  fatal() << "Unexpected ThreadPoolSize \"" << m_threadPoolSize << "\". Allowed negative values are "
106  << "-1 (use all available cores) and -100 (don't use a thread pool)" << endmsg;
107  return StatusCode::FAILURE;
108  }
109 
111 
112  // Create the barrier for task synchronization at termination
  // The barrier count is thePoolSize as computed above.
113  m_barrier = std::make_unique<boost::barrier>( thePoolSize );
114 
115  } else {
116  // don't use a thread pool
118  m_tbbgc = std::make_unique<tbb::global_control>( global_control::max_allowed_parallelism, 0 );
119  }
120 
121  if ( msgLevel( MSG::DEBUG ) )
122  debug() << "Thread Pool initialization complete. Max task concurrency: "
123  << tbb::global_control::active_value( global_control::max_allowed_parallelism ) << endmsg;
124 
  // Mark the pool as initialised; finalize() and terminatePool() check this flag.
125  m_init = true;
126 
127  return StatusCode::SUCCESS;
128 }
129 
130 //-----------------------------------------------------------------------------
131 
133  tbb::spin_mutex::scoped_lock lock( m_initMutex );
134  if ( msgLevel( MSG::DEBUG ) ) debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
135 
136  if ( !m_init ) {
137  error() << "Trying to terminate uninitialized thread pool!" << endmsg;
138  return StatusCode::FAILURE;
139  }
140 
141  // Launch the termination tasks
142  const bool terminate = true;
143  if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
144 
145  if ( msgLevel( MSG::DEBUG ) ) debug() << "Thread pool termination complete!" << endmsg;
146 
147  return StatusCode::SUCCESS;
148 }
149 
150 //-----------------------------------------------------------------------------
151 
153 
154  const std::string taskType = terminate ? "termination" : "initialization";
155 
156  // If we have a thread pool (via a scheduler), then we want to queue
157  // the tasks in TBB to execute on each thread.
158  if ( m_tbbSchedInit ) {
159 
160  // Create one task for each worker thread in the pool
161  for ( int i = 0; i < m_threadPoolSize; ++i ) {
162  if ( msgLevel( MSG::DEBUG ) ) debug() << "creating ThreadInitTask " << i << endmsg;
163  tbb::task* t = new ( tbb::task::allocate_root() )
164  ThreadInitTask( m_threadInitTools, m_barrier.get(), serviceLocator(), terminate );
165 
166  // Queue the task
167  tbb::task::enqueue( *t );
168  this_tbb_thread::sleep( tbb::tick_count::interval_t( .02 ) );
169  }
170 
171  // Now wait for all the workers to reach the barrier
172  if ( msgLevel( MSG::DEBUG ) ) debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
173  m_barrier->wait();
174 
175  // Check to make sure all Tools were invoked.
176  // I'm not sure this mechanism is worthwhile.
177  for ( auto& t : m_threadInitTools ) {
178  // Number of threads initialized but not terminated.
179  int numInit = t->nInit();
180  // Expected number based on the type of task.
181  int expectedNumInit = terminate ? 0 : m_threadPoolSize;
182  if ( numInit != expectedNumInit ) {
183  std::ostringstream ost;
184  ost << "not all threads " << ( terminate ? "terminated" : "initialized" ) << " for tool " << t << " : "
185  << t->nInit() << " out of " << m_threadPoolSize << " are currently active.";
186  if ( terminate ) {
187 // it is likely the case that tbb activated new threads
188  // late in the game, and extra initializations were done
189  info() << ost.str() << endmsg;
190  } else {
191  error() << ost.str() << endmsg;
192  return StatusCode::FAILURE;
193  }
194  }
195  }
196 
197  }
198 
199  // In single-threaded mode, there is no scheduler, so we simply call
200  // the task wrapper directly in this thread.
201  else {
202  if ( msgLevel( MSG::DEBUG ) ) debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
203  boost::barrier* noBarrier = nullptr;
204  ThreadInitTask theTask( m_threadInitTools, noBarrier, serviceLocator(), terminate );
205  theTask.execute();
206  }
207 
208  // Now, we do some error checking
209  if ( ThreadInitTask::execFailed() ) {
210  error() << "a ThreadInitTask failed to execute successfully" << endmsg;
211  return StatusCode::FAILURE;
212  }
213 
214  return StatusCode::SUCCESS;
215 }
216 
217 //-----------------------------------------------------------------------------
218 
219 // tbb will actually create more threads than requested, and will sometimes
220 // activate them late. This method is used to initialize one of these threads
221 // when it is detected
222 
223 // note TBB generates address sanitizer errors here, e.g.
224 //
225 // ==51081==ERROR: AddressSanitizer: stack-buffer-overflow on address 0x7fe7decf5195 at pc 0x7fe7e5da48bf bp
226 // 0x7fe7decf4f70 sp 0x7fe7decf4f68 WRITE of size 1 at 0x7fe7decf5195 thread T4
227 // #0 0x7fe7e5da48be in tbb::task::task()
228 // /cvmfs/lhcb.cern.ch/lib/lcg/releases/tbb/2019_U1-5939b/x86_64-centos7-gcc8-dbg/include/tbb/task.h:586
229 //
230 // Use GAUDI_NO_SANITIZE_ADDRESS to suppress these.
231 // To be looked at again when TBB is updated.
232 
234 
236  // this should never happen
237  error() << "initThisThread triggered, but thread already initialized" << endmsg;
238  throw GaudiException( "initThisThread triggered, but thread already initialized", name(), StatusCode::FAILURE );
239  }
240 
241  boost::barrier* noBarrier = nullptr;
242  ThreadInitTask theTask( m_threadInitTools, noBarrier, serviceLocator(), false );
243  theTask.execute();
244 }
StatusCode finalize() override final
Finalise.
static GAUDI_API void setNumThreads(const std::size_t &nT)
StatusCode initialize() override
Definition: Service.cpp:60
tbb::task * execute() override
Execute the task.
Define general base for Gaudi exception.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
StatusCode terminatePool() override final
Terminate the thread pool and launch thread termination tasks.
Special TBB task used by ThreadPoolSvc to wrap execution of IThreadInitTools.
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
STL namespace.
virtual void initThisThread() override
STL class.
#define GAUDI_NO_SANITIZE_ADDRESS
Definition: Kernel.h:131
#define DECLARE_COMPONENT(type)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
static bool execFailed()
T str(T... args)
StatusCode initialize() override final
Initialise.
bool isSuccess() const
Definition: StatusCode.h:267
thread_local bool ThreadInitDone
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:28
constexpr static const auto FAILURE
Definition: StatusCode.h:86
StatusCode initPool(const int &poolSize) override final
Initialize the thread pool and launch the ThreadInitTasks.
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.
Header file for std::chrono::duration-based Counters.
Definition: __init__.py:1
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192