The Gaudi Framework  v36r1 (3e2fb5a8)
AvalancheSchedulerSvc Class Reference

#include </builds/gaudi/Gaudi/GaudiHive/src/AvalancheSchedulerSvc.h>

Classes

struct  AlgQueueSort
 Comparison operator to sort the queues.
 
struct  TaskSpec
 Struct to hold entries in the alg queues.
 

Public Member Functions

 ~AvalancheSchedulerSvc () noexcept override
 Destructor.
 
StatusCode initialize () override
 Initialise.
 
StatusCode finalize () override
 Finalise.
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler.
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available.
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler.
 
unsigned int freeSlots () override
 Get the number of free slots.
 
virtual StatusCode scheduleEventView (const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
 Method to inform the scheduler about event views.
 
virtual void recordOccupancy (int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
 Sample occupancy at a fixed interval (ms). A negative value deactivates sampling and 0 takes a snapshot on every change. The callback function is applied to each sample.
 
bool next (TaskSpec &ts, bool blocking=false)
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast.
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface.
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames.
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service.
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service.
 
StatusCode sysStart () override
 Start Service.
 
StatusCode sysStop () override
 Stop Service.
 
StatusCode sysFinalize () override
 Finalize Service.
 
StatusCode sysReinitialize () override
 Re-initialize the Service.
 
StatusCode sysRestart () override
 Restart the Service.
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor.
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator.
 
StatusCode setProperties ()
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist.
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist.
 
template<class T >
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandle< T > &hndl, const std::string &doc="none")
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, bool createIf=true)
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName, bool createIf=true)
 Declare used tool.
 
template<class T >
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandleArray< T > &hndlArr, const std::string &doc="none")
 
template<class T >
void addToolsArray (ToolHandleArray< T > &hndlArr)
 
const std::vector< IAlgTool * > & tools () const
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked.
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property.
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property.
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation.
 
Gaudi::Details::PropertyBase * declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property.
 
StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p) override
 set the property from another property with a different name
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p)
 Set the property from a property.
 
virtual StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p)=0
 Set the property from a property with a different name.
 
virtual StatusCode setProperty (const std::string &s)=0
 Set the property by string.
 
StatusCode setProperty (const std::string &name, const char *v)
 Special case for string literals.
 
StatusCode setProperty (const std::string &name, const std::string &v)
 Special case for std::string.
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property from the value
 
StatusCode setPropertyRepr (const std::string &n, const std::string &r) override
 set the property from name and value string representation
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property
 
const Gaudi::Details::PropertyBase & getProperty (std::string_view name) const override
 get the property by name
 
StatusCode getProperty (std::string_view n, std::string &v) const override
 convert the property to the string
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties
 
bool hasProperty (std::string_view name) const override
 Return true if we have a property with the given name.
 
Gaudi::Details::PropertyBase * property (std::string_view name) const
 FIXME: property and bindPropertiesTo should be protected.
 
void bindPropertiesTo (Gaudi::Interfaces::IOptionsSvc &optsSvc)
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 get the cached level (originally extracted from the embedded MsgStream)
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
using AState = AlgsExecutionStates::State
 
using action = std::function< StatusCode()>
 

Private Member Functions

void activate ()
 Activate scheduler.
 
StatusCode deactivate ()
 Deactivate scheduler.
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer.
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name.
 
StatusCode iterate ()
 Loop over all slots to schedule DATAREADY algorithms and sign off ready events.
 
StatusCode revise (unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
 
StatusCode schedule (TaskSpec &&)
 
StatusCode signoff (const TaskSpec &)
 The call to this method is triggered only from within the AlgTask.
 
bool isStalled (const EventSlot &) const
 Check whether scheduling in a particular slot has stalled.
 
void eventFailed (EventContext *eventContext)
 Method to execute if an event failed.
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler.
 

Private Attributes

std::chrono::duration< int64_t, std::milli > m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
 
std::chrono::system_clock::time_point m_lastSnapshot = std::chrono::system_clock::now()
 
std::function< void(OccupancySnapshot)> m_snapshotCallback
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::string > m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::string > m_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
 
Gaudi::Property< bool > m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
 
Gaudi::Property< std::string > m_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
Gaudi::Property< bool > m_verboseSubSlots {this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"}
 
std::atomic< ActivationState > m_isActive {INACTIVE}
 Flag to track if the scheduler is active or not.
 
std::thread m_thread
 The thread in which the activate function runs.
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary for the name2index conversion.
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary for the index2name conversion.
 
SmartIF< IPrecedenceSvc > m_precSvc
 A shortcut to the Precedence Service.
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard.
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots.
 
std::atomic_int m_freeSlots
 Atomic counter, to account for asynchronous updates by the scheduler with respect to the rest.
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events.
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager.
 
SmartIF< ICondSvc > m_condSvc
 A shortcut to service for Conditions handling.
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight.
 
unsigned int m_blockingAlgosInFlight = 0
 Number of blocking algorithms presently in flight.
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool.
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
 Queues for scheduled algorithms.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledBlockingQueue
 
std::queue< TaskSpec > m_retryQueue
 
std::atomic< bool > m_needsUpdate {true}
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
tbb::task_arena * m_arena {nullptr}
 
size_t m_maxEventsInFlight {0}
 
size_t m_maxAlgosInFlight {1}
 

Friends

class AlgTask
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class.
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class.
 
- Public Types inherited from Service
using Factory = Gaudi::PluginService::Factory< IService *(const std::string &, ISvcLocator *)>
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes.
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces...
 
- Protected Member Functions inherited from Service
std::vector< IAlgTool * > & tools ()
 
 ~Service () override
 Standard Destructor.
 
int outputLevel () const
 get the Service's output level
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches.
 
MSG::Level resetMessaging ()
 Reinitialize internal states.
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream.
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 Output level of the service.
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
Gaudi::Property< bool > m_autoRetrieveTools {this, "AutoRetrieveTools", true, "retrieve all AlgTools during initialize"}
 
Gaudi::Property< bool > m_checkToolDeps
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service.
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. Compared to the approach used in the ForwardSchedulerSvc, it offers the following advantages:

(1) faster decision making (thus lower concurrency disclosure downtime); (2) capacity for proactive task scheduling decision making.

Point (2) made it possible to implement a number of generic, non-intrusive intra-event throughput maximization scheduling strategies.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled as soon as all of the following conditions are met:

  • a control flow (CF) graph traversal reaches the task;
  • all data flow (DF) dependencies of the task are satisfied;
  • the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.
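The prerequisites above can be read as a two-stage predicate: the first two conditions admit a task into the DF-ready pool, the last two decide whether it can actually be launched. The sketch below is only an illustration: `TaskStatus`, `isDataReady` and `canSchedule` are hypothetical names, not part of the Gaudi API, and in the service this state is spread across the precedence service, the algorithm resource pool and the thread pool.

```cpp
#include <cassert>

// Hypothetical per-task status flags (illustrative names, not Gaudi types).
struct TaskStatus {
  bool cfReached;          // a control-flow (CF) graph traversal reached the task
  bool dfSatisfied;        // all data-flow (DF) dependencies are satisfied
  bool instanceAvailable;  // a free (or re-entrant) algorithm instance exists
  bool resourceFree;       // a free computational resource is available
};

// Stage 1: the task qualifies for the DF-ready pool.
constexpr bool isDataReady(const TaskStatus& t) { return t.cfReached && t.dfSatisfied; }

// Stage 2: when the pool is parsed, a DF-ready task is launched only if an
// algorithm instance and a computational resource are both available.
constexpr bool canSchedule(const TaskStatus& t) {
  return isDataReady(t) && t.instanceAvailable && t.resourceFree;
}
```

A task with all data available but no free algorithm instance stays in the pool (`isDataReady` is true, `canSchedule` is false) until an instance is released.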

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of asymmetry in the precedence rules graph:

(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.
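Prioritizing the DF-ready pool amounts to keeping it in a priority queue, which is the role the AlgQueueSort comparator plays for m_scheduledQueue. The sketch assumes each task already carries a scalar `rank` derived from one of the asymmetries (A) to (C); `Task`, `ByRank` and `ReadyPool` are hypothetical names, and how the rank is computed is not shown.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Hypothetical pool entry: 'rank' is assumed to be precomputed from one of
// the precedence-graph asymmetries; higher means "schedule sooner".
struct Task {
  std::string name;
  double rank;
};

// std::priority_queue keeps the element that compares greatest on top, so a
// less-than on rank makes the highest-ranked task pop first.
struct ByRank {
  bool operator()(const Task& a, const Task& b) const { return a.rank < b.rank; }
};

using ReadyPool = std::priority_queue<Task, std::vector<Task>, ByRank>;
```

Swapping the strategy then only means changing how `rank` is assigned; the pool itself stays the same.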

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by preemptively scheduling CPU-blocking tasks. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices... joke);
  • synchronization-bound tasks.
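One way to picture the preemptive handling of blocking tasks is a routing decision that keeps such tasks off the CPU workers and caps how many run at once, in the spirit of m_scheduledBlockingQueue and m_maxBlockingAlgosInFlight. This is a hypothetical sketch: `Dispatcher`, `Lane` and the counters are illustrative, and the service's actual dispatch logic is not reproduced here.

```cpp
#include <cassert>

// Hypothetical routing decision for a DF-ready task. The lane names and
// counters are illustrative; maxBlocking plays the role of a cap such as
// m_maxBlockingAlgosInFlight.
enum class Lane { CpuPool, BlockingLane, Wait };

struct Dispatcher {
  unsigned maxCpu;       // CPU workers available to ordinary tasks
  unsigned maxBlocking;  // cap on blocking tasks in flight
  unsigned cpuInFlight = 0;
  unsigned blockingInFlight = 0;

  // Blocking (I/O-, offload- or synchronization-bound) tasks go to their own
  // lane, so waiting on them never occupies a CPU worker.
  Lane route(bool blocking) {
    if (blocking) {
      if (blockingInFlight < maxBlocking) {
        ++blockingInFlight;
        return Lane::BlockingLane;
      }
      return Lane::Wait;  // cap reached: the task stays queued
    }
    if (cpuInFlight < maxCpu) {
      ++cpuInFlight;
      return Lane::CpuPool;
    }
    return Lane::Wait;
  }

  // Called when a task finishes, freeing its place in the corresponding lane.
  void finish(bool blocking) {
    if (blocking)
      --blockingInFlight;
    else
      --cpuInFlight;
  }
};
```

With one CPU worker and a blocking cap of one, a blocking task and an ordinary task can run side by side, which is the throughput gain the mechanism aims for.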

Credits

Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 112 of file AvalancheSchedulerSvc.h.

Member Typedef Documentation

◆ action

Definition at line 156 of file AvalancheSchedulerSvc.h.

◆ AState

Member Enumeration Documentation

◆ ActivationState

Enumerator
INACTIVE 
ACTIVE 
FAILURE 

Definition at line 158 of file AvalancheSchedulerSvc.h.

158 { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };

Constructor & Destructor Documentation

◆ ~AvalancheSchedulerSvc()

AvalancheSchedulerSvc::~AvalancheSchedulerSvc ( )
inline override noexcept

Destructor.

The noexcept specification has to be enforced, as otherwise the noexcept(false) destructor of the tbb::task_group member would violate the contract.

Definition at line 122 of file AvalancheSchedulerSvc.h.

122 {}

Member Function Documentation

◆ activate()

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on, the queue of actions is checked. The checking stops only when the m_isActive flag is no longer ACTIVE and the queue is empty. This guarantees that all actions are executed and that no stall is created. The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html). The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas).

Definition at line 387 of file AvalancheSchedulerSvc.cpp.

387  {
388 
389  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
390 
391  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
392  error() << "problems initializing ThreadPoolSvc" << endmsg;
393  m_isActive = FAILURE;
394  return;
395  }
396 
397  // Wait for actions pushed into the queue by finishing tasks.
398  action thisAction;
399  StatusCode sc( StatusCode::SUCCESS );
400 
401  m_isActive = ACTIVE;
402 
403  // Continue to wait if the scheduler is running or there is something to do
404  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
405  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
406  m_actionsQueue.pop( thisAction );
407  sc = thisAction();
408  ON_VERBOSE {
409  if ( sc.isFailure() )
410  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
411  else
412  verbose() << "Action succeeded." << endmsg;
413  }
414  else sc.ignore();
415 
416  // If all queued actions have been processed, update the slot states
417  if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
418  sc = iterate();
419  ON_VERBOSE {
420  if ( sc.isFailure() )
421  verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
422  else
423  verbose() << "Iteration succeeded." << endmsg;
424  }
425  else sc.ignore();
426  }
427  }
428 
429  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
430  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
431  error() << "Problems terminating thread pool" << endmsg;
432  m_isActive = FAILURE;
433  }
434 }

◆ algname2index()

unsigned int AvalancheSchedulerSvc::algname2index ( const std::string & algoname)
inline private

Convert a name to an integer.

Definition at line 216 of file AvalancheSchedulerSvc.h.

216 { return m_algname_index_map[algoname]; };
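The name/index pair (m_algname_index_map and m_algname_vect) is a small bidirectional map. Note that `m_algname_index_map[algoname]` uses `operator[]`, which inserts an entry mapped to 0 when the name is unknown, so a misspelt name returns a value that may also be a valid index. The sketch below is a hypothetical standalone variant (`AlgRegistry` is not a Gaudi class) that reports misses through `std::optional` instead.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Bidirectional name <-> index registry. Unlike unordered_map::operator[],
// find() leaves the map untouched and lets the caller see the miss.
class AlgRegistry {
public:
  // Returns the index of 'name', assigning the next free index if it is new.
  unsigned int add(const std::string& name) {
    auto [it, inserted] = m_index.try_emplace(name, static_cast<unsigned int>(m_names.size()));
    if (inserted) m_names.push_back(name);
    return it->second;
  }
  std::optional<unsigned int> name2index(const std::string& name) const {
    auto it = m_index.find(name);
    if (it == m_index.end()) return std::nullopt;
    return it->second;
  }
  // at() throws std::out_of_range instead of reading past the end.
  const std::string& index2name(unsigned int index) const { return m_names.at(index); }

private:
  std::unordered_map<std::string, unsigned int> m_index;
  std::vector<std::string> m_names;
};
```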

◆ deactivate()

StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

The number of free slots is first set to 0 and all pending actions are removed from the actions queue. A single final action is then pushed into it, which flips the status flag m_isActive to INACTIVE; this is meant to be the last action executed by the scheduler.

Definition at line 444 of file AvalancheSchedulerSvc.cpp.

444  {
445 
446  if ( m_isActive == ACTIVE ) {
447 
448  // Set the number of slots available to an error code
449  m_freeSlots.store( 0 );
450 
451  // Empty queue
452  action thisAction;
453  while ( m_actionsQueue.try_pop( thisAction ) ) {};
454 
455  // This would be the last action
456  m_actionsQueue.push( [this]() -> StatusCode {
457  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
458  m_isActive = INACTIVE;
459  return StatusCode::SUCCESS;
460  } );
461  }
462 
463  return StatusCode::SUCCESS;
464 }
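Together, activate() and deactivate() implement a single-consumer action loop that terminates through a final action placed in the queue. The following is a minimal standard-library sketch of that pattern, not the Gaudi code: `ActionQueue` and `MiniScheduler` are hypothetical, a `bool` stands in for StatusCode, a mutex/condition-variable queue replaces tbb::concurrent_bounded_queue, and the call to iterate() that activate() makes when the queue drains is omitted.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Blocking FIFO of closures, standing in for tbb::concurrent_bounded_queue<action>.
class ActionQueue {
public:
  using Action = std::function<bool()>;  // true = success

  void push(Action a) {
    {
      std::lock_guard<std::mutex> lk(m_mtx);
      m_q.push_back(std::move(a));
    }
    m_cv.notify_one();
  }
  Action pop() {  // blocks until an action is available
    std::unique_lock<std::mutex> lk(m_mtx);
    m_cv.wait(lk, [this] { return !m_q.empty(); });
    Action a = std::move(m_q.front());
    m_q.pop_front();
    return a;
  }
  void clear() {
    std::lock_guard<std::mutex> lk(m_mtx);
    m_q.clear();
  }
  bool empty() const {
    std::lock_guard<std::mutex> lk(m_mtx);
    return m_q.empty();
  }

private:
  mutable std::mutex m_mtx;
  std::condition_variable m_cv;
  std::deque<Action> m_q;
};

class MiniScheduler {
public:
  ~MiniScheduler() {
    if (m_thread.joinable()) stop();
  }
  // Like activate(): a dedicated thread executes queued actions.
  void start() {
    m_active = true;
    m_thread = std::thread([this] { loop(); });
  }
  void submit(ActionQueue::Action a) { m_actions.push(std::move(a)); }
  // Like deactivate() followed by the join in finalize(): drop pending
  // actions, enqueue one final action that clears the flag, then join.
  void stop() {
    m_actions.clear();
    m_actions.push([this] {
      m_active = false;
      return true;
    });
    m_thread.join();
  }
  int executed() const { return m_executed.load(); }

private:
  void loop() {
    // Same exit condition as activate(): keep going while active or while
    // actions remain in the queue.
    while (m_active || !m_actions.empty()) {
      ActionQueue::Action action = m_actions.pop();
      action();
      ++m_executed;
    }
  }

  ActionQueue m_actions;
  std::atomic<bool> m_active{false};
  std::atomic<int> m_executed{0};
  std::thread m_thread;
};
```

Because only the final action clears the flag, the loop is still running when that action is pushed, so it is always popped and executed before the thread exits and join() returns.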

◆ dumpSchedulerState()

void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

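Used for debugging: the state of the scheduler is dumped at INFO level so that it can be inspected. With a negative iSlot, the control-flow state of every incomplete slot is shown; otherwise only the given slot is detailed, followed by its algorithm execution states (unless an algorithm error occurred).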

Definition at line 806 of file AvalancheSchedulerSvc.cpp.

806  {
807 
808  // To have just one big message
809  std::ostringstream outputMS;
810 
811  outputMS << "Dumping scheduler state\n"
812  << "=========================================================================================\n"
813  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
814  << "=========================================================================================\n\n";
815 
816  //===========================================================================
817 
818  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
819  << "------------------\n\n";
820 
821  // Figure if TimelineSvc is available (used below to detect threads IDs)
822  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
823  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
824  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
825  } else {
826 
827  // Figure optimal printout layout
828  size_t indt( 0 );
829  for ( auto& slot : m_eventSlots )
830  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
831  if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
832 
833  // Figure the last running schedule across all slots
834  for ( auto& slot : m_eventSlots ) {
835  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
836  ++it ) {
837 
838  const std::string& algoName{index2algname( *it )};
839 
840  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
841  << slot.eventContext->slot();
842 
843  // Try to get POSIX threads IDs the currently running tasks are scheduled to
844  if ( timelineSvc.isValid() ) {
845  TimelineEvent te{};
846  te.algorithm = algoName;
847  te.slot = slot.eventContext->slot();
848  te.event = slot.eventContext->evt();
849 
850  if ( timelineSvc->getTimelineEvent( te ) )
851  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
852  else
853  outputMS << " thread.id: [unknown]"; // this means a task has just
854  // been signed off as SCHEDULED,
855  // but has not been assigned to a thread yet
856  // (i.e., not running yet)
857  }
858  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
859  }
860  }
861  }
862 
863  //===========================================================================
864 
865  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
866  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
867 
868  int slotCount = -1;
869  bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( {AState::ERROR} ) ||
870  subSlotAlgsInStates( m_eventSlots[iSlot], {AState::ERROR} )
871  : false;
872 
873  for ( auto& slot : m_eventSlots ) {
874  ++slotCount;
875  if ( slot.complete ) continue;
876 
877  outputMS << "[ slot: "
878  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
879  << " event: "
880  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
881  << " ]:\n\n";
882 
883  if ( 0 > iSlot || iSlot == slotCount ) {
884 
885  // If an alg has thrown an error then it's not a failure of the CF/DF graph
886  if ( wasAlgError ) {
887  outputMS << "ERROR alg(s):";
888  int errorCount = 0;
889  for ( auto it = slot.algsStates.begin( AState::ERROR ); it != slot.algsStates.end( AState::ERROR ); ++it ) {
890  outputMS << " " << index2algname( *it );
891  ++errorCount;
892  }
893  if ( errorCount == 0 ) outputMS << " in subslot(s)";
894  outputMS << "\n\n";
895  } else {
896  // Snapshot of the Control Flow and FSM states
897  outputMS << m_precSvc->printState( slot ) << "\n";
898  }
899 
900  // Mention sub slots (this is expensive if the number of sub-slots is high)
901  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
902  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
903  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
904  for ( auto& ss : slot.allSubSlots ) {
905  outputMS << "[ slot: " << slotID << ", sub-slot: "
906  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
907  << ", entry: " << ss.entryPoint << ", event: "
908  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
909  << " ]:\n\n";
910  if ( wasAlgError ) {
911  outputMS << "ERROR alg(s):";
912  for ( auto it = ss.algsStates.begin( AState::ERROR ); it != ss.algsStates.end( AState::ERROR ); ++it ) {
913  outputMS << " " << index2algname( *it );
914  }
915  outputMS << "\n\n";
916  } else {
917  outputMS << m_precSvc->printState( ss ) << "\n";
918  }
919  }
920  }
921  }
922  }
923 
924  //===========================================================================
925 
926  if ( 0 <= iSlot && !wasAlgError ) {
927  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
928  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
929  }
930 
931  outputMS << "\n=========================================================================================\n"
932  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
933  << "=========================================================================================\n\n";
934 
935  info() << outputMS.str() << endmsg;
936 }
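The "Figure optimal printout layout" step in the listing is a two-pass formatting pattern: measure the widest algorithm name, then pad every name to that width with std::setw. Below is a hypothetical reduced version (`formatSchedule` is not part of Gaudi) that prints only the task name and slot.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iomanip>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Pass 1 finds the widest name; pass 2 right-aligns every name to that width
// with std::setw, so the column after it lines up.
std::string formatSchedule(const std::vector<std::pair<std::string, int>>& tasks) {
  std::size_t width = 0;
  for (const auto& t : tasks) width = std::max(width, t.first.size());

  std::ostringstream out;
  for (const auto& [name, slot] : tasks)
    out << " task: " << std::setw(static_cast<int>(width)) << name << " slot: " << slot << '\n';
  return out.str();
}
```

std::setw applies only to the next insertion, which is why it is repeated for every row.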

◆ eventFailed()

void AvalancheSchedulerSvc::eventFailed ( EventContext * eventContext)
private

Method to execute if an event failed.

An event may fail; in that case this method is called. It dumps the state of the scheduler and marks the event as finished.

Definition at line 785 of file AvalancheSchedulerSvc.cpp.

785  {
786  const uint slotIdx = eventContext->slot();
787 
788  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
789 
790  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
791 
792  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
793  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
794 
795  // Push into the finished events queue the failed context
796  m_eventSlots[slotIdx].complete = true;
797  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
798 }
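eventFailed() hands the failed event back to the event loop by releasing the slot's owning pointer into m_finishedEvents. A reduced sketch of that ownership transfer follows; `Context`, `Slot`, `FinishedQueue` and `markFailed` are hypothetical stand-ins for EventContext, EventSlot, the TBB queue and the tail of eventFailed(), and the state dump is omitted.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <queue>

// Hypothetical stand-ins for EventContext and EventSlot.
struct Context {
  long evt;
  unsigned slot;
};
struct Slot {
  std::unique_ptr<Context> ctx;
  bool complete = false;
};

// Thread-safe queue of finished contexts. Raw pointers cross the queue (as with
// tbb::concurrent_bounded_queue<EventContext*>), so the consumer takes ownership.
class FinishedQueue {
public:
  FinishedQueue() = default;
  FinishedQueue(const FinishedQueue&) = delete;
  FinishedQueue& operator=(const FinishedQueue&) = delete;
  ~FinishedQueue() {
    while (!m_q.empty()) {  // free anything that was never collected
      delete m_q.front();
      m_q.pop();
    }
  }
  void push(Context* c) {
    std::lock_guard<std::mutex> lk(m_mtx);
    m_q.push(c);
  }
  std::unique_ptr<Context> tryPop() {
    std::lock_guard<std::mutex> lk(m_mtx);
    if (m_q.empty()) return nullptr;
    std::unique_ptr<Context> c(m_q.front());
    m_q.pop();
    return c;
  }

private:
  std::mutex m_mtx;
  std::queue<Context*> m_q;
};

// Tail of eventFailed(): flag the slot as complete and hand the released
// context to the finished queue so the event loop can collect it.
void markFailed(Slot& slot, FinishedQueue& finished) {
  slot.complete = true;
  finished.push(slot.ctx.release());
}
```

After the call the slot no longer owns its context, so it can be reused for a new event while the event loop deals with the failed one.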

◆ finalize()

StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 356 of file AvalancheSchedulerSvc.cpp.

356  {
357 
358  StatusCode sc( Service::finalize() );
359  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
360 
361  sc = deactivate();
362  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
363 
364  info() << "Joining Scheduler thread" << endmsg;
365  m_thread.join();
366 
367  // Final error check after thread pool termination
368  if ( m_isActive == FAILURE ) {
369  error() << "problems in scheduler thread" << endmsg;
370  return StatusCode::FAILURE;
371  }
372 
373  return sc;
374 }

◆ freeSlots()

unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get the number of free slots.

Definition at line 544 of file AvalancheSchedulerSvc.cpp.

544 { return std::max( m_freeSlots.load(), 0 ); }
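Because m_freeSlots is a signed std::atomic_int, clamping with std::max means that if the counter ever goes below zero it is reported as 0, rather than wrapping around to a very large unsigned value; deactivate() also resets it to 0. The `SlotCounter` class below is a hypothetical illustration of that pattern, not the service's own bookkeeping.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>

// Hypothetical free-slot counter: a signed atomic that readers clamp at 0.
class SlotCounter {
public:
  explicit SlotCounter(int n) : m_free(n) {}
  void acquire() { m_free.fetch_sub(1); }  // may go below zero
  void release() { m_free.fetch_add(1); }
  void reset() { m_free.store(0); }  // what deactivate() does to m_freeSlots
  // Same clamping as freeSlots(): a negative value is reported as 0 instead
  // of wrapping around when converted to unsigned.
  unsigned int freeSlots() const { return static_cast<unsigned int>(std::max(m_free.load(), 0)); }

private:
  std::atomic_int m_free;
};
```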

◆ index2algname()

const std::string& AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inline private

Convert an integer to a name.

Definition at line 222 of file AvalancheSchedulerSvc.h.

222 { return m_algname_vect[index]; };

◆ initialize()

StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Here, among other "bureaucratic" operations, the scheduler is activated by executing the activate() function in a new thread.

In addition, the list of algorithms is acquired from the AlgResourcePool.
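The start-up handshake in initialize() (launch the thread, then poll m_isActive until it becomes ACTIVE or FAILURE) can be sketched with the standard library as follows. `startWorker` and its `setupOk` flag are hypothetical; `setupOk` stands in for the thread-pool initialisation performed at the top of activate(), and the 10 ms poll replaces the one-second `sleep( 1 )` used in the listing.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };

// Launch a worker thread, then poll the shared state until the worker reports
// ACTIVE (returns true) or FAILURE (returns false). The caller joins 'worker'.
bool startWorker(std::thread& worker, std::atomic<ActivationState>& state, bool setupOk) {
  worker = std::thread([&state, setupOk] {
    // Stand-in for the set-up at the top of activate() (thread-pool init).
    state = setupOk ? ACTIVE : FAILURE;
    // A real worker would now run its action loop until deactivated.
  });
  while (state != ACTIVE) {
    if (state == FAILURE) return false;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  return true;
}
```

Polling is the simplest scheme that matches the listing; a condition variable would avoid the sleep but adds locking for a one-off start-up step.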

Definition at line 74 of file AvalancheSchedulerSvc.cpp.

74  {
75 
76  // Initialise mother class (read properties, ...)
77  StatusCode sc( Service::initialize() );
78  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
79 
80  // Get hold of the TBBSvc. This should initialize the thread pool
81  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
82  if ( !m_threadPoolSvc.isValid() ) {
83  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
84  return StatusCode::FAILURE;
85  }
86  auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
87  if ( !castTPS ) {
88  fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
89  return StatusCode::FAILURE;
90  }
91  m_arena = castTPS->getArena();
92  if ( !m_arena ) {
93  fatal() << "Cannot find valid TBB task_arena" << endmsg;
94  return StatusCode::FAILURE;
95  }
96 
97  // Activate the scheduler in another thread.
98  info() << "Activating scheduler in a separate thread" << endmsg;
99  m_thread = std::thread( [this]() { this->activate(); } );
100 
101  while ( m_isActive != ACTIVE ) {
102  if ( m_isActive == FAILURE ) {
103  fatal() << "Terminating initialization" << endmsg;
104  return StatusCode::FAILURE;
105  } else {
106  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
107  sleep( 1 );
108  }
109  }
110 
111  if ( m_enableCondSvc ) {
112  // Get hold of the CondSvc
113  m_condSvc = serviceLocator()->service( "CondSvc" );
114  if ( !m_condSvc.isValid() ) {
115  warning() << "No CondSvc found, or not enabled. "
116  << "Will not manage CondAlgorithms" << endmsg;
117  m_enableCondSvc = false;
118  }
119  }
120 
121  // Get the algo resource pool
122  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
123  if ( !m_algResourcePool.isValid() ) {
124  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
125  return StatusCode::FAILURE;
126  }
127 
128  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
129  if ( !m_algExecStateSvc.isValid() ) {
130  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
131  return StatusCode::FAILURE;
132  }
133 
134  // Get Whiteboard
135  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
136  if ( !m_whiteboard.isValid() ) {
137  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
138  return StatusCode::FAILURE;
139  }
140 
141  // Set the MaxEventsInFlight parameters from the number of WB stores
142  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
143 
144  // Set the number of free slots
146 
147  // Get the list of algorithms
148  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
149  const unsigned int algsNumber = algos.size();
150  if ( algsNumber != 0 ) {
151  info() << "Found " << algsNumber << " algorithms" << endmsg;
152  } else {
153  error() << "No algorithms found" << endmsg;
154  return StatusCode::FAILURE;
155  }
156 
157  /* Dependencies
158  1) Look for handles in algo, if none
159  2) Assume none are required
160  */
161 
162  DataObjIDColl globalInp, globalOutp;
163 
164  // figure out all outputs
165  for ( IAlgorithm* ialgoPtr : algos ) {
166  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
167  if ( !algoPtr ) {
168  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
169  return StatusCode::FAILURE;
170  }
171  for ( auto id : algoPtr->outputDataObjs() ) globalOutp.insert( id );
172  }
173 
174  std::ostringstream ostdd;
175  ostdd << "Data Dependencies for Algorithms:";
176 
177  std::map<std::string, DataObjIDColl> algosDependenciesMap;
178  for ( IAlgorithm* ialgoPtr : algos ) {
179  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
180  if ( nullptr == algoPtr ) {
181  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
182  << ": this will result in a crash." << endmsg;
183  return StatusCode::FAILURE;
184  }
185 
186  ostdd << "\n " << algoPtr->name();
187 
188  DataObjIDColl algoDependencies;
189  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
190  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
191  DataObjID id = *idp;
192  ostdd << "\n o INPUT " << id;
193  if ( id.key().find( ":" ) != std::string::npos ) {
194  ostdd << " contains alternatives which require resolution...\n";
195  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
196  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
197  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
198  } );
199  if ( itok != tokens.end() ) {
200  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
201  id.updateKey( *itok );
202  } else {
203  error() << "failed to find alternate in global output list"
204  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
205  m_showDataDeps = true;
206  }
207  }
208  algoDependencies.insert( id );
209  globalInp.insert( id );
210  }
211  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
212  ostdd << "\n o OUTPUT " << *id;
213  if ( id->key().find( ":" ) != std::string::npos ) {
214  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
215  << endmsg;
216  m_showDataDeps = true;
217  }
218  }
219  } else {
220  ostdd << "\n none";
221  }
222  algosDependenciesMap[algoPtr->name()] = algoDependencies;
223  }
224 
225  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
226 
227  // Check if we have unmet global input dependencies, and, optionally, heal them
228  // WARNING: this step must be done BEFORE the Precedence Service is initialized
229  if ( m_checkDeps ) {
230  DataObjIDColl unmetDep;
231  for ( auto o : globalInp )
232  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
233 
234  if ( unmetDep.size() > 0 ) {
235 
236  auto printUnmet = [&]( auto msg ) {
237  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
238  msg << " o " << *o << " required by Algorithm: " << endmsg;
239 
240  for ( const auto& p : algosDependenciesMap )
241  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
242  }
243  };
244 
245  if ( !m_useDataLoader.empty() ) {
246 
247  // Find the DataLoader Alg
248  IAlgorithm* dataLoaderAlg( nullptr );
249  for ( IAlgorithm* algo : algos )
250  if ( algo->name() == m_useDataLoader ) {
251  dataLoaderAlg = algo;
252  break;
253  }
254 
255  if ( dataLoaderAlg == nullptr ) {
256  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
257  << "\" found, and unmet INPUT dependencies "
258  << "detected:" << endmsg;
259  printUnmet( fatal() );
260  return StatusCode::FAILURE;
261  }
262 
263  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
264  << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
265  printUnmet( info() );
266 
267  // Set the property Load of DataLoader Alg
268  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
269  if ( !dataAlg ) {
270  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
271  << endmsg;
272  return StatusCode::FAILURE;
273  }
274 
275  for ( auto& id : unmetDep ) {
276  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
277  << dataLoaderAlg->name() << endmsg;
279  }
280 
281  } else {
282  fatal() << "Auto DataLoading not requested, "
283  << "and the following unmet INPUT dependencies were found:" << endmsg;
284  printUnmet( fatal() );
285  return StatusCode::FAILURE;
286  }
287 
288  } else {
289  info() << "No unmet INPUT data dependencies were found" << endmsg;
290  }
291  }
292 
293  // Get the precedence service
294  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
295  if ( !m_precSvc.isValid() ) {
296  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
297  return StatusCode::FAILURE;
298  }
299  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
300  if ( !precSvc ) {
301  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
302  return StatusCode::FAILURE;
303  }
304 
305  // Fill the containers to convert algo names to index
306  m_algname_vect.resize( algsNumber );
307  for ( IAlgorithm* algo : algos ) {
308  const std::string& name = algo->name();
309  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
310  m_algname_index_map[name] = index;
311  m_algname_vect.at( index ) = name;
312  }
313 
314  // Shortcut for the message service
315  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
316  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
317 
319  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
320  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
321  m_eventSlots.back().complete = true;
322  }
323 
324  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
325 
326  // Clearly inform about the level of concurrency
327  info() << "Concurrency level information:" << endmsg;
328  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
329  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
330 
331  // Inform about task scheduling prescriptions
332  info() << "Task scheduling settings:" << endmsg;
333  info() << " o Avalanche generation mode: "
334  << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
335  info() << " o Preemptive scheduling of CPU-blocking tasks: "
337  ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
338  : "disabled" )
339  << endmsg;
340  info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
341 
342  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
343 
344  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
345 
346  // Simulate execution flow
347  if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
348 
349  return sc;
350 }
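The data-dependency pass in initialize() does two separable things. First, it resolves INPUT keys that list `:`-separated alternatives to the first alternative that some algorithm actually produces. Second, it collects the inputs that no algorithm produces. These are the unmet INPUT dependencies, which are then either attributed to the DataLoader algorithm or reported as fatal. The following is a minimal standard-library sketch of both steps. It assumes a DataObjID can be reduced to its key string. `DataKeySet`, `splitAlternatives`, `resolveInput` and `unmetDependencies` are hypothetical names, not Gaudi API.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <set>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical simplification: a DataObjID is reduced to its key string.
using DataKeySet = std::set<std::string>;

// Split "a:b:c" into {"a", "b", "c"}, dropping empty tokens
// (the listing uses boost::tokenizer with a ':' separator for this).
std::vector<std::string> splitAlternatives( const std::string& key ) {
  std::vector<std::string> tokens;
  std::istringstream in( key );
  for ( std::string tok; std::getline( in, tok, ':' ); ) {
    if ( !tok.empty() ) tokens.push_back( tok );
  }
  return tokens;
}

// Resolve an input key listing alternatives to the first alternative that some algorithm outputs.
// Keys without ':' are returned unchanged, and so are keys with no produced alternative
// (the listing reports an error in that case).
std::string resolveInput( const std::string& key, const DataKeySet& globalOutputs ) {
  if ( key.find( ':' ) == std::string::npos ) return key;
  for ( const std::string& alt : splitAlternatives( key ) ) {
    if ( globalOutputs.count( alt ) != 0 ) return alt;
  }
  return key;
}

// Inputs that no algorithm declares as an output, i.e. the "unmet INPUT dependencies".
DataKeySet unmetDependencies( const DataKeySet& globalInputs, const DataKeySet& globalOutputs ) {
  DataKeySet unmet;
  std::set_difference( globalInputs.begin(), globalInputs.end(), globalOutputs.begin(), globalOutputs.end(),
                       std::inserter( unmet, unmet.end() ) );
  return unmet;
}
```

For example, if the outputs are {"/Event/B", "/Event/C"}, the input "/Event/A:/Event/C" resolves to "/Event/C". An input "/Event/Raw" that nothing writes ends up in the unmet set.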

◆ isStalled()

bool AvalancheSchedulerSvc::isStalled ( const EventSlot &  slot) const
private

Check if scheduling in a particular slot is in a stall.

Check whether a particular slot is in a stall condition.

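This is the case when a slot has no actions queued in the actionsQueue, and neither the slot nor any of its sub-slots has an algorithm in the DATAREADY, SCHEDULED or RESOURCELESS state. In other words, nothing is scheduled, nothing is waiting for a free algorithm instance, and no algorithm has all of its dependencies satisfied.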

Definition at line 767 of file AvalancheSchedulerSvc.cpp.

767  {
768 
769  if ( !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
770  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
771 
772  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
773 
774  return true;
775  }
776  return false;
777 }
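The stall test reduces to a set-membership check over the slot and its sub-slots. Below is a hedged sketch. It assumes a slot can be modelled as a plain vector of per-algorithm states plus a vector of sub-slots. `AlgState`, `SlotSketch`, `containsAny` and `isStalledSketch` are illustrative names, not the real `EventSlot`/`AlgsExecutionStates` interface.

```cpp
#include <algorithm>
#include <cassert>
#include <initializer_list>
#include <vector>

// Hypothetical subset of the algorithm execution states used by the scheduler.
enum class AlgState { INITIAL, CONTROLREADY, DATAREADY, SCHEDULED, RESOURCELESS, EVTACCEPTED, EVTREJECTED };

// Hypothetical stand-in for EventSlot: per-algorithm states plus attached sub-slots (event views).
struct SlotSketch {
  std::vector<AlgState> algStates;
  std::vector<SlotSketch> subSlots;
};

// True if at least one algorithm is in one of the 'wanted' states.
bool containsAny( const std::vector<AlgState>& states, std::initializer_list<AlgState> wanted ) {
  return std::any_of( states.begin(), states.end(), [wanted]( AlgState s ) {
    return std::find( wanted.begin(), wanted.end(), s ) != wanted.end();
  } );
}

// A slot is stalled when neither it nor any sub-slot has an algorithm that is ready to run,
// already scheduled, or waiting for a free algorithm instance.
bool isStalledSketch( const SlotSketch& slot ) {
  const std::initializer_list<AlgState> progressing{AlgState::DATAREADY, AlgState::SCHEDULED,
                                                    AlgState::RESOURCELESS};
  if ( containsAny( slot.algStates, progressing ) ) return false;
  for ( const SlotSketch& sub : slot.subSlots ) {
    if ( containsAny( sub.algStates, progressing ) ) return false;
  }
  return true;
}
```

A slot whose unfinished algorithms are all CONTROLREADY still counts as stalled, because they are waiting for data that nothing in flight will produce. In iterate(), this check runs only after the completion test has failed.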

◆ iterate()

StatusCode AvalancheSchedulerSvc::iterate ( )
private

Loop on all slots to schedule DATAREADY algorithms and sign off ready events.

Loop on all slots to schedule DATAREADY algorithms, sign off ready ones or detect execution stalls.

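To check whether an event is finished, the method verifies two conditions. First, the root control-flow decision of the task precedence graph must be resolved. Second, no algorithm may be in an intermediate state between INITIAL and EVTACCEPTED (CONTROLREADY, DATAREADY, SCHEDULED or RESOURCELESS), either in the slot or in any of its sub-slots.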

Definition at line 591 of file AvalancheSchedulerSvc.cpp.

591  {
592 
593  StatusCode global_sc( StatusCode::SUCCESS );
594 
595  // Retry algorithms
596  const size_t retries = m_retryQueue.size();
597  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
598  TaskSpec retryTS = std::move( m_retryQueue.front() );
599  m_retryQueue.pop();
600  global_sc = schedule( std::move( retryTS ) );
601  }
602 
603  // Loop over all slots
604  OccupancySnapshot nextSnap;
605  auto now = std::chrono::system_clock::now();
606  for ( EventSlot& thisSlot : m_eventSlots ) {
607 
608  // Ignore slots without a valid context (relevant when populating scheduler for first time)
609  if ( !thisSlot.eventContext ) continue;
610 
611  int iSlot = thisSlot.eventContext->slot();
612 
613  // Cache the states of the algorithms to improve readability and performance
614  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
615 
616  StatusCode partial_sc = StatusCode::FAILURE;
617 
618  // Make an occupancy snapshot
621 
622  // Initialise snapshot
623  if ( nextSnap.states.empty() ) {
624  nextSnap.time = now;
625  nextSnap.states.resize( m_eventSlots.size() );
626  }
627 
628  // Store alg states
629  std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
630  slotStateTotals.resize( AState::MAXVALUE );
631  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
632  slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
633  }
634 
635  // Add subslot alg states
636  for ( auto& subslot : thisSlot.allSubSlots ) {
637  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
638  slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
639  }
640  }
641  }
642 
643  // Perform DR->SCHEDULED
644  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
645  uint algIndex{*it};
646  const std::string& algName{index2algname( algIndex )};
647  unsigned int rank{m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName )};
648  bool blocking{m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false};
649 
650  partial_sc =
651  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
652 
653  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
654  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
655  << " on processing slot " << iSlot << endmsg;
656  }
657 
658  // Check for algorithms ready in sub-slots
659  for ( auto& subslot : thisSlot.allSubSlots ) {
660  auto& subslotStates = subslot.algsStates;
661  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
662  uint algIndex{*it};
663  const std::string& algName{index2algname( algIndex )};
664  unsigned int rank{m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName )};
665  bool blocking{m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false};
666  partial_sc =
667  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
668  }
669  }
670 
671  if ( m_dumpIntraEventDynamics ) {
673  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
674  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
675  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
678  std::ofstream myfile;
679  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
680  myfile << s.str();
681  myfile.close();
682  }
683 
684  // Not complete because this would mean that the slot is already free!
685  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
686  !thisSlot.algsStates.containsAny(
687  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
688  !subSlotAlgsInStates( thisSlot,
689  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
690  !thisSlot.complete ) {
691 
692  thisSlot.complete = true;
693  // if the event did not fail, add it to the finished events
694  // otherwise it is taken care of in the error handling
695  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
696  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
697  << thisSlot.eventContext->slot() << ")." << endmsg;
698  m_finishedEvents.push( thisSlot.eventContext.release() );
699  }
700 
701  // now let's return the fully evaluated result of the control flow
702  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
703 
704  thisSlot.eventContext.reset( nullptr );
705 
706  } else if ( isStalled( thisSlot ) ) {
707  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
708  eventFailed( thisSlot.eventContext.get() ); // can't release yet
709  }
710  partial_sc.ignore();
711  } // end loop on slots
712 
713  // Process snapshot
714  if ( !nextSnap.states.empty() ) {
715  m_lastSnapshot = nextSnap.time;
716  m_snapshotCallback( std::move( nextSnap ) );
717  }
718 
719  ON_VERBOSE verbose() << "Iteration done." << endmsg;
720  m_needsUpdate.store( false );
721  return global_sc;
722 }
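iterate() starts by retrying the tasks parked in `m_retryQueue`. However, it only drains as many entries as the queue held when the pass began. schedule() pushes a task back onto the same queue when it cannot run it yet, so draining until the queue is empty could loop forever. Below is a minimal sketch of that pattern. The hypothetical `RetryTask` type and `drainRetryQueue` helper stand in for TaskSpec and the loop in the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical stand-in for TaskSpec.
struct RetryTask {
  int algIndex;
  int attempts;
};

// Drain only the entries present when the pass starts. 'schedule' may push a task back onto
// 'retryQueue' (e.g. no free algorithm instance); such tasks wait for the next pass
// instead of being retried again in this one.
std::size_t drainRetryQueue( std::queue<RetryTask>& retryQueue, const std::function<void( RetryTask&& )>& schedule ) {
  const std::size_t retries = retryQueue.size();
  for ( std::size_t i = 0; i < retries; ++i ) {
    RetryTask ts = std::move( retryQueue.front() );
    retryQueue.pop();
    schedule( std::move( ts ) );
  }
  return retries;
}
```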

◆ next()

bool AvalancheSchedulerSvc::next ( TaskSpec &  ts,
bool  blocking = false 
)
inline

Definition at line 339 of file AvalancheSchedulerSvc.h.

339  {
340  return blocking ? m_scheduledBlockingQueue.try_pop( ts ) : m_scheduledQueue.try_pop( ts );
341  };
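next() pops from one of two priority queues, selected by the `blocking` flag. The sketch below models this with a `std::priority_queue` guarded by a `std::mutex` in place of `tbb::concurrent_priority_queue`. The names `TaskSketch`, `RankLess`, `LockedPriorityQueue` and `SchedulerQueuesSketch` are hypothetical. The ordering (higher rank popped first) is an assumption about what `AlgQueueSort` does, not something this page shows.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <vector>

// Hypothetical stand-in for TaskSpec.
struct TaskSketch {
  unsigned algIndex;
  unsigned rank;
  bool blocking;
};

// Assumed ordering: higher rank means higher priority (max-heap on rank).
struct RankLess {
  bool operator()( const TaskSketch& a, const TaskSketch& b ) const { return a.rank < b.rank; }
};

// Mutex-protected priority queue with a non-blocking try_pop, mimicking the TBB container's interface.
class LockedPriorityQueue {
public:
  void push( TaskSketch t ) {
    std::lock_guard<std::mutex> lk( m_mutex );
    m_q.push( t );
  }
  bool try_pop( TaskSketch& out ) {
    std::lock_guard<std::mutex> lk( m_mutex );
    if ( m_q.empty() ) return false;
    out = m_q.top();
    m_q.pop();
    return true;
  }

private:
  std::mutex m_mutex;
  std::priority_queue<TaskSketch, std::vector<TaskSketch>, RankLess> m_q;
};

class SchedulerQueuesSketch {
public:
  // Route each task to the queue matching its blocking flag.
  void push( TaskSketch t ) { ( t.blocking ? m_blocking : m_regular ).push( t ); }
  // Same shape as AvalancheSchedulerSvc::next(): pick the queue, then try to pop.
  bool next( TaskSketch& ts, bool blocking = false ) {
    return blocking ? m_blocking.try_pop( ts ) : m_regular.try_pop( ts );
  }

private:
  LockedPriorityQueue m_regular, m_blocking;
};
```

Judging from schedule(), each TBB task or detached thread it launches presumably calls next() to pick up one queued TaskSpec. If so, the queue decides which ready algorithm runs first, not the order in which the tasks were launched.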

◆ popFinishedEvent()

StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

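Get a finished event or block until one becomes available. Returns StatusCode::FAILURE immediately, without blocking, if no event is in flight (all slots are free) or the scheduler is inactive.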

Definition at line 550 of file AvalancheSchedulerSvc.cpp.

550  {
551 
552  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
553  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
554  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
555  // << " active: " << m_isActive << endmsg;
556  return StatusCode::FAILURE;
557  } else {
558  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
559  // << " active: " << m_isActive << endmsg;
560  m_finishedEvents.pop( eventContext );
561  ++m_freeSlots;
562  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
563  return StatusCode::SUCCESS;
564  }
565 }
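popFinishedEvent() refuses to block when every slot is free. In that case no event is in flight, so nothing could ever be pushed to `m_finishedEvents`. Below is a minimal sketch of that guard, using a condition-variable queue in place of `tbb::concurrent_bounded_queue`. `BlockingQueue`, `FinishedEventsSketch` and the use of an `int` as the event handle are hypothetical, and the `m_isActive == INACTIVE` check is omitted.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Minimal blocking queue standing in for tbb::concurrent_bounded_queue.
template <typename T>
class BlockingQueue {
public:
  void push( T value ) {
    {
      std::lock_guard<std::mutex> lk( m_mutex );
      m_items.push( std::move( value ) );
    }
    m_cv.notify_one();
  }
  // Blocks until an element is available.
  void pop( T& out ) {
    std::unique_lock<std::mutex> lk( m_mutex );
    m_cv.wait( lk, [this] { return !m_items.empty(); } );
    out = std::move( m_items.front() );
    m_items.pop();
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::queue<T> m_items;
};

struct FinishedEventsSketch {
  int maxEventsInFlight;
  std::atomic<int> freeSlots;
  BlockingQueue<int> finished; // hypothetical: events identified by an int id

  explicit FinishedEventsSketch( int nSlots ) : maxEventsInFlight( nSlots ), freeSlots( nSlots ) {}

  // Fails immediately when every slot is free (blocking would wait forever);
  // otherwise blocks until an event finishes, then returns its slot to the pool.
  bool popFinished( int& evt ) {
    if ( freeSlots.load() == maxEventsInFlight ) return false;
    finished.pop( evt );
    ++freeSlots;
    return true;
  }
};
```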

◆ pushNewEvent()

StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext *  eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two possible cases:
1) No slot is free: StatusCode::FAILURE is returned.
2) At least one slot is free: an action that resets the slot and kicks off its update is queued.
A null eventContext is also rejected with StatusCode::FAILURE.

Definition at line 475 of file AvalancheSchedulerSvc.cpp.

475  {
476 
477  if ( !eventContext ) {
478  fatal() << "Event context is nullptr" << endmsg;
479  return StatusCode::FAILURE;
480  }
481 
482  if ( m_freeSlots.load() == 0 ) {
483  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
484  return StatusCode::FAILURE;
485  }
486 
487  // no problem as push new event is only called from one thread (event loop manager)
488  --m_freeSlots;
489 
490  auto action = [this, eventContext]() -> StatusCode {
491  // Event processing slot forced to be the same as the wb slot
492  const unsigned int thisSlotNum = eventContext->slot();
493  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
494  if ( !thisSlot.complete ) {
495  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
496  return StatusCode::FAILURE;
497  }
498 
499  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
500  thisSlot.reset( eventContext );
501 
502  // Result status code:
504 
505  // promote to CR and DR the initial set of algorithms
506  Cause cs = {Cause::source::Root, "RootDecisionHub"};
507  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
508  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
509  result = StatusCode::FAILURE;
510  }
511 
512  if ( this->iterate().isFailure() ) {
513  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
514  result = StatusCode::FAILURE;
515  }
516 
517  return result;
518  }; // end of lambda
519 
520  // Kick off scheduling
521  ON_VERBOSE {
522  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
523  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
524  }
525 
526  m_actionsQueue.push( action );
527 
528  return StatusCode::SUCCESS;
529 }
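pushNewEvent() does only constant-time bookkeeping on the caller's thread, reserving a slot by decrementing the free-slot counter. It defers the actual slot reset and precedence update to a closure on `m_actionsQueue`. As far as the listing shows, slot state is therefore mutated only by the thread that drains that queue, presumably the scheduler's activate() thread. Below is a single-threaded sketch of the pattern. It uses a hypothetical `PushSketch` class, a `std::queue` of `std::function` in place of the TBB queue, and integer slot/event ids in place of EventContext.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical single-threaded model of pushNewEvent(): reserve a slot now, do the work later.
class PushSketch {
public:
  explicit PushSketch( int nSlots ) : m_freeSlots( nSlots ), m_slotBusy( nSlots, false ) {}

  bool pushNewEvent( int slot, int evt ) {
    if ( m_freeSlots == 0 ) return false; // no free processing slot: caller must retry later
    --m_freeSlots;                        // unsynchronized: fine only with a single producer thread
    m_actions.push( [this, slot, evt]() -> bool {
      if ( m_slotBusy[slot] ) return false; // slot was supposed to hold a finished event
      m_slotBusy[slot] = true;              // "reset" the slot for the new event
      m_lastStarted = evt;
      return true;
    } );
    return true;
  }

  // What the scheduler thread does: execute queued closures in FIFO order, counting failures.
  int drainActions() {
    int failures = 0;
    while ( !m_actions.empty() ) {
      if ( !m_actions.front()() ) ++failures;
      m_actions.pop();
    }
    return failures;
  }

  int freeSlots() const { return m_freeSlots; }
  int lastStarted() const { return m_lastStarted; }

private:
  int m_freeSlots;
  std::vector<bool> m_slotBusy;
  int m_lastStarted = -1;
  std::queue<std::function<bool()>> m_actions;
};
```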

◆ pushNewEvents()

StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 533 of file AvalancheSchedulerSvc.cpp.

533  {
534  StatusCode sc;
535  for ( auto context : eventContexts ) {
536  sc = pushNewEvent( context );
537  if ( sc != StatusCode::SUCCESS ) return sc;
538  }
539  return sc;
540 }

◆ recordOccupancy()

void AvalancheSchedulerSvc::recordOccupancy ( int  samplePeriod,
std::function< void(OccupancySnapshot)>  callback 
)
overridevirtual

Sample occupancy at a fixed interval (in ms). A negative value deactivates sampling, and 0 takes a snapshot on every change. For each sample, the callback function is applied to the result.

Definition at line 1090 of file AvalancheSchedulerSvc.cpp.

1090  {
1091 
1092  auto action = [this, samplePeriod, callback{std::move( callback )}]() -> StatusCode {
1093  if ( samplePeriod < 0 ) {
1095  } else {
1098  }
1099  return StatusCode::SUCCESS;
1100  };
1101 
1102  m_actionsQueue.push( std::move( action ) );
1103 }
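The documented sampling rule for recordOccupancy() has three cases. A negative period means off, 0 means a snapshot on every change, and a positive period means a new snapshot only once that period has elapsed. Lines 1094 and 1096-1097 of the action, and lines 619-620 of iterate(), are missing from this page. The sketch below is therefore an assumption about how the rule is applied. It uses `Millis::min()` as the "off" sentinel, in line with the documented default of `m_snapshotInterval`. `OccupancySamplerSketch`, `record` and `maybeSample` are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

using Clock = std::chrono::system_clock;
using Millis = std::chrono::duration<std::int64_t, std::milli>;

// Hypothetical sampler: negative period = off, 0 = every call, >0 = once the period has elapsed.
class OccupancySamplerSketch {
public:
  void record( int samplePeriodMs, std::function<void( std::vector<int> )> callback ) {
    if ( samplePeriodMs < 0 ) {
      m_interval = Millis::min(); // sentinel meaning "deactivated"
      m_callback = nullptr;
    } else {
      m_interval = Millis( samplePeriodMs );
      m_callback = std::move( callback );
    }
  }

  // Assumed to be called once per scheduler iteration with the current per-state totals.
  void maybeSample( Clock::time_point now, const std::vector<int>& stateTotals ) {
    if ( m_interval == Millis::min() || !m_callback ) return;
    if ( now - m_last < m_interval ) return; // previous snapshot too recent
    m_last = now;
    m_callback( stateTotals );
  }

private:
  Millis m_interval = Millis::min();
  Clock::time_point m_last{};
  std::function<void( std::vector<int> )> m_callback;
};
```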

◆ revise()

StatusCode AvalancheSchedulerSvc::revise ( unsigned int  iAlgo,
EventContext *  contextPtr,
AState  state,
bool  iterate = false 
)
private

Definition at line 726 of file AvalancheSchedulerSvc.cpp.

726  {
727  StatusCode sc;
728  auto slotIndex = contextPtr->slot();
729  EventSlot& slot = m_eventSlots[slotIndex];
730  Cause cs = {Cause::source::Task, index2algname( iAlgo )};
731 
732  if ( UNLIKELY( contextPtr->usesSubSlot() ) ) {
733  // Sub-slot
734  auto subSlotIndex = contextPtr->subSlot();
735  EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
736 
737  sc = subSlot.algsStates.set( iAlgo, state );
738 
739  if ( LIKELY( sc.isSuccess() ) ) {
740  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
741  << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
742  // Revise states of algorithms downstream the precedence graph
743  if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
744  }
745  } else {
746  // Event level (standard behaviour)
747  sc = slot.algsStates.set( iAlgo, state );
748 
749  if ( LIKELY( sc.isSuccess() ) ) {
750  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
751  << ", event:" << contextPtr->evt() << "]" << endmsg;
752  // Revise states of algorithms downstream the precedence graph
753  if ( iterate ) sc = m_precSvc->iterate( slot, cs );
754  }
755  }
756  return sc;
757 }

◆ schedule()

StatusCode AvalancheSchedulerSvc::schedule ( TaskSpec &&  ts)
private

Definition at line 940 of file AvalancheSchedulerSvc.cpp.

940  {
941 
943  m_retryQueue.push( std::move( ts ) );
944  return StatusCode::SUCCESS;
945  }
946 
947  // Check if a free Algorithm instance is available
948  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
949 
950  // If an instance is available, proceed to scheduling
951  StatusCode sc;
952  if ( LIKELY( getAlgSC.isSuccess() ) ) {
953 
954  // Decide how to schedule the task and schedule it
955  if ( LIKELY( -100 != m_threadPoolSize ) ) {
956 
957  // Cache values before moving the TaskSpec further
958  unsigned int algIndex{ts.algIndex};
959  std::string_view algName( ts.algName );
960  unsigned int algRank{ts.algRank};
961  bool blocking{ts.blocking};
962  int slotIndex{ts.slotIndex};
963  EventContext* contextPtr{ts.contextPtr};
964 
965  if ( LIKELY( !blocking ) ) {
966  // Add the algorithm to the scheduled queue
967  m_scheduledQueue.push( std::move( ts ) );
968 
969  // Prepare a TBB task that will execute the Algorithm according to the above queued specs
970  m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, false ) );
971  ++m_algosInFlight;
972 
973  } else { // schedule blocking algorithm in independent thread
975 
976  // Schedule the blocking task in an independent thread
978  std::thread _t( AlgTask( this, serviceLocator(), m_algExecStateSvc, true ) );
979  _t.detach();
980 
981  } // end scheduling blocking Algorithm
982 
983  sc = revise( algIndex, contextPtr, AState::SCHEDULED );
984 
985  ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
986  << ", rank:" << algRank << ", blocking:" << ( blocking ? "yes" : "no" )
987  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
989  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
990  : "" )
991  << endmsg;
992 
993  } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
994  ++m_algosInFlight;
995  sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
996  AlgTask( this, serviceLocator(), m_algExecStateSvc, false )();
997  --m_algosInFlight;
998  }
999  } else { // if no Algorithm instance available, retry later
1000 
1001  sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1002  // Add the algorithm to the retry queue
1003  m_retryQueue.push( std::move( ts ) );
1004  }
1005 
1007 
1008  return sc;
1009 }
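schedule() makes a short chain of decisions for each task. The sketch below restates them as a pure function so the precedence of the checks is explicit. `ScheduleOutcome`, `ScheduleInputs` and `decide` are hypothetical. Line 942 is missing from this page, so the first guard is modelled only as an opaque flag (`earlyRetryGuard`) that re-queues the task without changing its state.

```cpp
#include <cassert>

// Hypothetical outcomes of one scheduling attempt, one per branch of schedule().
enum class ScheduleOutcome { Retry, Resourceless, EnqueuedOnArena, DetachedThread, RunInline };

struct ScheduleInputs {
  bool earlyRetryGuard;   // condition on line 942 is not shown on this page; treated as opaque
  bool instanceAvailable; // acquireAlgorithm() succeeded
  bool blocking;          // task flagged as CPU-blocking
  int threadPoolSize;     // -100 means "bypass TBB and run in the scheduler thread"
};

ScheduleOutcome decide( const ScheduleInputs& in ) {
  if ( in.earlyRetryGuard ) return ScheduleOutcome::Retry;           // re-queued, state unchanged
  if ( !in.instanceAvailable ) return ScheduleOutcome::Resourceless; // marked RESOURCELESS, re-queued
  if ( in.threadPoolSize == -100 ) return ScheduleOutcome::RunInline; // executed synchronously
  return in.blocking ? ScheduleOutcome::DetachedThread : ScheduleOutcome::EnqueuedOnArena;
}
```

The listing shows one consequence of this ordering. When `m_threadPoolSize` is -100, the blocking flag is ignored and every task runs synchronously in the scheduler's control thread.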

◆ scheduleEventView()

StatusCode AvalancheSchedulerSvc::scheduleEventView ( const EventContext *  sourceContext,
const std::string &  nodeName,
std::unique_ptr< EventContext >  viewContext 
)
overridevirtual

Method to inform the scheduler about event views.

Definition at line 1050 of file AvalancheSchedulerSvc.cpp.

1051  {
1052  // Prevent view nesting
1053  if ( sourceContext->usesSubSlot() ) {
1054  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1055  return StatusCode::FAILURE;
1056  }
1057 
1058  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1059 
1060  // It's not possible to create an std::functional from a move-capturing lambda
1061  // So, we have to release the unique pointer
1062  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1063  &nodeName]() -> StatusCode {
1064  // Attach the sub-slot to the top-level slot
1065  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1066 
1067  if ( viewContextPtr ) {
1068  // Re-create the unique pointer
1069  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1070  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1071  return StatusCode::SUCCESS;
1072  } else {
1073  // Disable the view node if there are no views
1074  topSlot.disableSubSlots( nodeName );
1075  return StatusCode::SUCCESS;
1076  }
1077  };
1078 
1079  m_actionsQueue.push( std::move( action ) );
1080 
1081  return StatusCode::SUCCESS;
1082 }
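The comment in scheduleEventView() refers to a C++ limitation. A lambda that captures a `std::unique_ptr` by move is move-only, while `std::function` requires a copy-constructible callable. The workaround is to capture the released raw pointer and re-wrap it when the closure runs. Below is a minimal sketch, with a hypothetical `ViewContext` in place of EventContext and vectors in place of the slot's sub-slot bookkeeping. Unlike the listing, it copies the node name into the closure. A captured reference is only valid if the caller's string is still alive when the action eventually runs.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct ViewContext { int id; }; // hypothetical stand-in for EventContext

using Action = std::function<bool()>;

// Build a copyable action: the unique_ptr is released into a raw pointer at capture time
// and ownership is reclaimed inside the closure when it executes.
Action makeAttachAction( std::vector<std::unique_ptr<ViewContext>>& subSlots, std::string nodeName,
                         std::unique_ptr<ViewContext> view, std::vector<std::string>& disabledNodes ) {
  return [&subSlots, &disabledNodes, node = std::move( nodeName ), raw = view.release()]() -> bool {
    if ( raw ) {
      subSlots.push_back( std::unique_ptr<ViewContext>( raw ) ); // ownership reclaimed here
    } else {
      disabledNodes.push_back( node ); // no views for this node: disable it
    }
    return true;
  };
}
```

Because `std::function` is copyable, copies of such an action share the raw pointer. Each action must therefore run exactly once, and an action that never runs leaks its view. C++23's `std::move_only_function` removes the need for the release/re-wrap step.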

◆ signoff()

StatusCode AvalancheSchedulerSvc::signoff ( const TaskSpec &  ts)
private

The call to this method is triggered only from within the AlgTask.

Definition at line 1016 of file AvalancheSchedulerSvc.cpp.

1016  {
1017 
1018  Gaudi::Hive::setCurrentContext( ts.contextPtr );
1019 
1020  if ( LIKELY( !ts.blocking ) )
1021  --m_algosInFlight;
1022  else
1024 
1025  const AlgExecState& algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
1026  AState state = algstate.execStatus().isSuccess()
1027  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1028  : AState::ERROR;
1029 
1030  // Update algorithm state and revise the downstream states
1031  auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1032 
1033  ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1034  << ", rank:" << ts.algRank << ", blocking:" << ( ts.blocking ? "yes" : "no" )
1035  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1037  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1038  : "" )
1039  << endmsg;
1040 
1041  // Prompt a call to updateStates
1042  m_needsUpdate.store( true );
1043  return sc;
1044 }
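signoff() combines two pieces of execution information into the algorithm's final FSM state. It also releases the in-flight counter matching the task type. Below is a hedged sketch of that mapping. `FinalState`, `finalStateFor`, `InFlightCounters` and `releaseTask` are hypothetical names. Line 1023 is missing from this page, so the sketch assumes the blocking branch decrements the blocking counter.

```cpp
#include <cassert>

// Hypothetical names for the three final states reachable from signoff().
enum class FinalState { EvtAccepted, EvtRejected, Error };

// A failed execution maps to Error regardless of the filter decision;
// otherwise the filter decision selects accepted or rejected.
FinalState finalStateFor( bool execSucceeded, bool filterPassed ) {
  if ( !execSucceeded ) return FinalState::Error;
  return filterPassed ? FinalState::EvtAccepted : FinalState::EvtRejected;
}

// Separate counters for tasks on the TBB arena and blocking tasks on detached threads.
struct InFlightCounters {
  unsigned regular = 0;
  unsigned blocking = 0;
};

// Assumption: a finished blocking task decrements the blocking counter (line 1023 is not shown).
void releaseTask( InFlightCounters& c, bool blocking ) {
  if ( blocking )
    --c.blocking;
  else
    --c.regular;
}
```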

◆ tryPopFinishedEvent()

StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, return a failure immediately.

Definition at line 571 of file AvalancheSchedulerSvc.cpp.

571  {
572 
573  if ( m_finishedEvents.try_pop( eventContext ) ) {
574  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
575  << endmsg;
576  ++m_freeSlots;
577  return StatusCode::SUCCESS;
578  }
579  return StatusCode::FAILURE;
580 }

Friends And Related Function Documentation

◆ AlgTask

friend class AlgTask
friend

Definition at line 114 of file AvalancheSchedulerSvc.h.

Member Data Documentation

◆ m_actionsQueue

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 283 of file AvalancheSchedulerSvc.h.

◆ m_algExecStateSvc

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 243 of file AvalancheSchedulerSvc.h.

◆ m_algname_index_map

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map holding the information needed for the name-to-index conversion.

Definition at line 219 of file AvalancheSchedulerSvc.h.

◆ m_algname_vect

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector holding the information needed for the index-to-name conversion.

Definition at line 225 of file AvalancheSchedulerSvc.h.

◆ m_algosInFlight

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 249 of file AvalancheSchedulerSvc.h.

◆ m_algResourcePool

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 278 of file AvalancheSchedulerSvc.h.

◆ m_arena

tbb::task_arena* AvalancheSchedulerSvc::m_arena {nullptr}
private

Definition at line 333 of file AvalancheSchedulerSvc.h.

◆ m_blockingAlgosInFlight

unsigned int AvalancheSchedulerSvc::m_blockingAlgosInFlight = 0
private

Number of CPU-blocking algorithms presently in flight.

Definition at line 252 of file AvalancheSchedulerSvc.h.

◆ m_checkDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
private

Definition at line 183 of file AvalancheSchedulerSvc.h.

◆ m_condSvc

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to service for Conditions handling.

Definition at line 246 of file AvalancheSchedulerSvc.h.

◆ m_dumpIntraEventDynamics

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file"}

Definition at line 178 of file AvalancheSchedulerSvc.h.

◆ m_enableCondSvc

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
private

Definition at line 188 of file AvalancheSchedulerSvc.h.

◆ m_enablePreemptiveBlockingTasks

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
private
Initial value:
{
this, "PreemptiveBlockingTasks", false,
"Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly."}

Definition at line 180 of file AvalancheSchedulerSvc.h.

◆ m_eventSlots

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 234 of file AvalancheSchedulerSvc.h.

◆ m_finishedEvents

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 240 of file AvalancheSchedulerSvc.h.

◆ m_freeSlots

std::atomic_int AvalancheSchedulerSvc::m_freeSlots
private

Atomic counter to account for asynchronous updates by the scheduler with respect to the rest of the framework.

Definition at line 237 of file AvalancheSchedulerSvc.h.

◆ m_isActive

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 210 of file AvalancheSchedulerSvc.h.

◆ m_lastSnapshot

std::chrono::system_clock::time_point AvalancheSchedulerSvc::m_lastSnapshot = std::chrono::system_clock::now()
private

Definition at line 162 of file AvalancheSchedulerSvc.h.

◆ m_maxAlgosInFlight

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight {1}
private

Definition at line 335 of file AvalancheSchedulerSvc.h.

◆ m_maxBlockingAlgosInFlight

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
private
Initial value:
{
this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms"}

Definition at line 171 of file AvalancheSchedulerSvc.h.

◆ m_maxEventsInFlight

size_t AvalancheSchedulerSvc::m_maxEventsInFlight {0}
private

Definition at line 334 of file AvalancheSchedulerSvc.h.

◆ m_needsUpdate

std::atomic<bool> AvalancheSchedulerSvc::m_needsUpdate {true}
private

Definition at line 327 of file AvalancheSchedulerSvc.h.

◆ m_optimizationMode

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E"}

Definition at line 176 of file AvalancheSchedulerSvc.h.

◆ m_precSvc

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 228 of file AvalancheSchedulerSvc.h.

◆ m_retryQueue

std::queue<TaskSpec> AvalancheSchedulerSvc::m_retryQueue
private

Definition at line 324 of file AvalancheSchedulerSvc.h.

◆ m_scheduledBlockingQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledBlockingQueue
private

Definition at line 323 of file AvalancheSchedulerSvc.h.

◆ m_scheduledQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledQueue
private

Queues for scheduled algorithms.

Definition at line 322 of file AvalancheSchedulerSvc.h.
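The scheduled queues are priority queues ordered by the AlgQueueSort comparator. The sketch below shows the same pattern with a single-threaded std::priority_queue in place of tbb::concurrent_priority_queue, which exposes push() and try_pop() instead of top()/pop(). The Task struct, its rank field, and the "higher rank runs first" convention are assumptions for illustration and do not reflect the real TaskSpec layout.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Hypothetical, reduced stand-in for TaskSpec: only a name and a rank.
struct Task {
  std::string name;
  double      rank; // higher rank = should run earlier (assumption)
};

// Comparator in the spirit of AlgQueueSort: std::priority_queue pops the
// element that compares greatest, so ordering by rank pops the top rank first.
struct ByRank {
  bool operator()( const Task& a, const Task& b ) const { return a.rank < b.rank; }
};

using TaskQueue = std::priority_queue<Task, std::vector<Task>, ByRank>;

// Drain the queue and return task names in pop order.
std::vector<std::string> drain( TaskQueue& q ) {
  std::vector<std::string> order;
  while ( !q.empty() ) {
    order.push_back( q.top().name );
    q.pop();
  }
  return order;
}
```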

◆ m_showControlFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences"}

Definition at line 196 of file AvalancheSchedulerSvc.h.

◆ m_showDataDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms"}

Definition at line 190 of file AvalancheSchedulerSvc.h.

◆ m_showDataFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms"}

Definition at line 193 of file AvalancheSchedulerSvc.h.

◆ m_simulateExecution

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution"}

Definition at line 173 of file AvalancheSchedulerSvc.h.

◆ m_snapshotCallback

std::function<void( OccupancySnapshot )> AvalancheSchedulerSvc::m_snapshotCallback
private

Definition at line 163 of file AvalancheSchedulerSvc.h.

◆ m_snapshotInterval

std::chrono::duration<int64_t, std::milli> AvalancheSchedulerSvc::m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
private

Definition at line 161 of file AvalancheSchedulerSvc.h.
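m_snapshotInterval, m_lastSnapshot and m_snapshotCallback back recordOccupancy(), which is documented as follows. A negative period deactivates sampling, 0 takes a snapshot on every change, and a positive value samples at that interval in milliseconds, applying the callback to each sample. The sketch below models those rules only. OccupancySampler and the Snapshot struct are hypothetical; the real OccupancySnapshot type has different contents.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <utility>

// Hypothetical stand-in for OccupancySnapshot.
struct Snapshot {
  int algosInFlight = 0;
};

// Sketch of the documented recordOccupancy() semantics:
//   period < 0  -> sampling disabled
//   period == 0 -> snapshot on every change
//   period > 0  -> at most one snapshot per `period` milliseconds
class OccupancySampler {
public:
  using Clock = std::chrono::system_clock;

  void record( int samplePeriodMs, std::function<void( Snapshot )> callback ) {
    if ( samplePeriodMs < 0 ) {
      m_interval = std::chrono::milliseconds::min(); // sentinel: disabled
      m_callback = nullptr;
    } else {
      m_interval = std::chrono::milliseconds( samplePeriodMs );
      m_callback = std::move( callback );
    }
  }

  // Called whenever the occupancy changes, with the current time.
  void onChange( Clock::time_point now, const Snapshot& snap ) {
    if ( !m_callback ) return; // disabled
    if ( m_interval == std::chrono::milliseconds::zero() || now - m_lastSnapshot >= m_interval ) {
      m_lastSnapshot = now;
      m_callback( snap );
    }
  }

private:
  std::chrono::milliseconds       m_interval = std::chrono::milliseconds::min();
  Clock::time_point               m_lastSnapshot{}; // clock epoch until the first sample
  std::function<void( Snapshot )> m_callback;
};
```

The min() sentinel mirrors the default value of m_snapshotInterval above, which leaves sampling off until recordOccupancy() is called.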

◆ m_thread

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 213 of file AvalancheSchedulerSvc.h.
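As a rough picture of a dedicated scheduler thread, the sketch below starts a std::thread that executes queued closures until a stop action is processed, then joins it. This is loosely modelled on activate()/deactivate() and the closure queue the service uses. The ActionLoop class is hypothetical, and a mutex-protected std::queue stands in for tbb::concurrent_bounded_queue. Closures return void here instead of StatusCode.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical sketch: a dedicated thread that executes queued closures.
class ActionLoop {
public:
  using Action = std::function<void()>;

  ~ActionLoop() {
    if ( m_thread.joinable() ) stop();
  }

  void start() { m_thread = std::thread( [this] { run(); } ); }

  void push( Action a ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_actions.push( std::move( a ) );
    }
    m_cv.notify_one();
  }

  // Queue a stop request after all pending actions, then join the thread.
  void stop() {
    push( [this] { m_running = false; } );
    if ( m_thread.joinable() ) m_thread.join();
  }

private:
  void run() {
    while ( m_running ) {
      Action a;
      {
        std::unique_lock<std::mutex> lock( m_mutex );
        m_cv.wait( lock, [this] { return !m_actions.empty(); } );
        a = std::move( m_actions.front() );
        m_actions.pop();
      }
      a(); // run outside the lock so actions may push further actions
    }
  }

  std::thread             m_thread;
  std::mutex              m_mutex;
  std::condition_variable m_cv;
  std::queue<Action>      m_actions;
  bool                    m_running = true; // only modified by the loop thread
};
```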

◆ m_threadPoolSize

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the global thread pool initialised by TBB; a value of -1 requests to use "
"all available hardware threads; -100 requests to bypass TBB executing "
"all algorithms in the scheduler's thread."}

Definition at line 165 of file AvalancheSchedulerSvc.h.
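The ThreadPoolSize property encodes three cases in one integer. The hypothetical helper below decodes them: -1 means all available hardware threads, -100 means bypassing TBB and running every algorithm in the scheduler's thread, and a positive value is taken as an explicit pool size. Treating 0 and other negative values as invalid is an assumption, as is the fallback to one thread when std::thread::hardware_concurrency() cannot determine the count.

```cpp
#include <cassert>
#include <thread>

enum class PoolMode { Tbb, SchedulerThreadOnly, Invalid };

struct PoolConfig {
  PoolMode     mode;
  unsigned int threads; // 0 when not applicable
};

// Hypothetical decoder for a ThreadPoolSize value.
PoolConfig interpretThreadPoolSize( int value ) {
  if ( value == -100 ) return { PoolMode::SchedulerThreadOnly, 0 };
  if ( value == -1 ) {
    unsigned int hw = std::thread::hardware_concurrency();
    // hardware_concurrency() returns 0 when the count is unknown
    return { PoolMode::Tbb, hw > 0 ? hw : 1 };
  }
  if ( value > 0 ) return { PoolMode::Tbb, static_cast<unsigned int>( value ) };
  return { PoolMode::Invalid, 0 };
}
```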

◆ m_threadPoolSvc

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 332 of file AvalancheSchedulerSvc.h.

◆ m_useDataLoader

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 185 of file AvalancheSchedulerSvc.h.
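Attributing unmet input dependencies to a DataLoader amounts to a set difference. Any input that no algorithm declares as an output is registered as an output of the loader, so that every consumer has a producer. The sketch below shows this under simplifying assumptions: plain strings replace DataObjID, and the Deps struct and attributeUnmetInputs() function are hypothetical, not the scheduler's code.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical model: each algorithm name maps to its input and output keys.
struct Deps {
  std::set<std::string> inputs;
  std::set<std::string> outputs;
};

// Register every input that no algorithm produces as an output of the loader.
// Returns the keys that were attributed to the loader.
std::set<std::string> attributeUnmetInputs( std::map<std::string, Deps>& algs,
                                            const std::string& loaderName ) {
  std::set<std::string> produced;
  for ( const auto& entry : algs ) produced.insert( entry.second.outputs.begin(), entry.second.outputs.end() );

  std::set<std::string> unmet;
  for ( const auto& entry : algs )
    for ( const auto& in : entry.second.inputs )
      if ( produced.count( in ) == 0 ) unmet.insert( in );

  // operator[] creates the loader entry if it was not listed yet
  algs[loaderName].outputs.insert( unmet.begin(), unmet.end() );
  return unmet;
}
```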

◆ m_verboseSubSlots

Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots {this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"}
private

Definition at line 199 of file AvalancheSchedulerSvc.h.

◆ m_whiteboard

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 231 of file AvalancheSchedulerSvc.h.

◆ m_whiteboardSvcName

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 170 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: