The Gaudi Framework  v30r4 (9b837755)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 
3 #include "AlgoExecutionTask.h"
4 #include "IOBoundAlgTask.h"
5 
6 // Framework includes
7 #include "GaudiKernel/Algorithm.h" // can be removed ASA dynamic casts to Algorithm are removed
12 
13 // C++
14 #include <algorithm>
15 #include <map>
16 #include <queue>
17 #include <sstream>
18 #include <unordered_set>
19 
20 // External libs
21 #include "boost/algorithm/string.hpp"
22 #include "boost/thread.hpp"
23 #include "boost/tokenizer.hpp"
24 // DP waiting for the TBB service
25 #include "tbb/task_scheduler_init.h"
26 
27 // Instantiation of a static factory class used by clients to create instances of this service
29 
30 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
31 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
32 
33 namespace
34 {
35  struct DataObjIDSorter {
36  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
37  };
38 
39  // Sort a DataObjIDColl in a well-defined, reproducible manner.
40  // Used for making debugging dumps.
41  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll )
42  {
44  v.reserve( coll.size() );
45  for ( const DataObjID& id : coll ) v.push_back( &id );
46  std::sort( v.begin(), v.end(), DataObjIDSorter() );
47  return v;
48  }
49 
50  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates )
51  {
52  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
53  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
54  }
55 }
56 
57 //---------------------------------------------------------------------------
58 
65 {
    // NOTE(review): the function signature and the declaration of `sc` are not present in this
    // listing; judging by the log messages this is presumably AvalancheSchedulerSvc::initialize().
    // Visible steps: start the scheduler thread and wait until it reports ACTIVE, retrieve the
    // services used by the scheduler, collect and check the data dependencies of all algorithms,
    // fill the name<->index lookup containers, create the event slots, then return `sc`.
66 
67  // Initialise mother class (read properties, ...)
69  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
70 
71  // Get hold of the TBBSvc. This should initialize the thread pool
72  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
73  if ( !m_threadPoolSvc.isValid() ) {
74  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
75  return StatusCode::FAILURE;
76  }
77 
78  // Activate the scheduler in another thread.
79  info() << "Activating scheduler in a separate thread" << endmsg;
80  m_thread = std::thread( [this]() { this->activate(); } );
81 
    // Poll m_isActive, sleeping between checks, until it becomes ACTIVE or FAILURE
82  while ( m_isActive != ACTIVE ) {
83  if ( m_isActive == FAILURE ) {
84  fatal() << "Terminating initialization" << endmsg;
85  return StatusCode::FAILURE;
86  } else {
87  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
88  sleep( 1 );
89  }
90  }
91 
92  if ( m_enableCondSvc ) {
93  // Get hold of the CondSvc
94  m_condSvc = serviceLocator()->service( "CondSvc" );
95  if ( !m_condSvc.isValid() ) {
96  warning() << "No CondSvc found, or not enabled. "
97  << "Will not manage CondAlgorithms" << endmsg;
98  m_enableCondSvc = false;
99  }
100  }
101 
102  // Get the algo resource pool
103  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
104  if ( !m_algResourcePool.isValid() ) {
105  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
106  return StatusCode::FAILURE;
107  }
108 
109  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
110  if ( !m_algExecStateSvc.isValid() ) {
111  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
112  return StatusCode::FAILURE;
113  }
114 
115  // Get Whiteboard
116  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
117  if ( !m_whiteboard.isValid() ) {
118  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
119  return StatusCode::FAILURE;
120  }
121 
122  // Get dedicated scheduler for I/O-bound algorithms
    // NOTE(review): unlike the look-ups above, a failure here is only logged; no FAILURE is returned
123  if ( m_useIOBoundAlgScheduler ) {
124  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
125  if ( !m_IOBoundAlgScheduler.isValid() )
126  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
127  }
128 
129  // Set the MaxEventsInFlight parameters from the number of WB stores
130  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
131 
132  // Set the number of free slots
133  m_freeSlots = m_maxEventsInFlight;
134 
135  // Get the list of algorithms
136  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
137  const unsigned int algsNumber = algos.size();
138  info() << "Found " << algsNumber << " algorithms" << endmsg;
139 
140  /* Dependencies
141  1) Look for handles in algo, if none
142  2) Assume none are required
143  */
144 
145  DataObjIDColl globalInp, globalOutp;
146 
147  using AccessMode = Gaudi::v2::DataHandle::AccessMode;
148 
149  // figure out all outputs
150  for ( IAlgorithm* ialgoPtr : algos ) {
151  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
    // NOTE(review): on a failed cast only a message is logged; algoPtr is still dereferenced
    // in the loop right below (the second pass over `algos` returns FAILURE instead)
152  if ( !algoPtr ) {
153  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
154  }
155  for ( auto id : algoPtr->dataDependencies( AccessMode::Write ) ) {
156  auto r = globalOutp.insert( id );
157  if ( !r.second ) {
158  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths "
159  "though, or control flow may guarantee only one runs...!"
160  << endmsg;
161  }
162  }
163  }
164 
    // Text accumulated here is printed further down only when m_showDataDeps is set
165  std::ostringstream ostdd;
166  ostdd << "Data Dependencies for Algorithms:";
167 
    // Algorithm name -> its (resolved) input dependencies; used to report unmet inputs
168  std::map<std::string, DataObjIDColl> algosDependenciesMap;
169  for ( IAlgorithm* ialgoPtr : algos ) {
170  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
171  if ( nullptr == algoPtr ) {
172  fatal() << "Could not convert IAlgorithm into Algorithm for " << ialgoPtr->name()
173  << ": this will result in a crash." << endmsg;
174  return StatusCode::FAILURE;
175  }
176 
177  ostdd << "\n " << algoPtr->name();
178 
179  DataObjIDColl algoDependencies;
180  if ( !algoPtr->dataDependencies( AccessMode::Read ).empty() ||
181  !algoPtr->dataDependencies( AccessMode::Write ).empty() ) {
182  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->dataDependencies( AccessMode::Read ) ) ) {
183  DataObjID id = *idp;
184  ostdd << "\n o INPUT " << id;
    // A ':' in the key separates alternatives; the first one found among the global outputs wins
185  if ( id.key().find( ":" ) != std::string::npos ) {
186  ostdd << " contains alternatives which require resolution...\n";
187  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
188  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
189  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
190  } );
191  if ( itok != tokens.end() ) {
192  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
193  id.updateKey( *itok );
194  } else {
195  error() << "failed to find alternate in global output list"
196  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
    // Force the dependency dump below so the problem is visible in the log
197  m_showDataDeps = true;
198  }
199  }
200  algoDependencies.insert( id );
201  globalInp.insert( id );
202  }
203  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->dataDependencies( AccessMode::Write ) ) ) {
204  ostdd << "\n o OUTPUT " << *id;
205  if ( id->key().find( ":" ) != std::string::npos ) {
206  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
207  << endmsg;
208  m_showDataDeps = true;
209  }
210  }
211  } else {
212  ostdd << "\n none";
213  }
214  algosDependenciesMap[algoPtr->name()] = algoDependencies;
215  }
216 
217  if ( m_showDataDeps ) {
218  info() << ostdd.str() << endmsg;
219  }
220 
221  // Check if we have unmet global input dependencies, and, optionally, heal them
222  // WARNING: this step must be done BEFORE the Precedence Service is initialized
223  if ( m_checkDeps ) {
224  DataObjIDColl unmetDep;
225  for ( auto o : globalInp )
226  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
227 
228  if ( unmetDep.size() > 0 ) {
229 
    // Human-readable list of each unmet input together with the algorithms requiring it
230  std::ostringstream ost;
231  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232  ost << "\n o " << *o << " required by Algorithm: ";
233 
234  for ( const auto& p : algosDependenciesMap )
235  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
236  }
237 
238  if ( !m_useDataLoader.empty() ) {
239 
240  // Find the DataLoader Alg
241  IAlgorithm* dataLoaderAlg( nullptr );
242  for ( IAlgorithm* algo : algos )
243  if ( algo->name() == m_useDataLoader ) {
244  dataLoaderAlg = algo;
245  break;
246  }
247 
248  if ( dataLoaderAlg == nullptr ) {
249  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
250  << "\" found, and unmet INPUT dependencies "
251  << "detected:\n"
252  << ost.str() << endmsg;
253  return StatusCode::FAILURE;
254  }
255 
256  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
257  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
258 
259  // Set the property Load of DataLoader Alg
260  Algorithm* dataAlg = dynamic_cast<Algorithm*>( dataLoaderAlg );
261  if ( !dataAlg ) {
262  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Algorithm" << endmsg;
263  return StatusCode::FAILURE;
264  }
265 
    // Declare every unmet input as an output of the data loader algorithm
266  for ( auto& id : unmetDep ) {
267  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
268  << dataLoaderAlg->name() << endmsg;
269  dataAlg->addDataDependency( id, AccessMode::Write );
270  }
271 
272  } else {
273  fatal() << "Auto DataLoading not requested, "
274  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
275  return StatusCode::FAILURE;
276  }
277 
278  } else {
279  info() << "No unmet INPUT data dependencies were found" << endmsg;
280  }
281  }
282 
283  // Get the precedence service
284  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
285  if ( !m_precSvc.isValid() ) {
286  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
287  return StatusCode::FAILURE;
288  }
289  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
290  if ( !precSvc ) {
291  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
292  return StatusCode::FAILURE;
293  }
294 
295  // Fill the containers to convert algo names to index
296  m_algname_vect.resize( algsNumber );
297  for ( IAlgorithm* algo : algos ) {
298  const std::string& name = algo->name();
    // NOTE(review): the results of getRules() and getAlgorithmNode( name ) are dereferenced
    // unchecked; confirm neither can be null for an algorithm taken from the flat list
299  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
300  m_algname_index_map[name] = index;
301  m_algname_vect.at( index ) = name;
302  }
303 
304  // Shortcut for the message service
305  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
306  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
307 
    // One event slot per event in flight, each created with its `complete` flag set
308  m_eventSlots.reserve( m_maxEventsInFlight );
309  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
310  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
311  m_eventSlots.back().complete = true;
312  }
313  m_actionsCounts.assign( m_maxEventsInFlight, 0 );
314 
    // m_maxAlgosInFlight is only overridden when the configured pool size exceeds 1
315  if ( m_threadPoolSize > 1 ) {
316  m_maxAlgosInFlight = (size_t)m_threadPoolSize;
317  }
318 
319  // Clearly inform about the level of concurrency
320  info() << "Concurrency level information:" << endmsg;
321  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
322  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
323 
324  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
325 
326  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
327 
328  // Simulate execution flow
329  if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
330 
331  return sc;
332 }
333 //---------------------------------------------------------------------------
334 
339 {
    // NOTE(review): the signature and the declaration of `sc` are missing from this listing;
    // judging by the messages this is presumably AvalancheSchedulerSvc::finalize().
    // Requests deactivation of the scheduler, joins the scheduler thread and returns FAILURE
    // if m_isActive ended up as FAILURE; otherwise returns the status of deactivate().
340 
342  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
343 
344  sc = deactivate();
345  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
346 
347  info() << "Joining Scheduler thread" << endmsg;
    // NOTE(review): join() is called without a joinable() check — confirm the thread was always started
348  m_thread.join();
349 
350  // Final error check after thread pool termination
351  if ( m_isActive == FAILURE ) {
352  error() << "problems in scheduler thread" << endmsg;
353  return StatusCode::FAILURE;
354  }
355 
356  return sc;
357 }
358 //---------------------------------------------------------------------------
359 
371 {
    // NOTE(review): the signature line and the declaration of `sc` are missing from this
    // listing; the debug message below identifies this as AvalancheSchedulerSvc::activate().
    // Initialises the thread pool, flags the scheduler ACTIVE, then keeps popping and running
    // queued actions for as long as the scheduler is ACTIVE or the queue is non-empty, and
    // finally terminates the thread pool. Failures are reported by setting m_isActive to FAILURE.
372 
373  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
374 
375  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
376  error() << "problems initializing ThreadPoolSvc" << endmsg;
377  m_isActive = FAILURE;
378  return;
379  }
380 
381  // Wait for actions pushed into the queue by finishing tasks.
382  action thisAction;
384 
385  m_isActive = ACTIVE;
386 
387  // Continue to wait if the scheduler is running or there is something to do
388  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
389  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
    // presumably pop() blocks until an action is available — TODO confirm against the queue type
390  m_actionsQueue.pop( thisAction );
391  sc = thisAction();
392  ON_VERBOSE
393  {
394  if ( sc != StatusCode::SUCCESS )
395  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
396  else
397  verbose() << "Action succeeded." << endmsg;
398  }
399  }
400 
401  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
402  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
403  error() << "Problems terminating thread pool" << endmsg;
404  m_isActive = FAILURE;
405  }
406 }
407 
408 //---------------------------------------------------------------------------
409 
417 {
    // NOTE(review): the signature line is missing from this listing; presumably this is
    // AvalancheSchedulerSvc::deactivate().
    // When the scheduler is ACTIVE: sets the free-slot counter to 0, discards every action still
    // queued and enqueues one last action that switches m_isActive to INACTIVE.
    // Does nothing in any other state. Always returns SUCCESS.
418 
419  if ( m_isActive == ACTIVE ) {
420 
421  // Set the number of slots available to an error code
422  m_freeSlots.store( 0 );
423 
424  // Empty queue
425  action thisAction;
426  while ( m_actionsQueue.try_pop( thisAction ) ) {
427  };
428 
429  // This would be the last action
430  m_actionsQueue.push( [this]() -> StatusCode {
431  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
432  m_isActive = INACTIVE;
433  return StatusCode::SUCCESS;
434  } );
435  }
436 
437  return StatusCode::SUCCESS;
438 }
439 
440 //---------------------------------------------------------------------------
441 
442 // Utils and shortcuts
443 
444 inline const std::string& AvalancheSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
445 
446 //---------------------------------------------------------------------------
447 
448 inline unsigned int AvalancheSchedulerSvc::algname2index( const std::string& algoname )
449 {
450  unsigned int index = m_algname_index_map[algoname];
451  return index;
452 }
453 
454 //---------------------------------------------------------------------------
455 
456 // EventSlot management
464 {
    // NOTE(review): the signature line and the declaration of `result` (inside the lambda) are
    // missing from this listing; presumably this is AvalancheSchedulerSvc::pushNewEvent().
    // Rejects a null context or a scheduler without free slots (returns FAILURE). Otherwise takes
    // one free slot and queues an action that resets the event slot matching the context and
    // starts scheduling for it; returns SUCCESS once that action has been queued.
465 
466  if ( !eventContext ) {
467  fatal() << "Event context is nullptr" << endmsg;
468  return StatusCode::FAILURE;
469  }
470 
471  if ( m_freeSlots.load() == 0 ) {
472  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
473  return StatusCode::FAILURE;
474  }
475 
476  // no problem as push new event is only called from one thread (event loop manager)
477  --m_freeSlots;
478 
    // The lambda is executed later, when it is popped from m_actionsQueue
479  auto action = [this, eventContext]() -> StatusCode {
480  // Event processing slot forced to be the same as the wb slot
481  const unsigned int thisSlotNum = eventContext->slot();
482  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
483  if ( !thisSlot.complete ) {
484  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
485  return StatusCode::FAILURE;
486  }
487 
488  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
489  thisSlot.reset( eventContext );
490 
491  // Result status code:
493 
494  // promote to CR and DR the initial set of algorithms
495  Cause cs = {Cause::source::Root, "RootDecisionHub"};
496  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
497  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
498  result = StatusCode::FAILURE;
499  }
500 
    // updateStates is still attempted when iterate failed; either failure makes the action fail
501  if ( this->updateStates( thisSlotNum ).isFailure() ) {
502  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
503  result = StatusCode::FAILURE;
504  }
505 
506  return result;
507  }; // end of lambda
508 
509  // Kick off the scheduling!
510  ON_VERBOSE
511  {
512  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
513  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
514  }
515 
516  m_actionsQueue.push( action );
517 
518  return StatusCode::SUCCESS;
519 }
520 
521 //---------------------------------------------------------------------------
522 
524 {
    // NOTE(review): the signature line is missing from this listing; presumably this is
    // AvalancheSchedulerSvc::pushNewEvents().
    // Pushes the contexts one by one with pushNewEvent() and stops at the first call that does
    // not return SUCCESS, returning that status. Returns the status of the last push otherwise
    // (the default-constructed `sc` when eventContexts is empty).
525  StatusCode sc;
526  for ( auto context : eventContexts ) {
527  sc = pushNewEvent( context );
528  if ( sc != StatusCode::SUCCESS ) return sc;
529  }
530  return sc;
531 }
532 
533 //---------------------------------------------------------------------------
534 
535 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
536 
537 //---------------------------------------------------------------------------
542 {
    // NOTE(review): the signature line is missing from this listing; the commented-out debug
    // line below suggests this is AvalancheSchedulerSvc::popFinishedEvent().
    // Returns FAILURE when all slots are free (no event in flight) or the scheduler is INACTIVE.
    // Otherwise pops a finished context into eventContext, gives one slot back and returns SUCCESS.
543  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
544  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
545  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
546  // << " active: " << m_isActive << endmsg;
547  return StatusCode::FAILURE;
548  } else {
549  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
550  // << " active: " << m_isActive << endmsg;
    // presumably pop() blocks until a finished event is available — TODO confirm against the queue type
551  m_finishedEvents.pop( eventContext );
552  ++m_freeSlots;
553  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
554  return StatusCode::SUCCESS;
555  }
556 }
557 
558 //---------------------------------------------------------------------------
563 {
    // NOTE(review): the signature line is missing from this listing; presumably this is
    // AvalancheSchedulerSvc::tryPopFinishedEvent().
    // Attempts a try_pop on the finished-events queue: on success stores the context in
    // eventContext, gives one slot back and returns SUCCESS; otherwise returns FAILURE.
564  if ( m_finishedEvents.try_pop( eventContext ) ) {
565  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
566  << endmsg;
567  ++m_freeSlots;
568  return StatusCode::SUCCESS;
569  }
570  return StatusCode::FAILURE;
571 }
572 
573 //--------------------------------------------------------------------------
574 // States Management
575 
585 StatusCode AvalancheSchedulerSvc::updateStates( int si, const int algo_index, const int sub_slot,
586  const int source_slot )
587 {
588 
589  StatusCode global_sc( StatusCode::SUCCESS );
590 
591  // Sort from the oldest to the newest event
592  // Prepare a vector of pointers to the slots to avoid copies
593  std::vector<EventSlot*> eventSlotsPtrs;
594 
595  // Consider all slots if si <0 or just one otherwise
596  if ( si < 0 ) {
597  const int eventsSlotsSize( m_eventSlots.size() );
598  eventSlotsPtrs.reserve( eventsSlotsSize );
599  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
600  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
601  }
602  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
603  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
604  } else {
605  eventSlotsPtrs.push_back( &m_eventSlots[si] );
606  }
607 
608  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
609  int iSlot = thisSlotPtr->eventContext->slot();
610 
611  // Cache the states of the algos to improve readability and performance
612  auto& thisSlot = m_eventSlots[iSlot];
613  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
614 
615  // Perform the I->CR->DR transitions
616  if ( algo_index >= 0 ) {
617  Cause cs = {Cause::source::Task, index2algname( algo_index )};
618 
619  // Run in whole-event context if there's no sub-slot index, or the sub-slot has a different parent
620  if ( sub_slot == -1 || iSlot != source_slot ) {
621  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
622  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
623  global_sc = StatusCode::FAILURE;
624  }
625  } else {
626  if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
627  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot << " of " << iSlot << endmsg;
628  global_sc = StatusCode::FAILURE;
629  }
630  }
631  }
632 
633  StatusCode partial_sc( StatusCode::FAILURE, true );
634 
635  // Perform DR->SCHEDULED
636  if ( !m_optimizationMode.empty() ) {
637  auto comp_nodes = [this]( const uint& i, const uint& j ) {
638  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
639  };
641  comp_nodes, std::vector<uint>() );
642  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it )
643  buffer.push( *it );
644  while ( !buffer.empty() ) {
645  bool IOBound = false;
646  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
647 
648  if ( !IOBound )
649  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
650  else
651  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
652 
653  ON_VERBOSE if ( partial_sc.isFailure() ) verbose() << "Could not apply transition from " << AState::DATAREADY
654  << " for algorithm " << index2algname( buffer.top() )
655  << " on processing slot " << iSlot << endmsg;
656 
657  buffer.pop();
658  }
659 
660  } else {
661  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
662  uint algIndex = *it;
663 
664  bool IOBound = false;
665  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
666 
667  if ( !IOBound )
668  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
669  else
670  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
671 
672  ON_VERBOSE if ( partial_sc.isFailure() ) verbose() << "Could not apply transition from " << AState::DATAREADY
673  << " for algorithm " << index2algname( algIndex )
674  << " on processing slot " << iSlot << endmsg;
675  }
676  }
677 
678  // Check for algorithms ready in sub-slots
679  for ( auto& subslot : thisSlot.allSubSlots ) {
680  auto& subslotStates = subslot.algsStates;
681  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
682  uint algIndex{*it};
683  partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext.get() );
684  // The following verbosity is expensive when the number of sub-slots is high
685  /*ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
686  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
687  << " on processing subslot " << subslot.eventContext->slot() << endmsg;*/
688  }
689  }
690 
691  if ( m_dumpIntraEventDynamics ) {
693  s << index2algname( algo_index ) << ", " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
694  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
695  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
696  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
697  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
698  std::ofstream myfile;
699  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
700  myfile << s.str();
701  myfile.close();
702  }
703 
704  // Not complete because this would mean that the slot is already free!
705  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
706  !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
707  !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
708  !thisSlot.complete ) {
709 
710  thisSlot.complete = true;
711  // if the event did not fail, add it to the finished events
712  // otherwise it is taken care of in the error handling
713  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
714  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
715  << thisSlot.eventContext->slot() << ")." << endmsg;
716  m_finishedEvents.push( thisSlot.eventContext.release() );
717  }
718 
719  // now let's return the fully evaluated result of the control flow
720  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
721 
722  thisSlot.eventContext.reset( nullptr );
723 
724  } else if ( isStalled( thisSlot ) ) {
725  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
726  eventFailed( thisSlot.eventContext.get() ); // can't release yet
727  }
728  } // end loop on slots
729 
730  ON_VERBOSE verbose() << "States Updated." << endmsg;
731 
732  return global_sc;
733 }
734 
735 //---------------------------------------------------------------------------
736 
744 {
    // NOTE(review): the signature line is missing from this listing; presumably this is
    // AvalancheSchedulerSvc::isStalled(), taking the event slot to inspect as `slot`.
    // Reports a stall (logs an error and returns true) when the action counter of the slot is 0
    // and no algorithm of the slot or of its sub-slots is DATAREADY or SCHEDULED.
    // Returns false otherwise.
745 
746  if ( m_actionsCounts[slot.eventContext->slot()] == 0 &&
747  !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED} ) &&
748  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
749 
750  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
751 
752  return true;
753  }
754  return false;
755 }
756 
757 //---------------------------------------------------------------------------
758 
764 {
    // NOTE(review): the signature line is missing from this listing; presumably this is
    // AvalancheSchedulerSvc::eventFailed(), taking the context of the failed event.
    // Logs the failure, dumps the scheduler state and the precedence rules for the slot, flags
    // the slot as complete and hands its context over to the finished-events queue.
765  const uint slotIdx = eventContext->slot();
766 
767  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
768 
    // At VERBOSE level every slot is dumped (-1), otherwise only the failed one
769  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
770 
771  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
772  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
773 
774  // Push into the finished events queue the failed context
775  m_eventSlots[slotIdx].complete = true;
776  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
777 }
778 
779 //---------------------------------------------------------------------------
780 
786 {
    // NOTE(review): the signature line is missing from this listing; presumably this is
    // AvalancheSchedulerSvc::dumpSchedulerState(), with `iSlot` the slot of interest.
    // Builds one large text report and emits it as a single INFO message:
    //  - the SCHEDULED algorithms of all slots (only when the TimelineSvc is available and enabled),
    //  - the control-flow/FSM state of every incomplete slot (iSlot < 0) or of slot iSlot only,
    //  - the algorithm execution states of slot iSlot (iSlot >= 0 only).
787 
788  // To have just one big message
789  std::ostringstream outputMS;
790 
791  outputMS << "Dumping scheduler state\n"
792  << "=========================================================================================\n"
793  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
794  << "=========================================================================================\n\n";
795 
796  //===========================================================================
797 
798  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
799  << "------------------\n\n";
800 
801  // Figure if TimelineSvc is available (used below to detect threads IDs)
802  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
803  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
804  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
805  } else {
806 
807  // Figure optimal printout layout
    // indt ends up as the length of the longest SCHEDULED algorithm name (column width)
808  size_t indt( 0 );
809  for ( auto& slot : m_eventSlots )
810  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
811  if ( index2algname( (uint)*it ).length() > indt ) indt = index2algname( (uint)*it ).length();
812 
813  // Figure the last running schedule across all slots
814  for ( auto& slot : m_eventSlots ) {
815  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
816  ++it ) {
817 
818  const std::string algoName{index2algname( (uint)*it )};
819 
820  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
821  << slot.eventContext->slot();
822 
823  // Try to get POSIX threads IDs the currently running tasks are scheduled to
    // NOTE(review): this check is always true here — the enclosing else-branch is only reached
    // when timelineSvc.isValid() holds
824  if ( timelineSvc.isValid() ) {
825  TimelineEvent te{};
826  te.algorithm = algoName;
827  te.slot = slot.eventContext->slot();
828  te.event = slot.eventContext->evt();
829 
830  if ( timelineSvc->getTimelineEvent( te ) )
831  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
832  else
833  outputMS << " thread.id: [unknown]"; // this means a task has just
834  // been signed off as SCHEDULED,
835  // but has not been assigned to a thread yet
836  // (i.e., not running yet)
837  }
838  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
839  }
840  }
841  }
842 
843  //===========================================================================
844 
845  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
846  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
847 
    // slotCount tracks the position of `slot` within m_eventSlots (incremented before use)
848  int slotCount = -1;
849  for ( auto& slot : m_eventSlots ) {
850  ++slotCount;
851  if ( slot.complete ) continue;
852 
853  outputMS << "[ slot: "
854  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
855  << " event: "
856  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
857  << " ]:\n\n";
858 
859  if ( 0 > iSlot || iSlot == slotCount ) {
860 
861  // Snapshot of the Control Flow and FSM states
862  outputMS << m_precSvc->printState( slot ) << "\n";
863 
864  // Mention sub slots (this is expensive if the number of sub-slots is high)
865  /*if ( !slot.allSubSlots.empty() ) {
866  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
867  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
868  for ( auto& ss : slot.allSubSlots ) {
869  outputMS << "[ slot: " << slotID << " sub-slot entry: " << ss.entryPoint << " event: "
870  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
871  << " ]:\n\n";
872  outputMS << m_precSvc->printState( ss ) << "\n";
873  }
874  }*/
875  }
876  }
877 
878  //===========================================================================
879 
880  if ( 0 <= iSlot ) {
881  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
    // NOTE(review): the event context of slot iSlot is dereferenced without a null check
882  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
883  }
884 
885  outputMS << "\n=========================================================================================\n"
886  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
887  << "=========================================================================================\n\n";
888 
889  info() << outputMS.str() << endmsg;
890 }
891 
892 //---------------------------------------------------------------------------
893 
894 StatusCode AvalancheSchedulerSvc::promoteToScheduled( unsigned int iAlgo, int si, EventContext* eventContext )
895 {
896 
897  if ( m_algosInFlight == m_maxAlgosInFlight ) return StatusCode::FAILURE;
898 
899  const std::string& algName( index2algname( iAlgo ) );
900  IAlgorithm* ialgoPtr = nullptr;
901  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
902 
903  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
904 
905  ++m_algosInFlight;
906  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
907  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
908  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
909  } );
910  return StatusCode::SUCCESS;
911  };
912 
913  // Avoid to use tbb if the pool size is 1 and run in this thread
914  if ( -100 != m_threadPoolSize ) {
915  // the child task that executes an Algorithm
916  tbb::task* algoTask = new ( tbb::task::allocate_root() )
917  AlgoExecutionTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
918  // schedule the algoTask
919  tbb::task::enqueue( *algoTask );
920 
921  } else {
922  AlgoExecutionTask theTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc,
923  promote2ExecutedClosure );
924  theTask.execute();
925  }
926 
927  ON_DEBUG debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot "
928  << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
929 
930  // Update states in the appropriate event slot
931  StatusCode updateSc;
932  EventSlot& thisSlot = m_eventSlots[si];
933  if ( eventContext->usesSubSlot() ) {
934  // Sub-slot
935  size_t const subSlotIndex = eventContext->subSlot();
936  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
937  } else {
938  // Event level (standard behaviour)
939  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
940  }
941 
942  ON_VERBOSE dumpSchedulerState( -1 );
943 
944  if ( updateSc.isSuccess() )
945  ON_VERBOSE verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
946  return updateSc;
947  } else {
948  ON_DEBUG debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si
949  << endmsg;
950  return sc;
951  }
952 }
953 
954 //---------------------------------------------------------------------------
955 
956 StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, int si, EventContext* eventContext )
957 {
958 
959  if ( m_IOBoundAlgosInFlight == m_maxIOBoundAlgosInFlight ) return StatusCode::FAILURE;
960 
961  // bool IOBound = m_precSvc->isBlocking(algName);
962 
963  const std::string& algName( index2algname( iAlgo ) );
964  IAlgorithm* ialgoPtr = nullptr;
965  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
966 
967  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
968 
969  ++m_IOBoundAlgosInFlight;
970  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
971  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
972  return this->AvalancheSchedulerSvc::promoteToAsyncExecuted( iAlgo, eventContext->slot(), ialgoPtr,
973  eventContext );
974  } );
975  return StatusCode::SUCCESS;
976  };
977  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
978  // tbb::task)? it seems it works..
979  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
980  IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
981  m_IOBoundAlgScheduler->push( *theTask );
982 
983  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
984  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
985 
986  // Update states in the appropriate event slot
987  StatusCode updateSc;
988  EventSlot& thisSlot = m_eventSlots[si];
989  if ( eventContext->usesSubSlot() ) {
990  // Sub-slot
991  size_t const subSlotIndex = eventContext->subSlot();
992  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
993  } else {
994  // Event level (standard behaviour)
995  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
996  }
997 
998  ON_VERBOSE if ( updateSc.isSuccess() ) verbose() << "[Asynchronous] Promoting " << algName
999  << " to SCHEDULED on slot " << si << endmsg;
1000  return updateSc;
1001  } else {
1002  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1003  << " on slot " << si << endmsg;
1004  return sc;
1005  }
1006 }
1007 
1008 //---------------------------------------------------------------------------
1009 
1014  EventContext* eventContext )
1015 {
1016  Gaudi::Hive::setCurrentContext( eventContext );
1017  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1018 
1019  if ( sc.isFailure() ) {
1020  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1021  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1022  return StatusCode::FAILURE;
1023  }
1024 
1025  --m_algosInFlight;
1026 
1027  EventSlot& thisSlot = m_eventSlots[si];
1028 
1029  ON_DEBUG debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1030 
1031  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1032  AState state = algstate.execStatus().isSuccess()
1033  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1034  : AState::ERROR;
1035 
1036  // Update states in the appropriate slot
1037  int subSlotIndex = -1;
1038  if ( eventContext->usesSubSlot() ) {
1039  // Sub-slot
1040  subSlotIndex = eventContext->subSlot();
1041  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1042  } else {
1043  // Event level (standard behaviour)
1044  sc = thisSlot.algsStates.set( iAlgo, state );
1045  }
1046 
1047  ON_VERBOSE if ( sc.isSuccess() ) verbose() << "Promoting " << algo->name() << " on slot " << si << " to " << state
1048  << endmsg;
1049 
1050  ON_DEBUG debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1051  << m_algosInFlight << endmsg;
1052 
1053  // Schedule an update of the status of the algorithms
1054  ++m_actionsCounts[si];
1055  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1056  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1057  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1058  } );
1059 
1060  return sc;
1061 }
1062 
1063 //---------------------------------------------------------------------------
1064 
1069  EventContext* eventContext )
1070 {
1071  Gaudi::Hive::setCurrentContext( eventContext );
1072  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1073 
1074  if ( sc.isFailure() ) {
1075  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1076  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1077  return StatusCode::FAILURE;
1078  }
1079 
1080  --m_IOBoundAlgosInFlight;
1081 
1082  EventSlot& thisSlot = m_eventSlots[si];
1083 
1084  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1085  << endmsg;
1086 
1087  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1088  AState state = algstate.execStatus().isSuccess()
1089  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1090  : AState::ERROR;
1091 
1092  // Update states in the appropriate slot
1093  int subSlotIndex = -1;
1094  if ( eventContext->usesSubSlot() ) {
1095  // Sub-slot
1096  subSlotIndex = eventContext->subSlot();
1097  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1098  } else {
1099  // Event level (standard behaviour)
1100  sc = thisSlot.algsStates.set( iAlgo, state );
1101  }
1102 
1103  ON_VERBOSE if ( sc.isSuccess() ) verbose() << "[Asynchronous] Promoting " << algo->name() << " on slot " << si
1104  << " to " << state << endmsg;
1105 
1106  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1107  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1108 
1109  // Schedule an update of the status of the algorithms
1110  ++m_actionsCounts[si];
1111  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1112  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1113  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1114  } );
1115 
1116  return sc;
1117 }
1118 
1119 //---------------------------------------------------------------------------
1120 
1121 // Method to inform the scheduler about event views
1122 
1124  std::unique_ptr<EventContext> viewContext )
1125 {
1126  // Prevent view nesting
1127  if ( sourceContext->usesSubSlot() ) {
1128  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1129  return StatusCode::FAILURE;
1130  }
1131 
1132  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1133 
1134  // It's not possible to create an std::functional from a move-capturing lambda
1135  // So, we have to release the unique pointer
1136  auto action =
1137  [ this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(), &nodeName ]()->StatusCode
1138  {
1139 
1140  // Attach the sub-slot to the top-level slot
1141  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1142 
1143  if ( viewContextPtr ) {
1144  // Re-create the unique pointer
1145  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1146  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1147  return StatusCode::SUCCESS;
1148  } else {
1149  // Disable the view node if there are no views
1150  topSlot.disableSubSlots( nodeName );
1151  return StatusCode::SUCCESS;
1152  }
1153  };
1154 
1155  m_actionsQueue.push( std::move( action ) );
1156 
1157  return StatusCode::SUCCESS;
1158 }
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:65
#define ON_DEBUG
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
constexpr static const auto FAILURE
Definition: StatusCode.h:88
StatusCode initialize() override
Definition: Service.cpp:63
const unsigned int & getAlgoIndex() const
Get algorithm index.
Class representing an event slot.
Definition: EventSlot.h:14
T empty(T...args)
T open(T...args)
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:747
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B...
Definition: EventSlot.h:71
StatusCode finalize() override
Definition: Service.cpp:173
ContextID_t slot() const
Definition: EventContext.h:50
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one) ...
Definition: EventSlot.h:53
IDataHandleMetadata::AccessMode AccessMode
Definition: DataHandle.h:115
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
bool isSuccess() const
Definition: StatusCode.h:287
void addDataDependency(const DataObjID &key, AccessMode access) final override
Add a data dependency, even after initialization.
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
T to_string(T...args)
std::string algorithm
Definition: ITimelineSvc.h:21
void activate()
Activate scheduler.
T end(T...args)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:94
bool usesSubSlot() const
Definition: EventContext.h:52
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
Definition: EventContext.h:31
STL class.
bool isFailure() const
Definition: StatusCode.h:139
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
T release(T...args)
T setw(T...args)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:49
STL class.
#define DECLARE_COMPONENT(type)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
T push_back(T...args)
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
T close(T...args)
StatusCode set(unsigned int iAlgo, State newState)
StatusCode finalize() override
Finalise.
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
T move(T...args)
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T get(T...args)
T insert(T...args)
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
T find_if(T...args)
T size(T...args)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
T begin(T...args)
Iterator begin(State kind)
T any_of(T...args)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
string s
Definition: gaudirun.py:253
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
T hex(T...args)
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
bool complete
Flags completion of the event.
Definition: EventSlot.h:83
T sort(T...args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
const DataObjIDColl & dataDependencies(AccessMode access) const final override
Tell which whiteboard keys the algorithm will be reading or writing.
StatusCode deactivate()
Deactivate scheduler.
bool filterPassed() const
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:40
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
std::string fullKey() const
Definition: DataObjID.cpp:99
ContextID_t subSlot() const
Definition: EventContext.h:51
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:77
const StatusCode & execStatus() const
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:79
T reserve(T...args)
#define ON_VERBOSE
Iterator end(State kind)