The Gaudi Framework  v30r5 (c7afbd0d)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 
3 #include "AlgoExecutionTask.h"
4 #include "IOBoundAlgTask.h"
5 
6 // Framework includes
7 #include "GaudiKernel/Algorithm.h" // can be removed ASA dynamic casts to Algorithm are removed
10 #include "GaudiKernel/IAlgorithm.h"
13 
14 // C++
15 #include <algorithm>
16 #include <map>
17 #include <queue>
18 #include <sstream>
19 #include <unordered_set>
20 
21 // External libs
22 #include "boost/algorithm/string.hpp"
23 #include "boost/thread.hpp"
24 #include "boost/tokenizer.hpp"
25 // DP waiting for the TBB service
26 #include "tbb/task_scheduler_init.h"
27 
28 // Instantiation of a static factory class used by clients to create instances of this service
30 
31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
33 
34 namespace
35 {
36  struct DataObjIDSorter {
37  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
38  };
39 
40  // Sort a DataObjIDColl in a well-defined, reproducible manner.
41  // Used for making debugging dumps.
42  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll )
43  {
45  v.reserve( coll.size() );
46  for ( const DataObjID& id : coll ) v.push_back( &id );
47  std::sort( v.begin(), v.end(), DataObjIDSorter() );
48  return v;
49  }
50 
51  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates )
52  {
53  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
54  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
55  }
56 }
57 
58 //---------------------------------------------------------------------------
59 
66 {
67 
68  // Initialise mother class (read properties, ...)
70  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
71 
72  // Get hold of the TBBSvc. This should initialize the thread pool
73  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
74  if ( !m_threadPoolSvc.isValid() ) {
75  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
76  return StatusCode::FAILURE;
77  }
78 
79  // Activate the scheduler in another thread.
80  info() << "Activating scheduler in a separate thread" << endmsg;
81  m_thread = std::thread( [this]() { this->activate(); } );
82 
83  while ( m_isActive != ACTIVE ) {
84  if ( m_isActive == FAILURE ) {
85  fatal() << "Terminating initialization" << endmsg;
86  return StatusCode::FAILURE;
87  } else {
88  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
89  sleep( 1 );
90  }
91  }
92 
93  if ( m_enableCondSvc ) {
94  // Get hold of the CondSvc
95  m_condSvc = serviceLocator()->service( "CondSvc" );
96  if ( !m_condSvc.isValid() ) {
97  warning() << "No CondSvc found, or not enabled. "
98  << "Will not manage CondAlgorithms" << endmsg;
99  m_enableCondSvc = false;
100  }
101  }
102 
103  // Get the algo resource pool
104  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
105  if ( !m_algResourcePool.isValid() ) {
106  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
107  return StatusCode::FAILURE;
108  }
109 
110  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
111  if ( !m_algExecStateSvc.isValid() ) {
112  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
113  return StatusCode::FAILURE;
114  }
115 
116  // Get Whiteboard
117  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
118  if ( !m_whiteboard.isValid() ) {
119  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
120  return StatusCode::FAILURE;
121  }
122 
123  // Get dedicated scheduler for I/O-bound algorithms
124  if ( m_useIOBoundAlgScheduler ) {
125  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
126  if ( !m_IOBoundAlgScheduler.isValid() )
127  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
128  }
129 
130  // Set the MaxEventsInFlight parameters from the number of WB stores
131  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
132 
133  // Set the number of free slots
134  m_freeSlots = m_maxEventsInFlight;
135 
136  // Get the list of algorithms
137  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
138  const unsigned int algsNumber = algos.size();
139  info() << "Found " << algsNumber << " algorithms" << endmsg;
140 
141  /* Dependencies
142  1) Look for handles in algo, if none
143  2) Assume none are required
144  */
145 
146  DataObjIDColl globalInp, globalOutp;
147 
148  // figure out all outputs
149  for ( IAlgorithm* ialgoPtr : algos ) {
150  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
151  if ( !algoPtr ) {
152  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
153  }
154  for ( auto id : algoPtr->outputDataObjs() ) {
155  auto r = globalOutp.insert( id );
156  if ( !r.second ) {
157  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths "
158  "though, or control flow may guarantee only one runs...!"
159  << endmsg;
160  }
161  }
162  }
163 
164  std::ostringstream ostdd;
165  ostdd << "Data Dependencies for Algorithms:";
166 
167  std::map<std::string, DataObjIDColl> algosDependenciesMap;
168  for ( IAlgorithm* ialgoPtr : algos ) {
169  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
170  if ( nullptr == algoPtr ) {
171  fatal() << "Could not convert IAlgorithm into Algorithm for " << ialgoPtr->name()
172  << ": this will result in a crash." << endmsg;
173  return StatusCode::FAILURE;
174  }
175 
176  ostdd << "\n " << algoPtr->name();
177 
178  DataObjIDColl algoDependencies;
179  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
180  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
181  DataObjID id = *idp;
182  ostdd << "\n o INPUT " << id;
183  if ( id.key().find( ":" ) != std::string::npos ) {
184  ostdd << " contains alternatives which require resolution...\n";
185  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
186  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
187  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
188  } );
189  if ( itok != tokens.end() ) {
190  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
191  id.updateKey( *itok );
192  } else {
193  error() << "failed to find alternate in global output list"
194  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
195  m_showDataDeps = true;
196  }
197  }
198  algoDependencies.insert( id );
199  globalInp.insert( id );
200  }
201  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
202  ostdd << "\n o OUTPUT " << *id;
203  if ( id->key().find( ":" ) != std::string::npos ) {
204  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
205  << endmsg;
206  m_showDataDeps = true;
207  }
208  }
209  } else {
210  ostdd << "\n none";
211  }
212  algosDependenciesMap[algoPtr->name()] = algoDependencies;
213  }
214 
215  if ( m_showDataDeps ) {
216  info() << ostdd.str() << endmsg;
217  }
218 
219  // Check if we have unmet global input dependencies, and, optionally, heal them
220  // WARNING: this step must be done BEFORE the Precedence Service is initialized
221  if ( m_checkDeps ) {
222  DataObjIDColl unmetDep;
223  for ( auto o : globalInp )
224  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
225 
226  if ( unmetDep.size() > 0 ) {
227 
228  std::ostringstream ost;
229  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
230  ost << "\n o " << *o << " required by Algorithm: ";
231 
232  for ( const auto& p : algosDependenciesMap )
233  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
234  }
235 
236  if ( !m_useDataLoader.empty() ) {
237 
238  // Find the DataLoader Alg
239  IAlgorithm* dataLoaderAlg( nullptr );
240  for ( IAlgorithm* algo : algos )
241  if ( algo->name() == m_useDataLoader ) {
242  dataLoaderAlg = algo;
243  break;
244  }
245 
246  if ( dataLoaderAlg == nullptr ) {
247  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
248  << "\" found, and unmet INPUT dependencies "
249  << "detected:\n"
250  << ost.str() << endmsg;
251  return StatusCode::FAILURE;
252  }
253 
254  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
255  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
256 
257  // Set the property Load of DataLoader Alg
258  Algorithm* dataAlg = dynamic_cast<Algorithm*>( dataLoaderAlg );
259  if ( !dataAlg ) {
260  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Algorithm" << endmsg;
261  return StatusCode::FAILURE;
262  }
263 
264  for ( auto& id : unmetDep ) {
265  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
266  << dataLoaderAlg->name() << endmsg;
268  }
269 
270  } else {
271  fatal() << "Auto DataLoading not requested, "
272  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
273  return StatusCode::FAILURE;
274  }
275 
276  } else {
277  info() << "No unmet INPUT data dependencies were found" << endmsg;
278  }
279  }
280 
281  // Get the precedence service
282  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
283  if ( !m_precSvc.isValid() ) {
284  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
285  return StatusCode::FAILURE;
286  }
287  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
288  if ( !precSvc ) {
289  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
290  return StatusCode::FAILURE;
291  }
292 
293  // Fill the containers to convert algo names to index
294  m_algname_vect.resize( algsNumber );
295  for ( IAlgorithm* algo : algos ) {
296  const std::string& name = algo->name();
297  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
298  m_algname_index_map[name] = index;
299  m_algname_vect.at( index ) = name;
300  }
301 
302  // Shortcut for the message service
303  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
304  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
305 
306  m_eventSlots.reserve( m_maxEventsInFlight );
307  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
308  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
309  m_eventSlots.back().complete = true;
310  }
311  m_actionsCounts.assign( m_maxEventsInFlight, 0 );
312 
313  if ( m_threadPoolSize > 1 ) {
314  m_maxAlgosInFlight = (size_t)m_threadPoolSize;
315  }
316 
317  // Clearly inform about the level of concurrency
318  info() << "Concurrency level information:" << endmsg;
319  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
320  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
321 
322  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
323 
324  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
325 
326  // Simulate execution flow
327  if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
328 
329  return sc;
330 }
331 //---------------------------------------------------------------------------
332 
337 {
338 
340  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
341 
342  sc = deactivate();
343  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
344 
345  info() << "Joining Scheduler thread" << endmsg;
346  m_thread.join();
347 
348  // Final error check after thread pool termination
349  if ( m_isActive == FAILURE ) {
350  error() << "problems in scheduler thread" << endmsg;
351  return StatusCode::FAILURE;
352  }
353 
354  return sc;
355 }
356 //---------------------------------------------------------------------------
357 
369 {
370 
371  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
372 
373  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
374  error() << "problems initializing ThreadPoolSvc" << endmsg;
375  m_isActive = FAILURE;
376  return;
377  }
378 
379  // Wait for actions pushed into the queue by finishing tasks.
380  action thisAction;
382 
383  m_isActive = ACTIVE;
384 
385  // Continue to wait if the scheduler is running or there is something to do
386  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
387  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
388  m_actionsQueue.pop( thisAction );
389  sc = thisAction();
390  ON_VERBOSE
391  {
392  if ( sc != StatusCode::SUCCESS )
393  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
394  else
395  verbose() << "Action succeeded." << endmsg;
396  }
397  }
398 
399  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
400  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
401  error() << "Problems terminating thread pool" << endmsg;
402  m_isActive = FAILURE;
403  }
404 }
405 
406 //---------------------------------------------------------------------------
407 
415 {
416 
417  if ( m_isActive == ACTIVE ) {
418 
419  // Set the number of slots available to an error code
420  m_freeSlots.store( 0 );
421 
422  // Empty queue
423  action thisAction;
424  while ( m_actionsQueue.try_pop( thisAction ) ) {
425  };
426 
427  // This would be the last action
428  m_actionsQueue.push( [this]() -> StatusCode {
429  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
430  m_isActive = INACTIVE;
431  return StatusCode::SUCCESS;
432  } );
433  }
434 
435  return StatusCode::SUCCESS;
436 }
437 
438 //---------------------------------------------------------------------------
439 
440 // Utils and shortcuts
441 
442 inline const std::string& AvalancheSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
443 
444 //---------------------------------------------------------------------------
445 
446 inline unsigned int AvalancheSchedulerSvc::algname2index( const std::string& algoname )
447 {
448  unsigned int index = m_algname_index_map[algoname];
449  return index;
450 }
451 
452 //---------------------------------------------------------------------------
453 
454 // EventSlot management
462 {
463 
464  if ( !eventContext ) {
465  fatal() << "Event context is nullptr" << endmsg;
466  return StatusCode::FAILURE;
467  }
468 
469  if ( m_freeSlots.load() == 0 ) {
470  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
471  return StatusCode::FAILURE;
472  }
473 
474  // no problem as push new event is only called from one thread (event loop manager)
475  --m_freeSlots;
476 
477  auto action = [this, eventContext]() -> StatusCode {
478  // Event processing slot forced to be the same as the wb slot
479  const unsigned int thisSlotNum = eventContext->slot();
480  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
481  if ( !thisSlot.complete ) {
482  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
483  return StatusCode::FAILURE;
484  }
485 
486  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
487  thisSlot.reset( eventContext );
488 
489  // Result status code:
491 
492  // promote to CR and DR the initial set of algorithms
493  Cause cs = {Cause::source::Root, "RootDecisionHub"};
494  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
495  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
496  result = StatusCode::FAILURE;
497  }
498 
499  if ( this->updateStates( thisSlotNum ).isFailure() ) {
500  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
501  result = StatusCode::FAILURE;
502  }
503 
504  return result;
505  }; // end of lambda
506 
507  // Kick off the scheduling!
508  ON_VERBOSE
509  {
510  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
511  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
512  }
513 
514  m_actionsQueue.push( action );
515 
516  return StatusCode::SUCCESS;
517 }
518 
519 //---------------------------------------------------------------------------
520 
522 {
523  StatusCode sc;
524  for ( auto context : eventContexts ) {
525  sc = pushNewEvent( context );
526  if ( sc != StatusCode::SUCCESS ) return sc;
527  }
528  return sc;
529 }
530 
531 //---------------------------------------------------------------------------
532 
533 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
534 
535 //---------------------------------------------------------------------------
540 {
541  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
542  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
543  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
544  // << " active: " << m_isActive << endmsg;
545  return StatusCode::FAILURE;
546  } else {
547  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
548  // << " active: " << m_isActive << endmsg;
549  m_finishedEvents.pop( eventContext );
550  ++m_freeSlots;
551  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
552  return StatusCode::SUCCESS;
553  }
554 }
555 
556 //---------------------------------------------------------------------------
561 {
562  if ( m_finishedEvents.try_pop( eventContext ) ) {
563  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
564  << endmsg;
565  ++m_freeSlots;
566  return StatusCode::SUCCESS;
567  }
568  return StatusCode::FAILURE;
569 }
570 
571 //--------------------------------------------------------------------------
572 // States Management
573 
583 StatusCode AvalancheSchedulerSvc::updateStates( int si, const int algo_index, const int sub_slot,
584  const int source_slot )
585 {
586 
587  StatusCode global_sc( StatusCode::SUCCESS );
588 
589  // Sort from the oldest to the newest event
590  // Prepare a vector of pointers to the slots to avoid copies
591  std::vector<EventSlot*> eventSlotsPtrs;
592 
593  // Consider all slots if si <0 or just one otherwise
594  if ( si < 0 ) {
595  const int eventsSlotsSize( m_eventSlots.size() );
596  eventSlotsPtrs.reserve( eventsSlotsSize );
597  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
598  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
599  }
600  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
601  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
602  } else {
603  eventSlotsPtrs.push_back( &m_eventSlots[si] );
604  }
605 
606  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
607  int iSlot = thisSlotPtr->eventContext->slot();
608 
609  // Cache the states of the algos to improve readability and performance
610  auto& thisSlot = m_eventSlots[iSlot];
611  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
612 
613  // Perform the I->CR->DR transitions
614  if ( algo_index >= 0 ) {
615  Cause cs = {Cause::source::Task, index2algname( algo_index )};
616 
617  // Run in whole-event context if there's no sub-slot index, or the sub-slot has a different parent
618  if ( sub_slot == -1 || iSlot != source_slot ) {
619  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
620  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
621  global_sc = StatusCode::FAILURE;
622  }
623  } else {
624  if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
625  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot << " of " << iSlot << endmsg;
626  global_sc = StatusCode::FAILURE;
627  }
628  }
629  }
630 
631  StatusCode partial_sc( StatusCode::FAILURE, true );
632 
633  // Perform DR->SCHEDULED
634  if ( !m_optimizationMode.empty() ) {
635  auto comp_nodes = [this]( const uint& i, const uint& j ) {
636  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
637  };
639  comp_nodes, std::vector<uint>() );
640  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it )
641  buffer.push( *it );
642  while ( !buffer.empty() ) {
643  bool IOBound = false;
644  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
645 
646  if ( !IOBound )
647  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
648  else
649  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
650 
651  ON_VERBOSE if ( partial_sc.isFailure() ) verbose() << "Could not apply transition from " << AState::DATAREADY
652  << " for algorithm " << index2algname( buffer.top() )
653  << " on processing slot " << iSlot << endmsg;
654 
655  buffer.pop();
656  }
657 
658  } else {
659  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
660  uint algIndex = *it;
661 
662  bool IOBound = false;
663  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
664 
665  if ( !IOBound )
666  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
667  else
668  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
669 
670  ON_VERBOSE if ( partial_sc.isFailure() ) verbose() << "Could not apply transition from " << AState::DATAREADY
671  << " for algorithm " << index2algname( algIndex )
672  << " on processing slot " << iSlot << endmsg;
673  }
674  }
675 
676  // Check for algorithms ready in sub-slots
677  for ( auto& subslot : thisSlot.allSubSlots ) {
678  auto& subslotStates = subslot.algsStates;
679  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
680  uint algIndex{*it};
681  partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext.get() );
682  // The following verbosity is expensive when the number of sub-slots is high
683  /*ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
684  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
685  << " on processing subslot " << subslot.eventContext->slot() << endmsg;*/
686  }
687  }
688 
689  if ( m_dumpIntraEventDynamics ) {
691  s << index2algname( algo_index ) << ", " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
692  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
693  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
694  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
695  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
696  std::ofstream myfile;
697  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
698  myfile << s.str();
699  myfile.close();
700  }
701 
702  // Not complete because this would mean that the slot is already free!
703  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
704  !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
705  !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
706  !thisSlot.complete ) {
707 
708  thisSlot.complete = true;
709  // if the event did not fail, add it to the finished events
710  // otherwise it is taken care of in the error handling
711  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
712  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
713  << thisSlot.eventContext->slot() << ")." << endmsg;
714  m_finishedEvents.push( thisSlot.eventContext.release() );
715  }
716 
717  // now let's return the fully evaluated result of the control flow
718  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
719 
720  thisSlot.eventContext.reset( nullptr );
721 
722  } else if ( isStalled( thisSlot ) ) {
723  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
724  eventFailed( thisSlot.eventContext.get() ); // can't release yet
725  }
726  } // end loop on slots
727 
728  ON_VERBOSE verbose() << "States Updated." << endmsg;
729 
730  return global_sc;
731 }
732 
733 //---------------------------------------------------------------------------
734 
742 {
743 
744  if ( m_actionsCounts[slot.eventContext->slot()] == 0 &&
745  !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED} ) &&
746  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
747 
748  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
749 
750  return true;
751  }
752  return false;
753 }
754 
755 //---------------------------------------------------------------------------
756 
762 {
763  const uint slotIdx = eventContext->slot();
764 
765  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
766 
767  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
768 
769  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
770  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
771 
772  // Push into the finished events queue the failed context
773  m_eventSlots[slotIdx].complete = true;
774  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
775 }
776 
777 //---------------------------------------------------------------------------
778 
784 {
785 
786  // To have just one big message
787  std::ostringstream outputMS;
788 
789  outputMS << "Dumping scheduler state\n"
790  << "=========================================================================================\n"
791  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
792  << "=========================================================================================\n\n";
793 
794  //===========================================================================
795 
796  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
797  << "------------------\n\n";
798 
799  // Figure if TimelineSvc is available (used below to detect threads IDs)
800  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
801  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
802  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
803  } else {
804 
805  // Figure optimal printout layout
806  size_t indt( 0 );
807  for ( auto& slot : m_eventSlots )
808  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
809  if ( index2algname( (uint)*it ).length() > indt ) indt = index2algname( (uint)*it ).length();
810 
811  // Figure the last running schedule across all slots
812  for ( auto& slot : m_eventSlots ) {
813  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
814  ++it ) {
815 
816  const std::string algoName{index2algname( (uint)*it )};
817 
818  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
819  << slot.eventContext->slot();
820 
821  // Try to get POSIX threads IDs the currently running tasks are scheduled to
822  if ( timelineSvc.isValid() ) {
823  TimelineEvent te{};
824  te.algorithm = algoName;
825  te.slot = slot.eventContext->slot();
826  te.event = slot.eventContext->evt();
827 
828  if ( timelineSvc->getTimelineEvent( te ) )
829  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
830  else
831  outputMS << " thread.id: [unknown]"; // this means a task has just
832  // been signed off as SCHEDULED,
833  // but has not been assigned to a thread yet
834  // (i.e., not running yet)
835  }
836  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
837  }
838  }
839  }
840 
841  //===========================================================================
842 
843  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
844  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
845 
846  int slotCount = -1;
847  for ( auto& slot : m_eventSlots ) {
848  ++slotCount;
849  if ( slot.complete ) continue;
850 
851  outputMS << "[ slot: "
852  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
853  << " event: "
854  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
855  << " ]:\n\n";
856 
857  if ( 0 > iSlot || iSlot == slotCount ) {
858 
859  // Snapshot of the Control Flow and FSM states
860  outputMS << m_precSvc->printState( slot ) << "\n";
861 
862  // Mention sub slots (this is expensive if the number of sub-slots is high)
863  /*if ( !slot.allSubSlots.empty() ) {
864  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
865  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
866  for ( auto& ss : slot.allSubSlots ) {
867  outputMS << "[ slot: " << slotID << " sub-slot entry: " << ss.entryPoint << " event: "
868  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
869  << " ]:\n\n";
870  outputMS << m_precSvc->printState( ss ) << "\n";
871  }
872  }*/
873  }
874  }
875 
876  //===========================================================================
877 
878  if ( 0 <= iSlot ) {
879  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
880  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
881  }
882 
883  outputMS << "\n=========================================================================================\n"
884  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
885  << "=========================================================================================\n\n";
886 
887  info() << outputMS.str() << endmsg;
888 }
889 
890 //---------------------------------------------------------------------------
891 
892 StatusCode AvalancheSchedulerSvc::promoteToScheduled( unsigned int iAlgo, int si, EventContext* eventContext )
893 {
894 
895  if ( m_algosInFlight == m_maxAlgosInFlight ) return StatusCode::FAILURE;
896 
897  const std::string& algName( index2algname( iAlgo ) );
898  IAlgorithm* ialgoPtr = nullptr;
899  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
900 
901  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
902 
903  ++m_algosInFlight;
904  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
905  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
906  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
907  } );
908  return StatusCode::SUCCESS;
909  };
910 
911  // Avoid to use tbb if the pool size is 1 and run in this thread
912  if ( -100 != m_threadPoolSize ) {
913  // the child task that executes an Algorithm
914  tbb::task* algoTask = new ( tbb::task::allocate_root() )
915  AlgoExecutionTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
916  // schedule the algoTask
917  tbb::task::enqueue( *algoTask );
918 
919  } else {
920  AlgoExecutionTask theTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc,
921  promote2ExecutedClosure );
922  theTask.execute();
923  }
924 
925  ON_DEBUG debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot "
926  << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
927 
928  // Update states in the appropriate event slot
929  StatusCode updateSc;
930  EventSlot& thisSlot = m_eventSlots[si];
931  if ( eventContext->usesSubSlot() ) {
932  // Sub-slot
933  size_t const subSlotIndex = eventContext->subSlot();
934  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
935  } else {
936  // Event level (standard behaviour)
937  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
938  }
939 
940  ON_VERBOSE dumpSchedulerState( -1 );
941 
942  if ( updateSc.isSuccess() )
943  ON_VERBOSE verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
944  return updateSc;
945  } else {
946  ON_DEBUG debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si
947  << endmsg;
948  return sc;
949  }
950 }
951 
952 //---------------------------------------------------------------------------
953 
954 StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, int si, EventContext* eventContext )
955 {
956 
957  if ( m_IOBoundAlgosInFlight == m_maxIOBoundAlgosInFlight ) return StatusCode::FAILURE;
958 
959  // bool IOBound = m_precSvc->isBlocking(algName);
960 
961  const std::string& algName( index2algname( iAlgo ) );
962  IAlgorithm* ialgoPtr = nullptr;
963  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
964 
965  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
966 
967  ++m_IOBoundAlgosInFlight;
968  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
969  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
970  return this->AvalancheSchedulerSvc::promoteToAsyncExecuted( iAlgo, eventContext->slot(), ialgoPtr,
971  eventContext );
972  } );
973  return StatusCode::SUCCESS;
974  };
975  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
976  // tbb::task)? it seems it works..
977  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
978  IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
979  m_IOBoundAlgScheduler->push( *theTask );
980 
981  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
982  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
983 
984  // Update states in the appropriate event slot
985  StatusCode updateSc;
986  EventSlot& thisSlot = m_eventSlots[si];
987  if ( eventContext->usesSubSlot() ) {
988  // Sub-slot
989  size_t const subSlotIndex = eventContext->subSlot();
990  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
991  } else {
992  // Event level (standard behaviour)
993  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
994  }
995 
996  ON_VERBOSE if ( updateSc.isSuccess() ) verbose() << "[Asynchronous] Promoting " << algName
997  << " to SCHEDULED on slot " << si << endmsg;
998  return updateSc;
999  } else {
1000  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1001  << " on slot " << si << endmsg;
1002  return sc;
1003  }
1004 }
1005 
1006 //---------------------------------------------------------------------------
1007 
1012  EventContext* eventContext )
1013 {
1014  Gaudi::Hive::setCurrentContext( eventContext );
1015  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1016 
1017  if ( sc.isFailure() ) {
1018  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1019  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1020  return StatusCode::FAILURE;
1021  }
1022 
1023  --m_algosInFlight;
1024 
1025  EventSlot& thisSlot = m_eventSlots[si];
1026 
1027  ON_DEBUG debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1028 
1029  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1030  AState state = algstate.execStatus().isSuccess()
1031  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1032  : AState::ERROR;
1033 
1034  // Update states in the appropriate slot
1035  int subSlotIndex = -1;
1036  if ( eventContext->usesSubSlot() ) {
1037  // Sub-slot
1038  subSlotIndex = eventContext->subSlot();
1039  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1040  } else {
1041  // Event level (standard behaviour)
1042  sc = thisSlot.algsStates.set( iAlgo, state );
1043  }
1044 
1045  ON_VERBOSE if ( sc.isSuccess() ) verbose() << "Promoting " << algo->name() << " on slot " << si << " to " << state
1046  << endmsg;
1047 
1048  ON_DEBUG debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1049  << m_algosInFlight << endmsg;
1050 
1051  // Schedule an update of the status of the algorithms
1052  ++m_actionsCounts[si];
1053  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1054  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1055  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1056  } );
1057 
1058  return sc;
1059 }
1060 
1061 //---------------------------------------------------------------------------
1062 
1067  EventContext* eventContext )
1068 {
1069  Gaudi::Hive::setCurrentContext( eventContext );
1070  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1071 
1072  if ( sc.isFailure() ) {
1073  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1074  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1075  return StatusCode::FAILURE;
1076  }
1077 
1078  --m_IOBoundAlgosInFlight;
1079 
1080  EventSlot& thisSlot = m_eventSlots[si];
1081 
1082  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1083  << endmsg;
1084 
1085  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1086  AState state = algstate.execStatus().isSuccess()
1087  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1088  : AState::ERROR;
1089 
1090  // Update states in the appropriate slot
1091  int subSlotIndex = -1;
1092  if ( eventContext->usesSubSlot() ) {
1093  // Sub-slot
1094  subSlotIndex = eventContext->subSlot();
1095  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1096  } else {
1097  // Event level (standard behaviour)
1098  sc = thisSlot.algsStates.set( iAlgo, state );
1099  }
1100 
1101  ON_VERBOSE if ( sc.isSuccess() ) verbose() << "[Asynchronous] Promoting " << algo->name() << " on slot " << si
1102  << " to " << state << endmsg;
1103 
1104  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1105  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1106 
1107  // Schedule an update of the status of the algorithms
1108  ++m_actionsCounts[si];
1109  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1110  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1111  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1112  } );
1113 
1114  return sc;
1115 }
1116 
1117 //---------------------------------------------------------------------------
1118 
1119 // Method to inform the scheduler about event views
1120 
1122  std::unique_ptr<EventContext> viewContext )
1123 {
1124  // Prevent view nesting
1125  if ( sourceContext->usesSubSlot() ) {
1126  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1127  return StatusCode::FAILURE;
1128  }
1129 
1130  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1131 
1132  // It's not possible to create an std::functional from a move-capturing lambda
1133  // So, we have to release the unique pointer
1134  auto action =
1135  [ this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(), &nodeName ]()->StatusCode
1136  {
1137 
1138  // Attach the sub-slot to the top-level slot
1139  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1140 
1141  if ( viewContextPtr ) {
1142  // Re-create the unique pointer
1143  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1144  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1145  return StatusCode::SUCCESS;
1146  } else {
1147  // Disable the view node if there are no views
1148  topSlot.disableSubSlots( nodeName );
1149  return StatusCode::SUCCESS;
1150  }
1151  };
1152 
1153  m_actionsQueue.push( std::move( action ) );
1154 
1155  return StatusCode::SUCCESS;
1156 }
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:65
#define ON_DEBUG
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
constexpr static const auto FAILURE
Definition: StatusCode.h:88
StatusCode initialize() override
Definition: Service.cpp:63
const unsigned int & getAlgoIndex() const
Get algorithm index.
Class representing an event slot.
Definition: EventSlot.h:14
T empty(T...args)
T open(T...args)
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:759
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B...
Definition: EventSlot.h:71
StatusCode finalize() override
Definition: Service.cpp:173
ContextID_t slot() const
Definition: EventContext.h:50
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one) ...
Definition: EventSlot.h:53
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Definition: StatusCode.h:287
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
T to_string(T...args)
std::string algorithm
Definition: ITimelineSvc.h:21
void activate()
Activate scheduler.
T end(T...args)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:94
bool usesSubSlot() const
Definition: EventContext.h:52
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
Definition: EventContext.h:31
STL class.
bool isFailure() const
Definition: StatusCode.h:139
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
T release(T...args)
T setw(T...args)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:49
STL class.
#define DECLARE_COMPONENT(type)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
T push_back(T...args)
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
const DataObjIDColl & inputDataObjs() const override
T close(T...args)
StatusCode set(unsigned int iAlgo, State newState)
StatusCode finalize() override
Finalise.
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
T move(T...args)
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T get(T...args)
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
T find_if(T...args)
T size(T...args)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
T begin(T...args)
Iterator begin(State kind)
T any_of(T...args)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
string s
Definition: gaudirun.py:253
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
T hex(T...args)
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
bool complete
Flags completion of the event.
Definition: EventSlot.h:83
T sort(T...args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
bool filterPassed() const
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:40
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
std::string fullKey() const
Definition: DataObjID.cpp:99
ContextID_t subSlot() const
Definition: EventContext.h:51
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:77
const StatusCode & execStatus() const
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:79
T reserve(T...args)
#define ON_VERBOSE
Iterator end(State kind)