The Gaudi Framework  v33r1 (b1225454)
ThreadPoolSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "ThreadPoolSvc.h"
12 
14 #include "ThreadInitTask.h"
15 
16 #include "tbb/task.h"
17 #include "tbb/task_scheduler_observer.h"
18 #include "tbb/tbb_thread.h"
19 #include "tbb/tick_count.h"
20 
21 #include <chrono>
22 #include <thread>
23 
24 using namespace tbb;
25 
26 namespace Gaudi {
27  namespace Concurrency {
28  extern thread_local bool ThreadInitDone;
29  }
30 } // namespace Gaudi
31 
33 
34 //=============================================================================
35 
36 ThreadPoolSvc::ThreadPoolSvc( const std::string& name, ISvcLocator* svcLoc ) : extends( name, svcLoc ) {
37  declareProperty( "ThreadInitTools", m_threadInitTools, "ToolHandleArray of IThreadInitTools" );
38 }
39 
40 //-----------------------------------------------------------------------------
41 
43 
44  // Initialise mother class (read properties, ...)
46  if ( !sc.isSuccess() ) {
47  warning() << "Base class could not be initialized" << endmsg;
48  return StatusCode::FAILURE;
49  }
50 
51  if ( m_threadInitTools.retrieve().isFailure() ) {
52  error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
53 
54  return StatusCode::FAILURE;
55  }
56  if ( m_threadInitTools.size() != 0 ) {
57  info() << "retrieved " << m_threadInitTools.size() << " thread init tools" << endmsg;
58  } else {
59  info() << "no thread init tools attached" << endmsg;
60  }
61 
62  return StatusCode::SUCCESS;
63 }
64 
65 //-----------------------------------------------------------------------------
66 
68 
69  if ( !m_init ) {
70  warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
71  << "was never initialized" << endmsg;
72  }
73 
74  return StatusCode::SUCCESS;
75 }
76 
77 //-----------------------------------------------------------------------------
78 
79 StatusCode ThreadPoolSvc::initPool( const int& poolSize ) {
80 
81  tbb::spin_mutex::scoped_lock lock( m_initMutex );
82 
83  m_threadPoolSize = poolSize;
84 
85  if ( msgLevel( MSG::DEBUG ) ) debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
86  // There is a problem in the piece of the code below. if
87  // m_threadPoolSize is set to something negative which is < -1,
88  // algorithm below might not behave as expected. For the time being
89  // I've choosen to create the barrier with the default number of
90  // threads created by the task scheduler init assuming that a
91  // negative value will choose automatic thread creation which will
92  // create default number of threads.
93  // SK
94 
95  // -100 prevents the creation of the pool and the scheduler directly
96  // executes the tasks.
97  // -1 means use all available cores
98 
99  if ( -100 != m_threadPoolSize ) {
100  if ( msgLevel( MSG::DEBUG ) ) debug() << "Initialising a thread pool of size " << m_threadPoolSize << endmsg;
101 
102  // Leave -1 in case selected, increment otherwise
103  // - What?
104  int thePoolSize = m_threadPoolSize;
105  if ( thePoolSize >= 0 ) thePoolSize += 1;
106 
107  if ( m_threadPoolSize == -1 ) {
108  // if requested pool size == -1, use number of available cores
109  m_threadPoolSize = std::thread::hardware_concurrency();
110  thePoolSize = m_threadPoolSize;
111  } else if ( m_threadPoolSize < -1 ) {
112  fatal() << "Unexpected ThreadPoolSize \"" << m_threadPoolSize << "\". Allowed negative values are "
113  << "-1 (use all available cores) and -100 (don't use a thread pool)" << endmsg;
114  return StatusCode::FAILURE;
115  }
116 
117 #if TBB_INTERFACE_VERSION_MAJOR < 12
118  m_tbbSchedInit = std::make_unique<tbb::task_scheduler_init>( thePoolSize );
119 #endif // TBB_INTERFACE_VERSION_MAJOR < 12
120 
121  m_tbbgc = std::make_unique<tbb::global_control>( global_control::max_allowed_parallelism, thePoolSize );
122 
124 
125  // Create the barrier for task synchronization at termination
126  m_barrier = std::make_unique<boost::barrier>( thePoolSize );
127 
128  } else {
129  // don't use a thread pool
131  m_tbbgc = std::make_unique<tbb::global_control>( global_control::max_allowed_parallelism, 0 );
132  }
133 
134  if ( msgLevel( MSG::DEBUG ) )
135  debug() << "Thread Pool initialization complete. Max task concurrency: "
136  << tbb::global_control::active_value( global_control::max_allowed_parallelism ) << endmsg;
137 
138  m_init = true;
139 
140  return StatusCode::SUCCESS;
141 }
142 
143 //-----------------------------------------------------------------------------
144 
146  tbb::spin_mutex::scoped_lock lock( m_initMutex );
147  if ( msgLevel( MSG::DEBUG ) ) debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
148 
149  if ( !m_init ) {
150  error() << "Trying to terminate uninitialized thread pool!" << endmsg;
151  return StatusCode::FAILURE;
152  }
153 
154  // Launch the termination tasks
155  const bool terminate = true;
156  if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
157 
158  if ( msgLevel( MSG::DEBUG ) ) debug() << "Thread pool termination complete!" << endmsg;
159 
160  return StatusCode::SUCCESS;
161 }
162 
163 //-----------------------------------------------------------------------------
164 
166 
167  const std::string taskType = terminate ? "termination" : "initialization";
168 
169  // If we have a thread pool (via a scheduler), then we want to queue
170  // the tasks in TBB to execute on each thread.
171  if ( tbb::global_control::active_value( global_control::max_allowed_parallelism ) > 0 ) {
172 
173  // Create one task for each worker thread in the pool
174  for ( int i = 0; i < m_threadPoolSize; ++i ) {
175  if ( msgLevel( MSG::DEBUG ) ) debug() << "creating ThreadInitTask " << i << endmsg;
176  tbb::task* t = new ( tbb::task::allocate_root() )
177  ThreadInitTask( m_threadInitTools, m_barrier.get(), serviceLocator(), terminate );
178 
179  // Queue the task
180  tbb::task::enqueue( *t );
182  }
183 
184  // Now wait for all the workers to reach the barrier
185  if ( msgLevel( MSG::DEBUG ) ) debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
186  m_barrier->wait();
187 
188  // Check to make sure all Tools were invoked.
189  // I'm not sure this mechanism is worthwhile.
190  for ( auto& t : m_threadInitTools ) {
191  // Number of threads initialized but not terminated.
192  int numInit = t->nInit();
193  // Expected number based on the type of task.
194  int expectedNumInit = terminate ? 0 : m_threadPoolSize;
195  if ( numInit != expectedNumInit ) {
196  std::ostringstream ost;
197  ost << "not all threads " << ( terminate ? "terminated" : "initialized" ) << " for tool " << t << " : "
198  << t->nInit() << " out of " << m_threadPoolSize << " are currently active.";
199  if ( terminate ) {
200  // it is likely the case that tbb activated new theads
201  // late in the game, and extra initializations were done
202  info() << ost.str() << endmsg;
203  } else {
204  error() << ost.str() << endmsg;
205  return StatusCode::FAILURE;
206  }
207  }
208  }
209 
210  }
211 
212  // In single-threaded mode, there is no scheduler, so we simply call
213  // the task wrapper directly in this thread.
214  else {
215  if ( msgLevel( MSG::DEBUG ) ) debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
216  boost::barrier* noBarrier = nullptr;
217  ThreadInitTask theTask( m_threadInitTools, noBarrier, serviceLocator(), terminate );
218  theTask.execute();
219  }
220 
221  // Now, we do some error checking
222  if ( ThreadInitTask::execFailed() ) {
223  error() << "a ThreadInitTask failed to execute successfully" << endmsg;
224  return StatusCode::FAILURE;
225  }
226 
227  return StatusCode::SUCCESS;
228 }
229 
230 //-----------------------------------------------------------------------------
231 
232 // tbb will actually create more threads than requested, and will sometimes
233 // activate them late. This method is used to initialize one of these threads
234 // when it is detected
235 
236 // note TBB generates address sanitizer errors here, e.g.
237 //
238 // ==51081==ERROR: AddressSanitizer: stack-buffer-overflow on address 0x7fe7decf5195 at pc 0x7fe7e5da48bf bp
239 // 0x7fe7decf4f70 sp 0x7fe7decf4f68 WRITE of size 1 at 0x7fe7decf5195 thread T4
240 // #0 0x7fe7e5da48be in tbb::task::task()
241 // /cvmfs/lhcb.cern.ch/lib/lcg/releases/tbb/2019_U1-5939b/x86_64-centos7-gcc8-dbg/include/tbb/task.h:586
242 //
243 // Use GAUDI_NO_SANITIZE_ADDRESS to suppress these.
244 // To be looked at again when TBB is updated.
245 
247 
249  // this should never happen
250  error() << "initThisThread triggered, but thread already initialized" << endmsg;
251  throw GaudiException( "initThisThread triggered, but thread already initialized", name(), StatusCode::FAILURE );
252  }
253 
254  boost::barrier* noBarrier = nullptr;
255  ThreadInitTask theTask( m_threadInitTools, noBarrier, serviceLocator(), false );
256  theTask.execute();
257 }
StatusCode finalize() override final
Finalise.
static GAUDI_API void setNumThreads(const std::size_t &nT)
StatusCode initialize() override
Definition: Service.cpp:70
tbb::task * execute() override
Execute the task.
Define general base for Gaudi exception.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:35
StatusCode terminatePool() override final
Terminate the thread pool and launch thread termination tasks.
Special TBB task used by ThreadPoolSvc to wrap execution of IThreadInitTools.
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
STL namespace.
T sleep_for(T... args)
virtual void initThisThread() override
T hardware_concurrency(T... args)
STL class.
#define GAUDI_NO_SANITIZE_ADDRESS
Definition: Kernel.h:141
#define DECLARE_COMPONENT(type)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:61
static bool execFailed()
T str(T... args)
StatusCode initialize() override final
Initialise.
bool isSuccess() const
Definition: StatusCode.h:365
thread_local bool ThreadInitDone
Base class used to extend a class implementing other interfaces.
Definition: extends.h:20
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:41
constexpr static const auto FAILURE
Definition: StatusCode.h:101
StatusCode initPool(const int &poolSize) override final
Initialize the thread pool and launch the ThreadInitTasks.
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.
Header file for std:chrono::duration-based Counters.
Definition: __init__.py:1
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202