The Gaudi Framework  v29r0 (ff2e7097)
ForwardSchedulerSvc Class Reference

The SchedulerSvc implements the IScheduler interface. More...

#include <GaudiKernel/ForwardSchedulerSvc.h>

Inheritance diagram for ForwardSchedulerSvc:
Collaboration diagram for ForwardSchedulerSvc:

Classes

struct  enqueueSchedulerActionTask
 
class  SchedulerState
 

Public Member Functions

 ~ForwardSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is availble. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
void addAlg (Algorithm *, EventContext *, pthread_t)
 
bool delAlg (Algorithm *)
 
void dumpState () override
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::stringgetInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
- Public Member Functions inherited from Service
const std::stringname () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Initialize Service. More...
 
StatusCode sysStop () override
 Initialize Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Re-initialize the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName, bool createIf=true)
 Declare used tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service.May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
 ~PropertyHolder () override=default
 
Gaudi::Details::PropertyBasedeclareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none") const
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBasedeclareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property form another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property form the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBasegetProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolderoperator= (const PropertyHolder &)=delete
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, GaudiHandleBase &ref, const std::string &doc="none")
 Specializations for various GaudiHandles. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, GaudiHandleArrayBase &ref, const std::string &doc="none")
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, DataObjectHandleBase &ref, const std::string &doc="none")
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
- Public Member Functions inherited from extend_interfaces< Interfaces... >
 ~extend_interfaces () override=default
 Virtual destructor. More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::stringindex2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1)
 Loop on algorithm in the slots and promote them to successive states (-1 means all slots, while empty string means skipping an update of the Control Flow state) More...
 
StatusCode promoteToControlReady (unsigned int iAlgo, int si)
 Algorithm promotion: Accepted by the control flow. More...
 
StatusCode promoteToDataReady (unsigned int iAlgo, int si)
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 
void dumpState (std::ostringstream &)
 

Private Attributes

Gaudi::Property< int > m_maxEventsInFlight
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::stringm_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< unsigned int > m_maxAlgosInFlight
 
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
 
Gaudi::Property< bool > m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
 
Gaudi::Property< std::stringm_useDataLoader
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
std::atomic< ActivationStatem_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::stringm_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IHiveWhiteBoardm_whiteboard
 A shortcut to the whiteboard. More...
 
std::vector< EventSlotm_eventSlots
 Vector of events slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asyncronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvcm_algExecStateSvc
 Algorithm execution state manager. More...
 
unsigned int m_algosInFlight = 0
 Number of algoritms presently in flight. More...
 
bool m_updateNeeded = true
 Keep track of update actions scheduled. More...
 
SmartIF< IAlgResourcePoolm_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< actionm_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
concurrency::recursive_CF::ExecutionFlowManager m_efManager
 Member to take care of the control flow. More...
 
SmartIF< IThreadPoolSvcm_threadPoolSvc
 
bool m_first = true
 
concurrency::recursive_CF::ControlFlowGraphm_efg
 

Static Private Attributes

static std::list< SchedulerStatem_sState
 
static std::mutex m_ssMut
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
typedef Gaudi::PluginService::Factory< IService *, const std::string &, ISvcLocator * > Factory
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBaseproperty (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvcm_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

The SchedulerSvc implements the IScheduler interface.

It manages all the execution states of the algorithms and interacts with the TBB runtime for the algorithm tasks submission. A state machine takes care of the tracking of the execution state of the algorithms.

This is a forward scheduler: an algorithm is scheduled for execution as soon as its data dependencies are available in the whiteboard, and as soon as a worker thread becomes available. The scheduler has no means for automatic intra-event throughput maximization. Consider the AvalancheScheduler if you need those.

Algorithms management

The activate() method runs in a separate thread. It checks a TBB concurrent bounded queue of closures in a loop via the Pop method. This allows not to use a cpu entirely to check the presence of new actions to be taken. In other words, the asynchronous actions are serialised via the actions queue. Once a task terminates, a call to the promoteToExecuted method will be pushed into the actions queue. The promoteToExecuted method also triggers a call to the updateStates method, which brushes all algorithms, checking if their state can be changed. It's indeed possible that upon termination of an algorithm, the control flow and/or the data flow allow the submission of more algorithms.

Algorithms dependencies

There are two ways of declaring algorithms dependencies. One which is only temporarly available to ease developments consists in declaring them through AlgosDependencies property as a list of list. The order of these sublist must be the same one of the algorithms in the TopAlg list. The second one consists in declaring the data dependencies directly within the algorithms via data object handles.

Events management

The scheduler accepts events to be processed (in the form of eventContexts) and releases processed events. This flow is implemented through three methods:

  • pushNewEvent: to make an event available to the scheduler.
  • tryPopFinishedEvent: to retrieve an event from the scheduler
  • popFinishedEvent: to retrieve an event from the scheduler (blocking)

Please refer to the full documentation of the methods for more details.

Author
Danilo Piparo
Benedikt Hegner
Version
1.1

Definition at line 83 of file ForwardSchedulerSvc.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

ForwardSchedulerSvc::~ForwardSchedulerSvc ( )
overridedefault

Destructor.

Member Function Documentation

void ForwardSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)

Definition at line 342 of file ForwardSchedulerSvc.cpp.

343 {
344 
345  if ( msgLevel( MSG::DEBUG ) ) debug() << "ForwardSchedulerSvc::activate()" << endmsg;
346 
348  error() << "problems initializing ThreadPoolSvc" << endmsg;
350  return;
351  }
352 
353  // Wait for actions pushed into the queue by finishing tasks.
354  action thisAction;
356 
357  m_isActive = ACTIVE;
358 
359  // Continue to wait if the scheduler is running or there is something to do
360  info() << "Start checking the actionsQueue" << endmsg;
361  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
362  m_actionsQueue.pop( thisAction );
363  sc = thisAction();
364  if ( sc != StatusCode::SUCCESS )
365  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
366  else
367  verbose() << "Action succeeded." << endmsg;
368  }
369 
370  info() << "Terminating thread-pool resources" << endmsg;
372  error() << "Problems terminating thread pool" << endmsg;
374  }
375 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
SmartIF< IThreadPoolSvc > m_threadPoolSvc
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
virtual StatusCode terminatePool()=0
Finalize the thread pool.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
Gaudi::Property< int > m_threadPoolSize
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
void ForwardSchedulerSvc::addAlg ( Algorithm a,
EventContext e,
pthread_t  t 
)

Definition at line 989 of file ForwardSchedulerSvc.cpp.

990 {
991 
993  m_sState.push_back( SchedulerState( a, e, t ) );
994 }
static std::list< SchedulerState > m_sState
T lock(T...args)
static std::mutex m_ssMut
unsigned int ForwardSchedulerSvc::algname2index ( const std::string algoname)
inlineprivate

Convert a name to an integer.

Definition at line 410 of file ForwardSchedulerSvc.cpp.

411 {
412  unsigned int index = m_algname_index_map[algoname];
413  return index;
414 }
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
StatusCode ForwardSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.

Definition at line 385 of file ForwardSchedulerSvc.cpp.

386 {
387 
388  if ( m_isActive == ACTIVE ) {
389  // Drain the scheduler
391  // This would be the last action
392  m_actionsQueue.push( [this]() -> StatusCode {
394  return StatusCode::SUCCESS;
395  } );
396  }
397 
398  return StatusCode::SUCCESS;
399 }
StatusCode m_drain()
Drain the actions present in the queue.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
T bind(T...args)
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
bool ForwardSchedulerSvc::delAlg ( Algorithm a)

Definition at line 997 of file ForwardSchedulerSvc.cpp.

998 {
999 
1001 
1002  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1003  if ( *itr == a ) {
1004  m_sState.erase( itr );
1005  return true;
1006  }
1007  }
1008 
1009  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1010  return false;
1011 }
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:731
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
static std::list< SchedulerState > m_sState
T lock(T...args)
STL class.
static std::mutex m_ssMut
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
void ForwardSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

The dependencies of each algo are printed and the missing ones specified.

Definition at line 756 of file ForwardSchedulerSvc.cpp.

757 {
758 
759  // To have just one big message
760  std::ostringstream outputMessageStream;
761 
762  outputMessageStream << "============================== Execution Task State ============================="
763  << std::endl;
764  dumpState( outputMessageStream );
765 
766  outputMessageStream << std::endl
767  << "============================== Scheduler State ================================="
768  << std::endl;
769 
770  int slotCount = -1;
771  for ( auto thisSlot : m_eventSlots ) {
772  slotCount++;
773  if ( thisSlot.complete ) continue;
774 
775  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
776  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
777 
778  if ( 0 > iSlot or iSlot == slotCount ) {
779  outputMessageStream << "Algorithms states:" << std::endl;
780 
781  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
782  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
783  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
784  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
785  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
786  const int depsSize = deps.size();
787  if ( depsSize == 0 ) outputMessageStream << " none";
788 
789  DataObjIDColl missing;
790  for ( auto d : deps ) {
791  outputMessageStream << d << " ";
792  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
793  // outputMessageStream << "[missing] ";
794  missing.insert( d );
795  }
796  }
797 
798  if ( !missing.empty() ) {
799  outputMessageStream << ". The following are missing: ";
800  for ( auto d : missing ) {
801  outputMessageStream << d << " ";
802  }
803  }
804 
805  outputMessageStream << std::endl;
806  }
807 
808  // Snapshot of the WhiteBoard
809  outputMessageStream << "\nWhiteboard contents: " << std::endl;
810  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
811 
812  // Snapshot of the ControlFlow
813  outputMessageStream << "\nControl Flow:" << std::endl;
814  std::stringstream cFlowStateStringStream;
815  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
816 
817  outputMessageStream << cFlowStateStringStream.str() << std::endl;
818  }
819  }
820 
821  outputMessageStream << "=================================== END ======================================" << std::endl;
822 
823  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
824 }
T empty(T...args)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
concurrency::recursive_CF::ExecutionFlowManager m_efManager
Member to take care of the control flow.
T endl(T...args)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
T str(T...args)
T insert(T...args)
T size(T...args)
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
static std::map< State, std::string > stateNames
void ForwardSchedulerSvc::dumpState ( )
override

Definition at line 1025 of file ForwardSchedulerSvc.cpp.

1026 {
1027 
1029 
1030  std::ostringstream ost;
1031  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1032  dumpState( ost );
1033 
1034  info() << ost.str() << endmsg;
1035 }
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
T endl(T...args)
static std::list< SchedulerState > m_sState
T lock(T...args)
static std::mutex m_ssMut
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
void ForwardSchedulerSvc::dumpState ( std::ostringstream ost)
private

Definition at line 1014 of file ForwardSchedulerSvc.cpp.

1015 {
1016 
1018 
1019  for ( auto it : m_sState ) {
1020  ost << " " << it << std::endl;
1021  }
1022 }
T endl(T...args)
static std::list< SchedulerState > m_sState
T lock(T...args)
static std::mutex m_ssMut
StatusCode ForwardSchedulerSvc::eventFailed ( EventContext eventContext)
private

Method to check if an event failed and take appropriate actions.

It can be possible that an event fails.

In this case this method is called. It dumps the state of the scheduler, drains the actions (without executing them) and events in the queues and returns a failure.

Definition at line 543 of file ForwardSchedulerSvc.cpp.

544 {
545 
546  // Set the number of slots available to an error code
547  m_freeSlots.store( 0 );
548 
549  fatal() << "*** Event " << eventContext->evt() << " on slot " << eventContext->slot() << " failed! ***" << endmsg;
550 
551  std::ostringstream ost;
552  m_algExecStateSvc->dump( ost, *eventContext );
553 
554  info() << "Dumping Alg Exec State for slot " << eventContext->slot() << ":\n" << ost.str() << endmsg;
555 
556  dumpSchedulerState( -1 );
557 
558  // Empty queue and deactivate the service
559  action thisAction;
560  while ( m_actionsQueue.try_pop( thisAction ) ) {
561  };
562  deactivate();
563 
564  // Push into the finished events queue the failed context
565  EventContext* thisEvtContext;
566  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
567  m_finishedEvents.push( thisEvtContext );
568  };
569  m_finishedEvents.push( eventContext );
570 
571  return StatusCode::FAILURE;
572 }
StatusCode deactivate()
Deactivate scheduler.
ContextID_t slot() const
Definition: EventContext.h:40
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode ForwardSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 311 of file ForwardSchedulerSvc.cpp.

312 {
313 
315  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
316 
317  sc = deactivate();
318  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
319 
320  info() << "Joining Scheduler thread" << endmsg;
321  m_thread.join();
322 
323  // Final error check after thread pool termination
324  if ( m_isActive == FAILURE ) {
325  error() << "problems in scheduler thread" << endmsg;
326  return StatusCode::FAILURE;
327  }
328 
329  return sc;
330 }
StatusCode deactivate()
Deactivate scheduler.
StatusCode finalize() override
Definition: Service.cpp:174
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
std::thread m_thread
The thread in which the activate function runs.
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
unsigned int ForwardSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 481 of file ForwardSchedulerSvc.cpp.

481 { return std::max( m_freeSlots.load(), 0 ); }
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T max(T...args)
const std::string & ForwardSchedulerSvc::index2algname ( unsigned int  index)
inlineprivate

Convert an integer to a name.

Definition at line 406 of file ForwardSchedulerSvc.cpp.

406 { return m_algname_vect[index]; }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
StatusCode ForwardSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 44 of file ForwardSchedulerSvc.cpp.

45 {
46 
47  // Initialise mother class (read properties, ...)
49  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
50 
51  // Get hold of the TBBSvc. This should initialize the thread pool
52  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
53  if ( !m_threadPoolSvc.isValid() ) {
54  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
55  return StatusCode::FAILURE;
56  }
57 
58  // Activate the scheduler in another thread.
59  info() << "Activating scheduler in a separate thread" << endmsg;
61 
62  while ( m_isActive != ACTIVE ) {
63  if ( m_isActive == FAILURE ) {
64  fatal() << "Terminating initialization" << endmsg;
65  return StatusCode::FAILURE;
66  } else {
67  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
68  sleep( 1 );
69  }
70  }
71 
72  // Get the algo resource pool
73  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
74  if ( !m_algResourcePool.isValid() ) {
75  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
76  return StatusCode::FAILURE;
77  }
78 
79  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
80  if ( !m_algExecStateSvc.isValid() ) {
81  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
82  return StatusCode::FAILURE;
83  }
84 
85  // Get Whiteboard
87  if ( !m_whiteboard.isValid() ) {
88  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
89  return StatusCode::FAILURE;
90  }
91 
92  // Check the MaxEventsInFlight parameters and react
93  // Deprecated for the moment
94  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
95  if ( m_maxEventsInFlight != 0 ) {
96  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
97  << "Please migrate your code options files." << endmsg;
98 
99  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
100  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
101  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
102  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
103  }
104  }
105 
106  // set global concurrency flags
108 
109  // Align the two quantities
110  m_maxEventsInFlight = numberOfWBSlots;
111 
112  // Set the number of free slots
114 
115  if ( m_algosDependencies.size() != 0 ) {
116  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
117  << " FIX your job options #####" << endmsg;
118  }
119 
120  // Get the list of algorithms
122  const unsigned int algsNumber = algos.size();
123  info() << "Found " << algsNumber << " algorithms" << endmsg;
124 
125  /* Dependencies
126  1) Look for handles in algo, if none
127  2) Assume none are required
128  */
129 
130  DataObjIDColl globalInp, globalOutp;
131 
132  // figure out all outputs
133  for ( IAlgorithm* ialgoPtr : algos ) {
134  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
135  if ( !algoPtr ) {
136  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
137  }
138  for ( auto id : algoPtr->outputDataObjs() ) {
139  auto r = globalOutp.insert( id );
140  if ( !r.second ) {
141  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths "
142  "though, or control flow may guarantee only one runs...!"
143  << endmsg;
144  }
145  }
146  }
147 
148  std::ostringstream ostdd;
149  ostdd << "Data Dependencies for Algorithms:";
150 
152  for ( IAlgorithm* ialgoPtr : algos ) {
153  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
154  if ( nullptr == algoPtr ) {
155  fatal() << "Could not convert IAlgorithm into Algorithm for " << ialgoPtr->name()
156  << ": this will result in a crash." << endmsg;
157  return StatusCode::FAILURE;
158  }
159 
160  ostdd << "\n " << algoPtr->name();
161 
162  DataObjIDColl algoDependencies;
163  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
164  for ( auto id : algoPtr->inputDataObjs() ) {
165  ostdd << "\n o INPUT " << id;
166  if ( id.key().find( ":" ) != std::string::npos ) {
167  ostdd << " contains alternatives which require resolution...\n";
168  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
169  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
170  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
171  } );
172  if ( itok != tokens.end() ) {
173  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
174  id.updateKey( *itok );
175  } else {
176  error() << "failed to find alternate in global output list"
177  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
178  m_showDataDeps = true;
179  }
180  }
181  algoDependencies.insert( id );
182  globalInp.insert( id );
183  }
184  for ( auto id : algoPtr->outputDataObjs() ) {
185  ostdd << "\n o OUTPUT " << id;
186  if ( id.key().find( ":" ) != std::string::npos ) {
187  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << id << endmsg;
188  m_showDataDeps = true;
189  }
190  }
191  } else {
192  ostdd << "\n none";
193  }
194  m_algosDependencies.emplace_back( algoDependencies );
195  }
196 
197  if ( m_showDataDeps ) {
198  info() << ostdd.str() << endmsg;
199  }
200 
201  // Fill the containers to convert algo names to index
202  m_algname_vect.reserve( algsNumber );
203  unsigned int index = 0;
204  IAlgorithm* dataLoaderAlg( nullptr );
205  for ( IAlgorithm* algo : algos ) {
206  const std::string& name = algo->name();
207  m_algname_index_map[name] = index;
209  if ( algo->name() == m_useDataLoader ) {
210  dataLoaderAlg = algo;
211  }
212  index++;
213  }
214 
215  // Check if we have unmet global input dependencies
216  if ( m_checkDeps ) {
217  DataObjIDColl unmetDep;
218  for ( auto o : globalInp ) {
219  if ( globalOutp.find( o ) == globalOutp.end() ) {
220  unmetDep.insert( o );
221  }
222  }
223 
224  if ( unmetDep.size() > 0 ) {
225 
226  std::ostringstream ost;
227  for ( auto& o : unmetDep ) {
228  ost << "\n o " << o << " required by Algorithm: ";
229  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
230  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
231  ost << "\n * " << m_algname_vect[i];
232  }
233  }
234  }
235 
236  if ( m_useDataLoader != "" ) {
237  // Find the DataLoader Alg
238  if ( dataLoaderAlg == nullptr ) {
239  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
240  << "\" found, and unmet INPUT dependencies "
241  << "detected:\n"
242  << ost.str() << endmsg;
243  return StatusCode::FAILURE;
244  }
245 
246  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
247  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
248 
249  // Set the property Load of DataLoader Alg
250  Algorithm* dataAlg = dynamic_cast<Algorithm*>( dataLoaderAlg );
251  if ( !dataAlg ) {
252  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Algorithm" << endmsg;
253  return StatusCode::FAILURE;
254  }
255 
256  for ( auto& id : unmetDep ) {
257  debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
258  << endmsg;
260  }
261 
262  } else {
263  fatal() << "Auto DataLoading not requested, "
264  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
265  return StatusCode::FAILURE;
266  }
267 
268  } else {
269  info() << "No unmet INPUT data dependencies were found" << endmsg;
270  }
271  }
272 
273  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
275  unsigned int controlFlowNodeNumber = m_efManager.getCFGraph()->getControlFlowNodeCounter();
276 
277  // Shortcut for the message service
278  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
279  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
280 
282  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
283  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
284 
285  // Clearly inform about the level of concurrency
286  info() << "Concurrency level information:" << endmsg;
287  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
288  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
289  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
290 
291  m_efg = algPool->getCFGraph();
292 
293  if ( m_showControlFlow ) {
294  info() << std::endl << "========== Algorithm and Sequence Configuration ==========" << std::endl << std::endl;
295  info() << m_efg->dumpControlFlow() << endmsg;
296  }
297 
298  if ( m_showDataFlow ) {
299  warning() << "A 1-level data flow dump requested, but this feature is not supported"
300  << " by the ForwardScheduler any more. Use the AvalancheScheduler"
301  << " to dump as 1-level data flow, so the complete data flow graph." << endmsg;
302  }
303 
304  return sc;
305 }
Gaudi::Property< bool > m_showControlFlow
StatusCode initialize() override
Definition: Service.cpp:64
T empty(T...args)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:731
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
concurrency::recursive_CF::ExecutionFlowManager m_efManager
Member to take care of the control flow.
const DataObjIDColl & outputDataObjs() const override
std::string dumpControlFlow() const
Print out control flow of Algorithms and Sequences.
T endl(T...args)
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< bool > m_checkDeps
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
STL class.
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:82
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:79
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Gaudi::Property< bool > m_showDataFlow
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
const DataObjIDColl & inputDataObjs() const override
Gaudi::Property< std::string > m_whiteboardSvcName
T bind(T...args)
ControlFlowGraph * getCFGraph() const
Get the flow graph instance.
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
Gaudi::Property< int > m_maxEventsInFlight
concurrency::recursive_CF::ControlFlowGraph * getCFGraph() const
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
concurrency::recursive_CF::ControlFlowGraph * m_efg
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
T assign(T...args)
STL class.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
T begin(T...args)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
void initialize(ControlFlowGraph *graph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager It greps the topalg list and the index map for the algo names...
Class representing the event slot.
Definition: EventSlot.h:11
Gaudi::Property< bool > m_showDataDeps
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual size_t getNumberOfStores() const =0
Get the number of &#39;slots&#39;.
T for_each(T...args)
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
T reserve(T...args)
T emplace_back(T...args)
StatusCode ForwardSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check if we are in present of a stall condition for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.

Definition at line 731 of file ForwardSchedulerSvc.cpp.

732 {
733  // Get the slot
734  EventSlot& thisSlot = m_eventSlots[iSlot];
735 
736  if ( m_actionsQueue.empty() && m_algosInFlight == 0 &&
738 
739  info() << "About to declare a stall" << endmsg;
740  fatal() << "*** Stall detected! ***\n" << endmsg;
741  dumpSchedulerState( iSlot );
742  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
743 
744  return StatusCode::FAILURE;
745  }
746  return StatusCode::SUCCESS;
747 }
bool algsPresent(State state) const
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode ForwardSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 487 of file ForwardSchedulerSvc.cpp.

488 {
489  unsigned int slotNum = 0;
490  for ( auto& thisSlot : m_eventSlots ) {
491  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
492  updateStates( slotNum );
493  }
494  slotNum++;
495  }
496  return StatusCode::SUCCESS;
497 }
StatusCode updateStates(int si=-1)
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode ForwardSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is availble.

Get a finished event or block until one becomes available.

Definition at line 503 of file ForwardSchedulerSvc.cpp.

504 {
505  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
506  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
507  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
508  // << " active: " << m_isActive << endmsg;
509  return StatusCode::FAILURE;
510  } else {
511  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
512  // << " active: " << m_isActive << endmsg;
513  m_finishedEvents.pop( eventContext );
514  m_freeSlots++;
515  if ( msgLevel( MSG::DEBUG ) )
516  debug() << "Popped slot " << eventContext->slot() << "(event " << eventContext->evt() << ")" << endmsg;
517  return StatusCode::SUCCESS;
518  }
519 }
ContextID_t slot() const
Definition: EventContext.h:40
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
Gaudi::Property< int > m_maxEventsInFlight
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode ForwardSchedulerSvc::promoteToControlReady ( unsigned int  iAlgo,
int  si 
)
private

Algorithm promotion: Accepted by the control flow.

Definition at line 828 of file ForwardSchedulerSvc.cpp.

829 {
830 
831  // Do the control flow
832  StatusCode sc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::CONTROLREADY );
833  if ( sc.isSuccess() )
834  if ( msgLevel( MSG::VERBOSE ) )
835  verbose() << "Promoting " << index2algname( iAlgo ) << " to CONTROLREADY on slot " << si << endmsg;
836 
837  return sc;
838 }
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:75
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode ForwardSchedulerSvc::promoteToDataReady ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 842 of file ForwardSchedulerSvc.cpp.

843 {
844 
845  StatusCode sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
846 
847  StatusCode updateSc( StatusCode::FAILURE );
848  if ( sc == StatusCode::SUCCESS )
849  updateSc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::DATAREADY );
850 
851  if ( updateSc.isSuccess() )
852  if ( msgLevel( MSG::VERBOSE ) )
853  verbose() << "Promoting " << index2algname( iAlgo ) << " to DATAREADY on slot " << si << endmsg;
854 
855  return updateSc;
856 }
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode ForwardSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

Definition at line 921 of file ForwardSchedulerSvc.cpp.

923 {
924 
925  // Put back the instance
926  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
927  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
928  // EventContext* eventContext = castedAlgo->getContext();
929 
930  // Check if the execution failed
931  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
932 
933  Gaudi::Hive::setCurrentContext( eventContext );
934  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
935 
936  if ( !sc.isSuccess() ) {
937  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
938  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
939  return StatusCode::FAILURE;
940  }
941 
942  m_algosInFlight--;
943 
944  EventSlot& thisSlot = m_eventSlots[si];
945 
946  // Update the catalog: some new products may be there
947  m_whiteboard->selectStore( eventContext->slot() ).ignore();
948 
949  // update prods in the dataflow
950  // DP: Handles could be used. Just update what the algo wrote
951  DataObjIDColl new_products;
952  m_whiteboard->getNewDataObjects( new_products ).ignore();
953  for ( const auto& new_product : new_products )
954  if ( msgLevel( MSG::DEBUG ) ) debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
955  thisSlot.dataFlowMgr.updateDataObjectsCatalog( new_products );
956 
957  if ( msgLevel( MSG::DEBUG ) )
958  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
959  << m_algosInFlight << endmsg;
960 
961  // Limit number of updates
962  if ( m_updateNeeded ) {
963  // Schedule an update of the status of the algorithms
964  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1 );
965  m_actionsQueue.push( updateAction );
966  m_updateNeeded = false;
967  }
968 
969  if ( msgLevel( MSG::DEBUG ) )
970  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
971  State state;
972  if ( algo->filterPassed() ) {
973  state = State::EVTACCEPTED;
974  } else {
975  state = State::EVTREJECTED;
976  }
977 
978  sc = thisSlot.algsStates.updateState( iAlgo, state );
979 
980  if ( sc.isSuccess() )
981  if ( msgLevel( MSG::VERBOSE ) )
982  verbose() << "Promoting " << index2algname( iAlgo ) << " on slot " << si << " to "
984 
985  return sc;
986 }
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
ContextID_t slot() const
Definition: EventContext.h:40
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:75
StatusCode updateStates(int si=-1)
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
ContextEvt_t evt() const
Definition: EventContext.h:39
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate an given &#39;slot&#39; for all subsequent calls within the same thread id.
unsigned int m_algosInFlight
Number of algoritms presently in flight.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registred in store.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
T bind(T...args)
GAUDI_API void setCurrentContext(const EventContext *ctx)
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void ignore() const
Definition: StatusCode.h:109
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode ForwardSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode ForwardSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 860 of file ForwardSchedulerSvc.cpp.

861 {
862 
864 
865  const std::string& algName( index2algname( iAlgo ) );
866  IAlgorithm* ialgoPtr = nullptr;
867  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
868 
869  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
870  EventContext* eventContext( m_eventSlots[si].eventContext );
871  if ( !eventContext )
872  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
873 
874  ++m_algosInFlight;
875  // prepare a scheduler action to run once the algorithm is executed
876  auto promote2ExecutedClosure =
877  std::bind( &ForwardSchedulerSvc::promoteToExecuted, this, iAlgo, eventContext->slot(), ialgoPtr, eventContext );
878  // Avoid to use tbb if the pool size is 1 and run in this thread
879  if ( -100 != m_threadPoolSize ) {
880 
881  // this parent task is needed to promote an Algorithm as EXECUTED,
882  // it will be started as soon as the child task (see below) is completed
883  tbb::task* triggerAlgoStateUpdate =
884  new ( tbb::task::allocate_root() ) enqueueSchedulerActionTask( this, promote2ExecutedClosure );
885  // setting parent's refcount to 1 is made here only for consistency
886  // (in this case since it is not scheduled explicitly and there it has only one child task)
887  triggerAlgoStateUpdate->set_ref_count( 1 );
888  // the child task that executes an Algorithm
889  tbb::task* algoTask = new ( triggerAlgoStateUpdate->allocate_child() )
890  AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
891  // schedule the algoTask
892  tbb::task::enqueue( *algoTask );
893 
894  } else {
895  AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
896  theTask.execute();
897  promote2ExecutedClosure();
898  }
899 
900  if ( msgLevel( MSG::DEBUG ) )
901  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
902  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
903 
904  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
905 
906  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
907 
908  if ( updateSc.isSuccess() )
909  if ( msgLevel( MSG::VERBOSE ) )
910  verbose() << "Promoting " << index2algname( iAlgo ) << " to SCHEDULED on slot " << si << endmsg;
911  return updateSc;
912  } else {
913  if ( msgLevel( MSG::DEBUG ) )
914  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
915  return sc;
916  }
917 }
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
Gaudi::Property< unsigned int > m_maxAlgosInFlight
STL class.
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
T bind(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode ForwardSchedulerSvc::pushNewEvent ( EventContext eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.

Definition at line 424 of file ForwardSchedulerSvc.cpp.

425 {
426 
427  if ( m_first ) {
428  m_first = false;
429  }
430 
431  if ( !eventContext ) {
432  fatal() << "Event context is nullptr" << endmsg;
433  return StatusCode::FAILURE;
434  }
435 
436  if ( m_freeSlots.load() == 0 ) {
437  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
438  return StatusCode::FAILURE;
439  }
440 
441  // no problem as push new event is only called from one thread (event loop manager)
442  m_freeSlots--;
443 
444  auto action = [this, eventContext]() -> StatusCode {
445  // Event processing slot forced to be the same as the wb slot
446  const unsigned int thisSlotNum = eventContext->slot();
447  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
448  if ( !thisSlot.complete ) {
449  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
450  return StatusCode::FAILURE;
451  }
452 
453  debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
454  thisSlot.reset( eventContext );
455 
456  return this->updateStates( thisSlotNum );
457  }; // end of lambda
458 
459  // Kick off the scheduling!
460  if ( msgLevel( MSG::VERBOSE ) ) {
461  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
462  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
463  }
464  m_actionsQueue.push( action );
465 
466  return StatusCode::SUCCESS;
467 }
ContextID_t slot() const
Definition: EventContext.h:40
StatusCode updateStates(int si=-1)
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:25
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode ForwardSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 470 of file ForwardSchedulerSvc.cpp.

471 {
472  StatusCode sc;
473  for ( auto context : eventContexts ) {
474  sc = pushNewEvent( context );
475  if ( sc != StatusCode::SUCCESS ) return sc;
476  }
477  return sc;
478 }
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
StatusCode ForwardSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event, if not available just return a failure.

Definition at line 525 of file ForwardSchedulerSvc.cpp.

526 {
527  if ( m_finishedEvents.try_pop( eventContext ) ) {
528  if ( msgLevel( MSG::DEBUG ) )
529  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
530  << endmsg;
531  m_freeSlots++;
532  return StatusCode::SUCCESS;
533  }
534  return StatusCode::FAILURE;
535 }
ContextID_t slot() const
Definition: EventContext.h:40
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode ForwardSchedulerSvc::updateStates ( int  si = -1)
private

Loop on algorithm in the slots and promote them to successive states (-1 means all slots, while empty string means skipping an update of the Control Flow state)

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. To check if the event is finished the algorithm checks if:

  • No algorithms have been signed off by the control flow
  • No algorithms have been signed off by the data flow
  • No algorithms have been scheduled

Definition at line 588 of file ForwardSchedulerSvc.cpp.

589 {
590 
591  m_updateNeeded = true;
592 
593  // Fill a map of initial state / action using closures.
594  // done to update the states w/o several if/elses
595  // Posterchild for constexpr with gcc4.7 onwards!
596  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
597  statesTransitions = {
598  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
599  this,
600  std::placeholders::_1,
601  std::placeholders::_2)},
602  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
603  this,
604  std::placeholders::_1,
605  std::placeholders::_2)}
606  };*/
607 
608  StatusCode global_sc( StatusCode::FAILURE, true );
609 
610  // Sort from the oldest to the newest event
611  // Prepare a vector of pointers to the slots to avoid copies
612  std::vector<EventSlot*> eventSlotsPtrs;
613 
614  // Consider all slots if si <0 or just one otherwise
615  if ( si < 0 ) {
616  const int eventsSlotsSize( m_eventSlots.size() );
617  eventSlotsPtrs.reserve( eventsSlotsSize );
618  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
619  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
620  }
621  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
622  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
623  } else {
624  eventSlotsPtrs.push_back( &m_eventSlots[si] );
625  }
626 
627  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
628  int iSlot = thisSlotPtr->eventContext->slot();
629 
630  // Cache the states of the algos to improve readability and performance
631  auto& thisSlot = m_eventSlots[iSlot];
632  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
633 
634  // Take care of the control ready update
635  m_efManager.updateEventState( thisAlgsStates, thisSlot.controlFlowState );
636 
637  // DF note: all this this is a loop over all algs and applies CR->DR and DR->SCHD transistions
638  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
639  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
640  if (algState==AlgsExecutionStates::ERROR)
641  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
642  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
643  partial_sc=StatusCode::SUCCESS;
644  for (auto state_transition = statesTransitions.find(algState);
645  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
646  state_transition++){
647  partial_sc = state_transition->second(iAlgo,iSlot);
648  if (partial_sc.isFailure()){
649  verbose() << "Could not apply transition from "
650  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
651  << " for algorithm " << index2algname(iAlgo)
652  << " on processing slot " << iSlot << endmsg;
653  }
654  else{global_sc=partial_sc;}
655  } // end loop on transitions
656  }*/ // end loop on algos
657 
658  StatusCode partial_sc( StatusCode::FAILURE, true );
659  // first update CONTROLREADY to DATAREADY
660  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::CONTROLREADY );
661  it != thisAlgsStates.end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
662 
663  uint algIndex = *it;
664  partial_sc = promoteToDataReady( algIndex, iSlot );
665  if ( partial_sc.isFailure() )
666  if ( msgLevel( MSG::VERBOSE ) )
667  verbose() << "Could not apply transition from "
668  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY] << " for algorithm "
669  << index2algname( algIndex ) << " on processing slot " << iSlot << endmsg;
670  }
671 
672  // now update DATAREADY to SCHEDULED
673  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
674  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
675  uint algIndex = *it;
676 
677  partial_sc = promoteToScheduled( algIndex, iSlot );
678 
679  if ( msgLevel( MSG::VERBOSE ) )
680  if ( partial_sc.isFailure() )
681  verbose() << "Could not apply transition from "
682  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
683  << index2algname( algIndex ) << " on processing slot " << iSlot << endmsg;
684  }
685 
686  // Not complete because this would mean that the slot is already free!
687  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
688  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
689  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
690  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
691 
692  thisSlot.complete = true;
693  // if the event did not fail, add it to the finished events
694  // otherwise it is taken care of in the error handling already
695  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
696  m_finishedEvents.push( thisSlot.eventContext );
697  if ( msgLevel( MSG::DEBUG ) )
698  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot " << thisSlot.eventContext->slot()
699  << ")." << endmsg;
700  }
701  // now let's return the fully evaluated result of the control flow
702  if ( msgLevel( MSG::DEBUG ) ) {
704  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
705  debug() << ss.str() << endmsg;
706  }
707 
708  thisSlot.eventContext = nullptr;
709  } else {
710  StatusCode eventStalledSC = isStalled( iSlot );
711  if ( !eventStalledSC.isSuccess() ) {
712  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
713  eventFailed( thisSlot.eventContext ).ignore();
714  }
715  }
716  } // end loop on slots
717 
718  verbose() << "States Updated." << endmsg;
719 
720  return global_sc;
721 }
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
ContextID_t slot() const
Definition: EventContext.h:40
concurrency::recursive_CF::ExecutionFlowManager m_efManager
Member to take care of the control flow.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:75
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T end(T...args)
ContextEvt_t evt() const
Definition: EventContext.h:39
T push_back(T...args)
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
T str(T...args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
bool m_updateNeeded
Keep track of update actions scheduled.
T size(T...args)
STL class.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
T begin(T...args)
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Iterator begin(State kind)
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update states and decisions of algorithms.
T sort(T...args)
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
void ignore() const
Definition: StatusCode.h:109
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
T reserve(T...args)
static std::map< State, std::string > stateNames
Iterator end(State kind)

Member Data Documentation

tbb::concurrent_bounded_queue<action> ForwardSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 216 of file ForwardSchedulerSvc.h.

SmartIF<IAlgExecStateSvc> ForwardSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 179 of file ForwardSchedulerSvc.h.

std::unordered_map<std::string, unsigned int> ForwardSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 155 of file ForwardSchedulerSvc.h.

std::vector<std::string> ForwardSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 161 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::vector<std::vector<std::string> > > ForwardSchedulerSvc::m_algosDependencies
private
Initial value:
{
this, "AlgosDependencies", {}, "[[deprecated]]"}

Definition at line 124 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_algosInFlight = 0
private

Number of algoritms presently in flight.

Definition at line 184 of file ForwardSchedulerSvc.h.

SmartIF<IAlgResourcePool> ForwardSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 208 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
private

Definition at line 126 of file ForwardSchedulerSvc.h.

concurrency::recursive_CF::ControlFlowGraph* ForwardSchedulerSvc::m_efg
private

Definition at line 285 of file ForwardSchedulerSvc.h.

concurrency::recursive_CF::ExecutionFlowManager ForwardSchedulerSvc::m_efManager
private

Member to take care of the control flow.

Definition at line 239 of file ForwardSchedulerSvc.h.

std::vector<EventSlot> ForwardSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 167 of file ForwardSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> ForwardSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 173 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_first = true
private

Definition at line 244 of file ForwardSchedulerSvc.h.

std::atomic_int ForwardSchedulerSvc::m_freeSlots
private

Atomic to account for asyncronous updates by the scheduler wrt the rest.

Definition at line 170 of file ForwardSchedulerSvc.h.

std::atomic<ActivationState> ForwardSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 146 of file ForwardSchedulerSvc.h.

Gaudi::Property<unsigned int> ForwardSchedulerSvc::m_maxAlgosInFlight
private
Initial value:
{this, "MaxAlgosInFlight", 1,
"[[deprecated]] Taken from the whiteboard"}

Definition at line 122 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_maxEventsInFlight
private
Initial value:
{this, "MaxEventsInFlight", 0,
"Maximum number of event processed simultaneously"}

Definition at line 116 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_showControlFlow
private
Initial value:
{this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences"}

Definition at line 134 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_showDataDeps
private
Initial value:
{this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms"}

Definition at line 130 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_showDataFlow
private
Initial value:
{this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms"}

Definition at line 132 of file ForwardSchedulerSvc.h.

std::mutex ForwardSchedulerSvc::m_ssMut
staticprivate

Definition at line 276 of file ForwardSchedulerSvc.h.

std::list< ForwardSchedulerSvc::SchedulerState > ForwardSchedulerSvc::m_sState
staticprivate

Definition at line 275 of file ForwardSchedulerSvc.h.

std::thread ForwardSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 149 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 118 of file ForwardSchedulerSvc.h.

SmartIF<IThreadPoolSvc> ForwardSchedulerSvc::m_threadPoolSvc
private

Definition at line 242 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_updateNeeded = true
private

Keep track of update actions scheduled.

Definition at line 204 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 127 of file ForwardSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> ForwardSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 164 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 121 of file ForwardSchedulerSvc.h.


The documentation for this class was generated from the following files: