The Gaudi Framework  v36r9p1 (5c15b2bb)
ThreadPoolSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "ThreadPoolSvc.h"
12 
14 #include "ThreadInitTask.h"
15 
16 #include "tbb/task_group.h"
17 
18 #include <chrono>
19 #include <thread>
20 
21 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
22 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
23 
24 namespace Gaudi {
25  namespace Concurrency {
26  extern thread_local bool ThreadInitDone;
27  }
28 } // namespace Gaudi
29 
31 
32 //=============================================================================
33 
34 ThreadPoolSvc::ThreadPoolSvc( const std::string& name, ISvcLocator* svcLoc ) : extends( name, svcLoc ) {
35  declareProperty( "ThreadInitTools", m_threadInitTools, "ToolHandleArray of IThreadInitTools" );
36 }
37 
38 //-----------------------------------------------------------------------------
39 
41 
42  // Initialise mother class (read properties, ...)
43  if ( Service::initialize().isFailure() ) {
44  warning() << "Base class could not be initialized" << endmsg;
45  return StatusCode::FAILURE;
46  }
47 
49  error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
50 
51  return StatusCode::FAILURE;
52  }
53 
54  if ( m_threadInitTools.size() != 0 )
55  info() << "retrieved " << m_threadInitTools.size() << " thread init tools" << endmsg;
56  else
57  info() << "no thread init tools attached" << endmsg;
58 
59  return StatusCode::SUCCESS;
60 }
61 
62 //-----------------------------------------------------------------------------
63 
65 
66  if ( !m_init )
67  warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
68  << "was never initialized" << endmsg;
69 
70  return StatusCode::SUCCESS;
71 }
72 
73 //-----------------------------------------------------------------------------
74 
75 StatusCode ThreadPoolSvc::initPool( const int& poolSize ) {
76 
77  tbb::spin_mutex::scoped_lock lock( m_initMutex );
78 
80 
81  ON_DEBUG debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
82  // There is a problem in the piece of the code below. if
83  // m_threadPoolSize is set to something negative which is < -1,
84  // algorithm below might not behave as expected. For the time being
85  // I've choosen to create the barrier with the default number of
86  // threads created by the task scheduler init assuming that a
87  // negative value will choose automatic thread creation which will
88  // create default number of threads.
89  // SK
90 
91  // -100 prevents the creation of the pool and the scheduler directly
92  // executes the tasks.
93  // -1 means use all available hardware threads
94 
95  if ( -100 != m_threadPoolSize ) {
96  ON_DEBUG debug() << "Initialising a thread pool of size " << m_threadPoolSize << endmsg;
97 
98  if ( m_threadPoolSize == -1 ) {
100  } else if ( m_threadPoolSize < -1 ) {
101  fatal() << "Unexpected ThreadPoolSize \"" << m_threadPoolSize << "\". Allowed negative values are "
102  << "-1 (use all available cores) and -100 (don't use a thread pool)" << endmsg;
103  return StatusCode::FAILURE;
104  }
105 
106  ON_VERBOSE verbose() << "Maximum allowed parallelism before adjusting: "
107  << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
108 
109  // to get the number of threads we need, request one thread more to account for how TBB calculates
110  // its soft limit of the number of threads for the global thread pool
111  m_tbbgc =
112  std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism, m_threadPoolSize + 1 );
113 
115 
116  // Create the task arena to run all algorithms
117  m_arena = tbb::task_arena( m_threadPoolSize + 1 );
118 
119  // Create the barrier for task synchronization at termination
120  // (here we increase the number of threads by one to account for calling thread)
121  m_barrier = std::make_unique<boost::barrier>( m_threadPoolSize + 1 );
122 
123  } else {
124  // don't use a thread pool
126  m_tbbgc = std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism, 0 );
127  }
128 
129  ON_DEBUG debug() << "Thread Pool initialization complete. Maximum allowed parallelism: "
130  << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
131 
132  m_init = true;
133 
134  return StatusCode::SUCCESS;
135 }
136 
137 //-----------------------------------------------------------------------------
138 
140  tbb::spin_mutex::scoped_lock lock( m_initMutex );
141 
142  ON_DEBUG debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
143 
144  if ( !m_init ) {
145  error() << "Trying to terminate uninitialized thread pool!" << endmsg;
146  return StatusCode::FAILURE;
147  }
148 
149  // Launch the termination tasks
150  const bool terminate = true;
151  if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
152 
153  ON_DEBUG debug() << "Thread pool termination complete!" << endmsg;
154 
155  return StatusCode::SUCCESS;
156 }
157 
158 //-----------------------------------------------------------------------------
159 
161 
162  const std::string taskType = terminate ? "termination" : "initialization";
163 
164  // If we have a thread pool (via a scheduler), then we want to queue
165  // the tasks in TBB to execute on each thread.
166  if ( tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) > 0 ) {
167 
168  // Make a warning message if not all threads can be finalised
170  warning() << "Finalising " << m_threadPoolSize << " threads, but " << m_threadInitCount.load()
171  << " threads were initialised" << endmsg;
172  }
173 
174  // Create one task for each worker thread in the pool
175  for ( int i = 0; i < m_threadPoolSize; ++i ) {
176  ON_DEBUG debug() << "creating ThreadInitTask " << i << endmsg;
177 
178  // Queue the task
179  if ( !terminate ) m_threadInitCount++;
182  }
183 
184  // Now wait for all the workers to reach the barrier
185  ON_DEBUG debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
186  m_barrier->wait();
187 
188  // Check to make sure all Tools were invoked.
189  // I'm not sure this mechanism is worthwhile.
190  for ( auto& t : m_threadInitTools ) {
191  // Number of threads initialized but not terminated.
192  int numInit = t->nInit();
193  // Expected number based on the type of task.
194  int expectedNumInit = terminate ? 0 : m_threadPoolSize;
195  if ( numInit != expectedNumInit ) {
196  std::ostringstream ost;
197  ost << "not all threads " << ( terminate ? "terminated" : "initialized" ) << " for tool " << t << " : "
198  << t->nInit() << " out of " << m_threadPoolSize << " are currently active.";
199  if ( terminate ) {
200  // it is likely the case that TBB activated new threads
201  // late in the game, and extra initializations were done
202  info() << ost.str() << endmsg;
203  } else {
204  error() << ost.str() << endmsg;
205  return StatusCode::FAILURE;
206  }
207  }
208  }
209 
210  }
211 
212  // In single-threaded mode, there is no scheduler, so we simply call
213  // the task wrapper directly in this thread.
214  else {
215  ON_DEBUG debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
216 
217  boost::barrier* noBarrier = nullptr;
219  }
220 
221  // Now, we do some error checking
222  if ( ThreadInitTask::execFailed() ) {
223  error() << "a ThreadInitTask failed to execute successfully" << endmsg;
224  return StatusCode::FAILURE;
225  }
226 
227  return StatusCode::SUCCESS;
228 }
229 
230 //-----------------------------------------------------------------------------
231 
232 // tbb will actually create more threads than requested, and will sometimes
233 // activate them late. This method is used to initialize one of these threads
234 // when it is detected
235 
237 
239  // this should never happen
240  error() << "initThisThread triggered, but thread already initialized" << endmsg;
241  throw GaudiException( "initThisThread triggered, but thread already initialized", name(), StatusCode::FAILURE );
242  }
243 
245  boost::barrier* noBarrier = nullptr;
246  ThreadInitTask( m_threadInitTools, noBarrier, serviceLocator(), false )();
247 }
ThreadPoolSvc::m_initMutex
tbb::spin_mutex m_initMutex
Mutex used to protect the initPool and terminatePool methods.
Definition: ThreadPoolSvc.h:77
std::this_thread::sleep_for
T sleep_for(T... args)
ThreadPoolSvc::poolSize
int poolSize() const override final
Definition: ThreadPoolSvc.h:55
ThreadPoolSvc::m_threadInitCount
std::atomic< int > m_threadInitCount
Counter for all threads that are initialised.
Definition: ThreadPoolSvc.h:89
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
std::string
STL class.
GaudiHandleArray::size
size_type size() const
Definition: GaudiHandle.h:475
ON_VERBOSE
#define ON_VERBOSE
Definition: ThreadPoolSvc.cpp:22
GaudiHandleArray::retrieve
StatusCode retrieve()
Retrieve all tools.
Definition: GaudiHandle.h:506
ThreadPoolSvc::launchTasks
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.
Definition: ThreadPoolSvc.cpp:160
Gaudi::Concurrency::ConcurrencyFlags::setNumThreads
static GAUDI_API void setNumThreads(const std::size_t &nT)
Definition: ConcurrencyFlags.h:68
ISvcLocator
Definition: ISvcLocator.h:46
GaudiException
Definition: GaudiException.h:31
std::chrono::milliseconds
ThreadPoolSvc::initialize
StatusCode initialize() override final
Initialise.
Definition: ThreadPoolSvc.cpp:40
Gaudi::Concurrency::ThreadInitDone
thread_local bool ThreadInitDone
Definition: ThreadInitTask.cpp:22
std::unique_ptr::get
T get(T... args)
GaudiHandleArray::empty
bool empty() const override
Return whether the list of tools is empty.
Definition: GaudiHandle.h:479
ThreadPoolSvc::m_tbbgc
std::unique_ptr< tbb::global_control > m_tbbgc
TBB global control parameter.
Definition: ThreadPoolSvc.h:83
ConcurrencyFlags.h
ThreadInitTask.h
ThreadInitTask::execFailed
static bool execFailed()
Definition: ThreadInitTask.h:43
ThreadPoolSvc::initPool
StatusCode initPool(const int &poolSize) override final
Initialize the thread pool and launch the ThreadInitTasks.
Definition: ThreadPoolSvc.cpp:75
ThreadPoolSvc.h
bug_34121.t
t
Definition: bug_34121.py:30
ThreadPoolSvc::m_threadInitTools
ToolHandleArray< IThreadInitTool > m_threadInitTools
Handle array of thread init tools.
Definition: ThreadPoolSvc.h:68
ThreadPoolSvc::m_init
bool m_init
Was the thread pool initialized?
Definition: ThreadPoolSvc.h:71
TimingHistograms.name
name
Definition: TimingHistograms.py:25
Service::name
const std::string & name() const override
Retrieve name of the service
Definition: Service.cpp:332
StatusCode
Definition: StatusCode.h:65
std::atomic::load
T load(T... args)
ThreadPoolSvc::m_threadPoolSize
int m_threadPoolSize
Size of the thread pool allocated.
Definition: ThreadPoolSvc.h:74
std::thread::hardware_concurrency
T hardware_concurrency(T... args)
ThreadPoolSvc::finalize
StatusCode finalize() override final
Finalise.
Definition: ThreadPoolSvc.cpp:64
ON_DEBUG
#define ON_DEBUG
Definition: ThreadPoolSvc.cpp:21
ThreadPoolSvc::m_barrier
std::unique_ptr< boost::barrier > m_barrier
Barrier used to synchronization thread init tasks.
Definition: ThreadPoolSvc.h:80
genconfuser.verbose
verbose
Definition: genconfuser.py:30
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:203
extends
Base class used to extend a class implementing other interfaces.
Definition: extends.h:20
Gaudi
Header file for std:chrono::duration-based Counters.
Definition: __init__.py:1
std::ostringstream
STL class.
StatusCode::isFailure
bool isFailure() const
Definition: StatusCode.h:129
ThreadInitTask
Special TBB task used by ThreadPoolSvc to wrap execution of IThreadInitTools.
Definition: ThreadInitTask.h:30
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
ThreadPoolSvc::m_arena
tbb::task_arena m_arena
TBB task arena to run all algorithms.
Definition: ThreadPoolSvc.h:86
std
STL namespace.
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
ThreadPoolSvc::terminatePool
StatusCode terminatePool() override final
Terminate the thread pool and launch thread termination tasks.
Definition: ThreadPoolSvc.cpp:139
std::ostringstream::str
T str(T... args)
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
ThreadPoolSvc::initThisThread
virtual void initThisThread() override
Definition: ThreadPoolSvc.cpp:236
Service::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
Definition: Service.cpp:335
ThreadPoolSvc
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:38
Service::terminate
StatusCode terminate() override
Definition: Service.h:61