The Gaudi Framework  v33r2 (a6f0ec87)
ThreadPoolSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "ThreadPoolSvc.h"
12 
14 #include "ThreadInitTask.h"
15 
16 #include <chrono>
17 #include <thread>
18 
19 #include "tbb/task.h"
20 
21 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
22 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
23 
24 namespace Gaudi {
25  namespace Concurrency {
26  extern thread_local bool ThreadInitDone;
27  }
28 } // namespace Gaudi
29 
31 
32 //=============================================================================
33 
34 ThreadPoolSvc::ThreadPoolSvc( const std::string& name, ISvcLocator* svcLoc ) : extends( name, svcLoc ) {
35  declareProperty( "ThreadInitTools", m_threadInitTools, "ToolHandleArray of IThreadInitTools" );
36 }
37 
38 //-----------------------------------------------------------------------------
39 
41 
42  // Initialise mother class (read properties, ...)
43  if ( Service::initialize().isFailure() ) {
44  warning() << "Base class could not be initialized" << endmsg;
45  return StatusCode::FAILURE;
46  }
47 
49  error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
50 
51  return StatusCode::FAILURE;
52  }
53 
54  if ( m_threadInitTools.size() != 0 )
55  info() << "retrieved " << m_threadInitTools.size() << " thread init tools" << endmsg;
56  else
57  info() << "no thread init tools attached" << endmsg;
58 
59  return StatusCode::SUCCESS;
60 }
61 
62 //-----------------------------------------------------------------------------
63 
65 
66  if ( !m_init )
67  warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
68  << "was never initialized" << endmsg;
69 
70  return StatusCode::SUCCESS;
71 }
72 
73 //-----------------------------------------------------------------------------
74 
75 StatusCode ThreadPoolSvc::initPool( const int& poolSize ) {
76 
77  tbb::spin_mutex::scoped_lock lock( m_initMutex );
78 
80 
81  ON_DEBUG debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
82  // There is a problem in the piece of the code below. if
83  // m_threadPoolSize is set to something negative which is < -1,
84  // algorithm below might not behave as expected. For the time being
85  // I've choosen to create the barrier with the default number of
86  // threads created by the task scheduler init assuming that a
87  // negative value will choose automatic thread creation which will
88  // create default number of threads.
89  // SK
90 
91  // -100 prevents the creation of the pool and the scheduler directly
92  // executes the tasks.
93  // -1 means use all available hardware threads
94 
95  if ( -100 != m_threadPoolSize ) {
96  ON_DEBUG debug() << "Initialising a thread pool of size " << m_threadPoolSize << endmsg;
97 
98  if ( m_threadPoolSize == -1 ) {
100  } else if ( m_threadPoolSize < -1 ) {
101  fatal() << "Unexpected ThreadPoolSize \"" << m_threadPoolSize << "\". Allowed negative values are "
102  << "-1 (use all available cores) and -100 (don't use a thread pool)" << endmsg;
103  return StatusCode::FAILURE;
104  }
105 
106 #if TBB_INTERFACE_VERSION_MAJOR < 12
107  m_tbbSchedInit = std::make_unique<tbb::task_scheduler_init>( m_threadPoolSize + 1 );
108 #endif // TBB_INTERFACE_VERSION_MAJOR < 12
109 
110  ON_VERBOSE verbose() << "Maximum allowed parallelism before adjusting: "
111  << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
112 
113  // to get the number of threads we need, request one thread more to account for how TBB calculates
114  // its soft limit of the number of threads for the global thread pool
115  m_tbbgc =
116  std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism, m_threadPoolSize + 1 );
117 
119 
120  // Create the barrier for task synchronization at termination
121  // (here we increase the number of threads by one to account for calling thread)
122  m_barrier = std::make_unique<boost::barrier>( m_threadPoolSize + 1 );
123 
124  } else {
125  // don't use a thread pool
127  m_tbbgc = std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism, 0 );
128  }
129 
130  ON_DEBUG debug() << "Thread Pool initialization complete. Maximum allowed parallelism: "
131  << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
132 
133  m_init = true;
134 
135  return StatusCode::SUCCESS;
136 }
137 
138 //-----------------------------------------------------------------------------
139 
141  tbb::spin_mutex::scoped_lock lock( m_initMutex );
142 
143  ON_DEBUG debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
144 
145  if ( !m_init ) {
146  error() << "Trying to terminate uninitialized thread pool!" << endmsg;
147  return StatusCode::FAILURE;
148  }
149 
150  // Launch the termination tasks
151  const bool terminate = true;
152  if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
153 
154  ON_DEBUG debug() << "Thread pool termination complete!" << endmsg;
155 
156  return StatusCode::SUCCESS;
157 }
158 
159 //-----------------------------------------------------------------------------
160 
162 
163  const std::string taskType = terminate ? "termination" : "initialization";
164 
165  // If we have a thread pool (via a scheduler), then we want to queue
166  // the tasks in TBB to execute on each thread.
167  if ( tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) > 0 ) {
168 
169  // Create one task for each worker thread in the pool
170  for ( int i = 0; i < m_threadPoolSize; ++i ) {
171 
172  ON_DEBUG debug() << "creating ThreadInitTask " << i << endmsg;
173  tbb::task* t = new ( tbb::task::allocate_root() )
175 
176  // Queue the task
177  tbb::task::enqueue( *t );
179  }
180 
181  // Now wait for all the workers to reach the barrier
182  ON_DEBUG debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
183  m_barrier->wait();
184 
185  // Check to make sure all Tools were invoked.
186  // I'm not sure this mechanism is worthwhile.
187  for ( auto& t : m_threadInitTools ) {
188  // Number of threads initialized but not terminated.
189  int numInit = t->nInit();
190  // Expected number based on the type of task.
191  int expectedNumInit = terminate ? 0 : m_threadPoolSize;
192  if ( numInit != expectedNumInit ) {
193  std::ostringstream ost;
194  ost << "not all threads " << ( terminate ? "terminated" : "initialized" ) << " for tool " << t << " : "
195  << t->nInit() << " out of " << m_threadPoolSize << " are currently active.";
196  if ( terminate ) {
197  // it is likely the case that TBB activated new threads
198  // late in the game, and extra initializations were done
199  info() << ost.str() << endmsg;
200  } else {
201  error() << ost.str() << endmsg;
202  return StatusCode::FAILURE;
203  }
204  }
205  }
206 
207  }
208 
209  // In single-threaded mode, there is no scheduler, so we simply call
210  // the task wrapper directly in this thread.
211  else {
212  ON_DEBUG debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
213 
214  boost::barrier* noBarrier = nullptr;
215  ThreadInitTask theTask( m_threadInitTools, noBarrier, serviceLocator(), terminate );
216  theTask.execute();
217  }
218 
219  // Now, we do some error checking
220  if ( ThreadInitTask::execFailed() ) {
221  error() << "a ThreadInitTask failed to execute successfully" << endmsg;
222  return StatusCode::FAILURE;
223  }
224 
225  return StatusCode::SUCCESS;
226 }
227 
228 //-----------------------------------------------------------------------------
229 
230 // tbb will actually create more threads than requested, and will sometimes
231 // activate them late. This method is used to initialize one of these threads
232 // when it is detected
233 
234 // note TBB generates address sanitizer errors here, e.g.
235 //
236 // ==51081==ERROR: AddressSanitizer: stack-buffer-overflow on address 0x7fe7decf5195 at pc 0x7fe7e5da48bf bp
237 // 0x7fe7decf4f70 sp 0x7fe7decf4f68 WRITE of size 1 at 0x7fe7decf5195 thread T4
238 // #0 0x7fe7e5da48be in tbb::task::task()
239 // /cvmfs/lhcb.cern.ch/lib/lcg/releases/tbb/2019_U1-5939b/x86_64-centos7-gcc8-dbg/include/tbb/task.h:586
240 //
241 // Use GAUDI_NO_SANITIZE_ADDRESS to suppress these.
242 // To be looked at again when TBB is updated.
243 
245 
247  // this should never happen
248  error() << "initThisThread triggered, but thread already initialized" << endmsg;
249  throw GaudiException( "initThisThread triggered, but thread already initialized", name(), StatusCode::FAILURE );
250  }
251 
252  boost::barrier* noBarrier = nullptr;
253  ThreadInitTask theTask( m_threadInitTools, noBarrier, serviceLocator(), false );
254  theTask.execute();
255 }
StatusCode finalize() override final
Finalise.
std::unique_ptr< tbb::global_control > m_tbbgc
TBB global control parameter.
Definition: ThreadPoolSvc.h:89
static GAUDI_API void setNumThreads(const std::size_t &nT)
StatusCode initialize() override
Definition: Service.cpp:70
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:287
tbb::task * execute() override
Execute the task.
Define general base for Gaudi exception.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:35
StatusCode terminatePool() override final
Terminate the thread pool and launch thread termination tasks.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
Special TBB task used by ThreadPoolSvc to wrap execution of IThreadInitTools.
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
size_type size() const
Definition: GaudiHandle.h:475
STL namespace.
T sleep_for(T... args)
virtual void initThisThread() override
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
T hardware_concurrency(T... args)
int m_threadPoolSize
Size of the thread pool allocated.
Definition: ThreadPoolSvc.h:75
STL class.
#define GAUDI_NO_SANITIZE_ADDRESS
Definition: Kernel.h:141
#define DECLARE_COMPONENT(type)
#define ON_VERBOSE
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:284
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
bool m_init
Was the thread pool initialized?
Definition: ThreadPoolSvc.h:72
StatusCode retrieve()
Retrieve all tools.
Definition: GaudiHandle.h:506
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:61
static bool execFailed()
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T str(T... args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
StatusCode initialize() override final
Initialise.
std::unique_ptr< boost::barrier > m_barrier
Barrier used to synchronization thread init tasks.
Definition: ThreadPoolSvc.h:86
tbb::spin_mutex m_initMutex
Mutex used to protect the initPool and terminatePool methods.
Definition: ThreadPoolSvc.h:78
T get(T... args)
thread_local bool ThreadInitDone
Base class used to extend a class implementing other interfaces.
Definition: extends.h:20
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:41
StatusCode terminate() override
Definition: Service.h:61
constexpr static const auto FAILURE
Definition: StatusCode.h:101
std::unique_ptr< tbb::task_scheduler_init > m_tbbSchedInit
TBB task scheduler initializer.
Definition: ThreadPoolSvc.h:82
StatusCode initPool(const int &poolSize) override final
Initialize the thread pool and launch the ThreadInitTasks.
ToolHandleArray< IThreadInitTool > m_threadInitTools
Handle array of thread init tools.
Definition: ThreadPoolSvc.h:69
bool isFailure() const
Definition: StatusCode.h:145
#define ON_DEBUG
int poolSize() const override final
Definition: ThreadPoolSvc.h:58
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
Header file for std:chrono::duration-based Counters.
Definition: __init__.py:1
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202