The Gaudi Framework v38r3 (c3fc9673)
AvalancheSchedulerSvc Class Reference

#include </builds/gaudi/Gaudi/GaudiHive/src/AvalancheSchedulerSvc.h>


Classes

struct  AlgQueueSort
 Comparison operator to sort the queues.
 
struct  TaskSpec
 Struct to hold entries in the alg queues.
 

Public Member Functions

StatusCode initialize () override
 Initialise.
 
StatusCode finalize () override
 Finalise.
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler.
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Block until a finished event is available.
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch a finished event from the scheduler.
 
unsigned int freeSlots () override
 Get the number of free slots.
 
void dumpState () override
 Dump scheduler state for all slots.
 
virtual StatusCode scheduleEventView (const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
 Method to inform the scheduler about event views.
 
virtual void recordOccupancy (int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
 Sample occupancy at a fixed interval (in ms). A negative value deactivates sampling; 0 takes a snapshot on every change. At each sample, the callback function is applied to the result.
 
bool next (TaskSpec &ts, bool blocking=false)
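The recordOccupancy() sampling rule can be illustrated with a small decision function. This is a hypothetical sketch, not the Gaudi implementation: snapshotDue() and the Millis alias are illustrative names, and steady_clock is used instead of the system_clock of m_lastSnapshot because it is monotonic.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical helper (not part of Gaudi) applying the documented
// recordOccupancy() semantics to decide whether a snapshot is due.
using Millis = std::chrono::duration<std::int64_t, std::milli>;

// samplePeriod < 0  : sampling deactivated
// samplePeriod == 0 : snapshot on every change
// samplePeriod > 0  : snapshot once at least samplePeriod has elapsed
bool snapshotDue( Millis samplePeriod, std::chrono::steady_clock::time_point last,
                  std::chrono::steady_clock::time_point now ) {
  if ( samplePeriod.count() < 0 ) return false;
  if ( samplePeriod.count() == 0 ) return true;
  return now - last >= samplePeriod;
}
```

A caller would invoke the snapshot callback and update its last-snapshot timestamp whenever snapshotDue() returns true.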
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast.
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface.
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames.
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service.
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize the Service.
 
StatusCode sysStart () override
 Start the Service.
 
StatusCode sysStop () override
 Stop the Service.
 
StatusCode sysFinalize () override
 Finalize the Service.
 
StatusCode sysReinitialize () override
 Re-initialize the Service.
 
StatusCode sysRestart () override
 Restart the Service.
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor.
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator.
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist.
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist.
 
template<class T >
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, ToolHandle< T > &hndl, const std::string &doc="none")
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, bool createIf=true)
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName, bool createIf=true)
 Declare used tool.
 
template<class T >
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, ToolHandleArray< T > &hndlArr, const std::string &doc="none")
 
template<class T >
void addToolsArray (ToolHandleArray< T > &hndlArr)
 
const std::vector< IAlgTool * > & tools () const
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked.
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBasedeclareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property.
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property.
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation.
 
Gaudi::Details::PropertyBasedeclareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property.
 
StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p) override
 Set the property from another property with a different name.
 
StatusCode setProperty (const std::string &s) override
 Set the property from the formatted string.
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p)
 Set the property from a property.
 
virtual StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p)=0
 Set the property from a property with a different name.
 
virtual StatusCode setProperty (const std::string &s)=0
 Set the property by string.
 
StatusCode setProperty (const std::string &name, const char *v)
 Special case for string literals.
 
StatusCode setProperty (const std::string &name, const std::string &v)
 Special case for std::string.
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 Set the property from the value.
 
StatusCode setPropertyRepr (const std::string &n, const std::string &r) override
 Set the property from name and value string representation.
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 Get the property.
 
const Gaudi::Details::PropertyBase & getProperty (std::string_view name) const override
 Get the property by name.
 
StatusCode getProperty (std::string_view n, std::string &v) const override
 Convert the property to a string.
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 Get all properties.
 
bool hasProperty (std::string_view name) const override
 Return true if we have a property with the given name.
 
Gaudi::Details::PropertyBaseproperty (std::string_view name) const
 FIXME: property and bindPropertiesTo should be protected.
 
void bindPropertiesTo (Gaudi::Interfaces::IOptionsSvc &optsSvc)
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 Get the cached level (originally extracted from the embedded MsgStream).
 
bool msgLevel (MSG::Level lvl) const
 Get the output level from the embedded MsgStream.
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
using AState = AlgsExecutionStates::State
 
using action = std::function< StatusCode()>
 

Private Member Functions

void activate ()
 Activate scheduler.
 
StatusCode deactivate ()
 Deactivate scheduler.
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer.
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name.
 
StatusCode iterate ()
 Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
 
StatusCode revise (unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
 
StatusCode schedule (TaskSpec &&)
 
StatusCode signoff (const TaskSpec &)
 The call to this method is triggered only from within the AlgTask.
 
bool isStalled (const EventSlot &) const
 Check if scheduling in a particular slot is in a stall.
 
void eventFailed (EventContext *eventContext)
 Method to execute if an event failed.
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler.
 

Private Attributes

std::chrono::duration< int64_t, std::milli > m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
 
std::chrono::system_clock::time_point m_lastSnapshot = std::chrono::system_clock::now()
 
std::function< void(OccupancySnapshot)> m_snapshotCallback
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< int > m_maxParallelismExtra
 
Gaudi::Property< std::string > m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" }
 
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::string > m_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
 
Gaudi::Property< bool > m_checkDeps
 
Gaudi::Property< bool > m_checkOutput
 
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
 
Gaudi::Property< std::string > m_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" }
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
Gaudi::Property< bool > m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" }
 
std::atomic< ActivationState > m_isActive { INACTIVE }
 Flag to track if the scheduler is active or not.
 
std::thread m_thread
 The thread in which the activate function runs.
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary for the name2index conversion.
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary for the index2name conversion.
 
SmartIF< IPrecedenceSvc > m_precSvc
 A shortcut to the Precedence Service.
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard.
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots.
 
std::atomic_int m_freeSlots { 0 }
 Atomic to account for asynchronous updates by the scheduler with respect to the rest.
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events.
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager.
 
SmartIF< ICondSvc > m_condSvc
 A shortcut to the service for Conditions handling.
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight.
 
unsigned int m_blockingAlgosInFlight = 0
 Number of blocking algorithms presently in flight.
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool.
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
 Queues for scheduled algorithms.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledBlockingQueue
 
std::queue< TaskSpec > m_retryQueue
 
std::atomic< bool > m_needsUpdate { true }
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
tbb::task_arena * m_arena { nullptr }
 
size_t m_maxEventsInFlight { 0 }
 
size_t m_maxAlgosInFlight { 1 }
 

Friends

class AlgTask
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class.
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class.
 
- Public Types inherited from Service
using Factory = Gaudi::PluginService::Factory< IService *(const std::string &, ISvcLocator *)>
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in.
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 Take the union of the ext_iids of all Interfaces...
 
- Protected Member Functions inherited from Service
std::vector< IAlgTool * > & tools ()
 
 ~Service () override
 Standard Destructor.
 
int outputLevel () const
 Get the Service's output level.
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches.
 
MSG::Level resetMessaging ()
 Reinitialize internal states.
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream.
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Target service state.
 
Gaudi::Property< int > m_outputLevel { this, "OutputLevel", MSG::NIL, "output level" }
 Output level of the service (the OutputLevel property).
 
Gaudi::Property< bool > m_auditInit { this, "AuditServices", false, "[[deprecated]] unused" }
 
Gaudi::Property< bool > m_auditorInitialize { this, "AuditInitialize", false, "trigger auditor on initialize()" }
 
Gaudi::Property< bool > m_auditorStart { this, "AuditStart", false, "trigger auditor on start()" }
 
Gaudi::Property< bool > m_auditorStop { this, "AuditStop", false, "trigger auditor on stop()" }
 
Gaudi::Property< bool > m_auditorFinalize { this, "AuditFinalize", false, "trigger auditor on finalize()" }
 
Gaudi::Property< bool > m_auditorReinitialize { this, "AuditReinitialize", false, "trigger auditor on reinitialize()" }
 
Gaudi::Property< bool > m_auditorRestart { this, "AuditRestart", false, "trigger auditor on restart()" }
 
Gaudi::Property< bool > m_autoRetrieveTools
 
Gaudi::Property< bool > m_checkToolDeps
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service.
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves under arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. Compared to the approach used in the ForwardSchedulerSvc, this offers the following advantages:

(1) faster decision making (thus lower concurrency disclosure downtime);
(2) capacity for proactive task scheduling decisions.

Point (2) made it possible to implement a number of generic, non-intrusive scheduling strategies that maximize intra-event throughput.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled as soon as all of the following conditions are met:

  • a control flow (CF) graph traversal reaches the task;
  • all data flow (DF) dependencies of the task are satisfied;
  • the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.
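The prerequisites can be read as a gated progression through task states. The sketch below is a simplified illustration under assumptions: TaskState, Conditions and advance() are hypothetical, the state names only loosely echo those the scheduler tracks, and the real inputs come from the precedence service, the algorithm resource pool and the thread pool rather than from plain flags.

```cpp
#include <cassert>

// Simplified, hypothetical task life cycle; the state names only loosely
// echo those the scheduler tracks (this is not AlgsExecutionStates).
enum class TaskState { Initial, ControlReady, DataReady, Scheduled };

// Inputs that the real scheduler derives from other services,
// reduced here to plain flags.
struct Conditions {
  bool cfReached       = false; // a CF graph traversal reached the task
  bool dfSatisfied     = false; // all DF dependencies are satisfied
  bool picked          = false; // the DF-ready pool parser considered the task
  bool algInstanceFree = false; // a free (or re-entrant) algorithm instance exists
  bool resourceFree    = false; // a free computational resource exists
};

// Advance a task as far as the current conditions allow; each transition is
// gated by one group of prerequisites.
TaskState advance( TaskState s, const Conditions& c ) {
  if ( s == TaskState::Initial && c.cfReached ) s = TaskState::ControlReady;
  if ( s == TaskState::ControlReady && c.dfSatisfied ) s = TaskState::DataReady;
  if ( s == TaskState::DataReady && c.picked && c.algInstanceFree && c.resourceFree ) s = TaskState::Scheduled;
  return s;
}
```

A task whose data is ready but which finds no free thread stays DataReady until a later scheduling pass re-evaluates it.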

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of asymmetry in the precedence rules graph:

(A) local task-to-data asymmetry;
(B) local task-to-task asymmetry;
(C) global task-to-task asymmetry.
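Prioritizing the DF-ready pool amounts to ordering tasks by a rank. The following sketch assumes, for illustration, that a strategy has already reduced one of these asymmetries to a single integer rank per task; Task, ByRank, dispatchOrder() and the task names are hypothetical, and std::priority_queue stands in for the tbb::concurrent_priority_queue used by the scheduler.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Hypothetical task entry: the rank stands in for a score derived from one
// precedence-graph asymmetry (higher = discloses more concurrency).
struct Task {
  std::string name;
  unsigned rank;
};

// std::priority_queue keeps the "largest" element on top, so ordering by
// a.rank < b.rank puts the highest-rank task first.
struct ByRank {
  bool operator()( const Task& a, const Task& b ) const { return a.rank < b.rank; }
};

using TaskQueue = std::priority_queue<Task, std::vector<Task>, ByRank>;

// Drain a copy of the queue in the order a rank-prioritizing scheduler would dispatch.
std::vector<std::string> dispatchOrder( TaskQueue q ) {
  std::vector<std::string> order;
  while ( !q.empty() ) {
    order.push_back( q.top().name );
    q.pop();
  }
  return order;
}
```

Only the ranking function would differ between strategies (A), (B) and (C); the queue mechanics stay the same.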

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by preemptively scheduling CPU-blocking tasks. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds and, jokingly, quantum computing devices);
  • synchronization-bound tasks.
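One way to picture preemptive scheduling of blocking tasks is a dispatcher with two separate budgets, so that blocking tasks never occupy the slots reserved for CPU-bound work. This is a hypothetical single-threaded sketch (Dispatcher and its fields are illustrative names). The class does declare m_maxBlockingAlgosInFlight and a separate m_scheduledBlockingQueue, but this sketch does not claim to reproduce how they interact in the real code.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical dispatcher: CPU-bound and CPU-blocking tasks are queued
// separately and limited by independent budgets.
struct Dispatcher {
  unsigned cpuSlots;            // e.g. number of worker threads
  unsigned maxBlockingInFlight; // cap on concurrently running blocking tasks
  unsigned cpuInFlight      = 0;
  unsigned blockingInFlight = 0;
  std::deque<std::string> cpuQueue, blockingQueue;

  // Launch as many queued tasks as the two budgets allow; return their names.
  std::vector<std::string> launch() {
    std::vector<std::string> started;
    while ( !blockingQueue.empty() && blockingInFlight < maxBlockingInFlight ) {
      started.push_back( blockingQueue.front() );
      blockingQueue.pop_front();
      ++blockingInFlight;
    }
    while ( !cpuQueue.empty() && cpuInFlight < cpuSlots ) {
      started.push_back( cpuQueue.front() );
      cpuQueue.pop_front();
      ++cpuInFlight;
    }
    return started;
  }
};
```

With two CPU slots and a blocking cap of one, an I/O task starts alongside two CPU tasks instead of stealing one of their slots.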

Credits

Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on the ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 112 of file AvalancheSchedulerSvc.h.

Member Typedef Documentation

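◆ action

using AvalancheSchedulerSvc::action = std::function<StatusCode()>
private

Definition at line 155 of file AvalancheSchedulerSvc.h.

◆ AState

using AvalancheSchedulerSvc::AState = AlgsExecutionStates::State
private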

Member Enumeration Documentation

◆ ActivationState

Enumerator
INACTIVE 
ACTIVE 
FAILURE 

Definition at line 157 of file AvalancheSchedulerSvc.h.

157 { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };

Member Function Documentation

◆ activate()

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on, the queue of actions is checked. The checking stops only when the m_isActive flag is no longer ACTIVE and the queue is empty. This guarantees that all queued actions are executed and that no stall is created. The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html). The thread pool is initialised here because this method runs in a separate thread and spawns the tasks (through the execution of the lambdas).

Definition at line 434 of file AvalancheSchedulerSvc.cpp.

434  {
435 
436  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
437 
438  if ( m_threadPoolSvc->initPool( m_threadPoolSize, m_maxParallelismExtra ).isFailure() ) {
439  error() << "problems initializing ThreadPoolSvc" << endmsg;
440  m_isActive = FAILURE;
441  return;
442  }
443 
444  // Wait for actions pushed into the queue by finishing tasks.
445  action thisAction;
446  StatusCode sc;
447 
448  m_isActive = ACTIVE;
449 
450  // Continue to wait if the scheduler is running or there is something to do
451  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
452  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
453  m_actionsQueue.pop( thisAction );
454  sc = thisAction();
455  ON_VERBOSE {
456  if ( sc.isFailure() )
457  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
458  else
459  verbose() << "Action succeeded." << endmsg;
460  }
461  else sc.ignore();
462 
463  // If all queued actions have been processed, update the slot states
464  if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
465  sc = iterate();
466  ON_VERBOSE {
467  if ( sc.isFailure() )
468  verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
469  else
470  verbose() << "Iteration succeeded." << endmsg;
471  }
472  else sc.ignore();
473  }
474  }
475 
476  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
477  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
478  error() << "Problems terminating thread pool" << endmsg;
479  m_isActive = FAILURE;
480  }
481 }
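The action loop can be sketched with standard-library primitives. Assumptions: ActionQueue is a hypothetical stand-in for tbb::concurrent_bounded_queue, actions return bool instead of StatusCode, and thread-pool initialisation is omitted. Unlike the deactivate() listing further down, this sketch does not discard pending actions before pushing the final one.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Minimal blocking queue: pop() waits until an action is available.
class ActionQueue {
public:
  using Action = std::function<bool()>; // true = success (stands in for StatusCode)
  void push( Action a ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_items.push_back( std::move( a ) );
    }
    m_cv.notify_one();
  }
  Action pop() {
    std::unique_lock<std::mutex> lock( m_mutex );
    m_cv.wait( lock, [this] { return !m_items.empty(); } );
    Action a = std::move( m_items.front() );
    m_items.pop_front();
    return a;
  }
  bool empty() const {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_items.empty();
  }

private:
  mutable std::mutex m_mutex;
  std::condition_variable m_cv;
  std::deque<Action> m_items;
};

enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };

struct ActionLoop {
  ActionQueue queue;
  std::atomic<ActivationState> state{ INACTIVE };
  int executed = 0; // only touched by the loop thread

  // Keep executing closures while active or while work remains queued.
  void run() {
    state = ACTIVE;
    while ( state == ACTIVE || !queue.empty() ) {
      ActionQueue::Action a = queue.pop();
      a(); // result ignored: a failed action is "not bad per se"
      ++executed;
    }
  }
  // Deactivation is itself an action; FIFO order runs it after earlier pushes.
  void deactivate() {
    queue.push( [this] { state = INACTIVE; return true; } );
  }
};
```

finalize() follows the same pattern as a client of this loop: it calls deactivate() and then joins the scheduler thread.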

◆ algname2index()

unsigned int AvalancheSchedulerSvc::algname2index ( const std::string & algoname)
inline private

Convert a name to an integer.

Definition at line 228 of file AvalancheSchedulerSvc.h.

228 { return m_algname_index_map[algoname]; };
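The name/index conversion pairs a hash map with a vector. The sketch below is a hypothetical standalone class, not part of Gaudi. One design point it makes explicit: std::unordered_map::operator[], as used in algname2index(), inserts a value-initialized entry (index 0) for an unknown name, so this sketch looks names up with find() and reports misses through std::optional instead.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical bidirectional registry: name -> index via a hash map,
// index -> name via a vector.
class AlgIndex {
public:
  // Register a name if it is new and return its index.
  unsigned add( const std::string& name ) {
    auto [it, inserted] = m_index.try_emplace( name, static_cast<unsigned>( m_names.size() ) );
    if ( inserted ) m_names.push_back( name );
    return it->second;
  }
  // Look up without inserting: an unknown name yields std::nullopt.
  std::optional<unsigned> name2index( const std::string& name ) const {
    auto it = m_index.find( name );
    if ( it == m_index.end() ) return std::nullopt;
    return it->second;
  }
  // Bounds-checked reverse lookup (throws std::out_of_range on a bad index).
  const std::string& index2name( unsigned index ) const { return m_names.at( index ); }

private:
  std::unordered_map<std::string, unsigned> m_index; // name -> index
  std::vector<std::string> m_names;                 // index -> name
};
```

The vector keeps index2name() a constant-time array access; the at() bounds check is an added safety choice of this sketch.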

◆ deactivate()

StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivate the scheduler.

If the scheduler is active, the number of free slots is set to 0, all pending actions are discarded from the queue, and a final action is pushed that flips the status flag m_isActive to INACTIVE. This final action is the last one executed by the scheduler.

Definition at line 491 of file AvalancheSchedulerSvc.cpp.

491  {
492 
493  if ( m_isActive == ACTIVE ) {
494 
495  // Set the number of slots available to an error code
496  m_freeSlots.store( 0 );
497 
498  // Empty queue
499  action thisAction;
500  while ( m_actionsQueue.try_pop( thisAction ) ) {};
501 
502  // This would be the last action
503  m_actionsQueue.push( [this]() -> StatusCode {
504  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
505  m_isActive = INACTIVE;
506  return StatusCode::SUCCESS;
507  } );
508  }
509 
510  return StatusCode::SUCCESS;
511 }

◆ dumpSchedulerState()

void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

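Used for debugging purposes: the state of the scheduler is dumped as a single info() message so that it can be inspected. If iSlot is negative, the control flow and FSM state of every incomplete slot is printed; otherwise only slot iSlot is detailed, followed by its algorithm execution states unless an algorithm reported an error.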

Definition at line 856 of file AvalancheSchedulerSvc.cpp.

856  {
857 
858  // To have just one big message
859  std::ostringstream outputMS;
860 
861  outputMS << "Dumping scheduler state\n"
862  << "=========================================================================================\n"
863  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
864  << "=========================================================================================\n\n";
865 
866  //===========================================================================
867 
868  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
869  << "------------------\n\n";
870 
871  // Figure if TimelineSvc is available (used below to detect threads IDs)
872  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
873  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
874  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
875  } else {
876 
877  // Figure optimal printout layout
878  size_t indt( 0 );
879  for ( auto& slot : m_eventSlots ) {
880 
881  auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
882  for ( uint algIndex : schedAlgs ) {
883  if ( index2algname( algIndex ).length() > indt ) indt = index2algname( algIndex ).length();
884  }
885  }
886 
887  // Figure the last running schedule across all slots
888  for ( auto& slot : m_eventSlots ) {
889 
890  auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
891  for ( uint algIndex : schedAlgs ) {
892 
893  const std::string& algoName{ index2algname( algIndex ) };
894 
895  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
896  << slot.eventContext->slot();
897 
898  // Try to get POSIX threads IDs the currently running tasks are scheduled to
899  if ( timelineSvc.isValid() ) {
900  TimelineEvent te{};
901  te.algorithm = algoName;
902  te.slot = slot.eventContext->slot();
903  te.event = slot.eventContext->evt();
904 
905  if ( timelineSvc->getTimelineEvent( te ) )
906  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
907  else
908  outputMS << " thread.id: [unknown]"; // this means a task has just
909  // been signed off as SCHEDULED,
910  // but has not been assigned to a thread yet
911  // (i.e., not running yet)
912  }
913  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
914  }
915  }
916  }
917 
918  //===========================================================================
919 
920  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
921  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
922 
923  int slotCount = -1;
924  bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( { AState::ERROR } ) ||
925  subSlotAlgsInStates( m_eventSlots[iSlot], { AState::ERROR } )
926  : false;
927 
928  for ( auto& slot : m_eventSlots ) {
929  ++slotCount;
930  if ( slot.complete ) continue;
931 
932  outputMS << "[ slot: "
933  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
934  << ", event: "
935  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" );
936 
937  if ( slot.eventContext->eventID().isValid() ) { outputMS << ", eventID: " << slot.eventContext->eventID(); }
938  outputMS << " ]:\n\n";
939 
940  if ( 0 > iSlot || iSlot == slotCount ) {
941 
942  // If an alg has thrown an error then it's not a failure of the CF/DF graph
943  if ( wasAlgError ) {
944  outputMS << "ERROR alg(s):";
945  int errorCount = 0;
946  auto& errorAlgs = slot.algsStates.algsInState( AState::ERROR );
947  for ( uint algIndex : errorAlgs ) {
948  outputMS << " " << index2algname( algIndex );
949  ++errorCount;
950  }
951  if ( errorCount == 0 ) outputMS << " in subslot(s)";
952  outputMS << "\n\n";
953  } else {
954  // Snapshot of the Control Flow and FSM states
955  outputMS << m_precSvc->printState( slot ) << "\n";
956  }
957 
958  // Mention sub slots (this is expensive if the number of sub-slots is high)
959  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
960  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
961  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
962  for ( auto& ss : slot.allSubSlots ) {
963  outputMS << "[ slot: " << slotID << ", sub-slot: "
964  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
965  << ", entry: " << ss.entryPoint << ", event: "
966  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
967  << " ]:\n\n";
968  if ( wasAlgError ) {
969  outputMS << "ERROR alg(s):";
970  auto& errorAlgs = ss.algsStates.algsInState( AState::ERROR );
971  for ( uint algIndex : errorAlgs ) { outputMS << " " << index2algname( algIndex ); }
972  outputMS << "\n\n";
973  } else {
974  // Snapshot of the Control Flow and FSM states in sub slot
975  outputMS << m_precSvc->printState( ss ) << "\n";
976  }
977  }
978  }
979  }
980  }
981 
982  //===========================================================================
983 
984  if ( 0 <= iSlot && !wasAlgError ) {
985  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
986  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
987  }
988 
989  outputMS << "\n=========================================================================================\n"
990  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
991  << "=========================================================================================\n\n";
992 
993  info() << outputMS.str() << endmsg;
994 }
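The "Last schedule" table pads the task column to the longest algorithm name and builds everything in one std::ostringstream, so it is printed as a single message. A minimal sketch of that layout technique; RunningTask and formatSchedule() are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <iomanip>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical row of the "Last schedule" table.
struct RunningTask {
  std::string alg;
  long evt;
  unsigned slot;
};

std::string formatSchedule( const std::vector<RunningTask>& tasks ) {
  // First pass: find the widest algorithm name.
  std::size_t width = 0;
  for ( const auto& t : tasks ) width = std::max( width, t.alg.size() );
  // Second pass: std::setw right-aligns the name in that width; it applies
  // only to the next insertion, so the numbers are printed unpadded.
  std::ostringstream out;
  for ( const auto& t : tasks )
    out << " task: " << std::setw( static_cast<int>( width ) ) << t.alg << " evt/slot: " << t.evt << "/" << t.slot
        << "\n";
  return out.str();
}
```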

◆ dumpState()

void AvalancheSchedulerSvc::dumpState ( )
override

Dump scheduler state for all slots.

Definition at line 595 of file AvalancheSchedulerSvc.cpp.

595 { dumpSchedulerState( -1 ); }

◆ eventFailed()

void AvalancheSchedulerSvc::eventFailed ( EventContext eventContext)
private

Method to execute if an event failed.

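An event can fail.

In this case this method is called. It dumps the state of the scheduler (all slots at VERBOSE level, otherwise only the failed slot) and the precedence analysis (if enabled in the PrecedenceSvc), marks the event slot as complete and pushes the event context into the queue of finished events.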

Definition at line 835 of file AvalancheSchedulerSvc.cpp.

835  {
836  const uint slotIdx = eventContext->slot();
837 
838  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
839 
840  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
841 
842  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
843  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
844 
845  // Push into the finished events queue the failed context
846  m_eventSlots[slotIdx].complete = true;
847  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
848 }
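The ownership hand-over at the end of eventFailed() can be shown in isolation. In this hypothetical sketch, Context and Slot stand in for EventContext and EventSlot, and std::queue replaces the concurrent finished-events queue. After release() the slot no longer owns the context, so in this sketch whoever pops it is responsible for freeing it.

```cpp
#include <cassert>
#include <memory>
#include <queue>

struct Context { // hypothetical stand-in for EventContext
  long evt;
  unsigned slot;
};

struct Slot { // hypothetical stand-in for EventSlot
  std::unique_ptr<Context> ctx;
  bool complete = false;
};

// Mark the slot complete and move the context into the finished queue.
void failEvent( Slot& slot, std::queue<Context*>& finished ) {
  slot.complete = true;
  finished.push( slot.ctx.release() ); // slot.ctx is now empty
}
```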

◆ finalize()

StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 403 of file AvalancheSchedulerSvc.cpp.

403  {
404 
405  StatusCode sc( Service::finalize() );
406  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
407 
408  sc = deactivate();
409  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
410 
411  info() << "Joining Scheduler thread" << endmsg;
412  m_thread.join();
413 
414  // Final error check after thread pool termination
415  if ( m_isActive == FAILURE ) {
416  error() << "problems in scheduler thread" << endmsg;
417  return StatusCode::FAILURE;
418  }
419 
420  return sc;
421 }

◆ freeSlots()

unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 591 of file AvalancheSchedulerSvc.cpp.

591 { return std::max( m_freeSlots.load(), 0 ); }
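The clamping in freeSlots() can be shown with a hypothetical slot counter. How the real scheduler updates m_freeSlots is not shown in this excerpt; SlotCounter, tryAcquire() and release() are assumptions. In this sketch, a failed tryAcquire() briefly drives the counter below zero before undoing its decrement, and the clamp hides that from callers of freeSlots().

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>

// Hypothetical slot accounting built on an atomic counter.
class SlotCounter {
public:
  explicit SlotCounter( int slots ) : m_free( slots ) {}
  // Claim a slot; undo the decrement if none was available.
  bool tryAcquire() {
    if ( m_free.fetch_sub( 1 ) > 0 ) return true;
    m_free.fetch_add( 1 );
    return false;
  }
  void release() { m_free.fetch_add( 1 ); }
  // Never report a negative number of free slots.
  unsigned freeSlots() const { return static_cast<unsigned>( std::max( m_free.load(), 0 ) ); }

private:
  std::atomic_int m_free;
};
```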

◆ index2algname()

const std::string& AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inline private

Convert an integer to a name.

Definition at line 234 of file AvalancheSchedulerSvc.h.

234 { return m_algname_vect[index]; };

◆ initialize()

StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated by executing the activate() function in a new thread.

In addition, the list of algorithms is acquired from the AlgResourcePool.
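The start-up handshake in initialize() (launch the activate() thread, then wait until the state becomes ACTIVE or FAILURE) can be sketched as below. Assumptions: startScheduler() and poolOk are hypothetical, the worker sets the state directly instead of running the action loop, and the poll interval is 1 ms rather than the one-second sleep of the listing.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };

// Launch the worker, then poll the shared state until it reports
// ACTIVE (success) or FAILURE (e.g. the thread pool could not start).
bool startScheduler( std::thread& worker, std::atomic<ActivationState>& state, bool poolOk ) {
  worker = std::thread( [&state, poolOk] {
    state = poolOk ? ACTIVE : FAILURE;
    // ... the action loop would run here while ACTIVE ...
  } );
  while ( state != ACTIVE ) {
    if ( state == FAILURE ) return false;
    std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
  }
  return true;
}
```

The caller still owns the thread and must join it later, as finalize() does with m_thread.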

Definition at line 73 of file AvalancheSchedulerSvc.cpp.

73  {
74 
75  // Initialise mother class (read properties, ...)
76  StatusCode sc( Service::initialize() );
77  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
78 
79  // Get hold of the TBBSvc. This should initialize the thread pool
80  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
81  if ( !m_threadPoolSvc.isValid() ) {
82  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
83  return StatusCode::FAILURE;
84  }
85  auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
86  if ( !castTPS ) {
87  fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
88  return StatusCode::FAILURE;
89  }
90  m_arena = castTPS->getArena();
91  if ( !m_arena ) {
92  fatal() << "Cannot find valid TBB task_arena" << endmsg;
93  return StatusCode::FAILURE;
94  }
95 
96  // Activate the scheduler in another thread.
97  info() << "Activating scheduler in a separate thread" << endmsg;
98  m_thread = std::thread( [this]() { this->activate(); } );
99 
100  while ( m_isActive != ACTIVE ) {
101  if ( m_isActive == FAILURE ) {
102  fatal() << "Terminating initialization" << endmsg;
103  return StatusCode::FAILURE;
104  } else {
105  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
106  sleep( 1 );
107  }
108  }
109 
110  if ( m_enableCondSvc ) {
111  // Get hold of the CondSvc
112  m_condSvc = serviceLocator()->service( "CondSvc" );
113  if ( !m_condSvc.isValid() ) {
114  warning() << "No CondSvc found, or not enabled. "
115  << "Will not manage CondAlgorithms" << endmsg;
116  m_enableCondSvc = false;
117  }
118  }
119 
120  // Get the algo resource pool
121  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
122  if ( !m_algResourcePool.isValid() ) {
123  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
124  return StatusCode::FAILURE;
125  }
126 
127  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
128  if ( !m_algExecStateSvc.isValid() ) {
129  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
130  return StatusCode::FAILURE;
131  }
132 
133  // Get Whiteboard
134  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
135  if ( !m_whiteboard.isValid() ) {
136  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
137  return StatusCode::FAILURE;
138  }
139 
140  // Set the MaxEventsInFlight parameters from the number of WB stores
141  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
142 
143  // Set the number of free slots
144  m_freeSlots = m_maxEventsInFlight;
145 
146  // Get the list of algorithms
147  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
148  const unsigned int algsNumber = algos.size();
149  if ( algsNumber != 0 ) {
150  info() << "Found " << algsNumber << " algorithms" << endmsg;
151  } else {
152  error() << "No algorithms found" << endmsg;
153  return StatusCode::FAILURE;
154  }
155 
156  /* Dependencies
157  1) Look for handles in algo, if none
158  2) Assume none are required
159  */
160 
161  DataObjIDColl globalInp, globalOutp;
162 
163  // figure out all outputs
164  std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
165  for ( IAlgorithm* ialgoPtr : algos ) {
166  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
167  if ( !algoPtr ) {
168  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
169  return StatusCode::FAILURE;
170  }
171 
172  DataObjIDColl algoOutputs;
173  for ( auto id : algoPtr->outputDataObjs() ) {
174  globalOutp.insert( id );
175  algoOutputs.insert( id );
176  }
177  algosOutputDependenciesMap[algoPtr->name()] = algoOutputs;
178  }
179 
180  std::ostringstream ostdd;
181  ostdd << "Data Dependencies for Algorithms:";
182 
183  std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
184  for ( IAlgorithm* ialgoPtr : algos ) {
185  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
186  if ( nullptr == algoPtr ) {
187  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
188  << ": this will result in a crash." << endmsg;
189  return StatusCode::FAILURE;
190  }
191 
192  DataObjIDColl i1, i2;
193  DHHVisitor avis( i1, i2 );
194  algoPtr->acceptDHVisitor( &avis );
195 
196  ostdd << "\n " << algoPtr->name();
197 
198  auto write_owners = [&avis, &ostdd]( const DataObjID id ) {
199  auto owners = avis.owners_names_of( id );
200  if ( !owners.empty() ) { GaudiUtils::operator<<( ostdd << ' ', owners ); }
201  };
202 
203  DataObjIDColl algoDependencies;
204  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
205  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
206  DataObjID id = *idp;
207  ostdd << "\n o INPUT " << id;
208  write_owners( id );
209  algoDependencies.insert( id );
210  globalInp.insert( id );
211  }
212  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
213  ostdd << "\n o OUTPUT " << *id;
214  write_owners( *id );
215  if ( id->key().find( ":" ) != std::string::npos ) {
216  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
217  << endmsg;
218  m_showDataDeps = true;
219  }
220  }
221  } else {
222  ostdd << "\n none";
223  }
224  algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
225  }
226 
227  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
228 
229  // Check if we have unmet global input dependencies, and, optionally, heal them
230  // WARNING: this step must be done BEFORE the Precedence Service is initialized
231  DataObjIDColl unmetDepInp, unusedOutp;
232  if ( m_checkDeps || m_checkOutput ) {
233  std::set<std::string> requiredInputKeys;
234  for ( auto o : globalInp ) {
235  // track aliases
236  // (assuming there should be no items with different class and same key corresponding to different objects)
237  requiredInputKeys.insert( o.key() );
238  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
239  }
240  if ( m_checkOutput ) {
241  for ( auto o : globalOutp ) {
242  if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
243  // check ignores
244  bool ignored{};
245  for ( const std::string& algoName : m_checkOutputIgnoreList ) {
246  auto it = algosOutputDependenciesMap.find( algoName );
247  if ( it != algosOutputDependenciesMap.end() ) {
248  if ( it->second.find( o ) != it->second.end() ) {
249  ignored = true;
250  break;
251  }
252  }
253  }
254  if ( !ignored ) { unusedOutp.insert( o ); }
255  }
256  }
257  }
258  }
259 
260  if ( m_checkDeps ) {
261  if ( unmetDepInp.size() > 0 ) {
262 
263  auto printUnmet = [&]( auto msg ) {
264  for ( const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
265  msg << " o " << *o << " required by Algorithm: " << endmsg;
266 
267  for ( const auto& p : algosInputDependenciesMap )
268  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
269  }
270  };
271 
272  if ( !m_useDataLoader.empty() ) {
273 
274  // Find the DataLoader Alg
275  IAlgorithm* dataLoaderAlg( nullptr );
276  for ( IAlgorithm* algo : algos )
277  if ( m_useDataLoader == algo->name() ) {
278  dataLoaderAlg = algo;
279  break;
280  }
281 
282  if ( dataLoaderAlg == nullptr ) {
283  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
284  << "\" found, and unmet INPUT dependencies "
285  << "detected:" << endmsg;
286  printUnmet( fatal() );
287  return StatusCode::FAILURE;
288  }
289 
290  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
291  << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
292  printUnmet( info() );
293 
294  // Set the property Load of DataLoader Alg
295  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
296  if ( !dataAlg ) {
297  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
298  << endmsg;
299  return StatusCode::FAILURE;
300  }
301 
302  for ( auto& id : unmetDepInp ) {
303  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
304  << dataLoaderAlg->name() << endmsg;
306  }
307 
308  } else {
309  fatal() << "Auto DataLoading not requested, "
310  << "and the following unmet INPUT dependencies were found:" << endmsg;
311  printUnmet( fatal() );
312  return StatusCode::FAILURE;
313  }
314 
315  } else {
316  info() << "No unmet INPUT data dependencies were found" << endmsg;
317  }
318  }
319 
320  if ( m_checkOutput ) {
321  if ( unusedOutp.size() > 0 ) {
322 
323  auto printUnusedOutp = [&]( auto msg ) {
324  for ( const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
325  msg << " o " << *o << " produced by Algorithm: " << endmsg;
326 
327  for ( const auto& p : algosOutputDependenciesMap )
328  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
329  }
330  };
331 
332  fatal() << "The following unused OUTPUT items were found:" << endmsg;
333  printUnusedOutp( fatal() );
334  return StatusCode::FAILURE;
335  } else {
336  info() << "No unused OUTPUT items were found" << endmsg;
337  }
338  }
339 
340  // Get the precedence service
341  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
342  if ( !m_precSvc.isValid() ) {
343  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
344  return StatusCode::FAILURE;
345  }
346  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
347  if ( !precSvc ) {
348  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
349  return StatusCode::FAILURE;
350  }
351 
352  // Fill the containers to convert algo names to index
353  m_algname_vect.resize( algsNumber );
354  for ( IAlgorithm* algo : algos ) {
355  const std::string& name = algo->name();
356  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
357  m_algname_index_map[name] = index;
358  m_algname_vect.at( index ) = name;
359  }
360 
361  // Shortcut for the message service
362  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
363  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
364 
366  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
367  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
368  m_eventSlots.back().complete = true;
369  }
370 
371  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
372 
373  // Clearly inform about the level of concurrency
374  info() << "Concurrency level information:" << endmsg;
375  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
376  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
377 
378  // Inform about task scheduling prescriptions
379  info() << "Task scheduling settings:" << endmsg;
380  info() << " o Avalanche generation mode: "
381  << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
382  info() << " o Preemptive scheduling of CPU-blocking tasks: "
383  << ( m_enablePreemptiveBlockingTasks
384  ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
385  : "disabled" )
386  << endmsg;
387  info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
388 
389  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
390 
391  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
392 
393  // Simulate execution flow
394  if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
395 
396  return sc;
397 }
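The unmet-input and unused-output checks in initialize() reduce to set differences over the declared data dependencies. The sketch below is a simplified illustration, not the Gaudi implementation. It assumes each data object is identified by a plain key string, whereas DataObjID also carries a class name, which is why the real code additionally tracks keys to cope with aliases. It also uses a hypothetical AlgDeps struct in place of the algorithms' data handles.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for an algorithm's declared data dependencies.
struct AlgDeps {
  std::set<std::string> inputs;
  std::set<std::string> outputs;
};

// Inputs that no algorithm produces (reported by CheckDependencies, or handed to a DataLoader).
std::set<std::string> unmetInputs( const std::map<std::string, AlgDeps>& algs ) {
  std::set<std::string> globalOut;
  for ( const auto& entry : algs ) globalOut.insert( entry.second.outputs.begin(), entry.second.outputs.end() );
  std::set<std::string> unmet;
  for ( const auto& entry : algs )
    for ( const auto& in : entry.second.inputs )
      if ( globalOut.count( in ) == 0 ) unmet.insert( in );
  return unmet;
}

// Outputs that no algorithm consumes, skipping anything also written by an algorithm
// named in the ignore list (reported by CheckOutputUsage).
std::set<std::string> unusedOutputs( const std::map<std::string, AlgDeps>& algs,
                                     const std::vector<std::string>& ignoreList ) {
  std::set<std::string> globalIn;
  for ( const auto& entry : algs ) globalIn.insert( entry.second.inputs.begin(), entry.second.inputs.end() );
  std::set<std::string> unused;
  for ( const auto& entry : algs )
    for ( const auto& out : entry.second.outputs ) {
      if ( globalIn.count( out ) != 0 ) continue;
      bool ignored = false;
      for ( const auto& name : ignoreList ) {
        auto it = algs.find( name );
        if ( it != algs.end() && it->second.outputs.count( out ) != 0 ) {
          ignored = true;
          break;
        }
      }
      if ( !ignored ) unused.insert( out );
    }
  return unused;
}
```

Consider a Producer that writes /Event/A and /Event/B, and a Consumer that reads /Event/A and /Event/Raw. Here /Event/Raw is an unmet input, the case a DataLoader algorithm would be asked to cover. /Event/B is an unused output unless Producer is in the ignore list.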

◆ isStalled()

bool AvalancheSchedulerSvc::isStalled ( const EventSlot &  slot) const
private

Check if scheduling in a particular slot is in a stall.

Check if we are in the presence of a stall condition for a particular slot.

This is the case when neither the slot nor any of its sub-slots has algorithms in the DATAREADY, SCHEDULED or RESOURCELESS state. In other words, no algorithm has all of its dependencies satisfied, is already scheduled, or is waiting for a free algorithm instance.

Definition at line 817 of file AvalancheSchedulerSvc.cpp.

817  {
818 
819  if ( !slot.algsStates.containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
820  !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
821 
822  error() << "*** Stall detected, event context: " << slot.eventContext.get() << endmsg;
823 
824  return true;
825  }
826  return false;
827 }
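The stall condition can be illustrated with a simplified model of a slot: a vector of algorithm states plus optional sub-slots. The Slot struct, the containsAny() helper and the reduced AState enum below are hypothetical stand-ins for EventSlot and AlgsExecutionStates.

```cpp
#include <algorithm>
#include <cassert>
#include <initializer_list>
#include <vector>

// Hypothetical subset of the algorithm FSM states used by the scheduler.
enum class AState { INITIAL, CONTROLREADY, DATAREADY, SCHEDULED, RESOURCELESS, EVTACCEPTED, EVTREJECTED, ERROR };

// Hypothetical, simplified event slot: one state per algorithm plus sub-slots (event views).
struct Slot {
  std::vector<AState> algStates;
  std::vector<Slot>   subSlots;
};

bool containsAny( const std::vector<AState>& states, std::initializer_list<AState> wanted ) {
  return std::any_of( states.begin(), states.end(), [&]( AState s ) {
    return std::find( wanted.begin(), wanted.end(), s ) != wanted.end();
  } );
}

// Stalled: no algorithm in the slot or any sub-slot can still make progress.
bool isStalled( const Slot& slot ) {
  const std::initializer_list<AState> progressing{ AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS };
  if ( containsAny( slot.algStates, progressing ) ) return false;
  for ( const Slot& sub : slot.subSlots )
    if ( containsAny( sub.algStates, progressing ) ) return false;
  return true;
}
```

CONTROLREADY algorithms do not count as progress. An algorithm whose control-flow prerequisites are met but whose data inputs never appear therefore leaves the slot stalled.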

◆ iterate()

StatusCode AvalancheSchedulerSvc::iterate ( )
private

Loop on all slots to schedule DATAREADY algorithms and sign off ready events.

Loop on all slots to schedule DATAREADY algorithms, sign off ready ones or detect execution stalls.

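To check whether an event is finished, the method verifies two things. First, the root control-flow decision of the task precedence graph must be resolved. Second, no algorithm, in the slot or in any of its sub-slots, may still be in an intermediate FSM state (CONTROLREADY, DATAREADY, SCHEDULED or RESOURCELESS) between INITIAL and EVTACCEPTED.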

Definition at line 642 of file AvalancheSchedulerSvc.cpp.

642  {
643 
644  StatusCode global_sc( StatusCode::SUCCESS );
645 
646  // Retry algorithms
647  const size_t retries = m_retryQueue.size();
648  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
649  TaskSpec retryTS = std::move( m_retryQueue.front() );
650  m_retryQueue.pop();
651  global_sc = schedule( std::move( retryTS ) );
652  }
653 
654  // Loop over all slots
655  OccupancySnapshot nextSnap;
656  auto now = std::chrono::system_clock::now();
657  for ( EventSlot& thisSlot : m_eventSlots ) {
658 
659  // Ignore slots without a valid context (relevant when populating scheduler for first time)
660  if ( !thisSlot.eventContext ) continue;
661 
662  int iSlot = thisSlot.eventContext->slot();
663 
664  // Cache the states of the algorithms to improve readability and performance
665  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
666 
667  StatusCode partial_sc = StatusCode::FAILURE;
668 
669  // Make an occupancy snapshot
672 
673  // Initialise snapshot
674  if ( nextSnap.states.empty() ) {
675  nextSnap.time = now;
676  nextSnap.states.resize( m_eventSlots.size() );
677  }
678 
679  // Store alg states
680  std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
681  slotStateTotals.resize( AState::MAXVALUE );
682  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
683  slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
684  }
685 
686  // Add subslot alg states
687  for ( auto& subslot : thisSlot.allSubSlots ) {
688  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
689  slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
690  }
691  }
692  }
693 
694  // Perform DR->SCHEDULED
695  auto& drAlgs = thisAlgsStates.algsInState( AState::DATAREADY );
696  for ( uint algIndex : drAlgs ) {
697  const std::string& algName{ index2algname( algIndex ) };
698  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
699  bool blocking{ m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false };
700 
701  partial_sc =
702  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
703 
704  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
705  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
706  << " on processing slot " << iSlot << endmsg;
707  }
708 
709  // Check for algorithms ready in sub-slots
710  for ( auto& subslot : thisSlot.allSubSlots ) {
711  auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
712  for ( uint algIndex : drAlgsSubSlot ) {
713  const std::string& algName{ index2algname( algIndex ) };
714  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
715  bool blocking{ m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false };
716  partial_sc =
717  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
718  }
719  }
720 
721  if ( m_dumpIntraEventDynamics ) {
722  std::ostringstream s;
723  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
724  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
725  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
728  std::ofstream myfile;
729  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
730  myfile << s.str();
731  myfile.close();
732  }
733 
734  // Not complete because this would mean that the slot is already free!
735  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
736  !thisSlot.algsStates.containsAny(
737  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
738  !subSlotAlgsInStates( thisSlot,
739  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
740  !thisSlot.complete ) {
741 
742  thisSlot.complete = true;
743  // if the event did not fail, add it to the finished events
744  // otherwise it is taken care of in the error handling
745  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
746  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
747  << thisSlot.eventContext->slot() << ")." << endmsg;
748  m_finishedEvents.push( thisSlot.eventContext.release() );
749  }
750 
751  // now let's return the fully evaluated result of the control flow
752  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
753 
754  thisSlot.eventContext.reset( nullptr );
755 
756  } else if ( isStalled( thisSlot ) ) {
757  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
758  eventFailed( thisSlot.eventContext.get() ); // can't release yet
759  }
760  partial_sc.ignore();
761  } // end loop on slots
762 
763  // Process snapshot
764  if ( !nextSnap.states.empty() ) {
765  m_lastSnapshot = nextSnap.time;
766  m_snapshotCallback( std::move( nextSnap ) );
767  }
768 
769  ON_VERBOSE verbose() << "Iteration done." << endmsg;
770  m_needsUpdate.store( false );
771  return global_sc;
772 }
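The retry loop at the top of iterate() drains only as many entries as were present when the pass started. A task that fails again and is re-queued is therefore not retried in the same pass.

Below is a minimal sketch of that pattern. It assumes a hypothetical Task type and a trySchedule callback that reports success. In the scheduler itself, schedule() re-queues failed tasks on its own.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>

// Hypothetical stand-in for TaskSpec.
struct Task {
  int id;
};

// One pass over the retry queue: every task waiting at the start is tried exactly once.
// Tasks that fail go to the back and wait for the next pass, so the loop terminates
// even if nothing can be scheduled.
template <typename TrySchedule>
void retryPass( std::queue<Task>& retryQueue, TrySchedule trySchedule ) {
  const std::size_t retries = retryQueue.size();  // fixed before anything is re-queued
  for ( std::size_t i = 0; i < retries; ++i ) {
    Task t = retryQueue.front();
    retryQueue.pop();
    if ( !trySchedule( t ) ) retryQueue.push( t );
  }
}
```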

◆ next()

bool AvalancheSchedulerSvc::next ( TaskSpec &  ts,
bool  blocking = false 
)
inline

Definition at line 351 of file AvalancheSchedulerSvc.h.

351  {
352  return blocking ? m_scheduledBlockingQueue.try_pop( ts ) : m_scheduledQueue.try_pop( ts );
353  };
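next() looks like the consumer side of a two-queue hand-off. schedule() pushes a TaskSpec onto m_scheduledQueue (or m_scheduledBlockingQueue) and then launches a generic AlgTask carrying only the blocking flag. That task presumably retrieves whichever spec is next through this method.

The sketch below shows the selection logic. It uses a hypothetical mutex-based SafeQueue in place of the TBB concurrent queues and a hypothetical Spec in place of TaskSpec.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Hypothetical minimal thread-safe queue; tbb::concurrent_queue offers push() and a
// non-blocking try_pop() with the same meaning.
template <typename T>
class SafeQueue {
public:
  void push( T value ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    m_queue.push( std::move( value ) );
  }
  // Returns false immediately if the queue is empty; never waits for data.
  bool try_pop( T& out ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_queue.empty() ) return false;
    out = std::move( m_queue.front() );
    m_queue.pop();
    return true;
  }

private:
  std::mutex    m_mutex;
  std::queue<T> m_queue;
};

// Hypothetical stand-in for TaskSpec.
struct Spec {
  std::string algName;
};

struct ScheduledQueues {
  SafeQueue<Spec> scheduled;          // cf. m_scheduledQueue
  SafeQueue<Spec> scheduledBlocking;  // cf. m_scheduledBlockingQueue

  // Same selection as next(): pick the queue according to the task kind.
  bool next( Spec& ts, bool blocking = false ) {
    return blocking ? scheduledBlocking.try_pop( ts ) : scheduled.try_pop( ts );
  }
};
```

Each task pops whatever spec is at the front, so specs and tasks need not be paired at enqueue time. It suffices that every push is matched by exactly one task of the same kind.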

◆ popFinishedEvent()

StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 601 of file AvalancheSchedulerSvc.cpp.

601  {
602 
603  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
604  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
605  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
606  // << " active: " << m_isActive << endmsg;
607  return StatusCode::FAILURE;
608  } else {
609  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
610  // << " active: " << m_isActive << endmsg;
611  m_finishedEvents.pop( eventContext );
612  ++m_freeSlots;
613  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
614  return StatusCode::SUCCESS;
615  }
616 }
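popFinishedEvent() refuses to block when every slot is free, because no event is in flight and the wait would never end. tryPopFinishedEvent() never blocks. Both hand a slot back on success.

The sketch below models this bookkeeping with a mutex and a condition variable, assuming events are plain ints. FinishedEvents is a hypothetical class; the scheduler itself uses tbb::concurrent_bounded_queue and a std::atomic_int.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical sketch: finished-event queue plus free-slot accounting.
class FinishedEvents {
public:
  explicit FinishedEvents( int slots ) : m_maxSlots( slots ), m_freeSlots( slots ) {}

  // Event-loop side (cf. pushNewEvent): claim a slot, or fail if all are busy.
  bool claimSlot() {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_freeSlots == 0 ) return false;
    --m_freeSlots;
    return true;
  }

  // Scheduler side: an event has finished processing.
  void finish( int evt ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_done.push_back( evt );
    }
    m_cv.notify_one();
  }

  // Blocking pop (cf. popFinishedEvent): refuse to wait if no event is in flight.
  bool pop( int& evt ) {
    std::unique_lock<std::mutex> lock( m_mutex );
    if ( m_freeSlots == m_maxSlots ) return false;
    m_cv.wait( lock, [this] { return !m_done.empty(); } );
    return takeLocked( evt );
  }

  // Non-blocking pop (cf. tryPopFinishedEvent).
  bool tryPop( int& evt ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    return !m_done.empty() && takeLocked( evt );
  }

  int freeSlots() {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_freeSlots;
  }

private:
  // Caller must hold m_mutex and have checked that m_done is non-empty.
  bool takeLocked( int& evt ) {
    evt = m_done.front();
    m_done.pop_front();
    ++m_freeSlots;  // the slot is available again
    return true;
  }

  const int               m_maxSlots;
  int                     m_freeSlots;
  std::deque<int>         m_done;
  std::mutex              m_mutex;
  std::condition_variable m_cv;
};
```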

◆ pushNewEvent()

StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext *  eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two possible cases:
1) No slot is free: StatusCode::FAILURE is returned.
2) At least one slot is free: an action that resets the slot and kicks off its update is queued.
A null eventContext is also rejected with StatusCode::FAILURE.

Definition at line 522 of file AvalancheSchedulerSvc.cpp.

522  {
523 
524  if ( !eventContext ) {
525  fatal() << "Event context is nullptr" << endmsg;
526  return StatusCode::FAILURE;
527  }
528 
529  if ( m_freeSlots.load() == 0 ) {
530  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
531  return StatusCode::FAILURE;
532  }
533 
534  // no problem as push new event is only called from one thread (event loop manager)
535  --m_freeSlots;
536 
537  auto action = [this, eventContext]() -> StatusCode {
538  // Event processing slot forced to be the same as the wb slot
539  const unsigned int thisSlotNum = eventContext->slot();
540  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
541  if ( !thisSlot.complete ) {
542  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
543  return StatusCode::FAILURE;
544  }
545 
546  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
547  thisSlot.reset( eventContext );
548 
549  // Result status code:
550  StatusCode result = StatusCode::SUCCESS;
551 
552  // promote to CR and DR the initial set of algorithms
553  Cause cs = { Cause::source::Root, "RootDecisionHub" };
554  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
555  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
556  result = StatusCode::FAILURE;
557  }
558 
559  if ( this->iterate().isFailure() ) {
560  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
561  result = StatusCode::FAILURE;
562  }
563 
564  return result;
565  }; // end of lambda
566 
567  // Kick off scheduling
568  ON_VERBOSE {
569  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
570  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
571  }
572 
573  m_actionsQueue.push( action );
574 
575  return StatusCode::SUCCESS;
576 }
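pushNewEvent() does not touch the slot directly. It queues a closure on m_actionsQueue, and the scheduler thread launched in initialize() is expected to pop and run those actions one at a time. As a result, slot state is only mutated from that thread.

The sketch below reproduces the pattern with the standard library. ActionLoop and its bool-returning actions (false stops the loop) are hypothetical. The real actions return StatusCode, and the queue is a tbb::concurrent_bounded_queue, whose pop() blocks in the same way.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical single-consumer action loop: producers push closures, one thread runs them.
class ActionLoop {
public:
  using Action = std::function<bool()>;  // returning false stops the loop (hypothetical convention)

  void push( Action a ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_actions.push( std::move( a ) );
    }
    m_cv.notify_one();
  }

  // Runs on the scheduler thread until an action returns false.
  void run() {
    for ( ;; ) {
      Action a;
      {
        std::unique_lock<std::mutex> lock( m_mutex );
        m_cv.wait( lock, [this] { return !m_actions.empty(); } );
        a = std::move( m_actions.front() );
        m_actions.pop();
      }
      if ( !a() ) return;  // run outside the lock so actions may push further actions
    }
  }

private:
  std::queue<Action>      m_actions;
  std::mutex              m_mutex;
  std::condition_variable m_cv;
};

// Usage: three "reset slot" actions and a stop action, all executed on one thread.
std::vector<int> runDemo() {
  ActionLoop       loop;
  std::vector<int> slotsReset;  // touched only by the scheduler thread until join()
  std::thread      scheduler( [&loop] { loop.run(); } );
  for ( int slot = 0; slot < 3; ++slot )
    loop.push( [&slotsReset, slot] {
      slotsReset.push_back( slot );
      return true;
    } );
  loop.push( [] { return false; } );
  scheduler.join();
  return slotsReset;
}
```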

◆ pushNewEvents()

StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 580 of file AvalancheSchedulerSvc.cpp.

580  {
581  StatusCode sc;
582  for ( auto context : eventContexts ) {
583  sc = pushNewEvent( context );
584  if ( sc != StatusCode::SUCCESS ) return sc;
585  }
586  return sc;
587 }

◆ recordOccupancy()

void AvalancheSchedulerSvc::recordOccupancy ( int  samplePeriod,
std::function< void(OccupancySnapshot)>  callback 
)
override virtual

Sample occupancy at a fixed interval (in ms). A negative value deactivates sampling, and 0 takes a snapshot on every change. For each sample, the callback function is applied to the result.

Definition at line 1148 of file AvalancheSchedulerSvc.cpp.

1148  {
1149 
1150  auto action = [this, samplePeriod, callback{ std::move( callback ) }]() -> StatusCode {
1151  if ( samplePeriod < 0 ) {
1153  } else {
1156  }
1157  return StatusCode::SUCCESS;
1158  };
1159 
1160  m_actionsQueue.push( std::move( action ) );
1161 }
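The samplePeriod convention can be captured in a small predicate. This is a sketch under assumptions. shouldSnapshot() is hypothetical and uses std::chrono::steady_clock rather than the system_clock used by the scheduler. It encodes the documented semantics, not the exact condition evaluated in iterate(), which is not fully shown here.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper encoding the documented samplePeriod convention (milliseconds).
bool shouldSnapshot( int samplePeriodMs, std::chrono::steady_clock::time_point lastSnapshot,
                     std::chrono::steady_clock::time_point now ) {
  if ( samplePeriodMs < 0 ) return false;  // negative: sampling deactivated
  if ( samplePeriodMs == 0 ) return true;  // zero: snapshot on every change
  // positive: at most one snapshot per period
  return now - lastSnapshot >= std::chrono::milliseconds( samplePeriodMs );
}
```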

◆ revise()

StatusCode AvalancheSchedulerSvc::revise ( unsigned int  iAlgo,
EventContext *  contextPtr,
AState  state,
bool  iterate = false 
)
private

Definition at line 776 of file AvalancheSchedulerSvc.cpp.

776  {
777  StatusCode sc;
778  auto slotIndex = contextPtr->slot();
779  EventSlot& slot = m_eventSlots[slotIndex];
780  Cause cs = { Cause::source::Task, index2algname( iAlgo ) };
781 
782  if ( contextPtr->usesSubSlot() ) {
783  // Sub-slot
784  auto subSlotIndex = contextPtr->subSlot();
785  EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
786 
787  sc = subSlot.algsStates.set( iAlgo, state );
788 
789  if ( sc.isSuccess() ) {
790  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
791  << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
792  // Revise states of algorithms downstream the precedence graph
793  if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
794  }
795  } else {
796  // Event level (standard behaviour)
797  sc = slot.algsStates.set( iAlgo, state );
798 
799  if ( sc.isSuccess() ) {
800  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
801  << ", event:" << contextPtr->evt() << "]" << endmsg;
802  // Revise states of algorithms downstream the precedence graph
803  if ( iterate ) sc = m_precSvc->iterate( slot, cs );
804  }
805  }
806  return sc;
807 }

◆ schedule()

StatusCode AvalancheSchedulerSvc::schedule ( TaskSpec &&  ts)
private

Definition at line 998 of file AvalancheSchedulerSvc.cpp.

998  {
999 
1000  if ( ts.blocking && m_blockingAlgosInFlight == m_maxBlockingAlgosInFlight ) {
1001  m_retryQueue.push( std::move( ts ) );
1002  return StatusCode::SUCCESS;
1003  }
1004 
1005  // Check if a free Algorithm instance is available
1006  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
1007 
1008  // If an instance is available, proceed to scheduling
1009  StatusCode sc;
1010  if ( getAlgSC.isSuccess() ) {
1011 
1012  // Decide how to schedule the task and schedule it
1013  if ( -100 != m_threadPoolSize ) {
1014 
1015  // Cache values before moving the TaskSpec further
1016  unsigned int algIndex{ ts.algIndex };
1017  std::string_view algName( ts.algName );
1018  unsigned int algRank{ ts.algRank };
1019  bool blocking{ ts.blocking };
1020  int slotIndex{ ts.slotIndex };
1021  EventContext* contextPtr{ ts.contextPtr };
1022 
1023  if ( !blocking ) {
1024  // Add the algorithm to the scheduled queue
1025  m_scheduledQueue.push( std::move( ts ) );
1026 
1027  // Prepare a TBB task that will execute the Algorithm according to the above queued specs
1028  m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, false ) );
1029  ++m_algosInFlight;
1030 
1031  } else { // schedule blocking algorithm in independent thread
1032  m_scheduledBlockingQueue.push( std::move( ts ) );
1033 
1034  // Schedule the blocking task in an independent thread
1035  ++m_blockingAlgosInFlight;
1036  std::thread _t( AlgTask( this, serviceLocator(), m_algExecStateSvc, true ) );
1037  _t.detach();
1038 
1039  } // end scheduling blocking Algorithm
1040 
1041  sc = revise( algIndex, contextPtr, AState::SCHEDULED );
1042 
1043  ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
1044  << ", rank:" << algRank << ", blocking:" << ( blocking ? "yes" : "no" )
1045  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1046  << ( m_blockingAlgosInFlight > 0
1047  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1048  : "" )
1049  << endmsg;
1050 
1051  } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
1052  ++m_algosInFlight;
1053  sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
1054  AlgTask( this, serviceLocator(), m_algExecStateSvc, false )();
1055  --m_algosInFlight;
1056  }
1057  } else { // if no Algorithm instance available, retry later
1058 
1059  sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1060  // Add the algorithm to the retry queue
1061  m_retryQueue.push( std::move( ts ) );
1062  }
1063 
1065 
1066  return sc;
1067 }
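The decision order in schedule() can be modelled single-threaded:

1. CPU-blocking tasks are throttled first. Once MaxBlockingAlgosInFlight of them are running, further ones are deferred to the retry queue without a state change.
2. An algorithm instance must then be acquired.
3. Only then is the task dispatched.

The sketch below uses a hypothetical MiniScheduler. A per-name instance counter stands in for the AlgResourcePool, dispatch is reduced to counters, and the ThreadPoolSize == -100 inline path is not modelled.

```cpp
#include <cassert>
#include <map>
#include <queue>
#include <string>

// Hypothetical result of one scheduling attempt.
enum class Outcome { Scheduled, Resourceless, Deferred };

// Hypothetical stand-in for TaskSpec.
struct Task {
  std::string alg;
  bool        blocking;
};

// Single-threaded model of schedule(); all names here are hypothetical.
struct MiniScheduler {
  std::map<std::string, int> freeInstances;  // stand-in for the AlgResourcePool
  unsigned                   maxBlockingInFlight = 1;
  unsigned                   blockingInFlight    = 0;
  unsigned                   algosInFlight       = 0;
  std::queue<Task>           retryQueue;

  Outcome schedule( const Task& ts ) {
    // 1) Throttle CPU-blocking tasks: defer without changing the algorithm state.
    if ( ts.blocking && blockingInFlight == maxBlockingInFlight ) {
      retryQueue.push( ts );
      return Outcome::Deferred;
    }
    // 2) Acquire an instance; if none is free the task becomes RESOURCELESS and is retried.
    int& available = freeInstances[ts.alg];
    if ( available == 0 ) {
      retryQueue.push( ts );
      return Outcome::Resourceless;
    }
    --available;
    // 3) Dispatch: blocking tasks would get a dedicated thread, others a TBB task.
    if ( ts.blocking )
      ++blockingInFlight;
    else
      ++algosInFlight;
    return Outcome::Scheduled;
  }
};
```

As the listing reads, the first check uses equality against MaxBlockingAlgosInFlight, whose default is 0. With PreemptiveBlockingTasks enabled, flagged tasks are therefore only dispatched once that property is raised.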

◆ scheduleEventView()

StatusCode AvalancheSchedulerSvc::scheduleEventView ( const EventContext *  sourceContext,
const std::string &  nodeName,
std::unique_ptr< EventContext >  viewContext 
)
override virtual

Method to inform the scheduler about event views.

Definition at line 1108 of file AvalancheSchedulerSvc.cpp.

1109  {
1110  // Prevent view nesting
1111  if ( sourceContext->usesSubSlot() ) {
1112  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1113  return StatusCode::FAILURE;
1114  }
1115 
1116  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1117 
1118  // It's not possible to create an std::function from a move-capturing lambda
1119  // So, we have to release the unique pointer
1120  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1121  &nodeName]() -> StatusCode {
1122  // Attach the sub-slot to the top-level slot
1123  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1124 
1125  if ( viewContextPtr ) {
1126  // Re-create the unique pointer
1127  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1128  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1129  return StatusCode::SUCCESS;
1130  } else {
1131  // Disable the view node if there are no views
1132  topSlot.disableSubSlots( nodeName );
1133  return StatusCode::SUCCESS;
1134  }
1135  };
1136 
1137  m_actionsQueue.push( std::move( action ) );
1138 
1139  return StatusCode::SUCCESS;
1140 }
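std::function requires a copy-constructible callable, so a lambda that move-captures a std::unique_ptr cannot be stored in it. scheduleEventView() therefore releases the pointer into the capture and re-wraps it when the action runs. The sketch below shows the same trick in self-contained form. ViewContext is a hypothetical stand-in for EventContext, and a vector stands in for the sub-slot list.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for EventContext.
struct ViewContext {
  int id;
};

using SubSlots = std::vector<std::unique_ptr<ViewContext>>;  // stand-in for the sub-slot list

// Returns a copyable action that adopts the view when executed (-1 if there was no view).
std::function<int()> makeViewAction( std::unique_ptr<ViewContext> view, SubSlots& subSlots ) {
  // A capture like [v = std::move( view )] would make the lambda move-only and
  // std::function would reject it; the released raw pointer keeps it copyable.
  return [raw = view.release(), &subSlots]() -> int {
    if ( !raw ) return -1;  // no view: the real action disables the view node instead
    subSlots.push_back( std::unique_ptr<ViewContext>( raw ) );  // re-establish ownership
    return subSlots.back()->id;
  };
}
```

Ownership is manual while the action sits in the queue. If the action never runs, the object leaks, and running it twice would create two owners. In C++23, std::move_only_function removes the need for the workaround. Note also that the listing captures nodeName by reference, so the referenced string must outlive the queued action.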

◆ signoff()

StatusCode AvalancheSchedulerSvc::signoff ( const TaskSpec &  ts)
private

The call to this method is triggered only from within the AlgTask.

Definition at line 1074 of file AvalancheSchedulerSvc.cpp.

1074  {
1075 
1076  Gaudi::Hive::setCurrentContext( ts.contextPtr );
1077 
1078  if ( !ts.blocking )
1079  --m_algosInFlight;
1080  else
1081  --m_blockingAlgosInFlight;
1082 
1083  const AlgExecState& algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
1084  AState state = algstate.execStatus().isSuccess()
1085  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1086  : AState::ERROR;
1087 
1088  // Update algorithm state and revise the downstream states
1089  auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1090 
1091  ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1092  << ", rank:" << ts.algRank << ", blocking:" << ( ts.blocking ? "yes" : "no" )
1093  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1094  << ( m_blockingAlgosInFlight > 0
1095  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1096  : "" )
1097  << endmsg;
1098 
1099  // Prompt a call to updateStates
1100  m_needsUpdate.store( true );
1101  return sc;
1102 }
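The state transition applied in signoff() is a pure function of the algorithm's execution status and its filter decision. The sketch below uses a reduced, hypothetical AState enum.

```cpp
#include <cassert>

// Reduced, hypothetical subset of the algorithm FSM states.
enum class AState { EVTACCEPTED, EVTREJECTED, ERROR };

// Mirrors the ternary in signoff(): a failed execution is ERROR, otherwise the filter decides.
constexpr AState finalState( bool execSucceeded, bool filterPassed ) {
  return execSucceeded ? ( filterPassed ? AState::EVTACCEPTED : AState::EVTREJECTED ) : AState::ERROR;
}

static_assert( finalState( true, true ) == AState::EVTACCEPTED );
static_assert( finalState( true, false ) == AState::EVTREJECTED );
static_assert( finalState( false, true ) == AState::ERROR );
```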

◆ tryPopFinishedEvent()

StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, just return a failure.

Definition at line 622 of file AvalancheSchedulerSvc.cpp.

622  {
623 
624  if ( m_finishedEvents.try_pop( eventContext ) ) {
625  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
626  << endmsg;
627  ++m_freeSlots;
628  return StatusCode::SUCCESS;
629  }
630  return StatusCode::FAILURE;
631 }

Friends And Related Function Documentation

◆ AlgTask

friend class AlgTask
friend

Definition at line 114 of file AvalancheSchedulerSvc.h.

Member Data Documentation

◆ m_actionsQueue

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 295 of file AvalancheSchedulerSvc.h.

◆ m_algExecStateSvc

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 255 of file AvalancheSchedulerSvc.h.

◆ m_algname_index_map

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information needed for the name2index conversion.

Definition at line 231 of file AvalancheSchedulerSvc.h.

◆ m_algname_vect

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information needed for the index2name conversion.

Definition at line 237 of file AvalancheSchedulerSvc.h.

◆ m_algosInFlight

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 261 of file AvalancheSchedulerSvc.h.

◆ m_algResourcePool

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 290 of file AvalancheSchedulerSvc.h.

◆ m_arena

tbb::task_arena* AvalancheSchedulerSvc::m_arena { nullptr }
private

Definition at line 345 of file AvalancheSchedulerSvc.h.

◆ m_blockingAlgosInFlight

unsigned int AvalancheSchedulerSvc::m_blockingAlgosInFlight = 0
private

Number of CPU-blocking algorithms presently in flight.

Definition at line 264 of file AvalancheSchedulerSvc.h.

◆ m_checkDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps
private
Initial value:
{ this, "CheckDependencies", false,
"Runtime check of Algorithm Input Data Dependencies" }

Definition at line 186 of file AvalancheSchedulerSvc.h.

◆ m_checkOutput

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkOutput
private
Initial value:
{ this, "CheckOutputUsage", false,
"Runtime check of Algorithm Output Data usage" }

Definition at line 188 of file AvalancheSchedulerSvc.h.

◆ m_checkOutputIgnoreList

Gaudi::Property<std::vector<std::string> > AvalancheSchedulerSvc::m_checkOutputIgnoreList
private
Initial value:
{
this,
"CheckOutputUsageIgnoreList",
{},
"Ignore outputs of the Algorithms of this name when doing the check",
"OrderedSet<std::string>" }

Definition at line 190 of file AvalancheSchedulerSvc.h.

◆ m_condSvc

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to the service for conditions handling.

Definition at line 258 of file AvalancheSchedulerSvc.h.

◆ m_dumpIntraEventDynamics

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{ this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file" }

Definition at line 181 of file AvalancheSchedulerSvc.h.

◆ m_enableCondSvc

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" }
private

Definition at line 200 of file AvalancheSchedulerSvc.h.

◆ m_enablePreemptiveBlockingTasks

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
private
Initial value:
{
this, "PreemptiveBlockingTasks", false,
"Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." }

Definition at line 183 of file AvalancheSchedulerSvc.h.

◆ m_eventSlots

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of event slots.

Definition at line 246 of file AvalancheSchedulerSvc.h.

◆ m_finishedEvents

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 252 of file AvalancheSchedulerSvc.h.

◆ m_freeSlots

std::atomic_int AvalancheSchedulerSvc::m_freeSlots { 0 }
private

Atomic counter of free slots, accounting for asynchronous updates by the scheduler with respect to other threads.

Definition at line 249 of file AvalancheSchedulerSvc.h.
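The following is a minimal sketch of an atomic free-slot counter in the spirit of m_freeSlots. It assumes that a slot is reserved when an event is pushed and released when a finished event is handed back. The class SlotCounter and its methods are hypothetical and are not part of the scheduler's interface.

```cpp
#include <atomic>

// Hypothetical free-slot counter: starts at the number of event slots,
// is decremented when an event takes a slot and incremented when it is released.
class SlotCounter {
public:
  explicit SlotCounter(int nSlots) : m_freeSlots{nSlots} {}

  // Reserve a slot atomically; returns false if no slot is free.
  bool tryAcquire() {
    int current = m_freeSlots.load();
    while (current > 0) {
      // On failure compare_exchange_weak reloads `current`, so the loop retries
      // with the latest value written by another thread.
      if (m_freeSlots.compare_exchange_weak(current, current - 1)) return true;
    }
    return false;
  }

  // Give a slot back, e.g. after a finished event has been popped.
  void release() { m_freeSlots.fetch_add(1); }

  unsigned int freeSlots() const { return static_cast<unsigned int>(m_freeSlots.load()); }

private:
  std::atomic_int m_freeSlots{0};
};
```

The compare-and-swap loop keeps the counter from going negative when several threads race for the last slot. A plain load followed by a separate decrement could not guarantee that.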

◆ m_isActive

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive { INACTIVE }
private

Flag to track if the scheduler is active or not.

Definition at line 222 of file AvalancheSchedulerSvc.h.
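m_isActive holds one of the scheduler's ActivationState values (INACTIVE, ACTIVE, FAILURE) and starts as INACTIVE. The following is an illustrative sketch of guarding transitions on such a flag with compare-and-swap. The transition rules and the ActivityFlag class are assumptions made for the example. They do not describe the actual activate()/deactivate() logic.

```cpp
#include <atomic>

// The three states named for the scheduler; the transitions below are illustrative only.
enum ActivationState { INACTIVE, ACTIVE, FAILURE };

class ActivityFlag {
public:
  // INACTIVE -> ACTIVE; fails if already ACTIVE or in FAILURE.
  bool start() {
    ActivationState expected = INACTIVE;
    return m_isActive.compare_exchange_strong(expected, ACTIVE);
  }
  // ACTIVE -> INACTIVE, e.g. when the scheduler loop is asked to stop.
  bool stop() {
    ActivationState expected = ACTIVE;
    return m_isActive.compare_exchange_strong(expected, INACTIVE);
  }
  // Unconditionally record a failure.
  void fail() { m_isActive.store(FAILURE); }

  ActivationState state() const { return m_isActive.load(); }

private:
  std::atomic<ActivationState> m_isActive{INACTIVE};
};
```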

◆ m_lastSnapshot

std::chrono::system_clock::time_point AvalancheSchedulerSvc::m_lastSnapshot = std::chrono::system_clock::now()
private

Definition at line 161 of file AvalancheSchedulerSvc.h.

◆ m_maxAlgosInFlight

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight { 1 }
private

Definition at line 347 of file AvalancheSchedulerSvc.h.

◆ m_maxBlockingAlgosInFlight

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
private
Initial value:
{
this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms" }

Definition at line 174 of file AvalancheSchedulerSvc.h.
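MaxBlockingAlgosInFlight caps how many CPU-blocking algorithms may run at once. The scheduler keeps a separate queue for them (m_scheduledBlockingQueue). The sketch below illustrates one way such a cap can work: non-blocking tasks are always dispatched, while blocking tasks wait until a running one finishes. Task and Dispatcher are hypothetical. The sketch ignores the PreemptiveBlockingTasks switch and makes no claim about how the real scheduler treats the default value of 0; here, a cap of 0 simply admits no blocking task.

```cpp
#include <queue>
#include <string>
#include <vector>

// Hypothetical task record; the real TaskSpec carries more information.
struct Task {
  std::string alg;
  bool blocking;
};

// Sketch of capping CPU-blocking tasks with a separate waiting queue.
class Dispatcher {
public:
  explicit Dispatcher(unsigned int maxBlocking) : m_maxBlocking(maxBlocking) {}

  void submit(const Task& t) { (t.blocking ? m_blockingQueue : m_queue).push(t); }

  // Start everything currently allowed; returns the names started in this call.
  std::vector<std::string> dispatch() {
    std::vector<std::string> started;
    while (!m_queue.empty()) {
      started.push_back(m_queue.front().alg);
      m_queue.pop();
    }
    while (!m_blockingQueue.empty() && m_blockingInFlight < m_maxBlocking) {
      started.push_back(m_blockingQueue.front().alg);
      m_blockingQueue.pop();
      ++m_blockingInFlight;
    }
    return started;
  }

  // Called when a blocking task finishes, freeing capacity for the next one.
  void blockingDone() {
    if (m_blockingInFlight > 0) --m_blockingInFlight;
  }

private:
  unsigned int m_maxBlocking;
  unsigned int m_blockingInFlight = 0;
  std::queue<Task> m_queue, m_blockingQueue;
};
```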

◆ m_maxEventsInFlight

size_t AvalancheSchedulerSvc::m_maxEventsInFlight { 0 }
private

Definition at line 346 of file AvalancheSchedulerSvc.h.

◆ m_maxParallelismExtra

Gaudi::Property<int> AvalancheSchedulerSvc::m_maxParallelismExtra
private
Initial value:
{
this, "maxParallelismExtra", 0,
"Allows to add some extra threads to the maximum parallelism set in TBB"
"The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" }

Definition at line 169 of file AvalancheSchedulerSvc.h.
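As a worked instance of the formula in the property description, take ThreadPoolSize = 8 and maxParallelismExtra = 2. These are illustrative values that assume ThreadPoolSize holds an explicit positive thread count rather than one of its special values:

```latex
P_{\mathrm{TBB}} = N_{\mathrm{ThreadPoolSize}} + N_{\mathrm{maxParallelismExtra}} + 1 = 8 + 2 + 1 = 11
```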

◆ m_needsUpdate

std::atomic<bool> AvalancheSchedulerSvc::m_needsUpdate { true }
private

Definition at line 339 of file AvalancheSchedulerSvc.h.

◆ m_optimizationMode

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{ this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E" }

Definition at line 179 of file AvalancheSchedulerSvc.h.

◆ m_precSvc

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 240 of file AvalancheSchedulerSvc.h.

◆ m_retryQueue

std::queue<TaskSpec> AvalancheSchedulerSvc::m_retryQueue
private

Definition at line 336 of file AvalancheSchedulerSvc.h.

◆ m_scheduledBlockingQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledBlockingQueue
private

Definition at line 335 of file AvalancheSchedulerSvc.h.

◆ m_scheduledQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledQueue
private

Queues for scheduled algorithms.

Definition at line 334 of file AvalancheSchedulerSvc.h.
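Both scheduled queues are priority queues ordered by the AlgQueueSort comparator. The sketch below is a single-threaded illustration of that pattern. It uses std::priority_queue in place of tbb::concurrent_priority_queue, and both containers pop the element that compares greatest under the comparator. TaskEntry, its rank field, and ByRank are hypothetical stand-ins; the actual fields of TaskSpec and the ordering used by AlgQueueSort are not reproduced here.

```cpp
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-in for TaskSpec: an algorithm name plus a rank.
struct TaskEntry {
  std::string algName;
  double rank;
};

// Strict weak ordering by rank: with a max-heap, the highest-ranked task is popped first.
struct ByRank {
  bool operator()(const TaskEntry& a, const TaskEntry& b) const { return a.rank < b.rank; }
};

using ScheduledQueue = std::priority_queue<TaskEntry, std::vector<TaskEntry>, ByRank>;
```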

◆ m_showControlFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{ this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences" }

Definition at line 208 of file AvalancheSchedulerSvc.h.

◆ m_showDataDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{ this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms" }

Definition at line 202 of file AvalancheSchedulerSvc.h.

◆ m_showDataFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{ this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms" }

Definition at line 205 of file AvalancheSchedulerSvc.h.

◆ m_simulateExecution

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution" }

Definition at line 176 of file AvalancheSchedulerSvc.h.

◆ m_snapshotCallback

std::function<void( OccupancySnapshot )> AvalancheSchedulerSvc::m_snapshotCallback
private

Definition at line 162 of file AvalancheSchedulerSvc.h.

◆ m_snapshotInterval

std::chrono::duration<int64_t, std::milli> AvalancheSchedulerSvc::m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
private

Definition at line 160 of file AvalancheSchedulerSvc.h.
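recordOccupancy documents three interval modes: a negative interval deactivates sampling, 0 takes a snapshot on every change, and a positive value samples at a fixed interval in milliseconds. Because m_snapshotInterval defaults to duration::min(), which is negative, sampling starts out disabled. The sketch below expresses that decision with a hypothetical helper, snapshotDue. It takes m_lastSnapshot's role as the `last` argument and is not the scheduler's own code.

```cpp
#include <chrono>
#include <cstdint>

using Interval = std::chrono::duration<int64_t, std::milli>;

// Hypothetical helper: is an occupancy snapshot due at `now`, given the last one at `last`?
bool snapshotDue(Interval interval, std::chrono::system_clock::time_point last,
                 std::chrono::system_clock::time_point now) {
  if (interval < Interval::zero()) return false;  // negative: sampling deactivated
  if (interval == Interval::zero()) return true;  // zero: snapshot on every change
  return now - last >= interval;                  // positive: fixed sampling period
}
```

The comparison `now - last >= interval` mixes the clock's native duration with milliseconds. std::chrono converts both to their common type, so no manual cast is needed.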

◆ m_thread

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 225 of file AvalancheSchedulerSvc.h.

◆ m_threadPoolSize

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the global thread pool initialised by TBB; a value of -1 requests to use"
"all available hardware threads; -100 requests to bypass TBB executing "
"all algorithms in the scheduler's thread." }

Definition at line 164 of file AvalancheSchedulerSvc.h.
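The ThreadPoolSize description assigns meaning to two special values. The hypothetical helper below interprets them as a sketch: -1 maps to the number of hardware threads reported by std::thread::hardware_concurrency(), which may be 0 if unknown. -100 is modeled as 0 pool threads, since algorithms then run in the scheduler's own thread. Other negative values are not covered by the description and are passed through unchanged.

```cpp
#include <thread>

// Hypothetical interpretation of the ThreadPoolSize property value.
int resolveThreadPoolSize(int threadPoolSize) {
  if (threadPoolSize == -1)  // use all available hardware threads
    return static_cast<int>(std::thread::hardware_concurrency());
  if (threadPoolSize == -100)  // bypass TBB: no pool threads, run in the scheduler thread
    return 0;
  return threadPoolSize;  // explicit thread count
}
```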

◆ m_threadPoolSvc

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 344 of file AvalancheSchedulerSvc.h.

◆ m_useDataLoader

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{ this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm" }

Definition at line 197 of file AvalancheSchedulerSvc.h.

◆ m_verboseSubSlots

Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" }
private

Definition at line 211 of file AvalancheSchedulerSvc.h.

◆ m_whiteboard

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 243 of file AvalancheSchedulerSvc.h.

◆ m_whiteboardSvcName

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" }
private

Definition at line 173 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: