The Gaudi Framework  v30r2 (9eca68f7)
AvalancheSchedulerSvc Class Reference

Introduction

More...

#include <src/AvalancheSchedulerSvc.h>

Inheritance diagram for AvalancheSchedulerSvc:
Collaboration diagram for AvalancheSchedulerSvc:

Public Member Functions

 ~AvalancheSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is availble. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
virtual StatusCode scheduleEventView (EventContext const *sourceContext, std::string const &nodeName, EventContext *viewContext) override
 Method to inform the scheduler about event views. More...
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::stringgetInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
- Public Member Functions inherited from Service
const std::stringname () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Initialize Service. More...
 
StatusCode sysStop () override
 Initialize Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Re-initialize the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName, bool createIf=true)
 Declare used tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service.May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBasedeclareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBasedeclareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property form another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property form the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBasegetProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolderoperator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 get the cached level (originally extracted from the embedded MsgStream) More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const
 Backward compatibility function for getting the output level. More...
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
const SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO) More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::stringindex2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1, int algo_index=-1, EventContext *=nullptr)
 Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skipping an update of the Control Flow state) More...
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si, EventContext *)
 Algorithm promotion. More...
 
StatusCode promoteToAsyncScheduled (unsigned int iAlgo, int si, EventContext *)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the AlgoExecutionTask. More...
 
StatusCode promoteToAsyncExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the IOBoundAlgTask. More...
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 

Private Attributes

Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::stringm_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< std::stringm_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
 
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::stringm_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_useIOBoundAlgScheduler
 
Gaudi::Property< bool > m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
 
Gaudi::Property< std::stringm_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
std::atomic< ActivationStatem_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::stringm_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IPrecedenceSvcm_precSvc
 A shortcut to the Precedence Service. More...
 
SmartIF< IHiveWhiteBoardm_whiteboard
 A shortcut to the whiteboard. More...
 
SmartIF< IAcceleratorm_IOBoundAlgScheduler
 A shortcut to IO-bound algorithm scheduler. More...
 
std::vector< EventSlotm_eventSlots
 Vector of events slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asyncronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvcm_algExecStateSvc
 Algorithm execution state manager. More...
 
SmartIF< ICondSvcm_condSvc
 A shortcut to service for Conditions handling. More...
 
unsigned int m_algosInFlight = 0
 Number of algoritms presently in flight. More...
 
unsigned int m_IOBoundAlgosInFlight = 0
 Number of algoritms presently in flight. More...
 
SmartIF< IAlgResourcePoolm_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< actionm_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
SmartIF< IThreadPoolSvcm_threadPoolSvc
 
size_t m_maxEventsInFlight {0}
 
size_t m_maxAlgosInFlight {1}
 
bool m_first = true
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
typedef Gaudi::PluginService::Factory< IService *, const std::string &, ISvcLocator * > Factory
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBaseproperty (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches. More...
 
MSG::Level resetMessaging ()
 Reinitialize internal states. More...
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvcm_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. When compared to approach used in the ForwardSchedulerSvc, the following advantages can be emphasized:

(1) Faster decision making (thus lower concurrency disclosure downtime); (2) Capacity for proactive task scheduling decision making.

Point (2) allowed to implement a number of generic, non-intrusive intra-event throughput maximization scheduling strategies.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled ASA all following conditions are met:

  • if a control flow (CF) graph traversal reaches the task;
  • when all data flow (DF) dependencies of the task are satisfied;
  • when the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of precedence rules graph asymmetries:

(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by scheduling the CPU-blocking tasks efficiently. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices..joke);
  • synchronization-bound tasks.

Credits

Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 103 of file AvalancheSchedulerSvc.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

AvalancheSchedulerSvc::~AvalancheSchedulerSvc ( )
overridedefault

Destructor.

Member Function Documentation

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)

Definition at line 362 of file AvalancheSchedulerSvc.cpp.

363 {
364 
365  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
366 
368  error() << "problems initializing ThreadPoolSvc" << endmsg;
370  return;
371  }
372 
373  // Wait for actions pushed into the queue by finishing tasks.
374  action thisAction;
376 
377  m_isActive = ACTIVE;
378 
379  // Continue to wait if the scheduler is running or there is something to do
380  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
381  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
382  m_actionsQueue.pop( thisAction );
383  sc = thisAction();
384  ON_VERBOSE
385  {
386  if ( sc != StatusCode::SUCCESS )
387  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
388  else
389  verbose() << "Action succeeded." << endmsg;
390  }
391  }
392 
393  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
395  error() << "Problems terminating thread pool" << endmsg;
397  }
398 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
#define ON_DEBUG
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
bool isFailure() const
Definition: StatusCode.h:139
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE
unsigned int AvalancheSchedulerSvc::algname2index ( const std::string algoname)
inlineprivate

Convert a name to an integer.

Definition at line 433 of file AvalancheSchedulerSvc.cpp.

434 {
435  unsigned int index = m_algname_index_map[algoname];
436  return index;
437 }
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.

Definition at line 408 of file AvalancheSchedulerSvc.cpp.

409 {
410 
411  if ( m_isActive == ACTIVE ) {
412  // Drain the scheduler
413  m_actionsQueue.push( [this]() { return this->m_drain(); } );
414  // This would be the last action
415  m_actionsQueue.push( [this]() -> StatusCode {
417  return StatusCode::SUCCESS;
418  } );
419  }
420 
421  return StatusCode::SUCCESS;
422 }
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
StatusCode m_drain()
Drain the actions present in the queue.
void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

Definition at line 829 of file AvalancheSchedulerSvc.cpp.

830 {
831 
832  // To have just one big message
833  std::ostringstream outputMS;
834 
835  outputMS << "Dumping scheduler state\n"
836  << "=========================================================================================\n"
837  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
838  << "=========================================================================================\n\n";
839 
840  //===========================================================================
841 
842  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
843  << "------------------\n\n";
844 
845  // Figure if TimelineSvc is available (used below to detect threads IDs)
846  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
847  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
848  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
849  } else {
850 
851  // Figure optimal printout layout
852  size_t indt( 0 );
853  for ( auto& slot : m_eventSlots )
854  for ( auto it = slot.algsStates.begin( AlgsExecutionStates::State::SCHEDULED );
855  it != slot.algsStates.end( AlgsExecutionStates::State::SCHEDULED ); ++it )
856  if ( index2algname( (uint)*it ).length() > indt ) indt = index2algname( (uint)*it ).length();
857 
858  // Figure the last running schedule across all slots
859  for ( auto& slot : m_eventSlots ) {
860  for ( auto it = slot.algsStates.begin( AlgsExecutionStates::State::SCHEDULED );
861  it != slot.algsStates.end( AlgsExecutionStates::State::SCHEDULED ); ++it ) {
862 
863  const std::string algoName{index2algname( (uint)*it )};
864 
865  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
866  << slot.eventContext->slot();
867 
868  // Try to get POSIX threads IDs the currently running tasks are scheduled to
869  if ( timelineSvc.isValid() ) {
870  TimelineEvent te{};
871  te.algorithm = algoName;
872  te.slot = slot.eventContext->slot();
873  te.event = slot.eventContext->evt();
874 
875  if ( timelineSvc->getTimelineEvent( te ) )
876  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
877  else
878  outputMS << " thread.id: [unknown]"; // this means a task has just
879  // been signed off as SCHEDULED,
880  // but has not been assigned to a thread yet
881  // (i.e., not running yet)
882  }
883  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
884  }
885  }
886  }
887 
888  //===========================================================================
889 
890  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
891  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
892 
893  int slotCount = -1;
894  for ( auto& slot : m_eventSlots ) {
895  slotCount++;
896  if ( slot.complete ) continue;
897 
898  outputMS << "[ slot: "
899  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
900  << " event: "
901  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
902  << " ]:\n\n";
903 
904  if ( 0 > iSlot or iSlot == slotCount ) {
905 
906  // Snapshot of the Control Flow and FSM states
907  outputMS << m_precSvc->printState( slot ) << "\n";
908 
909  // Mention sub slots
910  if ( slot.allSubSlots.size() ) {
911  outputMS << "\nNumber of sub-slots:" << slot.allSubSlots.size() << "\n";
912  outputMS << "Sub-slot algorithms ready:" << slot.subSlotAlgsReady.size() << "\n";
913  }
914  }
915  }
916 
917  //===========================================================================
918 
919  if ( 0 <= iSlot ) {
920  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
921  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
922  }
923 
924  outputMS << "\n=========================================================================================\n"
925  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
926  << "=========================================================================================\n\n";
927 
928  info() << outputMS.str() << endmsg;
929 }
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
T to_string(T...args)
std::string algorithm
Definition: ITimelineSvc.h:21
T setw(T...args)
virtual const std::string printState(EventSlot &) const =0
STL class.
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:79
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
T length(T...args)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T hex(T...args)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:291
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode AvalancheSchedulerSvc::eventFailed ( EventContext eventContext)
private

Method to check if an event failed and take appropriate actions.

It can be possible that an event fails.

In this case this method is called. It dumps the state of the scheduler, drains the actions (without executing them) and events in the queues and returns a failure.

Definition at line 581 of file AvalancheSchedulerSvc.cpp.

582 {
583 
584  // Set the number of slots available to an error code
585  m_freeSlots.store( 0 );
586 
587  const uint slotIdx = eventContext->slot();
588 
589  fatal() << "*** Event " << eventContext->evt() << " on slot " << slotIdx << " failed! ***" << endmsg;
590 
591  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
592 
593  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
595 
596  // Empty queue and deactivate the service
597  action thisAction;
598  while ( m_actionsQueue.try_pop( thisAction ) ) {
599  };
600  deactivate();
601 
602  // Push into the finished events queue the failed context
603  EventContext* thisEvtContext;
604  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
605  m_finishedEvents.push( thisEvtContext );
606  };
607  m_finishedEvents.push( eventContext );
608 
609  return StatusCode::FAILURE;
610 }
constexpr static const auto FAILURE
Definition: StatusCode.h:88
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
ContextID_t slot() const
Definition: EventContext.h:40
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode deactivate()
Deactivate scheduler.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 331 of file AvalancheSchedulerSvc.cpp.

332 {
333 
335  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
336 
337  sc = deactivate();
338  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
339 
340  info() << "Joining Scheduler thread" << endmsg;
341  m_thread.join();
342 
343  // Final error check after thread pool termination
344  if ( m_isActive == FAILURE ) {
345  error() << "problems in scheduler thread" << endmsg;
346  return StatusCode::FAILURE;
347  }
348 
349  return sc;
350 }
constexpr static const auto FAILURE
Definition: StatusCode.h:88
StatusCode finalize() override
Definition: Service.cpp:173
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
StatusCode deactivate()
Deactivate scheduler.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
std::thread m_thread
The thread in which the activate function runs.
unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 520 of file AvalancheSchedulerSvc.cpp.

520 { return std::max( m_freeSlots.load(), 0 ); }
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T max(T...args)
const std::string & AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inlineprivate

Convert an integer to a name.

Definition at line 429 of file AvalancheSchedulerSvc.cpp.

429 { return m_algname_vect[index]; }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 60 of file AvalancheSchedulerSvc.cpp.

61 {
62 
63  // Initialise mother class (read properties, ...)
65  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
66 
67  // Get hold of the TBBSvc. This should initialize the thread pool
68  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
69  if ( !m_threadPoolSvc.isValid() ) {
70  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
71  return StatusCode::FAILURE;
72  }
73 
74  // Activate the scheduler in another thread.
75  info() << "Activating scheduler in a separate thread" << endmsg;
76  m_thread = std::thread( [this]() { this->activate(); } );
77 
78  while ( m_isActive != ACTIVE ) {
79  if ( m_isActive == FAILURE ) {
80  fatal() << "Terminating initialization" << endmsg;
81  return StatusCode::FAILURE;
82  } else {
83  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
84  sleep( 1 );
85  }
86  }
87 
88  if ( m_enableCondSvc ) {
89  // Get hold of the CondSvc
90  m_condSvc = serviceLocator()->service( "CondSvc" );
91  if ( !m_condSvc.isValid() ) {
92  warning() << "No CondSvc found, or not enabled. "
93  << "Will not manage CondAlgorithms" << endmsg;
94  m_enableCondSvc = false;
95  }
96  }
97 
98  // Get the algo resource pool
99  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
100  if ( !m_algResourcePool.isValid() ) {
101  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
102  return StatusCode::FAILURE;
103  }
104 
105  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
106  if ( !m_algExecStateSvc.isValid() ) {
107  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
108  return StatusCode::FAILURE;
109  }
110 
111  // Get Whiteboard
113  if ( !m_whiteboard.isValid() ) {
114  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
115  return StatusCode::FAILURE;
116  }
117 
118  // Get dedicated scheduler for I/O-bound algorithms
119  if ( m_useIOBoundAlgScheduler ) {
122  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
123  }
124 
125  // Set the MaxEventsInFlight parameters from the number of WB stores
127 
128  // Set the number of free slots
130 
131  // set global concurrency flags
133 
134  // Get the list of algorithms
136  const unsigned int algsNumber = algos.size();
137  info() << "Found " << algsNumber << " algorithms" << endmsg;
138 
139  /* Dependencies
140  1) Look for handles in algo, if none
141  2) Assume none are required
142  */
143 
144  DataObjIDColl globalInp, globalOutp;
145 
146  // figure out all outputs
147  for ( IAlgorithm* ialgoPtr : algos ) {
148  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
149  if ( !algoPtr ) {
150  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
151  }
152  for ( auto id : algoPtr->outputDataObjs() ) {
153  auto r = globalOutp.insert( id );
154  if ( !r.second ) {
155  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths "
156  "though, or control flow may guarantee only one runs...!"
157  << endmsg;
158  }
159  }
160  }
161 
162  std::ostringstream ostdd;
163  ostdd << "Data Dependencies for Algorithms:";
164 
165  std::map<std::string, DataObjIDColl> algosDependenciesMap;
166  for ( IAlgorithm* ialgoPtr : algos ) {
167  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
168  if ( nullptr == algoPtr ) {
169  fatal() << "Could not convert IAlgorithm into Algorithm for " << ialgoPtr->name()
170  << ": this will result in a crash." << endmsg;
171  return StatusCode::FAILURE;
172  }
173 
174  ostdd << "\n " << algoPtr->name();
175 
176  DataObjIDColl algoDependencies;
177  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
178  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
179  DataObjID id = *idp;
180  ostdd << "\n o INPUT " << id;
181  if ( id.key().find( ":" ) != std::string::npos ) {
182  ostdd << " contains alternatives which require resolution...\n";
183  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
184  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
185  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
186  } );
187  if ( itok != tokens.end() ) {
188  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
189  id.updateKey( *itok );
190  } else {
191  error() << "failed to find alternate in global output list"
192  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
193  m_showDataDeps = true;
194  }
195  }
196  algoDependencies.insert( id );
197  globalInp.insert( id );
198  }
199  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
200  ostdd << "\n o OUTPUT " << *id;
201  if ( id->key().find( ":" ) != std::string::npos ) {
202  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
203  << endmsg;
204  m_showDataDeps = true;
205  }
206  }
207  } else {
208  ostdd << "\n none";
209  }
210  algosDependenciesMap[algoPtr->name()] = algoDependencies;
211  }
212 
213  if ( m_showDataDeps ) {
214  info() << ostdd.str() << endmsg;
215  }
216 
217  // Check if we have unmet global input dependencies, and, optionally, heal them
218  // WARNING: this step must be done BEFORE the Precedence Service is initialized
219  if ( m_checkDeps ) {
220  DataObjIDColl unmetDep;
221  for ( auto o : globalInp )
222  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
223 
224  if ( unmetDep.size() > 0 ) {
225 
226  std::ostringstream ost;
227  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
228  ost << "\n o " << *o << " required by Algorithm: ";
229 
230  for ( const auto& p : algosDependenciesMap )
231  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
232  }
233 
234  if ( !m_useDataLoader.empty() ) {
235 
236  // Find the DataLoader Alg
237  IAlgorithm* dataLoaderAlg( nullptr );
238  for ( IAlgorithm* algo : algos )
239  if ( algo->name() == m_useDataLoader ) {
240  dataLoaderAlg = algo;
241  break;
242  }
243 
244  if ( dataLoaderAlg == nullptr ) {
245  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
246  << "\" found, and unmet INPUT dependencies "
247  << "detected:\n"
248  << ost.str() << endmsg;
249  return StatusCode::FAILURE;
250  }
251 
252  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
253  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
254 
255  // Set the property Load of DataLoader Alg
256  Algorithm* dataAlg = dynamic_cast<Algorithm*>( dataLoaderAlg );
257  if ( !dataAlg ) {
258  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Algorithm" << endmsg;
259  return StatusCode::FAILURE;
260  }
261 
262  for ( auto& id : unmetDep ) {
263  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
264  << dataLoaderAlg->name() << endmsg;
266  }
267 
268  } else {
269  fatal() << "Auto DataLoading not requested, "
270  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
271  return StatusCode::FAILURE;
272  }
273 
274  } else {
275  info() << "No unmet INPUT data dependencies were found" << endmsg;
276  }
277  }
278 
279  // Get the precedence service
280  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
281  if ( !m_precSvc.isValid() ) {
282  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
283  return StatusCode::FAILURE;
284  }
285  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
286  if ( !precSvc ) {
287  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
288  return StatusCode::FAILURE;
289  }
290 
291  // Fill the containers to convert algo names to index
292  m_algname_vect.resize( algsNumber );
293  for ( IAlgorithm* algo : algos ) {
294  const std::string& name = algo->name();
295  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
296  m_algname_index_map[name] = index;
297  m_algname_vect.at( index ) = name;
298  }
299 
300  // Shortcut for the message service
301  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
302  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
303 
305  EventSlot( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc ) );
306  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
307 
308  if ( m_threadPoolSize > 1 ) {
310  }
311 
312  // Clearly inform about the level of concurrency
313  info() << "Concurrency level information:" << endmsg;
314  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
315  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
316 
318 
320 
321  // Simulate execution flow
323 
324  return sc;
325 }
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:65
Gaudi::Property< bool > m_showDataFlow
#define ON_DEBUG
constexpr static const auto FAILURE
Definition: StatusCode.h:88
StatusCode initialize() override
Definition: Service.cpp:63
const unsigned int & getAlgoIndex() const
Get algorithm index.
T empty(T...args)
Gaudi::Property< std::string > m_whiteboardSvcName
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:746
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:288
Gaudi::Property< bool > m_showDataDeps
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
const DataObjIDColl & outputDataObjs() const override
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
STL class.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
T resize(T...args)
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:82
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T at(T...args)
virtual void dumpDataFlow() const =0
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:79
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
const DataObjIDColl & inputDataObjs() const override
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
virtual void dumpControlFlow() const =0
Dump precedence rules.
bool complete
Flags completion of the event.
Definition: EventSlot.h:52
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
T assign(T...args)
Gaudi::Property< bool > m_simulateExecution
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
T begin(T...args)
Gaudi::Property< bool > m_enableCondSvc
Class representing the event slot.
Definition: EventSlot.h:10
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual size_t getNumberOfStores() const =0
Get the number of &#39;slots&#39;.
T for_each(T...args)
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:291
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
std::thread m_thread
The thread in which the activate function runs.
StatusCode AvalancheSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check if we are in present of a stall condition for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.

Definition at line 804 of file AvalancheSchedulerSvc.cpp.

805 {
806  // Get the slot
807  EventSlot& thisSlot = m_eventSlots[iSlot];
808 
809  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
810  thisSlot.subSlotAlgsReady.empty() && // Account for sub-slot algs
812 
813  info() << "About to declare a stall" << endmsg;
814  fatal() << "*** Stall detected! ***\n" << endmsg;
815 
816  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
817 
818  return StatusCode::FAILURE;
819  }
820  return StatusCode::SUCCESS;
821 }
bool algsPresent(State state) const
constexpr static const auto FAILURE
Definition: StatusCode.h:88
T empty(T...args)
unsigned int m_IOBoundAlgosInFlight
Number of algoritms presently in flight.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< std::pair< EventContext *, int > > subSlotAlgsReady
Quick lookup for data-ready algorithms in sub-slots (top level only)
Definition: EventSlot.h:62
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
StatusCode AvalancheSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 526 of file AvalancheSchedulerSvc.cpp.

527 {
528 
529  unsigned int slotNum = 0;
530  for ( auto& thisSlot : m_eventSlots ) {
531  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
532  updateStates( slotNum );
533  }
534  slotNum++;
535  }
536  return StatusCode::SUCCESS;
537 }
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is availble.

Get a finished event or block until one becomes available.

Definition at line 543 of file AvalancheSchedulerSvc.cpp.

544 {
545  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
546  if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
547  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
548  // << " active: " << m_isActive << endmsg;
549  return StatusCode::FAILURE;
550  } else {
551  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
552  // << " active: " << m_isActive << endmsg;
553  m_finishedEvents.pop( eventContext );
554  m_freeSlots++;
555  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << "(event " << eventContext->evt() << ")" << endmsg;
556  return StatusCode::SUCCESS;
557  }
558 }
#define ON_DEBUG
constexpr static const auto FAILURE
Definition: StatusCode.h:88
ContextID_t slot() const
Definition: EventContext.h:40
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode AvalancheSchedulerSvc::promoteToAsyncExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

The call to this method is triggered only from within the IOBoundAlgTask.

Definition at line 1092 of file AvalancheSchedulerSvc.cpp.

1094 {
1095  // Check if the execution failed
1096  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
1097 
1098  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1099 
1100  if ( sc.isFailure() ) {
1101  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1102  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1103  return StatusCode::FAILURE;
1104  }
1105 
1107 
1108  EventSlot& thisSlot = m_eventSlots[si];
1109 
1110  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1111  << endmsg;
1112 
1113  State state = algo->filterPassed() ? State::EVTACCEPTED : State::EVTREJECTED;
1114 
1115  // Update states in the appropriate slot
1116  if ( eventContext == thisSlot.eventContext ) {
1117  // Event level (standard behaviour)
1118  sc = thisSlot.algsStates.updateState( iAlgo, state );
1119  } else {
1120  // Sub-slot
1121  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1122  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1123  }
1124 
1125  ON_VERBOSE if ( sc.isSuccess() ) verbose() << "[Asynchronous] Promoting " << algo->name() << " on slot " << si
1126  << " to " << AlgsExecutionStates::stateNames[state] << endmsg;
1127 
1128  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1129  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1130 
1131  // Schedule an update of the status of the algorithms
1132  m_actionsQueue.push( [this, iAlgo, eventContext]() { return this->updateStates( -1, iAlgo, eventContext ); } );
1133 
1134  return sc;
1135 }
#define ON_DEBUG
constexpr static const auto FAILURE
Definition: StatusCode.h:88
unsigned int m_IOBoundAlgosInFlight
Number of algoritms presently in flight.
ContextID_t slot() const
Definition: EventContext.h:40
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
bool isSuccess() const
Definition: StatusCode.h:287
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
bool isFailure() const
Definition: StatusCode.h:139
ContextEvt_t evt() const
Definition: EventContext.h:39
T at(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
State
Execution states of the algorithms.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:165
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
static std::map< State, std::string > stateNames
#define ON_VERBOSE
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled ( unsigned int  iAlgo,
int  si,
EventContext eventContext 
)
private

Definition at line 994 of file AvalancheSchedulerSvc.cpp.

995 {
996 
998 
999  // bool IOBound = m_precSvc->isBlocking(algName);
1000 
1001  const std::string& algName( index2algname( iAlgo ) );
1002  IAlgorithm* ialgoPtr = nullptr;
1003  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
1004 
1005  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
1006 
1008  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
1009  // tbb::task)? it seems it works..
1010  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
1011  IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
1012  m_IOBoundAlgScheduler->push( *theTask );
1013 
1014  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
1015  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1016 
1017  // Update states in the appropriate event slot
1018  StatusCode updateSc;
1019  EventSlot& thisSlot = m_eventSlots[si];
1020  if ( eventContext == thisSlot.eventContext ) {
1021  // Event level (standard behaviour)
1022  updateSc = thisSlot.algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
1023  } else {
1024  // Sub-slot
1025  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1026  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
1027  }
1028 
1029  ON_VERBOSE if ( updateSc.isSuccess() ) verbose() << "[Asynchronous] Promoting " << algName
1030  << " to SCHEDULED on slot " << si << endmsg;
1031  return updateSc;
1032  } else {
1033  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1034  << " on slot " << si << endmsg;
1035  return sc;
1036  }
1037 }
#define ON_DEBUG
Wrapper around I/O-bound Gaudi-algorithms.
constexpr static const auto FAILURE
Definition: StatusCode.h:88
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
unsigned int m_IOBoundAlgosInFlight
Number of algoritms presently in flight.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
bool isSuccess() const
Definition: StatusCode.h:287
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
T at(T...args)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
virtual StatusCode push(IAlgTask &task)=0
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:291
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
#define ON_VERBOSE
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

The call to this method is triggered only from within the AlgoExecutionTask.

Definition at line 1043 of file AvalancheSchedulerSvc.cpp.

1045 {
1046  // Check if the execution failed
1047  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
1048 
1049  Gaudi::Hive::setCurrentContext( eventContext );
1050  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1051 
1052  if ( sc.isFailure() ) {
1053  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1054  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1055  return StatusCode::FAILURE;
1056  }
1057 
1058  m_algosInFlight--;
1059 
1060  EventSlot& thisSlot = m_eventSlots[si];
1061 
1062  ON_DEBUG debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1063 
1064  State state = algo->filterPassed() ? State::EVTACCEPTED : State::EVTREJECTED;
1065 
1066  // Update states in the appropriate slot
1067  if ( eventContext == thisSlot.eventContext ) {
1068  // Event level (standard behaviour)
1069  sc = thisSlot.algsStates.updateState( iAlgo, state );
1070  } else {
1071  // Sub-slot
1072  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1073  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1074  }
1075 
1076  ON_VERBOSE if ( sc.isSuccess() ) verbose() << "Promoting " << algo->name() << " on slot " << si << " to "
1078 
1079  ON_DEBUG debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1080  << m_algosInFlight << endmsg;
1081 
1082  // Schedule an update of the status of the algorithms
1083  m_actionsQueue.push( [this, iAlgo, eventContext]() { return this->updateStates( -1, iAlgo, eventContext ); } );
1084 
1085  return sc;
1086 }
#define ON_DEBUG
constexpr static const auto FAILURE
Definition: StatusCode.h:88
ContextID_t slot() const
Definition: EventContext.h:40
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
bool isSuccess() const
Definition: StatusCode.h:287
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
bool isFailure() const
Definition: StatusCode.h:139
ContextEvt_t evt() const
Definition: EventContext.h:39
T at(T...args)
unsigned int m_algosInFlight
Number of algoritms presently in flight.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:165
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
static std::map< State, std::string > stateNames
#define ON_VERBOSE
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode AvalancheSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si,
EventContext eventContext 
)
private

Algorithm promotion.

Definition at line 933 of file AvalancheSchedulerSvc.cpp.

934 {
935 
937 
938  const std::string& algName( index2algname( iAlgo ) );
939  IAlgorithm* ialgoPtr = nullptr;
940  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
941 
942  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
943 
944  ++m_algosInFlight;
945  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
946  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
947  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
948  } );
949  return StatusCode::SUCCESS;
950  };
951 
952  // Avoid to use tbb if the pool size is 1 and run in this thread
953  if ( -100 != m_threadPoolSize ) {
954  // the child task that executes an Algorithm
955  tbb::task* algoTask = new ( tbb::task::allocate_root() )
956  AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
957  // schedule the algoTask
958  tbb::task::enqueue( *algoTask );
959 
960  } else {
961  AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
962  theTask.execute();
963  }
964 
965  ON_DEBUG debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot "
966  << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
967 
968  // Update states in the appropriate event slot
969  StatusCode updateSc;
970  EventSlot& thisSlot = m_eventSlots[si];
971  if ( eventContext == thisSlot.eventContext ) {
972  // Event level (standard behaviour)
973  updateSc = thisSlot.algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
974  } else {
975  // Sub-slot
976  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
977  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
978  }
979 
981 
982  if ( updateSc.isSuccess() )
983  ON_VERBOSE verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
984  return updateSc;
985  } else {
986  ON_DEBUG debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si
987  << endmsg;
988  return sc;
989  }
990 }
#define ON_DEBUG
constexpr static const auto FAILURE
Definition: StatusCode.h:88
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
ContextID_t slot() const
Definition: EventContext.h:40
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
bool isSuccess() const
Definition: StatusCode.h:287
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
T at(T...args)
unsigned int m_algosInFlight
Number of algoritms presently in flight.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
Gaudi::Property< int > m_threadPoolSize
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:10
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:291
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.

Definition at line 447 of file AvalancheSchedulerSvc.cpp.

448 {
449 
450  if ( m_first ) {
451  m_first = false;
452  }
453 
454  if ( !eventContext ) {
455  fatal() << "Event context is nullptr" << endmsg;
456  return StatusCode::FAILURE;
457  }
458 
459  if ( m_freeSlots.load() == 0 ) {
460  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
461  return StatusCode::FAILURE;
462  }
463 
464  // no problem as push new event is only called from one thread (event loop manager)
465  m_freeSlots--;
466 
467  auto action = [this, eventContext]() -> StatusCode {
468  // Event processing slot forced to be the same as the wb slot
469  const unsigned int thisSlotNum = eventContext->slot();
470  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
471  if ( !thisSlot.complete ) {
472  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
473  return StatusCode::FAILURE;
474  }
475 
476  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
477  thisSlot.reset( eventContext );
478 
479  // Result status code:
481 
482  // promote to CR and DR the initial set of algorithms
483  Cause cs = {Cause::source::Root, "RootDecisionHub"};
484  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
485  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
486  result = StatusCode::FAILURE;
487  }
488 
489  if ( this->updateStates( thisSlotNum ).isFailure() ) {
490  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
491  result = StatusCode::FAILURE;
492  }
493 
494  return result;
495  }; // end of lambda
496 
497  // Kick off the scheduling!
498  ON_VERBOSE
499  {
500  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
501  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
502  }
503  m_actionsQueue.push( action );
504 
505  return StatusCode::SUCCESS;
506 }
#define ON_DEBUG
constexpr static const auto FAILURE
Definition: StatusCode.h:88
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
ContextID_t slot() const
Definition: EventContext.h:40
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
bool isFailure() const
Definition: StatusCode.h:139
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
bool complete
Flags completion of the event.
Definition: EventSlot.h:52
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:33
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE
StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 509 of file AvalancheSchedulerSvc.cpp.

510 {
511  StatusCode sc;
512  for ( auto context : eventContexts ) {
513  sc = pushNewEvent( context );
514  if ( sc != StatusCode::SUCCESS ) return sc;
515  }
516  return sc;
517 }
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
StatusCode AvalancheSchedulerSvc::scheduleEventView ( EventContext const *  sourceContext,
std::string const &  nodeName,
EventContext viewContext 
)
overridevirtual

Method to inform the scheduler about event views.

Definition at line 1139 of file AvalancheSchedulerSvc.cpp.

1141 {
1142  // Find the top-level slot, to attach the sub-slot to
1143  int const topSlotIndex = sourceContext->slot();
1144  EventSlot& topSlot = m_eventSlots[topSlotIndex];
1145 
1146  // Prevent view nesting - this doesn't work because EventContext is copied when passed to algorithm
1147  /*if ( sourceContext != topSlot.eventContext )
1148  {
1149  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1150  return StatusCode::FAILURE;
1151  }*/
1152 
1153  if ( viewContext ) {
1154  // Make new slot by copying the top slot
1155  unsigned int lastIndex = topSlot.allSubSlots.size();
1156  topSlot.allSubSlots.push_back( EventSlot( m_eventSlots[topSlotIndex], viewContext ) );
1157  topSlot.allSubSlots.back().entryPoint = nodeName;
1158 
1159  // Store index of the new slot in lookup structures
1160  topSlot.contextToSlot[viewContext] = lastIndex;
1161  topSlot.subSlotsByNode[nodeName].push_back( lastIndex );
1162  } else {
1163  // Disable the view node if there are no views
1164  topSlot.subSlotsByNode[nodeName] = std::vector<unsigned int>( 0 );
1165  }
1166 
1167  return StatusCode::SUCCESS;
1168 }
std::string entryPoint
Name of the node this slot is attached to ("" for top level)
Definition: EventSlot.h:58
T push_back(T...args)
std::map< std::string, std::vector< unsigned int > > subSlotsByNode
Listing of sub-slots by the node (name) they are attached to.
Definition: EventSlot.h:56
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
T size(T...args)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
Class representing the event slot.
Definition: EventSlot.h:10
T back(T...args)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event, if not available just return a failure.

Definition at line 564 of file AvalancheSchedulerSvc.cpp.

565 {
566  if ( m_finishedEvents.try_pop( eventContext ) ) {
567  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
568  << endmsg;
569  m_freeSlots++;
570  return StatusCode::SUCCESS;
571  }
572  return StatusCode::FAILURE;
573 }
#define ON_DEBUG
constexpr static const auto FAILURE
Definition: StatusCode.h:88
ContextID_t slot() const
Definition: EventContext.h:40
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode AvalancheSchedulerSvc::updateStates ( int  si = -1,
int  algo_index = -1,
EventContext inputContext = nullptr 
)
private

Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skipping an update of the Control Flow state)

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. To check if the event is finished the algorithm checks if:

  • No algorithms have been signed off by the control flow
  • No algorithms have been signed off by the data flow
  • No algorithms have been scheduled

Definition at line 626 of file AvalancheSchedulerSvc.cpp.

627 {
628 
629  StatusCode global_sc( StatusCode::SUCCESS );
630 
631  // Sort from the oldest to the newest event
632  // Prepare a vector of pointers to the slots to avoid copies
633  std::vector<EventSlot*> eventSlotsPtrs;
634 
635  // Consider all slots if si <0 or just one otherwise
636  if ( si < 0 ) {
637  const int eventsSlotsSize( m_eventSlots.size() );
638  eventSlotsPtrs.reserve( eventsSlotsSize );
639  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
640  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
641  }
642  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
643  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
644  } else {
645  eventSlotsPtrs.push_back( &m_eventSlots[si] );
646  }
647 
648  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
649  int iSlot = thisSlotPtr->eventContext->slot();
650 
651  // Cache the states of the algos to improve readability and performance
652  auto& thisSlot = m_eventSlots[iSlot];
653  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
654 
655  // Perform the I->CR->DR transitions
656  if ( algo_index >= 0 ) {
657  Cause cs = {Cause::source::Task, index2algname( algo_index )};
658 
659  // Pass sub-slots to precedence service if necessary
660  if ( !inputContext || iSlot != (int)inputContext->slot() || inputContext == thisSlot.eventContext ) {
661  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
662  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
663  global_sc = StatusCode::FAILURE;
664  }
665  } else {
666  // An input context that doesn't match the event context for that slot number implies a sub-slot
667  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( inputContext );
668  if ( m_precSvc->iterate( thisSlot.allSubSlots[subSlotIndex], cs ).isFailure() ) {
669  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot of " << iSlot << endmsg;
670  global_sc = StatusCode::FAILURE;
671  }
672  }
673  }
674 
675  StatusCode partial_sc( StatusCode::FAILURE, true );
676 
677  // Perform DR->SCHEDULED
678  if ( !m_optimizationMode.empty() ) {
679  auto comp_nodes = [this]( const uint& i, const uint& j ) {
680  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
681  };
683  comp_nodes, std::vector<uint>() );
684  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
685  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
686  buffer.push( *it );
687  while ( !buffer.empty() ) {
688  bool IOBound = false;
689  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
690 
691  if ( !IOBound )
692  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext );
693  else
694  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext );
695 
696  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
697  << "Could not apply transition from "
698  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
699  << index2algname( buffer.top() ) << " on processing slot " << iSlot << endmsg;
700 
701  buffer.pop();
702  }
703 
704  } else {
705  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
706  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
707  uint algIndex = *it;
708 
709  bool IOBound = false;
710  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
711 
712  if ( !IOBound )
713  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
714  else
715  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
716 
717  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
718  << "Could not apply transition from "
719  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
720  << index2algname( algIndex ) << " on processing slot " << iSlot << endmsg;
721  }
722  }
723 
724  // Check for algorithms ready in sub-slots
725  if ( thisSlot.subSlotAlgsReady.size() ) {
726  // Any data-ready algorithms that don't get scheduled need to be retried later
728  failedAlgs.reserve( thisSlot.subSlotAlgsReady.size() );
729 
730  // Loop with iterator so we can use it for a fast append if needed
731  for ( auto contextAlgPair = thisSlot.subSlotAlgsReady.begin(); contextAlgPair != thisSlot.subSlotAlgsReady.end();
732  ++contextAlgPair ) {
734  partial_sc = promoteToScheduled( contextAlgPair->second, iSlot, contextAlgPair->first );
735 
736  // Add the alg back into the ready list if scheduling failed
737  if ( partial_sc.isFailure() ) failedAlgs.push_back( *contextAlgPair );
738  } else {
739  // Don't loop through all remaining algs if we're already busy
740  failedAlgs.insert( failedAlgs.end(), contextAlgPair, thisSlot.subSlotAlgsReady.end() );
741  break;
742  }
743  }
744 
745  // Update ready list
746  thisSlot.subSlotAlgsReady = failedAlgs;
747  }
748 
749  if ( m_dumpIntraEventDynamics ) {
751  s << index2algname( algo_index ) << ", " << thisAlgsStates.sizeOfSubset( State::CONTROLREADY ) << ", "
752  << thisAlgsStates.sizeOfSubset( State::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( State::SCHEDULED )
753  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
755  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
756  std::ofstream myfile;
757  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
758  myfile << s.str();
759  myfile.close();
760  }
761 
762  // Not complete because this would mean that the slot is already free!
763  if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
764  thisSlot.subSlotAlgsReady.empty() && // Account for sub-slot algs
765  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
766  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
767  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
768 
769  thisSlot.complete = true;
770  // if the event did not fail, add it to the finished events
771  // otherwise it is taken care of in the error handling already
772  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
773  m_finishedEvents.push( thisSlot.eventContext );
774  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
775  << thisSlot.eventContext->slot() << ")." << endmsg;
776  }
777 
778  // now let's return the fully evaluated result of the control flow
779  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
780 
781  thisSlot.eventContext = nullptr;
782  } else {
783  StatusCode eventStalledSC = isStalled( iSlot );
784  if ( eventStalledSC.isFailure() ) {
785  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
786  eventFailed( thisSlot.eventContext ).ignore();
787  }
788  }
789  } // end loop on slots
790 
791  ON_VERBOSE verbose() << "States Updated." << endmsg;
792 
793  return global_sc;
794 }
#define ON_DEBUG
constexpr static const auto FAILURE
Definition: StatusCode.h:88
T open(T...args)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
ContextID_t slot() const
Definition: EventContext.h:40
Gaudi::Property< bool > m_dumpIntraEventDynamics
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
T to_string(T...args)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T end(T...args)
Gaudi::Property< std::string > m_optimizationMode
size_t sizeOfSubset(State state) const
bool isFailure() const
Definition: StatusCode.h:139
virtual const std::string printState(EventSlot &) const =0
ContextEvt_t evt() const
Definition: EventContext.h:39
Gaudi::Property< bool > m_useIOBoundAlgScheduler
unsigned int m_algosInFlight
Number of algoritms presently in flight.
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
virtual uint getPriority(const std::string &) const =0
Get task priority.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
T close(T...args)
T str(T...args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
Gaudi::Property< int > m_threadPoolSize
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T insert(T...args)
T size(T...args)
STL class.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T begin(T...args)
Iterator begin(State kind)
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:165
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:10
string s
Definition: gaudirun.py:253
std::vector< EventSlot > m_eventSlots
Vector of events slots.
T sort(T...args)
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
T reserve(T...args)
static std::map< State, std::string > stateNames
#define ON_VERBOSE
Iterator end(State kind)

Member Data Documentation

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 262 of file AvalancheSchedulerSvc.h.

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 221 of file AvalancheSchedulerSvc.h.

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 191 of file AvalancheSchedulerSvc.h.

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 197 of file AvalancheSchedulerSvc.h.

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algoritms presently in flight.

Definition at line 229 of file AvalancheSchedulerSvc.h.

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 254 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
private

Definition at line 157 of file AvalancheSchedulerSvc.h.

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to service for Conditions handling.

Definition at line 224 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file"}

Definition at line 152 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
private

Definition at line 162 of file AvalancheSchedulerSvc.h.

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 209 of file AvalancheSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 215 of file AvalancheSchedulerSvc.h.

bool AvalancheSchedulerSvc::m_first = true
private

Definition at line 270 of file AvalancheSchedulerSvc.h.

std::atomic_int AvalancheSchedulerSvc::m_freeSlots
private

Atomic to account for asyncronous updates by the scheduler wrt the rest.

Definition at line 212 of file AvalancheSchedulerSvc.h.

unsigned int AvalancheSchedulerSvc::m_IOBoundAlgosInFlight = 0
private

Number of algoritms presently in flight.

Definition at line 232 of file AvalancheSchedulerSvc.h.

SmartIF<IAccelerator> AvalancheSchedulerSvc::m_IOBoundAlgScheduler
private

A shortcut to IO-bound algorithm scheduler.

Definition at line 206 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
private

Definition at line 144 of file AvalancheSchedulerSvc.h.

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 182 of file AvalancheSchedulerSvc.h.

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight {1}
private

Definition at line 269 of file AvalancheSchedulerSvc.h.

size_t AvalancheSchedulerSvc::m_maxEventsInFlight {0}
private

Definition at line 268 of file AvalancheSchedulerSvc.h.

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxIOBoundAlgosInFlight
private
Initial value:
{this, "MaxIOBoundAlgosInFlight", 0,
"Maximum number of simultaneous I/O-bound algorithms"}

Definition at line 145 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E"}

Definition at line 150 of file AvalancheSchedulerSvc.h.

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 200 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences"}

Definition at line 170 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms"}

Definition at line 164 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms"}

Definition at line 167 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution"}

Definition at line 147 of file AvalancheSchedulerSvc.h.

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 185 of file AvalancheSchedulerSvc.h.

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 140 of file AvalancheSchedulerSvc.h.

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 267 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 159 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_useIOBoundAlgScheduler
private
Initial value:
{this, "PreemptiveIOBoundTasks", false,
"Turn on preemptive way of scheduling of I/O-bound algorithms"}

Definition at line 154 of file AvalancheSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 203 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 143 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: