The Gaudi Framework  master (37c0b60a)
ThreadPoolSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2024 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "ThreadPoolSvc.h"
12 
13 #include "ThreadInitTask.h"
15 
16 #include <chrono>
17 #include <thread>
18 
19 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
20 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
21 
22 namespace Gaudi {
23  namespace Concurrency {
24  extern thread_local bool ThreadInitDone;
25  }
26 } // namespace Gaudi
27 
29 
30 //=============================================================================
31 
32 ThreadPoolSvc::ThreadPoolSvc( const std::string& name, ISvcLocator* svcLoc ) : extends( name, svcLoc ) {
33  declareProperty( "ThreadInitTools", m_threadInitTools, "ToolHandleArray of IThreadInitTools" );
34 }
35 
36 //-----------------------------------------------------------------------------
37 
39 
40  // Initialise mother class (read properties, ...)
41  if ( Service::initialize().isFailure() ) {
42  warning() << "Base class could not be initialized" << endmsg;
43  return StatusCode::FAILURE;
44  }
45 
47  error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
48 
49  return StatusCode::FAILURE;
50  }
51 
52  if ( m_threadInitTools.size() != 0 )
53  info() << "retrieved " << m_threadInitTools.size() << " thread init tools" << endmsg;
54  else
55  info() << "no thread init tools attached" << endmsg;
56 
57  return StatusCode::SUCCESS;
58 }
59 
60 //-----------------------------------------------------------------------------
61 
63 
64  if ( !m_init )
65  warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
66  << "was never initialized" << endmsg;
67 
68  return StatusCode::SUCCESS;
69 }
70 
71 //-----------------------------------------------------------------------------
72 
73 StatusCode ThreadPoolSvc::initPool( const int& poolSize, const int& maxParallelismExtra ) {
74 
75  tbb::spin_mutex::scoped_lock lock( m_initMutex );
76 
78 
79  ON_DEBUG debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
80  // There is a problem in the piece of the code below. if
81  // m_threadPoolSize is set to something negative which is < -1,
82  // algorithm below might not behave as expected. For the time being
83  // I've choosen to create the barrier with the default number of
84  // threads created by the task scheduler init assuming that a
85  // negative value will choose automatic thread creation which will
86  // create default number of threads.
87  // SK
88 
89  // -100 prevents the creation of the pool and the scheduler directly
90  // executes the tasks.
91  // -1 means use all available hardware threads
92 
93  if ( -100 != m_threadPoolSize ) {
94  ON_DEBUG debug() << "Initialising a thread pool of size " << m_threadPoolSize << endmsg;
95 
96  if ( m_threadPoolSize == -1 ) {
98  } else if ( m_threadPoolSize < -1 ) {
99  fatal() << "Unexpected ThreadPoolSize \"" << m_threadPoolSize << "\". Allowed negative values are "
100  << "-1 (use all available cores) and -100 (don't use a thread pool)" << endmsg;
101  return StatusCode::FAILURE;
102  }
103 
104  ON_VERBOSE verbose() << "Maximum allowed parallelism before adjusting: "
105  << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
106 
107  // to get the number of threads we need, request one thread more to account for how TBB calculates
108  // its soft limit of the number of threads for the global thread pool
109  m_tbbgc = std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism,
110  m_threadPoolSize + maxParallelismExtra + 1 );
111 
113 
114  // Create the task arena to run all algorithms
115  m_arena.initialize( m_threadPoolSize + 1 );
116 
117  // Create the barrier for task synchronization at termination
118  // (here we increase the number of threads by one to account for calling thread)
119  m_barrier = std::make_unique<boost::barrier>( m_threadPoolSize + 1 );
120 
121  } else {
122  // don't use a thread pool
124  m_tbbgc = std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism, 0 );
125  }
126 
127  ON_DEBUG debug() << "Thread Pool initialization complete. Maximum allowed parallelism: "
128  << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
129 
130  m_init = true;
131 
132  return StatusCode::SUCCESS;
133 }
134 
135 //-----------------------------------------------------------------------------
136 
138  tbb::spin_mutex::scoped_lock lock( m_initMutex );
139 
140  ON_DEBUG debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
141 
142  if ( !m_init ) {
143  error() << "Trying to terminate uninitialized thread pool!" << endmsg;
144  return StatusCode::FAILURE;
145  }
146 
147  // Launch the termination tasks
148  const bool terminate = true;
149  if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
150 
151  ON_DEBUG debug() << "Thread pool termination complete!" << endmsg;
152 
153  return StatusCode::SUCCESS;
154 }
155 
156 //-----------------------------------------------------------------------------
157 
159 
160  const std::string taskType = terminate ? "termination" : "initialization";
161 
162  // If we have a thread pool (via a scheduler), then we want to queue
163  // the tasks in TBB to execute on each thread.
164  if ( tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) > 0 ) {
165 
166  // Make a warning message if not all threads can be finalised
168  warning() << "Finalising " << m_threadPoolSize << " threads, but " << m_threadInitCount.load()
169  << " threads were initialised" << endmsg;
170  }
171 
172  // Create one task for each worker thread in the pool
173  for ( int i = 0; i < m_threadPoolSize; ++i ) {
174  ON_DEBUG debug() << "creating ThreadInitTask " << i << endmsg;
175 
176  // Queue the task
177  if ( !terminate ) m_threadInitCount++;
180  }
181 
182  // Now wait for all the workers to reach the barrier
183  ON_DEBUG debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
184  m_barrier->wait();
185 
186  // Check to make sure all Tools were invoked.
187  // I'm not sure this mechanism is worthwhile.
188  for ( auto& t : m_threadInitTools ) {
189  // Number of threads initialized but not terminated.
190  int numInit = t->nInit();
191  // Expected number based on the type of task.
192  int expectedNumInit = terminate ? 0 : m_threadPoolSize;
193  if ( numInit != expectedNumInit ) {
194  std::ostringstream ost;
195  ost << "not all threads " << ( terminate ? "terminated" : "initialized" ) << " for tool " << t << " : "
196  << t->nInit() << " out of " << m_threadPoolSize << " are currently active.";
197  if ( terminate ) {
198  // it is likely the case that TBB activated new threads
199  // late in the game, and extra initializations were done
200  info() << ost.str() << endmsg;
201  } else {
202  error() << ost.str() << endmsg;
203  return StatusCode::FAILURE;
204  }
205  }
206  }
207 
208  }
209 
210  // In single-threaded mode, there is no scheduler, so we simply call
211  // the task wrapper directly in this thread.
212  else {
213  ON_DEBUG debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
214 
215  boost::barrier* noBarrier = nullptr;
217  }
218 
219  // Now, we do some error checking
220  if ( ThreadInitTask::execFailed() ) {
221  error() << "a ThreadInitTask failed to execute successfully" << endmsg;
222  return StatusCode::FAILURE;
223  }
224 
225  return StatusCode::SUCCESS;
226 }
227 
228 //-----------------------------------------------------------------------------
229 
230 // tbb will actually create more threads than requested, and will sometimes
231 // activate them late. This method is used to initialize one of these threads
232 // when it is detected
233 
235 
237  // this should never happen
238  error() << "initThisThread triggered, but thread already initialized" << endmsg;
239  throw GaudiException( "initThisThread triggered, but thread already initialized", name(), StatusCode::FAILURE );
240  }
241 
243  boost::barrier* noBarrier = nullptr;
244  ThreadInitTask( m_threadInitTools, noBarrier, serviceLocator(), false )();
245 }
ThreadPoolSvc::m_initMutex
tbb::spin_mutex m_initMutex
Mutex used to protect the initPool and terminatePool methods.
Definition: ThreadPoolSvc.h:77
std::this_thread::sleep_for
T sleep_for(T... args)
ThreadPoolSvc::poolSize
int poolSize() const override final
Definition: ThreadPoolSvc.h:55
ThreadPoolSvc::m_threadInitCount
std::atomic< int > m_threadInitCount
Counter for all threads that are initialised.
Definition: ThreadPoolSvc.h:89
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
std::string
STL class.
GaudiHandleArray::size
size_type size() const
Definition: GaudiHandle.h:481
ON_VERBOSE
#define ON_VERBOSE
Definition: ThreadPoolSvc.cpp:20
GaudiHandleArray::retrieve
StatusCode retrieve()
Retrieve all tools.
Definition: GaudiHandle.h:512
ThreadPoolSvc::launchTasks
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.
Definition: ThreadPoolSvc.cpp:158
Gaudi::Concurrency::ConcurrencyFlags::setNumThreads
static GAUDI_API void setNumThreads(const std::size_t &nT)
Definition: ConcurrencyFlags.h:68
ISvcLocator
Definition: ISvcLocator.h:46
GaudiException
Definition: GaudiException.h:31
std::chrono::milliseconds
ThreadPoolSvc::initialize
StatusCode initialize() override final
Initialise.
Definition: ThreadPoolSvc.cpp:38
Gaudi::Concurrency::ThreadInitDone
thread_local bool ThreadInitDone
Definition: ThreadInitTask.cpp:22
std::unique_ptr::get
T get(T... args)
GaudiHandleArray::empty
bool empty() const override
Return whether the list of tools is empty.
Definition: GaudiHandle.h:485
ThreadPoolSvc::m_tbbgc
std::unique_ptr< tbb::global_control > m_tbbgc
TBB global control parameter.
Definition: ThreadPoolSvc.h:83
ConcurrencyFlags.h
ThreadInitTask.h
ThreadInitTask::execFailed
static bool execFailed()
Definition: ThreadInitTask.h:43
ThreadPoolSvc.h
bug_34121.t
t
Definition: bug_34121.py:31
ThreadPoolSvc::m_threadInitTools
ToolHandleArray< IThreadInitTool > m_threadInitTools
Handle array of thread init tools.
Definition: ThreadPoolSvc.h:68
ThreadPoolSvc::m_init
bool m_init
Was the thread pool initialized?
Definition: ThreadPoolSvc.h:71
ThreadPoolSvc::initPool
StatusCode initPool(const int &poolSize, const int &maxParallelismExtra) override final
Initialize the thread pool and launch the ThreadInitTasks.
Definition: ThreadPoolSvc.cpp:73
Service::name
const std::string & name() const override
Retrieve name of the service
Definition: Service.cpp:332
StatusCode
Definition: StatusCode.h:65
std::atomic::load
T load(T... args)
ThreadPoolSvc::m_threadPoolSize
int m_threadPoolSize
Size of the thread pool allocated.
Definition: ThreadPoolSvc.h:74
std::thread::hardware_concurrency
T hardware_concurrency(T... args)
ThreadPoolSvc::finalize
StatusCode finalize() override final
Finalise.
Definition: ThreadPoolSvc.cpp:62
ON_DEBUG
#define ON_DEBUG
Definition: ThreadPoolSvc.cpp:19
ThreadPoolSvc::m_barrier
std::unique_ptr< boost::barrier > m_barrier
Barrier used to synchronization thread init tasks.
Definition: ThreadPoolSvc.h:80
genconfuser.verbose
verbose
Definition: genconfuser.py:28
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202
extends
Base class used to extend a class implementing other interfaces.
Definition: extends.h:20
Gaudi
This file provides a Grammar for the type Gaudi::Accumulators::Axis It allows to use that type from p...
Definition: __init__.py:1
std::ostringstream
STL class.
StatusCode::isFailure
bool isFailure() const
Definition: StatusCode.h:129
ThreadInitTask
Special TBB task used by ThreadPoolSvc to wrap execution of IThreadInitTools.
Definition: ThreadInitTask.h:30
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
ConditionsStallTest.name
name
Definition: ConditionsStallTest.py:77
ThreadPoolSvc::m_arena
tbb::task_arena m_arena
TBB task arena to run all algorithms.
Definition: ThreadPoolSvc.h:86
std
STL namespace.
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
ThreadPoolSvc::terminatePool
StatusCode terminatePool() override final
Terminate the thread pool and launch thread termination tasks.
Definition: ThreadPoolSvc.cpp:137
std::ostringstream::str
T str(T... args)
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
ThreadPoolSvc::initThisThread
virtual void initThisThread() override
Definition: ThreadPoolSvc.cpp:234
Service::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
Definition: Service.cpp:335
ThreadPoolSvc
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:38
Service::terminate
StatusCode terminate() override
Definition: Service.h:61