The Gaudi Framework  master (d98a2936)
AvalancheSchedulerSvc Class Reference

#include </builds/gaudi/Gaudi/GaudiHive/src/AvalancheSchedulerSvc.h>


Classes

struct  AlgQueueSort
 Comparison operator to sort the queues.
 
struct  TaskSpec
 Struct to hold entries in the alg queues.
 

Public Member Functions

StatusCode initialize () override
 Initialise.
 
StatusCode finalize () override
 Finalise.
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler.
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available.
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler.
 
unsigned int freeSlots () override
 Get the number of free slots.
 
void dumpState () override
 Dump scheduler state for all slots.
 
virtual StatusCode scheduleEventView (const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
 Method to inform the scheduler about event views.
 
virtual void recordOccupancy (int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
 Sample occupancy at a fixed interval (ms). A negative value deactivates sampling; 0 takes a snapshot on every change. The callback function is applied to each sample.
 
bool next (TaskSpec &ts, bool asynchronous)
 
- Public Member Functions inherited from extends< Service, IScheduler >
void const * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast.
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface.
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames.
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service.
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service.
 
StatusCode sysStart () override
 Start Service.
 
StatusCode sysStop () override
 Stop Service.
 
StatusCode sysFinalize () override
 Finalize Service.
 
StatusCode sysReinitialize () override
 Re-initialize the Service.
 
StatusCode sysRestart () override
 Restart the Service.
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard constructor.
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator.
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandle< T > &hndl, const std::string &doc="none")
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, bool createIf=true)
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, const std::string &toolTypeAndName, bool createIf=true)
 Declare used tool.
 
template<class T >
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandleArray< T > &hndlArr, const std::string &doc="none")
 
template<class T >
void addToolsArray (ToolHandleArray< T > &hndlArr)
 
const std::vector< IAlgTool * > & tools () const
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked.
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property.
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation.
 
 requires (!Gaudi::Details::is_gaudi_property_v< TYPE >) Gaudi
 Helper to wrap a regular data member and use it as a regular property.
 
Gaudi::Details::PropertyBase * declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property.
 
StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p) override
 Set the property from another property with a different name.
 
StatusCode setProperty (const std::string &s) override
 Set the property from the formatted string.
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p)
 Set the property from a property.
 
virtual StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p)=0
 Set the property from a property with a different name.
 
virtual StatusCode setProperty (const std::string &s)=0
 Set the property by string.
 
StatusCode setProperty (const std::string &name, const char *v)
 Special case for string literals.
 
StatusCode setProperty (const std::string &name, const std::string &v)
 Special case for std::string.
 
StatusCode setPropertyRepr (const std::string &n, const std::string &r) override
 Set the property from name and value string representation.
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 Get the property.
 
const Gaudi::Details::PropertyBase & getProperty (std::string_view name) const override
 Get the property by name.
 
StatusCode getProperty (std::string_view n, std::string &v) const override
 Convert the property to a string.
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 Get all properties.
 
bool hasProperty (std::string_view name) const override
 Return true if we have a property with the given name.
 
Gaudi::Details::PropertyBase * property (std::string_view name) const
 FIXME: property and bindPropertiesTo should be protected.
 
void bindPropertiesTo (Gaudi::Interfaces::IOptionsSvc &optsSvc)
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 Get the cached level (originally extracted from the embedded MsgStream).
 
bool msgLevel (MSG::Level lvl) const
 Get the output level from the embedded MsgStream.
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
using AState = AlgsExecutionStates::State
 
using action = std::function< StatusCode()>
 

Private Member Functions

StatusCode dumpGraphFile (const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
 
void activate ()
 Activate scheduler.
 
StatusCode deactivate ()
 Deactivate scheduler.
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer.
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name.
 
StatusCode iterate ()
 Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
 
StatusCode revise (unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
 
StatusCode schedule (TaskSpec &&)
 
StatusCode signoff (const TaskSpec &)
 The call to this method is triggered only from within the AlgTask.
 
bool isStalled (const EventSlot &) const
 Check if scheduling in a particular slot is in a stall.
 
void eventFailed (EventContext *eventContext)
 Method to execute if an event failed.
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler.
 

Private Attributes

std::chrono::duration< int64_t, std::milli > m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
 
std::chrono::system_clock::time_point m_lastSnapshot = std::chrono::system_clock::now()
 
std::function< void(OccupancySnapshot)> m_snapshotCallback
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< int > m_maxParallelismExtra
 
Gaudi::Property< std::string > m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" }
 
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::string > m_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
 
Gaudi::Property< int > m_numOffloadThreads
 
Gaudi::Property< bool > m_checkDeps
 
Gaudi::Property< bool > m_checkOutput
 
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
 
Gaudi::Property< std::string > m_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" }
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
Gaudi::Property< bool > m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" }
 
Gaudi::Property< std::string > m_dataDepsGraphFile
 
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
 
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
 
std::atomic< ActivationState > m_isActive { INACTIVE }
 Flag to track if the scheduler is active or not.
 
std::thread m_thread
 The thread in which the activate function runs.
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion.
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion.
 
SmartIF< IPrecedenceSvc > m_precSvc
 A shortcut to the Precedence Service.
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard.
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots.
 
std::atomic_int m_freeSlots { 0 }
 Atomic to account for asynchronous updates by the scheduler with respect to the rest.
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events.
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager.
 
SmartIF< ICondSvc > m_condSvc
 A shortcut to service for Conditions handling.
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight.
 
unsigned int m_blockingAlgosInFlight = 0
 Number of blocking algorithms presently in flight.
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool.
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
 Queues for scheduled algorithms.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
 
std::queue< TaskSpec > m_retryQueue
 
std::atomic< bool > m_needsUpdate { true }
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
tbb::task_arena * m_arena { nullptr }
 
std::unique_ptr< FiberManager > m_fiberManager { nullptr }
 
size_t m_maxEventsInFlight { 0 }
 
size_t m_maxAlgosInFlight { 1 }
 

Friends

class AlgTask
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class.
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class.
 
- Public Types inherited from Service
using Factory = Gaudi::PluginService::Factory< IService *(const std::string &, ISvcLocator *)>
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes.
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 Take the union of the ext_iids of all Interfaces...
 
- Protected Member Functions inherited from Service
std::vector< IAlgTool * > & tools ()
 
 ~Service () override
 
int outputLevel () const
 Get the Service's output level.
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches.
 
MSG::Level resetMessaging ()
 Reinitialize internal states.
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream.
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::Property< int > m_outputLevel { this, "OutputLevel", MSG::NIL, "output level" }
 The Service's output level.
 
Gaudi::Property< bool > m_auditorInitialize { this, "AuditInitialize", false, "trigger auditor on initialize()" }
 
Gaudi::Property< bool > m_auditorStart { this, "AuditStart", false, "trigger auditor on start()" }
 
Gaudi::Property< bool > m_auditorStop { this, "AuditStop", false, "trigger auditor on stop()" }
 
Gaudi::Property< bool > m_auditorFinalize { this, "AuditFinalize", false, "trigger auditor on finalize()" }
 
Gaudi::Property< bool > m_auditorReinitialize { this, "AuditReinitialize", false, "trigger auditor on reinitialize()" }
 
Gaudi::Property< bool > m_auditorRestart { this, "AuditRestart", false, "trigger auditor on restart()" }
 
Gaudi::Property< bool > m_autoRetrieveTools
 
Gaudi::Property< bool > m_checkToolDeps
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service.
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. Compared with the approach used in the ForwardSchedulerSvc, it offers the following advantages:

(1) Faster decision making (thus lower concurrency disclosure downtime);
(2) Capacity for proactive task scheduling decision making.

Point (2) made it possible to implement a number of generic, non-intrusive scheduling strategies that maximize intra-event throughput.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled as soon as all of the following conditions are met:

  • a control flow (CF) graph traversal has reached the task;
  • all data flow (DF) dependencies of the task are satisfied;
  • the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.
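
The prerequisites above amount to a conjunction of independent checks. A minimal sketch of that conjunction follows; `TaskReadiness` and `canSchedule` are hypothetical names introduced for illustration only and are not part of the Gaudi API, which tracks these conditions through the precedence service and the algorithm resource pool.

```cpp
#include <cstddef>

// Hypothetical snapshot of the scheduling prerequisites for one task.
// None of these names exist in Gaudi; they only mirror the list above.
struct TaskReadiness {
  bool        reachedByControlFlow;  // a CF graph traversal reached the task
  std::size_t unmetDataDependencies; // DF inputs that are not yet available
  bool        algorithmInstanceFree; // a free (or re-entrant) instance exists
  std::size_t freeWorkers;           // idle computational resources
};

// A task may be scheduled only when every prerequisite holds at once.
constexpr bool canSchedule( const TaskReadiness& t ) {
  return t.reachedByControlFlow && t.unmetDataDependencies == 0 && t.algorithmInstanceFree && t.freeWorkers > 0;
}
```

A task that is DF-ready but finds no free worker simply stays in the pool until a later pass.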

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of asymmetry in the precedence rules graph:

(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.
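
Whatever the asymmetry measure, prioritization comes down to ordering the ready pool by a per-task score. The sketch below assumes each task already carries a numeric `rank` (how the real scheduler derives it is not shown here) and uses `std::priority_queue` as a single-threaded stand-in for the concurrent priority queue; `RankedTask`, `HigherRankFirst` and `drainInPriorityOrder` are hypothetical names.

```cpp
#include <queue>
#include <string>
#include <vector>

// Hypothetical ready-pool entry: a task name plus a precomputed rank.
struct RankedTask {
  std::string name;
  float       rank;
};

// Comparator in the spirit of AlgQueueSort: the highest rank is served first.
struct HigherRankFirst {
  bool operator()( const RankedTask& a, const RankedTask& b ) const { return a.rank < b.rank; }
};

using ReadyPool = std::priority_queue<RankedTask, std::vector<RankedTask>, HigherRankFirst>;

// Pop every task and return the names in the order they would be scheduled.
inline std::vector<std::string> drainInPriorityOrder( ReadyPool pool ) {
  std::vector<std::string> order;
  while ( !pool.empty() ) {
    order.push_back( pool.top().name );
    pool.pop();
  }
  return order;
}
```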

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by preemptively scheduling CPU-blocking tasks. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices... joke);
  • synchronization-bound tasks.

Credits

Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 113 of file AvalancheSchedulerSvc.h.

Member Typedef Documentation

◆ action

using AvalancheSchedulerSvc::action = std::function<StatusCode()>
private

Definition at line 160 of file AvalancheSchedulerSvc.h.

◆ AState

Member Enumeration Documentation

◆ ActivationState

Enumerator
INACTIVE 
ACTIVE 
FAILURE 

Definition at line 162 of file AvalancheSchedulerSvc.h.

162 { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };

Member Function Documentation

◆ activate()

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

From this moment on the queue of actions is checked. The checking stops only when the m_isActive flag is no longer ACTIVE and the queue is empty. This guarantees that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html). The scheduler is initialised here because this method runs in a separate thread and spawns the tasks (through the execution of the lambdas).
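
The loop condition is the key point: the loop keeps running while the scheduler is active or actions remain queued, so a stop request never strands pending work. A minimal sketch of that pattern follows, using a mutex-protected `std::queue` as a stand-in for `tbb::concurrent_bounded_queue`; the class `ActionLoop` and its member names are hypothetical and not Gaudi code.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for the scheduler's action loop.
class ActionLoop {
public:
  using Action = std::function<void()>;

  // Enqueue an action; may be called from any thread.
  void push( Action a ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_queue.push( std::move( a ) );
    }
    m_cv.notify_one();
  }

  // Ask the loop to stop by queueing an action that clears the flag.
  void requestStop() {
    push( [this] { m_active = false; } );
  }

  // Keep popping while the loop is active OR work is still queued.
  void run() {
    m_active = true;
    while ( m_active || !empty() ) {
      Action a = pop(); // blocks until an action is available
      a();
    }
  }

private:
  bool empty() {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_queue.empty();
  }

  Action pop() {
    std::unique_lock<std::mutex> lock( m_mutex );
    m_cv.wait( lock, [this] { return !m_queue.empty(); } );
    Action a = std::move( m_queue.front() );
    m_queue.pop();
    return a;
  }

  std::mutex              m_mutex;
  std::condition_variable m_cv;
  std::queue<Action>      m_queue;
  std::atomic<bool>       m_active{ false };
};
```

In this sketch an action queued after the stop request is still executed before `run()` returns, which is the behaviour the loop condition is meant to guarantee.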

Definition at line 458 of file AvalancheSchedulerSvc.cpp.

458  {
459 
460  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
461 
462  if ( m_threadPoolSvc->initPool( m_threadPoolSize, m_maxParallelismExtra ).isFailure() ) {
463  error() << "problems initializing ThreadPoolSvc" << endmsg;
464  m_isActive = FAILURE;
465  return;
466  }
467 
468  // Wait for actions pushed into the queue by finishing tasks.
469  action thisAction;
470  StatusCode sc( StatusCode::SUCCESS );
471 
472  m_isActive = ACTIVE;
473 
474  // Continue to wait if the scheduler is running or there is something to do
475  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
476  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
477  m_actionsQueue.pop( thisAction );
478  sc = thisAction();
479  ON_VERBOSE {
480  if ( sc.isFailure() )
481  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
482  else
483  verbose() << "Action succeeded." << endmsg;
484  }
485  else sc.ignore();
486 
487  // If all queued actions have been processed, update the slot states
488  if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
489  sc = iterate();
490  ON_VERBOSE {
491  if ( sc.isFailure() )
492  verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
493  else
494  verbose() << "Iteration succeeded." << endmsg;
495  }
496  else sc.ignore();
497  }
498  }
499 
500  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
501  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
502  error() << "Problems terminating thread pool" << endmsg;
503  m_isActive = FAILURE;
504  }
505 }

◆ algname2index()

unsigned int AvalancheSchedulerSvc::algname2index ( const std::string &  algoname)
inline private

Convert a name to an integer.

Definition at line 251 of file AvalancheSchedulerSvc.h.

251 { return m_algname_index_map[algoname]; }
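
One property of the one-liner above is worth keeping in mind: `std::unordered_map::operator[]` default-inserts a missing key, so looking up an unknown name yields index 0 and adds an entry to the map. A minimal sketch of a two-way registry with checked lookup follows; `AlgNameRegistry` and its methods are hypothetical and only illustrate the name/index bookkeeping.

```cpp
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical two-way registry between algorithm names and dense indices.
class AlgNameRegistry {
public:
  // Register a name (idempotent) and return its index.
  unsigned int add( const std::string& name ) {
    auto [it, inserted] = m_index.try_emplace( name, static_cast<unsigned int>( m_names.size() ) );
    if ( inserted ) m_names.push_back( name );
    return it->second;
  }

  // Checked name -> index lookup: unknown names do not modify the map.
  std::optional<unsigned int> index( const std::string& name ) const {
    auto it = m_index.find( name );
    if ( it == m_index.end() ) return std::nullopt;
    return it->second;
  }

  // Index -> name lookup; throws std::out_of_range on a bad index.
  const std::string& name( unsigned int index ) const { return m_names.at( index ); }

private:
  std::unordered_map<std::string, unsigned int> m_index;
  std::vector<std::string>                      m_names;
};
```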

◆ deactivate()

StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

If the scheduler is active, the free-slot counter is set to 0, any actions still pending in the queue are discarded, and one final action is pushed that flips the status flag m_isActive to INACTIVE. This is the last action executed by the scheduler.

Definition at line 515 of file AvalancheSchedulerSvc.cpp.

515  {
516 
517  if ( m_isActive == ACTIVE ) {
518 
519  // Set the number of slots available to an error code
520  m_freeSlots.store( 0 );
521 
522  // Empty queue
523  action thisAction;
524  while ( m_actionsQueue.try_pop( thisAction ) ) {};
525 
526  // This would be the last action
527  m_actionsQueue.push( [this]() -> StatusCode {
528  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
529  m_isActive = INACTIVE;
530  return StatusCode::SUCCESS;
531  } );
532  }
533 
534  return StatusCode::SUCCESS;
535 }

◆ dumpGraphFile()

StatusCode AvalancheSchedulerSvc::dumpGraphFile ( const std::map< std::string, DataObjIDColl > &  inDeps,
const std::map< std::string, DataObjIDColl > &  outDeps 
) const
private

Definition at line 1178 of file AvalancheSchedulerSvc.cpp.

1179  {
1180  // Both maps should have the same algorithm entries
1181  assert( inDeps.size() == outDeps.size() );
1182 
1184  info() << "Dumping data dependencies graph to file: " << g.fileName() << endmsg;
1185 
1186  // define algs and objects
1187  std::set<std::size_t> definedObjects;
1188 
1189  // Regex for selection of algs and objects
1190  std::regex algNameRegex( m_dataDepsGraphAlgoPattern.value() );
1191  std::regex objNameRegex( m_dataDepsGraphObjectPattern.value() );
1192 
1193  // inDeps and outDeps should have the same entries
1194  std::size_t algoIndex = 0ul;
1195  for ( const auto& [algName, ideps] : inDeps ) {
1196  if ( not std::regex_search( algName, algNameRegex ) ) continue;
1197  std::string algIndex = "Alg_" + std::to_string( algoIndex );
1198  g.addNode( algName, algIndex );
1199 
1200  // inputs
1201  for ( const auto& dep : ideps ) {
1202  if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1203 
1204  const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1205  std::string objIndex = "obj_" + std::to_string( dep.hash() );
1206  if ( inserted ) g.addNode( dep.key(), objIndex );
1207 
1208  g.addEdge( dep.key(), objIndex, algName, algIndex );
1209  } // loop on ideps
1210 
1211  const auto& odeps = outDeps.at( algName );
1212  for ( const auto& dep : odeps ) {
1213  if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1214 
1215  const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1216  std::string objIndex = "obj_" + std::to_string( dep.hash() );
1217  if ( inserted ) g.addNode( dep.key(), objIndex );
1218 
1219  g.addEdge( algName, algIndex, dep.key(), objIndex );
1220  } // loop on odeps
1221 
1222  ++algoIndex;
1223  } // loop on inDeps
1224 
1225  return StatusCode::SUCCESS;
1226 }
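
The listing declares each data object once (tracked through the `definedObjects` set) while emitting one edge per dependency. A self-contained sketch of the same idea follows, writing Graphviz DOT text from plain string maps; `Deps` and `toDot` are hypothetical names, and the real method works on `DataObjIDColl` and a graph helper object rather than on strings.

```cpp
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical simplified dependency map: algorithm name -> object keys.
using Deps = std::map<std::string, std::vector<std::string>>;

// Render inputs and outputs as a DOT digraph, declaring each object once.
inline std::string toDot( const Deps& inDeps, const Deps& outDeps ) {
  std::ostringstream    dot;
  std::set<std::string> definedObjects;

  // Emit the node declaration only the first time an object is seen.
  auto defineObject = [&dot, &definedObjects]( const std::string& obj ) {
    if ( definedObjects.insert( obj ).second ) dot << "  \"" << obj << "\" [shape=box];\n";
  };

  dot << "digraph deps {\n";
  for ( const auto& [alg, inputs] : inDeps ) {
    dot << "  \"" << alg << "\";\n";
    for ( const auto& obj : inputs ) { // object -> algorithm edges
      defineObject( obj );
      dot << "  \"" << obj << "\" -> \"" << alg << "\";\n";
    }
    auto out = outDeps.find( alg );
    if ( out == outDeps.end() ) continue;
    for ( const auto& obj : out->second ) { // algorithm -> object edges
      defineObject( obj );
      dot << "  \"" << alg << "\" -> \"" << obj << "\";\n";
    }
  }
  dot << "}\n";
  return dot.str();
}
```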

◆ dumpSchedulerState()

void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

Definition at line 880 of file AvalancheSchedulerSvc.cpp.

880  {
881 
882  // To have just one big message
883  std::ostringstream outputMS;
884 
885  outputMS << "Dumping scheduler state\n"
886  << "=========================================================================================\n"
887  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
888  << "=========================================================================================\n\n";
889 
890  //===========================================================================
891 
892  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
893  << "------------------\n\n";
894 
895  // Figure if TimelineSvc is available (used below to detect threads IDs)
896  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
897  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
898  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
899  } else {
900 
901  // Figure optimal printout layout
902  size_t indt( 0 );
903  for ( auto& slot : m_eventSlots ) {
904 
905  const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
906  for ( uint algIndex : schedAlgs ) {
907  if ( index2algname( algIndex ).length() > indt ) indt = index2algname( algIndex ).length();
908  }
909  }
910 
911  // Figure the last running schedule across all slots
912  for ( auto& slot : m_eventSlots ) {
913 
914  const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
915  for ( uint algIndex : schedAlgs ) {
916 
917  const std::string& algoName{ index2algname( algIndex ) };
918 
919  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
920  << slot.eventContext->slot();
921 
922  // Try to get POSIX threads IDs the currently running tasks are scheduled to
923  if ( timelineSvc.isValid() ) {
924  TimelineEvent te{};
925  te.algorithm = algoName;
926  te.slot = slot.eventContext->slot();
927  te.event = slot.eventContext->evt();
928 
929  if ( timelineSvc->getTimelineEvent( te ) )
930  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
931  else
932  outputMS << " thread.id: [unknown]"; // this means a task has just
933  // been signed off as SCHEDULED,
934  // but has not been assigned to a thread yet
935  // (i.e., not running yet)
936  }
937  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
938  }
939  }
940  }
941 
942  //===========================================================================
943 
944  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
945  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
946 
947  int slotCount = -1;
948  bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( { AState::ERROR } ) ||
949  subSlotAlgsInStates( m_eventSlots[iSlot], { AState::ERROR } )
950  : false;
951 
952  for ( auto& slot : m_eventSlots ) {
953  ++slotCount;
954  if ( slot.complete ) continue;
955 
956  outputMS << "[ slot: "
957  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
958  << ", event: "
959  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" );
960 
961  if ( slot.eventContext->eventID().isValid() ) { outputMS << ", eventID: " << slot.eventContext->eventID(); }
962  outputMS << " ]:\n\n";
963 
964  if ( 0 > iSlot || iSlot == slotCount ) {
965 
966  // If an alg has thrown an error then it's not a failure of the CF/DF graph
967  if ( wasAlgError ) {
968  outputMS << "ERROR alg(s):";
969  int errorCount = 0;
970  const auto& errorAlgs = slot.algsStates.algsInState( AState::ERROR );
971  for ( uint algIndex : errorAlgs ) {
972  outputMS << " " << index2algname( algIndex );
973  ++errorCount;
974  }
975  if ( errorCount == 0 ) outputMS << " in subslot(s)";
976  outputMS << "\n\n";
977  } else {
978  // Snapshot of the Control Flow and FSM states
979  outputMS << m_precSvc->printState( slot ) << "\n";
980  }
981 
982  // Mention sub slots (this is expensive if the number of sub-slots is high)
983  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
984  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
985  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
986  for ( auto& ss : slot.allSubSlots ) {
987  outputMS << "[ slot: " << slotID << ", sub-slot: "
988  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
989  << ", entry: " << ss.entryPoint << ", event: "
990  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
991  << " ]:\n\n";
992  if ( wasAlgError ) {
993  outputMS << "ERROR alg(s):";
994  const auto& errorAlgs = ss.algsStates.algsInState( AState::ERROR );
995  for ( uint algIndex : errorAlgs ) { outputMS << " " << index2algname( algIndex ); }
996  outputMS << "\n\n";
997  } else {
998  // Snapshot of the Control Flow and FSM states in sub slot
999  outputMS << m_precSvc->printState( ss ) << "\n";
1000  }
1001  }
1002  }
1003  }
1004  }
1005 
1006  //===========================================================================
1007 
1008  if ( 0 <= iSlot && !wasAlgError ) {
1009  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1010  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1011  }
1012 
1013  outputMS << "\n=========================================================================================\n"
1014  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1015  << "=========================================================================================\n\n";
1016 
1017  info() << outputMS.str() << endmsg;
1018 }

◆ dumpState()

void AvalancheSchedulerSvc::dumpState ( )
override

Dump scheduler state for all slots.

Definition at line 619 of file AvalancheSchedulerSvc.cpp.

619 { dumpSchedulerState( -1 ); }

◆ eventFailed()

void AvalancheSchedulerSvc::eventFailed ( EventContext *  eventContext)
private

Method to execute if an event failed.

When an event fails, this method is called. It dumps the state of the scheduler and marks the event as finished.

Definition at line 859 of file AvalancheSchedulerSvc.cpp.

859  {
860  const uint slotIdx = eventContext->slot();
861 
862  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
863 
864  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
865 
866  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
867  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
868 
869  // Push into the finished events queue the failed context
870  m_eventSlots[slotIdx].complete = true;
871  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
872 }

◆ finalize()

StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 424 of file AvalancheSchedulerSvc.cpp.

424  {
425 
426  StatusCode sc( Service::finalize() );
427  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
428 
429  sc = deactivate();
430  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
431 
432  debug() << "Deleting FiberManager" << endmsg;
433  m_fiberManager.reset();
434 
435  info() << "Joining Scheduler thread" << endmsg;
436  m_thread.join();
437 
438  // Final error check after thread pool termination
439  if ( m_isActive == FAILURE ) {
440  error() << "problems in scheduler thread" << endmsg;
441  return StatusCode::FAILURE;
442  }
443 
444  return sc;
445 }

◆ freeSlots()

unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get the number of free slots.

Definition at line 615 of file AvalancheSchedulerSvc.cpp.

615 { return std::max( m_freeSlots.load(), 0 ); }
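
The counter is a signed atomic, so the clamp keeps the unsigned return value well defined even if the stored value is ever non-positive. A small sketch of the same clamp follows, assuming (this is an assumption, not something the listing shows) that the counter can drop below zero; `SlotCounter` is a hypothetical class.

```cpp
#include <algorithm>
#include <atomic>

// Hypothetical signed slot counter that reports a clamped, unsigned value.
class SlotCounter {
public:
  explicit SlotCounter( int slots ) : m_free( slots ) {}

  void take() { --m_free; } // a slot becomes occupied
  void give() { ++m_free; } // a slot is released

  // Never report a negative count as a huge unsigned number.
  unsigned int free() const { return static_cast<unsigned int>( std::max( m_free.load(), 0 ) ); }

private:
  std::atomic_int m_free;
};
```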

◆ index2algname()

const std::string& AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inline private

Convert an integer to a name.

Definition at line 257 of file AvalancheSchedulerSvc.h.

257 { return m_algname_vect[index]; }

◆ initialize()

StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 78 of file AvalancheSchedulerSvc.cpp.

78  {
79 
80  // Initialise mother class (read properties, ...)
81  StatusCode sc( Service::initialize() );
82  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
83 
84  // Get hold of the TBBSvc. This should initialize the thread pool
85  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
86  if ( !m_threadPoolSvc.isValid() ) {
87  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
88  return StatusCode::FAILURE;
89  }
90  auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
91  if ( !castTPS ) {
92  fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
93  return StatusCode::FAILURE;
94  }
95  m_arena = castTPS->getArena();
96  if ( !m_arena ) {
97  fatal() << "Cannot find valid TBB task_arena" << endmsg;
98  return StatusCode::FAILURE;
99  }
100 
101  // Activate the scheduler in another thread.
102  info() << "Activating scheduler in a separate thread" << endmsg;
103  std::binary_semaphore fiber_manager_initalized{ 0 };
104  m_thread = std::thread( [this, &fiber_manager_initalized]() {
105  // Initialize FiberManager
106  this->m_fiberManager = std::make_unique<FiberManager>( this->m_numOffloadThreads.value() );
107  fiber_manager_initalized.release();
108  this->activate();
109  } );
110  // Wait for initialization to complete
111  fiber_manager_initalized.acquire();
112 
113  while ( m_isActive != ACTIVE ) {
114  if ( m_isActive == FAILURE ) {
115  fatal() << "Terminating initialization" << endmsg;
116  return StatusCode::FAILURE;
117  } else {
118  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
119  sleep( 1 );
120  }
121  }
122 
123  if ( m_enableCondSvc ) {
124  // Get hold of the CondSvc
125  m_condSvc = serviceLocator()->service( "CondSvc" );
126  if ( !m_condSvc.isValid() ) {
127  warning() << "No CondSvc found, or not enabled. "
128  << "Will not manage CondAlgorithms" << endmsg;
129  m_enableCondSvc = false;
130  }
131  }
132 
133  // Get the algo resource pool
134  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
135  if ( !m_algResourcePool.isValid() ) {
136  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
137  return StatusCode::FAILURE;
138  }
139 
140  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
141  if ( !m_algExecStateSvc.isValid() ) {
142  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
143  return StatusCode::FAILURE;
144  }
145 
146  // Get Whiteboard
148  if ( !m_whiteboard.isValid() ) {
149  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
150  return StatusCode::FAILURE;
151  }
152 
153  // Set the MaxEventsInFlight parameters from the number of WB stores
154  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
155 
156  // Set the number of free slots
158 
159  // Get the list of algorithms
160  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
161  const unsigned int algsNumber = algos.size();
162  if ( algsNumber != 0 ) {
163  info() << "Found " << algsNumber << " algorithms" << endmsg;
164  } else {
165  error() << "No algorithms found" << endmsg;
166  return StatusCode::FAILURE;
167  }
168 
169  /* Dependencies
170  1) Look for handles in algo, if none
171  2) Assume none are required
172  */
173 
174  DataObjIDColl globalInp, globalOutp;
175 
176  // figure out all outputs
177  std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
178  for ( IAlgorithm* ialgoPtr : algos ) {
179  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
180  if ( !algoPtr ) {
181  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
182  return StatusCode::FAILURE;
183  }
184 
185  DataObjIDColl algoOutputs;
186  for ( auto id : algoPtr->outputDataObjs() ) {
187  globalOutp.insert( id );
188  algoOutputs.insert( id );
189  }
190  algosOutputDependenciesMap[algoPtr->name()] = algoOutputs;
191  }
192 
193  std::ostringstream ostdd;
194  ostdd << "Data Dependencies for Algorithms:";
195 
196  std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
197  for ( IAlgorithm* ialgoPtr : algos ) {
198  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
199  if ( nullptr == algoPtr ) {
200  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
201  << ": this will result in a crash." << endmsg;
202  return StatusCode::FAILURE;
203  }
204 
205  DataObjIDColl i1, i2;
206  DHHVisitor avis( i1, i2 );
207  algoPtr->acceptDHVisitor( &avis );
208 
209  ostdd << "\n " << algoPtr->name();
210 
211  auto write_owners = [&avis, &ostdd]( const DataObjID& id ) {
212  auto owners = avis.owners_names_of( id );
213  if ( !owners.empty() ) { GaudiUtils::operator<<( ostdd << ' ', owners ); }
214  };
215 
216  DataObjIDColl algoDependencies;
217  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
218  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
219  DataObjID id = *idp;
220  ostdd << "\n o INPUT " << id;
221  write_owners( id );
222  algoDependencies.insert( id );
223  globalInp.insert( id );
224  }
225  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
226  ostdd << "\n o OUTPUT " << *id;
227  write_owners( *id );
228  if ( id->key().find( ":" ) != std::string::npos ) {
229  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
230  << endmsg;
231  m_showDataDeps = true;
232  }
233  }
234  } else {
235  ostdd << "\n none";
236  }
237  algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
238  }
239 
240  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
241 
242  // If requested, dump a graph of the data dependencies in a .dot or .md file
243  if ( not m_dataDepsGraphFile.empty() ) {
244  if ( dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
245  return StatusCode::FAILURE;
246  }
247  }
248 
249  // Check if we have unmet global input dependencies, and, optionally, heal them
250  // WARNING: this step must be done BEFORE the Precedence Service is initialized
251  DataObjIDColl unmetDepInp, unusedOutp;
252  if ( m_checkDeps || m_checkOutput ) {
253  std::set<std::string> requiredInputKeys;
254  for ( auto o : globalInp ) {
255  // track aliases
256  // (assuming there should be no items with different class and same key corresponding to different objects)
257  requiredInputKeys.insert( o.key() );
258  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
259  }
260  if ( m_checkOutput ) {
261  for ( auto o : globalOutp ) {
262  if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
263  // check ignores
264  bool ignored{};
265  for ( const std::string& algoName : m_checkOutputIgnoreList ) {
266  auto it = algosOutputDependenciesMap.find( algoName );
267  if ( it != algosOutputDependenciesMap.end() ) {
268  if ( it->second.find( o ) != it->second.end() ) {
269  ignored = true;
270  break;
271  }
272  }
273  }
274  if ( !ignored ) { unusedOutp.insert( o ); }
275  }
276  }
277  }
278  }
279 
280  if ( m_checkDeps ) {
281  if ( unmetDepInp.size() > 0 ) {
282 
283  auto printUnmet = [&]( auto msg ) {
284  for ( const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
285  msg << " o " << *o << " required by Algorithm: " << endmsg;
286 
287  for ( const auto& p : algosInputDependenciesMap )
288  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
289  }
290  };
291 
292  if ( !m_useDataLoader.empty() ) {
293 
294  // Find the DataLoader Alg
295  IAlgorithm* dataLoaderAlg( nullptr );
296  for ( IAlgorithm* algo : algos )
297  if ( m_useDataLoader == algo->name() ) {
298  dataLoaderAlg = algo;
299  break;
300  }
301 
302  if ( dataLoaderAlg == nullptr ) {
303  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
304  << "\" found, and unmet INPUT dependencies "
305  << "detected:" << endmsg;
306  printUnmet( fatal() );
307  return StatusCode::FAILURE;
308  }
309 
310  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
311  << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
312  printUnmet( info() );
313 
314  // Set the property Load of DataLoader Alg
315  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
316  if ( !dataAlg ) {
317  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
318  << endmsg;
319  return StatusCode::FAILURE;
320  }
321 
322  for ( auto& id : unmetDepInp ) {
323  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
324  << dataLoaderAlg->name() << endmsg;
326  }
327 
328  } else {
329  fatal() << "Auto DataLoading not requested, "
330  << "and the following unmet INPUT dependencies were found:" << endmsg;
331  printUnmet( fatal() );
332  return StatusCode::FAILURE;
333  }
334 
335  } else {
336  info() << "No unmet INPUT data dependencies were found" << endmsg;
337  }
338  }
339 
340  if ( m_checkOutput ) {
341  if ( unusedOutp.size() > 0 ) {
342 
343  auto printUnusedOutp = [&]( auto msg ) {
344  for ( const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
345  msg << " o " << *o << " produced by Algorithm: " << endmsg;
346 
347  for ( const auto& p : algosOutputDependenciesMap )
348  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
349  }
350  };
351 
352  fatal() << "The following unused OUTPUT items were found:" << endmsg;
353  printUnusedOutp( fatal() );
354  return StatusCode::FAILURE;
355  } else {
356  info() << "No unused OUTPUT items were found" << endmsg;
357  }
358  }
359 
360  // Get the precedence service
361  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
362  if ( !m_precSvc.isValid() ) {
363  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
364  return StatusCode::FAILURE;
365  }
366  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
367  if ( !precSvc ) {
368  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
369  return StatusCode::FAILURE;
370  }
371 
372  // Fill the containers to convert algo names to index
373  m_algname_vect.resize( algsNumber );
374  for ( IAlgorithm* algo : algos ) {
375  const std::string& name = algo->name();
376  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
378  m_algname_vect.at( index ) = name;
379  }
380 
381  // Shortcut for the message service
382  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
383  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
384 
386  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
387  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
388  m_eventSlots.back().complete = true;
389  }
390 
391  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
392 
393  // Clearly inform about the level of concurrency
394  info() << "Concurrency level information:" << endmsg;
395  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
396  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
397  info() << " o Fiber thread pool size: " << m_numOffloadThreads << endmsg;
398 
399  // Inform about task scheduling prescriptions
400  info() << "Task scheduling settings:" << endmsg;
401  info() << " o Avalanche generation mode: "
402  << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
403  info() << " o Preemptive scheduling of CPU-blocking tasks: "
405  ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
406  : "disabled" )
407  << endmsg;
408  info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
409 
410  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
411 
412  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
413 
414  // Simulate execution flow
415  if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
416 
417  return sc;
418 }
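The start-up handshake in initialize() (spawn the thread, build a resource on that thread, and block the caller until it exists) can be sketched in isolation. The listing uses std::binary_semaphore; the sketch below swaps in std::promise and std::future so that it also compiles as C++17. Worker and Resource are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical resource that has to be constructed on the worker thread.
struct Resource {
  int value = 42;
};

struct Worker {
  std::unique_ptr<Resource> resource;
  std::thread               thread;

  void start() {
    std::promise<void> ready;
    std::future<void>  isReady = ready.get_future();
    thread = std::thread( [this, p = std::move( ready )]() mutable {
      resource = std::make_unique<Resource>(); // built on the worker thread
      p.set_value();                           // signal the starting thread
      // ... the worker's main loop would run here ...
    } );
    isReady.wait(); // block until the resource exists
  }

  ~Worker() {
    if ( thread.joinable() ) thread.join();
  }
};
```

After start() returns, the caller can safely read the resource, because setting the promise happens before the wait on the future returns.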

◆ isStalled()

bool AvalancheSchedulerSvc::isStalled ( const EventSlot & slot) const
private

Check if scheduling in a particular slot is in a stall.

Check whether a stall condition is present for a particular slot.

This is the case when a slot has no actions queued in the actionsQueue, has no scheduled algorithms and has no algorithms with all of its dependencies satisfied.

Definition at line 841 of file AvalancheSchedulerSvc.cpp.

841  {
842 
843  if ( !slot.algsStates.containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
844  !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
845 
846  error() << "*** Stall detected, event context: " << slot.eventContext.get() << endmsg;
847 
848  return true;
849  }
850  return false;
851 }
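The stall test amounts to asking whether any algorithm in the slot or in one of its sub-slots is still able to make progress. A minimal sketch, assuming a simplified state enum and plain vectors in place of the real EventSlot and AlgsExecutionStates types (all names here are hypothetical):

```cpp
#include <algorithm>
#include <cassert>
#include <initializer_list>
#include <vector>

// Simplified stand-in for the algorithm FSM states.
enum class ToyState { Initial, ControlReady, DataReady, Resourceless, Scheduled, Accepted };

// True if at least one algorithm is in one of the wanted states.
bool containsAny( const std::vector<ToyState>& states, std::initializer_list<ToyState> wanted ) {
  return std::any_of( states.begin(), states.end(), [&]( ToyState s ) {
    return std::find( wanted.begin(), wanted.end(), s ) != wanted.end();
  } );
}

// A slot is stalled when neither it nor any sub-slot holds an algorithm
// that is ready to run, waiting for resources, or already scheduled.
bool isStalledSketch( const std::vector<ToyState>& slot, const std::vector<std::vector<ToyState>>& subSlots ) {
  const std::initializer_list<ToyState> progressing = { ToyState::DataReady, ToyState::Scheduled,
                                                        ToyState::Resourceless };
  if ( containsAny( slot, progressing ) ) return false;
  for ( const auto& sub : subSlots ) {
    if ( containsAny( sub, progressing ) ) return false;
  }
  return true;
}
```

In iterate() this check is only reached when the slot has not already been recognised as complete, so a slot whose algorithms have all finished is not reported as stalled.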

◆ iterate()

StatusCode AvalancheSchedulerSvc::iterate ( )
private

Loop on all slots to schedule DATAREADY algorithms and sign off ready events.

Loop on all slots to schedule DATAREADY algorithms, sign off ready ones or detect execution stalls.

To check if an event is finished the method verifies that the root control flow decision of the task precedence graph is resolved and there are no algorithms moving in-between INITIAL and EVTACCEPTED FSM states.

Definition at line 666 of file AvalancheSchedulerSvc.cpp.

666  {
667 
668  StatusCode global_sc( StatusCode::SUCCESS );
669 
670  // Retry algorithms
671  const size_t retries = m_retryQueue.size();
672  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
673  TaskSpec retryTS = std::move( m_retryQueue.front() );
674  m_retryQueue.pop();
675  global_sc = schedule( std::move( retryTS ) );
676  }
677 
678  // Loop over all slots
679  OccupancySnapshot nextSnap;
680  auto now = std::chrono::system_clock::now();
681  for ( EventSlot& thisSlot : m_eventSlots ) {
682 
683  // Ignore slots without a valid context (relevant when populating scheduler for first time)
684  if ( !thisSlot.eventContext ) continue;
685 
686  int iSlot = thisSlot.eventContext->slot();
687 
688  // Cache the states of the algorithms to improve readability and performance
689  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
690 
691  StatusCode partial_sc = StatusCode::FAILURE;
692 
693  // Make an occupancy snapshot
694  if ( m_snapshotInterval != std::chrono::duration<int64_t, std::milli>::min() &&
696 
697  // Initialise snapshot
698  if ( nextSnap.states.empty() ) {
699  nextSnap.time = now;
700  nextSnap.states.resize( m_eventSlots.size() );
701  }
702 
703  // Store alg states
704  std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
705  slotStateTotals.resize( AState::MAXVALUE );
706  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
707  slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
708  }
709 
710  // Add subslot alg states
711  for ( auto& subslot : thisSlot.allSubSlots ) {
712  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
713  slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
714  }
715  }
716  }
717 
718  // Perform DR->SCHEDULED
719  const auto& drAlgs = thisAlgsStates.algsInState( AState::DATAREADY );
720  for ( uint algIndex : drAlgs ) {
721  const std::string& algName{ index2algname( algIndex ) };
722  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
723  bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
724 
725  partial_sc =
726  schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
727 
728  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
729  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
730  << " on processing slot " << iSlot << endmsg;
731  }
732 
733  // Check for algorithms ready in sub-slots
734  for ( auto& subslot : thisSlot.allSubSlots ) {
735  const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
736  for ( uint algIndex : drAlgsSubSlot ) {
737  const std::string& algName{ index2algname( algIndex ) };
738  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
739  bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
740  partial_sc =
741  schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
742  }
743  }
744 
745  if ( m_dumpIntraEventDynamics ) {
746  std::stringstream s;
747  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
748  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
749  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
750  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
751  : std::to_string( std::thread::hardware_concurrency() );
752  std::ofstream myfile;
753  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
754  myfile << s.str();
755  myfile.close();
756  }
757 
758  // Not complete because this would mean that the slot is already free!
759  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
760  !thisSlot.algsStates.containsAny(
761  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
762  !subSlotAlgsInStates( thisSlot,
763  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
764  !thisSlot.complete ) {
765 
766  thisSlot.complete = true;
767  // if the event did not fail, add it to the finished events
768  // otherwise it is taken care of in the error handling
769  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
770  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
771  << thisSlot.eventContext->slot() << ")." << endmsg;
772  m_finishedEvents.push( thisSlot.eventContext.release() );
773  }
774 
775  // now let's return the fully evaluated result of the control flow
776  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
777 
778  thisSlot.eventContext.reset( nullptr );
779 
780  } else if ( isStalled( thisSlot ) ) {
781  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
782  eventFailed( thisSlot.eventContext.get() ); // can't release yet
783  }
784  partial_sc.ignore();
785  } // end loop on slots
786 
787  // Process snapshot
788  if ( !nextSnap.states.empty() ) {
789  m_lastSnapshot = nextSnap.time;
790  m_snapshotCallback( std::move( nextSnap ) );
791  }
792 
793  ON_VERBOSE verbose() << "Iteration done." << endmsg;
794  m_needsUpdate.store( false );
795  return global_sc;
796 }
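The retry loop at the top of iterate() records the queue size before looping, so tasks that are pushed back during the pass are not retried again in the same pass. A minimal sketch of that pattern, using std::queue<int> and a hypothetical trySchedule callback in place of TaskSpec and schedule():

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>

// Retry each task that was queued at entry exactly once.
// Tasks that fail again are re-queued and wait for the next pass.
std::size_t retryOnce( std::queue<int>& retryQueue, const std::function<bool( int )>& trySchedule ) {
  std::size_t       scheduled = 0;
  const std::size_t retries   = retryQueue.size(); // snapshot before the loop
  for ( std::size_t i = 0; i < retries; ++i ) {
    int task = retryQueue.front();
    retryQueue.pop();
    if ( trySchedule( task ) ) {
      ++scheduled;
    } else {
      retryQueue.push( task ); // still no resources: retry on a later pass
    }
  }
  return scheduled;
}
```

Without the size snapshot, a task that keeps failing would be popped and pushed indefinitely within a single call.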

◆ next()

bool AvalancheSchedulerSvc::next ( TaskSpec & ts,
bool  asynchronous 
)
inline

Definition at line 376 of file AvalancheSchedulerSvc.h.

376  {
377  if ( asynchronous ) { return m_scheduledAsynchronousQueue.try_pop( ts ); }
378  return m_scheduledQueue.try_pop( ts );
379  }

◆ popFinishedEvent()

StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 625 of file AvalancheSchedulerSvc.cpp.

625  {
626 
627  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
628  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
629  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
630  // << " active: " << m_isActive << endmsg;
631  return StatusCode::FAILURE;
632  } else {
633  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
634  // << " active: " << m_isActive << endmsg;
635  m_finishedEvents.pop( eventContext );
636  ++m_freeSlots;
637  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
638  return StatusCode::SUCCESS;
639  }
640 }

◆ pushNewEvent()

StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext * eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two possible cases:
1) No slot is free. StatusCode::FAILURE is returned.
2) At least one slot is free. An action that resets the slot and kicks off its update is queued.

Definition at line 546 of file AvalancheSchedulerSvc.cpp.

546  {
547 
548  if ( !eventContext ) {
549  fatal() << "Event context is nullptr" << endmsg;
550  return StatusCode::FAILURE;
551  }
552 
553  if ( m_freeSlots.load() == 0 ) {
554  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
555  return StatusCode::FAILURE;
556  }
557 
558  // no problem as push new event is only called from one thread (event loop manager)
559  --m_freeSlots;
560 
561  auto action = [this, eventContext]() -> StatusCode {
562  // Event processing slot forced to be the same as the wb slot
563  const unsigned int thisSlotNum = eventContext->slot();
564  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
565  if ( !thisSlot.complete ) {
566  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
567  return StatusCode::FAILURE;
568  }
569 
570  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
571  thisSlot.reset( eventContext );
572 
573  // Result status code:
575 
576  // promote to CR and DR the initial set of algorithms
577  Cause cs = { Cause::source::Root, "RootDecisionHub" };
578  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
579  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
580  result = StatusCode::FAILURE;
581  }
582 
583  if ( this->iterate().isFailure() ) {
584  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
585  result = StatusCode::FAILURE;
586  }
587 
588  return result;
589  }; // end of lambda
590 
591  // Kick off scheduling
592  ON_VERBOSE {
593  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
594  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
595  }
596 
597  m_actionsQueue.push( action );
598 
599  return StatusCode::SUCCESS;
600 }
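The two cases handled by pushNewEvent() can be sketched with a toy scheduler in which the caller only reserves a slot and queues a closure, while the actual work happens later when the queue is drained. ToyScheduler, drain() and the bool return values (standing in for StatusCode) are assumptions made for this illustration; the sketch is single-threaded and omits the concurrent queue used by the real service.

```cpp
#include <cassert>
#include <functional>
#include <queue>

// true = success; a stand-in for StatusCode.
using Action = std::function<bool()>;

struct ToyScheduler {
  explicit ToyScheduler( int slots ) : freeSlots( slots ) {}

  int                freeSlots;
  int                started = 0;
  std::queue<Action> actions;

  bool pushNewEvent( int eventNumber ) {
    if ( freeSlots == 0 ) return false; // case 1: no free slot
    --freeSlots;                        // case 2: reserve a slot ...
    actions.push( [this, eventNumber] { // ... and defer the slot update
      ++started;
      return eventNumber >= 0;
    } );
    return true;
  }

  // In the real service this loop runs on the scheduler's own thread.
  void drain() {
    while ( !actions.empty() ) {
      actions.front()();
      actions.pop();
    }
  }
};
```

A successful return therefore only means the event was accepted for scheduling; the slot is reset when the queued action is eventually executed.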

◆ pushNewEvents()

StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 604 of file AvalancheSchedulerSvc.cpp.

604  {
605  StatusCode sc;
606  for ( auto context : eventContexts ) {
607  sc = pushNewEvent( context );
608  if ( sc != StatusCode::SUCCESS ) return sc;
609  }
610  return sc;
611 }

◆ recordOccupancy()

void AvalancheSchedulerSvc::recordOccupancy ( int  samplePeriod,
std::function< void(OccupancySnapshot)>  callback 
)
overridevirtual

Sample occupancy at a fixed interval (ms). Use a negative value to deactivate sampling, or 0 to take a snapshot at every change. For each sample, the callback function is applied to the result.

Definition at line 1163 of file AvalancheSchedulerSvc.cpp.

1163  {
1164 
1165  auto action = [this, samplePeriod, callback = std::move( callback )]() -> StatusCode {
1166  if ( samplePeriod < 0 ) {
1167  this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min();
1168  } else {
1169  this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>( samplePeriod );
1170  m_snapshotCallback = std::move( callback );
1171  }
1172  return StatusCode::SUCCESS;
1173  };
1174 
1175  m_actionsQueue.push( std::move( action ) );
1176 }
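The encoding of the sample period can be sketched separately: a negative period maps to the minimum representable duration, which acts as a "disabled" sentinel, while any other value is taken as milliseconds. The helper names below are hypothetical, and the elapsed-time comparison in snapshotDue() is an assumption, since that condition is not fully visible in the iterate() listing.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

using Millis = std::chrono::duration<std::int64_t, std::milli>;

// Negative periods disable sampling by mapping to the sentinel value.
Millis toSnapshotInterval( int samplePeriod ) {
  return samplePeriod < 0 ? Millis::min() : Millis( samplePeriod );
}

// Decide whether a new snapshot should be taken.
bool snapshotDue( Millis interval, Millis sinceLast ) {
  if ( interval == Millis::min() ) return false; // sampling disabled
  return sinceLast >= interval;                  // assumed comparison
}
```

With a period of 0 every call is due, which matches the documented "snapshot every change" behaviour under the assumed comparison.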

◆ revise()

StatusCode AvalancheSchedulerSvc::revise ( unsigned int  iAlgo,
EventContext * contextPtr,
AState  state,
bool  iterate = false 
)
private

Definition at line 800 of file AvalancheSchedulerSvc.cpp.

800  {
801  StatusCode sc;
802  auto slotIndex = contextPtr->slot();
803  EventSlot& slot = m_eventSlots[slotIndex];
804  Cause cs = { Cause::source::Task, index2algname( iAlgo ) };
805 
806  if ( contextPtr->usesSubSlot() ) {
807  // Sub-slot
808  auto subSlotIndex = contextPtr->subSlot();
809  EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
810 
811  sc = subSlot.algsStates.set( iAlgo, state );
812 
813  if ( sc.isSuccess() ) {
814  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
815  << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
816  // Revise states of algorithms downstream the precedence graph
817  if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
818  }
819  } else {
820  // Event level (standard behaviour)
821  sc = slot.algsStates.set( iAlgo, state );
822 
823  if ( sc.isSuccess() ) {
824  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
825  << ", event:" << contextPtr->evt() << "]" << endmsg;
826  // Revise states of algorithms downstream the precedence graph
827  if ( iterate ) sc = m_precSvc->iterate( slot, cs );
828  }
829  }
830  return sc;
831 }

◆ schedule()

StatusCode AvalancheSchedulerSvc::schedule ( TaskSpec &&  ts)
private

Definition at line 1022 of file AvalancheSchedulerSvc.cpp.

1022  {
1023 
1024  // Check if a free Algorithm instance is available
1025  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
1026 
1027  // If an instance is available, proceed to scheduling
1028  StatusCode sc;
1029  if ( getAlgSC.isSuccess() ) {
1030 
1031  // Decide how to schedule the task and schedule it
1032  if ( -100 != m_threadPoolSize ) {
1033 
1034  // Cache values before moving the TaskSpec further
1035  unsigned int algIndex{ ts.algIndex };
1036  std::string_view algName( ts.algName );
1037  unsigned int algRank{ ts.algRank };
1038  bool asynchronous{ ts.asynchronous };
1039  int slotIndex{ ts.slotIndex };
1040  EventContext* contextPtr{ ts.contextPtr };
1041 
1042  if ( asynchronous ) {
1043  // Add to asynchronous scheduled queue
1044  m_scheduledAsynchronousQueue.push( std::move( ts ) );
1045 
1046  // Schedule task
1047  m_fiberManager->schedule( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1048  }
1049 
1050  if ( !asynchronous ) {
1051  // Add the algorithm to the scheduled queue
1052  m_scheduledQueue.push( std::move( ts ) );
1053 
1054  // Prepare a TBB task that will execute the Algorithm according to the above queued specs
1055  m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1056  ++m_algosInFlight;
1057  }
1058  sc = revise( algIndex, contextPtr, AState::SCHEDULED );
1059 
1060  ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
1061  << ", rank:" << algRank << ", asynchronous:" << ( asynchronous ? "yes" : "no" )
1062  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1064  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1065  : "" )
1066  << endmsg;
1067 
1068  } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
1069  // Beojan: I don't think this bit works. ts hasn't been pushed into any queue so AlgTask won't retrieve it
1070  ++m_algosInFlight;
1071  sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
1072  AlgTask( this, serviceLocator(), m_algExecStateSvc, ts.asynchronous )();
1073  --m_algosInFlight;
1074  }
1075  } else { // if no Algorithm instance available, retry later
1076 
1077  sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1078  // Add the algorithm to the retry queue
1079  m_retryQueue.push( std::move( ts ) );
1080  }
1081 
1083 
1084  return sc;
1085 }

◆ scheduleEventView()

StatusCode AvalancheSchedulerSvc::scheduleEventView ( const EventContext * sourceContext,
const std::string &  nodeName,
std::unique_ptr< EventContext > viewContext 
)
overridevirtual

Method to inform the scheduler about event views.

Definition at line 1123 of file AvalancheSchedulerSvc.cpp.

1124  {
1125  // Prevent view nesting
1126  if ( sourceContext->usesSubSlot() ) {
1127  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1128  return StatusCode::FAILURE;
1129  }
1130 
1131  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1132 
1133  // It's not possible to create an std::functional from a move-capturing lambda
1134  // So, we have to release the unique pointer
1135  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1136  &nodeName]() -> StatusCode {
1137  // Attach the sub-slot to the top-level slot
1138  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1139 
1140  if ( viewContextPtr ) {
1141  // Re-create the unique pointer
1142  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1143  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1144  return StatusCode::SUCCESS;
1145  } else {
1146  // Disable the view node if there are no views
1147  topSlot.disableSubSlots( nodeName );
1148  return StatusCode::SUCCESS;
1149  }
1150  };
1151 
1152  m_actionsQueue.push( std::move( action ) );
1153 
1154  return StatusCode::SUCCESS;
1155 }
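The ownership hand-off described in the comments (release the unique pointer, capture the raw pointer, and re-wrap it when the action runs) exists because std::function requires a copyable callable. A minimal sketch, with hypothetical Context and Slot types in place of EventContext and EventSlot:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

struct Context {
  int id;
};

struct Slot {
  std::vector<std::unique_ptr<Context>> subSlots;
};

// Build a deferred action that hands ownership of ctx to slot when executed.
std::function<void()> makeAttachAction( Slot& slot, std::unique_ptr<Context> ctx ) {
  // A lambda that move-captures a unique_ptr is not copyable,
  // so capture a raw pointer instead ...
  Context* raw = ctx.release();
  return [&slot, raw] {
    // ... and re-create the owning pointer when the action finally runs.
    slot.subSlots.push_back( std::unique_ptr<Context>( raw ) );
  };
}
```

The trade-off is that ownership is only restored if the action runs exactly once: an action that is never executed leaks the object, and one that is executed twice would free it twice. A deferred closure of this kind should also be careful with reference captures, since the referenced objects must outlive the queued action.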

◆ signoff()

StatusCode AvalancheSchedulerSvc::signoff ( const TaskSpec & ts)
private

The call to this method is triggered only from within the AlgTask.

Definition at line 1092 of file AvalancheSchedulerSvc.cpp.

1092  {
1093 
1094  Gaudi::Hive::setCurrentContext( ts.contextPtr );
1095 
1096  --m_algosInFlight;
1097 
1098  const AlgExecStateRef algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
1099  AState state = algstate.execStatus().isSuccess()
1100  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1101  : AState::ERROR;
1102 
1103  // Update algorithm state and revise the downstream states
1104  auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1105 
1106  ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1107  << ", rank:" << ts.algRank << ", asynchronous:" << ( ts.asynchronous ? "yes" : "no" )
1108  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1110  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1111  : "" )
1112  << endmsg;
1113 
1114  // Prompt a call to updateStates
1115  m_needsUpdate.store( true );
1116  return sc;
1117 }

◆ tryPopFinishedEvent()

StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event, if not available just return a failure.

Definition at line 646 of file AvalancheSchedulerSvc.cpp.

646  {
647 
648  if ( m_finishedEvents.try_pop( eventContext ) ) {
649  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
650  << endmsg;
651  ++m_freeSlots;
652  return StatusCode::SUCCESS;
653  }
654  return StatusCode::FAILURE;
655 }

Friends And Related Function Documentation

◆ AlgTask

friend class AlgTask
friend

Definition at line 115 of file AvalancheSchedulerSvc.h.

Member Data Documentation

◆ m_actionsQueue

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 318 of file AvalancheSchedulerSvc.h.

◆ m_algExecStateSvc

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 278 of file AvalancheSchedulerSvc.h.

◆ m_algname_index_map

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 254 of file AvalancheSchedulerSvc.h.

◆ m_algname_vect

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector holding the information needed for the index-to-name conversion.

Definition at line 260 of file AvalancheSchedulerSvc.h.
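
The pairing of a name-to-index map with an index-to-name vector can be sketched as follows. `AlgNameRegistry` and its `add` method are hypothetical and only show how the two containers stay consistent; how the scheduler actually fills them is not shown here.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical registry pairing a name->index map with an index->name vector.
class AlgNameRegistry {
public:
  // Register a name and return its index; a repeated name keeps its old index.
  unsigned int add( const std::string& name ) {
    auto it = m_index.find( name );
    if ( it != m_index.end() ) return it->second;
    const unsigned int idx = static_cast<unsigned int>( m_names.size() );
    m_names.push_back( name );
    m_index.emplace( name, idx );
    return idx;
  }

  // Name -> index lookup; throws std::out_of_range for an unknown name.
  unsigned int algname2index( const std::string& name ) const { return m_index.at( name ); }

  // Index -> name lookup; throws std::out_of_range for an invalid index.
  const std::string& index2algname( unsigned int index ) const { return m_names.at( index ); }

private:
  std::unordered_map<std::string, unsigned int> m_index;
  std::vector<std::string>                      m_names;
};
```

Because the index is simply the position in the vector, the reverse lookup is a constant-time array access.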

◆ m_algosInFlight

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 284 of file AvalancheSchedulerSvc.h.

◆ m_algResourcePool

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 313 of file AvalancheSchedulerSvc.h.

◆ m_arena

tbb::task_arena* AvalancheSchedulerSvc::m_arena { nullptr }
private

Definition at line 368 of file AvalancheSchedulerSvc.h.

◆ m_blockingAlgosInFlight

unsigned int AvalancheSchedulerSvc::m_blockingAlgosInFlight = 0
private

Number of CPU-blocking algorithms presently in flight.

Definition at line 287 of file AvalancheSchedulerSvc.h.

◆ m_checkDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps
private
Initial value:
{ this, "CheckDependencies", false,
"Runtime check of Algorithm Input Data Dependencies" }

Definition at line 195 of file AvalancheSchedulerSvc.h.

◆ m_checkOutput

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkOutput
private
Initial value:
{ this, "CheckOutputUsage", false,
"Runtime check of Algorithm Output Data usage" }

Definition at line 197 of file AvalancheSchedulerSvc.h.

◆ m_checkOutputIgnoreList

Gaudi::Property<std::vector<std::string> > AvalancheSchedulerSvc::m_checkOutputIgnoreList
private
Initial value:
{
this,
"CheckOutputUsageIgnoreList",
{},
"Ignore outputs of the Algorithms of this name when doing the check",
"OrderedSet<std::string>" }

Definition at line 199 of file AvalancheSchedulerSvc.h.

◆ m_condSvc

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to the service for Conditions handling.

Definition at line 281 of file AvalancheSchedulerSvc.h.

◆ m_dataDepsGraphAlgoPattern

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphAlgoPattern
private
Initial value:
{
this, "DataDepsGraphAlgPattern", ".*",
"Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data "
"deps graph" }

Definition at line 227 of file AvalancheSchedulerSvc.h.

◆ m_dataDepsGraphFile

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphFile
private
Initial value:
{
this, "DataDepsGraphFile", "",
"Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected "
"Algorithms" }

Definition at line 222 of file AvalancheSchedulerSvc.h.

◆ m_dataDepsGraphObjectPattern

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphObjectPattern
private
Initial value:
{
this, "DataDepsGraphObjectPattern", ".*",
"Regex pattern for selecting desired input or output by their full key" }

Definition at line 232 of file AvalancheSchedulerSvc.h.
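
The effect of a selection pattern such as `DataDepsGraphAlgPattern` or `DataDepsGraphObjectPattern` can be pictured with a small `std::regex` filter. `selectByPattern` is a hypothetical helper, the names in the usage note are invented, and full-string matching via `std::regex_match` is an assumption: the scheduler may search for a partial match instead.

```cpp
#include <regex>
#include <string>
#include <vector>

// Keep only the names that match the pattern in full (std::regex_match).
// Full matching is an assumption of this sketch.
inline std::vector<std::string> selectByPattern( const std::vector<std::string>& names,
                                                 const std::string&              pattern ) {
  const std::regex         re( pattern );
  std::vector<std::string> selected;
  for ( const auto& name : names ) {
    if ( std::regex_match( name, re ) ) selected.push_back( name );
  }
  return selected;
}
```

With the default pattern `.*` every name is kept; under the full-match assumption a pattern such as `Track.*` keeps `TrackFit` and `TrackSeed` but drops `CaloCluster`.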

◆ m_dumpIntraEventDynamics

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{ this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file" }

Definition at line 186 of file AvalancheSchedulerSvc.h.

◆ m_enableCondSvc

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" }
private

Definition at line 209 of file AvalancheSchedulerSvc.h.

◆ m_enablePreemptiveBlockingTasks

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
private
Initial value:
{
this, "PreemptiveBlockingTasks", false,
"Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." }

Definition at line 188 of file AvalancheSchedulerSvc.h.

◆ m_eventSlots

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of event slots.

Definition at line 269 of file AvalancheSchedulerSvc.h.

◆ m_fiberManager

std::unique_ptr<FiberManager> AvalancheSchedulerSvc::m_fiberManager { nullptr }
private

Definition at line 369 of file AvalancheSchedulerSvc.h.

◆ m_finishedEvents

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 275 of file AvalancheSchedulerSvc.h.

◆ m_freeSlots

std::atomic_int AvalancheSchedulerSvc::m_freeSlots { 0 }
private

Atomic to account for asynchronous updates by the scheduler with respect to the rest.

Definition at line 272 of file AvalancheSchedulerSvc.h.

◆ m_isActive

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive { INACTIVE }
private

Flag to track if the scheduler is active or not.

Definition at line 245 of file AvalancheSchedulerSvc.h.

◆ m_lastSnapshot

std::chrono::system_clock::time_point AvalancheSchedulerSvc::m_lastSnapshot = std::chrono::system_clock::now()
private

Definition at line 166 of file AvalancheSchedulerSvc.h.

◆ m_maxAlgosInFlight

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight { 1 }
private

Definition at line 372 of file AvalancheSchedulerSvc.h.

◆ m_maxBlockingAlgosInFlight

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
private
Initial value:
{
this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms" }

Definition at line 179 of file AvalancheSchedulerSvc.h.

◆ m_maxEventsInFlight

size_t AvalancheSchedulerSvc::m_maxEventsInFlight { 0 }
private

Definition at line 371 of file AvalancheSchedulerSvc.h.

◆ m_maxParallelismExtra

Gaudi::Property<int> AvalancheSchedulerSvc::m_maxParallelismExtra
private
Initial value:
{
this, "maxParallelismExtra", 0,
"Allows to add some extra threads to the maximum parallelism set in TBB"
"The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" }

Definition at line 174 of file AvalancheSchedulerSvc.h.
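
The formula quoted in the property description can be written out as a one-line helper. `tbbMaxParallelism` is a hypothetical name, and the sketch assumes `ThreadPoolSize` has already been resolved to a positive thread count; the special values -1 and -100 are not handled.

```cpp
// Formula quoted in the property description:
//   ThreadPoolSize + maxParallelismExtra + 1
// threadPoolSize is assumed to be a positive, already resolved thread count.
constexpr int tbbMaxParallelism( int threadPoolSize, int maxParallelismExtra ) {
  return threadPoolSize + maxParallelismExtra + 1;
}
```

For a pool of 8 threads and the default extra of 0 this gives 9; raising `maxParallelismExtra` to 2 gives 11.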

◆ m_needsUpdate

std::atomic<bool> AvalancheSchedulerSvc::m_needsUpdate { true }
private

Definition at line 362 of file AvalancheSchedulerSvc.h.

◆ m_numOffloadThreads

Gaudi::Property<int> AvalancheSchedulerSvc::m_numOffloadThreads
private
Initial value:
{
this, "NumOffloadThreads", 2,
"Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged "
"and use Boost Fiber functionality to suspend while waiting for offloaded work." }

Definition at line 191 of file AvalancheSchedulerSvc.h.

◆ m_optimizationMode

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{ this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E" }

Definition at line 184 of file AvalancheSchedulerSvc.h.

◆ m_precSvc

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 263 of file AvalancheSchedulerSvc.h.

◆ m_retryQueue

std::queue<TaskSpec> AvalancheSchedulerSvc::m_retryQueue
private

Definition at line 359 of file AvalancheSchedulerSvc.h.

◆ m_scheduledAsynchronousQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledAsynchronousQueue
private

Definition at line 358 of file AvalancheSchedulerSvc.h.

◆ m_scheduledQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledQueue
private

Queues for scheduled algorithms.

Definition at line 357 of file AvalancheSchedulerSvc.h.
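
How a comparator in the role of `AlgQueueSort` shapes the order in which scheduled tasks leave a priority queue can be sketched with the standard library. `Task`, `ByRank`, `TaskQueue` and `drainOrder` are hypothetical, `std::priority_queue` replaces `tbb::concurrent_priority_queue`, and ordering by highest rank first is an assumption of the sketch.

```cpp
#include <queue>
#include <string>
#include <vector>

// Hypothetical, reduced task entry: only a name and a rank.
struct Task {
  std::string  name;
  unsigned int rank;
};

// Comparator in the role of AlgQueueSort. With std::priority_queue, returning
// true when lhs has the lower rank puts the highest rank on top. This
// ordering is an assumption of the sketch.
struct ByRank {
  bool operator()( const Task& lhs, const Task& rhs ) const { return lhs.rank < rhs.rank; }
};

using TaskQueue = std::priority_queue<Task, std::vector<Task>, ByRank>;

// Pop every task from a copy of the queue and return the names in pop order.
inline std::vector<std::string> drainOrder( TaskQueue queue ) {
  std::vector<std::string> order;
  while ( !queue.empty() ) {
    order.push_back( queue.top().name );
    queue.pop();
  }
  return order;
}
```

Pushing tasks with ranks 1, 9 and 5 and then draining the queue yields them in the order 9, 5, 1, regardless of insertion order.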

◆ m_showControlFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{ this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences" }

Definition at line 217 of file AvalancheSchedulerSvc.h.

◆ m_showDataDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{ this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms" }

Definition at line 211 of file AvalancheSchedulerSvc.h.

◆ m_showDataFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{ this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms" }

Definition at line 214 of file AvalancheSchedulerSvc.h.

◆ m_simulateExecution

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution" }

Definition at line 181 of file AvalancheSchedulerSvc.h.

◆ m_snapshotCallback

std::function<void( OccupancySnapshot )> AvalancheSchedulerSvc::m_snapshotCallback
private

Definition at line 167 of file AvalancheSchedulerSvc.h.

◆ m_snapshotInterval

std::chrono::duration<int64_t, std::milli> AvalancheSchedulerSvc::m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
private

Definition at line 165 of file AvalancheSchedulerSvc.h.

◆ m_thread

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 248 of file AvalancheSchedulerSvc.h.

◆ m_threadPoolSize

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the global thread pool initialised by TBB; a value of -1 requests to use"
"all available hardware threads; -100 requests to bypass TBB executing "
"all algorithms in the scheduler's thread." }

Definition at line 169 of file AvalancheSchedulerSvc.h.

◆ m_threadPoolSvc

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 367 of file AvalancheSchedulerSvc.h.

◆ m_useDataLoader

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{ this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm" }

Definition at line 206 of file AvalancheSchedulerSvc.h.

◆ m_verboseSubSlots

Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" }
private

Definition at line 220 of file AvalancheSchedulerSvc.h.

◆ m_whiteboard

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 266 of file AvalancheSchedulerSvc.h.

◆ m_whiteboardSvcName

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" }
private

Definition at line 178 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: