Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v31r0 (aeb156f0)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 
3 #include "AlgoExecutionTask.h"
4 #include "IOBoundAlgTask.h"
5 
6 // Framework includes
12 #include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
13 
14 // C++
15 #include <algorithm>
16 #include <map>
17 #include <queue>
18 #include <sstream>
19 #include <unordered_set>
20 
21 // External libs
22 #include "boost/algorithm/string.hpp"
23 #include "boost/thread.hpp"
24 #include "boost/tokenizer.hpp"
25 // DP waiting for the TBB service
26 #include "tbb/task_scheduler_init.h"
27 
28 // Instantiation of a static factory class used by clients to create instances of this service
30 
31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
33 
34 namespace {
35  struct DataObjIDSorter {
36  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
37  };
38 
39  // Sort a DataObjIDColl in a well-defined, reproducible manner.
40  // Used for making debugging dumps.
41  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
43  v.reserve( coll.size() );
44  for ( const DataObjID& id : coll ) v.push_back( &id );
45  std::sort( v.begin(), v.end(), DataObjIDSorter() );
46  return v;
47  }
48 
49  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
50  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
51  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
52  }
53 } // namespace
54 
55 //---------------------------------------------------------------------------
56 
63 
// Body of the service initialisation routine (judging by the log messages below).
// NOTE(review): the function signature and the declaration of `sc` are not present in this
// listing (the embedded numbering skips them) -- confirm against the original file.
// Visible flow: retrieve the required services, start the scheduler thread and wait until it
// reports ACTIVE, collect and check the algorithms' data dependencies, build the algorithm
// name <-> index tables, create the event slots, then optionally dump/simulate the flow.
64  // Initialise mother class (read properties, ...)
66  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
67 
68  // Get hold of the TBBSvc. This should initialize the thread pool
69  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
70  if ( !m_threadPoolSvc.isValid() ) {
71  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
72  return StatusCode::FAILURE;
73  }
74 
75  // Activate the scheduler in another thread.
76  info() << "Activating scheduler in a separate thread" << endmsg;
77  m_thread = std::thread( [this]() { this->activate(); } );
78 
// Poll m_isActive, sleeping one second between checks, until the scheduler thread
// reports ACTIVE; give up as soon as it reports FAILURE.
79  while ( m_isActive != ACTIVE ) {
80  if ( m_isActive == FAILURE ) {
81  fatal() << "Terminating initialization" << endmsg;
82  return StatusCode::FAILURE;
83  } else {
84  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
85  sleep( 1 );
86  }
87  }
88 
89  if ( m_enableCondSvc ) {
90  // Get hold of the CondSvc
91  m_condSvc = serviceLocator()->service( "CondSvc" );
92  if ( !m_condSvc.isValid() ) {
93  warning() << "No CondSvc found, or not enabled. "
94  << "Will not manage CondAlgorithms" << endmsg;
95  m_enableCondSvc = false;
96  }
97  }
98 
99  // Get the algo resource pool
100  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
101  if ( !m_algResourcePool.isValid() ) {
102  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
103  return StatusCode::FAILURE;
104  }
105 
106  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
107  if ( !m_algExecStateSvc.isValid() ) {
108  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
109  return StatusCode::FAILURE;
110  }
111 
112  // Get Whiteboard
113  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
114  if ( !m_whiteboard.isValid() ) {
115  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
116  return StatusCode::FAILURE;
117  }
118 
119  // Get dedicated scheduler for I/O-bound algorithms
// NOTE(review): unlike the retrievals above, a failure here is only logged; there is no return.
120  if ( m_useIOBoundAlgScheduler ) {
121  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
122  if ( !m_IOBoundAlgScheduler.isValid() )
123  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
124  }
125 
126  // Set the MaxEventsInFlight parameters from the number of WB stores
127  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
128 
129  // Set the number of free slots
130  m_freeSlots = m_maxEventsInFlight;
131 
132  // Get the list of algorithms
133  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
134  const unsigned int algsNumber = algos.size();
135  info() << "Found " << algsNumber << " algorithms" << endmsg;
136 
137  /* Dependencies
138  1) Look for handles in algo, if none
139  2) Assume none are required
140  */
141 
// Union of all inputs / outputs declared by the algorithms in the flat list.
142  DataObjIDColl globalInp, globalOutp;
143 
144  // figure out all outputs
145  for ( IAlgorithm* ialgoPtr : algos ) {
146  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
147  if ( !algoPtr ) {
148  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
149  return StatusCode::FAILURE;
150  }
151  for ( auto id : algoPtr->outputDataObjs() ) {
152  auto r = globalOutp.insert( id );
// insert() reports via r.second whether the id was newly added; false means a duplicate output.
153  if ( !r.second ) {
154  warning() << "multiple algorithms declare " << id
155  << " as output! could be a single instance in multiple paths "
156  "though, or control flow may guarantee only one runs...!"
157  << endmsg;
158  }
159  }
160  }
161 
// Human-readable dependency report; printed further down only if m_showDataDeps is set.
162  std::ostringstream ostdd;
163  ostdd << "Data Dependencies for Algorithms:";
164 
// Per-algorithm (by name) set of resolved input dependencies, used for the unmet-dependency report.
165  std::map<std::string, DataObjIDColl> algosDependenciesMap;
166  for ( IAlgorithm* ialgoPtr : algos ) {
167  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
168  if ( nullptr == algoPtr ) {
169  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
170  << ": this will result in a crash." << endmsg;
171  return StatusCode::FAILURE;
172  }
173 
174  ostdd << "\n " << algoPtr->name();
175 
176  DataObjIDColl algoDependencies;
177  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
178  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
// Work on a copy: the key may be rewritten below when an alternative is resolved.
179  DataObjID id = *idp;
180  ostdd << "\n o INPUT " << id;
// A ':' in the key marks a list of alternatives; the first token found among the
// global outputs replaces the key.
181  if ( id.key().find( ":" ) != std::string::npos ) {
182  ostdd << " contains alternatives which require resolution...\n";
183  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
184  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
185  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
186  } );
187  if ( itok != tokens.end() ) {
188  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
189  id.updateKey( *itok );
190  } else {
191  error() << "failed to find alternate in global output list"
192  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
// Force the dependency report to be printed so the failure can be diagnosed.
193  m_showDataDeps = true;
194  }
195  }
196  algoDependencies.insert( id );
197  globalInp.insert( id );
198  }
199  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
200  ostdd << "\n o OUTPUT " << *id;
201  if ( id->key().find( ":" ) != std::string::npos ) {
202  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
203  << endmsg;
204  m_showDataDeps = true;
205  }
206  }
207  } else {
208  ostdd << "\n none";
209  }
210  algosDependenciesMap[algoPtr->name()] = algoDependencies;
211  }
212 
213  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
214 
215  // Check if we have unmet global input dependencies, and, optionally, heal them
216  // WARNING: this step must be done BEFORE the Precedence Service is initialized
217  if ( m_checkDeps ) {
// Inputs that no algorithm declares as output.
218  DataObjIDColl unmetDep;
219  for ( auto o : globalInp )
220  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
221 
222  if ( unmetDep.size() > 0 ) {
223 
// Build the list of unmet inputs together with the algorithms requiring each of them.
224  std::ostringstream ost;
225  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
226  ost << "\n o " << *o << " required by Algorithm: ";
227 
228  for ( const auto& p : algosDependenciesMap )
229  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
230  }
231 
232  if ( !m_useDataLoader.empty() ) {
233 
234  // Find the DataLoader Alg
235  IAlgorithm* dataLoaderAlg( nullptr );
236  for ( IAlgorithm* algo : algos )
237  if ( algo->name() == m_useDataLoader ) {
238  dataLoaderAlg = algo;
239  break;
240  }
241 
242  if ( dataLoaderAlg == nullptr ) {
243  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
244  << "\" found, and unmet INPUT dependencies "
245  << "detected:\n"
246  << ost.str() << endmsg;
247  return StatusCode::FAILURE;
248  }
249 
250  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
251  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
252 
253  // Set the property Load of DataLoader Alg
254  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
255  if ( !dataAlg ) {
256  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
257  << endmsg;
258  return StatusCode::FAILURE;
259  }
260 
// NOTE(review): the embedded numbering skips a line inside this loop; presumably the statement
// that registers `id` as an output of `dataAlg` was dropped from the listing -- confirm.
261  for ( auto& id : unmetDep ) {
262  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
263  << dataLoaderAlg->name() << endmsg;
265  }
266 
267  } else {
268  fatal() << "Auto DataLoading not requested, "
269  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
270  return StatusCode::FAILURE;
271  }
272 
273  } else {
274  info() << "No unmet INPUT data dependencies were found" << endmsg;
275  }
276  }
277 
278  // Get the precedence service
279  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
280  if ( !m_precSvc.isValid() ) {
281  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
282  return StatusCode::FAILURE;
283  }
// The concrete type is needed for getRules(), which is called on it below.
284  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
285  if ( !precSvc ) {
286  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
287  return StatusCode::FAILURE;
288  }
289 
290  // Fill the containers to convert algo names to index
291  m_algname_vect.resize( algsNumber );
292  for ( IAlgorithm* algo : algos ) {
293  const std::string& name = algo->name();
294  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
295  m_algname_index_map[name] = index;
// at() is bounds-checked: an index >= algsNumber throws instead of writing out of range.
296  m_algname_vect.at( index ) = name;
297  }
298 
299  // Shortcut for the message service
300  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
301  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
302 
// One event slot per event in flight; every slot starts out flagged as complete.
303  m_eventSlots.reserve( m_maxEventsInFlight );
304  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
305  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
306  m_eventSlots.back().complete = true;
307  }
308  m_actionsCounts.assign( m_maxEventsInFlight, 0 );
309 
// The cap on algorithms in flight follows the thread pool size only when that size exceeds 1.
310  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
311 
312  // Clearly inform about the level of concurrency
313  info() << "Concurrency level information:" << endmsg;
314  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
315  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
316 
317  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
318 
319  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
320 
321  // Simulate execution flow
322  if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
323 
// NOTE(review): `sc` still holds the base-class status checked at the top, so a base-class
// failure (only warned about there) is what gets returned here.
324  return sc;
325 }
326 //---------------------------------------------------------------------------
327 
332 
// Body of the service finalisation routine (judging by the log messages below).
// NOTE(review): the function signature and the declaration of `sc` are not present in this
// listing -- confirm against the original file.
// Visible flow: deactivate the scheduler, join the scheduler thread, then report FAILURE if
// the thread left m_isActive in the FAILURE state.
334  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
335 
// From here on `sc` carries the status of deactivate(), replacing the earlier value.
336  sc = deactivate();
337  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
338 
339  info() << "Joining Scheduler thread" << endmsg;
340  m_thread.join();
341 
342  // Final error check after thread pool termination
343  if ( m_isActive == FAILURE ) {
344  error() << "problems in scheduler thread" << endmsg;
345  return StatusCode::FAILURE;
346  }
347 
348  return sc;
349 }
350 //---------------------------------------------------------------------------
351 
363 
// Body of activate(), the function run by the scheduler thread (it is launched via
// this->activate() from a std::thread elsewhere in this file).
// NOTE(review): the function signature and the declaration of `sc` are not present in this
// listing -- confirm against the original file.
// Visible flow: initialise the thread pool, flag the scheduler ACTIVE, then pop and run
// actions from m_actionsQueue until the scheduler is no longer ACTIVE and the queue is empty,
// and finally terminate the thread pool.
364  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
365 
366  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
367  error() << "problems initializing ThreadPoolSvc" << endmsg;
// Publish the failure through m_isActive; the initialisation code polls this flag.
368  m_isActive = FAILURE;
369  return;
370  }
371 
372  // Wait for actions pushed into the queue by finishing tasks.
373  action thisAction;
375 
376  m_isActive = ACTIVE;
377 
378  // Continue to wait if the scheduler is running or there is something to do
379  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
380  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
// NOTE(review): pop() presumably blocks until an action is available -- confirm against the
// queue type used for m_actionsQueue.
381  m_actionsQueue.pop( thisAction );
382  sc = thisAction();
383  ON_VERBOSE {
384  if ( sc != StatusCode::SUCCESS )
385  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
386  else
387  verbose() << "Action succeeded." << endmsg;
388  }
389  }
390 
391  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
392  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
393  error() << "Problems terminating thread pool" << endmsg;
394  m_isActive = FAILURE;
395  }
396 }
397 
398 //---------------------------------------------------------------------------
399 
407 
// Body of deactivate() (it is called as deactivate() from the finalisation code in this file).
// NOTE(review): the function signature is not present in this listing -- confirm against the
// original file.
// When the scheduler is ACTIVE: zero the free-slot counter, discard all queued actions and
// push one last action that flips m_isActive to INACTIVE. Always returns SUCCESS.
408  if ( m_isActive == ACTIVE ) {
409 
410  // Set the number of slots available to an error code
411  m_freeSlots.store( 0 );
412 
413  // Empty queue
414  action thisAction;
415  while ( m_actionsQueue.try_pop( thisAction ) ) {};
416 
417  // This would be the last action
// The state change is performed by whichever thread runs the queued action, not by the caller.
418  m_actionsQueue.push( [this]() -> StatusCode {
419  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
420  m_isActive = INACTIVE;
421  return StatusCode::SUCCESS;
422  } );
423  }
424 
425  return StatusCode::SUCCESS;
426 }
427 
428 //---------------------------------------------------------------------------
429 
430 // Utils and shortcuts
431 
432 inline const std::string& AvalancheSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
433 
434 //---------------------------------------------------------------------------
435 
436 inline unsigned int AvalancheSchedulerSvc::algname2index( const std::string& algoname ) {
437  unsigned int index = m_algname_index_map[algoname];
438  return index;
439 }
440 
441 //---------------------------------------------------------------------------
442 
443 // EventSlot management
451 
// Body of pushNewEvent (it is called as pushNewEvent( context ) elsewhere in this file).
// NOTE(review): the function signature is not present in this listing -- confirm against the
// original file.
// Rejects a null context and the no-free-slot case, then takes one free slot and queues an
// action that resets the event slot and starts scheduling for it. Returns SUCCESS once the
// action is queued; the action's own status is not visible to the caller.
452  if ( !eventContext ) {
453  fatal() << "Event context is nullptr" << endmsg;
454  return StatusCode::FAILURE;
455  }
456 
457  if ( m_freeSlots.load() == 0 ) {
458  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
459  return StatusCode::FAILURE;
460  }
461 
462  // no problem as push new event is only called from one thread (event loop manager)
463  --m_freeSlots;
464 
// The lambda is executed later, by whoever pops it from m_actionsQueue.
465  auto action = [this, eventContext]() -> StatusCode {
466  // Event processing slot forced to be the same as the wb slot
467  const unsigned int thisSlotNum = eventContext->slot();
468  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
469  if ( !thisSlot.complete ) {
470  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
471  return StatusCode::FAILURE;
472  }
473 
474  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
475  thisSlot.reset( eventContext );
476 
477  // Result status code:
// NOTE(review): the declaration of `result` is not present in this listing (the embedded
// numbering skips a line here) -- confirm against the original file.
479 
480  // promote to CR and DR the initial set of algorithms
481  Cause cs = {Cause::source::Root, "RootDecisionHub"};
482  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
483  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
484  result = StatusCode::FAILURE;
485  }
486 
// updateStates is still attempted when iterate() failed; either failure yields FAILURE.
487  if ( this->updateStates( thisSlotNum ).isFailure() ) {
488  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
489  result = StatusCode::FAILURE;
490  }
491 
492  return result;
493  }; // end of lambda
494 
495  // Kick off the scheduling!
496  ON_VERBOSE {
497  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
498  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
499  }
500 
501  m_actionsQueue.push( action );
502 
503  return StatusCode::SUCCESS;
504 }
505 
506 //---------------------------------------------------------------------------
507 
// Body of the batch variant of pushNewEvent.
// NOTE(review): the function signature (which declares `eventContexts`) is not present in
// this listing -- confirm against the original file.
// Pushes the contexts one by one and stops at the first push that does not return SUCCESS,
// returning that status; contexts pushed before the failure stay pushed.
// For an empty collection the default-constructed `sc` is returned.
509  StatusCode sc;
510  for ( auto context : eventContexts ) {
511  sc = pushNewEvent( context );
512  if ( sc != StatusCode::SUCCESS ) return sc;
513  }
514  return sc;
515 }
516 
517 //---------------------------------------------------------------------------
518 
519 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
520 
521 //---------------------------------------------------------------------------
// Body of popFinishedEvent (name taken from the commented-out debug line below).
// NOTE(review): the function signature (which declares `eventContext`) is not present in
// this listing -- confirm against the original file.
// Returns FAILURE when every slot is free (no event in flight) or the scheduler is INACTIVE;
// otherwise pops a finished event context, frees one slot and returns SUCCESS.
526  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
527  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
528  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
529  // << " active: " << m_isActive << endmsg;
530  return StatusCode::FAILURE;
531  } else {
532  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
533  // << " active: " << m_isActive << endmsg;
// NOTE(review): pop() presumably blocks until a finished event is available -- confirm
// against the queue type used for m_finishedEvents.
534  m_finishedEvents.pop( eventContext );
535  ++m_freeSlots;
536  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
537  return StatusCode::SUCCESS;
538  }
539 }
540 
541 //---------------------------------------------------------------------------
// Body of the try-pop variant for finished events.
// NOTE(review): the function signature (which declares `eventContext`) is not present in
// this listing -- confirm against the original file.
// If try_pop() yields a finished event context, one slot is freed and SUCCESS is returned;
// otherwise FAILURE is returned and the free-slot counter is left untouched.
546  if ( m_finishedEvents.try_pop( eventContext ) ) {
547  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
548  << endmsg;
549  ++m_freeSlots;
550  return StatusCode::SUCCESS;
551  }
552  return StatusCode::FAILURE;
553 }
554 
555 //--------------------------------------------------------------------------
556 // States Management
557 
// Update the algorithm states of one event slot, or of all incomplete slots.
//
// si          : slot index; a negative value selects every slot not flagged complete,
//               processed in increasing order of event number.
// algo_index  : index of the algorithm that triggered the update; when >= 0 the precedence
//               service is iterated with that algorithm as cause.
// sub_slot    : sub-slot index, or -1 for none.
// source_slot : slot owning `sub_slot`; the sub-slot is iterated only for that slot.
// Returns FAILURE if any IPrecedenceSvc::iterate call failed, SUCCESS otherwise. Failures of
// individual DATAREADY -> SCHEDULED promotions do not affect the returned status.
567 StatusCode AvalancheSchedulerSvc::updateStates( int si, const int algo_index, const int sub_slot,
568  const int source_slot ) {
569 
570  StatusCode global_sc( StatusCode::SUCCESS );
571 
572  // Sort from the oldest to the newest event
573  // Prepare a vector of pointers to the slots to avoid copies
574  std::vector<EventSlot*> eventSlotsPtrs;
575 
576  // Consider all slots if si <0 or just one otherwise
577  if ( si < 0 ) {
578  const int eventsSlotsSize( m_eventSlots.size() );
579  eventSlotsPtrs.reserve( eventsSlotsSize );
580  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
581  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
582  }
583  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
584  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
585  } else {
586  eventSlotsPtrs.push_back( &m_eventSlots[si] );
587  }
588 
589  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
// The slot index is read back from the slot's own event context.
590  int iSlot = thisSlotPtr->eventContext->slot();
591 
592  // Cache the states of the algos to improve readability and performance
593  auto& thisSlot = m_eventSlots[iSlot];
594  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
595 
596  // Perform the I->CR->DR transitions
597  if ( algo_index >= 0 ) {
598  Cause cs = {Cause::source::Task, index2algname( algo_index )};
599 
600  // Run in whole-event context if there's no sub-slot index, or the sub-slot has a different parent
601  if ( sub_slot == -1 || iSlot != source_slot ) {
602  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
603  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
604  global_sc = StatusCode::FAILURE;
605  }
606  } else {
607  if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
608  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot << " of " << iSlot << endmsg;
609  global_sc = StatusCode::FAILURE;
610  }
611  }
612  }
613 
// Status of the most recent promotion attempt; used only for verbose logging below.
614  StatusCode partial_sc( StatusCode::FAILURE, true );
615 
616  // Perform DR->SCHEDULED
617  if ( !m_optimizationMode.empty() ) {
// Orders algorithm indices by the priority the precedence service reports for them.
618  auto comp_nodes = [this]( const uint& i, const uint& j ) {
619  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
620  };
// NOTE(review): the line declaring `buffer` is not present in this listing (the embedded
// numbering skips it). From its use (push/top/pop/empty, built from comp_nodes) it is
// presumably a std::priority_queue of algorithm indices -- confirm against the original file.
622  comp_nodes, std::vector<uint>() );
623  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it )
624  buffer.push( *it );
625  while ( !buffer.empty() ) {
626  bool IOBound = false;
627  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
628 
629  if ( !IOBound )
630  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
631  else
632  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
633 
634  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
635  << "Could not apply transition from " << AState::DATAREADY << " for algorithm "
636  << index2algname( buffer.top() ) << " on processing slot " << iSlot << endmsg;
637 
638  buffer.pop();
639  }
640 
641  } else {
// No optimisation mode: promote the DATAREADY algorithms in iteration order.
642  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
643  uint algIndex = *it;
644 
645  bool IOBound = false;
646  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
647 
648  if ( !IOBound )
649  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
650  else
651  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
652 
653  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
654  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
655  << " on processing slot " << iSlot << endmsg;
656  }
657  }
658 
659  // Check for algorithms ready in sub-slots
// Sub-slot algorithms always go through promoteToScheduled; no I/O-bound check is made here.
660  for ( auto& subslot : thisSlot.allSubSlots ) {
661  auto& subslotStates = subslot.algsStates;
662  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
663  uint algIndex{*it};
664  partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext.get() );
665  // The following verbosity is expensive when the number of sub-slots is high
666  /*ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
667  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
668  << " on processing subslot " << subslot.eventContext->slot() << endmsg;*/
669  }
670  }
671 
// Optionally append one CSV row (trigger, CONTROLREADY / DATAREADY / SCHEDULED counts,
// timestamp) to a file named after the thread count.
672  if ( m_dumpIntraEventDynamics ) {
// NOTE(review): the declaration of the stream `s` is not present in this listing (the
// embedded numbering skips it) -- confirm against the original file.
674  s << ( algo_index != -1 ? index2algname( algo_index ) : "START" ) << ", "
675  << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
676  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
677  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
678  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
679  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
680  std::ofstream myfile;
681  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
682  myfile << s.str();
683  myfile.close();
684  }
685 
686  // Not complete because this would mean that the slot is already free!
687  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
688  !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
689  !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
690  !thisSlot.complete ) {
691 
692  thisSlot.complete = true;
693  // if the event did not fail, add it to the finished events
694  // otherwise it is taken care of in the error handling
695  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
696  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
697  << thisSlot.eventContext->slot() << ")." << endmsg;
// release() hands ownership of the context over to the finished-events queue.
698  m_finishedEvents.push( thisSlot.eventContext.release() );
699  }
700 
701  // now let's return the fully evaluated result of the control flow
702  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
703 
704  thisSlot.eventContext.reset( nullptr );
705 
706  } else if ( isStalled( thisSlot ) ) {
707  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
708  eventFailed( thisSlot.eventContext.get() ); // can't release yet
709  }
710  } // end loop on slots
711 
712  ON_VERBOSE verbose() << "States Updated." << endmsg;
713 
714  return global_sc;
715 }
716 
717 //---------------------------------------------------------------------------
718 
725 bool AvalancheSchedulerSvc::isStalled( const EventSlot& slot ) const {
726 
727  if ( m_actionsCounts[slot.eventContext->slot()] == 0 &&
728  !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED} ) &&
729  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
730 
731  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
732 
733  return true;
734  }
735  return false;
736 }
737 
738 //---------------------------------------------------------------------------
739 
// Body of eventFailed (it is called as eventFailed( ctx ) from updateStates in this file).
// NOTE(review): the function signature (which declares `eventContext`) is not present in
// this listing -- confirm against the original file.
// Logs the failure, dumps the scheduler state and the precedence rules for the slot, flags
// the slot complete and hands its event context over to the finished-events queue.
745  const uint slotIdx = eventContext->slot();
746 
747  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
748 
// At VERBOSE level all slots are dumped (-1), otherwise only the failing one.
749  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
750 
751  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
752  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
753 
754  // Push into the finished events queue the failed context
755  m_eventSlots[slotIdx].complete = true;
756  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
757 }
758 
759 //---------------------------------------------------------------------------
760 
766 
// Body of dumpSchedulerState (it is called as dumpSchedulerState( int ) elsewhere in this file).
// NOTE(review): the function signature (which declares `iSlot`) is not present in this
// listing -- confirm against the original file.
// Builds one text report and emits it through a single info() message. Sections: the
// SCHEDULED algorithms of every slot (only when the TimelineSvc is available and enabled),
// the control-flow/FSM state of the incomplete slots (all of them when iSlot < 0, otherwise
// only slot iSlot) and, for iSlot >= 0, the algorithm execution states of that slot.
767  // To have just one big message
768  std::ostringstream outputMS;
769 
770  outputMS << "Dumping scheduler state\n"
771  << "=========================================================================================\n"
772  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
773  << "=========================================================================================\n\n";
774 
775  //===========================================================================
776 
777  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
778  << "------------------\n\n";
779 
780  // Figure if TimelineSvc is available (used below to detect threads IDs)
781  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
782  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
783  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
784  } else {
785 
786  // Figure optimal printout layout
// indt ends up as the length of the longest SCHEDULED algorithm name; used as column width.
787  size_t indt( 0 );
788  for ( auto& slot : m_eventSlots )
789  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
790  if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
791 
792  // Figure the last running schedule across all slots
793  for ( auto& slot : m_eventSlots ) {
794  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
795  ++it ) {
796 
797  const std::string algoName{index2algname( *it )};
798 
799  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
800  << slot.eventContext->slot();
801 
802  // Try to get POSIX threads IDs the currently running tasks are scheduled to
803  if ( timelineSvc.isValid() ) {
// The timeline service is queried with the (algorithm, slot, event) triple set below.
804  TimelineEvent te{};
805  te.algorithm = algoName;
806  te.slot = slot.eventContext->slot();
807  te.event = slot.eventContext->evt();
808 
809  if ( timelineSvc->getTimelineEvent( te ) )
810  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
811  else
812  outputMS << " thread.id: [unknown]"; // this means a task has just
813  // been signed off as SCHEDULED,
814  // but has not been assigned to a thread yet
815  // (i.e., not running yet)
816  }
817  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
818  }
819  }
820  }
821 
822  //===========================================================================
823 
824  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
825  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
826 
// slotCount tracks the position of `slot` within m_eventSlots (0-based after the increment).
827  int slotCount = -1;
828  for ( auto& slot : m_eventSlots ) {
829  ++slotCount;
830  if ( slot.complete ) continue;
831 
// The slot/event header is printed for every incomplete slot, even when iSlot selects one slot.
832  outputMS << "[ slot: "
833  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
834  << " event: "
835  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
836  << " ]:\n\n";
837 
838  if ( 0 > iSlot || iSlot == slotCount ) {
839 
840  // Snapshot of the Control Flow and FSM states
841  outputMS << m_precSvc->printState( slot ) << "\n";
842 
843  // Mention sub slots (this is expensive if the number of sub-slots is high)
844  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
845  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
846  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
847  for ( auto& ss : slot.allSubSlots ) {
848  outputMS << "[ slot: " << slotID << " sub-slot entry: " << ss.entryPoint << " event: "
849  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
850  << " ]:\n\n";
851  outputMS << m_precSvc->printState( ss ) << "\n";
852  }
853  }
854  }
855  }
856 
857  //===========================================================================
858 
859  if ( 0 <= iSlot ) {
860  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
861  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
862  }
863 
864  outputMS << "\n=========================================================================================\n"
865  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
866  << "=========================================================================================\n\n";
867 
868  info() << outputMS.str() << endmsg;
869 }
870 
871 //---------------------------------------------------------------------------
872 
873 StatusCode AvalancheSchedulerSvc::promoteToScheduled( unsigned int iAlgo, int si, EventContext* eventContext ) {
874 
875  if ( m_algosInFlight == m_maxAlgosInFlight ) return StatusCode::FAILURE;
876 
877  const std::string& algName( index2algname( iAlgo ) );
878  IAlgorithm* ialgoPtr = nullptr;
879  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
880 
881  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
882 
883  ++m_algosInFlight;
884  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
885  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
886  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
887  } );
888  return StatusCode::SUCCESS;
889  };
890 
891  // Avoid to use tbb if the pool size is 1 and run in this thread
892  if ( -100 != m_threadPoolSize ) {
893  // the child task that executes an Algorithm
894  tbb::task* algoTask = new ( tbb::task::allocate_root() )
895  AlgoExecutionTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
896  // schedule the algoTask
897  tbb::task::enqueue( *algoTask );
898 
899  } else {
900  AlgoExecutionTask theTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc,
901  promote2ExecutedClosure );
902  theTask.execute();
903  }
904 
905  ON_DEBUG debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot "
906  << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
907 
908  // Update states in the appropriate event slot
909  StatusCode updateSc;
910  EventSlot& thisSlot = m_eventSlots[si];
911  if ( eventContext->usesSubSlot() ) {
912  // Sub-slot
913  size_t const subSlotIndex = eventContext->subSlot();
914  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
915  } else {
916  // Event level (standard behaviour)
917  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
918  }
919 
920  ON_VERBOSE dumpSchedulerState( -1 );
921 
922  if ( updateSc.isSuccess() )
923  ON_VERBOSE verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
924  return updateSc;
925  } else {
926  ON_DEBUG debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si
927  << endmsg;
928  return sc;
929  }
930 }
931 
932 //---------------------------------------------------------------------------
933 
934 StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, int si, EventContext* eventContext ) {
935 
936  if ( m_IOBoundAlgosInFlight == m_maxIOBoundAlgosInFlight ) return StatusCode::FAILURE;
937 
938  // bool IOBound = m_precSvc->isBlocking(algName);
939 
940  const std::string& algName( index2algname( iAlgo ) );
941  IAlgorithm* ialgoPtr = nullptr;
942  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
943 
944  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
945 
946  ++m_IOBoundAlgosInFlight;
947  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
948  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
949  return this->AvalancheSchedulerSvc::promoteToAsyncExecuted( iAlgo, eventContext->slot(), ialgoPtr,
950  eventContext );
951  } );
952  return StatusCode::SUCCESS;
953  };
954  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
955  // tbb::task)? it seems it works..
956  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
957  IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
958  m_IOBoundAlgScheduler->push( *theTask );
959 
960  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
961  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
962 
963  // Update states in the appropriate event slot
964  StatusCode updateSc;
965  EventSlot& thisSlot = m_eventSlots[si];
966  if ( eventContext->usesSubSlot() ) {
967  // Sub-slot
968  size_t const subSlotIndex = eventContext->subSlot();
969  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
970  } else {
971  // Event level (standard behaviour)
972  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
973  }
974 
975  ON_VERBOSE if ( updateSc.isSuccess() ) verbose()
976  << "[Asynchronous] Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
977  return updateSc;
978  } else {
979  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
980  << " on slot " << si << endmsg;
981  return sc;
982  }
983 }
984 
985 //---------------------------------------------------------------------------
986 
991  EventContext* eventContext ) {
992  Gaudi::Hive::setCurrentContext( eventContext );
993  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
994 
995  if ( sc.isFailure() ) {
996  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
997  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
998  return StatusCode::FAILURE;
999  }
1000 
1001  --m_algosInFlight;
1002 
1003  EventSlot& thisSlot = m_eventSlots[si];
1004 
1005  ON_DEBUG debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1006 
1007  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1008  AState state = algstate.execStatus().isSuccess()
1009  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1010  : AState::ERROR;
1011 
1012  // Update states in the appropriate slot
1013  int subSlotIndex = -1;
1014  if ( eventContext->usesSubSlot() ) {
1015  // Sub-slot
1016  subSlotIndex = eventContext->subSlot();
1017  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1018  } else {
1019  // Event level (standard behaviour)
1020  sc = thisSlot.algsStates.set( iAlgo, state );
1021  }
1022 
1023  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1024  << "Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1025 
1026  ON_DEBUG debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1027  << m_algosInFlight << endmsg;
1028 
1029  // Schedule an update of the status of the algorithms
1030  ++m_actionsCounts[si];
1031  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1032  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1033  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1034  } );
1035 
1036  return sc;
1037 }
1038 
1039 //---------------------------------------------------------------------------
1040 
1045  EventContext* eventContext ) {
1046  Gaudi::Hive::setCurrentContext( eventContext );
1047  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1048 
1049  if ( sc.isFailure() ) {
1050  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1051  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1052  return StatusCode::FAILURE;
1053  }
1054 
1055  --m_IOBoundAlgosInFlight;
1056 
1057  EventSlot& thisSlot = m_eventSlots[si];
1058 
1059  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1060  << endmsg;
1061 
1062  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1063  AState state = algstate.execStatus().isSuccess()
1064  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1065  : AState::ERROR;
1066 
1067  // Update states in the appropriate slot
1068  int subSlotIndex = -1;
1069  if ( eventContext->usesSubSlot() ) {
1070  // Sub-slot
1071  subSlotIndex = eventContext->subSlot();
1072  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1073  } else {
1074  // Event level (standard behaviour)
1075  sc = thisSlot.algsStates.set( iAlgo, state );
1076  }
1077 
1078  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1079  << "[Asynchronous] Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1080 
1081  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1082  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1083 
1084  // Schedule an update of the status of the algorithms
1085  ++m_actionsCounts[si];
1086  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1087  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1088  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1089  } );
1090 
1091  return sc;
1092 }
1093 
1094 //---------------------------------------------------------------------------
1095 
1096 // Method to inform the scheduler about event views
1097 
1099  std::unique_ptr<EventContext> viewContext ) {
1100  // Prevent view nesting
1101  if ( sourceContext->usesSubSlot() ) {
1102  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1103  return StatusCode::FAILURE;
1104  }
1105 
1106  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1107 
1108  // It's not possible to create an std::functional from a move-capturing lambda
1109  // So, we have to release the unique pointer
1110  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1111  &nodeName]() -> StatusCode {
1112  // Attach the sub-slot to the top-level slot
1113  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1114 
1115  if ( viewContextPtr ) {
1116  // Re-create the unique pointer
1117  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1118  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1119  return StatusCode::SUCCESS;
1120  } else {
1121  // Disable the view node if there are no views
1122  topSlot.disableSubSlots( nodeName );
1123  return StatusCode::SUCCESS;
1124  }
1125  };
1126 
1127  m_actionsQueue.push( std::move( action ) );
1128 
1129  return StatusCode::SUCCESS;
1130 }
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:63
#define ON_DEBUG
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
StatusCode initialize() override
Definition: Service.cpp:60
const unsigned int & getAlgoIndex() const
Get algorithm index.
Class representing an event slot.
Definition: EventSlot.h:14
T empty(T...args)
T open(T...args)
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B...
Definition: EventSlot.h:68
StatusCode finalize() override
Definition: Service.cpp:164
ContextID_t slot() const
Definition: EventContext.h:48
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one) ...
Definition: EventSlot.h:51
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:635
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Definition: StatusCode.h:267
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
T to_string(T...args)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::string algorithm
Definition: ITimelineSvc.h:21
void activate()
Activate scheduler.
T end(T...args)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
bool usesSubSlot() const
Definition: EventContext.h:50
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
Definition: EventContext.h:31
STL class.
bool isFailure() const
Definition: StatusCode.h:130
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
T release(T...args)
T setw(T...args)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:47
STL class.
#define DECLARE_COMPONENT(type)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
T push_back(T...args)
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
const DataObjIDColl & inputDataObjs() const override
T close(T...args)
StatusCode set(unsigned int iAlgo, State newState)
StatusCode finalize() override
Finalise.
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
T move(T...args)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T get(T...args)
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
T find_if(T...args)
T size(T...args)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
T begin(T...args)
Iterator begin(State kind)
T any_of(T...args)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
string s
Definition: gaudirun.py:312
constexpr static const auto FAILURE
Definition: StatusCode.h:86
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
T hex(T...args)
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
bool complete
Flags completion of the event.
Definition: EventSlot.h:79
T sort(T...args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
bool filterPassed() const
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
std::string fullKey() const
Definition: DataObjID.cpp:88
ContextID_t subSlot() const
Definition: EventContext.h:49
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:73
const StatusCode & execStatus() const
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
T reserve(T...args)
#define ON_VERBOSE
Iterator end(State kind)