The Gaudi Framework  v30r1 (5d4f4ae2)
AvalancheSchedulerSvc Class Reference

Introduction

More...

#include <src/AvalancheSchedulerSvc.h>

Inheritance diagram for AvalancheSchedulerSvc:
Collaboration diagram for AvalancheSchedulerSvc:

Classes

class  SchedulerState
 

Public Member Functions

 ~AvalancheSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is availble. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
virtual StatusCode scheduleEventView (EventContext const *sourceContext, std::string const &nodeName, EventContext *viewContext) override
 Method to inform the scheduler about event views. More...
 
void addAlg (Algorithm *, EventContext *, pthread_t)
 
bool delAlg (Algorithm *)
 
void dumpState () override
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::stringgetInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
- Public Member Functions inherited from Service
const std::stringname () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Initialize Service. More...
 
StatusCode sysStop () override
 Initialize Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Re-initialize the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName, bool createIf=true)
 Declare used tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service.May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBasedeclareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBasedeclareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property form another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property form the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBasegetProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolderoperator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
const SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the cached level (originally extracted from the embedded MsgStream) More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::stringindex2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1, int algo_index=-1, EventContext *=nullptr)
 Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skipping an update of the Control Flow state) More...
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si, EventContext *)
 Algorithm promotion. More...
 
StatusCode promoteToAsyncScheduled (unsigned int iAlgo, int si, EventContext *)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the AlgoExecutionTask. More...
 
StatusCode promoteToAsyncExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the IOBoundAlgTask. More...
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 
void dumpState (std::ostringstream &)
 

Private Attributes

Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::stringm_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< std::stringm_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
 
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::stringm_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_useIOBoundAlgScheduler
 
Gaudi::Property< bool > m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
 
Gaudi::Property< std::stringm_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
std::atomic< ActivationStatem_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::stringm_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IPrecedenceSvcm_precSvc
 A shortcut to the Precedence Service. More...
 
SmartIF< IHiveWhiteBoardm_whiteboard
 A shortcut to the whiteboard. More...
 
SmartIF< IAcceleratorm_IOBoundAlgScheduler
 A shortcut to IO-bound algorithm scheduler. More...
 
std::vector< EventSlotm_eventSlots
 Vector of events slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asyncronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvcm_algExecStateSvc
 Algorithm execution state manager. More...
 
SmartIF< ICondSvcm_condSvc
 A shortcut to service for Conditions handling. More...
 
unsigned int m_algosInFlight = 0
 Number of algoritms presently in flight. More...
 
unsigned int m_IOBoundAlgosInFlight = 0
 Number of algoritms presently in flight. More...
 
SmartIF< IAlgResourcePoolm_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< actionm_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
SmartIF< IThreadPoolSvcm_threadPoolSvc
 
size_t m_maxEventsInFlight {0}
 
size_t m_maxAlgosInFlight {1}
 
bool m_first = true
 

Static Private Attributes

static std::list< SchedulerStatem_sState
 
static std::mutex m_ssMut
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
typedef Gaudi::PluginService::Factory< IService *, const std::string &, ISvcLocator * > Factory
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBaseproperty (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging ()
 Set up local caches. More...
 
MSG::Level resetMessaging ()
 Reinitialize internal states. More...
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvcm_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. When compared to approach used in the ForwardSchedulerSvc, the following advantages can be emphasized:

(1) Faster decision making (thus lower concurrency disclosure downtime); (2) Capacity for proactive task scheduling decision making.

Point (2) allowed to implement a number of generic, non-intrusive intra-event throughput maximization scheduling strategies.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled ASA all following conditions are met:

  • if a control flow (CF) graph traversal reaches the task;
  • when all data flow (DF) dependencies of the task are satisfied;
  • when the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of precedence rules graph asymmetries:

(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by scheduling the CPU-blocking tasks efficiently. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices..joke);
  • synchronization-bound tasks.

Credits

Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 102 of file AvalancheSchedulerSvc.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

AvalancheSchedulerSvc::~AvalancheSchedulerSvc ( )
overridedefault

Destructor.

Member Function Documentation

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)

Definition at line 363 of file AvalancheSchedulerSvc.cpp.

364 {
365 
366  if ( msgLevel( MSG::DEBUG ) ) debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
367 
369  error() << "problems initializing ThreadPoolSvc" << endmsg;
371  return;
372  }
373 
374  // Wait for actions pushed into the queue by finishing tasks.
375  action thisAction;
377 
378  m_isActive = ACTIVE;
379 
380  // Continue to wait if the scheduler is running or there is something to do
381  info() << "Start checking the actionsQueue" << endmsg;
382  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
383  m_actionsQueue.pop( thisAction );
384  sc = thisAction();
385  if ( sc != StatusCode::SUCCESS )
386  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
387  else
388  verbose() << "Action succeeded." << endmsg;
389  }
390 
391  info() << "Terminating thread-pool resources" << endmsg;
393  error() << "Problems terminating thread pool" << endmsg;
395  }
396 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:61
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void AvalancheSchedulerSvc::addAlg ( Algorithm a,
EventContext e,
pthread_t  t 
)

Definition at line 1101 of file AvalancheSchedulerSvc.cpp.

1102 {
1103 
1105  m_sState.push_back( SchedulerState( a, e, t ) );
1106 }
static std::list< SchedulerState > m_sState
T lock(T...args)
unsigned int AvalancheSchedulerSvc::algname2index ( const std::string algoname)
inlineprivate

Convert a name to an integer.

Definition at line 431 of file AvalancheSchedulerSvc.cpp.

432 {
433  unsigned int index = m_algname_index_map[algoname];
434  return index;
435 }
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.

Definition at line 406 of file AvalancheSchedulerSvc.cpp.

407 {
408 
409  if ( m_isActive == ACTIVE ) {
410  // Drain the scheduler
411  m_actionsQueue.push( [this]() { return this->m_drain(); } );
412  // This would be the last action
413  m_actionsQueue.push( [this]() -> StatusCode {
415  return StatusCode::SUCCESS;
416  } );
417  }
418 
419  return StatusCode::SUCCESS;
420 }
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
StatusCode m_drain()
Drain the actions present in the queue.
bool AvalancheSchedulerSvc::delAlg ( Algorithm a)

Definition at line 1109 of file AvalancheSchedulerSvc.cpp.

1110 {
1111 
1113 
1114  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1115  if ( *itr == a ) {
1116  m_sState.erase( itr );
1117  return true;
1118  }
1119  }
1120 
1121  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1122  return false;
1123 }
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:737
static std::list< SchedulerState > m_sState
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
T lock(T...args)
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

The dependencies of each algo are printed and the missing ones specified.

Definition at line 834 of file AvalancheSchedulerSvc.cpp.

835 {
836 
837  // To have just one big message
838  std::ostringstream outputMessageStream;
839 
840  outputMessageStream << "============================== Execution Task State ============================="
841  << std::endl;
842  dumpState( outputMessageStream );
843 
844  outputMessageStream << std::endl
845  << "============================== Scheduler State ================================="
846  << std::endl;
847 
848  int slotCount = -1;
849  for ( auto& thisSlot : m_eventSlots ) {
850  slotCount++;
851  if ( thisSlot.complete ) continue;
852 
853  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
854  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
855 
856  if ( 0 > iSlot or iSlot == slotCount ) {
857 
858  // Snapshot of the Control Flow and FSM states
859  outputMessageStream << "\nControl Flow and FSM states:" << std::endl;
860  outputMessageStream << m_precSvc->printState( thisSlot ) << std::endl;
861 
862  // Mention sub slots
863  if ( thisSlot.allSubSlots.size() ) {
864  outputMessageStream << "\nSub-slots:" << thisSlot.allSubSlots.size() << std::endl;
865  outputMessageStream << "Sub-slot algorithms ready:" << thisSlot.subSlotAlgsReady.size() << std::endl;
866  }
867  }
868  }
869 
870  outputMessageStream << "=================================== END ======================================" << std::endl;
871 
872  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
873 }
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
T endl(T...args)
virtual const std::string printState(EventSlot &) const =0
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
void AvalancheSchedulerSvc::dumpState ( )
override

Definition at line 1137 of file AvalancheSchedulerSvc.cpp.

1138 {
1139 
1141 
1142  std::ostringstream ost;
1143  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1144  dumpState( ost );
1145 
1146  info() << ost.str() << endmsg;
1147 }
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
T endl(T...args)
static std::list< SchedulerState > m_sState
T lock(T...args)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
void AvalancheSchedulerSvc::dumpState ( std::ostringstream ost)
private

Definition at line 1126 of file AvalancheSchedulerSvc.cpp.

1127 {
1128 
1130 
1131  for ( auto it : m_sState ) {
1132  ost << " " << it << std::endl;
1133  }
1134 }
T endl(T...args)
static std::list< SchedulerState > m_sState
T lock(T...args)
StatusCode AvalancheSchedulerSvc::eventFailed ( EventContext eventContext)
private

Method to check if an event failed and take appropriate actions.

It can be possible that an event fails.

In this case this method is called. It dumps the state of the scheduler, drains the actions (without executing them) and events in the queues and returns a failure.

Definition at line 580 of file AvalancheSchedulerSvc.cpp.

581 {
582 
583  // Set the number of slots available to an error code
584  m_freeSlots.store( 0 );
585 
586  fatal() << "*** Event " << eventContext->evt() << " on slot " << eventContext->slot() << " failed! ***" << endmsg;
587 
588  std::ostringstream ost;
589  m_algExecStateSvc->dump( ost, *eventContext );
590 
591  info() << "Dumping Alg Exec State for slot " << eventContext->slot() << ":\n" << ost.str() << endmsg;
592 
593  dumpSchedulerState( -1 );
594  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
595  m_precSvc->dumpPrecedenceRules( m_eventSlots[eventContext->slot()] );
596 
597  // Empty queue and deactivate the service
598  action thisAction;
599  while ( m_actionsQueue.try_pop( thisAction ) ) {
600  };
601  deactivate();
602 
603  // Push into the finished events queue the failed context
604  EventContext* thisEvtContext;
605  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
606  m_finishedEvents.push( thisEvtContext );
607  };
608  m_finishedEvents.push( eventContext );
609 
610  return StatusCode::FAILURE;
611 }
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
ContextID_t slot() const
Definition: EventContext.h:40
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode deactivate()
Deactivate scheduler.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 332 of file AvalancheSchedulerSvc.cpp.

333 {
334 
336  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
337 
338  sc = deactivate();
339  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
340 
341  info() << "Joining Scheduler thread" << endmsg;
342  m_thread.join();
343 
344  // Final error check after thread pool termination
345  if ( m_isActive == FAILURE ) {
346  error() << "problems in scheduler thread" << endmsg;
347  return StatusCode::FAILURE;
348  }
349 
350  return sc;
351 }
StatusCode finalize() override
Definition: Service.cpp:174
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode deactivate()
Deactivate scheduler.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
std::thread m_thread
The thread in which the activate function runs.
unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 517 of file AvalancheSchedulerSvc.cpp.

517 { return std::max( m_freeSlots.load(), 0 ); }
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T max(T...args)
const std::string & AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inlineprivate

Convert an integer to a name.

Definition at line 427 of file AvalancheSchedulerSvc.cpp.

427 { return m_algname_vect[index]; }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 61 of file AvalancheSchedulerSvc.cpp.

62 {
63 
64  // Initialise mother class (read properties, ...)
66  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
67 
68  // Get hold of the TBBSvc. This should initialize the thread pool
69  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
70  if ( !m_threadPoolSvc.isValid() ) {
71  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
72  return StatusCode::FAILURE;
73  }
74 
75  // Activate the scheduler in another thread.
76  info() << "Activating scheduler in a separate thread" << endmsg;
77  m_thread = std::thread( [this]() { this->activate(); } );
78 
79  while ( m_isActive != ACTIVE ) {
80  if ( m_isActive == FAILURE ) {
81  fatal() << "Terminating initialization" << endmsg;
82  return StatusCode::FAILURE;
83  } else {
84  info() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
85  sleep( 1 );
86  }
87  }
88 
89  if ( m_enableCondSvc ) {
90  // Get hold of the CondSvc
91  m_condSvc = serviceLocator()->service( "CondSvc" );
92  if ( !m_condSvc.isValid() ) {
93  warning() << "No CondSvc found, or not enabled. "
94  << "Will not manage CondAlgorithms" << endmsg;
95  m_enableCondSvc = false;
96  }
97  }
98 
99  // Get the algo resource pool
100  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
101  if ( !m_algResourcePool.isValid() ) {
102  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
103  return StatusCode::FAILURE;
104  }
105 
106  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
107  if ( !m_algExecStateSvc.isValid() ) {
108  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
109  return StatusCode::FAILURE;
110  }
111 
112  // Get Whiteboard
114  if ( !m_whiteboard.isValid() ) {
115  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
116  return StatusCode::FAILURE;
117  }
118 
119  // Get dedicated scheduler for I/O-bound algorithms
120  if ( m_useIOBoundAlgScheduler ) {
123  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
124  }
125 
126  // Set the MaxEventsInFlight parameters from the number of WB stores
128 
129  // Set the number of free slots
131 
132  // set global concurrency flags
134 
135  // Get the list of algorithms
137  const unsigned int algsNumber = algos.size();
138  info() << "Found " << algsNumber << " algorithms" << endmsg;
139 
140  /* Dependencies
141  1) Look for handles in algo, if none
142  2) Assume none are required
143  */
144 
145  DataObjIDColl globalInp, globalOutp;
146 
147  // figure out all outputs
148  for ( IAlgorithm* ialgoPtr : algos ) {
149  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
150  if ( !algoPtr ) {
151  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
152  }
153  for ( auto id : algoPtr->outputDataObjs() ) {
154  auto r = globalOutp.insert( id );
155  if ( !r.second ) {
156  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths "
157  "though, or control flow may guarantee only one runs...!"
158  << endmsg;
159  }
160  }
161  }
162 
163  std::ostringstream ostdd;
164  ostdd << "Data Dependencies for Algorithms:";
165 
167  for ( IAlgorithm* ialgoPtr : algos ) {
168  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
169  if ( nullptr == algoPtr ) {
170  fatal() << "Could not convert IAlgorithm into Algorithm for " << ialgoPtr->name()
171  << ": this will result in a crash." << endmsg;
172  return StatusCode::FAILURE;
173  }
174 
175  ostdd << "\n " << algoPtr->name();
176 
177  DataObjIDColl algoDependencies;
178  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
179  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
180  DataObjID id = *idp;
181  ostdd << "\n o INPUT " << id;
182  if ( id.key().find( ":" ) != std::string::npos ) {
183  ostdd << " contains alternatives which require resolution...\n";
184  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
185  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
186  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
187  } );
188  if ( itok != tokens.end() ) {
189  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
190  id.updateKey( *itok );
191  } else {
192  error() << "failed to find alternate in global output list"
193  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
194  m_showDataDeps = true;
195  }
196  }
197  algoDependencies.insert( id );
198  globalInp.insert( id );
199  }
200  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
201  ostdd << "\n o OUTPUT " << *id;
202  if ( id->key().find( ":" ) != std::string::npos ) {
203  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
204  << endmsg;
205  m_showDataDeps = true;
206  }
207  }
208  } else {
209  ostdd << "\n none";
210  }
211  algosDependenciesMap[algoPtr->name()] = algoDependencies;
212  }
213 
214  if ( m_showDataDeps ) {
215  info() << ostdd.str() << endmsg;
216  }
217 
218  // Check if we have unmet global input dependencies, and, optionally, heal them
219  // WARNING: this step must be done BEFORE the Precedence Service is initialized
220  if ( m_checkDeps ) {
221  DataObjIDColl unmetDep;
222  for ( auto o : globalInp )
223  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
224 
225  if ( unmetDep.size() > 0 ) {
226 
227  std::ostringstream ost;
228  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
229  ost << "\n o " << *o << " required by Algorithm: ";
230 
231  for ( const auto& p : algosDependenciesMap )
232  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
233  }
234 
235  if ( !m_useDataLoader.empty() ) {
236 
237  // Find the DataLoader Alg
238  IAlgorithm* dataLoaderAlg( nullptr );
239  for ( IAlgorithm* algo : algos )
240  if ( algo->name() == m_useDataLoader ) {
241  dataLoaderAlg = algo;
242  break;
243  }
244 
245  if ( dataLoaderAlg == nullptr ) {
246  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
247  << "\" found, and unmet INPUT dependencies "
248  << "detected:\n"
249  << ost.str() << endmsg;
250  return StatusCode::FAILURE;
251  }
252 
253  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
254  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
255 
256  // Set the property Load of DataLoader Alg
257  Algorithm* dataAlg = dynamic_cast<Algorithm*>( dataLoaderAlg );
258  if ( !dataAlg ) {
259  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Algorithm" << endmsg;
260  return StatusCode::FAILURE;
261  }
262 
263  for ( auto& id : unmetDep ) {
264  debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
265  << endmsg;
267  }
268 
269  } else {
270  fatal() << "Auto DataLoading not requested, "
271  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
272  return StatusCode::FAILURE;
273  }
274 
275  } else {
276  info() << "No unmet INPUT data dependencies were found" << endmsg;
277  }
278  }
279 
280  // Get the precedence service
281  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
282  if ( !m_precSvc.isValid() ) {
283  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
284  return StatusCode::FAILURE;
285  }
286  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
287  if ( !precSvc ) {
288  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
289  return StatusCode::FAILURE;
290  }
291 
292  // Fill the containers to convert algo names to index
293  m_algname_vect.resize( algsNumber );
294  for ( IAlgorithm* algo : algos ) {
295  const std::string& name = algo->name();
296  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
297  m_algname_index_map[name] = index;
298  m_algname_vect.at( index ) = name;
299  }
300 
301  // Shortcut for the message service
302  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
303  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
304 
306  EventSlot( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc ) );
307  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
308 
309  if ( m_threadPoolSize > 1 ) {
311  }
312 
313  // Clearly inform about the level of concurrency
314  info() << "Concurrency level information:" << endmsg;
315  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
316  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
317 
319 
321 
322  // Simulate execution flow
324 
325  return sc;
326 }
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:68
Gaudi::Property< bool > m_showDataFlow
StatusCode initialize() override
Definition: Service.cpp:64
const unsigned int & getAlgoIndex() const
Get algorithm index.
T empty(T...args)
Gaudi::Property< std::string > m_whiteboardSvcName
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:737
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
Gaudi::Property< bool > m_showDataDeps
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
const DataObjIDColl & outputDataObjs() const override
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
T resize(T...args)
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:82
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T at(T...args)
virtual void dumpDataFlow() const =0
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:79
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
const DataObjIDColl & inputDataObjs() const override
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
virtual void dumpControlFlow() const =0
Dump precedence rules.
bool complete
Flags completion of the event.
Definition: EventSlot.h:52
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
T assign(T...args)
Gaudi::Property< bool > m_simulateExecution
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
T begin(T...args)
Gaudi::Property< bool > m_enableCondSvc
Class representing the event slot.
Definition: EventSlot.h:10
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual size_t getNumberOfStores() const =0
Get the number of &#39;slots&#39;.
T for_each(T...args)
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
std::thread m_thread
The thread in which the activate function runs.
StatusCode AvalancheSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check if we are in present of a stall condition for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.

Definition at line 808 of file AvalancheSchedulerSvc.cpp.

809 {
810  // Get the slot
811  EventSlot& thisSlot = m_eventSlots[iSlot];
812 
813  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
814  thisSlot.subSlotAlgsReady.empty() && // Account for sub-slot algs
816 
817  info() << "About to declare a stall" << endmsg;
818  fatal() << "*** Stall detected! ***\n" << endmsg;
819  dumpSchedulerState( iSlot );
820  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
821 
822  return StatusCode::FAILURE;
823  }
824  return StatusCode::SUCCESS;
825 }
bool algsPresent(State state) const
T empty(T...args)
unsigned int m_IOBoundAlgosInFlight
Number of algoritms presently in flight.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< std::pair< EventContext *, int > > subSlotAlgsReady
Quick lookup for data-ready algorithms in sub-slots (top level only)
Definition: EventSlot.h:62
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
StatusCode AvalancheSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 523 of file AvalancheSchedulerSvc.cpp.

524 {
525 
526  unsigned int slotNum = 0;
527  for ( auto& thisSlot : m_eventSlots ) {
528  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
529  updateStates( slotNum );
530  }
531  slotNum++;
532  }
533  return StatusCode::SUCCESS;
534 }
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is availble.

Get a finished event or block until one becomes available.

Definition at line 540 of file AvalancheSchedulerSvc.cpp.

541 {
542  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
543  if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
544  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
545  // << " active: " << m_isActive << endmsg;
546  return StatusCode::FAILURE;
547  } else {
548  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
549  // << " active: " << m_isActive << endmsg;
550  m_finishedEvents.pop( eventContext );
551  m_freeSlots++;
552  if ( msgLevel( MSG::DEBUG ) )
553  debug() << "Popped slot " << eventContext->slot() << "(event " << eventContext->evt() << ")" << endmsg;
554  return StatusCode::SUCCESS;
555  }
556 }
ContextID_t slot() const
Definition: EventContext.h:40
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode AvalancheSchedulerSvc::promoteToAsyncExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

The call to this method is triggered only from within the IOBoundAlgTask.

Definition at line 1048 of file AvalancheSchedulerSvc.cpp.

1050 {
1051  // Check if the execution failed
1052  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
1053 
1054  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1055 
1056  if ( !sc.isSuccess() ) {
1057  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1058  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1059  return StatusCode::FAILURE;
1060  }
1061 
1063 
1064  EventSlot& thisSlot = m_eventSlots[si];
1065 
1066  if ( msgLevel( MSG::DEBUG ) )
1067  debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1068  State state;
1069  if ( algo->filterPassed() ) {
1070  state = State::EVTACCEPTED;
1071  } else {
1072  state = State::EVTREJECTED;
1073  }
1074 
1075  // Update states in the appropriate slot
1076  if ( eventContext == thisSlot.eventContext ) {
1077  // Event level (standard behaviour)
1078  sc = thisSlot.algsStates.updateState( iAlgo, state );
1079  } else {
1080  // Sub-slot
1081  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1082  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1083  }
1084 
1085  if ( sc.isSuccess() )
1086  if ( msgLevel( MSG::VERBOSE ) )
1087  verbose() << "[Asynchronous] Promoting " << algo->name() << " on slot " << si << " to "
1089 
1090  if ( msgLevel( MSG::DEBUG ) )
1091  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1092  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1093 
1094  // Schedule an update of the status of the algorithms
1095  m_actionsQueue.push( [this, iAlgo, eventContext]() { return this->updateStates( -1, iAlgo, eventContext ); } );
1096 
1097  return sc;
1098 }
unsigned int m_IOBoundAlgosInFlight
Number of algoritms presently in flight.
ContextID_t slot() const
Definition: EventContext.h:40
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:50
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
ContextEvt_t evt() const
Definition: EventContext.h:39
T at(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
State
Execution states of the algorithms.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
void ignore() const
Definition: StatusCode.h:84
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled ( unsigned int  iAlgo,
int  si,
EventContext eventContext 
)
private

Definition at line 939 of file AvalancheSchedulerSvc.cpp.

940 {
941 
943 
944  // bool IOBound = m_precSvc->isBlocking(algName);
945 
946  const std::string& algName( index2algname( iAlgo ) );
947  IAlgorithm* ialgoPtr = nullptr;
948  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
949 
950  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
951 
953  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
954  // tbb::task)? it seems it works..
955  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
956  IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
957  m_IOBoundAlgScheduler->push( *theTask );
958 
959  if ( msgLevel( MSG::DEBUG ) )
960  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
961  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
962 
963  // Update states in the appropriate event slot
964  StatusCode updateSc;
965  EventSlot& thisSlot = m_eventSlots[si];
966  if ( eventContext == thisSlot.eventContext ) {
967  // Event level (standard behaviour)
968  updateSc = thisSlot.algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
969  } else {
970  // Sub-slot
971  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
972  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
973  }
974 
975  if ( updateSc.isSuccess() )
976  if ( msgLevel( MSG::VERBOSE ) )
977  verbose() << "[Asynchronous] Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
978  return updateSc;
979  } else {
980  if ( msgLevel( MSG::DEBUG ) )
981  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
982  << si << endmsg;
983  return sc;
984  }
985 }
Wrapper around I/O-bound Gaudi-algorithms.
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
unsigned int m_IOBoundAlgosInFlight
Number of algoritms presently in flight.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:50
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
T at(T...args)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
virtual StatusCode push(IAlgTask &task)=0
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

The call to this method is triggered only from within the AlgoExecutionTask.

Definition at line 991 of file AvalancheSchedulerSvc.cpp.

993 {
994  // Check if the execution failed
995  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
996 
997  Gaudi::Hive::setCurrentContext( eventContext );
998  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
999 
1000  if ( !sc.isSuccess() ) {
1001  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1002  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1003  return StatusCode::FAILURE;
1004  }
1005 
1006  m_algosInFlight--;
1007 
1008  EventSlot& thisSlot = m_eventSlots[si];
1009 
1010  if ( msgLevel( MSG::DEBUG ) )
1011  debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1012  State state;
1013  if ( algo->filterPassed() ) {
1014  state = State::EVTACCEPTED;
1015  } else {
1016  state = State::EVTREJECTED;
1017  }
1018 
1019  // Update states in the appropriate slot
1020  if ( eventContext == thisSlot.eventContext ) {
1021  // Event level (standard behaviour)
1022  sc = thisSlot.algsStates.updateState( iAlgo, state );
1023  } else {
1024  // Sub-slot
1025  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
1026  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1027  }
1028 
1029  if ( sc.isSuccess() )
1030  if ( msgLevel( MSG::VERBOSE ) )
1031  verbose() << "Promoting " << algo->name() << " on slot " << si << " to " << AlgsExecutionStates::stateNames[state]
1032  << endmsg;
1033 
1034  if ( msgLevel( MSG::DEBUG ) )
1035  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1036  << m_algosInFlight << endmsg;
1037 
1038  // Schedule an update of the status of the algorithms
1039  m_actionsQueue.push( [this, iAlgo, eventContext]() { return this->updateStates( -1, iAlgo, eventContext ); } );
1040 
1041  return sc;
1042 }
ContextID_t slot() const
Definition: EventContext.h:40
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:50
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
ContextEvt_t evt() const
Definition: EventContext.h:39
T at(T...args)
unsigned int m_algosInFlight
Number of algoritms presently in flight.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
void ignore() const
Definition: StatusCode.h:84
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode AvalancheSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si,
EventContext eventContext 
)
private

Algorithm promotion.

Definition at line 877 of file AvalancheSchedulerSvc.cpp.

878 {
879 
881 
882  const std::string& algName( index2algname( iAlgo ) );
883  IAlgorithm* ialgoPtr = nullptr;
884  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
885 
886  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
887 
888  ++m_algosInFlight;
889  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
890  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
891  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
892  } );
893  return StatusCode::SUCCESS;
894  };
895 
896  // Avoid to use tbb if the pool size is 1 and run in this thread
897  if ( -100 != m_threadPoolSize ) {
898  // the child task that executes an Algorithm
899  tbb::task* algoTask = new ( tbb::task::allocate_root() )
900  AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
901  // schedule the algoTask
902  tbb::task::enqueue( *algoTask );
903 
904  } else {
905  AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
906  theTask.execute();
907  }
908 
909  if ( msgLevel( MSG::DEBUG ) )
910  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
911  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
912 
913  // Update states in the appropriate event slot
914  StatusCode updateSc;
915  EventSlot& thisSlot = m_eventSlots[si];
916  if ( eventContext == thisSlot.eventContext ) {
917  // Event level (standard behaviour)
918  updateSc = thisSlot.algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
919  } else {
920  // Sub-slot
921  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( eventContext );
922  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED );
923  }
924 
925  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
926 
927  if ( updateSc.isSuccess() )
928  if ( msgLevel( MSG::VERBOSE ) ) verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
929  return updateSc;
930  } else {
931  if ( msgLevel( MSG::DEBUG ) )
932  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
933  return sc;
934  }
935 }
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
ContextID_t slot() const
Definition: EventContext.h:40
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:50
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:50
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
T at(T...args)
unsigned int m_algosInFlight
Number of algoritms presently in flight.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Gaudi::Property< int > m_threadPoolSize
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:10
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.

Definition at line 445 of file AvalancheSchedulerSvc.cpp.

446 {
447 
448  if ( m_first ) {
449  m_first = false;
450  }
451 
452  if ( !eventContext ) {
453  fatal() << "Event context is nullptr" << endmsg;
454  return StatusCode::FAILURE;
455  }
456 
457  if ( m_freeSlots.load() == 0 ) {
458  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
459  return StatusCode::FAILURE;
460  }
461 
462  // no problem as push new event is only called from one thread (event loop manager)
463  m_freeSlots--;
464 
465  auto action = [this, eventContext]() -> StatusCode {
466  // Event processing slot forced to be the same as the wb slot
467  const unsigned int thisSlotNum = eventContext->slot();
468  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
469  if ( !thisSlot.complete ) {
470  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
471  return StatusCode::FAILURE;
472  }
473 
474  debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
475  thisSlot.reset( eventContext );
476 
477  // Result status code:
479 
480  // promote to CR and DR the initial set of algorithms
481  Cause cs = {Cause::source::Root, "RootDecisionHub"};
482  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
483  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
484  result = StatusCode::FAILURE;
485  }
486 
487  if ( this->updateStates( thisSlotNum ).isFailure() ) {
488  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
489  result = StatusCode::FAILURE;
490  }
491 
492  return result;
493  }; // end of lambda
494 
495  // Kick off the scheduling!
496  if ( msgLevel( MSG::VERBOSE ) ) {
497  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
498  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
499  }
500  m_actionsQueue.push( action );
501 
502  return StatusCode::SUCCESS;
503 }
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
ContextID_t slot() const
Definition: EventContext.h:40
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:61
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
bool complete
Flags completion of the event.
Definition: EventSlot.h:52
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:33
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
Class representing the event slot.
Definition: EventSlot.h:10
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 506 of file AvalancheSchedulerSvc.cpp.

507 {
508  StatusCode sc;
509  for ( auto context : eventContexts ) {
510  sc = pushNewEvent( context );
511  if ( sc != StatusCode::SUCCESS ) return sc;
512  }
513  return sc;
514 }
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
StatusCode AvalancheSchedulerSvc::scheduleEventView ( EventContext const *  sourceContext,
std::string const &  nodeName,
EventContext viewContext 
)
overridevirtual

Method to inform the scheduler about event views.

Definition at line 1151 of file AvalancheSchedulerSvc.cpp.

1153 {
1154  // Find the top-level slot, to attach the sub-slot to
1155  int const topSlotIndex = sourceContext->slot();
1156  EventSlot& topSlot = m_eventSlots[topSlotIndex];
1157 
1158  // Prevent view nesting - this doesn't work because EventContext is copied when passed to algorithm
1159  /*if ( sourceContext != topSlot.eventContext )
1160  {
1161  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1162  return StatusCode::FAILURE;
1163  }*/
1164 
1165  if ( viewContext ) {
1166  // Make new slot by copying the top slot
1167  unsigned int lastIndex = topSlot.allSubSlots.size();
1168  topSlot.allSubSlots.push_back( EventSlot( m_eventSlots[topSlotIndex], viewContext ) );
1169  topSlot.allSubSlots.back().entryPoint = nodeName;
1170 
1171  // Store index of the new slot in lookup structures
1172  topSlot.contextToSlot[viewContext] = lastIndex;
1173  topSlot.subSlotsByNode[nodeName].push_back( lastIndex );
1174  } else {
1175  // Disable the view node if there are no views
1176  topSlot.subSlotsByNode[nodeName] = std::vector<unsigned int>( 0 );
1177  }
1178 
1179  return StatusCode::SUCCESS;
1180 }
std::string entryPoint
Name of the node this slot is attached to ("" for top level)
Definition: EventSlot.h:58
T push_back(T...args)
std::map< std::string, std::vector< unsigned int > > subSlotsByNode
Listing of sub-slots by the node (name) they are attached to.
Definition: EventSlot.h:56
T size(T...args)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:66
Class representing the event slot.
Definition: EventSlot.h:10
T back(T...args)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
Definition: EventSlot.h:64
StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event, if not available just return a failure.

Definition at line 562 of file AvalancheSchedulerSvc.cpp.

563 {
564  if ( m_finishedEvents.try_pop( eventContext ) ) {
565  if ( msgLevel( MSG::DEBUG ) )
566  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
567  << endmsg;
568  m_freeSlots++;
569  return StatusCode::SUCCESS;
570  }
571  return StatusCode::FAILURE;
572 }
ContextID_t slot() const
Definition: EventContext.h:40
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode AvalancheSchedulerSvc::updateStates ( int  si = -1,
int  algo_index = -1,
EventContext inputContext = nullptr 
)
private

Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skipping an update of the Control Flow state)

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. To check if the event is finished the algorithm checks if:

  • No algorithms have been signed off by the control flow
  • No algorithms have been signed off by the data flow
  • No algorithms have been scheduled

Definition at line 627 of file AvalancheSchedulerSvc.cpp.

628 {
629 
630  StatusCode global_sc( StatusCode::SUCCESS );
631 
632  // Sort from the oldest to the newest event
633  // Prepare a vector of pointers to the slots to avoid copies
634  std::vector<EventSlot*> eventSlotsPtrs;
635 
636  // Consider all slots if si <0 or just one otherwise
637  if ( si < 0 ) {
638  const int eventsSlotsSize( m_eventSlots.size() );
639  eventSlotsPtrs.reserve( eventsSlotsSize );
640  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
641  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
642  }
643  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
644  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
645  } else {
646  eventSlotsPtrs.push_back( &m_eventSlots[si] );
647  }
648 
649  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
650  int iSlot = thisSlotPtr->eventContext->slot();
651 
652  // Cache the states of the algos to improve readability and performance
653  auto& thisSlot = m_eventSlots[iSlot];
654  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
655 
656  // Perform the I->CR->DR transitions
657  if ( algo_index >= 0 ) {
658  Cause cs = {Cause::source::Task, index2algname( algo_index )};
659 
660  // Pass sub-slots to precedence service if necessary
661  if ( !inputContext || iSlot != (int)inputContext->slot() || inputContext == thisSlot.eventContext ) {
662  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
663  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
664  global_sc = StatusCode::FAILURE;
665  }
666  } else {
667  // An input context that doesn't match the event context for that slot number implies a sub-slot
668  unsigned int const subSlotIndex = thisSlot.contextToSlot.at( inputContext );
669  if ( m_precSvc->iterate( thisSlot.allSubSlots[subSlotIndex], cs ).isFailure() ) {
670  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot of " << iSlot << endmsg;
671  global_sc = StatusCode::FAILURE;
672  }
673  }
674  }
675 
676  StatusCode partial_sc( StatusCode::FAILURE, true );
677 
678  // Perform DR->SCHEDULED
679  if ( !m_optimizationMode.empty() ) {
680  auto comp_nodes = [this]( const uint& i, const uint& j ) {
681  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
682  };
684  comp_nodes, std::vector<uint>() );
685  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
686  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
687  buffer.push( *it );
688  while ( !buffer.empty() ) {
689  bool IOBound = false;
690  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
691 
692  if ( !IOBound )
693  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext );
694  else
695  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext );
696 
697  if ( msgLevel( MSG::VERBOSE ) )
698  if ( partial_sc.isFailure() )
699  verbose() << "Could not apply transition from "
700  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
701  << index2algname( buffer.top() ) << " on processing slot " << iSlot << endmsg;
702 
703  buffer.pop();
704  }
705 
706  } else {
707  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
708  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
709  uint algIndex = *it;
710 
711  bool IOBound = false;
712  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
713 
714  if ( !IOBound )
715  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
716  else
717  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
718 
719  if ( msgLevel( MSG::VERBOSE ) )
720  if ( partial_sc.isFailure() )
721  verbose() << "Could not apply transition from "
722  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
723  << index2algname( algIndex ) << " on processing slot " << iSlot << endmsg;
724  }
725  }
726 
727  // Check for algorithms ready in sub-slots
728  if ( thisSlot.subSlotAlgsReady.size() ) {
729  // Any data-ready algorithms that don't get scheduled need to be retried later
731  failedAlgs.reserve( thisSlot.subSlotAlgsReady.size() );
732 
733  // Loop with iterator so we can use it for a fast append if needed
734  for ( auto contextAlgPair = thisSlot.subSlotAlgsReady.begin(); contextAlgPair != thisSlot.subSlotAlgsReady.end();
735  ++contextAlgPair ) {
737  partial_sc = promoteToScheduled( contextAlgPair->second, iSlot, contextAlgPair->first );
738 
739  // Add the alg back into the ready list if scheduling failed
740  if ( !partial_sc.isSuccess() ) failedAlgs.push_back( *contextAlgPair );
741  } else {
742  // Don't loop through all remaining algs if we're already busy
743  failedAlgs.insert( failedAlgs.end(), contextAlgPair, thisSlot.subSlotAlgsReady.end() );
744  break;
745  }
746  }
747 
748  // Update ready list
749  thisSlot.subSlotAlgsReady = failedAlgs;
750  }
751 
752  if ( m_dumpIntraEventDynamics ) {
754  s << index2algname( algo_index ) << ", " << thisAlgsStates.sizeOfSubset( State::CONTROLREADY ) << ", "
755  << thisAlgsStates.sizeOfSubset( State::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( State::SCHEDULED )
756  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
758  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
759  std::ofstream myfile;
760  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
761  myfile << s.str();
762  myfile.close();
763  }
764 
765  // Not complete because this would mean that the slot is already free!
766  if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
767  thisSlot.subSlotAlgsReady.empty() && // Account for sub-slot algs
768  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
769  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
770  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
771 
772  thisSlot.complete = true;
773  // if the event did not fail, add it to the finished events
774  // otherwise it is taken care of in the error handling already
775  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
776  m_finishedEvents.push( thisSlot.eventContext );
777  if ( msgLevel( MSG::DEBUG ) )
778  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot " << thisSlot.eventContext->slot()
779  << ")." << endmsg;
780  }
781 
782  // now let's return the fully evaluated result of the control flow
783  if ( msgLevel( MSG::DEBUG ) ) debug() << m_precSvc->printState( thisSlot ) << endmsg;
784 
785  thisSlot.eventContext = nullptr;
786  } else {
787  StatusCode eventStalledSC = isStalled( iSlot );
788  if ( !eventStalledSC.isSuccess() ) {
789  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
790  eventFailed( thisSlot.eventContext ).ignore();
791  }
792  }
793  } // end loop on slots
794 
795  verbose() << "States Updated." << endmsg;
796 
797  return global_sc;
798 }
T open(T...args)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
ContextID_t slot() const
Definition: EventContext.h:40
Gaudi::Property< bool > m_dumpIntraEventDynamics
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:50
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:45
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
T to_string(T...args)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T end(T...args)
Gaudi::Property< std::string > m_optimizationMode
size_t sizeOfSubset(State state) const
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:61
virtual const std::string printState(EventSlot &) const =0
ContextEvt_t evt() const
Definition: EventContext.h:39
Gaudi::Property< bool > m_useIOBoundAlgScheduler
unsigned int m_algosInFlight
Number of algoritms presently in flight.
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
virtual uint getPriority(const std::string &) const =0
Get task priority.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
T close(T...args)
T str(T...args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
Gaudi::Property< int > m_threadPoolSize
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T insert(T...args)
T size(T...args)
STL class.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T begin(T...args)
Iterator begin(State kind)
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:10
string s
Definition: gaudirun.py:253
std::vector< EventSlot > m_eventSlots
Vector of events slots.
T sort(T...args)
void ignore() const
Definition: StatusCode.h:84
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
T reserve(T...args)
static std::map< State, std::string > stateNames
Iterator end(State kind)

Member Data Documentation

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 261 of file AvalancheSchedulerSvc.h.

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 220 of file AvalancheSchedulerSvc.h.

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 190 of file AvalancheSchedulerSvc.h.

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 196 of file AvalancheSchedulerSvc.h.

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algoritms presently in flight.

Definition at line 228 of file AvalancheSchedulerSvc.h.

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 253 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
private

Definition at line 156 of file AvalancheSchedulerSvc.h.

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to service for Conditions handling.

Definition at line 223 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file"}

Definition at line 151 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
private

Definition at line 161 of file AvalancheSchedulerSvc.h.

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 208 of file AvalancheSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 214 of file AvalancheSchedulerSvc.h.

bool AvalancheSchedulerSvc::m_first = true
private

Definition at line 269 of file AvalancheSchedulerSvc.h.

std::atomic_int AvalancheSchedulerSvc::m_freeSlots
private

Atomic to account for asyncronous updates by the scheduler wrt the rest.

Definition at line 211 of file AvalancheSchedulerSvc.h.

unsigned int AvalancheSchedulerSvc::m_IOBoundAlgosInFlight = 0
private

Number of algoritms presently in flight.

Definition at line 231 of file AvalancheSchedulerSvc.h.

SmartIF<IAccelerator> AvalancheSchedulerSvc::m_IOBoundAlgScheduler
private

A shortcut to IO-bound algorithm scheduler.

Definition at line 205 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
private

Definition at line 143 of file AvalancheSchedulerSvc.h.

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 181 of file AvalancheSchedulerSvc.h.

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight {1}
private

Definition at line 268 of file AvalancheSchedulerSvc.h.

size_t AvalancheSchedulerSvc::m_maxEventsInFlight {0}
private

Definition at line 267 of file AvalancheSchedulerSvc.h.

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxIOBoundAlgosInFlight
private
Initial value:
{this, "MaxIOBoundAlgosInFlight", 0,
"Maximum number of simultaneous I/O-bound algorithms"}

Definition at line 144 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E"}

Definition at line 149 of file AvalancheSchedulerSvc.h.

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 199 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences"}

Definition at line 169 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms"}

Definition at line 163 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms"}

Definition at line 166 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution"}

Definition at line 146 of file AvalancheSchedulerSvc.h.

std::mutex AvalancheSchedulerSvc::m_ssMut
staticprivate

Definition at line 301 of file AvalancheSchedulerSvc.h.

std::list< AvalancheSchedulerSvc::SchedulerState > AvalancheSchedulerSvc::m_sState
staticprivate

Definition at line 300 of file AvalancheSchedulerSvc.h.

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 184 of file AvalancheSchedulerSvc.h.

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 139 of file AvalancheSchedulerSvc.h.

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 266 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 158 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_useIOBoundAlgScheduler
private
Initial value:
{this, "PreemptiveIOBoundTasks", false,
"Turn on preemptive way of scheduling of I/O-bound algorithms"}

Definition at line 153 of file AvalancheSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 202 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 142 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: