The Gaudi Framework  v30r0 (c919700c)
AlgResourcePool.cpp
Go to the documentation of this file.
1 // Include Files
2 
3 // Framework
4 #include "AlgResourcePool.h"
9 
10 // C++
11 #include <functional>
12 #include <queue>
13 
14 // DP TODO: Manage smartifs and not pointers to algos
15 
16 // Instantiation of a static factory class used by clients to create instances of this service
18 
19 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
20 #define DEBUG_MSG ON_DEBUG debug()
21 
22 //---------------------------------------------------------------------------
23 
24 // destructor
26 {
27 
28  for ( auto& algoId_algoQueue : m_algqueue_map ) {
29  auto* queue = algoId_algoQueue.second;
30  delete queue;
31  }
32 }
33 
34 //---------------------------------------------------------------------------
35 
36 // initialize the pool with the list of algos known to the IAlgManager
38 {
39 
41  if ( !sc.isSuccess() ) warning() << "Base class could not be started" << endmsg;
42 
43  // Try to recover the topAlgList from the ApplicationManager for backward-compatibility
44  if ( m_topAlgNames.value().empty() ) {
45  info() << "TopAlg list empty. Recovering the one of Application Manager" << endmsg;
46  const Gaudi::Utils::TypeNameString appMgrName( "ApplicationMgr/ApplicationMgr" );
47  SmartIF<IProperty> appMgrProps( serviceLocator()->service( appMgrName ) );
48  m_topAlgNames.assign( appMgrProps->getProperty( "TopAlg" ) );
49  }
50 
51  sc = decodeTopAlgs();
52  if ( sc.isFailure() ) warning() << "Algorithms could not be properly decoded." << endmsg;
53 
54  // let's assume all resources are there
56  return StatusCode::SUCCESS;
57 }
58 
59 //---------------------------------------------------------------------------
60 
62 {
63 
64  StatusCode startSc = Service::start();
65  if ( !startSc.isSuccess() ) return startSc;
66 
67  // sys-Start the algos
68  for ( auto& ialgo : m_algList ) {
69  startSc = ialgo->sysStart();
70  if ( startSc.isFailure() ) {
71  error() << "Unable to start Algorithm: " << ialgo->name() << endmsg;
72  return startSc;
73  }
74  }
75  return StatusCode::SUCCESS;
76 }
77 
78 //---------------------------------------------------------------------------
79 
81 {
82 
83  std::hash<std::string> hash_function;
84  size_t algo_id = hash_function( name );
85  auto itQueueIAlgPtr = m_algqueue_map.find( algo_id );
86 
87  if ( itQueueIAlgPtr == m_algqueue_map.end() ) {
88  error() << "Algorithm " << name << " requested, but not recognised" << endmsg;
89  algo = nullptr;
90  return StatusCode::FAILURE;
91  }
92 
93  StatusCode sc;
94  if ( blocking ) {
95  itQueueIAlgPtr->second->pop( algo );
96  } else {
97  if ( !itQueueIAlgPtr->second->try_pop( algo ) ) {
99  }
100  }
101 
102  // Note that reentrant algos are not consumed so we put them
103  // back immediately in the queue at the end of this function.
104  // Now we may still be called again in between and get this
105  // error. In such a case, the Scheduler will retry later.
106  // This is of course not optimal, but should only happen very
107  // seldom and thud won't affect the global efficiency
108  if ( sc.isFailure() )
109  DEBUG_MSG << "No instance of algorithm " << name << " could be retrieved in non-blocking mode" << endmsg;
110 
111  // if (m_lazyCreation ) {
112  // TODO: fill the lazyCreation part
113  // }
114  if ( sc.isSuccess() ) {
115  state_type requirements = m_resource_requirements[algo_id];
117  if ( requirements.is_subset_of( m_available_resources ) ) {
118  m_available_resources ^= requirements;
119  } else {
120  sc = StatusCode::FAILURE;
121  error() << "Failure to allocate resources of algorithm " << name << endmsg;
122  // in case of not reentrant, push it back. Reentrant ones are pushed back
123  // in all cases further down
124  if ( 0 != algo->cardinality() ) {
125  itQueueIAlgPtr->second->push( algo );
126  }
127  }
129  if ( 0 == algo->cardinality() ) {
130  // push back reentrant algos immediately as it can be reused
131  itQueueIAlgPtr->second->push( algo );
132  }
133  }
134  return sc;
135 }
136 
137 //---------------------------------------------------------------------------
138 
140 {
141 
142  std::hash<std::string> hash_function;
143  size_t algo_id = hash_function( name );
144 
145  // release resources used by the algorithm
149 
150  // release algorithm itself if not reentrant
151  if ( 0 != algo->cardinality() ) {
152  m_algqueue_map[algo_id]->push( algo );
153  }
154  return StatusCode::SUCCESS;
155 }
156 
157 //---------------------------------------------------------------------------
158 
160 {
164  return StatusCode::SUCCESS;
165 }
166 
167 //---------------------------------------------------------------------------
168 
170 {
174  return StatusCode::SUCCESS;
175 }
176 
177 //---------------------------------------------------------------------------
178 
179 StatusCode AlgResourcePool::flattenSequencer( Algorithm* algo, ListAlg& alglist, unsigned int recursionDepth )
180 {
181 
183 
184  bool isGaudiSequencer( false );
185  bool isAthSequencer( false );
186 
187  if ( algo->isSequence() ) {
188  if ( algo->hasProperty( "ShortCircuit" ) )
189  isGaudiSequencer = true;
190  else if ( algo->hasProperty( "StopOverride" ) )
191  isAthSequencer = true;
192  }
193 
194  std::vector<Algorithm*>* subAlgorithms = algo->subAlgorithms();
195  if ( // we only want to add basic algorithms -> have no subAlgs
196  // and exclude the case of empty sequencers
197  ( subAlgorithms->empty() && !( isGaudiSequencer || isAthSequencer ) ) ) {
198 
199  alglist.emplace_back( algo );
200  return sc;
201  }
202 
203  // Recursively unroll
204  ++recursionDepth;
205 
206  for ( Algorithm* subalgo : *subAlgorithms ) {
207  sc = flattenSequencer( subalgo, alglist, recursionDepth );
208  if ( sc.isFailure() ) {
209  error() << "Algorithm " << subalgo->name() << " could not be flattened" << endmsg;
210  return sc;
211  }
212  }
213  return sc;
214 }
215 
216 //---------------------------------------------------------------------------
217 
219 {
220 
222  if ( !algMan.isValid() ) {
223  error() << "Algorithm manager could not be properly fetched." << endmsg;
224  return StatusCode::FAILURE;
225  }
226 
227  // Useful lambda not to repeat ourselves --------------------------
228  auto createAlg = [&algMan, this]( const std::string& item_type, const std::string& item_name, IAlgorithm*& algo ) {
229  StatusCode createAlgSc = algMan->createAlgorithm( item_type, item_name, algo, true, false );
230  if ( createAlgSc.isFailure() )
231  this->warning() << "Algorithm " << item_type << "/" << item_name << " could not be created." << endmsg;
232  };
233  // End of lambda --------------------------------------------------
234 
236 
237  // Fill the top alg list ----
238  const std::vector<std::string>& topAlgNames = m_topAlgNames.value();
239  for ( auto& name : topAlgNames ) {
240  IAlgorithm* algo( nullptr );
241 
243  const std::string& item_name = item.name();
244  const std::string& item_type = item.type();
245  SmartIF<IAlgorithm> algoSmartIF( algMan->algorithm( item_name, false ) );
246 
247  if ( !algoSmartIF.isValid() ) {
248  createAlg( item_type, item_name, algo );
249  algoSmartIF = algo;
250  }
251  // Init and start
252  algoSmartIF->sysInitialize().ignore();
253  m_topAlgList.push_back( algoSmartIF );
254  }
255  // Top Alg list filled ----
256 
257  // Now we unroll it ----
258  for ( auto& algoSmartIF : m_topAlgList ) {
259  Algorithm* algorithm = dynamic_cast<Algorithm*>( algoSmartIF.get() );
260  if ( !algorithm ) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
261  sc = flattenSequencer( algorithm, m_flatUniqueAlgList );
262  }
263  // stupid O(N^2) unique-ification..
264  for ( auto i = begin( m_flatUniqueAlgList ); i != end( m_flatUniqueAlgList ); ++i ) {
265  auto n = next( i );
266  while ( n != end( m_flatUniqueAlgList ) ) {
267  if ( *n == *i )
269  else
270  ++n;
271  }
272  }
273  if ( msgLevel( MSG::DEBUG ) ) {
274  debug() << "List of algorithms is: " << endmsg;
275  for ( auto& algo : m_flatUniqueAlgList )
276  debug() << " o " << algo->type() << "/" << algo->name() << " @ " << algo << endmsg;
277  }
278 
279  // Unrolled ---
280 
281  // Now let's manage the clones
282  unsigned int resource_counter( 0 );
283  std::hash<std::string> hash_function;
284  for ( auto& ialgoSmartIF : m_flatUniqueAlgList ) {
285 
286  const std::string& item_name = ialgoSmartIF->name();
287 
288  verbose() << "Treating resource management and clones of " << item_name << endmsg;
289 
290  Algorithm* algo = dynamic_cast<Algorithm*>( ialgoSmartIF.get() );
291  if ( !algo ) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
292  const std::string& item_type = algo->type();
293 
294  size_t algo_id = hash_function( item_name );
296  m_algqueue_map[algo_id] = queue;
297 
298  // DP TODO Do it properly with SmartIFs, also in the queues
299  IAlgorithm* ialgo( ialgoSmartIF.get() );
300 
301  queue->push( ialgo );
302  m_algList.push_back( ialgo );
303  if ( ialgo->isClonable() ) {
304  m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
305  } else {
306  if ( ialgo->cardinality() == 1 ) {
307  m_n_of_allowed_instances[algo_id] = 1;
308  } else {
309  if ( !m_overrideUnClonable ) {
310  info() << "Algorithm " << ialgo->name() << " is un-Clonable but Cardinality was set to "
311  << ialgo->cardinality() << ". Only creating 1 instance" << endmsg;
312  m_n_of_allowed_instances[algo_id] = 1;
313  } else {
314  warning() << "Overriding UnClonability of Algorithm " << ialgo->name() << ". Setting Cardinality to "
315  << ialgo->cardinality() << endmsg;
316  m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
317  }
318  }
319  }
320  m_n_of_created_instances[algo_id] = 1;
321 
322  state_type requirements( 0 );
323 
324  for ( auto& resource_name : ialgo->neededResources() ) {
325  auto ret = m_resource_indices.emplace( resource_name, resource_counter );
326  // insert successful means == wasn't known before. So increment counter
327  if ( ret.second ) ++resource_counter;
328  // Resize for every algo according to the found resources
329  requirements.resize( resource_counter );
330  // in any case the return value holds the proper product index
331  requirements[ret.first->second] = true;
332  }
333 
334  m_resource_requirements[algo_id] = requirements;
335 
336  // potentially create clones; if not lazy creation we have to do it now
337  if ( !m_lazyCreation ) {
338  for ( unsigned int i = 1, end = m_n_of_allowed_instances[algo_id]; i < end; ++i ) {
339  debug() << "type/name to create clone of: " << item_type << "/" << item_name << endmsg;
340  IAlgorithm* ialgoClone( nullptr );
341  createAlg( item_type, item_name, ialgoClone );
342  ialgoClone->setIndex( i );
343  if ( ialgoClone->sysInitialize().isFailure() ) {
344  error() << "unable to initialize Algorithm clone " << ialgoClone->name() << endmsg;
345  sc = StatusCode::FAILURE;
346  // FIXME: should we delete this failed clone?
347  } else {
348  queue->push( ialgoClone );
349  m_n_of_created_instances[algo_id] += 1;
350  }
351  }
352  }
353  }
354 
355  // Now resize all the requirement bitsets to the same size
356  for ( auto& kv : m_resource_requirements ) {
357  kv.second.resize( resource_counter );
358  }
359 
360  // Set all resources to be available
361  m_available_resources.resize( resource_counter );
362  m_available_resources.set();
363 
364  return sc;
365 }
366 
367 //---------------------------------------------------------------------------
368 
370 {
372  for ( auto algoSmartIF : m_flatUniqueAlgList )
373  m_flatUniqueAlgPtrList.push_back( const_cast<IAlgorithm*>( algoSmartIF.get() ) );
374  return m_flatUniqueAlgPtrList;
375 }
376 
377 //---------------------------------------------------------------------------
378 
380 {
382  for ( auto algoSmartIF : m_topAlgList ) m_topAlgPtrList.push_back( const_cast<IAlgorithm*>( algoSmartIF.get() ) );
383  return m_topAlgPtrList;
384 }
385 
386 //---------------------------------------------------------------------------
387 
389 {
390  auto algBeginRun = [&]( SmartIF<IAlgorithm>& algoSmartIF ) -> StatusCode {
391  StatusCode sc = algoSmartIF->sysBeginRun();
392  if ( !sc.isSuccess() ) {
393  warning() << "beginRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
394  return StatusCode::FAILURE;
395  }
396  return StatusCode::SUCCESS;
397  };
398  // Call the beginRun() method of all algorithms
399  for ( auto& algoSmartIF : m_flatUniqueAlgList ) {
400  if ( algBeginRun( algoSmartIF ).isFailure() ) return StatusCode::FAILURE;
401  }
402 
403  return StatusCode::SUCCESS;
404 }
405 
406 //---------------------------------------------------------------------------
407 
409 {
410 
411  auto algEndRun = [&]( SmartIF<IAlgorithm>& algoSmartIF ) -> StatusCode {
412  StatusCode sc = algoSmartIF->sysEndRun();
413  if ( !sc.isSuccess() ) {
414  warning() << "endRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
415  return StatusCode::FAILURE;
416  }
417  return StatusCode::SUCCESS;
418  };
419  // Call the beginRun() method of all top algorithms
420  for ( auto& algoSmartIF : m_flatUniqueAlgList ) {
421  if ( algEndRun( algoSmartIF ).isFailure() ) return StatusCode::FAILURE;
422  }
423  for ( auto& algoSmartIF : m_topAlgList ) {
424  if ( algEndRun( algoSmartIF ).isFailure() ) return StatusCode::FAILURE;
425  }
426  return StatusCode::SUCCESS;
427 }
428 
429 //---------------------------------------------------------------------------
430 
432 {
433 
434  StatusCode stopSc = Service::stop();
435  if ( !stopSc.isSuccess() ) return stopSc;
436 
437  // sys-Stop the algos
438  for ( auto& ialgo : m_algList ) {
439  stopSc = ialgo->sysStop();
440  if ( stopSc.isFailure() ) {
441  error() << "Unable to stop Algorithm: " << ialgo->name() << endmsg;
442  return stopSc;
443  }
444  }
445  return StatusCode::SUCCESS;
446 }
ListAlg m_flatUniqueAlgList
The flat list of algorithms w/o clones.
virtual SmartIF< IAlgorithm > & algorithm(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to a service.
StatusCode initialize() override
Definition: Service.cpp:64
T unlock(T...args)
T empty(T...args)
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
StatusCode start() override
Definition: Service.cpp:137
std::map< std::string, unsigned int > m_resource_indices
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:50
bool hasProperty(const std::string &name) const override
Return true if we have a property with the given name.
StatusCode flattenSequencer(Algorithm *sequencer, ListAlg &alglist, unsigned int recursionDepth=0)
Recursively flatten an algList.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::mutex m_resource_mutex
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of a algorithm type that has been declared beforehand and assigns to it a name...
StatusCode beginRun() override
StatusCode stop() override
T end(T...args)
virtual StatusCode getProperty(Gaudi::Details::PropertyBase *p) const =0
Get the property by property.
StatusCode endRun() override
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:61
state_type m_available_resources
Gaudi::Property< bool > m_overrideUnClonable
virtual StatusCode sysInitialize()=0
Initialization method invoked by the framework.
auto begin(reverse_wrapper< T > &w)
Definition: reverse.h:58
STL class.
boost::dynamic_bitset state_type
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Helper class to parse a string of format "type/name".
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< IAlgorithm * > concurrentQueueIAlgPtr
std::list< IAlgorithm * > m_flatUniqueAlgPtrList
The flat list of algorithms w/o clones which is returned.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::map< size_t, state_type > m_resource_requirements
T erase(T...args)
StatusCode start() override
T lock(T...args)
~AlgResourcePool() override
std::list< IAlgorithm * > m_topAlgPtrList
The top list of algorithms.
virtual unsigned int cardinality() const =0
Cardinality (Maximum number of clones that can exist) special value 0 means that algorithm is reentra...
auto end(reverse_wrapper< T > &w)
Definition: reverse.h:64
virtual void setIndex(const unsigned int &idx)=0
Set instantiation index of Alg.
std::list< IAlgorithm * > getFlatAlgList() override
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:211
StatusCode releaseResource(const std::string &name) override
Release a certrain resource.
Gaudi::Property< std::vector< std::string > > m_topAlgNames
std::list< IAlgorithm * > getTopAlgList() override
StatusCode stop() override
Definition: Service.cpp:130
T clear(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
StatusCode acquireResource(const std::string &name) override
Acquire a certain resource.
std::map< size_t, concurrentQueueIAlgPtr * > m_algqueue_map
const std::vector< Algorithm * > * subAlgorithms() const
List of sub-algorithms. Returns a pointer to a vector of (sub) Algorithms.
Definition: Algorithm.cpp:782
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
T find(T...args)
StatusCode decodeTopAlgs()
Decode the top alg list.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
bool isSequence() const override
Are we a Sequence?
Definition: Algorithm.h:220
const std::string & type() const
T emplace(T...args)
StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false) override
Acquire a certain algorithm using its name.
StatusCode service(const std::string &name, const T *&psvc, bool createIf=true) const
Access a service by name, creating it if it doesn&#39;t already exist.
Definition: Service.h:85
std::map< size_t, unsigned int > m_n_of_created_instances
#define DEBUG_MSG
const std::string & name() const
StatusCode initialize() override
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
void ignore() const
Definition: StatusCode.h:84
ListAlg m_topAlgList
The list of top algorithms.
const std::string & type() const override
The type of the algorithm object.
Definition: Algorithm.h:168
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
std::map< size_t, size_t > m_n_of_allowed_instances
ListAlg m_algList
The list of all algorithms created withing the Pool which are not top.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo) override
Release a certain algorithm.
Gaudi::Property< bool > m_lazyCreation
T emplace_back(T...args)