The Gaudi Framework  v29r0 (ff2e7097)
1 // Include Files
3 // Framework
4 #include "AlgResourcePool.h"
10 // C++
11 #include <functional>
12 #include <queue>
14 // DP TODO: Manage smartifs and not pointers to algos
16 // Instantiation of a static factory class used by clients to create instances of this service
19 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
20 #define DEBUG_MSG ON_DEBUG debug()
22 //---------------------------------------------------------------------------
24 // destructor: deletes every queue held in m_algqueue_map and then the control flow graph
26 {
    // The mapped values of m_algqueue_map are raw queue pointers; each one is deleted here.
28  for ( auto& algoId_algoQueue : m_algqueue_map ) {
29  auto* queue = algoId_algoQueue.second;
30  delete queue;
31  }
    // NOTE(review): presumably m_CFGraph is null when no graph was built, which would
    // make this delete a no-op - confirm against the member's initialisation.
33  delete m_CFGraph;
34 }
36 //---------------------------------------------------------------------------
38 // initialize the pool with the list of algos known to the IAlgManager
40 {
    // NOTE(review): the function signature and the declaration of `sc` are not part of
    // this listing; `sc` is presumably the status of the base-class initialization.
    // A failure reported through `sc` is only logged as a warning, not returned.
43  if ( !sc.isSuccess() ) warning() << "Base class could not be started" << endmsg;
45  // Try to recover the topAlgList from the ApplicationManager for backward-compatibility
46  if ( m_topAlgNames.value().empty() ) {
47  info() << "TopAlg list empty. Recovering the one of Application Manager" << endmsg;
48  const Gaudi::Utils::TypeNameString appMgrName( "ApplicationMgr/ApplicationMgr" );
49  SmartIF<IProperty> appMgrProps( serviceLocator()->service( appMgrName ) );
    // NOTE(review): appMgrProps is dereferenced without a validity check.
50  m_topAlgNames.assign( appMgrProps->getProperty( "TopAlg" ) );
51  }
53  // Prepare empty control flow graph
54  // (Only ForwardScheduler requires assembling the graph in AlgResourcePool.
55  // The AvalancheScheduler relies on the graph that is assembled by the PrecedenceSvc)
56  if ( serviceLocator()->existsService( "ForwardSchedulerSvc" ) ) {
57  const std::string& name = "ControlFlowGraph";
    // NOTE(review): the statement that consumes `name` (presumably the creation of
    // m_CFGraph) is missing from this listing.
60  }
    // Decode the configured top algorithms; a decoding failure is only logged as a warning.
62  sc = decodeTopAlgs();
63  if ( sc.isFailure() ) warning() << "Algorithms could not be properly decoded." << endmsg;
65  // let's assume all resources are there
    // SUCCESS is returned regardless of the value held by `sc`.
67  return StatusCode::SUCCESS;
68 }
70 //---------------------------------------------------------------------------
73 {
    // Starts the base Service first and then every algorithm stored in m_algList.
    // Returns the first failing StatusCode (base class or algorithm), SUCCESS otherwise.
    // Algorithms that were already started are not stopped again when a later one fails.
    // NOTE(review): the function signature is not part of this listing.
75  StatusCode startSc = Service::start();
76  if ( !startSc.isSuccess() ) return startSc;
78  // sys-Start the algos
79  for ( auto& ialgo : m_algList ) {
80  startSc = ialgo->sysStart();
81  if ( startSc.isFailure() ) {
82  error() << "Unable to start Algorithm: " << ialgo->name() << endmsg;
83  return startSc;
84  }
85  }
86  return StatusCode::SUCCESS;
87 }
89 //---------------------------------------------------------------------------
92 {
    // Hands out an instance of the algorithm called `name` through the output argument `algo`.
    //  - The per-algorithm queue is looked up in m_algqueue_map by the std::hash of `name`;
    //    an unknown name sets `algo` to nullptr and returns FAILURE.
    //  - With `blocking` set, pop() is used and the status is SUCCESS; otherwise try_pop()
    //    is used and its result becomes the status.
    //  - On success the resource requirements of the algorithm are taken out of
    //    m_available_resources; if they are not all available the status becomes FAILURE.
    // NOTE(review): the function signature is not part of this listing; the parameter
    // names above are the ones used in the body.
94  std::hash<std::string> hash_function;
95  size_t algo_id = hash_function( name );
96  auto itQueueIAlgPtr = m_algqueue_map.find( algo_id );
98  if ( itQueueIAlgPtr == m_algqueue_map.end() ) {
99  error() << "Algorithm " << name << " requested, but not recognised" << endmsg;
100  algo = nullptr;
101  return StatusCode::FAILURE;
102  }
104  StatusCode sc;
105  if ( blocking ) {
    // presumably pop() waits until an instance is available - confirm against the queue type
106  itQueueIAlgPtr->second->pop( algo );
107  sc = StatusCode::SUCCESS;
108  } else {
109  sc = itQueueIAlgPtr->second->try_pop( algo );
110  }
112  // Note that reentrant algos are not consumed so we put them
113  // back immediately in the queue at the end of this function.
114  // Now we may still be called again in between and get this
115  // error. In such a case, the Scheduler will retry later.
116  // This is of course not optimal, but should only happen very
117  // seldom and thus won't affect the global efficiency
118  if ( sc.isFailure() )
119  DEBUG_MSG << "No instance of algorithm " << name << " could be retrieved in non-blocking mode" << endmsg;
121  // if (m_lazyCreation ) {
122  // TODO: fill the lazyCreation part
123  // }
124  if ( sc.isSuccess() ) {
125  state_type requirements = m_resource_requirements[algo_id];
127  if ( requirements.is_subset_of( m_available_resources ) ) {
    // given the subset check above, the XOR clears exactly the required bits
128  m_available_resources ^= requirements;
129  } else {
130  sc = StatusCode::FAILURE;
131  error() << "Failure to allocate resources of algorithm " << name << endmsg;
132  // in case of not reentrant, push it back. Reentrant ones are pushed back
133  // in all cases further down
    // NOTE(review): `algo` still points at the pushed-back instance when FAILURE is returned.
134  if ( 0 != algo->cardinality() ) {
135  itQueueIAlgPtr->second->push( algo );
136  }
137  }
    // a cardinality of 0 marks the instance as reentrant in this code (see comments above)
139  if ( 0 == algo->cardinality() ) {
140  // push back reentrant algos immediately as it can be reused
141  itQueueIAlgPtr->second->push( algo );
142  }
143  }
144  return sc;
145 }
147 //---------------------------------------------------------------------------
150 {
    // Gives an algorithm instance back to the queue registered for `name`.
    // Always returns SUCCESS in the code visible here.
    // NOTE(review): the function signature is not part of this listing.
152  std::hash<std::string> hash_function;
153  size_t algo_id = hash_function( name );
155  // release resources used by the algorithm
    // NOTE(review): the statements that actually release the resources are missing
    // from this listing; only the comment above survived.
160  // release algorithm itself if not reentrant
    // NOTE(review): operator[] on m_algqueue_map is used without checking that algo_id
    // is known; presumably callers only pass names that were acquired before - confirm.
161  if ( 0 != algo->cardinality() ) {
162  m_algqueue_map[algo_id]->push( algo );
163  }
164  return StatusCode::SUCCESS;
165 }
167 //---------------------------------------------------------------------------
170 {
    // NOTE(review): the signature and the statement(s) preceding the return are missing
    // from this listing; only the unconditional SUCCESS return is visible.
174  return StatusCode::SUCCESS;
175 }
177 //---------------------------------------------------------------------------
180 {
    // NOTE(review): the signature and the statement(s) preceding the return are missing
    // from this listing; only the unconditional SUCCESS return is visible.
184  return StatusCode::SUCCESS;
185 }
187 //---------------------------------------------------------------------------
190  unsigned int recursionDepth )
191 {
    // Recursively walks `algo` and its sub-algorithms and appends every basic (non-sequencer)
    // algorithm to `alglist`. When a service named "ForwardSchedulerSvc" exists, the matching
    // nodes are also added to m_CFGraph below the node called `parentName`.
    // `recursionDepth` is only used to indent the debug output.
    // NOTE(review): the first line of the signature and the declaration of `sc` are missing
    // from this listing; the parameter names above are the ones used in the body.
195  bool isGaudiSequencer( false );
196  bool isAthSequencer( false );
    // The sequencer flavour is recognised by the property it exposes.
198  if ( algo->isSequence() ) {
199  if ( algo->hasProperty( "ShortCircuit" ) )
200  isGaudiSequencer = true;
201  else if ( algo->hasProperty( "StopOverride" ) )
202  isAthSequencer = true;
203  }
205  std::vector<Algorithm*>* subAlgorithms = algo->subAlgorithms();
206  if ( // we only want to add basic algorithms -> have no subAlgs
207  // and exclude the case of empty sequencers
208  ( subAlgorithms->empty() && !( isGaudiSequencer || isAthSequencer ) ) ) {
210  alglist.emplace_back( algo );
211  // Only ForwardScheduler requires assembling the graph in AlgResourcePool.
212  // The AvalancheScheduler relies on the graph that is assembled by the PrecedenceSvc
213  if ( serviceLocator()->existsService( "ForwardSchedulerSvc" ) ) {
    // the status of adding the algorithm node is deliberately ignored
214  m_CFGraph->addAlgorithmNode( algo, parentName, false, false ).ignore();
215  DEBUG_MSG << std::string( recursionDepth, ' ' ) << algo->name() << " is not a sequencer. Appending it" << endmsg;
216  }
217  return sc;
218  }
220  // Recursively unroll
221  ++recursionDepth;
223  // Only ForwardScheduler requires assembling the graph in AlgResourcePool.
224  // The AvalancheScheduler relies on the graph that is assembled by the PrecedenceSvc
225  if ( serviceLocator()->existsService( "ForwardSchedulerSvc" ) ) {
226  DEBUG_MSG << std::string( recursionDepth, ' ' ) << algo->name() << " is a sequencer. Flattening it." << endmsg;
    // Defaults apply when the sequencer is neither of the two recognised flavours.
228  bool modeOR = false;
229  bool allPass = false;
230  bool isLazy = false;
231  bool isSequential = false;
    // Property values are compared against the exact string "True".
233  if ( isGaudiSequencer ) {
234  modeOR = ( algo->getProperty( "ModeOR" ).toString() == "True" ) ? true : false;
235  allPass = ( algo->getProperty( "IgnoreFilterPassed" ).toString() == "True" ) ? true : false;
236  isLazy = ( algo->getProperty( "ShortCircuit" ).toString() == "True" ) ? true : false;
237  if ( allPass ) isLazy = false; // standard GaudiSequencer behavior on all pass is to execute everything
238  isSequential =
239  ( algo->hasProperty( "Sequential" ) && ( algo->getProperty( "Sequential" ).toString() == "True" ) );
240  } else if ( isAthSequencer ) {
241  modeOR = ( algo->getProperty( "ModeOR" ).toString() == "True" ) ? true : false;
242  allPass = ( algo->getProperty( "IgnoreFilterPassed" ).toString() == "True" ) ? true : false;
    // note the inversion: StopOverride == "True" turns lazy evaluation off
243  isLazy = ( algo->getProperty( "StopOverride" ).toString() == "True" ) ? false : true;
244  isSequential =
245  ( algo->hasProperty( "Sequential" ) && ( algo->getProperty( "Sequential" ).toString() == "True" ) );
246  }
    // presumably the third argument is a "concurrent" flag, hence !isSequential - TODO confirm
247  sc = m_CFGraph->addDecisionHubNode( algo, parentName, !isSequential, isLazy, modeOR, allPass );
248  if ( sc.isFailure() ) {
249  error() << "Failed to add DecisionHub " << algo->name() << " to control flow graph" << endmsg;
250  return sc;
251  }
252  }
    // Descend into the children, using this sequencer's name as their parent; stop at the first failure.
254  for ( Algorithm* subalgo : *subAlgorithms ) {
255  sc = flattenSequencer( subalgo, alglist, algo->name(), recursionDepth );
256  if ( sc.isFailure() ) {
257  error() << "Algorithm " << subalgo->name() << " could not be flattened" << endmsg;
258  return sc;
259  }
260  }
261  return sc;
262 }
264 //---------------------------------------------------------------------------
267 {
    // Builds the algorithm bookkeeping of the pool from the names in m_topAlgNames:
    //  1. fetch or create each top algorithm, sysInitialize() it, and store it in m_topAlgList;
    //  2. flatten the top algorithms into m_flatUniqueAlgList and remove duplicates;
    //  3. for every unique algorithm create its queue, record the allowed/created instance
    //     counts and its resource requirements, and (unless m_lazyCreation) create the clones;
    //  4. size all requirement bitsets equally and mark every resource as available.
    // NOTE(review): the signature and the declarations of `algMan` and `sc` are missing from
    // this listing.
270  if ( !algMan.isValid() ) {
271  error() << "Algorithm manager could not be properly fetched." << endmsg;
272  return StatusCode::FAILURE;
273  }
275  // Useful lambda not to repeat ourselves --------------------------
    // Creates an algorithm through the manager; a failure is only logged, so `algo` may be
    // left untouched (callers pass it in as nullptr).
276  auto createAlg = [&algMan, this]( const std::string& item_type, const std::string& item_name, IAlgorithm*& algo ) {
277  StatusCode createAlgSc = algMan->createAlgorithm( item_type, item_name, algo, true, false );
278  if ( createAlgSc.isFailure() )
279  this->warning() << "Algorithm " << item_type << "/" << item_name << " could not be created." << endmsg;
280  };
281  // End of lambda --------------------------------------------------
285  // Fill the top alg list ----
286  const std::vector<std::string>& topAlgNames = m_topAlgNames.value();
287  for ( auto& name : topAlgNames ) {
288  IAlgorithm* algo( nullptr );
    // NOTE(review): the declaration of `item` is missing and the next line was garbled by
    // the listing (its initializer is gone; presumably it read item.name()).
291  const std::string& item_name =;
292  const std::string& item_type = item.type();
    // Reuse an algorithm already known to the manager, otherwise create it.
293  SmartIF<IAlgorithm> algoSmartIF( algMan->algorithm( item_name, false ) );
295  if ( !algoSmartIF.isValid() ) {
296  createAlg( item_type, item_name, algo );
297  algoSmartIF = algo;
298  }
299  // Init and start
    // NOTE(review): algoSmartIF is dereferenced even if the creation above failed, and the
    // status of sysInitialize() is ignored.
300  algoSmartIF->sysInitialize().ignore();
301  m_topAlgList.push_back( algoSmartIF );
302  }
303  // Top Alg list filled ----
305  // start forming the control flow graph by adding the head decision hub
306  // Only ForwardScheduler requires assembling the graph in AlgResourcePool.
307  // The AvalancheScheduler relies on the graph that is assembled by the PrecedenceSvc
308  if ( serviceLocator()->existsService( "ForwardSchedulerSvc" ) )
309  m_CFGraph->addHeadNode( "RootDecisionHub", true, false, true, true );
311  // Now we unroll it ----
312  for ( auto& algoSmartIF : m_topAlgList ) {
313  Algorithm* algorithm = dynamic_cast<Algorithm*>( algoSmartIF.get() );
    // NOTE(review): a failed cast is only logged; the null pointer is still passed on below.
314  if ( !algorithm ) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
    // `sc` is overwritten on every iteration, so only the last status survives this loop.
315  sc = flattenSequencer( algorithm, m_flatUniqueAlgList, "RootDecisionHub" );
316  }
317  // stupid O(N^2) unique-ification..
318  for ( auto i = begin( m_flatUniqueAlgList ); i != end( m_flatUniqueAlgList ); ++i ) {
319  auto n = next( i );
320  while ( n != end( m_flatUniqueAlgList ) ) {
321  if ( *n == *i )
    // NOTE(review): the statement executed for a duplicate is missing from this listing
    // (presumably an erase that also advances n).
323  else
324  ++n;
325  }
326  }
327  if ( msgLevel( MSG::DEBUG ) ) {
328  debug() << "List of algorithms is: " << endmsg;
329  for ( auto& algo : m_flatUniqueAlgList )
330  debug() << " o " << algo->type() << "/" << algo->name() << " @ " << algo << endmsg;
331  }
333  // Unrolled ---
335  // Now let's manage the clones
    // resource_counter is the number of distinct resource names seen so far; it also serves
    // as the index handed to the next new resource name.
336  unsigned int resource_counter( 0 );
337  std::hash<std::string> hash_function;
338  for ( auto& ialgoSmartIF : m_flatUniqueAlgList ) {
340  const std::string& item_name = ialgoSmartIF->name();
342  verbose() << "Treating resource management and clones of " << item_name << endmsg;
344  Algorithm* algo = dynamic_cast<Algorithm*>( ialgoSmartIF.get() );
    // NOTE(review): a failed cast is only logged; `algo` is dereferenced on the next line anyway.
345  if ( !algo ) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
346  const std::string& item_type = algo->type();
    // All per-algorithm maps are keyed by the std::hash of the algorithm name.
348  size_t algo_id = hash_function( item_name );
    // NOTE(review): the declaration/creation of `queue` is missing from this listing.
350  m_algqueue_map[algo_id] = queue;
352  // DP TODO Do it properly with SmartIFs, also in the queues
353  IAlgorithm* ialgo( ialgoSmartIF.get() );
355  queue->push( ialgo );
356  m_algList.push_back( ialgo );
    // Allowed instances: the cardinality for clonable algorithms; for un-clonable ones it is 1
    // unless m_overrideUnClonable is set, in which case the cardinality is used as well.
357  if ( ialgo->isClonable() ) {
358  m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
359  } else {
360  if ( ialgo->cardinality() == 1 ) {
361  m_n_of_allowed_instances[algo_id] = 1;
362  } else {
363  if ( !m_overrideUnClonable ) {
364  info() << "Algorithm " << ialgo->name() << " is un-Clonable but Cardinality was set to "
365  << ialgo->cardinality() << ". Only creating 1 instance" << endmsg;
366  m_n_of_allowed_instances[algo_id] = 1;
367  } else {
368  warning() << "Overriding UnClonability of Algorithm " << ialgo->name() << ". Setting Cardinality to "
369  << ialgo->cardinality() << endmsg;
370  m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
371  }
372  }
373  }
    // the instance pushed into the queue above counts as the first created one
374  m_n_of_created_instances[algo_id] = 1;
376  state_type requirements( 0 );
378  for ( auto& resource_name : ialgo->neededResources() ) {
379  auto ret = m_resource_indices.emplace( resource_name, resource_counter );
380  // insert successful means == wasn't known before. So increment counter
381  if ( ret.second ) ++resource_counter;
382  // Resize for every algo according to the found resources
383  requirements.resize( resource_counter );
384  // in any case the return value holds the proper product index
385  requirements[ret.first->second] = true;
386  }
388  m_resource_requirements[algo_id] = requirements;
390  // potentially create clones; if not lazy creation we have to do it now
391  if ( !m_lazyCreation ) {
    // starts at 1 because the original instance already exists
392  for ( unsigned int i = 1, end = m_n_of_allowed_instances[algo_id]; i < end; ++i ) {
393  debug() << "type/name to create clone of: " << item_type << "/" << item_name << endmsg;
394  IAlgorithm* ialgoClone( nullptr );
395  createAlg( item_type, item_name, ialgoClone );
    // NOTE(review): ialgoClone is still nullptr if createAlg failed and is dereferenced here.
396  ialgoClone->setIndex( i );
397  if ( ialgoClone->sysInitialize().isFailure() ) {
398  error() << "unable to initialize Algorithm clone " << ialgoClone->name() << endmsg;
399  sc = StatusCode::FAILURE;
400  // FIXME: should we delete this failed clone?
401  } else {
402  queue->push( ialgoClone );
403  m_n_of_created_instances[algo_id] += 1;
404  }
405  }
406  }
407  }
409  // Now resize all the requirement bitsets to the same size
410  for ( auto& kv : m_resource_requirements ) {
411  kv.second.resize( resource_counter );
412  }
414  // Set all resources to be available
415  m_available_resources.resize( resource_counter );
416  m_available_resources.set();
418  return sc;
419 }
421 //---------------------------------------------------------------------------
424 {
    // Appends the raw pointer of every entry of m_flatUniqueAlgList to m_flatUniqueAlgPtrList
    // and returns that member list.
    // NOTE(review): the signature is missing from this listing. As shown, the pointer list is
    // not cleared first, so repeated calls would append duplicates unless a clear() sits in a
    // line dropped by the listing - confirm.
426  for ( auto algoSmartIF : m_flatUniqueAlgList )
427  m_flatUniqueAlgPtrList.push_back( const_cast<IAlgorithm*>( algoSmartIF.get() ) );
428  return m_flatUniqueAlgPtrList;
429 }
431 //---------------------------------------------------------------------------
434 {
    // Appends the raw pointer of every entry of m_topAlgList to m_topAlgPtrList and returns
    // that member list.
    // NOTE(review): the signature is missing from this listing. As shown, the pointer list is
    // not cleared first, so repeated calls would append duplicates unless a clear() sits in a
    // line dropped by the listing - confirm.
436  for ( auto algoSmartIF : m_topAlgList ) m_topAlgPtrList.push_back( const_cast<IAlgorithm*>( algoSmartIF.get() ) );
437  return m_topAlgPtrList;
438 }
440 //---------------------------------------------------------------------------
443 {
    // Calls sysBeginRun() on every algorithm of m_flatUniqueAlgList and returns FAILURE at
    // the first algorithm that fails, SUCCESS otherwise.
    // NOTE(review): the function signature is not part of this listing.
    // Helper: runs sysBeginRun() on one algorithm and logs a warning when it fails.
444  auto algBeginRun = [&]( SmartIF<IAlgorithm>& algoSmartIF ) -> StatusCode {
445  StatusCode sc = algoSmartIF->sysBeginRun();
446  if ( !sc.isSuccess() ) {
447  warning() << "beginRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
448  return StatusCode::FAILURE;
449  }
450  return StatusCode::SUCCESS;
451  };
452  // Call the beginRun() method of all algorithms
453  for ( auto& algoSmartIF : m_flatUniqueAlgList ) {
454  if ( algBeginRun( algoSmartIF ).isFailure() ) return StatusCode::FAILURE;
455  }
457  return StatusCode::SUCCESS;
458 }
460 //---------------------------------------------------------------------------
463 {
    // Calls sysEndRun() first on every algorithm of m_flatUniqueAlgList and then on every
    // entry of m_topAlgList; returns FAILURE at the first algorithm that fails.
    // NOTE(review): the function signature is not part of this listing.
    // Helper: runs sysEndRun() on one algorithm and logs a warning when it fails.
465  auto algEndRun = [&]( SmartIF<IAlgorithm>& algoSmartIF ) -> StatusCode {
466  StatusCode sc = algoSmartIF->sysEndRun();
467  if ( !sc.isSuccess() ) {
468  warning() << "endRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
469  return StatusCode::FAILURE;
470  }
471  return StatusCode::SUCCESS;
472  };
473  // Call the endRun() method of all flattened algorithms, then of all top algorithms
474  for ( auto& algoSmartIF : m_flatUniqueAlgList ) {
475  if ( algEndRun( algoSmartIF ).isFailure() ) return StatusCode::FAILURE;
476  }
477  for ( auto& algoSmartIF : m_topAlgList ) {
478  if ( algEndRun( algoSmartIF ).isFailure() ) return StatusCode::FAILURE;
479  }
480  return StatusCode::SUCCESS;
481 }
483 //---------------------------------------------------------------------------
486 {
    // Stops the base Service first and then every algorithm stored in m_algList.
    // Returns the first failing StatusCode (base class or algorithm), SUCCESS otherwise;
    // algorithms after the first failing one are not stopped.
    // NOTE(review): the function signature is not part of this listing.
488  StatusCode stopSc = Service::stop();
489  if ( !stopSc.isSuccess() ) return stopSc;
491  // sys-Stop the algos
492  for ( auto& ialgo : m_algList ) {
493  stopSc = ialgo->sysStop();
494  if ( stopSc.isFailure() ) {
495  error() << "Unable to stop Algorithm: " << ialgo->name() << endmsg;
496  return stopSc;
497  }
498  }
499  return StatusCode::SUCCESS;
500 }
