The Gaudi Framework  v32r0 (3325bb39)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 
3 #include "AlgoExecutionTask.h"
4 #include "IOBoundAlgTask.h"
5 
6 // Framework includes
12 #include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
13 
14 // C++
15 #include <algorithm>
16 #include <map>
17 #include <queue>
18 #include <sstream>
19 #include <unordered_set>
20 
21 // External libs
22 #include "boost/algorithm/string.hpp"
23 #include "boost/thread.hpp"
24 #include "boost/tokenizer.hpp"
25 // DP waiting for the TBB service
26 #include "tbb/task_scheduler_init.h"
27 
28 // Instantiation of a static factory class used by clients to create instances of this service
30 
31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
33 
34 namespace {
35  struct DataObjIDSorter {
36  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
37  };
38 
39  // Sort a DataObjIDColl in a well-defined, reproducible manner.
40  // Used for making debugging dumps.
41  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
43  v.reserve( coll.size() );
44  for ( const DataObjID& id : coll ) v.push_back( &id );
45  std::sort( v.begin(), v.end(), DataObjIDSorter() );
46  return v;
47  }
48 
49  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
50  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
51  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
52  }
53 } // namespace
54 
55 //---------------------------------------------------------------------------
56 
64 
// Body of the scheduler's initialisation step.
// NOTE(review): the function signature and the declaration of `sc` are not visible in this listing
// (the embedded numbering skips from 56 to 64 and from 65 to 67) -- confirm against the original file.
//
// Visible sequence:
//   1. warn if the base-class initialisation status `sc` is a failure (execution continues);
//   2. retrieve ThreadPoolSvc, start activate() in m_thread and poll until m_isActive == ACTIVE;
//   3. retrieve CondSvc (optional), AlgResourcePool, AlgExecStateSvc, the whiteboard and, if requested,
//      the I/O-bound algorithm scheduler;
//   4. collect the input/output data dependencies of all algorithms and check for unmet inputs;
//   5. retrieve PrecedenceSvc, build the algorithm name <-> index containers and the event slots;
//   6. print concurrency information and optional control-flow / data-flow dumps.
// Returns StatusCode::FAILURE on any of the fatal conditions below, otherwise returns `sc`.
65  // Initialise mother class (read properties, ...)
67  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
68 
69  // Get hold of the TBBSvc. This should initialize the thread pool
70  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
71  if ( !m_threadPoolSvc.isValid() ) {
72  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
73  return StatusCode::FAILURE;
74  }
75 
76  // Activate the scheduler in another thread.
77  info() << "Activating scheduler in a separate thread" << endmsg;
78  m_thread = std::thread( [this]() { this->activate(); } );
79 
// Poll the activation flag once per second: leave the loop when the scheduler thread has set ACTIVE,
// abort the initialisation if it has set FAILURE.
80  while ( m_isActive != ACTIVE ) {
81  if ( m_isActive == FAILURE ) {
82  fatal() << "Terminating initialization" << endmsg;
83  return StatusCode::FAILURE;
84  } else {
85  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
86  sleep( 1 );
87  }
88  }
89 
90  if ( m_enableCondSvc ) {
91  // Get hold of the CondSvc
92  m_condSvc = serviceLocator()->service( "CondSvc" );
93  if ( !m_condSvc.isValid() ) {
// A missing CondSvc is not fatal: the feature is switched off and initialisation continues.
94  warning() << "No CondSvc found, or not enabled. "
95  << "Will not manage CondAlgorithms" << endmsg;
96  m_enableCondSvc = false;
97  }
98  }
99 
100  // Get the algo resource pool
101  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
102  if ( !m_algResourcePool.isValid() ) {
103  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
104  return StatusCode::FAILURE;
105  }
106 
107  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
108  if ( !m_algExecStateSvc.isValid() ) {
109  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
110  return StatusCode::FAILURE;
111  }
112 
113  // Get Whiteboard
114  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
115  if ( !m_whiteboard.isValid() ) {
116  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
117  return StatusCode::FAILURE;
118  }
119 
120  // Get dedicated scheduler for I/O-bound algorithms
121  if ( m_useIOBoundAlgScheduler ) {
122  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
// NOTE(review): unlike the other service look-ups, this one only logs a fatal message and does not
// return FAILURE -- confirm whether continuing without the service is intended.
123  if ( !m_IOBoundAlgScheduler.isValid() )
124  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
125  }
126 
127  // Set the MaxEventsInFlight parameters from the number of WB stores
128  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
129 
130  // Set the number of free slots
131  m_freeSlots = m_maxEventsInFlight;
132 
133  // Get the list of algorithms
134  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
135  const unsigned int algsNumber = algos.size();
136  if ( algsNumber != 0 ) {
137  info() << "Found " << algsNumber << " algorithms" << endmsg;
138  } else {
139  error() << "No algorithms found" << endmsg;
140  return StatusCode::FAILURE;
141  }
142 
143  /* Dependencies
144  1) Look for handles in algo, if none
145  2) Assume none are required
146  */
147 
148  DataObjIDColl globalInp, globalOutp;
149 
150  // figure out all outputs
151  for ( IAlgorithm* ialgoPtr : algos ) {
152  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
153  if ( !algoPtr ) {
154  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
155  return StatusCode::FAILURE;
156  }
157  for ( auto id : algoPtr->outputDataObjs() ) {
// A failed insert means the same output id was already declared; this is only warned about.
158  auto r = globalOutp.insert( id );
159  if ( !r.second ) {
160  warning() << "multiple algorithms declare " << id
161  << " as output! could be a single instance in multiple paths "
162  "though, or control flow may guarantee only one runs...!"
163  << endmsg;
164  }
165  }
166  }
167 
// ostdd accumulates a human-readable dependency report; it is printed only if m_showDataDeps is set
// (either by configuration or by one of the error paths below).
168  std::ostringstream ostdd;
169  ostdd << "Data Dependencies for Algorithms:";
170 
// Per-algorithm (by name) set of resolved input dependencies, used below to report which
// algorithms require an unmet input.
171  std::map<std::string, DataObjIDColl> algosDependenciesMap;
172  for ( IAlgorithm* ialgoPtr : algos ) {
173  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
174  if ( nullptr == algoPtr ) {
175  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
176  << ": this will result in a crash." << endmsg;
177  return StatusCode::FAILURE;
178  }
179 
180  ostdd << "\n " << algoPtr->name();
181 
182  DataObjIDColl algoDependencies;
183  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
184  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
// Work on a copy because the key may be rewritten when alternatives are resolved.
185  DataObjID id = *idp;
186  ostdd << "\n o INPUT " << id;
// A ':' in the key marks a list of alternatives: pick the first token that some algorithm
// declares as output and use it as the effective key.
187  if ( id.key().find( ":" ) != std::string::npos ) {
188  ostdd << " contains alternatives which require resolution...\n";
189  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
190  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
191  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
192  } );
193  if ( itok != tokens.end() ) {
194  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
195  id.updateKey( *itok );
196  } else {
197  error() << "failed to find alternate in global output list"
198  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
199  m_showDataDeps = true;
200  }
201  }
202  algoDependencies.insert( id );
203  globalInp.insert( id );
204  }
205  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
206  ostdd << "\n o OUTPUT " << *id;
207  if ( id->key().find( ":" ) != std::string::npos ) {
208  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
209  << endmsg;
210  m_showDataDeps = true;
211  }
212  }
213  } else {
214  ostdd << "\n none";
215  }
216  algosDependenciesMap[algoPtr->name()] = algoDependencies;
217  }
218 
219  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
220 
221  // Check if we have unmet global input dependencies, and, optionally, heal them
222  // WARNING: this step must be done BEFORE the Precedence Service is initialized
223  if ( m_checkDeps ) {
// An input is "unmet" when no algorithm declares it as output.
224  DataObjIDColl unmetDep;
225  for ( auto o : globalInp )
226  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
227 
228  if ( unmetDep.size() > 0 ) {
229 
// Build the list of unmet inputs together with the algorithms that require each of them.
230  std::ostringstream ost;
231  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232  ost << "\n o " << *o << " required by Algorithm: ";
233 
234  for ( const auto& p : algosDependenciesMap )
235  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
236  }
237 
238  if ( !m_useDataLoader.empty() ) {
239 
240  // Find the DataLoader Alg
241  IAlgorithm* dataLoaderAlg( nullptr );
242  for ( IAlgorithm* algo : algos )
243  if ( algo->name() == m_useDataLoader ) {
244  dataLoaderAlg = algo;
245  break;
246  }
247 
248  if ( dataLoaderAlg == nullptr ) {
249  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
250  << "\" found, and unmet INPUT dependencies "
251  << "detected:\n"
252  << ost.str() << endmsg;
253  return StatusCode::FAILURE;
254  }
255 
256  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
257  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
258 
259  // Set the property Load of DataLoader Alg
260  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
261  if ( !dataAlg ) {
262  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
263  << endmsg;
264  return StatusCode::FAILURE;
265  }
266 
// NOTE(review): in this listing the loop body only logs; embedded line 270 is missing, so the
// statement that presumably attaches `id` to dataAlg as an output is not visible -- confirm.
267  for ( auto& id : unmetDep ) {
268  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
269  << dataLoaderAlg->name() << endmsg;
271  }
272 
273  } else {
274  fatal() << "Auto DataLoading not requested, "
275  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
276  return StatusCode::FAILURE;
277  }
278 
279  } else {
280  info() << "No unmet INPUT data dependencies were found" << endmsg;
281  }
282  }
283 
284  // Get the precedence service
285  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
286  if ( !m_precSvc.isValid() ) {
287  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
288  return StatusCode::FAILURE;
289  }
// The concrete PrecedenceSvc type is needed for getRules(), which is used below.
290  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
291  if ( !precSvc ) {
292  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
293  return StatusCode::FAILURE;
294  }
295 
296  // Fill the containers to convert algo names to index
297  m_algname_vect.resize( algsNumber );
298  for ( IAlgorithm* algo : algos ) {
299  const std::string& name = algo->name();
// NOTE(review): the result of getAlgorithmNode( name ) is dereferenced without a null check, and
// .at( index ) throws if the index is not smaller than algsNumber -- confirm both are guaranteed.
300  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
301  m_algname_index_map[name] = index;
302  m_algname_vect.at( index ) = name;
303  }
304 
305  // Shortcut for the message service
306  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
// An invalid message service is only reported; the (invalid) handle is still passed to the slots.
307  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
308 
// Create one event slot per event in flight; every slot starts out marked as complete (i.e. free).
309  m_eventSlots.reserve( m_maxEventsInFlight );
310  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
311  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
312  m_eventSlots.back().complete = true;
313  }
314  m_actionsCounts.assign( m_maxEventsInFlight, 0 );
315 
// m_maxAlgosInFlight is overridden only when the configured pool size is larger than 1.
316  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
317 
318  // Clearly inform about the level of concurrency
319  info() << "Concurrency level information:" << endmsg;
320  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
321  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
322 
323  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
324 
325  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
326 
327  // Simulate execution flow
328  if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
329 
330  return sc;
331 }
332 //---------------------------------------------------------------------------
333 
338 
// Body of the scheduler's finalisation step.
// NOTE(review): the function signature and the declaration of `sc` are not visible in this listing
// (embedded lines 334-337 and 339 are missing) -- confirm against the original file.
// Deactivates the scheduler, joins the scheduler thread and returns FAILURE if the thread ended in
// the FAILURE state; otherwise returns the status of deactivate().
340  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
341 
// `sc` is overwritten here, so a base-class failure is only reported by the warning above.
342  sc = deactivate();
343  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
344 
345  info() << "Joining Scheduler thread" << endmsg;
346  m_thread.join();
347 
348  // Final error check after thread pool termination
349  if ( m_isActive == FAILURE ) {
350  error() << "problems in scheduler thread" << endmsg;
351  return StatusCode::FAILURE;
352  }
353 
354  return sc;
355 }
356 //---------------------------------------------------------------------------
357 
369 
// Body of the scheduler thread's main routine (it is started from a std::thread during initialisation).
// NOTE(review): the function signature and the declaration of `sc` are not visible in this listing
// (embedded lines 358-368 and 380 are missing) -- confirm against the original file.
// Initialises the thread pool, marks the scheduler ACTIVE, then executes actions taken from
// m_actionsQueue until the scheduler is no longer ACTIVE and the queue is empty; finally terminates
// the thread pool. Failures are reported by setting m_isActive to FAILURE.
370  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
371 
372  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
373  error() << "problems initializing ThreadPoolSvc" << endmsg;
374  m_isActive = FAILURE;
375  return;
376  }
377 
378  // Wait for actions pushed into the queue by finishing tasks.
379  action thisAction;
381 
382  m_isActive = ACTIVE;
383 
384  // Continue to wait if the scheduler is running or there is something to do
385  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
386  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
// NOTE(review): pop() presumably blocks until an action is available -- confirm with the queue type.
387  m_actionsQueue.pop( thisAction );
388  sc = thisAction();
// ON_VERBOSE expands to `if ( msgLevel( MSG::VERBOSE ) )`, so the `else` below belongs to that test:
// when VERBOSE output is disabled the status code is explicitly ignored instead of being inspected.
389  ON_VERBOSE {
390  if ( sc.isFailure() )
391  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
392  else
393  verbose() << "Action succeeded." << endmsg;
394  }
395  else sc.ignore();
396  }
397 
398  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
399  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
400  error() << "Problems terminating thread pool" << endmsg;
401  m_isActive = FAILURE;
402  }
403 }
404 
405 //---------------------------------------------------------------------------
406 
414 
// Body of the scheduler's deactivation step.
// NOTE(review): the function signature is not visible in this listing (embedded lines 407-413 are
// missing) -- confirm against the original file.
// If the scheduler is ACTIVE: sets the free-slot counter to 0, discards all pending actions and
// queues one final action that switches the state to INACTIVE. Always returns SUCCESS.
415  if ( m_isActive == ACTIVE ) {
416 
417  // Set the number of slots available to an error code
// (with m_freeSlots at 0 the event-push path in this file refuses new events)
418  m_freeSlots.store( 0 );
419 
420  // Empty queue
421  action thisAction;
422  while ( m_actionsQueue.try_pop( thisAction ) ) {};
423 
424  // This would be the last action
// The state change is performed by the thread that executes the queued actions, not by the caller.
425  m_actionsQueue.push( [this]() -> StatusCode {
426  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
427  m_isActive = INACTIVE;
428  return StatusCode::SUCCESS;
429  } );
430  }
431 
432  return StatusCode::SUCCESS;
433 }
434 
435 //---------------------------------------------------------------------------
436 
437 // EventSlot management
445 
// Body of the routine that submits one new event (identified by `eventContext`) to the scheduler.
// NOTE(review): the function signature and the declaration of `result` used inside the lambda are
// not visible in this listing (embedded lines 438-444 and 472 are missing) -- confirm against the
// original file.
// Returns FAILURE if the context is null or no slot is free; otherwise reserves a slot, queues an
// action that starts the processing of the event and returns SUCCESS. The outcome of the queued
// action itself is not reflected in the return value.
446  if ( !eventContext ) {
447  fatal() << "Event context is nullptr" << endmsg;
448  return StatusCode::FAILURE;
449  }
450 
451  if ( m_freeSlots.load() == 0 ) {
452  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
453  return StatusCode::FAILURE;
454  }
455 
456  // no problem as push new event is only called from one thread (event loop manager)
457  --m_freeSlots;
458 
// The lambda copies the eventContext pointer; it is executed later, when the action is taken from
// m_actionsQueue.
459  auto action = [this, eventContext]() -> StatusCode {
460  // Event processing slot forced to be the same as the wb slot
461  const unsigned int thisSlotNum = eventContext->slot();
462  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
463  if ( !thisSlot.complete ) {
464  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
465  return StatusCode::FAILURE;
466  }
467 
468  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
469  thisSlot.reset( eventContext );
470 
471  // Result status code:
473 
474  // promote to CR and DR the initial set of algorithms
475  Cause cs = {Cause::source::Root, "RootDecisionHub"};
476  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
477  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
478  result = StatusCode::FAILURE;
479  }
480 
// updateStates() is still called when iterate() failed; either failure makes the action fail.
481  if ( this->updateStates( thisSlotNum ).isFailure() ) {
482  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
483  result = StatusCode::FAILURE;
484  }
485 
486  return result;
487  }; // end of lambda
488 
489  // Kick off the scheduling!
490  ON_VERBOSE {
491  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
492  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
493  }
494 
495  m_actionsQueue.push( action );
496 
497  return StatusCode::SUCCESS;
498 }
499 
500 //---------------------------------------------------------------------------
501 
// Body of the routine that submits several events, one pushNewEvent() call per element of
// `eventContexts`, in iteration order.
// NOTE(review): the function signature is not visible in this listing (embedded line 502 is
// missing) -- confirm against the original file.
// Stops at the first push that does not return SUCCESS and returns that status; contexts after it
// are not submitted. If `eventContexts` is empty the default-constructed StatusCode is returned.
503  StatusCode sc;
504  for ( auto context : eventContexts ) {
505  sc = pushNewEvent( context );
506  if ( sc != StatusCode::SUCCESS ) return sc;
507  }
508  return sc;
509 }
510 
511 //---------------------------------------------------------------------------
512 
513 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
514 
515 //---------------------------------------------------------------------------
520 
// Body of the routine that retrieves one finished event into `eventContext`.
// NOTE(review): the function signature is not visible in this listing (embedded lines 516-519 are
// missing) -- confirm against the original file.
// Returns FAILURE without touching the queue when every slot is free (no event is in flight) or the
// scheduler is INACTIVE. Otherwise takes a context from m_finishedEvents, releases its slot by
// incrementing m_freeSlots and returns SUCCESS.
521  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
522  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
523  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
524  // << " active: " << m_isActive << endmsg;
525  return StatusCode::FAILURE;
526  } else {
527  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
528  // << " active: " << m_isActive << endmsg;
// NOTE(review): pop() presumably blocks until a finished event is available -- confirm with the
// queue type.
529  m_finishedEvents.pop( eventContext );
530  ++m_freeSlots;
531  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
532  return StatusCode::SUCCESS;
533  }
534 }
535 
536 //---------------------------------------------------------------------------
541 
// Body of the routine that retrieves a finished event into `eventContext` via try_pop().
// NOTE(review): the function signature is not visible in this listing (embedded lines 537-540 are
// missing) -- confirm against the original file.
// Returns SUCCESS and increments m_freeSlots if try_pop() delivered a context, FAILURE otherwise.
542  if ( m_finishedEvents.try_pop( eventContext ) ) {
543  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
544  << endmsg;
545  ++m_freeSlots;
546  return StatusCode::SUCCESS;
547  }
548  return StatusCode::FAILURE;
549 }
550 
551 //--------------------------------------------------------------------------
552 // States Management
553 
563 StatusCode AvalancheSchedulerSvc::updateStates( int si, const int algo_index, const int sub_slot,
564  const int source_slot ) {
565 
566  StatusCode global_sc( StatusCode::SUCCESS );
567 
568  // Retry algs
569  algQueueEntry queuePop;
570  const size_t retries = m_retryQueue.size();
571  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
572 
573  queuePop = m_retryQueue.front();
574  m_retryQueue.pop();
575 
576  global_sc = enqueue( queuePop.algIndex, queuePop.slotIndex, queuePop.contextPtr );
577  }
578 
579  // Sort from the oldest to the newest event
580  // Prepare a vector of pointers to the slots to avoid copies
581  std::vector<EventSlot*> eventSlotsPtrs;
582 
583  // Consider all slots if si <0 or just one otherwise
584  if ( si < 0 ) {
585  const int eventsSlotsSize( m_eventSlots.size() );
586  eventSlotsPtrs.reserve( eventsSlotsSize );
587  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
588  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
589  }
590  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
591  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
592  } else {
593  eventSlotsPtrs.push_back( &m_eventSlots[si] );
594  }
595 
596  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
597  int iSlot = thisSlotPtr->eventContext->slot();
598 
599  // Cache the states of the algos to improve readability and performance
600  auto& thisSlot = m_eventSlots[iSlot];
601  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
602 
603  // Perform the I->CR->DR transitions
604  if ( algo_index >= 0 ) {
605  Cause cs = {Cause::source::Task, index2algname( algo_index )};
606 
607  // Run in whole-event context if there's no sub-slot index, or the sub-slot has a different parent
608  if ( sub_slot == -1 || iSlot != source_slot ) {
609  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
610  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
611  global_sc = StatusCode::FAILURE;
612  }
613  } else {
614  if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
615  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot << " of " << iSlot << endmsg;
616  global_sc = StatusCode::FAILURE;
617  }
618  }
619  }
620 
621  StatusCode partial_sc( StatusCode::FAILURE, true );
622 
623  // Perform DR->SCHEDULED
624  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
625  uint algIndex = *it;
626 
627  bool IOBound = false;
628  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
629 
630  if ( !IOBound )
631  partial_sc = enqueue( algIndex, iSlot, thisSlotPtr->eventContext.get() );
632  else
633  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
634 
635  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
636  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
637  << " on processing slot " << iSlot << endmsg;
638  }
639 
640  // Check for algorithms ready in sub-slots
641  for ( auto& subslot : thisSlot.allSubSlots ) {
642  auto& subslotStates = subslot.algsStates;
643  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
644  uint algIndex{*it};
645  partial_sc = enqueue( algIndex, iSlot, subslot.eventContext.get() );
646  // The following verbosity is expensive when the number of sub-slots is high
647  /*ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
648  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
649  << " on processing subslot " << subslot.eventContext->slot() << endmsg;*/
650  }
651  }
652 
653  if ( m_dumpIntraEventDynamics ) {
655  s << ( algo_index != -1 ? index2algname( algo_index ) : "START" ) << ", "
656  << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
657  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
658  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
659  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
660  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
661  std::ofstream myfile;
662  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
663  myfile << s.str();
664  myfile.close();
665  }
666 
667  // Not complete because this would mean that the slot is already free!
668  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
669  !thisSlot.algsStates.containsAny(
670  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
671  !subSlotAlgsInStates( thisSlot,
672  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
673  !thisSlot.complete ) {
674 
675  thisSlot.complete = true;
676  // if the event did not fail, add it to the finished events
677  // otherwise it is taken care of in the error handling
678  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
679  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
680  << thisSlot.eventContext->slot() << ")." << endmsg;
681  m_finishedEvents.push( thisSlot.eventContext.release() );
682  }
683 
684  // now let's return the fully evaluated result of the control flow
685  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
686 
687  thisSlot.eventContext.reset( nullptr );
688 
689  } else if ( isStalled( thisSlot ) ) {
690  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
691  eventFailed( thisSlot.eventContext.get() ); // can't release yet
692  }
693  partial_sc.ignore();
694  } // end loop on slots
695 
696  ON_VERBOSE verbose() << "States Updated." << endmsg;
697 
698  return global_sc;
699 }
700 
701 //---------------------------------------------------------------------------
702 // Update states in the appropriate event slot
704 
// Body of the routine that sets the state `state` of algorithm `iAlgo` in the slot addressed by
// `contextPtr`.
// NOTE(review): the function signature is not visible in this listing (embedded line 703 is
// missing) -- confirm against the original file.
// The event slot is selected by contextPtr->slot(); if the context uses a sub-slot, the state is set
// in that sub-slot's algsStates, otherwise in the slot's own algsStates. Returns the status reported
// by algsStates.set().
705  StatusCode updateSc;
706  EventSlot& thisSlot = m_eventSlots[contextPtr->slot()];
707  if ( contextPtr->usesSubSlot() ) {
708  // Sub-slot
709  size_t const subSlotIndex = contextPtr->subSlot();
710  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
711  } else {
712  // Event level (standard behaviour)
713  updateSc = thisSlot.algsStates.set( iAlgo, state );
714  }
715  return updateSc;
716 }
717 
718 //---------------------------------------------------------------------------
719 
726 bool AvalancheSchedulerSvc::isStalled( const EventSlot& slot ) const {
727 
728  if ( m_actionsCounts[slot.eventContext->slot()] == 0 &&
729  !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
730  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
731 
732  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
733 
734  return true;
735  }
736  return false;
737 }
738 
739 //---------------------------------------------------------------------------
740 
// Body of the routine that handles a failed event identified by `eventContext`.
// NOTE(review): the function signature is not visible in this listing (embedded lines 741-745 are
// missing) -- confirm against the original file.
// Reports the failure, dumps the scheduler state and the precedence rules of the slot, marks the slot
// complete and hands its context over to the finished-events queue.
746  const uint slotIdx = eventContext->slot();
747 
748  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
749 
// At VERBOSE level all slots are dumped (-1), otherwise only the failed one.
750  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
751 
752  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
753  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
754 
755  // Push into the finished events queue the failed context
756  m_eventSlots[slotIdx].complete = true;
// release() transfers ownership of the slot's context to the queue; the slot's pointer is null afterwards.
757  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
758 }
759 
760 //---------------------------------------------------------------------------
761 
767 
// Body of the routine that prints a snapshot of the scheduler state as one INFO message.
// NOTE(review): the function signature is not visible in this listing (embedded lines 762-766 are
// missing); `iSlot` is used below as the index of the slot to detail, with a negative value meaning
// "all slots" -- confirm against the original file.
// The dump has three parts: the mapping of SCHEDULED algorithms to event/slot/thread (only if the
// TimelineSvc is available and enabled), the control-flow/FSM state of the slots that are not
// complete, and, for a non-negative iSlot, the algorithm execution states of that slot.
768  // To have just one big message
769  std::ostringstream outputMS;
770 
771  outputMS << "Dumping scheduler state\n"
772  << "=========================================================================================\n"
773  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
774  << "=========================================================================================\n\n";
775 
776  //===========================================================================
777 
778  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
779  << "------------------\n\n";
780 
781  // Figure if TimelineSvc is available (used below to detect threads IDs)
782  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
783  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
784  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
785  } else {
786 
787  // Figure optimal printout layout
// indt becomes the length of the longest name among the SCHEDULED algorithms (column width below)
788  size_t indt( 0 );
789  for ( auto& slot : m_eventSlots )
790  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
791  if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
792 
793  // Figure the last running schedule across all slots
794  for ( auto& slot : m_eventSlots ) {
795  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
796  ++it ) {
797 
798  const std::string algoName{index2algname( *it )};
799 
800  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
801  << slot.eventContext->slot();
802 
803  // Try to get POSIX threads IDs the currently running tasks are scheduled to
// (this test is always true here: the enclosing else-branch is entered only with a valid service)
804  if ( timelineSvc.isValid() ) {
805  TimelineEvent te{};
806  te.algorithm = algoName;
807  te.slot = slot.eventContext->slot();
808  te.event = slot.eventContext->evt();
809 
810  if ( timelineSvc->getTimelineEvent( te ) )
811  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
812  else
813  outputMS << " thread.id: [unknown]"; // this means a task has just
814  // been signed off as SCHEDULED,
815  // but has not been assigned to a thread yet
816  // (i.e., not running yet)
817  }
818  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
819  }
820  }
821  }
822 
823  //===========================================================================
824 
825  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
826  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
827 
// slotCount tracks the position of `slot` within m_eventSlots so it can be compared with iSlot
828  int slotCount = -1;
829  for ( auto& slot : m_eventSlots ) {
830  ++slotCount;
831  if ( slot.complete ) continue;
832 
// The slot/event header is printed for every slot that is not complete; the detailed state only
// for the requested slot (or for all of them when iSlot is negative).
833  outputMS << "[ slot: "
834  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
835  << " event: "
836  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
837  << " ]:\n\n";
838 
839  if ( 0 > iSlot || iSlot == slotCount ) {
840 
841  // Snapshot of the Control Flow and FSM states
842  outputMS << m_precSvc->printState( slot ) << "\n";
843 
844  // Mention sub slots (this is expensive if the number of sub-slots is high)
845  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
846  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
847  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
848  for ( auto& ss : slot.allSubSlots ) {
849  outputMS << "[ slot: " << slotID << " sub-slot entry: " << ss.entryPoint << " event: "
850  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
851  << " ]:\n\n";
852  outputMS << m_precSvc->printState( ss ) << "\n";
853  }
854  }
855  }
856  }
857 
858  //===========================================================================
859 
860  if ( 0 <= iSlot ) {
861  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
// NOTE(review): the slot's eventContext is dereferenced without a check -- confirm it is always set
// when this is called with a non-negative iSlot.
862  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
863  }
864 
865  outputMS << "\n=========================================================================================\n"
866  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
867  << "=========================================================================================\n\n";
868 
869  info() << outputMS.str() << endmsg;
870 }
871 
872 //---------------------------------------------------------------------------
873 
874 StatusCode AvalancheSchedulerSvc::enqueue( unsigned int iAlgo, int si, EventContext* eventContext ) {
875 
876  // Use the algorithm rank to sort the queue
877  const std::string& algName( index2algname( iAlgo ) );
878  unsigned int rank = 0;
879  if ( !m_optimizationMode.empty() ) { rank = m_precSvc->getPriority( algName ); }
880 
881  // Get algorithm pointer
882  IAlgorithm* iAlgoPtr = nullptr;
883  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( algName, iAlgoPtr ) );
884 
885  // Check if the algorithm is available
886  AState state;
887  if ( getAlgSC.isSuccess() ) {
888 
889  // Add the algorithm to the scheduled queue
890  m_scheduledQueue.push( {iAlgo, si, eventContext, rank, iAlgoPtr} );
891  ++m_algosInFlight;
892 
893  // Avoid to use tbb if the pool size is 1 and run in this thread
894  if ( -100 != m_threadPoolSize ) {
895 
896  // the child task that executes an Algorithm
897  tbb::task* algoTask =
898  new ( tbb::task::allocate_root() ) AlgoExecutionTask( this, serviceLocator(), m_algExecStateSvc );
899  // schedule the algoTask
900  tbb::task::enqueue( *algoTask );
901  } else {
902 
903  AlgoExecutionTask theTask( this, serviceLocator(), m_algExecStateSvc );
904  theTask.execute();
905  }
906 
907  ON_DEBUG debug() << "Algorithm " << index2algname( iAlgo ) << " was submitted on event " << eventContext->evt()
908  << " in slot " << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
909 
910  state = AState::SCHEDULED;
911  } else {
912 
913  // Add the algorithm to the retry queue
914  m_retryQueue.push( {iAlgo, si, eventContext, rank, nullptr} );
915 
916  state = AState::RESOURCELESS;
917  }
918 
919  // Update alg state
920  StatusCode updateSc = setAlgState( iAlgo, eventContext, state );
921 
922  ON_VERBOSE dumpSchedulerState( -1 );
923 
924  if ( updateSc.isSuccess() )
925  ON_VERBOSE verbose() << "Promoting " << index2algname( iAlgo ) << " to " << state << " on slot " << si << endmsg;
926  return updateSc;
927 }
928 
929 //---------------------------------------------------------------------------
930 
931 StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, int si, EventContext* eventContext ) {
932 
933  if ( m_IOBoundAlgosInFlight == m_maxIOBoundAlgosInFlight ) return StatusCode::FAILURE;
934 
935  // bool IOBound = m_precSvc->isBlocking(algName);
936 
937  const std::string& algName( index2algname( iAlgo ) );
938  IAlgorithm* ialgoPtr = nullptr;
939  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
940 
941  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
942 
943  ++m_IOBoundAlgosInFlight;
944  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
945  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
946  return this->AvalancheSchedulerSvc::promoteToAsyncExecuted( iAlgo, eventContext->slot(), ialgoPtr,
947  eventContext );
948  } );
949  return StatusCode::SUCCESS;
950  };
951  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
952  // tbb::task)? it seems it works..
953  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
954  IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
955  m_IOBoundAlgScheduler->push( *theTask );
956 
957  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
958  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
959 
960  // Update alg state
961  StatusCode updateSc = setAlgState( iAlgo, eventContext, AState::SCHEDULED );
962 
963  ON_VERBOSE if ( updateSc.isSuccess() ) verbose()
964  << "[Asynchronous] Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
965  return updateSc;
966  } else {
967  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
968  << " on slot " << si << endmsg;
969  return sc;
970  }
971 }
972 
973 //---------------------------------------------------------------------------
974 
978 StatusCode AvalancheSchedulerSvc::promoteToExecuted( unsigned int iAlgo, int si, EventContext* eventContext ) {
979 
980  const std::string& algName( index2algname( iAlgo ) );
981 
982  Gaudi::Hive::setCurrentContext( eventContext );
983 
984  --m_algosInFlight;
985 
986  ON_DEBUG debug() << "Trying to handle execution result of " << algName << " on slot " << si << endmsg;
987 
988  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algName, *eventContext );
989  AState state = algstate.execStatus().isSuccess()
990  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
991  : AState::ERROR;
992 
993  // Update alg state
994  StatusCode sc = setAlgState( iAlgo, eventContext, state );
995 
996  ON_VERBOSE if ( sc.isSuccess() ) verbose()
997  << "Promoting " << algName << " on slot " << si << " to " << state << endmsg;
998 
999  ON_DEBUG debug() << "Algorithm " << algName << " executed in slot " << si << ". Algorithms scheduled are "
1000  << m_algosInFlight << endmsg;
1001 
1002  // Schedule an update of the status of the algorithms
1003  int subSlotIndex = -1;
1004  if ( eventContext->usesSubSlot() ) subSlotIndex = eventContext->subSlot();
1005  ++m_actionsCounts[si];
1006 
1007  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1008  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1009  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1010  } );
1011 
1012  return sc;
1013 }
1014 
1015 //---------------------------------------------------------------------------
1016 
1021  EventContext* eventContext ) {
1022  Gaudi::Hive::setCurrentContext( eventContext );
1023  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1024 
1025  if ( sc.isFailure() ) {
1026  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1027  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1028  return StatusCode::FAILURE;
1029  }
1030 
1031  --m_IOBoundAlgosInFlight;
1032 
1033  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1034  << endmsg;
1035 
1036  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1037  AState state = algstate.execStatus().isSuccess()
1038  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1039  : AState::ERROR;
1040 
1041  // Update states in the appropriate slot
1042  sc = setAlgState( iAlgo, eventContext, state );
1043 
1044  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1045  << "[Asynchronous] Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1046 
1047  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1048  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1049 
1050  // Schedule an update of the status of the algorithms
1051  ++m_actionsCounts[si];
1052  int subSlotIndex = -1;
1053  if ( eventContext->usesSubSlot() ) subSlotIndex = eventContext->subSlot();
1054 
1055  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1056  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1057  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1058  } );
1059 
1060  return sc;
1061 }
1062 
1063 //---------------------------------------------------------------------------
1064 
1065 // Method to inform the scheduler about event views
1066 
1068  std::unique_ptr<EventContext> viewContext ) {
1069  // Prevent view nesting
1070  if ( sourceContext->usesSubSlot() ) {
1071  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1072  return StatusCode::FAILURE;
1073  }
1074 
1075  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1076 
1077  // It's not possible to create an std::functional from a move-capturing lambda
1078  // So, we have to release the unique pointer
1079  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1080  &nodeName]() -> StatusCode {
1081  // Attach the sub-slot to the top-level slot
1082  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1083 
1084  if ( viewContextPtr ) {
1085  // Re-create the unique pointer
1086  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1087  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1088  return StatusCode::SUCCESS;
1089  } else {
1090  // Disable the view node if there are no views
1091  topSlot.disableSubSlots( nodeName );
1092  return StatusCode::SUCCESS;
1093  }
1094  };
1095 
1096  m_actionsQueue.push( std::move( action ) );
1097 
1098  return StatusCode::SUCCESS;
1099 }
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:63
#define ON_DEBUG
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
StatusCode initialize() override
Definition: Service.cpp:60
const unsigned int & getAlgoIndex() const
Get algorithm index.
Class representing an event slot.
Definition: EventSlot.h:14
T empty(T...args)
T open(T...args)
unsigned int algIndex
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B...
Definition: EventSlot.h:68
StatusCode finalize() override
Definition: Service.cpp:164
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
ContextID_t slot() const
Definition: EventContext.h:41
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one) ...
Definition: EventSlot.h:51
int slotIndex
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:643
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Definition: StatusCode.h:267
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
T to_string(T...args)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::string algorithm
Definition: ITimelineSvc.h:21
void activate()
Activate scheduler.
T end(T...args)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
bool usesSubSlot() const
Definition: EventContext.h:43
EventContext * contextPtr
Struct to hold entries in the alg queues.
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
STL class.
bool isFailure() const
Definition: StatusCode.h:130
StatusCode setAlgState(unsigned int iAlgo, EventContext *contextPtr, AState state)
T release(T...args)
T setw(T...args)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:40
STL class.
#define DECLARE_COMPONENT(type)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
T push_back(T...args)
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
const DataObjIDColl & inputDataObjs() const override
T close(T...args)
StatusCode set(unsigned int iAlgo, State newState)
StatusCode finalize() override
Finalise.
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
T move(T...args)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T get(T...args)
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
T find_if(T...args)
T size(T...args)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
T begin(T...args)
Iterator begin(State kind)
T any_of(T...args)
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:153
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
string s
Definition: gaudirun.py:316
StatusCode enqueue(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
constexpr static const auto FAILURE
Definition: StatusCode.h:86
T hex(T...args)
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
bool complete
Flags completion of the event.
Definition: EventSlot.h:79
T sort(T...args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
bool filterPassed() const
class MergingTransformer< Out(const vector_of_const_< In > false
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
std::string fullKey() const
Definition: DataObjID.cpp:88
ContextID_t subSlot() const
Definition: EventContext.h:42
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:73
const StatusCode & execStatus() const
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
T reserve(T...args)
StatusCode promoteToExecuted(unsigned int iAlgo, int si, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
#define ON_VERBOSE
Iterator end(State kind)