The Gaudi Framework  master (ff829712)
Loading...
Searching...
No Matches
ThreadPoolSvc Class Reference

A service which initializes a TBB thread pool. More...

#include </builds/gaudi/Gaudi/GaudiHive/src/ThreadPoolSvc.h>

Inheritance diagram for ThreadPoolSvc:
Collaboration diagram for ThreadPoolSvc:

Public Member Functions

 ThreadPoolSvc (const std::string &name, ISvcLocator *svc)
 Constructor.
 
StatusCode initialize () override final
 Initialise.
 
StatusCode finalize () override final
 Finalise.
 
StatusCode initPool (const int &poolSize, const int &maxParallelismExtra) override final
 Initialize the thread pool and launch the ThreadInitTasks.
 
StatusCode terminatePool () override final
 Terminate the thread pool and launch thread termination tasks.
 
int poolSize () const override final
 
virtual bool isInit () const
 
virtual void initThisThread () override
 
tbb::task_arena * getArena ()
 
- Public Member Functions inherited from extends< Service, IThreadPoolSvc >
void const * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast.
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface.
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames.
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service.
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service.
 
StatusCode sysStart () override
 Initialize Service.
 
StatusCode sysStop () override
 Initialize Service.
 
StatusCode sysFinalize () override
 Finalize Service.
 
StatusCode sysReinitialize () override
 Re-initialize the Service.
 
StatusCode sysRestart () override
 Re-initialize the Service.
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor.
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator.
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T>
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, ToolHandle< T > &hndl, const std::string &doc="none")
 
template<class T>
StatusCode declareTool (ToolHandle< T > &handle, bool createIf=true)
 
template<class T>
StatusCode declareTool (ToolHandle< T > &handle, const std::string &toolTypeAndName, bool createIf=true)
 Declare used tool.
 
template<class T>
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, ToolHandleArray< T > &hndlArr, const std::string &doc="none")
 
template<class T>
void addToolsArray (ToolHandleArray< T > &hndlArr)
 
const std::vector< IAlgTool * > & tools () const
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service.May not be invoked before sysInitialize() has been invoked.
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
StatusCode setProperty (const Gaudi::Details::PropertyBase &p)
 Set the property from a property.
 
StatusCode setProperty (const std::string &name, const char *v)
 Special case for string literals.
 
StatusCode setProperty (const std::string &name, const std::string &v)
 Special case for std::string.
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property form the value
 
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBasedeclareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property.
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property.
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation.
 
Gaudi::Details::PropertyBasedeclareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property.
 
StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p) override
 set the property from another property with a different name
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p)
 Set the property from a property.
 
StatusCode setProperty (const std::string &name, const char *v)
 Special case for string literals.
 
StatusCode setProperty (const std::string &name, const std::string &v)
 Special case for std::string.
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property form the value
 
StatusCode setPropertyRepr (const std::string &n, const std::string &r) override
 set the property from name and value string representation
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property
 
const Gaudi::Details::PropertyBasegetProperty (std::string_view name) const override
 get the property by name
 
StatusCode getProperty (std::string_view n, std::string &v) const override
 convert the property to the string
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties
 
bool hasProperty (std::string_view name) const override
 Return true if we have a property with the given name.
 
Gaudi::Details::PropertyBaseproperty (std::string_view name) const
 \fixme property and bindPropertiesTo should be protected
 
void bindPropertiesTo (Gaudi::Interfaces::IOptionsSvc &optsSvc)
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolderoperator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 get the cached level (originally extracted from the embedded MsgStream)
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor.
 
const SmartIF< IMessageSvc > & msgSvc () const
 The standard message service.
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream.
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts.
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS)
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL)
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR)
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR)
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING)
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO)
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG)
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE)
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO)
 

Private Member Functions

StatusCode launchTasks (bool finalize=false)
 Launch tasks to execute the ThreadInitTools.
 

Private Attributes

ToolHandleArray< IThreadInitToolm_threadInitTools = { this }
 Handle array of thread init tools.
 
bool m_init = false
 Was the thread pool initialized?
 
int m_threadPoolSize = 0
 Size of the thread pool allocated.
 
tbb::spin_mutex m_initMutex
 Mutex used to protect the initPool and terminatePool methods.
 
std::unique_ptr< boost::barrier > m_barrier
 Barrier used to synchronization thread init tasks.
 
std::unique_ptr< tbb::global_control > m_tbbgc
 TBB global control parameter.
 
tbb::task_arena m_arena
 TBB task arena to run all algorithms.
 
std::atomic< int > m_threadInitCount = 0
 Counter for all threads that are initialised.
 

Additional Inherited Members

- Public Types inherited from extends< Service, IThreadPoolSvc >
using base_class
 Typedef to this class.
 
using extend_interfaces_base
 Typedef to the base of this class.
 
- Public Types inherited from Service
using Factory = Gaudi::PluginService::Factory<IService*( const std::string&, ISvcLocator* )>
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl
 Typedef used to refer to this class from derived classes, as in.
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids
 take union of the ext_iids of all Interfaces...
 
- Protected Member Functions inherited from Service
std::vector< IAlgTool * > & tools ()
 
 ~Service () override
 
int outputLevel () const
 get the Service's output level
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches.
 
MSG::Level resetMessaging ()
 Reinitialize internal states.
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream.
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::Property< int > m_outputLevel { this, "OutputLevel", MSG::NIL, "output level" }
 flag indicating whether ToolHandle tools have been added to m_tools
 
Gaudi::Property< bool > m_auditorInitialize { this, "AuditInitialize", false, "trigger auditor on initialize()" }
 
Gaudi::Property< bool > m_auditorStart { this, "AuditStart", false, "trigger auditor on start()" }
 
Gaudi::Property< bool > m_auditorStop { this, "AuditStop", false, "trigger auditor on stop()" }
 
Gaudi::Property< bool > m_auditorFinalize { this, "AuditFinalize", false, "trigger auditor on finalize()" }
 
Gaudi::Property< bool > m_auditorReinitialize { this, "AuditReinitialize", false, "trigger auditor on reinitialize()" }
 
Gaudi::Property< bool > m_auditorRestart { this, "AuditRestart", false, "trigger auditor on restart()" }
 
Gaudi::Property< bool > m_autoRetrieveTools
 
Gaudi::Property< bool > m_checkToolDeps
 
SmartIF< IAuditorSvcm_pAuditorSvc
 Auditor Service.
 

Detailed Description

A service which initializes a TBB thread pool.

This service can be configured with an array of IThreadInitTools which will each be invoked concurrently on each worker thread. A ThreadInitTask is created for each thread and given the list of tools. A boost::barrier is used to synchronize the calling of each tool concurrently on all threads at the same time.

Definition at line 37 of file ThreadPoolSvc.h.

Constructor & Destructor Documentation

◆ ThreadPoolSvc()

ThreadPoolSvc::ThreadPoolSvc ( const std::string & name,
ISvcLocator * svc )

Constructor.

Definition at line 32 of file ThreadPoolSvc.cpp.

32 : extends( name, svcLoc ) {
33 declareProperty( "ThreadInitTools", m_threadInitTools, "ToolHandleArray of IThreadInitTools" );
34}
const std::string & name() const override
Retrieve name of the service.
Definition Service.cpp:333
Gaudi::Details::PropertyBase * declareProperty(const std::string &name, ToolHandle< T > &hndl, const std::string &doc="none")
Definition Service.h:91
ToolHandleArray< IThreadInitTool > m_threadInitTools
Handle array of thread init tools.

Member Function Documentation

◆ finalize()

StatusCode ThreadPoolSvc::finalize ( )
finaloverride

Finalise.

Definition at line 62 of file ThreadPoolSvc.cpp.

62 {
63
64 if ( !m_init )
65 warning() << "Looks like the ThreadPoolSvc was created, but thread pool "
66 << "was never initialized" << endmsg;
67
69}
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
constexpr static const auto SUCCESS
Definition StatusCode.h:99
bool m_init
Was the thread pool initialized?

◆ getArena()

tbb::task_arena * ThreadPoolSvc::getArena ( )
inline

Definition at line 60 of file ThreadPoolSvc.h.

60{ return &m_arena; }
tbb::task_arena m_arena
TBB task arena to run all algorithms.

◆ initialize()

StatusCode ThreadPoolSvc::initialize ( )
finaloverride

Initialise.

Definition at line 38 of file ThreadPoolSvc.cpp.

38 {
39
40 // Initialise mother class (read properties, ...)
41 if ( Service::initialize().isFailure() ) {
42 warning() << "Base class could not be initialized" << endmsg;
44 }
45
46 if ( m_threadInitTools.retrieve().isFailure() ) {
47 error() << "Unable to retrieve ThreadInitTools Array" << endmsg;
48
50 }
51
52 if ( m_threadInitTools.size() != 0 )
53 info() << "retrieved " << m_threadInitTools.size() << " thread init tools" << endmsg;
54 else
55 info() << "no thread init tools attached" << endmsg;
56
58}
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
StatusCode initialize() override
Definition Service.cpp:118
constexpr static const auto FAILURE
Definition StatusCode.h:100

◆ initPool()

StatusCode ThreadPoolSvc::initPool ( const int & poolSize,
const int & maxParallelismExtra )
finaloverride

Initialize the thread pool and launch the ThreadInitTasks.

Definition at line 73 of file ThreadPoolSvc.cpp.

73 {
74
75 tbb::spin_mutex::scoped_lock lock( m_initMutex );
76
78
79 ON_DEBUG debug() << "ThreadPoolSvc::initPool() poolSize = " << poolSize << endmsg;
80 // There is a problem in the piece of the code below. if
81 // m_threadPoolSize is set to something negative which is < -1,
82 // algorithm below might not behave as expected. For the time being
83 // I've choosen to create the barrier with the default number of
84 // threads created by the task scheduler init assuming that a
85 // negative value will choose automatic thread creation which will
86 // create default number of threads.
87 // SK
88
89 // -100 prevents the creation of the pool and the scheduler directly
90 // executes the tasks.
91 // -1 means use all available hardware threads
92
93 if ( -100 != m_threadPoolSize ) {
94 ON_DEBUG debug() << "Initialising a thread pool of size " << m_threadPoolSize << endmsg;
95
96 if ( m_threadPoolSize == -1 ) {
97 m_threadPoolSize = std::thread::hardware_concurrency();
98 } else if ( m_threadPoolSize < -1 ) {
99 fatal() << "Unexpected ThreadPoolSize \"" << m_threadPoolSize << "\". Allowed negative values are "
100 << "-1 (use all available cores) and -100 (don't use a thread pool)" << endmsg;
101 return StatusCode::FAILURE;
102 }
103
104 ON_VERBOSE verbose() << "Maximum allowed parallelism before adjusting: "
105 << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
106
107 // to get the number of threads we need, request one thread more to account for how TBB calculates
108 // its soft limit of the number of threads for the global thread pool
109 m_tbbgc = std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism,
110 m_threadPoolSize + maxParallelismExtra + 1 );
111
113
114 // Create the task arena to run all algorithms
115 m_arena.initialize( m_threadPoolSize + 1 );
116
117 // Create the barrier for task synchronization at termination
118 // (here we increase the number of threads by one to account for calling thread)
119 m_barrier = std::make_unique<boost::barrier>( m_threadPoolSize + 1 );
120
121 } else {
122 // don't use a thread pool
124 m_tbbgc = std::make_unique<tbb::global_control>( tbb::global_control::max_allowed_parallelism, 0 );
125 }
126
127 ON_DEBUG debug() << "Thread Pool initialization complete. Maximum allowed parallelism: "
128 << tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) << endmsg;
129
130 m_init = true;
131
132 return StatusCode::SUCCESS;
133}
#define ON_VERBOSE
#define ON_DEBUG
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
static GAUDI_API void setNumThreads(const std::size_t &nT)
std::unique_ptr< tbb::global_control > m_tbbgc
TBB global control parameter.
std::unique_ptr< boost::barrier > m_barrier
Barrier used to synchronization thread init tasks.
int m_threadPoolSize
Size of the thread pool allocated.
int poolSize() const override final
tbb::spin_mutex m_initMutex
Mutex used to protect the initPool and terminatePool methods.

◆ initThisThread()

void ThreadPoolSvc::initThisThread ( )
overridevirtual

Definition at line 234 of file ThreadPoolSvc.cpp.

234 {
235
237 // this should never happen
238 error() << "initThisThread triggered, but thread already initialized" << endmsg;
239 throw GaudiException( "initThisThread triggered, but thread already initialized", name(), StatusCode::FAILURE );
240 }
241
243 boost::barrier* noBarrier = nullptr;
244 ThreadInitTask( m_threadInitTools, noBarrier, serviceLocator(), false )();
245}
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition Service.cpp:336
std::atomic< int > m_threadInitCount
Counter for all threads that are initialised.
thread_local bool ThreadInitDone

◆ isInit()

virtual bool ThreadPoolSvc::isInit ( ) const
inlinevirtual

Definition at line 56 of file ThreadPoolSvc.h.

56{ return m_init; }

◆ launchTasks()

StatusCode ThreadPoolSvc::launchTasks ( bool finalize = false)
private

Launch tasks to execute the ThreadInitTools.

Definition at line 158 of file ThreadPoolSvc.cpp.

158 {
159
160 const std::string taskType = terminate ? "termination" : "initialization";
161
162 // If we have a thread pool (via a scheduler), then we want to queue
163 // the tasks in TBB to execute on each thread.
164 if ( tbb::global_control::active_value( tbb::global_control::max_allowed_parallelism ) > 0 ) {
165
166 // Make a warning message if not all threads can be finalised
167 if ( terminate and m_threadInitCount.load() != m_threadPoolSize and not m_threadInitTools.empty() ) {
168 warning() << "Finalising " << m_threadPoolSize << " threads, but " << m_threadInitCount.load()
169 << " threads were initialised" << endmsg;
170 }
171
172 // Create one task for each worker thread in the pool
173 for ( int i = 0; i < m_threadPoolSize; ++i ) {
174 ON_DEBUG debug() << "creating ThreadInitTask " << i << endmsg;
175
176 // Queue the task
178 m_arena.enqueue( ThreadInitTask( m_threadInitTools, m_barrier.get(), serviceLocator(), terminate ) );
179 std::this_thread::sleep_for( std::chrono::milliseconds( 20 ) );
180 }
181
182 // Now wait for all the workers to reach the barrier
183 ON_DEBUG debug() << "waiting at barrier for all ThreadInitTool to finish executing" << endmsg;
184 m_barrier->wait();
185
186 // Check to make sure all Tools were invoked.
187 // I'm not sure this mechanism is worthwhile.
188 for ( auto& t : m_threadInitTools ) {
189 // Number of threads initialized but not terminated.
190 int numInit = t->nInit();
191 // Expected number based on the type of task.
192 int expectedNumInit = terminate ? 0 : m_threadPoolSize;
193 if ( numInit != expectedNumInit ) {
194 std::ostringstream ost;
195 ost << "not all threads " << ( terminate ? "terminated" : "initialized" ) << " for tool " << t << " : "
196 << t->nInit() << " out of " << m_threadPoolSize << " are currently active.";
197 if ( terminate ) {
198 // it is likely the case that TBB activated new threads
199 // late in the game, and extra initializations were done
200 info() << ost.str() << endmsg;
201 } else {
202 error() << ost.str() << endmsg;
203 return StatusCode::FAILURE;
204 }
205 }
206 }
207
208 }
209
210 // In single-threaded mode, there is no scheduler, so we simply call
211 // the task wrapper directly in this thread.
212 else {
213 ON_DEBUG debug() << "launching ThreadInitTask " << taskType << "in this thread." << endmsg;
214
215 boost::barrier* noBarrier = nullptr;
216 ThreadInitTask( m_threadInitTools, noBarrier, serviceLocator(), terminate )();
217 }
218
219 // Now, we do some error checking
221 error() << "a ThreadInitTask failed to execute successfully" << endmsg;
222 return StatusCode::FAILURE;
223 }
224
225 return StatusCode::SUCCESS;
226}
StatusCode terminate() override
Definition Service.h:54
static bool execFailed()

◆ poolSize()

int ThreadPoolSvc::poolSize ( ) const
inlinefinaloverride

Definition at line 54 of file ThreadPoolSvc.h.

54{ return m_threadPoolSize; }

◆ terminatePool()

StatusCode ThreadPoolSvc::terminatePool ( )
finaloverride

Terminate the thread pool and launch thread termination tasks.

Definition at line 137 of file ThreadPoolSvc.cpp.

137 {
138 tbb::spin_mutex::scoped_lock lock( m_initMutex );
139
140 ON_DEBUG debug() << "ThreadPoolSvc::terminatePool()" << endmsg;
141
142 if ( !m_init ) {
143 error() << "Trying to terminate uninitialized thread pool!" << endmsg;
144 return StatusCode::FAILURE;
145 }
146
147 // Launch the termination tasks
148 const bool terminate = true;
149 if ( launchTasks( terminate ).isFailure() ) return StatusCode::FAILURE;
150
151 ON_DEBUG debug() << "Thread pool termination complete!" << endmsg;
152
153 return StatusCode::SUCCESS;
154}
StatusCode launchTasks(bool finalize=false)
Launch tasks to execute the ThreadInitTools.

Member Data Documentation

◆ m_arena

tbb::task_arena ThreadPoolSvc::m_arena
private

TBB task arena to run all algorithms.

Definition at line 85 of file ThreadPoolSvc.h.

◆ m_barrier

std::unique_ptr<boost::barrier> ThreadPoolSvc::m_barrier
private

Barrier used to synchronization thread init tasks.

Definition at line 79 of file ThreadPoolSvc.h.

◆ m_init

bool ThreadPoolSvc::m_init = false
private

Was the thread pool initialized?

Definition at line 70 of file ThreadPoolSvc.h.

◆ m_initMutex

tbb::spin_mutex ThreadPoolSvc::m_initMutex
private

Mutex used to protect the initPool and terminatePool methods.

Definition at line 76 of file ThreadPoolSvc.h.

◆ m_tbbgc

std::unique_ptr<tbb::global_control> ThreadPoolSvc::m_tbbgc
private

TBB global control parameter.

Definition at line 82 of file ThreadPoolSvc.h.

◆ m_threadInitCount

std::atomic<int> ThreadPoolSvc::m_threadInitCount = 0
private

Counter for all threads that are initialised.

Definition at line 88 of file ThreadPoolSvc.h.

◆ m_threadInitTools

ToolHandleArray<IThreadInitTool> ThreadPoolSvc::m_threadInitTools = { this }
private

Handle array of thread init tools.

Definition at line 67 of file ThreadPoolSvc.h.

67{ this };

◆ m_threadPoolSize

int ThreadPoolSvc::m_threadPoolSize = 0
private

Size of the thread pool allocated.

Definition at line 73 of file ThreadPoolSvc.h.


The documentation for this class was generated from the following files: