The Gaudi Framework  v32r2 (46d42edc)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 
3 #include "AlgoExecutionTask.h"
4 #include "IOBoundAlgTask.h"
5 
6 // Framework includes
12 #include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
13 
14 // C++
15 #include <algorithm>
16 #include <map>
17 #include <queue>
18 #include <sstream>
19 #include <unordered_set>
20 
21 // External libs
22 #include "boost/algorithm/string.hpp"
23 #include "boost/thread.hpp"
24 #include "boost/tokenizer.hpp"
25 // DP waiting for the TBB service
26 #include "tbb/task_scheduler_init.h"
27 
28 // Instantiation of a static factory class used by clients to create instances of this service
30 
31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
33 
34 namespace {
35  struct DataObjIDSorter {
36  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
37  };
38 
39  // Sort a DataObjIDColl in a well-defined, reproducible manner.
40  // Used for making debugging dumps.
41  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
43  v.reserve( coll.size() );
44  for ( const DataObjID& id : coll ) v.push_back( &id );
45  std::sort( v.begin(), v.end(), DataObjIDSorter() );
46  return v;
47  }
48 
49  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
50  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
51  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
52  }
53 } // namespace
54 
55 //---------------------------------------------------------------------------
56 
64 
65  // Initialise mother class (read properties, ...)
67  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
68 
69  // Get hold of the TBBSvc. This should initialize the thread pool
70  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
71  if ( !m_threadPoolSvc.isValid() ) {
72  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
73  return StatusCode::FAILURE;
74  }
75 
76  // Activate the scheduler in another thread.
77  info() << "Activating scheduler in a separate thread" << endmsg;
78  m_thread = std::thread( [this]() { this->activate(); } );
79 
80  while ( m_isActive != ACTIVE ) {
81  if ( m_isActive == FAILURE ) {
82  fatal() << "Terminating initialization" << endmsg;
83  return StatusCode::FAILURE;
84  } else {
85  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
86  sleep( 1 );
87  }
88  }
89 
90  if ( m_enableCondSvc ) {
91  // Get hold of the CondSvc
92  m_condSvc = serviceLocator()->service( "CondSvc" );
93  if ( !m_condSvc.isValid() ) {
94  warning() << "No CondSvc found, or not enabled. "
95  << "Will not manage CondAlgorithms" << endmsg;
96  m_enableCondSvc = false;
97  }
98  }
99 
100  // Get the algo resource pool
101  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
102  if ( !m_algResourcePool.isValid() ) {
103  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
104  return StatusCode::FAILURE;
105  }
106 
107  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
108  if ( !m_algExecStateSvc.isValid() ) {
109  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
110  return StatusCode::FAILURE;
111  }
112 
113  // Get Whiteboard
115  if ( !m_whiteboard.isValid() ) {
116  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
117  return StatusCode::FAILURE;
118  }
119 
120  // Get dedicated scheduler for I/O-bound algorithms
121  if ( m_useIOBoundAlgScheduler ) {
124  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
125  }
126 
127  // Set the MaxEventsInFlight parameters from the number of WB stores
129 
130  // Set the number of free slots
132 
133  // Get the list of algorithms
135  const unsigned int algsNumber = algos.size();
136  if ( algsNumber != 0 ) {
137  info() << "Found " << algsNumber << " algorithms" << endmsg;
138  } else {
139  error() << "No algorithms found" << endmsg;
140  return StatusCode::FAILURE;
141  }
142 
143  /* Dependencies
144  1) Look for handles in algo, if none
145  2) Assume none are required
146  */
147 
148  DataObjIDColl globalInp, globalOutp;
149 
150  // figure out all outputs
151  for ( IAlgorithm* ialgoPtr : algos ) {
152  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
153  if ( !algoPtr ) {
154  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
155  return StatusCode::FAILURE;
156  }
157  for ( auto id : algoPtr->outputDataObjs() ) {
158  auto r = globalOutp.insert( id );
159  if ( !r.second ) {
160  warning() << "multiple algorithms declare " << id
161  << " as output! could be a single instance in multiple paths "
162  "though, or control flow may guarantee only one runs...!"
163  << endmsg;
164  }
165  }
166  }
167 
168  std::ostringstream ostdd;
169  ostdd << "Data Dependencies for Algorithms:";
170 
171  std::map<std::string, DataObjIDColl> algosDependenciesMap;
172  for ( IAlgorithm* ialgoPtr : algos ) {
173  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
174  if ( nullptr == algoPtr ) {
175  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
176  << ": this will result in a crash." << endmsg;
177  return StatusCode::FAILURE;
178  }
179 
180  ostdd << "\n " << algoPtr->name();
181 
182  DataObjIDColl algoDependencies;
183  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
184  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
185  DataObjID id = *idp;
186  ostdd << "\n o INPUT " << id;
187  if ( id.key().find( ":" ) != std::string::npos ) {
188  ostdd << " contains alternatives which require resolution...\n";
189  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
190  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
191  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
192  } );
193  if ( itok != tokens.end() ) {
194  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
195  id.updateKey( *itok );
196  } else {
197  error() << "failed to find alternate in global output list"
198  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
199  m_showDataDeps = true;
200  }
201  }
202  algoDependencies.insert( id );
203  globalInp.insert( id );
204  }
205  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
206  ostdd << "\n o OUTPUT " << *id;
207  if ( id->key().find( ":" ) != std::string::npos ) {
208  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
209  << endmsg;
210  m_showDataDeps = true;
211  }
212  }
213  } else {
214  ostdd << "\n none";
215  }
216  algosDependenciesMap[algoPtr->name()] = algoDependencies;
217  }
218 
219  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
220 
221  // Check if we have unmet global input dependencies, and, optionally, heal them
222  // WARNING: this step must be done BEFORE the Precedence Service is initialized
223  if ( m_checkDeps ) {
224  DataObjIDColl unmetDep;
225  for ( auto o : globalInp )
226  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
227 
228  if ( unmetDep.size() > 0 ) {
229 
230  std::ostringstream ost;
231  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232  ost << "\n o " << *o << " required by Algorithm: ";
233 
234  for ( const auto& p : algosDependenciesMap )
235  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
236  }
237 
238  if ( !m_useDataLoader.empty() ) {
239 
240  // Find the DataLoader Alg
241  IAlgorithm* dataLoaderAlg( nullptr );
242  for ( IAlgorithm* algo : algos )
243  if ( algo->name() == m_useDataLoader ) {
244  dataLoaderAlg = algo;
245  break;
246  }
247 
248  if ( dataLoaderAlg == nullptr ) {
249  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
250  << "\" found, and unmet INPUT dependencies "
251  << "detected:\n"
252  << ost.str() << endmsg;
253  return StatusCode::FAILURE;
254  }
255 
256  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
257  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
258 
259  // Set the property Load of DataLoader Alg
260  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
261  if ( !dataAlg ) {
262  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
263  << endmsg;
264  return StatusCode::FAILURE;
265  }
266 
267  for ( auto& id : unmetDep ) {
268  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
269  << dataLoaderAlg->name() << endmsg;
271  }
272 
273  } else {
274  fatal() << "Auto DataLoading not requested, "
275  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
276  return StatusCode::FAILURE;
277  }
278 
279  } else {
280  info() << "No unmet INPUT data dependencies were found" << endmsg;
281  }
282  }
283 
284  // Get the precedence service
285  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
286  if ( !m_precSvc.isValid() ) {
287  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
288  return StatusCode::FAILURE;
289  }
290  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
291  if ( !precSvc ) {
292  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
293  return StatusCode::FAILURE;
294  }
295 
296  // Fill the containers to convert algo names to index
297  m_algname_vect.resize( algsNumber );
298  for ( IAlgorithm* algo : algos ) {
299  const std::string& name = algo->name();
300  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
301  m_algname_index_map[name] = index;
302  m_algname_vect.at( index ) = name;
303  }
304 
305  // Shortcut for the message service
306  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
307  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
308 
310  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
311  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
312  m_eventSlots.back().complete = true;
313  }
315 
316  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
317 
318  // Clearly inform about the level of concurrency
319  info() << "Concurrency level information:" << endmsg;
320  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
321  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
322 
324 
326 
327  // Simulate execution flow
329 
330  return sc;
331 }
332 //---------------------------------------------------------------------------
333 
338 
340  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
341 
342  sc = deactivate();
343  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
344 
345  info() << "Joining Scheduler thread" << endmsg;
346  m_thread.join();
347 
348  // Final error check after thread pool termination
349  if ( m_isActive == FAILURE ) {
350  error() << "problems in scheduler thread" << endmsg;
351  return StatusCode::FAILURE;
352  }
353 
354  return sc;
355 }
356 //---------------------------------------------------------------------------
357 
369 
370  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
371 
373  error() << "problems initializing ThreadPoolSvc" << endmsg;
375  return;
376  }
377 
378  // Wait for actions pushed into the queue by finishing tasks.
379  action thisAction;
381 
382  m_isActive = ACTIVE;
383 
384  // Continue to wait if the scheduler is running or there is something to do
385  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
386  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
387  m_actionsQueue.pop( thisAction );
388  sc = thisAction();
389  ON_VERBOSE {
390  if ( sc.isFailure() )
391  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
392  else
393  verbose() << "Action succeeded." << endmsg;
394  }
395  else sc.ignore();
396  }
397 
398  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
400  error() << "Problems terminating thread pool" << endmsg;
402  }
403 }
404 
405 //---------------------------------------------------------------------------
406 
414 
415  if ( m_isActive == ACTIVE ) {
416 
417  // Set the number of slots available to an error code
418  m_freeSlots.store( 0 );
419 
420  // Empty queue
421  action thisAction;
422  while ( m_actionsQueue.try_pop( thisAction ) ) {};
423 
424  // This would be the last action
425  m_actionsQueue.push( [this]() -> StatusCode {
426  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
428  return StatusCode::SUCCESS;
429  } );
430  }
431 
432  return StatusCode::SUCCESS;
433 }
434 
435 //---------------------------------------------------------------------------
436 
437 // Utils and shortcuts
438 
439 inline const std::string& AvalancheSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
440 
441 //---------------------------------------------------------------------------
442 
443 inline unsigned int AvalancheSchedulerSvc::algname2index( const std::string& algoname ) {
444  unsigned int index = m_algname_index_map[algoname];
445  return index;
446 }
447 
448 //---------------------------------------------------------------------------
449 
450 // EventSlot management
458 
459  if ( !eventContext ) {
460  fatal() << "Event context is nullptr" << endmsg;
461  return StatusCode::FAILURE;
462  }
463 
464  if ( m_freeSlots.load() == 0 ) {
465  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
466  return StatusCode::FAILURE;
467  }
468 
469  // no problem as push new event is only called from one thread (event loop manager)
470  --m_freeSlots;
471 
472  auto action = [this, eventContext]() -> StatusCode {
473  // Event processing slot forced to be the same as the wb slot
474  const unsigned int thisSlotNum = eventContext->slot();
475  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
476  if ( !thisSlot.complete ) {
477  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
478  return StatusCode::FAILURE;
479  }
480 
481  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
482  thisSlot.reset( eventContext );
483 
484  // Result status code:
486 
487  // promote to CR and DR the initial set of algorithms
488  Cause cs = {Cause::source::Root, "RootDecisionHub"};
489  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
490  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
491  result = StatusCode::FAILURE;
492  }
493 
494  if ( this->updateStates( thisSlotNum ).isFailure() ) {
495  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
496  result = StatusCode::FAILURE;
497  }
498 
499  return result;
500  }; // end of lambda
501 
502  // Kick off the scheduling!
503  ON_VERBOSE {
504  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
505  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
506  }
507 
508  m_actionsQueue.push( action );
509 
510  return StatusCode::SUCCESS;
511 }
512 
513 //---------------------------------------------------------------------------
514 
516  StatusCode sc;
517  for ( auto context : eventContexts ) {
518  sc = pushNewEvent( context );
519  if ( sc != StatusCode::SUCCESS ) return sc;
520  }
521  return sc;
522 }
523 
524 //---------------------------------------------------------------------------
525 
526 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
527 
528 //---------------------------------------------------------------------------
533 
534  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
535  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
536  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
537  // << " active: " << m_isActive << endmsg;
538  return StatusCode::FAILURE;
539  } else {
540  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
541  // << " active: " << m_isActive << endmsg;
542  m_finishedEvents.pop( eventContext );
543  ++m_freeSlots;
544  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
545  return StatusCode::SUCCESS;
546  }
547 }
548 
549 //---------------------------------------------------------------------------
554 
555  if ( m_finishedEvents.try_pop( eventContext ) ) {
556  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
557  << endmsg;
558  ++m_freeSlots;
559  return StatusCode::SUCCESS;
560  }
561  return StatusCode::FAILURE;
562 }
563 
564 //--------------------------------------------------------------------------
565 // States Management
566 
// Advance the algorithm state machines of one event slot, or of all slots.
//   si          - slot to update; a negative value means every slot whose event is not complete,
//                 processed from the oldest to the newest event number (EventContext::evt()).
//   algo_index  - index of the algorithm whose completion triggered this update. When >= 0, the
//                 precedence service is iterated with a Task cause for that algorithm. -1 acts as
//                 "no trigger" (it is printed as "START" in the intra-event dump).
//   sub_slot    - sub-slot the triggering algorithm ran in (-1 for whole-event context).
//   source_slot - slot owning sub_slot; the sub-slot is iterated only when it equals the current slot.
// Returns FAILURE if any IPrecedenceSvc::iterate call failed, SUCCESS otherwise. Failures to promote
// individual algorithms to SCHEDULED are not reflected in the return value.
576 StatusCode AvalancheSchedulerSvc::updateStates( int si, const int algo_index, const int sub_slot,
577  const int source_slot ) {
578 
579  StatusCode global_sc( StatusCode::SUCCESS );
580 
581  // Sort from the oldest to the newest event
582  // Prepare a vector of pointers to the slots to avoid copies
583  std::vector<EventSlot*> eventSlotsPtrs;
584 
585  // Consider all slots if si <0 or just one otherwise
// (in the all-slots case, only slots whose event is not yet complete are considered)
586  if ( si < 0 ) {
587  const int eventsSlotsSize( m_eventSlots.size() );
588  eventSlotsPtrs.reserve( eventsSlotsSize );
589  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
590  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
591  }
592  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
593  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
594  } else {
595  eventSlotsPtrs.push_back( &m_eventSlots[si] );
596  }
597 
598  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
599  int iSlot = thisSlotPtr->eventContext->slot();
600 
601  // Cache the states of the algos to improve readability and performance
602  auto& thisSlot = m_eventSlots[iSlot];
603  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
604 
605  // Perform the I->CR->DR transitions
606  if ( algo_index >= 0 ) {
607  Cause cs = {Cause::source::Task, index2algname( algo_index )};
608 
609  // Run in whole-event context if there's no sub-slot index, or the sub-slot has a different parent
610  if ( sub_slot == -1 || iSlot != source_slot ) {
611  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
612  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
613  global_sc = StatusCode::FAILURE;
614  }
615  } else {
616  if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
617  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot << " of " << iSlot << endmsg;
618  global_sc = StatusCode::FAILURE;
619  }
620  }
621  }
622 
// Result of the most recent promotion attempt. It is only used for verbose diagnostics
// and is explicitly ignored at the end of each loop iteration.
623  StatusCode partial_sc( StatusCode::FAILURE, true );
624 
625  // Perform DR->SCHEDULED
// With an optimization mode set, DATAREADY algorithms are drained through a std::priority_queue
// ordered by m_precSvc->getPriority(), so the highest-priority algorithm is offered first.
626  if ( !m_optimizationMode.empty() ) {
627  auto comp_nodes = [this]( const uint& i, const uint& j ) {
628  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
629  };
630  std::priority_queue<uint, std::vector<uint>, std::function<bool( const uint&, const uint& )>> buffer(
631  comp_nodes, std::vector<uint>() );
632  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it )
633  buffer.push( *it );
634  while ( !buffer.empty() ) {
// Blocking (I/O-bound) algorithms go to the asynchronous path only when the I/O scheduler is enabled.
635  bool IOBound = false;
636  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
637 
638  if ( !IOBound )
639  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
640  else
641  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
642 
643  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
644  << "Could not apply transition from " << AState::DATAREADY << " for algorithm "
645  << index2algname( buffer.top() ) << " on processing slot " << iSlot << endmsg;
646 
647  buffer.pop();
648  }
649 
650  } else {
// Without an optimization mode, DATAREADY algorithms are offered in the order the state table iterates them.
651  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
652  uint algIndex = *it;
653 
654  bool IOBound = false;
655  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
656 
657  if ( !IOBound )
658  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
659  else
660  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
661 
662  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
663  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
664  << " on processing slot " << iSlot << endmsg;
665  }
666  }
667 
668  // Check for algorithms ready in sub-slots
// Sub-slot algorithms always use promoteToScheduled (never the I/O-bound path), each with
// its sub-slot's own event context.
669  for ( auto& subslot : thisSlot.allSubSlots ) {
670  auto& subslotStates = subslot.algsStates;
671  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
672  uint algIndex{*it};
673  partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext.get() );
674  // The following verbosity is expensive when the number of sub-slots is high
675  /*ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
676  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
677  << " on processing subslot " << subslot.eventContext->slot() << endmsg;*/
678  }
679  }
680 
// Optional diagnostics: append one CSV row (triggering algorithm or "START", #CONTROLREADY,
// #DATAREADY, #SCHEDULED, clock ticks since epoch) to IntraEventFSMOccupancy_<threads>T.csv.
// The file is opened and closed on every call.
681  if ( m_dumpIntraEventDynamics ) {
// NOTE(review): the declaration of the stream `s` and the first half of the `threads`
// conditional (the line before ": std::to_string(...)") are not present in this copy
// of the file. Restore them from the upstream source.
683  s << ( algo_index != -1 ? index2algname( algo_index ) : "START" ) << ", "
684  << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
685  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
686  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
688  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
689  std::ofstream myfile;
690  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
691  myfile << s.str();
692  myfile.close();
693  }
694 
// The event is finished once control flow is resolved and no algorithm in the slot or any of
// its sub-slots is still CONTROLREADY, DATAREADY or SCHEDULED.
695  // Not complete because this would mean that the slot is already free!
696  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
697  !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
698  !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
699  !thisSlot.complete ) {
700 
701  thisSlot.complete = true;
702  // if the event did not fail, add it to the finished events
703  // otherwise it is taken care of in the error handling
// NOTE(review): on the non-Success path the context is not released here, so the reset()
// below destroys it. Confirm the error handling has already done what it needs with it.
704  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
705  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
706  << thisSlot.eventContext->slot() << ")." << endmsg;
707  m_finishedEvents.push( thisSlot.eventContext.release() );
708  }
709 
710  // now let's return the fully evaluated result of the control flow
// NOTE(review): on the Success path eventContext was released just above. Confirm that
// printState does not dereference slot.eventContext.
711  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
712 
713  thisSlot.eventContext.reset( nullptr );
714 
// Not finished: if the slot cannot make progress (see isStalled), flag the event as
// AlgStall and hand it to eventFailed().
715  } else if ( isStalled( thisSlot ) ) {
716  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
717  eventFailed( thisSlot.eventContext.get() ); // can't release yet
718  }
719  partial_sc.ignore();
720  } // end loop on slots
721 
722  ON_VERBOSE verbose() << "States Updated." << endmsg;
723 
724  return global_sc;
725 }
726 
727 //---------------------------------------------------------------------------
728 
735 bool AvalancheSchedulerSvc::isStalled( const EventSlot& slot ) const {
736 
737  if ( m_actionsCounts[slot.eventContext->slot()] == 0 &&
738  !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED} ) &&
739  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
740 
741  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
742 
743  return true;
744  }
745  return false;
746 }
747 
748 //---------------------------------------------------------------------------
749 
755  const uint slotIdx = eventContext->slot();
756 
757  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
758 
759  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
760 
761  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
763 
764  // Push into the finished events queue the failed context
765  m_eventSlots[slotIdx].complete = true;
766  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
767 }
768 
769 //---------------------------------------------------------------------------
770 
776 
777  // To have just one big message
778  std::ostringstream outputMS;
779 
780  outputMS << "Dumping scheduler state\n"
781  << "=========================================================================================\n"
782  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
783  << "=========================================================================================\n\n";
784 
785  //===========================================================================
786 
787  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
788  << "------------------\n\n";
789 
790  // Figure if TimelineSvc is available (used below to detect threads IDs)
791  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
792  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
793  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
794  } else {
795 
796  // Figure optimal printout layout
797  size_t indt( 0 );
798  for ( auto& slot : m_eventSlots )
799  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
800  if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
801 
802  // Figure the last running schedule across all slots
803  for ( auto& slot : m_eventSlots ) {
804  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
805  ++it ) {
806 
807  const std::string algoName{index2algname( *it )};
808 
809  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
810  << slot.eventContext->slot();
811 
812  // Try to get POSIX threads IDs the currently running tasks are scheduled to
813  if ( timelineSvc.isValid() ) {
814  TimelineEvent te{};
815  te.algorithm = algoName;
816  te.slot = slot.eventContext->slot();
817  te.event = slot.eventContext->evt();
818 
819  if ( timelineSvc->getTimelineEvent( te ) )
820  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
821  else
822  outputMS << " thread.id: [unknown]"; // this means a task has just
823  // been signed off as SCHEDULED,
824  // but has not been assigned to a thread yet
825  // (i.e., not running yet)
826  }
827  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
828  }
829  }
830  }
831 
832  //===========================================================================
833 
834  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
835  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
836 
837  int slotCount = -1;
838  for ( auto& slot : m_eventSlots ) {
839  ++slotCount;
840  if ( slot.complete ) continue;
841 
842  outputMS << "[ slot: "
843  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
844  << " event: "
845  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
846  << " ]:\n\n";
847 
848  if ( 0 > iSlot || iSlot == slotCount ) {
849 
850  // Snapshot of the Control Flow and FSM states
851  outputMS << m_precSvc->printState( slot ) << "\n";
852 
853  // Mention sub slots (this is expensive if the number of sub-slots is high)
854  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
855  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
856  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
857  for ( auto& ss : slot.allSubSlots ) {
858  outputMS << "[ slot: " << slotID << ", sub-slot: "
859  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
860  << ", entry: " << ss.entryPoint << ", event: "
861  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
862  << " ]:\n\n";
863  outputMS << m_precSvc->printState( ss ) << "\n";
864  }
865  }
866  }
867  }
868 
869  //===========================================================================
870 
871  if ( 0 <= iSlot ) {
872  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
873  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
874  }
875 
876  outputMS << "\n=========================================================================================\n"
877  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
878  << "=========================================================================================\n\n";
879 
880  info() << outputMS.str() << endmsg;
881 }
882 
883 //---------------------------------------------------------------------------
884 
// Try to start algorithm iAlgo for event slot si.
//   iAlgo        - algorithm index (as used by index2algname)
//   si           - event slot whose state table is updated
//   eventContext - context the algorithm runs with; if it uses a sub-slot, that sub-slot's
//                  state table is updated instead of the slot's own
// Acquires an algorithm instance from m_algResourcePool. On success it:
//  - increments m_algosInFlight;
//  - runs the instance in an AlgoExecutionTask, which is either enqueued to TBB or executed in
//    this thread. The task is given a closure that, when invoked (presumably on task completion),
//    pushes a promoteToExecuted action onto m_actionsQueue;
//  - marks the algorithm SCHEDULED.
// Returns the acquisition status on failure, otherwise the status of setting the SCHEDULED state.
885 StatusCode AvalancheSchedulerSvc::promoteToScheduled( unsigned int iAlgo, int si, EventContext* eventContext ) {
886 
888 
889  const std::string& algName( index2algname( iAlgo ) );
890  IAlgorithm* ialgoPtr = nullptr;
891  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
892 
893  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
894 
895  ++m_algosInFlight;
896  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
897  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
898  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
899  } );
900  return StatusCode::SUCCESS;
901  };
902 
903  // Avoid to use tbb if the pool size is 1 and run in this thread
// NOTE(review): the in-thread branch below is taken only when m_threadPoolSize == -100,
// not when it is 1 as the comment above says. Confirm which condition is intended.
904  if ( -100 != m_threadPoolSize ) {
905  // the child task that executes an Algorithm
906  tbb::task* algoTask = new ( tbb::task::allocate_root() )
907  AlgoExecutionTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
908  // schedule the algoTask
909  tbb::task::enqueue( *algoTask );
910 
911  } else {
// Synchronous execution: the task runs to completion before the state is set to SCHEDULED below.
912  AlgoExecutionTask theTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc,
913  promote2ExecutedClosure );
914  theTask.execute();
915  }
916 
917  ON_DEBUG debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot "
918  << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
919 
920  // Update states in the appropriate event slot
921  StatusCode updateSc;
922  EventSlot& thisSlot = m_eventSlots[si];
923  if ( eventContext->usesSubSlot() ) {
924  // Sub-slot
925  size_t const subSlotIndex = eventContext->subSlot();
926  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
927  } else {
928  // Event level (standard behaviour)
929  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
930  }
931 
933 
934  if ( updateSc.isSuccess() )
935  ON_VERBOSE verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
936  return updateSc;
937  } else {
938  ON_DEBUG debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si
939  << endmsg;
940  return sc;
941  }
942 }
943 
944 //---------------------------------------------------------------------------
945 
946 StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, int si, EventContext* eventContext ) {
947 
949 
950  // bool IOBound = m_precSvc->isBlocking(algName);
951 
952  const std::string& algName( index2algname( iAlgo ) );
953  IAlgorithm* ialgoPtr = nullptr;
954  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
955 
956  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
957 
959  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
960  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
961  return this->AvalancheSchedulerSvc::promoteToAsyncExecuted( iAlgo, eventContext->slot(), ialgoPtr,
962  eventContext );
963  } );
964  return StatusCode::SUCCESS;
965  };
966  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
967  // tbb::task)? it seems it works..
968 
969  // FIXME - The memory allocation here is causing memory leaks as detected by the gcc leak sanitizer
970  //
971  // clang-format off
972  // Direct leak of 224 byte(s) in 7 object(s) allocated from:
973  // #0 0x7fc0cb524da8 in operator new(unsigned long) /afs/cern.ch/cms/CAF/CMSCOMM/COMM_ECAL/dkonst/GCC/build/contrib/gcc-8.2.0/src/gcc/8.2.0/libsanitizer/lsan/lsan_interceptors.cc:229
974  // #1 0x7fc0ba979f7b in function<AvalancheSchedulerSvc::promoteToAsyncScheduled(unsigned int, int, EventContext*)::<lambda()> > /cvmfs/lhcb.cern.ch/lib/lcg/releases/gcc/8.2.0-3fa06/x86_64-centos7/include/c++/8.2.0/bits/std_function.h:249
975  // #2 0x7fc0ba97d181 in AvalancheSchedulerSvc::promoteToAsyncScheduled(unsigned int, int, EventContext*) ../GaudiHive/src/AvalancheSchedulerSvc.cpp:969
976  // #3 0x7fc0ba98354d in AvalancheSchedulerSvc::updateStates(int, int, int, int) ../GaudiHive/src/AvalancheSchedulerSvc.cpp:660
977  // clang-format on
978  //
979  // These leaks are currently suppressed in Gaudi/job/Gaudi-LSan.supp - remove entry there to reactivate
980  //
981  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
982  IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
983  m_IOBoundAlgScheduler->push( *theTask );
984  //
985  // FIXME
986 
987  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
988  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
989 
990  // Update states in the appropriate event slot
991  StatusCode updateSc;
992  EventSlot& thisSlot = m_eventSlots[si];
993  if ( eventContext->usesSubSlot() ) {
994  // Sub-slot
995  size_t const subSlotIndex = eventContext->subSlot();
996  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
997  } else {
998  // Event level (standard behaviour)
999  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
1000  }
1001 
1002  ON_VERBOSE if ( updateSc.isSuccess() ) verbose()
1003  << "[Asynchronous] Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
1004  return updateSc;
1005  } else {
1006  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1007  << " on slot " << si << endmsg;
1008  return sc;
1009  }
1010 }
1011 
1012 //---------------------------------------------------------------------------
1013 
1018  EventContext* eventContext ) {
1019  Gaudi::Hive::setCurrentContext( eventContext );
1020  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1021 
1022  if ( sc.isFailure() ) {
1023  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1024  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1025  return StatusCode::FAILURE;
1026  }
1027 
1028  --m_algosInFlight;
1029 
1030  EventSlot& thisSlot = m_eventSlots[si];
1031 
1032  ON_DEBUG debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1033 
1034  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1035  AState state = algstate.execStatus().isSuccess()
1036  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1037  : AState::ERROR;
1038 
1039  // Update states in the appropriate slot
1040  int subSlotIndex = -1;
1041  if ( eventContext->usesSubSlot() ) {
1042  // Sub-slot
1043  subSlotIndex = eventContext->subSlot();
1044  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1045  } else {
1046  // Event level (standard behaviour)
1047  sc = thisSlot.algsStates.set( iAlgo, state );
1048  }
1049 
1050  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1051  << "Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1052 
1053  ON_DEBUG debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1054  << m_algosInFlight << endmsg;
1055 
1056  // Schedule an update of the status of the algorithms
1057  ++m_actionsCounts[si];
1058  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1059  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1060  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1061  } );
1062 
1063  return sc;
1064 }
1065 
1066 //---------------------------------------------------------------------------
1067 
1072  EventContext* eventContext ) {
1073  Gaudi::Hive::setCurrentContext( eventContext );
1074  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1075 
1076  if ( sc.isFailure() ) {
1077  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1078  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1079  return StatusCode::FAILURE;
1080  }
1081 
1083 
1084  EventSlot& thisSlot = m_eventSlots[si];
1085 
1086  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1087  << endmsg;
1088 
1089  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1090  AState state = algstate.execStatus().isSuccess()
1091  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1092  : AState::ERROR;
1093 
1094  // Update states in the appropriate slot
1095  int subSlotIndex = -1;
1096  if ( eventContext->usesSubSlot() ) {
1097  // Sub-slot
1098  subSlotIndex = eventContext->subSlot();
1099  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1100  } else {
1101  // Event level (standard behaviour)
1102  sc = thisSlot.algsStates.set( iAlgo, state );
1103  }
1104 
1105  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1106  << "[Asynchronous] Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1107 
1108  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1109  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1110 
1111  // Schedule an update of the status of the algorithms
1112  ++m_actionsCounts[si];
1113  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1114  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1115  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1116  } );
1117 
1118  return sc;
1119 }
1120 
1121 //---------------------------------------------------------------------------
1122 
1123 // Method to inform the scheduler about event views
1124 
1126  std::unique_ptr<EventContext> viewContext ) {
1127  // Prevent view nesting
1128  if ( sourceContext->usesSubSlot() ) {
1129  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1130  return StatusCode::FAILURE;
1131  }
1132 
1133  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1134 
1135  // It's not possible to create an std::functional from a move-capturing lambda
1136  // So, we have to release the unique pointer
1137  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1138  &nodeName]() -> StatusCode {
1139  // Attach the sub-slot to the top-level slot
1140  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1141 
1142  if ( viewContextPtr ) {
1143  // Re-create the unique pointer
1144  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1145  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1146  return StatusCode::SUCCESS;
1147  } else {
1148  // Disable the view node if there are no views
1149  topSlot.disableSubSlots( nodeName );
1150  return StatusCode::SUCCESS;
1151  }
1152  };
1153 
1154  m_actionsQueue.push( std::move( action ) );
1155 
1156  return StatusCode::SUCCESS;
1157 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
Gaudi::Property< bool > m_showDataFlow
#define ON_DEBUG
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
StatusCode initialize() override
Definition: Service.cpp:60
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
Class representing an event slot.
Definition: EventSlot.h:14
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
ContextID_t slot() const
Definition: EventContext.h:41
T empty(T... args)
Gaudi::Property< std::string > m_whiteboardSvcName
unsigned int m_IOBoundAlgosInFlight
Number of I/O-bound algorithms presently in flight.
T open(T... args)
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
ContextEvt_t evt() const
Definition: EventContext.h:40
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Definition: EventSlot.h:68
StatusCode finalize() override
Definition: Service.cpp:164
virtual const std::string & type() const =0
The type of the algorithm.
Gaudi::Property< bool > m_dumpIntraEventDynamics
Gaudi::Property< bool > m_showDataDeps
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Definition: EventSlot.h:51
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
T to_string(T... args)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
T pop(T... args)
T top(T... args)
std::string algorithm
Definition: ITimelineSvc.h:21
bool filterPassed() const
void activate()
Activate scheduler.
T end(T... args)
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
Gaudi::Property< std::string > m_optimizationMode
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
T push(T... args)
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
STL class.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
std::string fullKey() const
Definition: DataObjID.cpp:88
T release(T... args)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
T setw(T... args)
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
T resize(T... args)
tbb::task * execute() override
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
#define DECLARE_COMPONENT(type)
const unsigned int & getAlgoIndex() const
Get algorithm index.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
T at(T... args)
virtual StatusCode terminatePool()=0
Finalize the thread pool.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:76
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:274
unsigned int m_algosInFlight
Number of algorithms presently in flight.
T push_back(T... args)
STL class.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Gaudi::Property< bool > m_verboseSubSlots
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
T join(T... args)
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
T close(T... args)
const StatusCode & execStatus() const
size_t sizeOfSubset(State state) const
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
T str(T... args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
StatusCode finalize() override
Finalise.
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
Gaudi::Property< int > m_threadPoolSize
virtual void dumpDataFlow() const =0
bool isSuccess() const
Definition: StatusCode.h:267
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
T max(T... args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
T move(T... args)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
ContextID_t subSlot() const
Definition: EventContext.h:42
T get(T... args)
T insert(T... args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & outputDataObjs() const override
T find_if(T... args)
T size(T... args)
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:153
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
T assign(T... args)
STL class.
Gaudi::Property< bool > m_simulateExecution
T begin(T... args)
Iterator begin(State kind)
T any_of(T... args)
Gaudi::Property< bool > m_enableCondSvc
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
virtual uint getPriority(const std::string &) const =0
Get task priority.
T back(T... args)
string s
Definition: gaudirun.py:318
constexpr static const auto FAILURE
Definition: StatusCode.h:86
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
T hex(T... args)
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
Definition: EventSlot.h:79
T sort(T... args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
bool isFailure() const
Definition: StatusCode.h:130
virtual StatusCode push(IAlgTask &task)=0
const DataObjIDColl & inputDataObjs() const override
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
virtual void dumpControlFlow() const =0
Dump precedence rules.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual const std::string printState(EventSlot &) const =0
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:73
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:63
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:645
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
T reserve(T... args)
T emplace_back(T... args)
std::thread m_thread
The thread in which the activate function runs.
bool usesSubSlot() const
Definition: EventContext.h:43
#define ON_VERBOSE
Iterator end(State kind)