The Gaudi Framework  v32r2 (46d42edc)
AvalancheSchedulerSvc Class Reference

#include <src/AvalancheSchedulerSvc.h>

Inheritance diagram for AvalancheSchedulerSvc:
Collaboration diagram for AvalancheSchedulerSvc:

Public Member Functions

 ~AvalancheSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
virtual StatusCode scheduleEventView (const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
 Method to inform the scheduler about event views. More...
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::stringgetInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
- Public Member Functions inherited from Service
const std::stringname () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Initialize Service. More...
 
StatusCode sysStop () override
 Initialize Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Re-initialize the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName, bool createIf=true)
 Declare used tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service.May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBasedeclareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBasedeclareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property form another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property form the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBasegetProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolderoperator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 get the cached level (originally extracted from the embedded MsgStream) More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const
 Backward compatibility function for getting the output level. More...
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
const SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO) More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
using AState = AlgsExecutionStates::State
 
using action = std::function< StatusCode()>
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::stringindex2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode updateStates (int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
 Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skipping an update of the Control Flow state) More...
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si, EventContext *)
 Algorithm promotion. More...
 
StatusCode promoteToAsyncScheduled (unsigned int iAlgo, int si, EventContext *)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the AlgoExecutionTask. More...
 
StatusCode promoteToAsyncExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the IOBoundAlgTask. More...
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
bool isStalled (const EventSlot &) const
 Check if scheduling in a particular slot is in a stall. More...
 
void eventFailed (EventContext *eventContext)
 Method to execute if an event failed. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 

Private Attributes

Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::stringm_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< std::stringm_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
 
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::stringm_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_useIOBoundAlgScheduler
 
Gaudi::Property< bool > m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
 
Gaudi::Property< std::stringm_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
Gaudi::Property< bool > m_verboseSubSlots {this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"}
 
std::atomic< ActivationStatem_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::stringm_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IPrecedenceSvcm_precSvc
 A shortcut to the Precedence Service. More...
 
SmartIF< IHiveWhiteBoardm_whiteboard
 A shortcut to the whiteboard. More...
 
SmartIF< IAcceleratorm_IOBoundAlgScheduler
 A shortcut to IO-bound algorithm scheduler. More...
 
std::vector< EventSlotm_eventSlots
 Vector of events slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asyncronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvcm_algExecStateSvc
 Algorithm execution state manager. More...
 
SmartIF< ICondSvcm_condSvc
 A shortcut to service for Conditions handling. More...
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight. More...
 
unsigned int m_IOBoundAlgosInFlight = 0
 Number of algorithms presently in flight. More...
 
SmartIF< IAlgResourcePoolm_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< actionm_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
std::vector< unsigned int > m_actionsCounts
 Bookkeeping of the number of actions in flight per slot. More...
 
SmartIF< IThreadPoolSvcm_threadPoolSvc
 
size_t m_maxEventsInFlight {0}
 
size_t m_maxAlgosInFlight {1}
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
using Factory = Gaudi::PluginService::Factory< IService *(const std::string &, ISvcLocator *)>
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBaseproperty (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches. More...
 
MSG::Level resetMessaging ()
 Reinitialize internal states. More...
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, " unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvcm_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. When compared to approach used in the ForwardSchedulerSvc, the following advantages can be emphasized:

(1) Faster decision making (thus lower concurrency disclosure downtime); (2) Capacity for proactive task scheduling decision making.

Point (2) allowed to implement a number of generic, non-intrusive intra-event throughput maximization scheduling strategies.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled ASA all following conditions are met:

  • if a control flow (CF) graph traversal reaches the task;
  • when all data flow (DF) dependencies of the task are satisfied;
  • when the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of precedence rules graph asymmetries:

(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by scheduling the CPU-blocking tasks efficiently. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices..joke);
  • synchronization-bound tasks.

Credits

Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 100 of file AvalancheSchedulerSvc.h.

Member Typedef Documentation

◆ action

Definition at line 135 of file AvalancheSchedulerSvc.h.

◆ AState

Member Enumeration Documentation

◆ ActivationState

Constructor & Destructor Documentation

◆ ~AvalancheSchedulerSvc()

AvalancheSchedulerSvc::~AvalancheSchedulerSvc ( )
overridedefault

Destructor.

Member Function Documentation

◆ activate()

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)

Definition at line 368 of file AvalancheSchedulerSvc.cpp.

368  {
369 
370  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
371 
373  error() << "problems initializing ThreadPoolSvc" << endmsg;
375  return;
376  }
377 
378  // Wait for actions pushed into the queue by finishing tasks.
379  action thisAction;
381 
382  m_isActive = ACTIVE;
383 
384  // Continue to wait if the scheduler is running or there is something to do
385  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
386  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
387  m_actionsQueue.pop( thisAction );
388  sc = thisAction();
389  ON_VERBOSE {
390  if ( sc.isFailure() )
391  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
392  else
393  verbose() << "Action succeeded." << endmsg;
394  }
395  else sc.ignore();
396  }
397 
398  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
400  error() << "Problems terminating thread pool" << endmsg;
402  }
403 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
#define ON_DEBUG
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
std::function< StatusCode()> action
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
bool isFailure() const
Definition: StatusCode.h:130
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE

◆ algname2index()

unsigned int AvalancheSchedulerSvc::algname2index ( const std::string algoname)
inlineprivate

Convert a name to an integer.

Definition at line 443 of file AvalancheSchedulerSvc.cpp.

443  {
444  unsigned int index = m_algname_index_map[algoname];
445  return index;
446 }
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.

◆ deactivate()

StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.

Definition at line 413 of file AvalancheSchedulerSvc.cpp.

413  {
414 
415  if ( m_isActive == ACTIVE ) {
416 
417  // Set the number of slots available to an error code
418  m_freeSlots.store( 0 );
419 
420  // Empty queue
421  action thisAction;
422  while ( m_actionsQueue.try_pop( thisAction ) ) {};
423 
424  // This would be the last action
425  m_actionsQueue.push( [this]() -> StatusCode {
426  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
428  return StatusCode::SUCCESS;
429  } );
430  }
431 
432  return StatusCode::SUCCESS;
433 }
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
std::function< StatusCode()> action
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE

◆ dumpSchedulerState()

void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

Definition at line 775 of file AvalancheSchedulerSvc.cpp.

775  {
776 
777  // To have just one big message
778  std::ostringstream outputMS;
779 
780  outputMS << "Dumping scheduler state\n"
781  << "=========================================================================================\n"
782  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
783  << "=========================================================================================\n\n";
784 
785  //===========================================================================
786 
787  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
788  << "------------------\n\n";
789 
790  // Figure if TimelineSvc is available (used below to detect threads IDs)
791  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
792  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
793  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
794  } else {
795 
796  // Figure optimal printout layout
797  size_t indt( 0 );
798  for ( auto& slot : m_eventSlots )
799  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
800  if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
801 
802  // Figure the last running schedule across all slots
803  for ( auto& slot : m_eventSlots ) {
804  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
805  ++it ) {
806 
807  const std::string algoName{index2algname( *it )};
808 
809  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
810  << slot.eventContext->slot();
811 
812  // Try to get POSIX threads IDs the currently running tasks are scheduled to
813  if ( timelineSvc.isValid() ) {
814  TimelineEvent te{};
815  te.algorithm = algoName;
816  te.slot = slot.eventContext->slot();
817  te.event = slot.eventContext->evt();
818 
819  if ( timelineSvc->getTimelineEvent( te ) )
820  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
821  else
822  outputMS << " thread.id: [unknown]"; // this means a task has just
823  // been signed off as SCHEDULED,
824  // but has not been assigned to a thread yet
825  // (i.e., not running yet)
826  }
827  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
828  }
829  }
830  }
831 
832  //===========================================================================
833 
834  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
835  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
836 
837  int slotCount = -1;
838  for ( auto& slot : m_eventSlots ) {
839  ++slotCount;
840  if ( slot.complete ) continue;
841 
842  outputMS << "[ slot: "
843  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
844  << " event: "
845  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
846  << " ]:\n\n";
847 
848  if ( 0 > iSlot || iSlot == slotCount ) {
849 
850  // Snapshot of the Control Flow and FSM states
851  outputMS << m_precSvc->printState( slot ) << "\n";
852 
853  // Mention sub slots (this is expensive if the number of sub-slots is high)
854  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
855  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
856  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
857  for ( auto& ss : slot.allSubSlots ) {
858  outputMS << "[ slot: " << slotID << ", sub-slot: "
859  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
860  << ", entry: " << ss.entryPoint << ", event: "
861  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
862  << " ]:\n\n";
863  outputMS << m_precSvc->printState( ss ) << "\n";
864  }
865  }
866  }
867  }
868 
869  //===========================================================================
870 
871  if ( 0 <= iSlot ) {
872  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
873  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
874  }
875 
876  outputMS << "\n=========================================================================================\n"
877  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
878  << "=========================================================================================\n\n";
879 
880  info() << outputMS.str() << endmsg;
881 }
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
T to_string(T... args)
std::string algorithm
Definition: ITimelineSvc.h:21
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
T setw(T... args)
STL class.
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:76
Gaudi::Property< bool > m_verboseSubSlots
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
T str(T... args)
T length(T... args)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T hex(T... args)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
virtual const std::string printState(EventSlot &) const =0
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192

◆ eventFailed()

void AvalancheSchedulerSvc::eventFailed ( EventContext eventContext)
private

Method to execute if an event failed.

It can be possible that an event fails.

In this case this method is called. It dumps the state of the scheduler and marks the event as finished.

Definition at line 754 of file AvalancheSchedulerSvc.cpp.

754  {
755  const uint slotIdx = eventContext->slot();
756 
757  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
758 
759  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
760 
761  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
763 
764  // Push into the finished events queue the failed context
765  m_eventSlots[slotIdx].complete = true;
766  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
767 }
ContextID_t slot() const
Definition: EventContext.h:41
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
ContextEvt_t evt() const
Definition: EventContext.h:40
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192

◆ finalize()

StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 337 of file AvalancheSchedulerSvc.cpp.

337  {
338 
340  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
341 
342  sc = deactivate();
343  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
344 
345  info() << "Joining Scheduler thread" << endmsg;
346  m_thread.join();
347 
348  // Final error check after thread pool termination
349  if ( m_isActive == FAILURE ) {
350  error() << "problems in scheduler thread" << endmsg;
351  return StatusCode::FAILURE;
352  }
353 
354  return sc;
355 }
StatusCode finalize() override
Definition: Service.cpp:164
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
T join(T... args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
constexpr static const auto FAILURE
Definition: StatusCode.h:86
StatusCode deactivate()
Deactivate scheduler.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
std::thread m_thread
The thread in which the activate function runs.

◆ freeSlots()

unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 526 of file AvalancheSchedulerSvc.cpp.

526 { return std::max( m_freeSlots.load(), 0 ); }
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T max(T... args)

◆ index2algname()

const std::string & AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inlineprivate

Convert an integer to a name.

Definition at line 439 of file AvalancheSchedulerSvc.cpp.

439 { return m_algname_vect[index]; }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.

◆ initialize()

StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 63 of file AvalancheSchedulerSvc.cpp.

63  {
64 
65  // Initialise mother class (read properties, ...)
67  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
68 
69  // Get hold of the TBBSvc. This should initialize the thread pool
70  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
71  if ( !m_threadPoolSvc.isValid() ) {
72  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
73  return StatusCode::FAILURE;
74  }
75 
76  // Activate the scheduler in another thread.
77  info() << "Activating scheduler in a separate thread" << endmsg;
78  m_thread = std::thread( [this]() { this->activate(); } );
79 
80  while ( m_isActive != ACTIVE ) {
81  if ( m_isActive == FAILURE ) {
82  fatal() << "Terminating initialization" << endmsg;
83  return StatusCode::FAILURE;
84  } else {
85  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
86  sleep( 1 );
87  }
88  }
89 
90  if ( m_enableCondSvc ) {
91  // Get hold of the CondSvc
92  m_condSvc = serviceLocator()->service( "CondSvc" );
93  if ( !m_condSvc.isValid() ) {
94  warning() << "No CondSvc found, or not enabled. "
95  << "Will not manage CondAlgorithms" << endmsg;
96  m_enableCondSvc = false;
97  }
98  }
99 
100  // Get the algo resource pool
101  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
102  if ( !m_algResourcePool.isValid() ) {
103  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
104  return StatusCode::FAILURE;
105  }
106 
107  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
108  if ( !m_algExecStateSvc.isValid() ) {
109  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
110  return StatusCode::FAILURE;
111  }
112 
113  // Get Whiteboard
115  if ( !m_whiteboard.isValid() ) {
116  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
117  return StatusCode::FAILURE;
118  }
119 
120  // Get dedicated scheduler for I/O-bound algorithms
121  if ( m_useIOBoundAlgScheduler ) {
124  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
125  }
126 
127  // Set the MaxEventsInFlight parameters from the number of WB stores
129 
130  // Set the number of free slots
132 
133  // Get the list of algorithms
135  const unsigned int algsNumber = algos.size();
136  if ( algsNumber != 0 ) {
137  info() << "Found " << algsNumber << " algorithms" << endmsg;
138  } else {
139  error() << "No algorithms found" << endmsg;
140  return StatusCode::FAILURE;
141  }
142 
143  /* Dependencies
144  1) Look for handles in algo, if none
145  2) Assume none are required
146  */
147 
148  DataObjIDColl globalInp, globalOutp;
149 
150  // figure out all outputs
151  for ( IAlgorithm* ialgoPtr : algos ) {
152  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
153  if ( !algoPtr ) {
154  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
155  return StatusCode::FAILURE;
156  }
157  for ( auto id : algoPtr->outputDataObjs() ) {
158  auto r = globalOutp.insert( id );
159  if ( !r.second ) {
160  warning() << "multiple algorithms declare " << id
161  << " as output! could be a single instance in multiple paths "
162  "though, or control flow may guarantee only one runs...!"
163  << endmsg;
164  }
165  }
166  }
167 
168  std::ostringstream ostdd;
169  ostdd << "Data Dependencies for Algorithms:";
170 
171  std::map<std::string, DataObjIDColl> algosDependenciesMap;
172  for ( IAlgorithm* ialgoPtr : algos ) {
173  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
174  if ( nullptr == algoPtr ) {
175  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
176  << ": this will result in a crash." << endmsg;
177  return StatusCode::FAILURE;
178  }
179 
180  ostdd << "\n " << algoPtr->name();
181 
182  DataObjIDColl algoDependencies;
183  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
184  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
185  DataObjID id = *idp;
186  ostdd << "\n o INPUT " << id;
187  if ( id.key().find( ":" ) != std::string::npos ) {
188  ostdd << " contains alternatives which require resolution...\n";
189  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
190  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
191  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
192  } );
193  if ( itok != tokens.end() ) {
194  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
195  id.updateKey( *itok );
196  } else {
197  error() << "failed to find alternate in global output list"
198  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
199  m_showDataDeps = true;
200  }
201  }
202  algoDependencies.insert( id );
203  globalInp.insert( id );
204  }
205  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
206  ostdd << "\n o OUTPUT " << *id;
207  if ( id->key().find( ":" ) != std::string::npos ) {
208  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
209  << endmsg;
210  m_showDataDeps = true;
211  }
212  }
213  } else {
214  ostdd << "\n none";
215  }
216  algosDependenciesMap[algoPtr->name()] = algoDependencies;
217  }
218 
219  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
220 
221  // Check if we have unmet global input dependencies, and, optionally, heal them
222  // WARNING: this step must be done BEFORE the Precedence Service is initialized
223  if ( m_checkDeps ) {
224  DataObjIDColl unmetDep;
225  for ( auto o : globalInp )
226  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
227 
228  if ( unmetDep.size() > 0 ) {
229 
230  std::ostringstream ost;
231  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232  ost << "\n o " << *o << " required by Algorithm: ";
233 
234  for ( const auto& p : algosDependenciesMap )
235  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
236  }
237 
238  if ( !m_useDataLoader.empty() ) {
239 
240  // Find the DataLoader Alg
241  IAlgorithm* dataLoaderAlg( nullptr );
242  for ( IAlgorithm* algo : algos )
243  if ( algo->name() == m_useDataLoader ) {
244  dataLoaderAlg = algo;
245  break;
246  }
247 
248  if ( dataLoaderAlg == nullptr ) {
249  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
250  << "\" found, and unmet INPUT dependencies "
251  << "detected:\n"
252  << ost.str() << endmsg;
253  return StatusCode::FAILURE;
254  }
255 
256  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
257  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
258 
259  // Set the property Load of DataLoader Alg
260  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
261  if ( !dataAlg ) {
262  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
263  << endmsg;
264  return StatusCode::FAILURE;
265  }
266 
267  for ( auto& id : unmetDep ) {
268  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
269  << dataLoaderAlg->name() << endmsg;
271  }
272 
273  } else {
274  fatal() << "Auto DataLoading not requested, "
275  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
276  return StatusCode::FAILURE;
277  }
278 
279  } else {
280  info() << "No unmet INPUT data dependencies were found" << endmsg;
281  }
282  }
283 
284  // Get the precedence service
285  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
286  if ( !m_precSvc.isValid() ) {
287  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
288  return StatusCode::FAILURE;
289  }
290  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
291  if ( !precSvc ) {
292  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
293  return StatusCode::FAILURE;
294  }
295 
296  // Fill the containers to convert algo names to index
297  m_algname_vect.resize( algsNumber );
298  for ( IAlgorithm* algo : algos ) {
299  const std::string& name = algo->name();
300  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
301  m_algname_index_map[name] = index;
302  m_algname_vect.at( index ) = name;
303  }
304 
305  // Shortcut for the message service
306  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
307  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
308 
310  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
311  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
312  m_eventSlots.back().complete = true;
313  }
315 
316  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
317 
318  // Clearly inform about the level of concurrency
319  info() << "Concurrency level information:" << endmsg;
320  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
321  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
322 
324 
326 
327  // Simulate execution flow
329 
330  return sc;
331 }
Gaudi::Property< bool > m_showDataFlow
#define ON_DEBUG
StatusCode initialize() override
Definition: Service.cpp:60
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
T empty(T... args)
Gaudi::Property< std::string > m_whiteboardSvcName
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
Gaudi::Property< bool > m_showDataDeps
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
void activate()
Activate scheduler.
T end(T... args)
Gaudi::Property< std::string > m_useDataLoader
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
STL class.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
T resize(T... args)
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
const unsigned int & getAlgoIndex() const
Get algorithm index.
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T at(T... args)
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:76
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:274
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
T str(T... args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
Gaudi::Property< int > m_threadPoolSize
virtual void dumpDataFlow() const =0
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
T insert(T... args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & outputDataObjs() const override
T find_if(T... args)
T size(T... args)
T assign(T... args)
Gaudi::Property< bool > m_simulateExecution
Gaudi::Property< bool > m_enableCondSvc
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
T back(T... args)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
Definition: EventSlot.h:79
const DataObjIDColl & inputDataObjs() const override
virtual void dumpControlFlow() const =0
Dump precedence rules.
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:63
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:645
T reserve(T... args)
T emplace_back(T... args)
std::thread m_thread
The thread in which the activate function runs.

◆ isStalled()

bool AvalancheSchedulerSvc::isStalled ( const EventSlot slot) const
private

Check if scheduling in a particular slot is in a stall.

Check if we are in present of a stall condition for a particular slot.

This is the case when a slot has no actions queued in the actionsQueue, has no scheduled algorithms and has no algorithms with all of its dependencies satisfied.

Definition at line 735 of file AvalancheSchedulerSvc.cpp.

735  {
736 
737  if ( m_actionsCounts[slot.eventContext->slot()] == 0 &&
738  !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED} ) &&
739  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
740 
741  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
742 
743  return true;
744  }
745  return false;
746 }
ContextID_t slot() const
Definition: EventContext.h:41
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:73
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75

◆ popFinishedEvent()

StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 532 of file AvalancheSchedulerSvc.cpp.

532  {
533 
534  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
535  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
536  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
537  // << " active: " << m_isActive << endmsg;
538  return StatusCode::FAILURE;
539  } else {
540  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
541  // << " active: " << m_isActive << endmsg;
542  m_finishedEvents.pop( eventContext );
543  ++m_freeSlots;
544  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
545  return StatusCode::SUCCESS;
546  }
547 }
#define ON_DEBUG
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192

◆ promoteToAsyncExecuted()

StatusCode AvalancheSchedulerSvc::promoteToAsyncExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

The call to this method is triggered only from within the IOBoundAlgTask.

Definition at line 1071 of file AvalancheSchedulerSvc.cpp.

1072  {
1073  Gaudi::Hive::setCurrentContext( eventContext );
1074  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1075 
1076  if ( sc.isFailure() ) {
1077  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1078  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1079  return StatusCode::FAILURE;
1080  }
1081 
1083 
1084  EventSlot& thisSlot = m_eventSlots[si];
1085 
1086  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1087  << endmsg;
1088 
1089  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1090  AState state = algstate.execStatus().isSuccess()
1091  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1092  : AState::ERROR;
1093 
1094  // Update states in the appropriate slot
1095  int subSlotIndex = -1;
1096  if ( eventContext->usesSubSlot() ) {
1097  // Sub-slot
1098  subSlotIndex = eventContext->subSlot();
1099  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1100  } else {
1101  // Event level (standard behaviour)
1102  sc = thisSlot.algsStates.set( iAlgo, state );
1103  }
1104 
1105  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1106  << "[Asynchronous] Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1107 
1108  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1109  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1110 
1111  // Schedule an update of the status of the algorithms
1112  ++m_actionsCounts[si];
1113  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1114  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1115  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1116  } );
1117 
1118  return sc;
1119 }
#define ON_DEBUG
AlgsExecutionStates::State AState
Class representing an event slot.
Definition: EventSlot.h:14
ContextID_t slot() const
Definition: EventContext.h:41
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
ContextEvt_t evt() const
Definition: EventContext.h:40
bool filterPassed() const
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
const StatusCode & execStatus() const
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isSuccess() const
Definition: StatusCode.h:267
GAUDI_API void setCurrentContext(const EventContext *ctx)
ContextID_t subSlot() const
Definition: EventContext.h:42
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool isFailure() const
Definition: StatusCode.h:130
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
bool usesSubSlot() const
Definition: EventContext.h:43
#define ON_VERBOSE

◆ promoteToAsyncScheduled()

StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled ( unsigned int  iAlgo,
int  si,
EventContext eventContext 
)
private

Definition at line 946 of file AvalancheSchedulerSvc.cpp.

946  {
947 
949 
950  // bool IOBound = m_precSvc->isBlocking(algName);
951 
952  const std::string& algName( index2algname( iAlgo ) );
953  IAlgorithm* ialgoPtr = nullptr;
954  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
955 
956  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
957 
959  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
960  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
961  return this->AvalancheSchedulerSvc::promoteToAsyncExecuted( iAlgo, eventContext->slot(), ialgoPtr,
962  eventContext );
963  } );
964  return StatusCode::SUCCESS;
965  };
966  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
967  // tbb::task)? it seems it works..
968 
969  // FIXME - The memory allocation here is causing memory leaks as detected by the gcc leak sanitizer
970  //
971  // clang-format off
972  // Direct leak of 224 byte(s) in 7 object(s) allocated from:
973  // #0 0x7fc0cb524da8 in operator new(unsigned long) /afs/cern.ch/cms/CAF/CMSCOMM/COMM_ECAL/dkonst/GCC/build/contrib/gcc-8.2.0/src/gcc/8.2.0/libsanitizer/lsan/lsan_interceptors.cc:229
974  // #1 0x7fc0ba979f7b in function<AvalancheSchedulerSvc::promoteToAsyncScheduled(unsigned int, int, EventContext*)::<lambda()> > /cvmfs/lhcb.cern.ch/lib/lcg/releases/gcc/8.2.0-3fa06/x86_64-centos7/include/c++/8.2.0/bits/std_function.h:249
975  // #2 0x7fc0ba97d181 in AvalancheSchedulerSvc::promoteToAsyncScheduled(unsigned int, int, EventContext*) ../GaudiHive/src/AvalancheSchedulerSvc.cpp:969
976  // #3 0x7fc0ba98354d in AvalancheSchedulerSvc::updateStates(int, int, int, int) ../GaudiHive/src/AvalancheSchedulerSvc.cpp:660
977  // clang-format on
978  //
979  // These leaks are currently suppressed in Gaudi/job/Gaudi-LSan.supp - remove entry there to reactivate
980  //
981  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
982  IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
983  m_IOBoundAlgScheduler->push( *theTask );
984  //
985  // FIXME
986 
987  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
988  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
989 
990  // Update states in the appropriate event slot
991  StatusCode updateSc;
992  EventSlot& thisSlot = m_eventSlots[si];
993  if ( eventContext->usesSubSlot() ) {
994  // Sub-slot
995  size_t const subSlotIndex = eventContext->subSlot();
996  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
997  } else {
998  // Event level (standard behaviour)
999  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
1000  }
1001 
1002  ON_VERBOSE if ( updateSc.isSuccess() ) verbose()
1003  << "[Asynchronous] Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
1004  return updateSc;
1005  } else {
1006  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1007  << " on slot " << si << endmsg;
1008  return sc;
1009  }
1010 }
#define ON_DEBUG
Wrapper around I/O-bound Gaudi-algorithms.
Class representing an event slot.
Definition: EventSlot.h:14
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
ContextID_t slot() const
Definition: EventContext.h:41
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
ContextEvt_t evt() const
Definition: EventContext.h:40
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
STL class.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isSuccess() const
Definition: StatusCode.h:267
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
ContextID_t subSlot() const
Definition: EventContext.h:42
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
virtual StatusCode push(IAlgTask &task)=0
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
bool usesSubSlot() const
Definition: EventContext.h:43
#define ON_VERBOSE

◆ promoteToExecuted()

StatusCode AvalancheSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

The call to this method is triggered only from within the AlgoExecutionTask.

Definition at line 1017 of file AvalancheSchedulerSvc.cpp.

1018  {
1019  Gaudi::Hive::setCurrentContext( eventContext );
1020  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1021 
1022  if ( sc.isFailure() ) {
1023  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1024  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1025  return StatusCode::FAILURE;
1026  }
1027 
1028  --m_algosInFlight;
1029 
1030  EventSlot& thisSlot = m_eventSlots[si];
1031 
1032  ON_DEBUG debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1033 
1034  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1035  AState state = algstate.execStatus().isSuccess()
1036  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1037  : AState::ERROR;
1038 
1039  // Update states in the appropriate slot
1040  int subSlotIndex = -1;
1041  if ( eventContext->usesSubSlot() ) {
1042  // Sub-slot
1043  subSlotIndex = eventContext->subSlot();
1044  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1045  } else {
1046  // Event level (standard behaviour)
1047  sc = thisSlot.algsStates.set( iAlgo, state );
1048  }
1049 
1050  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1051  << "Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1052 
1053  ON_DEBUG debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1054  << m_algosInFlight << endmsg;
1055 
1056  // Schedule an update of the status of the algorithms
1057  ++m_actionsCounts[si];
1058  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1059  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1060  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1061  } );
1062 
1063  return sc;
1064 }
#define ON_DEBUG
AlgsExecutionStates::State AState
Class representing an event slot.
Definition: EventSlot.h:14
ContextID_t slot() const
Definition: EventContext.h:41
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
ContextEvt_t evt() const
Definition: EventContext.h:40
bool filterPassed() const
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
const StatusCode & execStatus() const
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isSuccess() const
Definition: StatusCode.h:267
GAUDI_API void setCurrentContext(const EventContext *ctx)
ContextID_t subSlot() const
Definition: EventContext.h:42
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool isFailure() const
Definition: StatusCode.h:130
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
bool usesSubSlot() const
Definition: EventContext.h:43
#define ON_VERBOSE

◆ promoteToFinished()

StatusCode AvalancheSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private

◆ promoteToScheduled()

StatusCode AvalancheSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si,
EventContext eventContext 
)
private

Algorithm promotion.

Definition at line 885 of file AvalancheSchedulerSvc.cpp.

885  {
886 
888 
889  const std::string& algName( index2algname( iAlgo ) );
890  IAlgorithm* ialgoPtr = nullptr;
891  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
892 
893  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
894 
895  ++m_algosInFlight;
896  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
897  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
898  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
899  } );
900  return StatusCode::SUCCESS;
901  };
902 
903  // Avoid to use tbb if the pool size is 1 and run in this thread
904  if ( -100 != m_threadPoolSize ) {
905  // the child task that executes an Algorithm
906  tbb::task* algoTask = new ( tbb::task::allocate_root() )
907  AlgoExecutionTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
908  // schedule the algoTask
909  tbb::task::enqueue( *algoTask );
910 
911  } else {
912  AlgoExecutionTask theTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc,
913  promote2ExecutedClosure );
914  theTask.execute();
915  }
916 
917  ON_DEBUG debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot "
918  << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
919 
920  // Update states in the appropriate event slot
921  StatusCode updateSc;
922  EventSlot& thisSlot = m_eventSlots[si];
923  if ( eventContext->usesSubSlot() ) {
924  // Sub-slot
925  size_t const subSlotIndex = eventContext->subSlot();
926  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
927  } else {
928  // Event level (standard behaviour)
929  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
930  }
931 
933 
934  if ( updateSc.isSuccess() )
935  ON_VERBOSE verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
936  return updateSc;
937  } else {
938  ON_DEBUG debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si
939  << endmsg;
940  return sc;
941  }
942 }
#define ON_DEBUG
Class representing an event slot.
Definition: EventSlot.h:14
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
STL class.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
Gaudi::Property< int > m_threadPoolSize
bool isSuccess() const
Definition: StatusCode.h:267
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
ContextID_t subSlot() const
Definition: EventContext.h:42
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
constexpr static const auto FAILURE
Definition: StatusCode.h:86
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
bool usesSubSlot() const
Definition: EventContext.h:43
#define ON_VERBOSE

◆ pushNewEvent()

StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.

Definition at line 457 of file AvalancheSchedulerSvc.cpp.

457  {
458 
459  if ( !eventContext ) {
460  fatal() << "Event context is nullptr" << endmsg;
461  return StatusCode::FAILURE;
462  }
463 
464  if ( m_freeSlots.load() == 0 ) {
465  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
466  return StatusCode::FAILURE;
467  }
468 
469  // no problem as push new event is only called from one thread (event loop manager)
470  --m_freeSlots;
471 
472  auto action = [this, eventContext]() -> StatusCode {
473  // Event processing slot forced to be the same as the wb slot
474  const unsigned int thisSlotNum = eventContext->slot();
475  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
476  if ( !thisSlot.complete ) {
477  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
478  return StatusCode::FAILURE;
479  }
480 
481  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
482  thisSlot.reset( eventContext );
483 
484  // Result status code:
486 
487  // promote to CR and DR the initial set of algorithms
488  Cause cs = {Cause::source::Root, "RootDecisionHub"};
489  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
490  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
491  result = StatusCode::FAILURE;
492  }
493 
494  if ( this->updateStates( thisSlotNum ).isFailure() ) {
495  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
496  result = StatusCode::FAILURE;
497  }
498 
499  return result;
500  }; // end of lambda
501 
502  // Kick off the scheduling!
503  ON_VERBOSE {
504  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
505  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
506  }
507 
508  m_actionsQueue.push( action );
509 
510  return StatusCode::SUCCESS;
511 }
#define ON_DEBUG
Class representing an event slot.
Definition: EventSlot.h:14
ContextID_t slot() const
Definition: EventContext.h:41
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
ContextEvt_t evt() const
Definition: EventContext.h:40
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
std::function< StatusCode()> action
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
Definition: EventSlot.h:79
bool isFailure() const
Definition: StatusCode.h:130
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE

◆ pushNewEvents()

StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 515 of file AvalancheSchedulerSvc.cpp.

515  {
516  StatusCode sc;
517  for ( auto context : eventContexts ) {
518  sc = pushNewEvent( context );
519  if ( sc != StatusCode::SUCCESS ) return sc;
520  }
521  return sc;
522 }
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.

◆ scheduleEventView()

StatusCode AvalancheSchedulerSvc::scheduleEventView ( const EventContext sourceContext,
const std::string nodeName,
std::unique_ptr< EventContext viewContext 
)
overridevirtual

Method to inform the scheduler about event views.

Definition at line 1125 of file AvalancheSchedulerSvc.cpp.

1126  {
1127  // Prevent view nesting
1128  if ( sourceContext->usesSubSlot() ) {
1129  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1130  return StatusCode::FAILURE;
1131  }
1132 
1133  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1134 
1135  // It's not possible to create an std::functional from a move-capturing lambda
1136  // So, we have to release the unique pointer
1137  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1138  &nodeName]() -> StatusCode {
1139  // Attach the sub-slot to the top-level slot
1140  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1141 
1142  if ( viewContextPtr ) {
1143  // Re-create the unique pointer
1144  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1145  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1146  return StatusCode::SUCCESS;
1147  } else {
1148  // Disable the view node if there are no views
1149  topSlot.disableSubSlots( nodeName );
1150  return StatusCode::SUCCESS;
1151  }
1152  };
1153 
1154  m_actionsQueue.push( std::move( action ) );
1155 
1156  return StatusCode::SUCCESS;
1157 }
Class representing an event slot.
Definition: EventSlot.h:14
ContextID_t slot() const
Definition: EventContext.h:41
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Definition: EventSlot.h:68
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Definition: EventSlot.h:51
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
T release(T... args)
std::function< StatusCode()> action
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T move(T... args)
T get(T... args)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool usesSubSlot() const
Definition: EventContext.h:43
#define ON_VERBOSE

◆ tryPopFinishedEvent()

StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event, if not available just return a failure.

Definition at line 553 of file AvalancheSchedulerSvc.cpp.

553  {
554 
555  if ( m_finishedEvents.try_pop( eventContext ) ) {
556  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
557  << endmsg;
558  ++m_freeSlots;
559  return StatusCode::SUCCESS;
560  }
561  return StatusCode::FAILURE;
562 }
#define ON_DEBUG
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192

◆ updateStates()

StatusCode AvalancheSchedulerSvc::updateStates ( int  si = -1,
int  algo_index = -1,
int  sub_slot = -1,
int  source_slot = -1 
)
private

Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skipping an update of the Control Flow state)

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. To check if the event is finished the algorithm checks if:

  • No algorithms have been signed off by the control flow
  • No algorithms have been signed off by the data flow
  • No algorithms have been scheduled

Definition at line 576 of file AvalancheSchedulerSvc.cpp.

577  {
578 
579  StatusCode global_sc( StatusCode::SUCCESS );
580 
581  // Sort from the oldest to the newest event
582  // Prepare a vector of pointers to the slots to avoid copies
583  std::vector<EventSlot*> eventSlotsPtrs;
584 
585  // Consider all slots if si <0 or just one otherwise
586  if ( si < 0 ) {
587  const int eventsSlotsSize( m_eventSlots.size() );
588  eventSlotsPtrs.reserve( eventsSlotsSize );
589  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
590  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
591  }
592  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
593  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
594  } else {
595  eventSlotsPtrs.push_back( &m_eventSlots[si] );
596  }
597 
598  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
599  int iSlot = thisSlotPtr->eventContext->slot();
600 
601  // Cache the states of the algos to improve readability and performance
602  auto& thisSlot = m_eventSlots[iSlot];
603  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
604 
605  // Perform the I->CR->DR transitions
606  if ( algo_index >= 0 ) {
607  Cause cs = {Cause::source::Task, index2algname( algo_index )};
608 
609  // Run in whole-event context if there's no sub-slot index, or the sub-slot has a different parent
610  if ( sub_slot == -1 || iSlot != source_slot ) {
611  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
612  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
613  global_sc = StatusCode::FAILURE;
614  }
615  } else {
616  if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
617  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot << " of " << iSlot << endmsg;
618  global_sc = StatusCode::FAILURE;
619  }
620  }
621  }
622 
623  StatusCode partial_sc( StatusCode::FAILURE, true );
624 
625  // Perform DR->SCHEDULED
626  if ( !m_optimizationMode.empty() ) {
627  auto comp_nodes = [this]( const uint& i, const uint& j ) {
628  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
629  };
630  std::priority_queue<uint, std::vector<uint>, std::function<bool( const uint&, const uint& )>> buffer(
631  comp_nodes, std::vector<uint>() );
632  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it )
633  buffer.push( *it );
634  while ( !buffer.empty() ) {
635  bool IOBound = false;
636  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
637 
638  if ( !IOBound )
639  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
640  else
641  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
642 
643  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
644  << "Could not apply transition from " << AState::DATAREADY << " for algorithm "
645  << index2algname( buffer.top() ) << " on processing slot " << iSlot << endmsg;
646 
647  buffer.pop();
648  }
649 
650  } else {
651  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
652  uint algIndex = *it;
653 
654  bool IOBound = false;
655  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
656 
657  if ( !IOBound )
658  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
659  else
660  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
661 
662  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
663  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
664  << " on processing slot " << iSlot << endmsg;
665  }
666  }
667 
668  // Check for algorithms ready in sub-slots
669  for ( auto& subslot : thisSlot.allSubSlots ) {
670  auto& subslotStates = subslot.algsStates;
671  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
672  uint algIndex{*it};
673  partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext.get() );
674  // The following verbosity is expensive when the number of sub-slots is high
675  /*ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
676  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
677  << " on processing subslot " << subslot.eventContext->slot() << endmsg;*/
678  }
679  }
680 
681  if ( m_dumpIntraEventDynamics ) {
683  s << ( algo_index != -1 ? index2algname( algo_index ) : "START" ) << ", "
684  << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
685  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
686  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
688  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
689  std::ofstream myfile;
690  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
691  myfile << s.str();
692  myfile.close();
693  }
694 
695  // Not complete because this would mean that the slot is already free!
696  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
697  !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
698  !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
699  !thisSlot.complete ) {
700 
701  thisSlot.complete = true;
702  // if the event did not fail, add it to the finished events
703  // otherwise it is taken care of in the error handling
704  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
705  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
706  << thisSlot.eventContext->slot() << ")." << endmsg;
707  m_finishedEvents.push( thisSlot.eventContext.release() );
708  }
709 
710  // now let's return the fully evaluated result of the control flow
711  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
712 
713  thisSlot.eventContext.reset( nullptr );
714 
715  } else if ( isStalled( thisSlot ) ) {
716  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
717  eventFailed( thisSlot.eventContext.get() ); // can't release yet
718  }
719  partial_sc.ignore();
720  } // end loop on slots
721 
722  ON_VERBOSE verbose() << "States Updated." << endmsg;
723 
724  return global_sc;
725 }
#define ON_DEBUG
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
AlgsExecutionStates::State AState
Class representing an event slot.
Definition: EventSlot.h:14
ContextID_t slot() const
Definition: EventContext.h:41
T open(T... args)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
ContextEvt_t evt() const
Definition: EventContext.h:40
Gaudi::Property< bool > m_dumpIntraEventDynamics
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
T to_string(T... args)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
STL namespace.
T end(T... args)
Gaudi::Property< std::string > m_optimizationMode
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
T push_back(T... args)
STL class.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
T close(T... args)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
Gaudi::Property< int > m_threadPoolSize
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T count(T... args)
T size(T... args)
STL class.
T begin(T... args)
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
virtual uint getPriority(const std::string &) const =0
Get task priority.
string s
Definition: gaudirun.py:318
constexpr static const auto FAILURE
Definition: StatusCode.h:86
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
T sort(T... args)
bool isFailure() const
Definition: StatusCode.h:130
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
virtual const std::string printState(EventSlot &) const =0
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:73
T reserve(T... args)
#define ON_VERBOSE
Iterator end(State kind)

Member Data Documentation

◆ m_actionsCounts

std::vector<unsigned int> AvalancheSchedulerSvc::m_actionsCounts
private

Bookkeeping of the number of actions in flight per slot.

Definition at line 263 of file AvalancheSchedulerSvc.h.

◆ m_actionsQueue

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 260 of file AvalancheSchedulerSvc.h.

◆ m_algExecStateSvc

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 219 of file AvalancheSchedulerSvc.h.

◆ m_algname_index_map

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 192 of file AvalancheSchedulerSvc.h.

◆ m_algname_vect

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 198 of file AvalancheSchedulerSvc.h.

◆ m_algosInFlight

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 225 of file AvalancheSchedulerSvc.h.

◆ m_algResourcePool

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 255 of file AvalancheSchedulerSvc.h.

◆ m_checkDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
private

Definition at line 156 of file AvalancheSchedulerSvc.h.

◆ m_condSvc

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to service for Conditions handling.

Definition at line 222 of file AvalancheSchedulerSvc.h.

◆ m_dumpIntraEventDynamics

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file"}

Definition at line 151 of file AvalancheSchedulerSvc.h.

◆ m_enableCondSvc

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
private

Definition at line 161 of file AvalancheSchedulerSvc.h.

◆ m_eventSlots

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 210 of file AvalancheSchedulerSvc.h.

◆ m_finishedEvents

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 216 of file AvalancheSchedulerSvc.h.

◆ m_freeSlots

std::atomic_int AvalancheSchedulerSvc::m_freeSlots
private

Atomic to account for asyncronous updates by the scheduler wrt the rest.

Definition at line 213 of file AvalancheSchedulerSvc.h.

◆ m_IOBoundAlgosInFlight

unsigned int AvalancheSchedulerSvc::m_IOBoundAlgosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 228 of file AvalancheSchedulerSvc.h.

◆ m_IOBoundAlgScheduler

SmartIF<IAccelerator> AvalancheSchedulerSvc::m_IOBoundAlgScheduler
private

A shortcut to IO-bound algorithm scheduler.

Definition at line 207 of file AvalancheSchedulerSvc.h.

◆ m_IOBoundAlgSchedulerSvcName

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
private

Definition at line 143 of file AvalancheSchedulerSvc.h.

◆ m_isActive

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 183 of file AvalancheSchedulerSvc.h.

◆ m_maxAlgosInFlight

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight {1}
private

Definition at line 270 of file AvalancheSchedulerSvc.h.

◆ m_maxEventsInFlight

size_t AvalancheSchedulerSvc::m_maxEventsInFlight {0}
private

Definition at line 269 of file AvalancheSchedulerSvc.h.

◆ m_maxIOBoundAlgosInFlight

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxIOBoundAlgosInFlight
private
Initial value:
{this, "MaxIOBoundAlgosInFlight", 0,
"Maximum number of simultaneous I/O-bound algorithms"}

Definition at line 144 of file AvalancheSchedulerSvc.h.

◆ m_optimizationMode

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E"}

Definition at line 149 of file AvalancheSchedulerSvc.h.

◆ m_precSvc

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 201 of file AvalancheSchedulerSvc.h.

◆ m_showControlFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences"}

Definition at line 169 of file AvalancheSchedulerSvc.h.

◆ m_showDataDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms"}

Definition at line 163 of file AvalancheSchedulerSvc.h.

◆ m_showDataFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms"}

Definition at line 166 of file AvalancheSchedulerSvc.h.

◆ m_simulateExecution

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution"}

Definition at line 146 of file AvalancheSchedulerSvc.h.

◆ m_thread

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 186 of file AvalancheSchedulerSvc.h.

◆ m_threadPoolSize

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 139 of file AvalancheSchedulerSvc.h.

◆ m_threadPoolSvc

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 268 of file AvalancheSchedulerSvc.h.

◆ m_useDataLoader

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 158 of file AvalancheSchedulerSvc.h.

◆ m_useIOBoundAlgScheduler

Gaudi::Property<bool> AvalancheSchedulerSvc::m_useIOBoundAlgScheduler
private
Initial value:
{this, "PreemptiveIOBoundTasks", false,
"Turn on preemptive way of scheduling of I/O-bound algorithms"}

Definition at line 153 of file AvalancheSchedulerSvc.h.

◆ m_verboseSubSlots

Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots {this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"}
private

Definition at line 172 of file AvalancheSchedulerSvc.h.

◆ m_whiteboard

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 204 of file AvalancheSchedulerSvc.h.

◆ m_whiteboardSvcName

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 142 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: