The Gaudi Framework  v30r1 (5d4f4ae2)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 #include "AlgoExecutionTask.h"
3 #include "IOBoundAlgTask.h"
4 
5 // Framework includes
7 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
10 #include "GaudiKernel/IAlgorithm.h"
12 #include "GaudiKernel/SvcFactory.h"
14 
15 // C++
16 #include <algorithm>
17 #include <map>
18 #include <queue>
19 #include <sstream>
20 #include <unordered_set>
21 
22 // External libs
23 #include "boost/algorithm/string.hpp"
24 #include "boost/thread.hpp"
25 #include "boost/tokenizer.hpp"
26 // DP waiting for the TBB service
27 #include "tbb/task_scheduler_init.h"
28 
31 
32 // Instantiation of a static factory class used by clients to create instances of this service
34 
35 namespace
36 {
37  struct DataObjIDSorter {
38  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
39  };
40 
41  // Sort a DataObjIDColl in a well-defined, reproducible manner.
42  // Used for making debugging dumps.
43  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll )
44  {
46  v.reserve( coll.size() );
47  for ( const DataObjID& id : coll ) v.push_back( &id );
48  std::sort( v.begin(), v.end(), DataObjIDSorter() );
49  return v;
50  }
51 }
52 
53 //===========================================================================
54 // Infrastructure methods
55 
62 {
63 
64  // Initialise mother class (read properties, ...)
66  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
67 
68  // Get hold of the TBBSvc. This should initialize the thread pool
69  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
70  if ( !m_threadPoolSvc.isValid() ) {
71  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
72  return StatusCode::FAILURE;
73  }
74 
75  // Activate the scheduler in another thread.
76  info() << "Activating scheduler in a separate thread" << endmsg;
77  m_thread = std::thread( [this]() { this->activate(); } );
78 
79  while ( m_isActive != ACTIVE ) {
80  if ( m_isActive == FAILURE ) {
81  fatal() << "Terminating initialization" << endmsg;
82  return StatusCode::FAILURE;
83  } else {
84  info() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
85  sleep( 1 );
86  }
87  }
88 
89  if ( m_enableCondSvc ) {
90  // Get hold of the CondSvc
91  m_condSvc = serviceLocator()->service( "CondSvc" );
92  if ( !m_condSvc.isValid() ) {
93  warning() << "No CondSvc found, or not enabled. "
94  << "Will not manage CondAlgorithms" << endmsg;
95  m_enableCondSvc = false;
96  }
97  }
98 
99  // Get the algo resource pool
100  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
101  if ( !m_algResourcePool.isValid() ) {
102  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
103  return StatusCode::FAILURE;
104  }
105 
106  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
107  if ( !m_algExecStateSvc.isValid() ) {
108  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
109  return StatusCode::FAILURE;
110  }
111 
112  // Get Whiteboard
113  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
114  if ( !m_whiteboard.isValid() ) {
115  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
116  return StatusCode::FAILURE;
117  }
118 
119  // Get dedicated scheduler for I/O-bound algorithms
120  if ( m_useIOBoundAlgScheduler ) {
121  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
122  if ( !m_IOBoundAlgScheduler.isValid() )
123  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
124  }
125 
126  // Set the MaxEventsInFlight parameters from the number of WB stores
127  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
128 
129  // Set the number of free slots
130  m_freeSlots = m_maxEventsInFlight;
131 
132  // set global concurrency flags
134 
135  // Get the list of algorithms
136  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
137  const unsigned int algsNumber = algos.size();
138  info() << "Found " << algsNumber << " algorithms" << endmsg;
139 
140  /* Dependencies
141  1) Look for handles in algo, if none
142  2) Assume none are required
143  */
144 
145  DataObjIDColl globalInp, globalOutp;
146 
147  // figure out all outputs
148  for ( IAlgorithm* ialgoPtr : algos ) {
149  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
150  if ( !algoPtr ) {
151  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
152  }
153  for ( auto id : algoPtr->outputDataObjs() ) {
154  auto r = globalOutp.insert( id );
155  if ( !r.second ) {
156  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths "
157  "though, or control flow may guarantee only one runs...!"
158  << endmsg;
159  }
160  }
161  }
162 
163  std::ostringstream ostdd;
164  ostdd << "Data Dependencies for Algorithms:";
165 
167  for ( IAlgorithm* ialgoPtr : algos ) {
168  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
169  if ( nullptr == algoPtr ) {
170  fatal() << "Could not convert IAlgorithm into Algorithm for " << ialgoPtr->name()
171  << ": this will result in a crash." << endmsg;
172  return StatusCode::FAILURE;
173  }
174 
175  ostdd << "\n " << algoPtr->name();
176 
177  DataObjIDColl algoDependencies;
178  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
179  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
180  DataObjID id = *idp;
181  ostdd << "\n o INPUT " << id;
182  if ( id.key().find( ":" ) != std::string::npos ) {
183  ostdd << " contains alternatives which require resolution...\n";
184  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
185  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
186  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
187  } );
188  if ( itok != tokens.end() ) {
189  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
190  id.updateKey( *itok );
191  } else {
192  error() << "failed to find alternate in global output list"
193  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
194  m_showDataDeps = true;
195  }
196  }
197  algoDependencies.insert( id );
198  globalInp.insert( id );
199  }
200  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
201  ostdd << "\n o OUTPUT " << *id;
202  if ( id->key().find( ":" ) != std::string::npos ) {
203  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
204  << endmsg;
205  m_showDataDeps = true;
206  }
207  }
208  } else {
209  ostdd << "\n none";
210  }
211  algosDependenciesMap[algoPtr->name()] = algoDependencies;
212  }
213 
214  if ( m_showDataDeps ) {
215  info() << ostdd.str() << endmsg;
216  }
217 
218  // Check if we have unmet global input dependencies, and, optionally, heal them
219  // WARNING: this step must be done BEFORE the Precedence Service is initialized
220  if ( m_checkDeps ) {
221  DataObjIDColl unmetDep;
222  for ( auto o : globalInp )
223  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
224 
225  if ( unmetDep.size() > 0 ) {
226 
227  std::ostringstream ost;
228  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
229  ost << "\n o " << *o << " required by Algorithm: ";
230 
231  for ( const auto& p : algosDependenciesMap )
232  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
233  }
234 
235  if ( !m_useDataLoader.empty() ) {
236 
237  // Find the DataLoader Alg
238  IAlgorithm* dataLoaderAlg( nullptr );
239  for ( IAlgorithm* algo : algos )
240  if ( algo->name() == m_useDataLoader ) {
241  dataLoaderAlg = algo;
242  break;
243  }
244 
245  if ( dataLoaderAlg == nullptr ) {
246  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
247  << "\" found, and unmet INPUT dependencies "
248  << "detected:\n"
249  << ost.str() << endmsg;
250  return StatusCode::FAILURE;
251  }
252 
253  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
254  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
255 
256  // Set the property Load of DataLoader Alg
257  Algorithm* dataAlg = dynamic_cast<Algorithm*>( dataLoaderAlg );
258  if ( !dataAlg ) {
259  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Algorithm" << endmsg;
260  return StatusCode::FAILURE;
261  }
262 
263  for ( auto& id : unmetDep ) {
264  debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
265  << endmsg;
267  }
268 
269  } else {
270  fatal() << "Auto DataLoading not requested, "
271  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
272  return StatusCode::FAILURE;
273  }
274 
275  } else {
276  info() << "No unmet INPUT data dependencies were found" << endmsg;
277  }
278  }
279 
280  // Get the precedence service
281  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
282  if ( !m_precSvc.isValid() ) {
283  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
284  return StatusCode::FAILURE;
285  }
286  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
287  if ( !precSvc ) {
288  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
289  return StatusCode::FAILURE;
290  }
291 
292  // Fill the containers to convert algo names to index
293  m_algname_vect.resize( algsNumber );
294  for ( IAlgorithm* algo : algos ) {
295  const std::string& name = algo->name();
296  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
297  m_algname_index_map[name] = index;
298  m_algname_vect.at( index ) = name;
299  }
300 
301  // Shortcut for the message service
302  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
303  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
304 
305  m_eventSlots.assign( m_maxEventsInFlight,
306  EventSlot( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc ) );
307  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
308 
309  if ( m_threadPoolSize > 1 ) {
310  m_maxAlgosInFlight = (size_t)m_threadPoolSize;
311  }
312 
313  // Clearly inform about the level of concurrency
314  info() << "Concurrency level information:" << endmsg;
315  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
316  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
317 
318  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
319 
320  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
321 
322  // Simulate execution flow
323  if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
324 
325  return sc;
326 }
327 //---------------------------------------------------------------------------
328 
333 {
334 
336  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
337 
338  sc = deactivate();
339  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
340 
341  info() << "Joining Scheduler thread" << endmsg;
342  m_thread.join();
343 
344  // Final error check after thread pool termination
345  if ( m_isActive == FAILURE ) {
346  error() << "problems in scheduler thread" << endmsg;
347  return StatusCode::FAILURE;
348  }
349 
350  return sc;
351 }
352 //---------------------------------------------------------------------------
364 {
365 
366  if ( msgLevel( MSG::DEBUG ) ) debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
367 
368  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
369  error() << "problems initializing ThreadPoolSvc" << endmsg;
370  m_isActive = FAILURE;
371  return;
372  }
373 
374  // Wait for actions pushed into the queue by finishing tasks.
375  action thisAction;
377 
378  m_isActive = ACTIVE;
379 
380  // Continue to wait if the scheduler is running or there is something to do
381  info() << "Start checking the actionsQueue" << endmsg;
382  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
383  m_actionsQueue.pop( thisAction );
384  sc = thisAction();
385  if ( sc != StatusCode::SUCCESS )
386  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
387  else
388  verbose() << "Action succeeded." << endmsg;
389  }
390 
391  info() << "Terminating thread-pool resources" << endmsg;
392  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
393  error() << "Problems terminating thread pool" << endmsg;
394  m_isActive = FAILURE;
395  }
396 }
397 
398 //---------------------------------------------------------------------------
399 
407 {
408 
409  if ( m_isActive == ACTIVE ) {
410  // Drain the scheduler
411  m_actionsQueue.push( [this]() { return this->m_drain(); } );
412  // This would be the last action
413  m_actionsQueue.push( [this]() -> StatusCode {
414  m_isActive = INACTIVE;
415  return StatusCode::SUCCESS;
416  } );
417  }
418 
419  return StatusCode::SUCCESS;
420 }
421 
422 //===========================================================================
423 
424 //===========================================================================
425 // Utils and shortcuts
426 
427 inline const std::string& AvalancheSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
428 
429 //---------------------------------------------------------------------------
430 
431 inline unsigned int AvalancheSchedulerSvc::algname2index( const std::string& algoname )
432 {
433  unsigned int index = m_algname_index_map[algoname];
434  return index;
435 }
436 
437 //===========================================================================
438 // EventSlot management
446 {
447 
448  if ( m_first ) {
449  m_first = false;
450  }
451 
452  if ( !eventContext ) {
453  fatal() << "Event context is nullptr" << endmsg;
454  return StatusCode::FAILURE;
455  }
456 
457  if ( m_freeSlots.load() == 0 ) {
458  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
459  return StatusCode::FAILURE;
460  }
461 
462  // no problem as push new event is only called from one thread (event loop manager)
463  m_freeSlots--;
464 
465  auto action = [this, eventContext]() -> StatusCode {
466  // Event processing slot forced to be the same as the wb slot
467  const unsigned int thisSlotNum = eventContext->slot();
468  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
469  if ( !thisSlot.complete ) {
470  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
471  return StatusCode::FAILURE;
472  }
473 
474  debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
475  thisSlot.reset( eventContext );
476 
477  // Result status code:
479 
480  // promote to CR and DR the initial set of algorithms
481  Cause cs = {Cause::source::Root, "RootDecisionHub"};
482  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
483  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
484  result = StatusCode::FAILURE;
485  }
486 
487  if ( this->updateStates( thisSlotNum ).isFailure() ) {
488  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
489  result = StatusCode::FAILURE;
490  }
491 
492  return result;
493  }; // end of lambda
494 
495  // Kick off the scheduling!
496  if ( msgLevel( MSG::VERBOSE ) ) {
497  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
498  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
499  }
500  m_actionsQueue.push( action );
501 
502  return StatusCode::SUCCESS;
503 }
504 
505 //---------------------------------------------------------------------------
507 {
508  StatusCode sc;
509  for ( auto context : eventContexts ) {
510  sc = pushNewEvent( context );
511  if ( sc != StatusCode::SUCCESS ) return sc;
512  }
513  return sc;
514 }
515 
516 //---------------------------------------------------------------------------
517 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
518 
519 //---------------------------------------------------------------------------
524 {
525 
526  unsigned int slotNum = 0;
527  for ( auto& thisSlot : m_eventSlots ) {
528  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
529  updateStates( slotNum );
530  }
531  slotNum++;
532  }
533  return StatusCode::SUCCESS;
534 }
535 
536 //---------------------------------------------------------------------------
541 {
542  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
543  if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
544  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
545  // << " active: " << m_isActive << endmsg;
546  return StatusCode::FAILURE;
547  } else {
548  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
549  // << " active: " << m_isActive << endmsg;
550  m_finishedEvents.pop( eventContext );
551  m_freeSlots++;
552  if ( msgLevel( MSG::DEBUG ) )
553  debug() << "Popped slot " << eventContext->slot() << "(event " << eventContext->evt() << ")" << endmsg;
554  return StatusCode::SUCCESS;
555  }
556 }
557 
558 //---------------------------------------------------------------------------
563 {
564  if ( m_finishedEvents.try_pop( eventContext ) ) {
565  if ( msgLevel( MSG::DEBUG ) )
566  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
567  << endmsg;
568  m_freeSlots++;
569  return StatusCode::SUCCESS;
570  }
571  return StatusCode::FAILURE;
572 }
573 
574 //---------------------------------------------------------------------------
581 {
582 
583  // Set the number of slots available to an error code
584  m_freeSlots.store( 0 );
585 
586  fatal() << "*** Event " << eventContext->evt() << " on slot " << eventContext->slot() << " failed! ***" << endmsg;
587 
588  std::ostringstream ost;
589  m_algExecStateSvc->dump( ost, *eventContext );
590 
591  info() << "Dumping Alg Exec State for slot " << eventContext->slot() << ":\n" << ost.str() << endmsg;
592 
593  dumpSchedulerState( -1 );
594  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
595  m_precSvc->dumpPrecedenceRules( m_eventSlots[eventContext->slot()] );
596 
597  // Empty queue and deactivate the service
598  action thisAction;
599  while ( m_actionsQueue.try_pop( thisAction ) ) {
600  };
601  deactivate();
602 
603  // Push into the finished events queue the failed context
604  EventContext* thisEvtContext;
605  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
606  m_finishedEvents.push( thisEvtContext );
607  };
608  m_finishedEvents.push( eventContext );
609 
610  return StatusCode::FAILURE;
611 }
612 
613 //===========================================================================
614 
615 //===========================================================================
616 // States Management
617 
627 StatusCode AvalancheSchedulerSvc::updateStates( int si, const int algo_index, EventContext* inputContext )
628 {
629 
630  StatusCode global_sc( StatusCode::SUCCESS );
631 
632  // Sort from the oldest to the newest event
633  // Prepare a vector of pointers to the slots to avoid copies
634  std::vector<EventSlot*> eventSlotsPtrs;
635 
636  // Consider all slots if si <0 or just one otherwise
637  if ( si < 0 ) {
638  const int eventsSlotsSize( m_eventSlots.size() );
639  eventSlotsPtrs.reserve( eventsSlotsSize );
640  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
641  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
642  }
643  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
644  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
645  } else {
646  eventSlotsPtrs.push_back( &m_eventSlots[si] );
647  }
648 
649  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
650  int iSlot = thisSlotPtr->eventContext->slot();
651 
652  // Cache the states of the algos to improve readability and performance
653  auto& thisSlot = m_eventSlots[iSlot];
654  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
655 
656  // Perform the I->CR->DR transitions
657  if ( algo_index >= 0 ) {
658  Cause cs = {Cause::source::Task, index2algname( algo_index )};
659 
660  // Pass sub-slots to precedence service if necessary
661  if ( !inputContext || iSlot != (int)inputContext->slot() || inputContext == thisSlot.eventContext ) {
662  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
663  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
664  global_sc = StatusCode::FAILURE;
665  }
666  } else {
667  // An input context that doesn't match the event context for that slot number implies a sub-slot
668  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( inputContext );
669  if ( m_precSvc->iterate( thisSlot.allSubSlots[subSlotIndex], cs ).isFailure() ) {
670  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot of " << iSlot << endmsg;
671  global_sc = StatusCode::FAILURE;
672  }
673  }
674  }
675 
676  StatusCode partial_sc( StatusCode::FAILURE, true );
677 
678  // Perform DR->SCHEDULED
679  if ( !m_optimizationMode.empty() ) {
680  auto comp_nodes = [this]( const uint& i, const uint& j ) {
681  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
682  };
684  comp_nodes, std::vector<uint>() );
685  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
686  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
687  buffer.push( *it );
688  while ( !buffer.empty() ) {
689  bool IOBound = false;
690  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
691 
692  if ( !IOBound )
693  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext );
694  else
695  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext );
696 
697  if ( msgLevel( MSG::VERBOSE ) )
698  if ( partial_sc.isFailure() )
699  verbose() << "Could not apply transition from "
700  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
701  << index2algname( buffer.top() ) << " on processing slot " << iSlot << endmsg;
702 
703  buffer.pop();
704  }
705 
706  } else {
707  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
708  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
709  uint algIndex = *it;
710 
711  bool IOBound = false;
712  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
713 
714  if ( !IOBound )
715  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
716  else
717  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
718 
719  if ( msgLevel( MSG::VERBOSE ) )
720  if ( partial_sc.isFailure() )
721  verbose() << "Could not apply transition from "
722  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
723  << index2algname( algIndex ) << " on processing slot " << iSlot << endmsg;
724  }
725  }
726 
727  // Check for algorithms ready in sub-slots
728  if ( thisSlot.subSlotAlgsReady.size() ) {
729  // Any data-ready algorithms that don't get scheduled need to be retried later
731  failedAlgs.reserve( thisSlot.subSlotAlgsReady.size() );
732 
733  // Loop with iterator so we can use it for a fast append if needed
734  for ( auto contextAlgPair = thisSlot.subSlotAlgsReady.begin(); contextAlgPair != thisSlot.subSlotAlgsReady.end();
735  ++contextAlgPair ) {
736  if ( m_algosInFlight < m_maxAlgosInFlight ) {
737  partial_sc = promoteToScheduled( contextAlgPair->second, iSlot, contextAlgPair->first );
738 
739  // Add the alg back into the ready list if scheduling failed
740  if ( !partial_sc.isSuccess() ) failedAlgs.push_back( *contextAlgPair );
741  } else {
742  // Don't loop through all remaining algs if we're already busy
743  failedAlgs.insert( failedAlgs.end(), contextAlgPair, thisSlot.subSlotAlgsReady.end() );
744  break;
745  }
746  }
747 
748  // Update ready list
749  thisSlot.subSlotAlgsReady = failedAlgs;
750  }
751 
752  if ( m_dumpIntraEventDynamics ) {
754  s << index2algname( algo_index ) << ", " << thisAlgsStates.sizeOfSubset( State::CONTROLREADY ) << ", "
755  << thisAlgsStates.sizeOfSubset( State::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( State::SCHEDULED )
756  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
757  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
758  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
759  std::ofstream myfile;
760  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
761  myfile << s.str();
762  myfile.close();
763  }
764 
765  // Not complete because this would mean that the slot is already free!
766  if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
767  thisSlot.subSlotAlgsReady.empty() && // Account for sub-slot algs
768  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
769  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
770  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
771 
772  thisSlot.complete = true;
773  // if the event did not fail, add it to the finished events
774  // otherwise it is taken care of in the error handling already
775  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
776  m_finishedEvents.push( thisSlot.eventContext );
777  if ( msgLevel( MSG::DEBUG ) )
778  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot " << thisSlot.eventContext->slot()
779  << ")." << endmsg;
780  }
781 
782  // now let's return the fully evaluated result of the control flow
783  if ( msgLevel( MSG::DEBUG ) ) debug() << m_precSvc->printState( thisSlot ) << endmsg;
784 
785  thisSlot.eventContext = nullptr;
786  } else {
787  StatusCode eventStalledSC = isStalled( iSlot );
788  if ( !eventStalledSC.isSuccess() ) {
789  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
790  eventFailed( thisSlot.eventContext ).ignore();
791  }
792  }
793  } // end loop on slots
794 
795  verbose() << "States Updated." << endmsg;
796 
797  return global_sc;
798 }
799 
800 //---------------------------------------------------------------------------
801 
809 {
810  // Get the slot
811  EventSlot& thisSlot = m_eventSlots[iSlot];
812 
813  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
814  thisSlot.subSlotAlgsReady.empty() && // Account for sub-slot algs
816 
817  info() << "About to declare a stall" << endmsg;
818  fatal() << "*** Stall detected! ***\n" << endmsg;
819  dumpSchedulerState( iSlot );
820  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
821 
822  return StatusCode::FAILURE;
823  }
824  return StatusCode::SUCCESS;
825 }
826 
827 //---------------------------------------------------------------------------
828 
835 {
836 
837  // To have just one big message
838  std::ostringstream outputMessageStream;
839 
840  outputMessageStream << "============================== Execution Task State ============================="
841  << std::endl;
842  dumpState( outputMessageStream );
843 
844  outputMessageStream << std::endl
845  << "============================== Scheduler State ================================="
846  << std::endl;
847 
848  int slotCount = -1;
849  for ( auto& thisSlot : m_eventSlots ) {
850  slotCount++;
851  if ( thisSlot.complete ) continue;
852 
853  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
854  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
855 
856  if ( 0 > iSlot or iSlot == slotCount ) {
857 
858  // Snapshot of the Control Flow and FSM states
859  outputMessageStream << "\nControl Flow and FSM states:" << std::endl;
860  outputMessageStream << m_precSvc->printState( thisSlot ) << std::endl;
861 
862  // Mention sub slots
863  if ( thisSlot.allSubSlots.size() ) {
864  outputMessageStream << "\nSub-slots:" << thisSlot.allSubSlots.size() << std::endl;
865  outputMessageStream << "Sub-slot algorithms ready:" << thisSlot.subSlotAlgsReady.size() << std::endl;
866  }
867  }
868  }
869 
870  outputMessageStream << "=================================== END ======================================" << std::endl;
871 
872  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
873 }
874 
875 //---------------------------------------------------------------------------
876 
877 StatusCode AvalancheSchedulerSvc::promoteToScheduled( unsigned int iAlgo, int si, EventContext* eventContext )
878 {
879 
880  if ( m_algosInFlight == m_maxAlgosInFlight ) return StatusCode::FAILURE;
881 
882  const std::string& algName( index2algname( iAlgo ) );
883  IAlgorithm* ialgoPtr = nullptr;
884  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
885 
886  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
887 
888  ++m_algosInFlight;
889  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
890  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
891  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
892  } );
893  return StatusCode::SUCCESS;
894  };
895 
896  // Avoid to use tbb if the pool size is 1 and run in this thread
897  if ( -100 != m_threadPoolSize ) {
898  // the child task that executes an Algorithm
899  tbb::task* algoTask = new ( tbb::task::allocate_root() )
900  AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
901  // schedule the algoTask
902  tbb::task::enqueue( *algoTask );
903 
904  } else {
905  AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
906  theTask.execute();
907  }
908 
909  if ( msgLevel( MSG::DEBUG ) )
910  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
911  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
912 
913  // Update states in the appropriate event slot
914  StatusCode updateSc;
915  EventSlot& thisSlot = m_eventSlots[si];
916  if ( eventContext == thisSlot.eventContext ) {
917  // Event level (standard behaviour)
918  updateSc = thisSlot.algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
919  } else {
920  // Sub-slot
921  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
922  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
923  }
924 
925  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
926 
927  if ( updateSc.isSuccess() )
928  if ( msgLevel( MSG::VERBOSE ) ) verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
929  return updateSc;
930  } else {
931  if ( msgLevel( MSG::DEBUG ) )
932  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
933  return sc;
934  }
935 }
936 
937 //---------------------------------------------------------------------------
938 
939 StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, int si, EventContext* eventContext )
940 {
941 
942  if ( m_IOBoundAlgosInFlight == m_maxIOBoundAlgosInFlight ) return StatusCode::FAILURE;
943 
944  // bool IOBound = m_precSvc->isBlocking(algName);
945 
946  const std::string& algName( index2algname( iAlgo ) );
947  IAlgorithm* ialgoPtr = nullptr;
948  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
949 
950  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
951 
952  ++m_IOBoundAlgosInFlight;
953  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
954  // tbb::task)? it seems it works..
955  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
956  IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
957  m_IOBoundAlgScheduler->push( *theTask );
958 
959  if ( msgLevel( MSG::DEBUG ) )
960  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
961  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
962 
963  // Update states in the appropriate event slot
964  StatusCode updateSc;
965  EventSlot& thisSlot = m_eventSlots[si];
966  if ( eventContext == thisSlot.eventContext ) {
967  // Event level (standard behaviour)
968  updateSc = thisSlot.algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
969  } else {
970  // Sub-slot
971  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
972  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
973  }
974 
975  if ( updateSc.isSuccess() )
976  if ( msgLevel( MSG::VERBOSE ) )
977  verbose() << "[Asynchronous] Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
978  return updateSc;
979  } else {
980  if ( msgLevel( MSG::DEBUG ) )
981  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
982  << si << endmsg;
983  return sc;
984  }
985 }
986 
987 //---------------------------------------------------------------------------
992  EventContext* eventContext )
993 {
994  // Check if the execution failed
995  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
996 
997  Gaudi::Hive::setCurrentContext( eventContext );
998  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
999 
1000  if ( !sc.isSuccess() ) {
1001  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1002  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1003  return StatusCode::FAILURE;
1004  }
1005 
1006  m_algosInFlight--;
1007 
1008  EventSlot& thisSlot = m_eventSlots[si];
1009 
1010  if ( msgLevel( MSG::DEBUG ) )
1011  debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1012  State state;
1013  if ( algo->filterPassed() ) {
1014  state = State::EVTACCEPTED;
1015  } else {
1016  state = State::EVTREJECTED;
1017  }
1018 
1019  // Update states in the appropriate slot
1020  if ( eventContext == thisSlot.eventContext ) {
1021  // Event level (standard behaviour)
1022  sc = thisSlot.algsStates.updateState( iAlgo, state );
1023  } else {
1024  // Sub-slot
1025  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1026  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1027  }
1028 
1029  if ( sc.isSuccess() )
1030  if ( msgLevel( MSG::VERBOSE ) )
1031  verbose() << "Promoting " << algo->name() << " on slot " << si << " to " << AlgsExecutionStates::stateNames[state]
1032  << endmsg;
1033 
1034  if ( msgLevel( MSG::DEBUG ) )
1035  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1036  << m_algosInFlight << endmsg;
1037 
1038  // Schedule an update of the status of the algorithms
1039  m_actionsQueue.push( [this, iAlgo, eventContext]() { return this->updateStates( -1, iAlgo, eventContext ); } );
1040 
1041  return sc;
1042 }
1043 
1044 //---------------------------------------------------------------------------
1049  EventContext* eventContext )
1050 {
1051  // Check if the execution failed
1052  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
1053 
1054  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1055 
1056  if ( !sc.isSuccess() ) {
1057  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1058  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1059  return StatusCode::FAILURE;
1060  }
1061 
1062  m_IOBoundAlgosInFlight--;
1063 
1064  EventSlot& thisSlot = m_eventSlots[si];
1065 
1066  if ( msgLevel( MSG::DEBUG ) )
1067  debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1068  State state;
1069  if ( algo->filterPassed() ) {
1070  state = State::EVTACCEPTED;
1071  } else {
1072  state = State::EVTREJECTED;
1073  }
1074 
1075  // Update states in the appropriate slot
1076  if ( eventContext == thisSlot.eventContext ) {
1077  // Event level (standard behaviour)
1078  sc = thisSlot.algsStates.updateState( iAlgo, state );
1079  } else {
1080  // Sub-slot
1081  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1082  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1083  }
1084 
1085  if ( sc.isSuccess() )
1086  if ( msgLevel( MSG::VERBOSE ) )
1087  verbose() << "[Asynchronous] Promoting " << algo->name() << " on slot " << si << " to "
1089 
1090  if ( msgLevel( MSG::DEBUG ) )
1091  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1092  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1093 
1094  // Schedule an update of the status of the algorithms
1095  m_actionsQueue.push( [this, iAlgo, eventContext]() { return this->updateStates( -1, iAlgo, eventContext ); } );
1096 
1097  return sc;
1098 }
1099 
1100 //===========================================================================
1102 {
1103 
1104  std::lock_guard<std::mutex> lock( m_ssMut );
1105  m_sState.push_back( SchedulerState( a, e, t ) );
1106 }
1107 
1108 //===========================================================================
1110 {
1111 
1112  std::lock_guard<std::mutex> lock( m_ssMut );
1113 
1114  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1115  if ( *itr == a ) {
1116  m_sState.erase( itr );
1117  return true;
1118  }
1119  }
1120 
1121  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1122  return false;
1123 }
1124 
1125 //===========================================================================
1127 {
1128 
1129  std::lock_guard<std::mutex> lock( m_ssMut );
1130 
1131  for ( auto it : m_sState ) {
1132  ost << " " << it << std::endl;
1133  }
1134 }
1135 
1136 //===========================================================================
1138 {
1139 
1140  std::lock_guard<std::mutex> lock( m_ssMut );
1141 
1142  std::ostringstream ost;
1143  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1144  dumpState( ost );
1145 
1146  info() << ost.str() << endmsg;
1147 }
1148 
1149 // Method to inform the scheduler about event views
1150 //===========================================================================
1152  EventContext* viewContext )
1153 {
1154  // Find the top-level slot, to attach the sub-slot to
1155  int const topSlotIndex = sourceContext->slot();
1156  EventSlot& topSlot = m_eventSlots[topSlotIndex];
1157 
1158  // Prevent view nesting - this doesn't work because EventContext is copied when passed to algorithm
1159  /*if ( sourceContext != topSlot.eventContext )
1160  {
1161  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1162  return StatusCode::FAILURE;
1163  }*/
1164 
1165  if ( viewContext ) {
1166  // Make new slot by copying the top slot
1167  unsigned int lastIndex = topSlot.allSubSlots.size();
1168  topSlot.allSubSlots.push_back( EventSlot( m_eventSlots[topSlotIndex], viewContext ) );
1169  topSlot.allSubSlots.back().entryPoint = nodeName;
1170 
1171  // Store index of the new slot in lookup structures
1172  topSlot.contextToSlot[viewContext] = lastIndex;
1173  topSlot.subSlotsByNode[nodeName].push_back( lastIndex );
1174  } else {
1175  // Disable the view node if there are no views
1176  topSlot.subSlotsByNode[nodeName] = std::vector<unsigned int>( 0 );
1177  }
1178 
1179  return StatusCode::SUCCESS;
1180 }
bool algsPresent(State state) const
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:68
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
StatusCode initialize() override
Definition: Service.cpp:64
const unsigned int & getAlgoIndex() const
Get algorithm index.
T empty(T...args)
T open(T...args)
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:737
StatusCode finalize() override
Definition: Service.cpp:174
ContextID_t slot() const
Definition: EventContext.h:40
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:50
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
Header file for class GaudiAlgorithm.
T to_string(T...args)
T endl(T...args)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
void activate()
Activate scheduler.
T end(T...args)
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
std::string entryPoint
Name of the node this slot is attached to ("" for top level)
Definition: EventSlot.h:58
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:61
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
void addAlg(Algorithm *, EventContext *, pthread_t)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
T at(T...args)
virtual StatusCode scheduleEventView(EventContext const *sourceContext, std::string const &nodeName, EventContext *viewContext) override
Method to inform the scheduler about event views.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
T push_back(T...args)
static std::list< SchedulerState > m_sState
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is availble.
std::vector< std::pair< EventContext *, int > > subSlotAlgsReady
Quick lookup for data-ready algorithms in sub-slots (top level only)
Definition: EventSlot.h:62
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const DataObjIDColl & inputDataObjs() const override
T close(T...args)
StatusCode finalize() override
Finalise.
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:211
bool complete
Flags completion of the event.
Definition: EventSlot.h:52
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::map< std::string, std::vector< unsigned int > > subSlotsByNode
Listing of sub-slots by the node (name) they are attached to.
Definition: EventSlot.h:56
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
STL class.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:33
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T begin(T...args)
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:10
T back(T...args)
string s
Definition: gaudirun.py:253
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
T sort(T...args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
void ignore() const
Definition: StatusCode.h:84
T for_each(T...args)
std::string fullKey() const
Definition: DataObjID.cpp:84
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
T reserve(T...args)
static std::map< State, std::string > stateNames
StatusCode m_drain()
Drain the actions present in the queue.
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)