The Gaudi Framework  master (b9786168)
Loading...
Searching...
No Matches
ThreadPoolSvc.cpp
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2024 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
11#include "ThreadPoolSvc.h"
12
13#include "ThreadInitTask.h"
15
16#include <chrono>
17#include <thread>
18
19#define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
20#define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
21
22namespace Gaudi {
23 namespace Concurrency {
24 extern thread_local bool ThreadInitDone;
25 }
26} // namespace Gaudi
27
29
30//=============================================================================
31
32ThreadPoolSvc::ThreadPoolSvc( const std::string& name, ISvcLocator* svcLoc ) : extends( name, svcLoc ) {
33 declareProperty( "ThreadInitTools", m_threadInitTools, "ToolHandleArray of IThreadInitTools" );
34}
35
36//-----------------------------------------------------------------------------
37
39
// Service initialisation body: initialises the base Service, retrieves the
// ThreadInitTools handle array and reports how many tools are attached.
// NOTE(review): the function signature and the statement ending each failure
// branch are not visible in this listing (its numbering skips 38, 43, 49 and
// 57); presumably this is ThreadPoolSvc::initialize() -- confirm against the
// complete file before relying on the control flow shown here.
40 // Initialise mother class (read properties, ...)
41 if ( Service::initialize().isFailure() ) {
42 warning() << "Base class could not be initialized" << endmsg;
44 }
45
// Resolve every tool handle in the array; a failure is reported as an error.
46 if ( m_threadInitTools.retrieve().isFailure() ) {
47 error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
48
50 }
51
// Purely informational: log the number of attached thread init tools.
52 if ( m_threadInitTools.size() != 0 )
53 info() << "retrieved " << m_threadInitTools.size() << " thread init tools" << endmsg;
54 else
55 info() << "no thread init tools attached" << endmsg;
56
58}
59
60//-----------------------------------------------------------------------------
61
63
// Finalisation body: warns when the service is being torn down although the
// pool was never set up (m_init is only set to true at the end of initPool()).
// NOTE(review): the function signature and the final statement are not visible
// in this listing (its numbering skips 62 and 68); presumably this is
// ThreadPoolSvc::finalize() -- confirm against the complete file.
64 if ( !m_init )
65 warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
66 << "was never initialized" << endmsg;
67
69}
70
71//-----------------------------------------------------------------------------
72
// Initialise the thread pool while holding m_initMutex.
//
// Depending on m_threadPoolSize this either sets up the TBB global control,
// the task arena and the termination barrier, or (for -100) installs a TBB
// global control with a maximum parallelism of 0 and creates no pool.
//
// poolSize             requested pool size; in the visible code it is only
//                      written to the debug log.
// maxParallelismExtra  added on top of the pool size when setting TBB's
//                      max_allowed_parallelism.
//
// Returns StatusCode::FAILURE when m_threadPoolSize is below -1 (after a
// fatal message), StatusCode::SUCCESS otherwise; m_init is set to true on
// success.
//
// NOTE(review): this listing skips its lines 77, 112 and 123, so some
// statements of the function are not visible here (presumably including the
// one that copies poolSize into m_threadPoolSize) -- confirm against the
// complete file.
73StatusCode ThreadPoolSvc::initPool( const int& poolSize, const int& maxParallelismExtra ) {
74
// Serialise pool initialisation; the lock is held until the function returns.
75 tbb::spin_mutex::scoped_lock lock( m_initMutex );
76
78
79 ON_DEBUG debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
80 // There is a problem in the piece of the code below. if
81 // m_threadPoolSize is set to something negative which is < -1,
82 // algorithm below might not behave as expected. For the time being
83 // I've chosen to create the barrier with the default number of
84 // threads created by the task scheduler init assuming that a
85 // negative value will choose automatic thread creation which will
86 // create default number of threads.
87 // SK
// NOTE(review): the note above looks stale -- values below -1 are now rejected
// explicitly with a fatal message and StatusCode::FAILURE further down.
88
89 // -100 prevents the creation of the pool and the scheduler directly
90 // executes the tasks.
91 // -1 means use all available hardware threads
92
93 if ( -100 != m_threadPoolSize ) {
94 ON_DEBUG debug() << "Initialising a thread pool of size " << m_threadPoolSize << endmsg;
95
96 if ( m_threadPoolSize == -1 ) {
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0
// when the value cannot be determined; that case is not handled here.
97 m_threadPoolSize = std::thread::hardware_concurrency();
98 } else if ( m_threadPoolSize < -1 ) {
99 fatal() << "Unexpected ThreadPoolSize \"" << m_threadPoolSize << "\". Allowed negative values are "
100 << "-1 (use all available cores) and -100 (don't use a thread pool)" << endmsg;
101 return StatusCode::FAILURE;
102 }
103
104 ON_VERBOSE verbose() << "Maximum allowed parallelism before adjusting: "
105 << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
106
107 // to get the number of threads we need, request one thread more to account for how TBB calculates
108 // its soft limit of the number of threads for the global thread pool
109 m_tbbgc = std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism,
110 m_threadPoolSize + maxParallelismExtra + 1 );
111
113
114 // Create the task arena to run all algorithms
115 m_arena.initialize( m_threadPoolSize + 1 );
116
117 // Create the barrier for task synchronization at termination
118 // (here we increase the number of threads by one to account for calling thread)
119 m_barrier = std::make_unique<boost::barrier>( m_threadPoolSize + 1 );
120
121 } else {
122 // don't use a thread pool
124 m_tbbgc = std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism, 0 );
125 }
126
127 ON_DEBUG debug() << "Thread Pool initialization complete. Maximum allowed parallelism: "
128 << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
129
// Record that the pool has been set up; checked when terminating the pool.
130 m_init = true;
131
132 return StatusCode::SUCCESS;
133}
134
135//-----------------------------------------------------------------------------
136
// Pool termination body: under m_initMutex, refuses to run when the pool was
// never initialised, otherwise launches the termination tasks.
// Returns StatusCode::FAILURE if m_init is false or if launchTasks() fails,
// StatusCode::SUCCESS otherwise.
// NOTE(review): the function signature is not visible in this listing (its
// numbering skips 137); judging by the debug message this is
// ThreadPoolSvc::terminatePool() -- confirm against the complete file.
138 tbb::spin_mutex::scoped_lock lock( m_initMutex );
139
140 ON_DEBUG debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
141
// Guard: terminating a pool that was never initialised is an error.
142 if ( !m_init ) {
143 error() << "Trying to terminate uninitialized thread pool!" << endmsg;
144 return StatusCode::FAILURE;
145 }
146
147 // Launch the termination tasks
148 const bool terminate = true;
149 if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
150
151 ON_DEBUG debug() << "Thread pool termination complete!" << endmsg;
152
153 return StatusCode::SUCCESS;
154}
155
156//-----------------------------------------------------------------------------
157
159
// Task-launching body: runs the thread init tools either once per pool thread
// (when TBB's max_allowed_parallelism is above 0) or directly in the calling
// thread. The flag `terminate` selects termination instead of initialization.
// Returns StatusCode::FAILURE when, during initialization, a tool reports a
// number of initialised threads different from m_threadPoolSize, or when the
// final error check fires; StatusCode::SUCCESS otherwise.
// NOTE(review): this listing skips its lines 158, 177-178, 216 and 220, so the
// function signature, the statements that queue/run the task and the condition
// of the final error check are not visible here; presumably this is
// ThreadPoolSvc::launchTasks( bool terminate ) -- confirm against the
// complete file.
160 const std::string taskType = terminate ? "termination" : "initialization";
161
162 // If we have a thread pool (via a scheduler), then we want to queue
163 // the tasks in TBB to execute on each thread.
164 if ( tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) > 0 ) {
165
166 // Make a warning message if not all threads can be finalised
167 if ( terminate and m_threadInitCount.load() != m_threadPoolSize and not m_threadInitTools.empty() ) {
168 warning() << "Finalising " << m_threadPoolSize << " threads, but " << m_threadInitCount.load()
169 << " threads were initialised" << endmsg;
170 }
171
172 // Create one task for each worker thread in the pool
173 for ( int i = 0; i < m_threadPoolSize; ++i ) {
174 ON_DEBUG debug() << "creating ThreadInitTask " << i << endmsg;
175
176 // Queue the task
// Pause 20 ms between iterations of the task-creation loop.
179 std::this_thread::sleep_for( std::chrono::milliseconds( 20 ) );
180 }
181
182 // Now wait for all the workers to reach the barrier
183 ON_DEBUG debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
// The barrier is created in initPool() for m_threadPoolSize + 1 participants,
// the extra one being this calling thread.
184 m_barrier->wait();
185
186 // Check to make sure all Tools were invoked.
187 // I'm not sure this mechanism is worthwhile.
188 for ( auto& t : m_threadInitTools ) {
189 // Number of threads initialized but not terminated.
190 int numInit = t->nInit();
191 // Expected number based on the type of task.
192 int expectedNumInit = terminate ? 0 : m_threadPoolSize;
193 if ( numInit != expectedNumInit ) {
194 std::ostringstream ost;
195 ost << "not all threads " << ( terminate ? "terminated" : "initialized" ) << " for tool " << t << " : "
196 << t->nInit() << " out of " << m_threadPoolSize << " are currently active.";
197 if ( terminate ) {
198 // it is likely the case that TBB activated new threads
199 // late in the game, and extra initializations were done
200 info() << ost.str() << endmsg;
201 } else {
// A mismatch during initialization is fatal for the launch.
202 error() << ost.str() << endmsg;
203 return StatusCode::FAILURE;
204 }
205 }
206 }
207
208 }
209
210 // In single-threaded mode, there is no scheduler, so we simply call
211 // the task wrapper directly in this thread.
212 else {
// NOTE(review): the message lacks a space between taskType and "in", so it is
// printed as e.g. "terminationin this thread." -- worth fixing in the literal.
213 ON_DEBUG debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
214
215 boost::barrier* noBarrier = nullptr;
217 }
218
219 // Now, we do some error checking
221 error() << "a ThreadInitTask failed to execute successfully" << endmsg;
222 return StatusCode::FAILURE;
223 }
224
225 return StatusCode::SUCCESS;
226}
227
228//-----------------------------------------------------------------------------
229
230// TBB will actually create more threads than requested, and will sometimes
231// activate them late. This method is used to initialize one of these threads
232// when it is detected.
233
235
// Per-thread initialisation body: reports an error and throws a
// GaudiException from the guarded branch, otherwise runs a ThreadInitTask
// directly in the calling thread without a barrier.
// NOTE(review): this listing skips its lines 234, 236 and 242, so the function
// signature, the condition opening the branch below and one further statement
// are not visible here; judging by the messages this is
// ThreadPoolSvc::initThisThread() -- confirm against the complete file.
237 // this should never happen
238 error() << "initThisThread triggered, but thread already initialized" << endmsg;
239 throw GaudiException( "initThisThread triggered, but thread already initialized", name(), StatusCode::FAILURE );
240 }
241
// No barrier is passed: the task is invoked synchronously right here.
// NOTE(review): the trailing `false` presumably selects initialization rather
// than termination -- confirm against the ThreadInitTask constructor.
243 boost::barrier* noBarrier = nullptr;
244 ThreadInitTask( m_threadInitTools, noBarrier, serviceLocator(), false )();
245}
#define ON_VERBOSE
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
#define ON_DEBUG
#define DECLARE_COMPONENT(type)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
static GAUDI_API void setNumThreads(const std::size_t &nT)
Define general base for Gaudi exception.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition ISvcLocator.h:42
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition Service.cpp:336
const std::string & name() const override
Retrieve name of the service.
Definition Service.cpp:333
StatusCode terminate() override
Definition Service.h:54
Gaudi::Details::PropertyBase * declareProperty(const std::string &name, ToolHandle< T > &hndl, const std::string &doc="none")
Definition Service.h:91
StatusCode initialize() override
Definition Service.cpp:118
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
constexpr static const auto SUCCESS
Definition StatusCode.h:99
constexpr static const auto FAILURE
Definition StatusCode.h:100
Special TBB task used by ThreadPoolSvc to wrap execution of IThreadInitTools.
static bool execFailed()
A service which initializes a TBB thread pool.
virtual void initThisThread() override
StatusCode initPool(const int &poolSize, const int &maxParallelismExtra) override final
Initialize the thread pool and launch the ThreadInitTasks.
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.
std::unique_ptr< tbb::global_control > m_tbbgc
TBB global control parameter.
std::atomic< int > m_threadInitCount
Counter for all threads that are initialised.
bool m_init
Was the thread pool initialized?
std::unique_ptr< boost::barrier > m_barrier
Barrier used to synchronize thread init tasks.
int m_threadPoolSize
Size of the thread pool allocated.
ToolHandleArray< IThreadInitTool > m_threadInitTools
Handle array of thread init tools.
StatusCode terminatePool() override final
Terminate the thread pool and launch thread termination tasks.
int poolSize() const override final
tbb::task_arena m_arena
TBB task arena to run all algorithms.
StatusCode finalize() override final
Finalise.
tbb::spin_mutex m_initMutex
Mutex used to protect the initPool and terminatePool methods.
StatusCode initialize() override final
Initialise.
ThreadPoolSvc(const std::string &name, ISvcLocator *svc)
Constructor.
Base class used to extend a class implementing other interfaces.
Definition extends.h:19
thread_local bool ThreadInitDone
This file provides a Grammar for the type Gaudi::Accumulators::Axis It allows to use that type from p...
Definition __init__.py:1
STL namespace.