The Gaudi Framework  v28r3 (cc1cf868)
AvalancheSchedulerSvc Class Reference

#include <src/AvalancheSchedulerSvc.h>

[Figures: inheritance diagram and collaboration diagram for AvalancheSchedulerSvc]

Classes

struct  enqueueSchedulerActionTask
 
class  SchedulerState
 

Public Member Functions

 ~AvalancheSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
void addAlg (Algorithm *, EventContext *, pthread_t)
 
bool delAlg (Algorithm *)
 
void dumpState () override
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Start Service. More...
 
StatusCode sysStop () override
 Stop Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Restart the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
 ~PropertyHolder () override=default
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none") const
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBase * declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property from another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property from the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBase & getProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, GaudiHandleBase &ref, const std::string &doc="none")
 Specializations for various GaudiHandles. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, GaudiHandleArrayBase &ref, const std::string &doc="none")
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, DataObjectHandleBase &ref, const std::string &doc="none")
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStream & msgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStream & msgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStream & always () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStream & fatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStream & err () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & error () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & warning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStream & info () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStream & debug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStream & verbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStream & msg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
- Public Member Functions inherited from extend_interfaces< Interfaces... >
 ~extend_interfaces () override=default
 Virtual destructor. More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1, const std::string &algo_name=std::string())
 Loop over the algorithms in the slots and promote them to successive states (-1 means all slots, while an empty string means skipping the update of the Control Flow state). More...
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si)
 Algorithm promotion. More...
 
StatusCode promoteToAsyncScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the AlgoExecutionTask. More...
 
StatusCode promoteToAsyncExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the IOBoundAlgTask. More...
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 
void dumpState (std::ostringstream &)
 

Private Attributes

Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::string > m_whiteboardSvcName
 
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
 
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::string > m_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_useIOBoundAlgScheduler
 
Gaudi::Property< bool > m_checkDeps
 
Gaudi::Property< std::string > m_useDataLoader
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
std::atomic< ActivationState > m_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard. More...
 
SmartIF< IAccelerator > m_IOBoundAlgScheduler
 A shortcut to the IO-bound algorithm scheduler. More...
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asynchronous updates by the scheduler with respect to the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager. More...
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight. More...
 
unsigned int m_IOBoundAlgosInFlight = 0
 Number of IO-bound algorithms presently in flight. More...
 
bool m_updateNeeded = true
 Keep track of update actions scheduled. More...
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
concurrency::ExecutionFlowManager m_efManager
 Member to take care of the control flow. More...
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
size_t m_maxEventsInFlight {0}
 
size_t m_maxAlgosInFlight {1}
 
bool m_first = true
 
concurrency::PrecedenceRulesGraph * m_efg
 

Static Private Attributes

static std::list< SchedulerState > m_sState
 
static std::mutex m_ssMut
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
typedef Gaudi::PluginService::Factory< IService *, const std::string &, ISvcLocator * > Factory
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBase * property (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. Compared with the approach used in the ForwardSchedulerSvc, it offers the following advantages:

(1) Faster decision making (thus lower concurrency disclosure downtime);
(2) Capacity for proactive task scheduling decision making.

Point (2) made it possible to implement a number of generic, non-intrusive scheduling strategies that maximize intra-event throughput.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled as soon as all of the following conditions are met:

  • a control flow (CF) graph traversal reaches the task;
  • all data flow (DF) dependencies of the task are satisfied;
  • the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.
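
The prerequisites above can be read as a single predicate. The following is a minimal sketch under stated assumptions: TaskSnapshot, isDataReady and canSchedule are hypothetical names that do not appear in the Gaudi sources, and the real scheduler tracks this information per event slot rather than in one flat structure.

```cpp
#include <cstddef>

// Hypothetical snapshot of what the scheduler knows about one task.
// None of these names come from the Gaudi sources.
struct TaskSnapshot {
  bool controlFlowReached;      // a CF graph traversal has reached the task
  std::size_t missingDataDeps;  // DF inputs that are not yet available
  bool algInstanceAvailable;    // a free (or re-entrant) algorithm instance exists
};

// True when the task belongs in the DF-ready pool.
inline bool isDataReady( const TaskSnapshot& t ) {
  return t.controlFlowReached && t.missingDataDeps == 0;
}

// True when every prerequisite holds, given the number of free workers.
inline bool canSchedule( const TaskSnapshot& t, std::size_t freeWorkers ) {
  return isDataReady( t ) && t.algInstanceAvailable && freeWorkers > 0;
}
```

A task that fails only the last two checks stays in the DF-ready pool and is reconsidered once an algorithm instance or a worker is released.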

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of precedence rules graph asymmetries:

(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.
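
As an illustration of prioritizing tasks within the DF-ready pool, the sketch below serves the pool in order of a precomputed rank. It is an assumption-laden example, not the scheduler's ranking: ReadyTask, ByRank, ReadyPool and pickTasks are hypothetical names, and the rank is simply assumed to be derived beforehand from the precedence rules graph (for instance, the number of tasks that directly consume the outputs of a given task).

```cpp
#include <queue>
#include <string>
#include <vector>

// Hypothetical ready-pool entry. The rank is assumed to be precomputed
// from the precedence rules graph; this is not Gaudi's actual ranking.
struct ReadyTask {
  std::string name;
  unsigned int rank;
};

// Comparator that makes the priority queue serve the highest rank first.
struct ByRank {
  bool operator()( const ReadyTask& a, const ReadyTask& b ) const { return a.rank < b.rank; }
};

using ReadyPool = std::priority_queue<ReadyTask, std::vector<ReadyTask>, ByRank>;

// Pop up to freeWorkers tasks from the pool, best-ranked first.
inline std::vector<std::string> pickTasks( ReadyPool& pool, unsigned int freeWorkers ) {
  std::vector<std::string> picked;
  while ( freeWorkers > 0 && !pool.empty() ) {
    picked.push_back( pool.top().name );
    pool.pop();
    --freeWorkers;
  }
  return picked;
}
```

Serving high-rank tasks first tends to release more downstream tasks per completed task, which is the intuition behind inducing an "avalanche" of newly schedulable work.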

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by scheduling the CPU-blocking tasks efficiently. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices... joke);
  • synchronization-bound tasks.
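
One plausible ingredient of such a mechanism, suggested by the separate m_algosInFlight and m_IOBoundAlgosInFlight counters of this class, is to account for blocking tasks independently of CPU-bound ones so that they never occupy a CPU slot. The sketch below only illustrates that bookkeeping; InFlightLimits, tryAdmit and release are hypothetical names, and the actual dispatch to a dedicated scheduler is not shown.

```cpp
#include <cstddef>

// Hypothetical bookkeeping with two independent in-flight limits.
// This is an illustration, not the AvalancheSchedulerSvc implementation.
struct InFlightLimits {
  InFlightLimits( std::size_t cpu, std::size_t blocking ) : maxCpu( cpu ), maxBlocking( blocking ) {}

  // Try to admit a task; blocking tasks never consume a CPU slot.
  bool tryAdmit( bool isBlocking ) {
    std::size_t& inFlight   = isBlocking ? blockingInFlight : cpuInFlight;
    const std::size_t limit = isBlocking ? maxBlocking : maxCpu;
    if ( inFlight >= limit ) return false;
    ++inFlight;
    return true;
  }

  // Release the slot once the task has finished.
  void release( bool isBlocking ) {
    std::size_t& inFlight = isBlocking ? blockingInFlight : cpuInFlight;
    if ( inFlight > 0 ) --inFlight;
  }

  std::size_t maxCpu;
  std::size_t maxBlocking;
  std::size_t cpuInFlight      = 0;
  std::size_t blockingInFlight = 0;
};
```

With limits of one CPU task and two blocking tasks, a second CPU task is refused while two blocking tasks can still be admitted alongside the first.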

Credits

Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on the ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 100 of file AvalancheSchedulerSvc.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

AvalancheSchedulerSvc::~AvalancheSchedulerSvc ( )
override default

Destructor.

Member Function Documentation

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

From this moment on, the queue of actions is checked. The checking stops only once the m_isActive flag is no longer ACTIVE and the queue is empty, which guarantees that all actions are executed and no stall is created.

The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html). The scheduler is therefore initialised here, since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas).

Definition at line 382 of file AvalancheSchedulerSvc.cpp.

382  {
383 
384  if (msgLevel(MSG::DEBUG))
385  debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
386 
388  error() << "problems initializing ThreadPoolSvc" << endmsg;
390  return;
391  }
392 
393  // Wait for actions pushed into the queue by finishing tasks.
394  action thisAction;
396 
397  m_isActive = ACTIVE;
398 
399  // Continue to wait if the scheduler is running or there is something to do
400  info() << "Start checking the actionsQueue" << endmsg;
401  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
402  m_actionsQueue.pop( thisAction );
403  sc = thisAction();
404  if ( sc != StatusCode::SUCCESS )
405  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
406  else
407  verbose() << "Action succeeded." << endmsg;
408  }
409 
410  info() << "Terminating thread-pool resources" << endmsg;
412  error() << "Problems terminating thread pool" << endmsg;
414  }
415 }
void AvalancheSchedulerSvc::addAlg ( Algorithm * a, EventContext * e, pthread_t t )

Definition at line 1128 of file AvalancheSchedulerSvc.cpp.

1129 {
1130 
1132  m_sState.push_back( SchedulerState( a, e, t ) );
1133 }
unsigned int AvalancheSchedulerSvc::algname2index ( const std::string & algoname )
inline private

Convert a name to an integer.

Definition at line 451 of file AvalancheSchedulerSvc.cpp.

451  {
452  unsigned int index = m_algname_index_map[algoname];
453  return index;
454 }
StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Two actions are pushed into the queue:

  1) Drain the scheduler until all events are finished.
  2) Flip the status flag m_isActive to INACTIVE.

This second action is the last one to be executed by the scheduler.
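
The shutdown handshake described above can be sketched in isolation. This is a simplified illustration under several assumptions: ActionQueue is a hypothetical mutex-based stand-in for tbb::concurrent_bounded_queue, runAndShutDown is a hypothetical driver, a plain boolean replaces the ActivationState enum, and the drain step is omitted so that only the "flag flip as last action" idea is shown.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Minimal blocking queue standing in for tbb::concurrent_bounded_queue.
class ActionQueue {
public:
  using Action = std::function<bool()>;

  void push( Action a ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_actions.push( std::move( a ) );
    }
    m_cv.notify_one();
  }

  // Blocks until an action is available.
  Action pop() {
    std::unique_lock<std::mutex> lock( m_mutex );
    m_cv.wait( lock, [this] { return !m_actions.empty(); } );
    Action a = std::move( m_actions.front() );
    m_actions.pop();
    return a;
  }

  bool empty() {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_actions.empty();
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::queue<Action> m_actions;
};

// Hypothetical driver: runs queued actions on a worker thread and returns
// the number of ordinary actions executed before shutdown.
inline int runAndShutDown( int nActions ) {
  ActionQueue queue;
  std::atomic<bool> active{true};
  int executed = 0;  // touched only by the worker thread until join()

  std::thread worker( [&] {
    // Keep going while active OR while actions are still pending.
    while ( active || !queue.empty() ) queue.pop()();
  } );

  for ( int i = 0; i < nActions; ++i ) queue.push( [&executed] { ++executed; return true; } );

  // Deactivation: the flag flip is itself the last queued action.
  queue.push( [&active] { active = false; return true; } );

  worker.join();
  return executed;
}
```

Because the flag is flipped by an action travelling through the same FIFO queue, every action pushed before deactivation runs before the worker loop can exit.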

Definition at line 425 of file AvalancheSchedulerSvc.cpp.

425  {
426 
427  if ( m_isActive == ACTIVE ) {
428  // Drain the scheduler
430  // This would be the last action
431  m_actionsQueue.push( [this]() -> StatusCode {
433  return StatusCode::SUCCESS;
434  } );
435  }
436 
437  return StatusCode::SUCCESS;
438 }
bool AvalancheSchedulerSvc::delAlg ( Algorithm * a )

Definition at line 1136 of file AvalancheSchedulerSvc.cpp.

1137 {
1138 
1140 
1141  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1142  if ( *itr == a ) {
1143  m_sState.erase( itr );
1144  return true;
1145  }
1146  }
1147 
1148  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1149  return false;
1150 }
void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

The dependencies of each algo are printed and the missing ones specified.

Definition at line 834 of file AvalancheSchedulerSvc.cpp.

834  {
835 
836  // To have just one big message
837  std::ostringstream outputMessageStream;
838 
839  outputMessageStream << "============================== Execution Task State ============================="
840  << std::endl;
841  dumpState( outputMessageStream );
842 
843  outputMessageStream << std::endl
844  << "============================== Scheduler State ================================="
845  << std::endl;
846 
847  int slotCount = -1;
848  for ( auto thisSlot : m_eventSlots ) {
849  slotCount++;
850  if ( thisSlot.complete ) continue;
851 
852  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
853  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
854 
855  if ( 0 > iSlot or iSlot == slotCount ) {
856  outputMessageStream << "Algorithms states:" << std::endl;
857 
858  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
859  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
860  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
861  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
862  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
863  const int depsSize = deps.size();
864  if ( depsSize == 0 ) outputMessageStream << " none";
865 
866  DataObjIDColl missing;
867  for ( auto d : deps ) {
868  outputMessageStream << d << " ";
869  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
870  // outputMessageStream << "[missing] ";
871  missing.insert( d );
872  }
873  }
874 
875  if ( !missing.empty() ) {
876  outputMessageStream << ". The following are missing: ";
877  for ( auto d : missing ) {
878  outputMessageStream << d << " ";
879  }
880  }
881 
882  outputMessageStream << std::endl;
883  }
884 
885  // Snapshot of the WhiteBoard
886  outputMessageStream << "\nWhiteboard contents: " << std::endl;
887  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
888 
889  // Snapshot of the ControlFlow
890  outputMessageStream << "\nControl Flow:" << std::endl;
891  std::stringstream cFlowStateStringStream;
892  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
893 
894  outputMessageStream << cFlowStateStringStream.str() << std::endl;
895  }
896  }
897 
898  outputMessageStream << "=================================== END ======================================" << std::endl;
899 
900  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
901 }
void AvalancheSchedulerSvc::dumpState ( )
override

Definition at line 1164 of file AvalancheSchedulerSvc.cpp.

1165 {
1166 
1168 
1169  std::ostringstream ost;
1170  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1171  dumpState( ost );
1172 
1173  info() << ost.str() << endmsg;
1174 }
void AvalancheSchedulerSvc::dumpState ( std::ostringstream & ost )
private

Definition at line 1153 of file AvalancheSchedulerSvc.cpp.

1154 {
1155 
1157 
1158  for ( auto it : m_sState ) {
1159  ost << " " << it << std::endl;
1160  }
1161 }
StatusCode AvalancheSchedulerSvc::eventFailed ( EventContext * eventContext )
private

Method to check if an event failed and take appropriate actions.

An event may fail, in which case this method is called. It dumps the state of the scheduler, drains the actions (without executing them) and the events in the queues, and returns a failure.

Definition at line 588 of file AvalancheSchedulerSvc.cpp.

588  {
589 
590  // Set the number of slots available to an error code
591  m_freeSlots.store( 0 );
592 
593  fatal() << "*** Event " << eventContext->evt() << " on slot "
594  << eventContext->slot() << " failed! ***" << endmsg;
595 
596  std::ostringstream ost;
597  m_algExecStateSvc->dump(ost, *eventContext);
598 
599  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
600  << ":\n" << ost.str() << endmsg;
601 
602  dumpSchedulerState(-1);
603 
604  // Empty queue and deactivate the service
605  action thisAction;
606  while ( m_actionsQueue.try_pop( thisAction ) ) {
607  };
608  deactivate();
609 
610  // Push into the finished events queue the failed context
611  EventContext* thisEvtContext;
612  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
613  m_finishedEvents.push( thisEvtContext );
614  };
615  m_finishedEvents.push( eventContext );
616 
617  return StatusCode::FAILURE;
618 }
StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 350 of file AvalancheSchedulerSvc.cpp.

350  {
351 
353  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
354 
355  sc = deactivate();
356  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
357 
358  info() << "Joining Scheduler thread" << endmsg;
359  m_thread.join();
360 
361  // Final error check after thread pool termination
362  if ( m_isActive == FAILURE ) {
363  error() << "problems in scheduler thread" << endmsg;
364  return StatusCode::FAILURE;
365  }
366 
367  // m_efManager.getPrecedenceRulesGraph()->dumpExecutionPlan();
368 
369  return sc;
370 }
unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 524 of file AvalancheSchedulerSvc.cpp.

524  {
525  return std::max( m_freeSlots.load(), 0 );
526 }
const std::string & AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inline private

Convert an integer to a name.

Definition at line 445 of file AvalancheSchedulerSvc.cpp.

445  {
446  return m_algname_vect[index];
447 }
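
The two conversions rely on a map and a vector that must stay consistent with each other. Note that the algname2index listing uses operator[] on the map, which inserts a default entry (index 0) when the name is unknown. The sketch below shows one way to keep both containers in sync and to reject unknown names; AlgNameIndex and its methods are hypothetical and are not part of the class documented here.

```cpp
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper keeping the name->index map and the index->name
// vector consistent. Not part of AvalancheSchedulerSvc.
class AlgNameIndex {
public:
  // Register a name (idempotent) and return its index.
  unsigned int add( const std::string& name ) {
    auto it = m_index.find( name );
    if ( it != m_index.end() ) return it->second;
    const unsigned int idx = static_cast<unsigned int>( m_names.size() );
    m_names.push_back( name );
    m_index.emplace( name, idx );
    return idx;
  }

  // name -> index; throws std::out_of_range for unknown names instead of
  // silently inserting a 0 entry, which is what operator[] would do.
  unsigned int index( const std::string& name ) const { return m_index.at( name ); }

  // index -> name; throws std::out_of_range for an invalid index.
  const std::string& name( unsigned int idx ) const { return m_names.at( idx ); }

private:
  std::unordered_map<std::string, unsigned int> m_index;
  std::vector<std::string> m_names;
};
```

Whether throwing is preferable to the cheaper unchecked lookups depends on whether callers can ever pass a name or index that was not registered during initialisation.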
StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Besides some bookkeeping, this method activates the scheduler by running the activate() function in a new thread.

In addition, the list of algorithms is acquired from the AlgResourcePool.
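
The start-up handshake between initialize() and the scheduler thread can be sketched as follows. This is a simplified illustration: Worker, Activation, start and stop are hypothetical names, the poolInitSucceeds flag stands in for the outcome of the thread pool initialisation, and the polling interval is one millisecond rather than the one-second sleep used in the listing.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

enum class Activation { Inactive, Active, Failure };

// Hypothetical service whose worker thread reports readiness through an atomic.
class Worker {
public:
  // Spawn the worker and wait until it reports Active or Failure.
  bool start( bool poolInitSucceeds ) {
    m_thread = std::thread( [this, poolInitSucceeds] {
      if ( !poolInitSucceeds ) {
        m_state = Activation::Failure;
        return;
      }
      m_state = Activation::Active;
      // Stand-in for the action loop: idle until asked to stop.
      while ( !m_stop ) std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
      m_state = Activation::Inactive;
    } );

    // The launching thread polls the state, as initialize() does.
    while ( m_state != Activation::Active ) {
      if ( m_state == Activation::Failure ) return false;
      std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
    }
    return true;
  }

  // Ask the worker to stop and join it.
  void stop() {
    m_stop = true;
    if ( m_thread.joinable() ) m_thread.join();
  }

  ~Worker() { stop(); }

private:
  std::atomic<Activation> m_state{Activation::Inactive};
  std::atomic<bool> m_stop{false};
  std::thread m_thread;
};
```

Polling an atomic keeps the example short; a condition variable would avoid the wake-up latency at the cost of a little more code.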

Definition at line 70 of file AvalancheSchedulerSvc.cpp.

70  {
71 
72  // Initialise mother class (read properties, ...)
74  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
75 
76  // Get hold of the TBBSvc. This should initialize the thread pool
77  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
78  if ( !m_threadPoolSvc.isValid() ) {
79  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
80  return StatusCode::FAILURE;
81  }
82 
83  // Activate the scheduler in another thread.
84  info() << "Activating scheduler in a separate thread" << endmsg;
86 
87  while ( m_isActive != ACTIVE ) {
88  if ( m_isActive == FAILURE ) {
89  fatal() << "Terminating initialization" << endmsg;
90  return StatusCode::FAILURE;
91  } else {
92  info() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
93  sleep( 1 );
94  }
95  }
96 
97  // Get the algo resource pool
98  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
99  if ( !m_algResourcePool.isValid() ) {
100  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
101  return StatusCode::FAILURE;
102  }
103 
104  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
105  if (!m_algExecStateSvc.isValid()) {
106  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
107  return StatusCode::FAILURE;
108  }
109 
110  // Get Whiteboard
112  if ( !m_whiteboard.isValid() ) {
113  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
114  return StatusCode::FAILURE;
115  }
116 
117  // Get dedicated scheduler for I/O-bound algorithms
118  if ( m_useIOBoundAlgScheduler ) {
121  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
122  }
123 
124  // Set the MaxEventsInFlight parameters from the number of WB stores
126 
127  // Set the number of free slots
129 
130  // set global concurrency flags
132 
133  // Get the list of algorithms
135  const unsigned int algsNumber = algos.size();
136  info() << "Found " << algsNumber << " algorithms" << endmsg;
137 
138  /* Dependencies
139  1) Look for handles in algo, if none
140  2) Assume none are required
141  */
142 
143  DataObjIDColl globalInp, globalOutp;
144 
145  // figure out all outputs
146  for (IAlgorithm* ialgoPtr : algos) {
147  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
148  if (!algoPtr) {
149  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
150  }
151  for (auto id : algoPtr->outputDataObjs()) {
152  auto r = globalOutp.insert(id);
153  if (!r.second) {
154  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" << endmsg;
155  }
156  }
157  }
158 
159  std::ostringstream ostdd;
160  ostdd << "Data Dependencies for Algorithms:";
161 
162  std::vector<DataObjIDColl> m_algosDependencies;
163  for ( IAlgorithm* ialgoPtr : algos ) {
164  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
165  if ( nullptr == algoPtr ) {
166  fatal() << "Could not convert IAlgorithm into Algorithm for "
167  << ialgoPtr->name()
168  << ": this will result in a crash." << endmsg;
169  return StatusCode::FAILURE;
170  }
171 
172  ostdd << "\n " << algoPtr->name();
173 
174  DataObjIDColl algoDependencies;
175  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
176  for ( const DataObjID* idp : sortedDataObjIDColl (algoPtr->inputDataObjs()) ) {
177  DataObjID id = *idp;
178  ostdd << "\n o INPUT " << id;
179  if (id.key().find(":")!=std::string::npos) {
180  ostdd << " contains alternatives which require resolution...\n";
181  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(),boost::char_separator<char>{":"}};
182  auto itok = std::find_if( tokens.begin(), tokens.end(),
183  [&](const std::string& t) {
184  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
185  } );
186  if (itok!=tokens.end()) {
187  ostdd << "found matching output for " << *itok
188  << " -- updating scheduler info\n";
189  id.updateKey(*itok);
190  } else {
191  error() << "failed to find alternate in global output list"
192  << " for id: " << id << " in Alg " << algoPtr->name()
193  << endmsg;
194  m_showDataDeps = true;
195  }
196  }
197  algoDependencies.insert( id );
198  globalInp.insert( id );
199  }
200  for ( const DataObjID* id : sortedDataObjIDColl (algoPtr->outputDataObjs()) ) {
201  ostdd << "\n o OUTPUT " << *id;
202  if (id->key().find(":")!=std::string::npos) {
203  error() << " in Alg " << algoPtr->name()
204  << " alternatives are NOT allowed for outputs! id: "
205  << *id << endmsg;
206  m_showDataDeps = true;
207  }
208  }
209  } else {
210  ostdd << "\n none";
211  }
212  m_algosDependencies.emplace_back( algoDependencies );
213  }
214 
215  if ( m_showDataDeps ) {
216  info() << ostdd.str() << endmsg;
217  }
218 
219  // Fill the containers to convert algo names to index
220  m_algname_vect.reserve( algsNumber );
221  unsigned int index = 0;
222  IAlgorithm* dataLoaderAlg( nullptr );
223  for ( IAlgorithm* algo : algos ) {
224  const std::string& name = algo->name();
225  m_algname_index_map[name] = index;
227  if (algo->name() == m_useDataLoader) {
228  dataLoaderAlg = algo;
229  }
230  index++;
231  }
232 
233  // Check if we have unmet global input dependencies
234  if ( m_checkDeps ) {
235  DataObjIDColl unmetDep;
236  for ( auto o : globalInp ) {
237  if ( globalOutp.find( o ) == globalOutp.end() ) {
238  unmetDep.insert( o );
239  }
240  }
241 
242  if ( unmetDep.size() > 0 ) {
243 
244  std::ostringstream ost;
245  for ( const DataObjID* o : sortedDataObjIDColl (unmetDep) ) {
246  ost << "\n o " << *o << " required by Algorithm: ";
247  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
248  if ( m_algosDependencies[i].find( *o ) != m_algosDependencies[i].end() ) {
249  ost << "\n * " << m_algname_vect[i];
250  }
251  }
252  }
253 
254  if ( m_useDataLoader != "" ) {
255  // Find the DataLoader Alg
256  if (dataLoaderAlg == nullptr) {
257  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
258  << "\" found, and unmet INPUT dependencies "
259  << "detected:\n" << ost.str() << endmsg;
260  return StatusCode::FAILURE;
261  }
262 
263  info() << "Will attribute the following unmet INPUT dependencies to \""
264  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
265  << "\" Algorithm"
266  << ost.str() << endmsg;
267 
268  // Set the property Load of DataLoader Alg
269  Algorithm *dataAlg = dynamic_cast<Algorithm*>(dataLoaderAlg);
270  if ( !dataAlg ) {
271  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value()
272  << "\" IAlg to Algorithm" << endmsg;
273  return StatusCode::FAILURE;
274  }
275 
276  for (auto& id : unmetDep) {
277  debug() << "adding OUTPUT dep \"" << id << "\" to "
278  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
279  << endmsg;
281  }
282 
283  } else {
284  fatal() << "Auto DataLoading not requested, "
285  << "and the following unmet INPUT dependencies were found:"
286  << ost.str() << endmsg;
287  return StatusCode::FAILURE;
288  }
289 
290  } else {
291  info() << "No unmet INPUT data dependencies were found" << endmsg;
292  }
293  }
294 
295  // prepare the control flow part
296  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
297  if ( !algPool ) {
298  fatal() << "Unable to dcast algResourcePool" << endmsg;
299  return StatusCode::FAILURE;
300  }
302  unsigned int controlFlowNodeNumber = m_efManager.getPrecedenceRulesGraph()->getControlFlowNodeCounter();
303 
304  // Shortcut for the message service
305  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
306  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
307 
308  m_eventSlots.assign( m_maxEventsInFlight,
309  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
310  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
311 
312  if (m_threadPoolSize > 1) {
314  }
315 
316  // Clearly inform about the level of concurrency
317  info() << "Concurrency level information:" << endmsg;
318  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
319  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
320 
321  m_efg = algPool->getPRGraph();
322 
323  if (m_showControlFlow) {
324  info() << std::endl
325  << "========== Algorithm and Sequence Configuration =========="
326  << std::endl << std::endl;
327  info() << m_efg->dumpControlFlow() << endmsg;
328  }
329 
330  if (m_showDataFlow) {
331  info() << std::endl
332  << "======================= Data Flow ========================"
333  << std::endl;
334  info() << m_efg->dumpDataFlow() << endmsg;
335  }
336 
337  // Simulating execution flow by only analyzing the graph topology and logic
338  if ( m_simulateExecution ) {
339  auto vis = concurrency::RunSimulator( m_eventSlots[0] );
341  }
342 
343  return sc;
344 }
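
The dependency handling in the listing above has two steps: resolve ':'-separated input alternatives against the set of declared outputs, then collect the inputs that no algorithm produces. The following is a simplified sketch using plain strings instead of DataObjID. The function names resolveAlternatives and unmetInputs are hypothetical, and std::getline replaces the Boost tokenizer used in the source.

```cpp
#include <cassert>
#include <set>
#include <sstream>
#include <string>
#include <vector>

using Keys = std::set<std::string>;

// Pick the first ':'-separated alternative that appears among the outputs.
// The key is returned unchanged if it has no alternatives or none matches.
inline std::string resolveAlternatives( const std::string& key, const Keys& outputs ) {
  if ( key.find( ':' ) == std::string::npos ) return key;
  std::istringstream in( key );
  std::string token;
  while ( std::getline( in, token, ':' ) ) {
    if ( !token.empty() && outputs.count( token ) ) return token;
  }
  return key;
}

// Collect every (resolved) input that no algorithm declares as output.
inline Keys unmetInputs( const std::vector<std::string>& inputs, const Keys& outputs ) {
  Keys unmet;
  for ( const auto& raw : inputs ) {
    const std::string key = resolveAlternatives( raw, outputs );
    if ( !outputs.count( key ) ) unmet.insert( key );
  }
  return unmet;
}
```

In the listing, a non-empty set of unmet inputs is either attributed to the configured DataLoader algorithm or reported as a fatal error.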
Gaudi::Property< bool > m_showDataFlow
void simulateExecutionFlow(IGraphVisitor &visitor) const
StatusCode initialize() override
Definition: Service.cpp:64
T empty(T...args)
Gaudi::Property< std::string > m_whiteboardSvcName
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:750
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
virtual concurrency::PrecedenceRulesGraph * getPRGraph() const
Gaudi::Property< bool > m_showDataDeps
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
const DataObjIDColl & outputDataObjs() const override
T endl(T...args)
StatusCode initialize(PrecedenceRulesGraph *graph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
Gaudi::Property< std::string > m_optimizationMode
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:78
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
const DataObjIDColl & inputDataObjs() const override
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
T bind(T...args)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
STL class.
Gaudi::Property< bool > m_simulateExecution
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
Class representing the event slot.
Definition: EventSlot.h:11
std::vector< EventSlot > m_eventSlots
Vector of event slots.
concurrency::PrecedenceRulesGraph * m_efg
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
PrecedenceRulesGraph * getPrecedenceRulesGraph() const
Get the flow graph instance.
std::string dumpControlFlow() const
Print out control flow of Algorithms and Sequences.
T for_each(T...args)
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
T reserve(T...args)
T emplace_back(T...args)
std::thread m_thread
The thread in which the activate function runs.
StatusCode AvalancheSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check whether a stall condition is present for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.

Definition at line 810 of file AvalancheSchedulerSvc.cpp.

810  {
811  // Get the slot
812  EventSlot& thisSlot = m_eventSlots[iSlot];
813 
814  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
816 
817  info() << "About to declare a stall" << endmsg;
818  fatal() << "*** Stall detected! ***\n" << endmsg;
819  dumpSchedulerState( iSlot );
820  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
821 
822  return StatusCode::FAILURE;
823  }
824  return StatusCode::SUCCESS;
825 }
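
The stall condition described above is a conjunction of four checks. A minimal sketch follows, assuming a hypothetical SchedulerSnapshot struct that gathers the relevant quantities. The real method reads them from the scheduler's members and from the slot's algorithm states.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical snapshot of what the stall check inspects (not a Gaudi type).
struct SchedulerSnapshot {
  std::size_t  queuedActions;        // entries waiting in the actions queue
  unsigned int algosInFlight;        // CPU-bound algorithms currently running
  unsigned int ioBoundAlgosInFlight; // I/O-bound algorithms currently running
  bool         anyAlgReadyToRun;     // some algorithm has all its dependencies satisfied
};

// A slot is stalled when nothing is queued, nothing is running,
// and nothing could be started.
inline bool isStalled( const SchedulerSnapshot& s ) {
  return s.queuedActions == 0 && s.algosInFlight == 0 && s.ioBoundAlgosInFlight == 0 &&
         !s.anyAlgReadyToRun;
}
```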
bool algsPresent(State state) const
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
unsigned int m_algosInFlight
Number of algorithms presently in flight.
Class representing the event slot.
Definition: EventSlot.h:11
std::vector< EventSlot > m_eventSlots
Vector of event slots.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
StatusCode AvalancheSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 532 of file AvalancheSchedulerSvc.cpp.

532  {
533 
534  unsigned int slotNum = 0;
535  for ( auto& thisSlot : m_eventSlots ) {
536  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
537  updateStates( slotNum );
538  }
539  slotNum++;
540  }
541  return StatusCode::SUCCESS;
542 }
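
The loop in m_drain() visits every slot once and triggers an update only for slots that still have work. A standalone sketch of that pattern is shown here. The Slot struct and the drain() helper are hypothetical, and the update callback stands in for updateStates().

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical, reduced view of an event slot (not the Gaudi EventSlot).
struct Slot {
  bool allAlgsExecuted;
  bool complete;
};

// Call update(index) for each slot that is neither fully executed nor complete.
// Returns the number of slots that were updated.
inline unsigned int drain( const std::vector<Slot>& slots,
                           const std::function<void( unsigned int )>& update ) {
  unsigned int updated = 0;
  unsigned int slotNum = 0;
  for ( const auto& slot : slots ) {
    if ( !slot.allAlgsExecuted && !slot.complete ) {
      update( slotNum );
      ++updated;
    }
    ++slotNum;
  }
  return updated;
}
```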
std::vector< EventSlot > m_eventSlots
Vector of event slots.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots...
StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 548 of file AvalancheSchedulerSvc.cpp.

548  {
549  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
550  if ( m_freeSlots.load() == (int) m_maxEventsInFlight or
551  m_isActive == INACTIVE ) {
552  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
553  // << " active: " << m_isActive << endmsg;
554  return StatusCode::FAILURE;
555  } else {
556  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
557  // << " active: " << m_isActive << endmsg;
558  m_finishedEvents.pop( eventContext );
559  m_freeSlots++;
560  if (msgLevel(MSG::DEBUG))
561  debug() << "Popped slot " << eventContext->slot() << "(event "
562  << eventContext->evt() << ")" << endmsg;
563  return StatusCode::SUCCESS;
564  }
565 }
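
The behaviour of popFinishedEvent() (fail immediately when no event is in flight, otherwise block until one finishes) can be sketched with a mutex and a condition variable. This is not the Gaudi implementation, which relies on an atomic counter and a TBB bounded queue. The FinishedEvents class and its methods are hypothetical, events are reduced to integer ids, and the activity check is omitted.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical stand-in for the finished-events queue plus slot accounting.
class FinishedEvents {
public:
  explicit FinishedEvents( int maxInFlight ) : m_max( maxInFlight ), m_free( maxInFlight ) {}

  // Occupy a slot for a new event; returns false if none is free.
  bool occupySlot() {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_free == 0 ) return false;
    --m_free;
    return true;
  }

  // Producer side: mark an event as finished and wake one waiting consumer.
  void finish( int eventId ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_done.push( eventId );
    }
    m_cv.notify_one();
  }

  // Consumer side: fail fast if all slots are free (nothing in flight),
  // otherwise block until a finished event is available.
  bool pop( int& eventId ) {
    std::unique_lock<std::mutex> lock( m_mutex );
    if ( m_free == m_max ) return false;
    m_cv.wait( lock, [this] { return !m_done.empty(); } );
    eventId = m_done.front();
    m_done.pop();
    ++m_free; // the slot becomes available again
    return true;
  }

private:
  const int               m_max;
  int                     m_free;
  std::mutex              m_mutex;
  std::condition_variable m_cv;
  std::queue<int>         m_done;
};
```

The early return matters: with every slot free, no event can ever arrive, so a blocking pop would never wake up.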
ContextID_t slot() const
Definition: EventContext.h:40
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode AvalancheSchedulerSvc::promoteToAsyncExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm *  algo,
EventContext *  eventContext 
)
private

The call to this method is triggered only from within the IOBoundAlgTask.

Definition at line 1075 of file AvalancheSchedulerSvc.cpp.

1076  {
1077  // Put back the instance
1078  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1079  if ( !castedAlgo ) fatal() << "[Asynchronous] The casting did not succeed!" << endmsg;
1080  // EventContext* eventContext = castedAlgo->getContext();
1081 
1082  // Check if the execution failed
1083  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1084  eventFailed(eventContext).ignore();
1085 
1086  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1087 
1088  if ( !sc.isSuccess() ) {
1089  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1090  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1091  return StatusCode::FAILURE;
1092  }
1093 
1095 
1096  EventSlot& thisSlot = m_eventSlots[si];
1097 
1098  if (msgLevel(MSG::DEBUG))
1099  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1100  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1101 
1102  // Schedule an update of the status of the algorithms
1103  auto updateAction = std::bind( &AvalancheSchedulerSvc::updateStates, this, -1, algo->name() );
1104  m_actionsQueue.push( updateAction );
1105  m_updateNeeded = false;
1106 
1107  if (msgLevel(MSG::DEBUG))
1108  debug() << "[Asynchronous] Trying to handle execution result of "
1109  << index2algname(iAlgo) << " on slot " << si << endmsg;
1110  State state;
1111  if ( algo->filterPassed() ) {
1112  state = State::EVTACCEPTED;
1113  } else {
1114  state = State::EVTREJECTED;
1115  }
1116 
1117  sc = thisSlot.algsStates.updateState( iAlgo, state );
1118 
1119  if (sc.isSuccess())
1120  if (msgLevel(MSG::VERBOSE))
1121  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo) << " on slot "
1122  << si << " to " << AlgsExecutionStates::stateNames[state] << endmsg;
1123 
1124  return sc;
1125 }
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
ContextID_t slot() const
Definition: EventContext.h:40
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
ContextEvt_t evt() const
Definition: EventContext.h:39
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:11
std::vector< EventSlot > m_eventSlots
Vector of event slots.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots...
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 971 of file AvalancheSchedulerSvc.cpp.

971  {
972 
974 
975  // bool IOBound = m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode(algName)->isIOBound();
976 
977  const std::string& algName( index2algname( iAlgo ) );
978  IAlgorithm* ialgoPtr = nullptr;
979  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
980 
981  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
982  EventContext* eventContext( m_eventSlots[si].eventContext );
983  if ( !eventContext ) {
984  fatal() << "[Asynchronous] Event context for algorithm " << algName << " is a nullptr (slot " << si << ")"
985  << endmsg;
986  return StatusCode::FAILURE;
987  }
988 
990  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from tbb::task)? it seems it works..
991  IOBoundAlgTask* theTask = new( tbb::task::allocate_root() )
992  IOBoundAlgTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
993  m_IOBoundAlgScheduler->push(*theTask);
994 
995  if (msgLevel(MSG::DEBUG))
996  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event "
997  << eventContext->evt() << " in slot " << si
998  << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
999 
1000  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
1001 
1002  if (updateSc.isSuccess())
1003  if (msgLevel(MSG::VERBOSE))
1004  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo)
1005  << " to SCHEDULED on slot " << si << endmsg;
1006  return updateSc;
1007  } else {
1008  if ( msgLevel( MSG::DEBUG ) )
1009  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
1010  << si << endmsg;
1011  return sc;
1012  }
1013 }
Wrapper around I/O-bound Gaudi-algorithms.
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
STL class.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
std::vector< EventSlot > m_eventSlots
Vector of event slots.
virtual StatusCode push(IAlgTask &task)=0
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode AvalancheSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm *  algo,
EventContext *  eventContext 
)
private

The call to this method is triggered only from within the AlgoExecutionTask.

Definition at line 1019 of file AvalancheSchedulerSvc.cpp.

1020  {
1021  // Put back the instance
1022  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1023  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
1024  // EventContext* eventContext = castedAlgo->getContext();
1025 
1026  // Check if the execution failed
1027  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1028  eventFailed(eventContext).ignore();
1029 
1030  Gaudi::Hive::setCurrentContext(eventContext);
1031  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1032 
1033  if ( !sc.isSuccess() ) {
1034  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1035  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1036  return StatusCode::FAILURE;
1037  }
1038 
1039  m_algosInFlight--;
1040 
1041  EventSlot& thisSlot = m_eventSlots[si];
1042 
1043  if ( msgLevel( MSG::DEBUG ) )
1044  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1045  << m_algosInFlight << endmsg;
1046 
1047  // Schedule an update of the status of the algorithms
1048  auto updateAction = std::bind( &AvalancheSchedulerSvc::updateStates, this, -1, algo->name() );
1049  m_actionsQueue.push( updateAction );
1050  m_updateNeeded = false;
1051 
1052  if ( msgLevel( MSG::DEBUG ) )
1053  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
1054  State state;
1055  if ( algo->filterPassed() ) {
1056  state = State::EVTACCEPTED;
1057  } else {
1058  state = State::EVTREJECTED;
1059  }
1060 
1061  sc = thisSlot.algsStates.updateState( iAlgo, state );
1062 
1063  if (sc.isSuccess())
1064  if (msgLevel(MSG::VERBOSE))
1065  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
1067 
1068  return sc;
1069 }
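
The listing above schedules a state update by binding a member function to its arguments and pushing the resulting closure onto the actions queue. A reduced sketch of that pattern follows, using std::queue instead of the TBB queue. The MiniScheduler class, algorithmDone() and runPending() are hypothetical names, and the update merely records what it was called with.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical miniature of the scheduler's action queue (not a Gaudi type).
class MiniScheduler {
public:
  using action = std::function<int()>;

  // Stand-in for updateStates(): records the slot and algorithm name.
  int updateStates( int slot, const std::string& algoName ) {
    m_log += algoName + "@" + std::to_string( slot ) + ";";
    return 0;
  }

  // Called when an algorithm has run: queue an update instead of running it now.
  void algorithmDone( const std::string& algoName ) {
    // std::bind copies its arguments, so the closure owns its own algoName.
    m_actions.push( std::bind( &MiniScheduler::updateStates, this, -1, algoName ) );
  }

  // Run all queued actions in order; returns how many were executed.
  int runPending() {
    int n = 0;
    while ( !m_actions.empty() ) {
      action a = std::move( m_actions.front() );
      m_actions.pop();
      a();
      ++n;
    }
    return n;
  }

  const std::string& log() const { return m_log; }

private:
  std::queue<action> m_actions;
  std::string        m_log;
};
```

Deferring the update keeps state changes on the thread that drains the queue, which is presumably why the method queues a closure rather than calling updateStates() directly from the task thread.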
ContextID_t slot() const
Definition: EventContext.h:40
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
ContextEvt_t evt() const
Definition: EventContext.h:39
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
GAUDI_API void setCurrentContext(const EventContext *ctx)
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:11
std::vector< EventSlot > m_eventSlots
Vector of event slots.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots...
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode AvalancheSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode AvalancheSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si 
)
private

Algorithm promotion.

Definition at line 905 of file AvalancheSchedulerSvc.cpp.

905  {
906 
908 
909  const std::string& algName( index2algname( iAlgo ) );
910  IAlgorithm* ialgoPtr = nullptr;
911  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
912 
913  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
914  EventContext* eventContext( m_eventSlots[si].eventContext );
915  if ( !eventContext ) {
916  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
917  return StatusCode::FAILURE;
918  }
919 
920  ++m_algosInFlight;
921  auto promote2ExecutedClosure = std::bind(&AvalancheSchedulerSvc::promoteToExecuted,
922  this,
923  iAlgo,
924  eventContext->slot(),
925  ialgoPtr,
926  eventContext);
927  // Avoid to use tbb if the pool size is 1 and run in this thread
928  if (-100 != m_threadPoolSize) {
929 
930  // this parent task is needed to promote an Algorithm as EXECUTED,
931  // it will be started as soon as the child task (see below) is completed
932  tbb::task* triggerAlgoStateUpdate = new(tbb::task::allocate_root())
933  enqueueSchedulerActionTask(this, promote2ExecutedClosure);
934  // setting parent's refcount to 1 is made here only for consistency
935  // (in this case since it is not scheduled explicitly and there it has only one child task)
936  triggerAlgoStateUpdate->set_ref_count(1);
937  // the child task that executes an Algorithm
938  tbb::task* algoTask = new(triggerAlgoStateUpdate->allocate_child())
939  AlgoExecutionTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
940  // schedule the algoTask
941  tbb::task::enqueue( *algoTask);
942 
943  } else {
944  AlgoExecutionTask theTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
945  theTask.execute();
946  promote2ExecutedClosure();
947  }
948 
949  if ( msgLevel( MSG::DEBUG ) )
950  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
951  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
952 
953  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
954 
955  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
956 
957  if (updateSc.isSuccess())
958  if (msgLevel(MSG::VERBOSE))
959  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
960  << si << endmsg;
961  return updateSc;
962  } else {
963  if ( msgLevel( MSG::DEBUG ) )
964  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
965  return sc;
966  }
967 }
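
The inline branch of the listing (bind a follow-up closure, execute the work, then call the closure) can be sketched with the standard library alone. This is a minimal illustration, not Gaudi code: `MiniScheduler`, `runInline` and the string log are hypothetical names, and decrementing the in-flight counter inside the follow-up step is an assumption made for the example.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for the scheduler; not part of Gaudi.
class MiniScheduler {
public:
  // Follow-up step: record that algorithm iAlgo finished on slot si.
  // (Decrementing the counter here is an assumption of this sketch.)
  bool promoteToExecuted( unsigned int iAlgo, int si ) {
    --m_algosInFlight;
    m_log.push_back( "executed " + std::to_string( iAlgo ) + " on slot " + std::to_string( si ) );
    return true;
  }

  // Bind the follow-up first, run the work, then invoke the bound closure.
  bool runInline( unsigned int iAlgo, int si, const std::function<void()>& work ) {
    ++m_algosInFlight;
    auto closure = std::bind( &MiniScheduler::promoteToExecuted, this, iAlgo, si );
    work();           // stands in for executing the algorithm task
    return closure(); // stands in for calling the bound promotion closure
  }

  unsigned int algosInFlight() const { return m_algosInFlight; }
  const std::vector<std::string>& log() const { return m_log; }

private:
  unsigned int m_algosInFlight = 0;
  std::vector<std::string> m_log;
};
```

Binding the arguments up front means the same closure could equally be handed to another thread or a task queue; only the point at which it is invoked changes.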
StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext * eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two possible cases:

  • No slot is free: StatusCode::FAILURE is returned.
  • At least one slot is free: an action that resets the slot and kicks off its update is queued.

A null event context is also rejected with StatusCode::FAILURE.

Definition at line 464 of file AvalancheSchedulerSvc.cpp.

464  {
465 
466  if ( m_first ) {
467  m_first = false;
468  }
469 
470  if ( !eventContext ) {
471  fatal() << "Event context is nullptr" << endmsg;
472  return StatusCode::FAILURE;
473  }
474 
475  if ( m_freeSlots.load() == 0 ) {
476  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
477  return StatusCode::FAILURE;
478  }
479 
480  // no problem as push new event is only called from one thread (event loop manager)
481  m_freeSlots--;
482 
483  auto action = [this, eventContext]() -> StatusCode {
484  // Event processing slot forced to be the same as the wb slot
485  const unsigned int thisSlotNum = eventContext->slot();
486  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
487  if ( !thisSlot.complete ) {
488  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
489  return StatusCode::FAILURE;
490  }
491 
492  debug() << "Executing event " << eventContext->evt() << " on slot "
493  << thisSlotNum << endmsg;
494  thisSlot.reset( eventContext );
495 
496  // promote to CR and DR the initial set of algorithms
497  auto vis = concurrency::Supervisor( m_eventSlots[thisSlotNum] );
499 
500  return this->updateStates( thisSlotNum );
501  }; // end of lambda
502 
503  // Kick off the scheduling!
504  if ( msgLevel( MSG::VERBOSE ) ) {
505  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
506  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
507  }
508  m_actionsQueue.push( action );
509 
510  return StatusCode::SUCCESS;
511 }
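
The push path combines two ideas: an atomic count of free slots and a queue of deferred closures. The sketch below shows that combination under simplifying assumptions. `SlotGate` and its methods are hypothetical names, and a plain `std::queue` replaces the TBB concurrent queue, so the sketch is only safe when a single thread uses it.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical miniature of the push path; names are illustrative, not Gaudi API.
class SlotGate {
public:
  explicit SlotGate( int nSlots ) : m_freeSlots( nSlots ) {}

  // Returns false when no slot is free; otherwise reserves a slot and queues the action.
  bool push( std::function<bool()> action ) {
    if ( m_freeSlots.load() == 0 ) return false;
    --m_freeSlots; // check-then-decrement is safe only with a single pushing thread
    m_actions.push( std::move( action ) );
    return true;
  }

  // Run the oldest queued action, if any, and return its result.
  bool runNext() {
    if ( m_actions.empty() ) return false;
    auto action = std::move( m_actions.front() );
    m_actions.pop();
    return action();
  }

  // Called when a finished event is handed back to the caller.
  void release() { ++m_freeSlots; }

  int freeSlots() const { return m_freeSlots.load(); }

private:
  std::atomic_int m_freeSlots;
  std::queue<std::function<bool()>> m_actions; // not thread-safe, unlike a concurrent queue
};
```

The slot is reserved at push time rather than when the action runs, so a caller that sees a successful push can rely on the slot count already reflecting its event.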
StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 514 of file AvalancheSchedulerSvc.cpp.

514  {
515  StatusCode sc;
516  for ( auto context : eventContexts ) {
517  sc = pushNewEvent( context );
518  if ( sc != StatusCode::SUCCESS ) return sc;
519  }
520  return sc;
521 }
StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, return a failure immediately instead of blocking.

Definition at line 571 of file AvalancheSchedulerSvc.cpp.

571  {
572  if ( m_finishedEvents.try_pop( eventContext ) ) {
573  if ( msgLevel( MSG::DEBUG ) )
574  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
575  << endmsg;
576  m_freeSlots++;
577  return StatusCode::SUCCESS;
578  }
579  return StatusCode::FAILURE;
580 }
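
A non-blocking pop lets the caller poll for finished events and carry on when none is ready. The following sketch illustrates that contract with a mutex-guarded `std::queue`; `FinishedQueue`, `tryPop` and `drain` are hypothetical names, and events are reduced to plain integers for brevity.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Hypothetical non-blocking queue of finished event numbers; not Gaudi or TBB code.
class FinishedQueue {
public:
  void push( int evt ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    m_events.push( evt );
  }

  // Non-blocking: returns false at once and leaves evt untouched when the queue is empty.
  bool tryPop( int& evt ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_events.empty() ) return false;
    evt = m_events.front();
    m_events.pop();
    return true;
  }

private:
  std::mutex m_mutex;
  std::queue<int> m_events;
};

// Pop everything currently available without blocking; returns how many were popped.
inline int drain( FinishedQueue& q ) {
  int evt = 0;
  int n = 0;
  while ( q.tryPop( evt ) ) ++n;
  return n;
}
```

A caller would typically alternate such a drain loop with attempts to push new events, so that freed slots are refilled as soon as possible.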
StatusCode AvalancheSchedulerSvc::updateStates ( int  si = -1,
const std::string & algo_name = std::string() 
)
private

Loop over the algorithms in the slots and promote them to successive states (a slot index of -1 means all slots; an empty algorithm name skips the update of the control flow state).

Update the state of the algorithms.

The oldest events are checked before the newest in order to reduce the event backlog. An event is considered finished once the root control flow decision has been resolved and:

  • No algorithms remain in the CONTROLREADY state (signed off by the control flow)
  • No algorithms remain in the DATAREADY state (signed off by the data flow)
  • No algorithms remain in the SCHEDULED state
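
The ordering and the completion check described above can be sketched as follows. `MiniSlot`, `oldestFirst` and `isFinished` are hypothetical names; the real slot type stores an event context and a per-algorithm state machine, which are reduced here to an event number and three counters.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical, simplified slot used only for this illustration.
struct MiniSlot {
  long evt           = 0;     // event number
  bool complete      = true;  // true when the slot is free
  bool rootResolved  = false; // root control flow decision resolved
  int  nControlReady = 0;
  int  nDataReady    = 0;
  int  nScheduled    = 0;
};

// Collect pointers to the occupied slots (avoiding copies), oldest event first.
inline std::vector<MiniSlot*> oldestFirst( std::vector<MiniSlot>& slots ) {
  std::vector<MiniSlot*> ptrs;
  ptrs.reserve( slots.size() );
  for ( auto& s : slots )
    if ( !s.complete ) ptrs.push_back( &s );
  std::sort( ptrs.begin(), ptrs.end(),
             []( const MiniSlot* a, const MiniSlot* b ) { return a->evt < b->evt; } );
  return ptrs;
}

// An occupied slot is finished when the root decision is resolved and nothing is pending.
inline bool isFinished( const MiniSlot& s ) {
  return !s.complete && s.rootResolved && s.nControlReady == 0 && s.nDataReady == 0 &&
         s.nScheduled == 0;
}
```

Free slots are filtered out before sorting, so the comparator never has to handle a slot without an event.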

Definition at line 634 of file AvalancheSchedulerSvc.cpp.

634  {
635 
636  m_updateNeeded = true;
637 
638  StatusCode global_sc( StatusCode::FAILURE, true );
639 
640  // Sort from the oldest to the newest event
641  // Prepare a vector of pointers to the slots to avoid copies
642  std::vector<EventSlot*> eventSlotsPtrs;
643 
644  // Consider all slots if si <0 or just one otherwise
645  if ( si < 0 ) {
646  const int eventsSlotsSize( m_eventSlots.size() );
647  eventSlotsPtrs.reserve( eventsSlotsSize );
648  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
649  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
650  }
651  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
652  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
653  } else {
654  eventSlotsPtrs.push_back( &m_eventSlots[si] );
655  }
656 
657  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
658  int iSlot = thisSlotPtr->eventContext->slot();
659 
660  // Cache the states of the algos to improve readability and performance
661  auto& thisSlot = m_eventSlots[iSlot];
662  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
663 
664  // Take care of the control ready update
665  if ( !algo_name.empty() )
666  m_efManager.updateDecision( algo_name, iSlot, thisAlgsStates, thisSlot.controlFlowState );
667 
668  StatusCode partial_sc( StatusCode::FAILURE, true );
669  // first update CONTROLREADY to DATAREADY
670 
671  // now update DATAREADY to SCHEDULED
672  if ( !m_optimizationMode.empty() ) {
673  auto comp_nodes = [this]( const uint& i, const uint& j ) {
676  };
678  comp_nodes, std::vector<uint>() );
679  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
680  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
681  buffer.push( *it );
682  /*std::stringstream s;
683  auto buffer2 = buffer;
684  while (!buffer2.empty()) {
685  s << m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
686  buffer2.pop();
687  }
688  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
689 
690  /*while (!buffer.empty()) {
691  partial_sc = promoteToScheduled(buffer.top(), iSlot);
692  if (partial_sc.isFailure()) {
693  if (msgLevel(MSG::VERBOSE))
694  verbose() << "Could not apply transition from "
695  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
696  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
697  if (m_useIOBoundAlgScheduler) {
698  partial_sc = promoteToAsyncScheduled(buffer.top(), iSlot);
699  if (msgLevel(MSG::VERBOSE))
700  if (partial_sc.isFailure())
701  verbose() << "[Asynchronous] Could not apply transition from "
702  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
703  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
704  }
705  }
706  buffer.pop();
707  }*/
708  while ( !buffer.empty() ) {
709  bool IOBound = false;
712 
713  if ( !IOBound )
714  partial_sc = promoteToScheduled( buffer.top(), iSlot );
715  else
716  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot );
717 
718  if (msgLevel(MSG::VERBOSE))
719  if (partial_sc.isFailure())
720  verbose() << "Could not apply transition from "
721  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
722  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
723 
724  buffer.pop();
725  }
726 
727  } else {
728  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
729  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
730  uint algIndex = *it;
731 
732  bool IOBound = false;
735 
736  if ( !IOBound )
737  partial_sc = promoteToScheduled( algIndex, iSlot );
738  else
739  partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
740 
741  if (msgLevel(MSG::VERBOSE))
742  if (partial_sc.isFailure())
743  verbose() << "Could not apply transition from "
744  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
745  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
746  }
747  }
748 
750  auto now = std::chrono::system_clock::now();
752  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY) << ", "
753  << thisAlgsStates.sizeOfSubset(State::DATAREADY) << ", "
754  << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << ", "
756  << "\n";
758  : std::to_string(tbb::task_scheduler_init::default_num_threads());
759  std::ofstream myfile;
760  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
761  myfile << s.str();
762  myfile.close();
763  }
764 
765  // Not complete because this would mean that the slot is already free!
766  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
767  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
768  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
769  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
770 
771  thisSlot.complete = true;
772  // if the event did not fail, add it to the finished events
773  // otherwise it is taken care of in the error handling already
774  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
775  m_finishedEvents.push(thisSlot.eventContext);
776  if (msgLevel(MSG::DEBUG))
777  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
778  << thisSlot.eventContext->slot() << ")." << endmsg;
779  }
780  // now let's return the fully evaluated result of the control flow
781  if ( msgLevel( MSG::DEBUG ) ) {
783  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
784  debug() << ss.str() << endmsg;
785  }
786 
787  thisSlot.eventContext = nullptr;
788  } else {
789  StatusCode eventStalledSC = isStalled(iSlot);
790  if (! eventStalledSC.isSuccess()) {
791  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
792  eventFailed(thisSlot.eventContext).ignore();
793  }
794  }
795  } // end loop on slots
796 
797  verbose() << "States Updated." << endmsg;
798 
799  return global_sc;
800 }
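
When an optimization mode is set, the listing pushes the DATAREADY algorithm indices into a buffer and schedules them from `buffer.top()`. A rank-ordered buffer of that kind could look like the sketch below. It is an assumption-laden illustration: `byRank` and the `ranks` lookup table are hypothetical, and popping the highest rank first is assumed rather than taken from the source.

```cpp
#include <cassert>
#include <queue>
#include <vector>

// Return the ready algorithm indices ordered by descending rank.
// `ranks` is a hypothetical lookup table indexed by algorithm index.
inline std::vector<unsigned int> byRank( const std::vector<unsigned int>& ready,
                                         const std::vector<float>&        ranks ) {
  // With a "less than" comparator, std::priority_queue keeps the largest element on top.
  auto lowerRank = [&ranks]( unsigned int i, unsigned int j ) { return ranks[i] < ranks[j]; };
  std::priority_queue<unsigned int, std::vector<unsigned int>, decltype( lowerRank )> buffer( lowerRank );
  for ( unsigned int idx : ready ) buffer.push( idx );

  std::vector<unsigned int> order;
  while ( !buffer.empty() ) {
    order.push_back( buffer.top() ); // highest-ranked index still in the buffer
    buffer.pop();
  }
  return order;
}
```

For ranks `{1, 5, 3}` and ready indices `{0, 1, 2}`, the function returns `{1, 2, 0}`.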

Member Data Documentation

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 251 of file AvalancheSchedulerSvc.h.

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 210 of file AvalancheSchedulerSvc.h.

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map holding the information needed for the name2index conversion.

Definition at line 183 of file AvalancheSchedulerSvc.h.

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector holding the information needed for the index2name conversion.

Definition at line 189 of file AvalancheSchedulerSvc.h.

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 215 of file AvalancheSchedulerSvc.h.

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 243 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps
private
Initial value:
{this, "CheckDependencies", false,
"Runtime check of Algorithm Data Dependencies"}

Definition at line 150 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file"}

Definition at line 145 of file AvalancheSchedulerSvc.h.

concurrency::PrecedenceRulesGraph* AvalancheSchedulerSvc::m_efg
private

Definition at line 318 of file AvalancheSchedulerSvc.h.

concurrency::ExecutionFlowManager AvalancheSchedulerSvc::m_efManager
private

Member to take care of the control flow.

Definition at line 271 of file AvalancheSchedulerSvc.h.

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of event slots.

Definition at line 198 of file AvalancheSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 204 of file AvalancheSchedulerSvc.h.

bool AvalancheSchedulerSvc::m_first = true
private

Definition at line 277 of file AvalancheSchedulerSvc.h.

std::atomic_int AvalancheSchedulerSvc::m_freeSlots
private

Atomic counter of free slots, to account for asynchronous updates by the scheduler with respect to the rest of the code.

Definition at line 201 of file AvalancheSchedulerSvc.h.

unsigned int AvalancheSchedulerSvc::m_IOBoundAlgosInFlight = 0
private

Number of I/O-bound algorithms presently in flight.

Definition at line 218 of file AvalancheSchedulerSvc.h.

SmartIF<IAccelerator> AvalancheSchedulerSvc::m_IOBoundAlgScheduler
private

A shortcut to the I/O-bound algorithm scheduler.

Definition at line 195 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_IOBoundAlgSchedulerSvcName
private
Initial value:
{this, "IOBoundAlgSchedulerSvc",
"IOBoundAlgSchedulerSvc"}

Definition at line 137 of file AvalancheSchedulerSvc.h.

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 174 of file AvalancheSchedulerSvc.h.

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight {1}
private

Definition at line 276 of file AvalancheSchedulerSvc.h.

size_t AvalancheSchedulerSvc::m_maxEventsInFlight {0}
private

Definition at line 275 of file AvalancheSchedulerSvc.h.

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxIOBoundAlgosInFlight
private
Initial value:
{this, "MaxIOBoundAlgosInFlight", 0,
"Maximum number of simultaneous I/O-bound algorithms"}

Definition at line 139 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E"}

Definition at line 143 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences"}

Definition at line 162 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms"}

Definition at line 156 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms"}

Definition at line 159 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution"}

Definition at line 141 of file AvalancheSchedulerSvc.h.

std::mutex AvalancheSchedulerSvc::m_ssMut
static private

Definition at line 309 of file AvalancheSchedulerSvc.h.

std::list< AvalancheSchedulerSvc::SchedulerState > AvalancheSchedulerSvc::m_sState
static private

Definition at line 308 of file AvalancheSchedulerSvc.h.

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 177 of file AvalancheSchedulerSvc.h.

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 133 of file AvalancheSchedulerSvc.h.

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 274 of file AvalancheSchedulerSvc.h.

bool AvalancheSchedulerSvc::m_updateNeeded = true
private

Keep track of update actions scheduled.

Definition at line 239 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 153 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_useIOBoundAlgScheduler
private
Initial value:
{this, "PreemptiveIOBoundTasks", false,
"Turn on preemptive way of scheduling of I/O-bound algorithms"}

Definition at line 147 of file AvalancheSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 192 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName
private
Initial value:
{this, "WhiteboardSvc", "EventDataSvc",
"The whiteboard name"}

Definition at line 135 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: