The Gaudi Framework  master (82fdf313)
Loading...
Searching...
No Matches
AlgResourcePool.cpp
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
11#include "AlgResourcePool.h"
12#include <Gaudi/Sequence.h>
14#include <functional>
15#include <queue>
16#include <sstream>
17
18// Instantiation of a static factory class used by clients to create instances of this service
20
// ON_DEBUG expands to a brace-less `if` on msgLevel( MSG::DEBUG ), so the statement
// or block written right after it only runs when that check is true.
21#define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
// DEBUG_MSG is the debug() stream behind that same guard; it is used in this file
// as `DEBUG_MSG << ... << endmsg;`.
// NOTE(review): because the expansion is an unbraced `if`, an `else` placed after a
// use of either macro would bind to the macro's `if`.
22#define DEBUG_MSG ON_DEBUG debug()
23
24//---------------------------------------------------------------------------
25
26// Destructor
28 for ( auto& algoId_algoQueue : m_algqueue_map ) {
29 auto* queue = algoId_algoQueue.second;
30 delete queue;
31 }
32}
33
34//---------------------------------------------------------------------------
35
36// Initialize the pool with the list of algorithms known to the IAlgManager
38
40 if ( !sc.isSuccess() ) warning() << "Base class could not be started" << endmsg;
41
42 // Try to recover the topAlgList from the ApplicationManager for backward-compatibility
43 if ( m_topAlgNames.value().empty() ) {
44 info() << "TopAlg list empty. Recovering the one of Application Manager" << endmsg;
45 const Gaudi::Utils::TypeNameString appMgrName( "ApplicationMgr/ApplicationMgr" );
46 SmartIF<IProperty> appMgrProps( serviceLocator()->service( appMgrName ) );
47 m_topAlgNames.assign( appMgrProps->getProperty( "TopAlg" ) );
48 }
49
50 sc = decodeTopAlgs();
51 if ( sc.isFailure() ) warning() << "Algorithms could not be properly decoded." << endmsg;
52
53 // let's assume all resources are there
56}
57
58//---------------------------------------------------------------------------
59
61
62 StatusCode startSc = Service::start();
63 if ( !startSc.isSuccess() ) return startSc;
64
65 // sys-Start the algorithms
66 for ( IAlgorithm* ialgo : m_algList ) {
67 startSc = ialgo->sysStart();
68 if ( startSc.isFailure() ) {
69 error() << "Unable to start Algorithm: " << ialgo->name() << endmsg;
70 return startSc;
71 }
72 }
74}
75
76//---------------------------------------------------------------------------
77
// Hand out an instance of the algorithm called `name` from its per-algorithm queue and
// reserve the resources recorded for it.
//
// name     : algorithm name; its std::hash<std::string_view> value is the key into
//            m_algqueue_map and m_resource_requirements.
// algo     : output; receives the instance taken from the queue, or nullptr when the
//            name is not present in m_algqueue_map.
// blocking : when true the queue's pop() is used, otherwise try_pop().
// Returns the local StatusCode `sc`.
//
// NOTE(review): this listing omits several original lines (the embedded numbering jumps
// 86->88, 94->96, 99->101, 101->103 and 122->125), so the statements that return early
// or set `sc` to a failure are not visible here. Comments below describe only the
// visible code.
78StatusCode AlgResourcePool::acquireAlgorithm( std::string_view name, IAlgorithm*& algo, bool blocking ) {
79
80 std::hash<std::string_view> hash_function;
81 size_t algo_id = hash_function( name );
82 auto itQueueIAlgPtr = m_algqueue_map.find( algo_id );
83
 // Unknown name: report it and clear the output pointer.
84 if ( itQueueIAlgPtr == m_algqueue_map.end() ) {
85 error() << "Algorithm " << name << " requested, but not recognised" << endmsg;
86 algo = nullptr;
88 }
89
 // `sc` is default-constructed and later tested with isSuccess()/isFailure();
 // presumably the default is SUCCESS - TODO confirm against StatusCode.
90 StatusCode sc;
91 if ( blocking ) {
92 itQueueIAlgPtr->second->pop( algo );
93 } else {
94 if ( !itQueueIAlgPtr->second->try_pop( algo ) ) {
 // No instance was available: bump the miss counter already stored for this name.
96 auto result = m_algInstanceMisses.find( name );
97 if ( result != m_algInstanceMisses.end() )
98 ++( result->second );
99 else
101 }
103 }
104 }
105
106 // Note that reentrant algorithms are not consumed so we put them
107 // back immediately in the queue at the end of this function.
108 // Now we may still be called again in between and get this
109 // error. In such a case, the Scheduler will retry later.
110 // This is of course not optimal, but should only happen very
111 // seldom and thus won't affect the global efficiency
112 if ( sc.isFailure() )
113 DEBUG_MSG << "No instance of algorithm " << name << " could be retrieved in non-blocking mode" << endmsg;
114
115 // if (m_lazyCreation ) {
116 // TODO: fill the lazyCreation part
117 // }
118 if ( sc.isSuccess() ) {
 // Copy of the requirement bitset stored for this algorithm id; it is read before
 // m_resource_mutex is taken.
119 state_type requirements = m_resource_requirements[algo_id];
 // The mutex is locked/unlocked by hand around the update of m_available_resources.
120 m_resource_mutex.lock();
121 if ( requirements.is_subset_of( m_available_resources ) ) {
 // XOR with a subset flips exactly the required bits, i.e. marks them as taken.
122 m_available_resources ^= requirements;
123 } else {
125 error() << "Failure to allocate resources of algorithm " << name << endmsg;
126 // in case of not reentrant, push it back. Reentrant ones are pushed back
127 // in all cases further down
128 if ( !algo->isReEntrant() ) { itQueueIAlgPtr->second->push( algo ); }
129 }
130 m_resource_mutex.unlock();
131 if ( algo->isReEntrant() ) {
132 // push back reentrant algorithms immediately as it can be reused
133 itQueueIAlgPtr->second->push( algo );
134 }
135 }
136 return sc;
137}
138
139//---------------------------------------------------------------------------
140
142
143 std::hash<std::string_view> hash_function;
144 size_t algo_id = hash_function( name );
145
146 // release resources used by the algorithm
147 m_resource_mutex.lock();
149 m_resource_mutex.unlock();
150
151 // release algorithm itself if not reentrant
152 if ( !algo->isReEntrant() ) { m_algqueue_map[algo_id]->push( algo ); }
153 return StatusCode::SUCCESS;
154}
155
156//---------------------------------------------------------------------------
157
164
165//---------------------------------------------------------------------------
166
173
174//---------------------------------------------------------------------------
175
// Append the non-sequence algorithms reachable from `algo` to `alglist`, in the order
// the recursion visits them.
//
// algo           : algorithm to inspect; when isSequence() is true its sub-algorithms
//                  are visited recursively, otherwise it is appended to `alglist`.
// alglist        : output list, extended in place (duplicates are not filtered here).
// recursionDepth : incremented before descending and forwarded to the recursive call;
//                  the visible code does not otherwise read it.
// Returns FAILURE when a sequence cannot be cast to Gaudi::Sequence, the failing status
// of a recursive call, or `sc` otherwise.
//
// NOTE(review): the declaration of `sc` is not visible in this listing (the embedded
// numbering skips 179); presumably it is initialised to success there - TODO confirm.
176StatusCode AlgResourcePool::flattenSequencer( IAlgorithm* algo, std::list<IAlgorithm*>& alglist,
177 unsigned int recursionDepth ) {
178
180
181 if ( algo->isSequence() ) {
 // isSequence() said yes, so a failed cast is reported as an error.
182 auto seq = dynamic_cast<Gaudi::Sequence*>( algo );
183 if ( seq == 0 ) {
184 error() << "Unable to dcast Algorithm " << algo->name() << " to a Sequence, but it has isSequence==true"
185 << endmsg;
186 return StatusCode::FAILURE;
187 }
188
189 auto subAlgorithms = seq->subAlgorithms();
190
191 // Recursively unroll
192 ++recursionDepth;
193
 // Stop at the first sub-algorithm that fails to flatten and propagate its status.
194 for ( auto subalgo : *subAlgorithms ) {
195 sc = flattenSequencer( subalgo, alglist, recursionDepth );
196 if ( sc.isFailure() ) {
197 error() << "Algorithm " << subalgo->name() << " could not be flattened" << endmsg;
198 return sc;
199 }
200 }
201 } else {
 // Leaf (non-sequence) algorithm: record it.
202 alglist.emplace_back( algo );
203 return sc;
204 }
205 return sc;
206}
207
208//---------------------------------------------------------------------------
209
211
213 if ( !algMan.isValid() ) {
214 error() << "Algorithm manager could not be properly fetched." << endmsg;
215 return StatusCode::FAILURE;
216 }
217
219
220 // Fill the top algorithm list ----
221 for ( const std::string& typeName : m_topAlgNames ) {
222 IAlgorithm* algo = algMan->algorithm( typeName, /*createIf*/ true ).get();
223 sc = algo->sysInitialize();
224 if ( sc.isFailure() ) {
225 error() << "Unable to initialize Algorithm: " << algo->name() << endmsg;
226 return sc;
227 }
228 m_topAlgList.push_back( algo );
229 }
230 // Top algorithm list filled ----
231
232 // Now we unroll it ----
233 for ( IAlgorithm* ialgo : m_topAlgList ) { sc = flattenSequencer( ialgo, m_flatUniqueAlgList ); }
234 // stupid O(N^2) unique-ification..
235 for ( auto i = begin( m_flatUniqueAlgList ); i != end( m_flatUniqueAlgList ); ++i ) {
236 auto n = next( i );
237 while ( n != end( m_flatUniqueAlgList ) ) {
238 if ( *n == *i )
239 n = m_flatUniqueAlgList.erase( n );
240 else
241 ++n;
242 }
243 }
244 ON_DEBUG {
245 debug() << "List of algorithms is: " << endmsg;
246 for ( IAlgorithm* algo : m_flatUniqueAlgList )
247 debug() << " o " << algo->type() << "/" << algo->name() << " @ " << algo << endmsg;
248 }
249
250 // Unrolled ---
251
252 // Now let's manage the clones
253 unsigned int resource_counter( 0 );
254 std::hash<std::string> hash_function;
255 for ( IAlgorithm* ialgo : m_flatUniqueAlgList ) {
256
257 const std::string& item_name = ialgo->name();
258 const std::string& item_type = ialgo->type();
259 size_t algo_id = hash_function( item_name );
261 m_algqueue_map[algo_id] = queue;
262
263 if ( msgLevel( MSG::VERBOSE ) ) {
264 verbose() << "Treating resource management and clones of " << item_name << endmsg;
265 }
266
267 queue->push( ialgo );
268 m_algList.push_back( ialgo );
269 if ( ialgo->isReEntrant() ) {
270 if ( ialgo->cardinality() != 0 ) {
271 info() << "Algorithm " << ialgo->name() << " is ReEntrant, but Cardinality was set to " << ialgo->cardinality()
272 << ". Only creating 1 instance" << endmsg;
273 }
274 m_n_of_allowed_instances[algo_id] = 1;
275 } else if ( ialgo->isClonable() ) {
276 m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
277 } else {
278 if ( ialgo->cardinality() == 1 ) {
279 m_n_of_allowed_instances[algo_id] = 1;
280 } else {
281 if ( !m_overrideUnClonable ) {
282 info() << "Algorithm " << ialgo->name() << " is un-Clonable but Cardinality was set to "
283 << ialgo->cardinality() << ". Only creating 1 instance" << endmsg;
284 m_n_of_allowed_instances[algo_id] = 1;
285 } else {
286 warning() << "Overriding UnClonability of Algorithm " << ialgo->name() << ". Setting Cardinality to "
287 << ialgo->cardinality() << endmsg;
288 m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
289 }
290 }
291 }
292 m_n_of_created_instances[algo_id] = 1;
293
294 state_type requirements( 0 );
295
296 for ( auto& resource_name : ialgo->neededResources() ) {
297 auto ret = m_resource_indices.emplace( resource_name, resource_counter );
298 // insert successful means == wasn't known before. So increment counter
299 if ( ret.second ) ++resource_counter;
300 // Resize for every algorithm according to the found resources
301 requirements.resize( resource_counter );
302 // in any case the return value holds the proper product index
303 requirements[ret.first->second] = true;
304 }
305
306 m_resource_requirements[algo_id] = requirements;
307
308 // potentially create clones; if not lazy creation we have to do it now
309 if ( !m_lazyCreation ) {
310 for ( unsigned int i = 1, end = m_n_of_allowed_instances[algo_id]; i < end; ++i ) {
311 DEBUG_MSG << "type/name to create clone of: " << item_type << "/" << item_name << endmsg;
312 IAlgorithm* ialgoClone( nullptr );
313
314 StatusCode createAlgSc =
315 algMan->createAlgorithm( item_type, item_name, ialgoClone, /*managed*/ true, /*checkIfExists*/ false );
316
317 ialgoClone->setIndex( i );
318 queue->push( ialgoClone );
319 m_n_of_created_instances[algo_id] += 1;
320 }
321 }
322 }
323
324 // Now resize all the requirement bitsets to the same size
325 for ( auto& kv : m_resource_requirements ) { kv.second.resize( resource_counter ); }
326
327 // Set all resources to be available
328 m_available_resources.resize( resource_counter );
330
331 return sc;
332}
333
334//---------------------------------------------------------------------------
335
// Return m_flatUniqueAlgList (the flat list of algorithms without clones) by value,
// so the caller receives its own copy of the list of pointers.
336std::list<IAlgorithm*> AlgResourcePool::getFlatAlgList() { return m_flatUniqueAlgList; }
337
338//---------------------------------------------------------------------------
339
// Return m_topAlgList (the list of top algorithms) by value, so the caller receives
// its own copy of the list of pointers.
340std::list<IAlgorithm*> AlgResourcePool::getTopAlgList() { return m_topAlgList; }
341
342//---------------------------------------------------------------------------
344
345 std::multimap<unsigned int, std::string_view, std::greater<unsigned int>> sortedAlgInstanceMisses;
346
347 for ( auto& p : m_algInstanceMisses ) sortedAlgInstanceMisses.insert( { p.second, p.first } );
348
349 // determine optimal indentation
350 int indnt = std::to_string( sortedAlgInstanceMisses.cbegin()->first ).length();
351
352 std::ostringstream out;
353
354 out << "Hit parade of algorithm instance misses:\n"
355 << std::right << std::setfill( ' ' )
356 << " ===============================================================================\n"
357 << std::setw( indnt + 7 ) << "Misses "
358 << "| Algorithm (# of clones) \n"
359 << " ===============================================================================\n";
360
361 std::hash<std::string_view> hash_function;
362
363 out << std::right << std::setfill( ' ' );
364 for ( const auto& p : sortedAlgInstanceMisses ) {
365 out << std::setw( indnt + 7 ) << std::to_string( p.first ) + " "
366 << " " << p.second << " (" << m_n_of_allowed_instances.at( hash_function( p.second ) ) << ")\n";
367 }
368
369 info() << out.str() << endmsg;
370}
371
372//---------------------------------------------------------------------------
373
375
376 StatusCode stopSc = Service::stop();
377 if ( !stopSc.isSuccess() ) return stopSc;
378
379 // sys-Stop the algorithm
380 for ( IAlgorithm* ialgo : m_algList ) {
381 stopSc = ialgo->sysStop();
382 if ( stopSc.isFailure() ) {
383 error() << "Unable to stop Algorithm: " << ialgo->name() << endmsg;
384 return stopSc;
385 }
386 }
388
389 return StatusCode::SUCCESS;
390}
391
393 m_topAlgList.clear();
394 m_algList.clear();
395 m_flatUniqueAlgList.clear();
396 return extends::finalize();
397}
#define DEBUG_MSG
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
#define ON_DEBUG
#define DECLARE_COMPONENT(type)
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
std::list< IAlgorithm * > getFlatAlgList() override
std::map< size_t, size_t > m_n_of_allowed_instances
StatusCode acquireResource(std::string_view name) override
Acquire a certain resource.
Gaudi::Property< bool > m_lazyCreation
boost::dynamic_bitset state_type
std::list< IAlgorithm * > m_algList
The list of all algorithms created within the Pool which are not top.
std::map< size_t, concurrentQueueIAlgPtr * > m_algqueue_map
StatusCode initialize() override
std::mutex m_resource_mutex
StatusCode releaseAlgorithm(std::string_view name, IAlgorithm *&algo) override
Release a certain algorithm.
Gaudi::Property< std::vector< std::string > > m_topAlgNames
Gaudi::Property< bool > m_countAlgInstMisses
~AlgResourcePool() override
std::list< IAlgorithm * > m_flatUniqueAlgList
The flat list of algorithms w/o clones.
void dumpInstanceMisses() const
Dump recorded Algorithm instance misses.
std::list< IAlgorithm * > getTopAlgList() override
StatusCode finalize() override
tbb::concurrent_bounded_queue< IAlgorithm * > concurrentQueueIAlgPtr
StatusCode releaseResource(std::string_view name) override
Release a certain resource.
std::list< IAlgorithm * > m_topAlgList
The list of top algorithms.
StatusCode stop() override
std::map< size_t, unsigned int > m_n_of_created_instances
state_type m_available_resources
StatusCode acquireAlgorithm(std::string_view name, IAlgorithm *&algo, bool blocking=false) override
Acquire a certain algorithm using its name.
std::unordered_map< std::string_view, unsigned int > m_algInstanceMisses
Counters for Algorithm instance misses.
std::map< size_t, state_type > m_resource_requirements
StatusCode start() override
StatusCode decodeTopAlgs()
Decode the top Algorithm list.
Gaudi::Property< bool > m_overrideUnClonable
StatusCode flattenSequencer(IAlgorithm *sequencer, std::list< IAlgorithm * > &alglist, unsigned int recursionDepth=0)
Recursively flatten an algList.
std::map< std::string_view, unsigned int > m_resource_indices
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
Helper class to parse a string of format "type/name".
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition IAlgorithm.h:36
virtual StatusCode sysInitialize()=0
Initialization method invoked by the framework.
virtual bool isSequence() const =0
Are we a Sequence?
virtual void setIndex(const unsigned int &idx)=0
Set instantiation index of Alg.
virtual bool isReEntrant() const =0
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition Service.cpp:336
const std::string & name() const override
Retrieve name of the service.
Definition Service.cpp:333
StatusCode stop() override
Definition Service.cpp:181
SmartIF< IFace > service(const std::string &name, bool createIf=true) const
Definition Service.h:79
StatusCode start() override
Definition Service.cpp:187
StatusCode initialize() override
Definition Service.cpp:118
Small smart pointer class with automatic reference counting for IInterface.
Definition SmartIF.h:28
TYPE * get() const
Get interface pointer.
Definition SmartIF.h:82
bool isValid() const
Allow for check if smart pointer is valid.
Definition SmartIF.h:69
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
bool isFailure() const
Definition StatusCode.h:129
bool isSuccess() const
Definition StatusCode.h:314
constexpr static const auto SUCCESS
Definition StatusCode.h:99
constexpr static const auto FAILURE
Definition StatusCode.h:100
@ VERBOSE
Definition IMessageSvc.h:22