The Gaudi Framework  v30r2 (9eca68f7)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 
3 #include "AlgoExecutionTask.h"
4 #include "IOBoundAlgTask.h"
5 
6 // Framework includes
7 #include "GaudiKernel/Algorithm.h" // can be removed ASA dynamic casts to Algorithm are removed
10 #include "GaudiKernel/IAlgorithm.h"
13 
14 // C++
15 #include <algorithm>
16 #include <map>
17 #include <queue>
18 #include <sstream>
19 #include <unordered_set>
20 
21 // External libs
22 #include "boost/algorithm/string.hpp"
23 #include "boost/thread.hpp"
24 #include "boost/tokenizer.hpp"
25 // DP waiting for the TBB service
26 #include "tbb/task_scheduler_init.h"
27 
28 // Instantiation of a static factory class used by clients to create instances of this service
30 
31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
33 
34 namespace
35 {
36  struct DataObjIDSorter {
37  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
38  };
39 
40  // Sort a DataObjIDColl in a well-defined, reproducible manner.
41  // Used for making debugging dumps.
42  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll )
43  {
45  v.reserve( coll.size() );
46  for ( const DataObjID& id : coll ) v.push_back( &id );
47  std::sort( v.begin(), v.end(), DataObjIDSorter() );
48  return v;
49  }
50 }
51 
52 //===========================================================================
53 // Infrastructure methods
54 
61 {
62 
63  // Initialise mother class (read properties, ...)
65  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
66 
67  // Get hold of the TBBSvc. This should initialize the thread pool
68  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
69  if ( !m_threadPoolSvc.isValid() ) {
70  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
71  return StatusCode::FAILURE;
72  }
73 
74  // Activate the scheduler in another thread.
75  info() << "Activating scheduler in a separate thread" << endmsg;
76  m_thread = std::thread( [this]() { this->activate(); } );
77 
78  while ( m_isActive != ACTIVE ) {
79  if ( m_isActive == FAILURE ) {
80  fatal() << "Terminating initialization" << endmsg;
81  return StatusCode::FAILURE;
82  } else {
83  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
84  sleep( 1 );
85  }
86  }
87 
88  if ( m_enableCondSvc ) {
89  // Get hold of the CondSvc
90  m_condSvc = serviceLocator()->service( "CondSvc" );
91  if ( !m_condSvc.isValid() ) {
92  warning() << "No CondSvc found, or not enabled. "
93  << "Will not manage CondAlgorithms" << endmsg;
94  m_enableCondSvc = false;
95  }
96  }
97 
98  // Get the algo resource pool
99  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
100  if ( !m_algResourcePool.isValid() ) {
101  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
102  return StatusCode::FAILURE;
103  }
104 
105  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
106  if ( !m_algExecStateSvc.isValid() ) {
107  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
108  return StatusCode::FAILURE;
109  }
110 
111  // Get Whiteboard
112  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
113  if ( !m_whiteboard.isValid() ) {
114  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
115  return StatusCode::FAILURE;
116  }
117 
118  // Get dedicated scheduler for I/O-bound algorithms
119  if ( m_useIOBoundAlgScheduler ) {
120  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
121  if ( !m_IOBoundAlgScheduler.isValid() )
122  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
123  }
124 
125  // Set the MaxEventsInFlight parameters from the number of WB stores
126  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
127 
128  // Set the number of free slots
129  m_freeSlots = m_maxEventsInFlight;
130 
131  // set global concurrency flags
133 
134  // Get the list of algorithms
135  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
136  const unsigned int algsNumber = algos.size();
137  info() << "Found " << algsNumber << " algorithms" << endmsg;
138 
139  /* Dependencies
140  1) Look for handles in algo, if none
141  2) Assume none are required
142  */
143 
144  DataObjIDColl globalInp, globalOutp;
145 
146  // figure out all outputs
147  for ( IAlgorithm* ialgoPtr : algos ) {
148  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
149  if ( !algoPtr ) {
150  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
151  }
152  for ( auto id : algoPtr->outputDataObjs() ) {
153  auto r = globalOutp.insert( id );
154  if ( !r.second ) {
155  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths "
156  "though, or control flow may guarantee only one runs...!"
157  << endmsg;
158  }
159  }
160  }
161 
162  std::ostringstream ostdd;
163  ostdd << "Data Dependencies for Algorithms:";
164 
165  std::map<std::string, DataObjIDColl> algosDependenciesMap;
166  for ( IAlgorithm* ialgoPtr : algos ) {
167  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
168  if ( nullptr == algoPtr ) {
169  fatal() << "Could not convert IAlgorithm into Algorithm for " << ialgoPtr->name()
170  << ": this will result in a crash." << endmsg;
171  return StatusCode::FAILURE;
172  }
173 
174  ostdd << "\n " << algoPtr->name();
175 
176  DataObjIDColl algoDependencies;
177  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
178  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
179  DataObjID id = *idp;
180  ostdd << "\n o INPUT " << id;
181  if ( id.key().find( ":" ) != std::string::npos ) {
182  ostdd << " contains alternatives which require resolution...\n";
183  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
184  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
185  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
186  } );
187  if ( itok != tokens.end() ) {
188  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
189  id.updateKey( *itok );
190  } else {
191  error() << "failed to find alternate in global output list"
192  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
193  m_showDataDeps = true;
194  }
195  }
196  algoDependencies.insert( id );
197  globalInp.insert( id );
198  }
199  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
200  ostdd << "\n o OUTPUT " << *id;
201  if ( id->key().find( ":" ) != std::string::npos ) {
202  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
203  << endmsg;
204  m_showDataDeps = true;
205  }
206  }
207  } else {
208  ostdd << "\n none";
209  }
210  algosDependenciesMap[algoPtr->name()] = algoDependencies;
211  }
212 
213  if ( m_showDataDeps ) {
214  info() << ostdd.str() << endmsg;
215  }
216 
217  // Check if we have unmet global input dependencies, and, optionally, heal them
218  // WARNING: this step must be done BEFORE the Precedence Service is initialized
219  if ( m_checkDeps ) {
220  DataObjIDColl unmetDep;
221  for ( auto o : globalInp )
222  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
223 
224  if ( unmetDep.size() > 0 ) {
225 
226  std::ostringstream ost;
227  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
228  ost << "\n o " << *o << " required by Algorithm: ";
229 
230  for ( const auto& p : algosDependenciesMap )
231  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
232  }
233 
234  if ( !m_useDataLoader.empty() ) {
235 
236  // Find the DataLoader Alg
237  IAlgorithm* dataLoaderAlg( nullptr );
238  for ( IAlgorithm* algo : algos )
239  if ( algo->name() == m_useDataLoader ) {
240  dataLoaderAlg = algo;
241  break;
242  }
243 
244  if ( dataLoaderAlg == nullptr ) {
245  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
246  << "\" found, and unmet INPUT dependencies "
247  << "detected:\n"
248  << ost.str() << endmsg;
249  return StatusCode::FAILURE;
250  }
251 
252  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
253  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
254 
255  // Set the property Load of DataLoader Alg
256  Algorithm* dataAlg = dynamic_cast<Algorithm*>( dataLoaderAlg );
257  if ( !dataAlg ) {
258  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Algorithm" << endmsg;
259  return StatusCode::FAILURE;
260  }
261 
262  for ( auto& id : unmetDep ) {
263  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
264  << dataLoaderAlg->name() << endmsg;
266  }
267 
268  } else {
269  fatal() << "Auto DataLoading not requested, "
270  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
271  return StatusCode::FAILURE;
272  }
273 
274  } else {
275  info() << "No unmet INPUT data dependencies were found" << endmsg;
276  }
277  }
278 
279  // Get the precedence service
280  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
281  if ( !m_precSvc.isValid() ) {
282  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
283  return StatusCode::FAILURE;
284  }
285  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
286  if ( !precSvc ) {
287  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
288  return StatusCode::FAILURE;
289  }
290 
291  // Fill the containers to convert algo names to index
292  m_algname_vect.resize( algsNumber );
293  for ( IAlgorithm* algo : algos ) {
294  const std::string& name = algo->name();
295  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
296  m_algname_index_map[name] = index;
297  m_algname_vect.at( index ) = name;
298  }
299 
300  // Shortcut for the message service
301  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
302  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
303 
304  m_eventSlots.assign( m_maxEventsInFlight,
305  EventSlot( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc ) );
306  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
307 
308  if ( m_threadPoolSize > 1 ) {
309  m_maxAlgosInFlight = (size_t)m_threadPoolSize;
310  }
311 
312  // Clearly inform about the level of concurrency
313  info() << "Concurrency level information:" << endmsg;
314  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
315  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
316 
317  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
318 
319  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
320 
321  // Simulate execution flow
322  if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
323 
324  return sc;
325 }
326 //---------------------------------------------------------------------------
327 
332 {
333 
335  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
336 
337  sc = deactivate();
338  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
339 
340  info() << "Joining Scheduler thread" << endmsg;
341  m_thread.join();
342 
343  // Final error check after thread pool termination
344  if ( m_isActive == FAILURE ) {
345  error() << "problems in scheduler thread" << endmsg;
346  return StatusCode::FAILURE;
347  }
348 
349  return sc;
350 }
351 //---------------------------------------------------------------------------
363 {
364 
365  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
366 
367  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
368  error() << "problems initializing ThreadPoolSvc" << endmsg;
369  m_isActive = FAILURE;
370  return;
371  }
372 
373  // Wait for actions pushed into the queue by finishing tasks.
374  action thisAction;
376 
377  m_isActive = ACTIVE;
378 
379  // Continue to wait if the scheduler is running or there is something to do
380  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
381  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
382  m_actionsQueue.pop( thisAction );
383  sc = thisAction();
384  ON_VERBOSE
385  {
386  if ( sc != StatusCode::SUCCESS )
387  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
388  else
389  verbose() << "Action succeeded." << endmsg;
390  }
391  }
392 
393  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
394  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
395  error() << "Problems terminating thread pool" << endmsg;
396  m_isActive = FAILURE;
397  }
398 }
399 
400 //---------------------------------------------------------------------------
401 
409 {
410 
411  if ( m_isActive == ACTIVE ) {
412  // Drain the scheduler
413  m_actionsQueue.push( [this]() { return this->m_drain(); } );
414  // This would be the last action
415  m_actionsQueue.push( [this]() -> StatusCode {
416  m_isActive = INACTIVE;
417  return StatusCode::SUCCESS;
418  } );
419  }
420 
421  return StatusCode::SUCCESS;
422 }
423 
424 //===========================================================================
425 
426 //===========================================================================
427 // Utils and shortcuts
428 
429 inline const std::string& AvalancheSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
430 
431 //---------------------------------------------------------------------------
432 
433 inline unsigned int AvalancheSchedulerSvc::algname2index( const std::string& algoname )
434 {
435  unsigned int index = m_algname_index_map[algoname];
436  return index;
437 }
438 
439 //===========================================================================
440 // EventSlot management
448 {
449 
450  if ( m_first ) {
451  m_first = false;
452  }
453 
454  if ( !eventContext ) {
455  fatal() << "Event context is nullptr" << endmsg;
456  return StatusCode::FAILURE;
457  }
458 
459  if ( m_freeSlots.load() == 0 ) {
460  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
461  return StatusCode::FAILURE;
462  }
463 
464  // no problem as push new event is only called from one thread (event loop manager)
465  m_freeSlots--;
466 
467  auto action = [this, eventContext]() -> StatusCode {
468  // Event processing slot forced to be the same as the wb slot
469  const unsigned int thisSlotNum = eventContext->slot();
470  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
471  if ( !thisSlot.complete ) {
472  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
473  return StatusCode::FAILURE;
474  }
475 
476  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
477  thisSlot.reset( eventContext );
478 
479  // Result status code:
481 
482  // promote to CR and DR the initial set of algorithms
483  Cause cs = {Cause::source::Root, "RootDecisionHub"};
484  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
485  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
486  result = StatusCode::FAILURE;
487  }
488 
489  if ( this->updateStates( thisSlotNum ).isFailure() ) {
490  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
491  result = StatusCode::FAILURE;
492  }
493 
494  return result;
495  }; // end of lambda
496 
497  // Kick off the scheduling!
498  ON_VERBOSE
499  {
500  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
501  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
502  }
503  m_actionsQueue.push( action );
504 
505  return StatusCode::SUCCESS;
506 }
507 
508 //---------------------------------------------------------------------------
510 {
511  StatusCode sc;
512  for ( auto context : eventContexts ) {
513  sc = pushNewEvent( context );
514  if ( sc != StatusCode::SUCCESS ) return sc;
515  }
516  return sc;
517 }
518 
519 //---------------------------------------------------------------------------
520 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
521 
522 //---------------------------------------------------------------------------
527 {
528 
529  unsigned int slotNum = 0;
530  for ( auto& thisSlot : m_eventSlots ) {
531  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
532  updateStates( slotNum );
533  }
534  slotNum++;
535  }
536  return StatusCode::SUCCESS;
537 }
538 
539 //---------------------------------------------------------------------------
544 {
545  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
546  if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
547  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
548  // << " active: " << m_isActive << endmsg;
549  return StatusCode::FAILURE;
550  } else {
551  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
552  // << " active: " << m_isActive << endmsg;
553  m_finishedEvents.pop( eventContext );
554  m_freeSlots++;
555  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << "(event " << eventContext->evt() << ")" << endmsg;
556  return StatusCode::SUCCESS;
557  }
558 }
559 
560 //---------------------------------------------------------------------------
565 {
566  if ( m_finishedEvents.try_pop( eventContext ) ) {
567  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
568  << endmsg;
569  m_freeSlots++;
570  return StatusCode::SUCCESS;
571  }
572  return StatusCode::FAILURE;
573 }
574 
575 //---------------------------------------------------------------------------
582 {
583 
584  // Set the number of slots available to an error code
585  m_freeSlots.store( 0 );
586 
587  const uint slotIdx = eventContext->slot();
588 
589  fatal() << "*** Event " << eventContext->evt() << " on slot " << slotIdx << " failed! ***" << endmsg;
590 
591  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
592 
593  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
594  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
595 
596  // Empty queue and deactivate the service
597  action thisAction;
598  while ( m_actionsQueue.try_pop( thisAction ) ) {
599  };
600  deactivate();
601 
602  // Push into the finished events queue the failed context
603  EventContext* thisEvtContext;
604  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
605  m_finishedEvents.push( thisEvtContext );
606  };
607  m_finishedEvents.push( eventContext );
608 
609  return StatusCode::FAILURE;
610 }
611 
612 //===========================================================================
613 
614 //===========================================================================
615 // States Management
616 
626 StatusCode AvalancheSchedulerSvc::updateStates( int si, const int algo_index, EventContext* inputContext )
627 {
628 
629  StatusCode global_sc( StatusCode::SUCCESS );
630 
631  // Sort from the oldest to the newest event
632  // Prepare a vector of pointers to the slots to avoid copies
633  std::vector<EventSlot*> eventSlotsPtrs;
634 
635  // Consider all slots if si <0 or just one otherwise
636  if ( si < 0 ) {
637  const int eventsSlotsSize( m_eventSlots.size() );
638  eventSlotsPtrs.reserve( eventsSlotsSize );
639  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
640  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
641  }
642  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
643  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
644  } else {
645  eventSlotsPtrs.push_back( &m_eventSlots[si] );
646  }
647 
648  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
649  int iSlot = thisSlotPtr->eventContext->slot();
650 
651  // Cache the states of the algos to improve readability and performance
652  auto& thisSlot = m_eventSlots[iSlot];
653  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
654 
655  // Perform the I->CR->DR transitions
656  if ( algo_index >= 0 ) {
657  Cause cs = {Cause::source::Task, index2algname( algo_index )};
658 
659  // Pass sub-slots to precedence service if necessary
660  if ( !inputContext || iSlot != (int)inputContext->slot() || inputContext == thisSlot.eventContext ) {
661  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
662  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
663  global_sc = StatusCode::FAILURE;
664  }
665  } else {
666  // An input context that doesn't match the event context for that slot number implies a sub-slot
667  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( inputContext );
668  if ( m_precSvc->iterate( thisSlot.allSubSlots[subSlotIndex], cs ).isFailure() ) {
669  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot of " << iSlot << endmsg;
670  global_sc = StatusCode::FAILURE;
671  }
672  }
673  }
674 
675  StatusCode partial_sc( StatusCode::FAILURE, true );
676 
677  // Perform DR->SCHEDULED
678  if ( !m_optimizationMode.empty() ) {
679  auto comp_nodes = [this]( const uint& i, const uint& j ) {
680  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
681  };
683  comp_nodes, std::vector<uint>() );
684  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
685  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
686  buffer.push( *it );
687  while ( !buffer.empty() ) {
688  bool IOBound = false;
689  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
690 
691  if ( !IOBound )
692  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext );
693  else
694  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext );
695 
696  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
697  << "Could not apply transition from "
698  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
699  << index2algname( buffer.top() ) << " on processing slot " << iSlot << endmsg;
700 
701  buffer.pop();
702  }
703 
704  } else {
705  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
706  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
707  uint algIndex = *it;
708 
709  bool IOBound = false;
710  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
711 
712  if ( !IOBound )
713  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
714  else
715  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
716 
717  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
718  << "Could not apply transition from "
719  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
720  << index2algname( algIndex ) << " on processing slot " << iSlot << endmsg;
721  }
722  }
723 
724  // Check for algorithms ready in sub-slots
725  if ( thisSlot.subSlotAlgsReady.size() ) {
726  // Any data-ready algorithms that don't get scheduled need to be retried later
728  failedAlgs.reserve( thisSlot.subSlotAlgsReady.size() );
729 
730  // Loop with iterator so we can use it for a fast append if needed
731  for ( auto contextAlgPair = thisSlot.subSlotAlgsReady.begin(); contextAlgPair != thisSlot.subSlotAlgsReady.end();
732  ++contextAlgPair ) {
733  if ( m_algosInFlight < m_maxAlgosInFlight ) {
734  partial_sc = promoteToScheduled( contextAlgPair->second, iSlot, contextAlgPair->first );
735 
736  // Add the alg back into the ready list if scheduling failed
737  if ( partial_sc.isFailure() ) failedAlgs.push_back( *contextAlgPair );
738  } else {
739  // Don't loop through all remaining algs if we're already busy
740  failedAlgs.insert( failedAlgs.end(), contextAlgPair, thisSlot.subSlotAlgsReady.end() );
741  break;
742  }
743  }
744 
745  // Update ready list
746  thisSlot.subSlotAlgsReady = failedAlgs;
747  }
748 
749  if ( m_dumpIntraEventDynamics ) {
751  s << index2algname( algo_index ) << ", " << thisAlgsStates.sizeOfSubset( State::CONTROLREADY ) << ", "
752  << thisAlgsStates.sizeOfSubset( State::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( State::SCHEDULED )
753  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
754  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
755  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
756  std::ofstream myfile;
757  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
758  myfile << s.str();
759  myfile.close();
760  }
761 
762  // Not complete because this would mean that the slot is already free!
763  if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
764  thisSlot.subSlotAlgsReady.empty() && // Account for sub-slot algs
765  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
766  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
767  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
768 
769  thisSlot.complete = true;
770  // if the event did not fail, add it to the finished events
771  // otherwise it is taken care of in the error handling already
772  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
773  m_finishedEvents.push( thisSlot.eventContext );
774  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
775  << thisSlot.eventContext->slot() << ")." << endmsg;
776  }
777 
778  // now let's return the fully evaluated result of the control flow
779  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
780 
781  thisSlot.eventContext = nullptr;
782  } else {
783  StatusCode eventStalledSC = isStalled( iSlot );
784  if ( eventStalledSC.isFailure() ) {
785  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
786  eventFailed( thisSlot.eventContext ).ignore();
787  }
788  }
789  } // end loop on slots
790 
791  ON_VERBOSE verbose() << "States Updated." << endmsg;
792 
793  return global_sc;
794 }
795 
796 //---------------------------------------------------------------------------
797 
805 {
806  // Get the slot
807  EventSlot& thisSlot = m_eventSlots[iSlot];
808 
809  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
810  thisSlot.subSlotAlgsReady.empty() && // Account for sub-slot algs
812 
813  info() << "About to declare a stall" << endmsg;
814  fatal() << "*** Stall detected! ***\n" << endmsg;
815 
816  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
817 
818  return StatusCode::FAILURE;
819  }
820  return StatusCode::SUCCESS;
821 }
822 
823 //---------------------------------------------------------------------------
824 
830 {
831 
832  // To have just one big message
833  std::ostringstream outputMS;
834 
835  outputMS << "Dumping scheduler state\n"
836  << "=========================================================================================\n"
837  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
838  << "=========================================================================================\n\n";
839 
840  //===========================================================================
841 
842  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
843  << "------------------\n\n";
844 
845  // Figure if TimelineSvc is available (used below to detect threads IDs)
846  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
847  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
848  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
849  } else {
850 
851  // Figure optimal printout layout
852  size_t indt( 0 );
853  for ( auto& slot : m_eventSlots )
854  for ( auto it = slot.algsStates.begin( AlgsExecutionStates::State::SCHEDULED );
855  it != slot.algsStates.end( AlgsExecutionStates::State::SCHEDULED ); ++it )
856  if ( index2algname( (uint)*it ).length() > indt ) indt = index2algname( (uint)*it ).length();
857 
858  // Figure the last running schedule across all slots
859  for ( auto& slot : m_eventSlots ) {
860  for ( auto it = slot.algsStates.begin( AlgsExecutionStates::State::SCHEDULED );
861  it != slot.algsStates.end( AlgsExecutionStates::State::SCHEDULED ); ++it ) {
862 
863  const std::string algoName{index2algname( (uint)*it )};
864 
865  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
866  << slot.eventContext->slot();
867 
868  // Try to get POSIX threads IDs the currently running tasks are scheduled to
869  if ( timelineSvc.isValid() ) {
870  TimelineEvent te{};
871  te.algorithm = algoName;
872  te.slot = slot.eventContext->slot();
873  te.event = slot.eventContext->evt();
874 
875  if ( timelineSvc->getTimelineEvent( te ) )
876  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
877  else
878  outputMS << " thread.id: [unknown]"; // this means a task has just
879  // been signed off as SCHEDULED,
880  // but has not been assigned to a thread yet
881  // (i.e., not running yet)
882  }
883  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
884  }
885  }
886  }
887 
888  //===========================================================================
889 
890  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
891  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
892 
893  int slotCount = -1;
894  for ( auto& slot : m_eventSlots ) {
895  slotCount++;
896  if ( slot.complete ) continue;
897 
898  outputMS << "[ slot: "
899  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
900  << " event: "
901  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
902  << " ]:\n\n";
903 
904  if ( 0 > iSlot or iSlot == slotCount ) {
905 
906  // Snapshot of the Control Flow and FSM states
907  outputMS << m_precSvc->printState( slot ) << "\n";
908 
909  // Mention sub slots
910  if ( slot.allSubSlots.size() ) {
911  outputMS << "\nNumber of sub-slots:" << slot.allSubSlots.size() << "\n";
912  outputMS << "Sub-slot algorithms ready:" << slot.subSlotAlgsReady.size() << "\n";
913  }
914  }
915  }
916 
917  //===========================================================================
918 
919  if ( 0 <= iSlot ) {
920  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
921  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
922  }
923 
924  outputMS << "\n=========================================================================================\n"
925  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
926  << "=========================================================================================\n\n";
927 
928  info() << outputMS.str() << endmsg;
929 }
930 
931 //---------------------------------------------------------------------------
932 
933 StatusCode AvalancheSchedulerSvc::promoteToScheduled( unsigned int iAlgo, int si, EventContext* eventContext )
934 {
935 
936  if ( m_algosInFlight == m_maxAlgosInFlight ) return StatusCode::FAILURE;
937 
938  const std::string& algName( index2algname( iAlgo ) );
939  IAlgorithm* ialgoPtr = nullptr;
940  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
941 
942  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
943 
944  ++m_algosInFlight;
945  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
946  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
947  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
948  } );
949  return StatusCode::SUCCESS;
950  };
951 
952  // Avoid to use tbb if the pool size is 1 and run in this thread
953  if ( -100 != m_threadPoolSize ) {
954  // the child task that executes an Algorithm
955  tbb::task* algoTask = new ( tbb::task::allocate_root() )
956  AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
957  // schedule the algoTask
958  tbb::task::enqueue( *algoTask );
959 
960  } else {
961  AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
962  theTask.execute();
963  }
964 
965  ON_DEBUG debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot "
966  << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
967 
968  // Update states in the appropriate event slot
969  StatusCode updateSc;
970  EventSlot& thisSlot = m_eventSlots[si];
971  if ( eventContext == thisSlot.eventContext ) {
972  // Event level (standard behaviour)
973  updateSc = thisSlot.algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
974  } else {
975  // Sub-slot
976  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
977  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
978  }
979 
980  ON_VERBOSE dumpSchedulerState( -1 );
981 
982  if ( updateSc.isSuccess() )
983  ON_VERBOSE verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
984  return updateSc;
985  } else {
986  ON_DEBUG debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si
987  << endmsg;
988  return sc;
989  }
990 }
991 
992 //---------------------------------------------------------------------------
993 
994 StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, int si, EventContext* eventContext )
995 {
996 
997  if ( m_IOBoundAlgosInFlight == m_maxIOBoundAlgosInFlight ) return StatusCode::FAILURE;
998 
999  // bool IOBound = m_precSvc->isBlocking(algName);
1000 
1001  const std::string& algName( index2algname( iAlgo ) );
1002  IAlgorithm* ialgoPtr = nullptr;
1003  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
1004 
1005  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
1006 
1007  ++m_IOBoundAlgosInFlight;
1008  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
1009  // tbb::task)? it seems it works..
1010  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
1011  IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
1012  m_IOBoundAlgScheduler->push( *theTask );
1013 
1014  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
1015  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1016 
1017  // Update states in the appropriate event slot
1018  StatusCode updateSc;
1019  EventSlot& thisSlot = m_eventSlots[si];
1020  if ( eventContext == thisSlot.eventContext ) {
1021  // Event level (standard behaviour)
1022  updateSc = thisSlot.algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
1023  } else {
1024  // Sub-slot
1025  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1026  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
1027  }
1028 
1029  ON_VERBOSE if ( updateSc.isSuccess() ) verbose() << "[Asynchronous] Promoting " << algName
1030  << " to SCHEDULED on slot " << si << endmsg;
1031  return updateSc;
1032  } else {
1033  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1034  << " on slot " << si << endmsg;
1035  return sc;
1036  }
1037 }
1038 
1039 //---------------------------------------------------------------------------
1044  EventContext* eventContext )
1045 {
1046  // Check if the execution failed
1047  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
1048 
1049  Gaudi::Hive::setCurrentContext( eventContext );
1050  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1051 
1052  if ( sc.isFailure() ) {
1053  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1054  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1055  return StatusCode::FAILURE;
1056  }
1057 
1058  m_algosInFlight--;
1059 
1060  EventSlot& thisSlot = m_eventSlots[si];
1061 
1062  ON_DEBUG debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1063 
1064  State state = algo->filterPassed() ? State::EVTACCEPTED : State::EVTREJECTED;
1065 
1066  // Update states in the appropriate slot
1067  if ( eventContext == thisSlot.eventContext ) {
1068  // Event level (standard behaviour)
1069  sc = thisSlot.algsStates.updateState( iAlgo, state );
1070  } else {
1071  // Sub-slot
1072  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1073  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1074  }
1075 
1076  ON_VERBOSE if ( sc.isSuccess() ) verbose() << "Promoting " << algo->name() << " on slot " << si << " to "
1078 
1079  ON_DEBUG debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1080  << m_algosInFlight << endmsg;
1081 
1082  // Schedule an update of the status of the algorithms
1083  m_actionsQueue.push( [this, iAlgo, eventContext]() { return this->updateStates( -1, iAlgo, eventContext ); } );
1084 
1085  return sc;
1086 }
1087 
1088 //---------------------------------------------------------------------------
1093  EventContext* eventContext )
1094 {
1095  // Check if the execution failed
1096  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
1097 
1098  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1099 
1100  if ( sc.isFailure() ) {
1101  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1102  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1103  return StatusCode::FAILURE;
1104  }
1105 
1106  m_IOBoundAlgosInFlight--;
1107 
1108  EventSlot& thisSlot = m_eventSlots[si];
1109 
1110  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1111  << endmsg;
1112 
1113  State state = algo->filterPassed() ? State::EVTACCEPTED : State::EVTREJECTED;
1114 
1115  // Update states in the appropriate slot
1116  if ( eventContext == thisSlot.eventContext ) {
1117  // Event level (standard behaviour)
1118  sc = thisSlot.algsStates.updateState( iAlgo, state );
1119  } else {
1120  // Sub-slot
1121  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1122  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1123  }
1124 
1125  ON_VERBOSE if ( sc.isSuccess() ) verbose() << "[Asynchronous] Promoting " << algo->name() << " on slot " << si
1126  << " to " << AlgsExecutionStates::stateNames[state] << endmsg;
1127 
1128  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1129  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1130 
1131  // Schedule an update of the status of the algorithms
1132  m_actionsQueue.push( [this, iAlgo, eventContext]() { return this->updateStates( -1, iAlgo, eventContext ); } );
1133 
1134  return sc;
1135 }
1136 
1137 // Method to inform the scheduler about event views
1138 //===========================================================================
1140  EventContext* viewContext )
1141 {
1142  // Find the top-level slot, to attach the sub-slot to
1143  int const topSlotIndex = sourceContext->slot();
1144  EventSlot& topSlot = m_eventSlots[topSlotIndex];
1145 
1146  // Prevent view nesting - this doesn't work because EventContext is copied when passed to algorithm
1147  /*if ( sourceContext != topSlot.eventContext )
1148  {
1149  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1150  return StatusCode::FAILURE;
1151  }*/
1152 
1153  if ( viewContext ) {
1154  // Make new slot by copying the top slot
1155  unsigned int lastIndex = topSlot.allSubSlots.size();
1156  topSlot.allSubSlots.push_back( EventSlot( m_eventSlots[topSlotIndex], viewContext ) );
1157  topSlot.allSubSlots.back().entryPoint = nodeName;
1158 
1159  // Store index of the new slot in lookup structures
1160  topSlot.contextToSlot[viewContext] = lastIndex;
1161  topSlot.subSlotsByNode[nodeName].push_back( lastIndex );
1162  } else {
1163  // Disable the view node if there are no views
1164  topSlot.subSlotsByNode[nodeName] = std::vector<unsigned int>( 0 );
1165  }
1166 
1167  return StatusCode::SUCCESS;
1168 }
bool algsPresent(State state) const
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:65
#define ON_DEBUG
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
constexpr static const auto FAILURE
Definition: StatusCode.h:88
StatusCode initialize() override
Definition: Service.cpp:63
const unsigned int & getAlgoIndex() const
Get algorithm index.
T empty(T...args)
T open(T...args)
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:746
StatusCode finalize() override
Definition: Service.cpp:173
ContextID_t slot() const
Definition: EventContext.h:40
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Definition: StatusCode.h:287
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
T to_string(T...args)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
std::string algorithm
Definition: ITimelineSvc.h:21
void activate()
Activate scheduler.
T end(T...args)
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
std::string entryPoint
Name of the node this slot is attached to ("" for top level)
Definition: EventSlot.h:58
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
STL class.
bool isFailure() const
Definition: StatusCode.h:139
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
#define DECLARE_COMPONENT(type)
Definition: PluginService.h:33
T setw(T...args)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
T at(T...args)
virtual StatusCode scheduleEventView(EventContext const *sourceContext, std::string const &nodeName, EventContext *viewContext) override
Method to inform the scheduler about event views.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
T push_back(T...args)
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is availble.
std::vector< std::pair< EventContext *, int > > subSlotAlgsReady
Quick lookup for data-ready algorithms in sub-slots (top level only)
Definition: EventSlot.h:62
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
const DataObjIDColl & inputDataObjs() const override
T close(T...args)
StatusCode finalize() override
Finalise.
bool complete
Flags completion of the event.
Definition: EventSlot.h:52
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::map< std::string, std::vector< unsigned int > > subSlotsByNode
Listing of sub-slots by the node (name) they are attached to.
Definition: EventSlot.h:56
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
STL class.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:33
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T begin(T...args)
Iterator begin(State kind)
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:165
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:10
T back(T...args)
string s
Definition: gaudirun.py:253
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
T hex(T...args)
unsigned int freeSlots() override
Get free slots number.
T sort(T...args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
T for_each(T...args)
std::string fullKey() const
Definition: DataObjID.cpp:99
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
T reserve(T...args)
static std::map< State, std::string > stateNames
StatusCode m_drain()
Drain the actions present in the queue.
#define ON_VERBOSE
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)