The Gaudi Framework  v28r3 (cc1cf868)
ForwardSchedulerSvc Class Reference

The SchedulerSvc implements the IScheduler interface. More...

#include <GaudiKernel/ForwardSchedulerSvc.h>

Inheritance diagram for ForwardSchedulerSvc:
Collaboration diagram for ForwardSchedulerSvc:

Classes

struct  enqueueSchedulerActionTask
 
class  SchedulerState
 

Public Member Functions

 ~ForwardSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is availble. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
void addAlg (Algorithm *, EventContext *, pthread_t)
 
bool delAlg (Algorithm *)
 
void dumpState () override
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::stringgetInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
- Public Member Functions inherited from Service
const std::stringname () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Initialize Service. More...
 
StatusCode sysStop () override
 Initialize Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Re-initialize the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service.May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
 ~PropertyHolder () override=default
 
Gaudi::Details::PropertyBasedeclareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none") const
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBasedeclareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property form another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property form the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBasegetProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolderoperator= (const PropertyHolder &)=delete
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, GaudiHandleBase &ref, const std::string &doc="none")
 Specializations for various GaudiHandles. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, GaudiHandleArrayBase &ref, const std::string &doc="none")
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, DataObjectHandleBase &ref, const std::string &doc="none")
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
- Public Member Functions inherited from extend_interfaces< Interfaces... >
 ~extend_interfaces () override=default
 Virtual destructor. More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::stringindex2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1)
 Loop on algorithm in the slots and promote them to successive states (-1 means all slots, while empty string means skipping an update of the Control Flow state) More...
 
StatusCode promoteToControlReady (unsigned int iAlgo, int si)
 Algorithm promotion: Accepted by the control flow. More...
 
StatusCode promoteToDataReady (unsigned int iAlgo, int si)
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 
void dumpState (std::ostringstream &)
 

Private Attributes

Gaudi::Property< int > m_maxEventsInFlight
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::stringm_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< unsigned int > m_maxAlgosInFlight
 
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
 
Gaudi::Property< bool > m_checkDeps
 
Gaudi::Property< std::stringm_useDataLoader
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
std::atomic< ActivationStatem_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::stringm_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IHiveWhiteBoardm_whiteboard
 A shortcut to the whiteboard. More...
 
std::vector< EventSlotm_eventSlots
 Vector of events slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asyncronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvcm_algExecStateSvc
 Algorithm execution state manager. More...
 
unsigned int m_algosInFlight = 0
 Number of algoritms presently in flight. More...
 
bool m_updateNeeded = true
 Keep track of update actions scheduled. More...
 
SmartIF< IAlgResourcePoolm_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< actionm_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
concurrency::ExecutionFlowManager m_efManager
 Member to take care of the control flow. More...
 
SmartIF< IThreadPoolSvcm_threadPoolSvc
 
bool m_first = true
 
concurrency::PrecedenceRulesGraphm_efg
 

Static Private Attributes

static std::list< SchedulerStatem_sState
 
static std::mutex m_ssMut
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
typedef Gaudi::PluginService::Factory< IService *, const std::string &, ISvcLocator * > Factory
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBaseproperty (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvcm_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

The SchedulerSvc implements the IScheduler interface.

It manages all the execution states of the algorithms and interacts with the TBB runtime for the algorithm tasks submission. A state machine takes care of the tracking of the execution state of the algorithms.

This is a forward scheduler: an algorithm is scheduled for execution as soon as its data dependencies are available in the whiteboard, and as soon as a worker thread becomes available. The scheduler has no means for automatic intra-event throughput maximization. Consider the AvalancheScheduler if you need those.

Algorithms management

The activate() method runs in a separate thread. It checks a TBB concurrent bounded queue of closures in a loop via the Pop method. This allows not to use a cpu entirely to check the presence of new actions to be taken. In other words, the asynchronous actions are serialised via the actions queue. Once a task terminates, a call to the promoteToExecuted method will be pushed into the actions queue. The promoteToExecuted method also triggers a call to the updateStates method, which brushes all algorithms, checking if their state can be changed. It's indeed possible that upon termination of an algorithm, the control flow and/or the data flow allow the submission of more algorithms.

Algorithms dependencies

There are two ways of declaring algorithms dependencies. One which is only temporarly available to ease developments consists in declaring them through AlgosDependencies property as a list of list. The order of these sublist must be the same one of the algorithms in the TopAlg list. The second one consists in declaring the data dependencies directly within the algorithms via data object handles.

Events management

The scheduler accepts events to be processed (in the form of eventContexts) and releases processed events. This flow is implemented through three methods:

  • pushNewEvent: to make an event available to the scheduler.
  • tryPopFinishedEvent: to retrieve an event from the scheduler
  • popFinishedEvent: to retrieve an event from the scheduler (blocking)

Please refer to the full documentation of the methods for more details.

Author
Danilo Piparo
Benedikt Hegner
Version
1.1

Definition at line 83 of file ForwardSchedulerSvc.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

ForwardSchedulerSvc::~ForwardSchedulerSvc ( )
overridedefault

Destructor.

Member Function Documentation

void ForwardSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)

Definition at line 353 of file ForwardSchedulerSvc.cpp.

353  {
354 
355  if (msgLevel(MSG::DEBUG))
356  debug() << "ForwardSchedulerSvc::activate()" << endmsg;
357 
359  error() << "problems initializing ThreadPoolSvc" << endmsg;
361  return;
362  }
363 
364  // Wait for actions pushed into the queue by finishing tasks.
365  action thisAction;
367 
368  m_isActive = ACTIVE;
369 
370  // Continue to wait if the scheduler is running or there is something to do
371  info() << "Start checking the actionsQueue" << endmsg;
372  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
373  m_actionsQueue.pop( thisAction );
374  sc = thisAction();
375  if ( sc != StatusCode::SUCCESS )
376  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
377  else
378  verbose() << "Action succeeded." << endmsg;
379  }
380 
381  info() << "Terminating thread-pool resources" << endmsg;
383  error() << "Problems terminating thread pool" << endmsg;
385  }
386 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
SmartIF< IThreadPoolSvc > m_threadPoolSvc
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:84
virtual StatusCode terminatePool()=0
Finalize the thread pool.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
Gaudi::Property< int > m_threadPoolSize
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void ForwardSchedulerSvc::addAlg ( Algorithm a,
EventContext e,
pthread_t  t 
)

Definition at line 1001 of file ForwardSchedulerSvc.cpp.

1001  {
1002 
1004  m_sState.push_back( SchedulerState( a, e, t ) );
1005 }
static std::list< SchedulerState > m_sState
T lock(T...args)
static std::mutex m_ssMut
unsigned int ForwardSchedulerSvc::algname2index ( const std::string algoname)
inlineprivate

Convert a name to an integer.

Definition at line 422 of file ForwardSchedulerSvc.cpp.

422  {
423  unsigned int index = m_algname_index_map[algoname];
424  return index;
425 }
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
StatusCode ForwardSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.

Definition at line 396 of file ForwardSchedulerSvc.cpp.

396  {
397 
398  if ( m_isActive == ACTIVE ) {
399  // Drain the scheduler
401  // This would be the last action
402  m_actionsQueue.push( [this]() -> StatusCode {
404  return StatusCode::SUCCESS;
405  } );
406  }
407 
408  return StatusCode::SUCCESS;
409 }
StatusCode m_drain()
Drain the actions present in the queue.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
bool ForwardSchedulerSvc::delAlg ( Algorithm a)

Definition at line 1008 of file ForwardSchedulerSvc.cpp.

1008  {
1009 
1011 
1012  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1013  if ( *itr == a ) {
1014  m_sState.erase( itr );
1015  return true;
1016  }
1017  }
1018 
1019  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1020  return false;
1021 }
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:750
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
static std::list< SchedulerState > m_sState
T lock(T...args)
STL class.
static std::mutex m_ssMut
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void ForwardSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

The dependencies of each algo are printed and the missing ones specified.

Definition at line 765 of file ForwardSchedulerSvc.cpp.

765  {
766 
767  // To have just one big message
768  std::ostringstream outputMessageStream;
769 
770  outputMessageStream << "============================== Execution Task State ============================="
771  << std::endl;
772  dumpState( outputMessageStream );
773 
774  outputMessageStream << std::endl
775  << "============================== Scheduler State ================================="
776  << std::endl;
777 
778  int slotCount = -1;
779  for ( auto thisSlot : m_eventSlots ) {
780  slotCount++;
781  if ( thisSlot.complete ) continue;
782 
783  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
784  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
785 
786  if ( 0 > iSlot or iSlot == slotCount ) {
787  outputMessageStream << "Algorithms states:" << std::endl;
788 
789  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
790  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
791  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
792  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
793  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
794  const int depsSize = deps.size();
795  if ( depsSize == 0 ) outputMessageStream << " none";
796 
797  DataObjIDColl missing;
798  for ( auto d : deps ) {
799  outputMessageStream << d << " ";
800  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
801  // outputMessageStream << "[missing] ";
802  missing.insert( d );
803  }
804  }
805 
806  if ( !missing.empty() ) {
807  outputMessageStream << ". The following are missing: ";
808  for ( auto d : missing ) {
809  outputMessageStream << d << " ";
810  }
811  }
812 
813  outputMessageStream << std::endl;
814  }
815 
816  // Snapshot of the WhiteBoard
817  outputMessageStream << "\nWhiteboard contents: " << std::endl;
818  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
819 
820  // Snapshot of the ControlFlow
821  outputMessageStream << "\nControl Flow:" << std::endl;
822  std::stringstream cFlowStateStringStream;
823  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
824 
825  outputMessageStream << cFlowStateStringStream.str() << std::endl;
826  }
827  }
828 
829  outputMessageStream << "=================================== END ======================================" << std::endl;
830 
831  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
832 }
T empty(T...args)
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
T endl(T...args)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
T str(T...args)
T insert(T...args)
T size(T...args)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static std::map< State, std::string > stateNames
void ForwardSchedulerSvc::dumpState ( )
override

Definition at line 1034 of file ForwardSchedulerSvc.cpp.

1034  {
1035 
1037 
1038  std::ostringstream ost;
1039  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1040  dumpState( ost );
1041 
1042  info() << ost.str() << endmsg;
1043 }
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
T endl(T...args)
static std::list< SchedulerState > m_sState
T lock(T...args)
static std::mutex m_ssMut
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void ForwardSchedulerSvc::dumpState ( std::ostringstream ost)
private

Definition at line 1024 of file ForwardSchedulerSvc.cpp.

1024  {
1025 
1027 
1028  for ( auto it : m_sState ) {
1029  ost << " " << it << std::endl;
1030  }
1031 }
T endl(T...args)
static std::list< SchedulerState > m_sState
T lock(T...args)
static std::mutex m_ssMut
StatusCode ForwardSchedulerSvc::eventFailed ( EventContext eventContext)
private

Method to check if an event failed and take appropriate actions.

It can be possible that an event fails.

In this case this method is called. It dumps the state of the scheduler, drains the actions (without executing them) and events in the queues and returns a failure.

Definition at line 553 of file ForwardSchedulerSvc.cpp.

553  {
554 
555  // Set the number of slots available to an error code
556  m_freeSlots.store( 0 );
557 
558  fatal() << "*** Event " << eventContext->evt() << " on slot "
559  << eventContext->slot() << " failed! ***" << endmsg;
560 
561  std::ostringstream ost;
562  m_algExecStateSvc->dump(ost, *eventContext);
563 
564  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
565  << ":\n" << ost.str() << endmsg;
566 
567  dumpSchedulerState(-1);
568 
569  // Empty queue and deactivate the service
570  action thisAction;
571  while ( m_actionsQueue.try_pop( thisAction ) ) {
572  };
573  deactivate();
574 
575  // Push into the finished events queue the failed context
576  EventContext* thisEvtContext;
577  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
578  m_finishedEvents.push( thisEvtContext );
579  };
580  m_finishedEvents.push( eventContext );
581 
582  return StatusCode::FAILURE;
583 }
StatusCode deactivate()
Deactivate scheduler.
ContextID_t slot() const
Definition: EventContext.h:40
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 321 of file ForwardSchedulerSvc.cpp.

321  {
322 
324  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
325 
326  sc = deactivate();
327  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
328 
329  info() << "Joining Scheduler thread" << endmsg;
330  m_thread.join();
331 
332  // Final error check after thread pool termination
333  if ( m_isActive == FAILURE ) {
334  error() << "problems in scheduler thread" << endmsg;
335  return StatusCode::FAILURE;
336  }
337 
338  // m_efManager.getPrecedenceRulesGraph()->dumpExecutionPlan();
339 
340  return sc;
341 }
StatusCode deactivate()
Deactivate scheduler.
StatusCode finalize() override
Definition: Service.cpp:174
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
std::thread m_thread
The thread in which the activate function runs.
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
unsigned int ForwardSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 491 of file ForwardSchedulerSvc.cpp.

491  {
492  return std::max( m_freeSlots.load(), 0 );
493 }
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T max(T...args)
const std::string & ForwardSchedulerSvc::index2algname ( unsigned int  index)
inlineprivate

Convert an integer to a name.

Definition at line 416 of file ForwardSchedulerSvc.cpp.

416  {
417  return m_algname_vect[index];
418 }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
StatusCode ForwardSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 44 of file ForwardSchedulerSvc.cpp.

44  {
45 
46  // Initialise mother class (read properties, ...)
48  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
49 
50  // Get hold of the TBBSvc. This should initialize the thread pool
51  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
52  if ( !m_threadPoolSvc.isValid() ) {
53  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
54  return StatusCode::FAILURE;
55  }
56 
57  // Activate the scheduler in another thread.
58  info() << "Activating scheduler in a separate thread" << endmsg;
60 
61  while ( m_isActive != ACTIVE ) {
62  if ( m_isActive == FAILURE ) {
63  fatal() << "Terminating initialization" << endmsg;
64  return StatusCode::FAILURE;
65  } else {
66  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
67  sleep( 1 );
68  }
69  }
70 
71  // Get the algo resource pool
72  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
73  if ( !m_algResourcePool.isValid() ) {
74  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
75  return StatusCode::FAILURE;
76  }
77 
78  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
79  if (!m_algExecStateSvc.isValid()) {
80  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
81  return StatusCode::FAILURE;
82  }
83 
84  // Get Whiteboard
86  if ( !m_whiteboard.isValid() ) {
87  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
88  return StatusCode::FAILURE;
89  }
90 
91  // Check the MaxEventsInFlight parameters and react
92  // Deprecated for the moment
93  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
94  if ( m_maxEventsInFlight != 0 ) {
95  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
96  << "Please migrate your code options files." << endmsg;
97 
98  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
99  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
100  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
101  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
102  }
103  }
104 
105  // set global concurrency flags
107 
108  // Align the two quantities
109  m_maxEventsInFlight = numberOfWBSlots;
110 
111  // Set the number of free slots
113 
114  if ( m_algosDependencies.size() != 0 ) {
115  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
116  << " FIX your job options #####" << endmsg;
117  }
118 
119  // Get the list of algorithms
121  const unsigned int algsNumber = algos.size();
122  info() << "Found " << algsNumber << " algorithms" << endmsg;
123 
124  /* Dependencies
125  1) Look for handles in algo, if none
126  2) Assume none are required
127  */
128 
129  DataObjIDColl globalInp, globalOutp;
130 
131  // figure out all outputs
132  for (IAlgorithm* ialgoPtr : algos) {
133  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
134  if (!algoPtr) {
135  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
136  }
137  for (auto id : algoPtr->outputDataObjs()) {
138  auto r = globalOutp.insert(id);
139  if (!r.second) {
140  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" << endmsg;
141  }
142  }
143  }
144 
145  std::ostringstream ostdd;
146  ostdd << "Data Dependencies for Algorithms:";
147 
149  for ( IAlgorithm* ialgoPtr : algos ) {
150  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
151  if ( nullptr == algoPtr ) {
152  fatal() << "Could not convert IAlgorithm into Algorithm for "
153  << ialgoPtr->name()
154  << ": this will result in a crash." << endmsg;
155  return StatusCode::FAILURE;
156  }
157 
158  ostdd << "\n " << algoPtr->name();
159 
160  DataObjIDColl algoDependencies;
161  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
162  for ( auto id : algoPtr->inputDataObjs() ) {
163  ostdd << "\n o INPUT " << id;
164  if (id.key().find(":")!=std::string::npos) {
165  ostdd << " contains alternatives which require resolution...\n";
166  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(),boost::char_separator<char>{":"}};
167  auto itok = std::find_if( tokens.begin(), tokens.end(),
168  [&](const std::string& t) {
169  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
170  } );
171  if (itok!=tokens.end()) {
172  ostdd << "found matching output for " << *itok
173  << " -- updating scheduler info\n";
174  id.updateKey(*itok);
175  } else {
176  error() << "failed to find alternate in global output list"
177  << " for id: " << id << " in Alg " << algoPtr->name()
178  << endmsg;
179  m_showDataDeps = true;
180  }
181  }
182  algoDependencies.insert( id );
183  globalInp.insert( id );
184  }
185  for ( auto id : algoPtr->outputDataObjs() ) {
186  ostdd << "\n o OUTPUT " << id;
187  if (id.key().find(":")!=std::string::npos) {
188  error() << " in Alg " << algoPtr->name()
189  << " alternatives are NOT allowed for outputs! id: "
190  << id << endmsg;
191  m_showDataDeps = true;
192  }
193  }
194  } else {
195  ostdd << "\n none";
196  }
197  m_algosDependencies.emplace_back( algoDependencies );
198  }
199 
200  if ( m_showDataDeps ) {
201  info() << ostdd.str() << endmsg;
202  }
203 
204  // Fill the containers to convert algo names to index
205  m_algname_vect.reserve( algsNumber );
206  unsigned int index = 0;
207  IAlgorithm* dataLoaderAlg( nullptr );
208  for ( IAlgorithm* algo : algos ) {
209  const std::string& name = algo->name();
210  m_algname_index_map[name] = index;
212  if (algo->name() == m_useDataLoader) {
213  dataLoaderAlg = algo;
214  }
215  index++;
216  }
217 
218  // Check if we have unmet global input dependencies
219  if ( m_checkDeps ) {
220  DataObjIDColl unmetDep;
221  for ( auto o : globalInp ) {
222  if ( globalOutp.find( o ) == globalOutp.end() ) {
223  unmetDep.insert( o );
224  }
225  }
226 
227  if ( unmetDep.size() > 0 ) {
228 
229  std::ostringstream ost;
230  for ( auto& o : unmetDep ) {
231  ost << "\n o " << o << " required by Algorithm: ";
232  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
233  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
234  ost << "\n * " << m_algname_vect[i];
235  }
236  }
237  }
238 
239  if ( m_useDataLoader != "" ) {
240  // Find the DataLoader Alg
241  if (dataLoaderAlg == nullptr) {
242  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
243  << "\" found, and unmet INPUT dependencies "
244  << "detected:\n" << ost.str() << endmsg;
245  return StatusCode::FAILURE;
246  }
247 
248  info() << "Will attribute the following unmet INPUT dependencies to \""
249  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
250  << "\" Algorithm"
251  << ost.str() << endmsg;
252 
253  // Set the property Load of DataLoader Alg
254  Algorithm *dataAlg = dynamic_cast<Algorithm*>(dataLoaderAlg);
255  if ( !dataAlg ) {
256  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value()
257  << "\" IAlg to Algorithm" << endmsg;
258  return StatusCode::FAILURE;
259  }
260 
261  for (auto& id : unmetDep) {
262  debug() << "adding OUTPUT dep \"" << id << "\" to "
263  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
264  << endmsg;
266  }
267 
268  } else {
269  fatal() << "Auto DataLoading not requested, "
270  << "and the following unmet INPUT dependencies were found:"
271  << ost.str() << endmsg;
272  return StatusCode::FAILURE;
273  }
274 
275  } else {
276  info() << "No unmet INPUT data dependencies were found" << endmsg;
277  }
278  }
279 
280  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
282  unsigned int controlFlowNodeNumber = m_efManager.getPrecedenceRulesGraph()->getControlFlowNodeCounter();
283 
284  // Shortcut for the message service
285  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
286  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
287 
289  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
290  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
291 
292  // Clearly inform about the level of concurrency
293  info() << "Concurrency level information:" << endmsg;
294  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
295  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
296  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
297 
298  m_efg = algPool->getPRGraph();
299 
300  if (m_showControlFlow) {
301  info() << std::endl
302  << "========== Algorithm and Sequence Configuration =========="
303  << std::endl << std::endl;
304  info() << m_efg->dumpControlFlow() << endmsg;
305  }
306 
307  if (m_showDataFlow) {
308  info() << std::endl
309  << "======================= Data Flow ========================"
310  << std::endl;
311  info() << m_efg->dumpDataFlow() << endmsg;
312  }
313 
314  return sc;
315 }
Gaudi::Property< bool > m_showControlFlow
StatusCode initialize() override
Definition: Service.cpp:64
T empty(T...args)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:750
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
virtual concurrency::PrecedenceRulesGraph * getPRGraph() const
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
const DataObjIDColl & outputDataObjs() const override
T endl(T...args)
SmartIF< IThreadPoolSvc > m_threadPoolSvc
StatusCode initialize(PrecedenceRulesGraph *graph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager It greps the topalg list and the index map for the algo names...
Gaudi::Property< bool > m_checkDeps
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
concurrency::PrecedenceRulesGraph * m_efg
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
STL class.
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:78
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Gaudi::Property< bool > m_showDataFlow
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const DataObjIDColl & inputDataObjs() const override
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
Gaudi::Property< std::string > m_whiteboardSvcName
T bind(T...args)
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
Gaudi::Property< int > m_maxEventsInFlight
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
T assign(T...args)
STL class.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
T begin(T...args)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
Gaudi::Property< bool > m_showDataDeps
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual size_t getNumberOfStores() const =0
Get the number of &#39;slots&#39;.
PrecedenceRulesGraph * getPrecedenceRulesGraph() const
Get the flow graph instance.
std::string dumpControlFlow() const
Print out control flow of Algorithms and Sequences.
T for_each(T...args)
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
T reserve(T...args)
T emplace_back(T...args)
StatusCode ForwardSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check if we are in present of a stall condition for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.

Definition at line 741 of file ForwardSchedulerSvc.cpp.

741  {
742  // Get the slot
743  EventSlot& thisSlot = m_eventSlots[iSlot];
744 
745  if ( m_actionsQueue.empty() && m_algosInFlight == 0 &&
747 
748  info() << "About to declare a stall" << endmsg;
749  fatal() << "*** Stall detected! ***\n" << endmsg;
750  dumpSchedulerState( iSlot );
751  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
752 
753  return StatusCode::FAILURE;
754  }
755  return StatusCode::SUCCESS;
756 }
bool algsPresent(State state) const
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 499 of file ForwardSchedulerSvc.cpp.

499  {
500  unsigned int slotNum = 0;
501  for ( auto& thisSlot : m_eventSlots ) {
502  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
503  updateStates( slotNum );
504  }
505  slotNum++;
506  }
507  return StatusCode::SUCCESS;
508 }
StatusCode updateStates(int si=-1)
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode ForwardSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is availble.

Get a finished event or block until one becomes available.

Definition at line 514 of file ForwardSchedulerSvc.cpp.

514  {
515  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
516  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
517  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
518  // << " active: " << m_isActive << endmsg;
519  return StatusCode::FAILURE;
520  } else {
521  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
522  // << " active: " << m_isActive << endmsg;
523  m_finishedEvents.pop( eventContext );
524  m_freeSlots++;
525  if (msgLevel(MSG::DEBUG))
526  debug() << "Popped slot " << eventContext->slot() << "(event "
527  << eventContext->evt() << ")" << endmsg;
528  return StatusCode::SUCCESS;
529  }
530 }
ContextID_t slot() const
Definition: EventContext.h:40
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
Gaudi::Property< int > m_maxEventsInFlight
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToControlReady ( unsigned int  iAlgo,
int  si 
)
private

Algorithm promotion: Accepted by the control flow.

Definition at line 836 of file ForwardSchedulerSvc.cpp.

836  {
837 
838  // Do the control flow
839  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
840  if (sc.isSuccess())
841  if (msgLevel(MSG::VERBOSE))
842  verbose() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY on slot "
843  << si << endmsg;
844 
845  return sc;
846 }
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToDataReady ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 850 of file ForwardSchedulerSvc.cpp.

850  {
851 
852  StatusCode sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
853 
854  StatusCode updateSc( StatusCode::FAILURE );
855  if ( sc == StatusCode::SUCCESS )
856  updateSc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::DATAREADY );
857 
858  if (updateSc.isSuccess())
859  if (msgLevel(MSG::VERBOSE))
860  verbose() << "Promoting " << index2algname(iAlgo) << " to DATAREADY on slot "
861  << si<< endmsg;
862 
863  return updateSc;
864 }
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

Definition at line 933 of file ForwardSchedulerSvc.cpp.

934  {
935 
936  // Put back the instance
937  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
938  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
939  // EventContext* eventContext = castedAlgo->getContext();
940 
941  // Check if the execution failed
942  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
943  eventFailed(eventContext).ignore();
944 
945  Gaudi::Hive::setCurrentContext(eventContext);
946  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
947 
948  if ( !sc.isSuccess() ) {
949  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
950  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
951  return StatusCode::FAILURE;
952  }
953 
954  m_algosInFlight--;
955 
956  EventSlot& thisSlot = m_eventSlots[si];
957 
958  // Update the catalog: some new products may be there
959  m_whiteboard->selectStore( eventContext->slot() ).ignore();
960 
961  // update prods in the dataflow
962  // DP: Handles could be used. Just update what the algo wrote
963  DataObjIDColl new_products;
964  m_whiteboard->getNewDataObjects( new_products ).ignore();
965  for ( const auto& new_product : new_products )
966  if ( msgLevel( MSG::DEBUG ) ) debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
967  thisSlot.dataFlowMgr.updateDataObjectsCatalog( new_products );
968 
969  if ( msgLevel( MSG::DEBUG ) )
970  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
971  << m_algosInFlight << endmsg;
972 
973  // Limit number of updates
974  if ( m_updateNeeded ) {
975  // Schedule an update of the status of the algorithms
976  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1);
977  m_actionsQueue.push( updateAction );
978  m_updateNeeded = false;
979  }
980 
981  if ( msgLevel( MSG::DEBUG ) )
982  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
983  State state;
984  if ( algo->filterPassed() ) {
985  state = State::EVTACCEPTED;
986  } else {
987  state = State::EVTREJECTED;
988  }
989 
990  sc = thisSlot.algsStates.updateState( iAlgo, state );
991 
992  if (sc.isSuccess())
993  if (msgLevel(MSG::VERBOSE))
994  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
996 
997  return sc;
998 }
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
ContextID_t slot() const
Definition: EventContext.h:40
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
StatusCode updateStates(int si=-1)
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
ContextEvt_t evt() const
Definition: EventContext.h:39
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate an given &#39;slot&#39; for all subsequent calls within the same thread id.
unsigned int m_algosInFlight
Number of algoritms presently in flight.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registred in store.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
GAUDI_API void setCurrentContext(const EventContext *ctx)
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode ForwardSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode ForwardSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 868 of file ForwardSchedulerSvc.cpp.

868  {
869 
871 
872  const std::string& algName( index2algname( iAlgo ) );
873  IAlgorithm* ialgoPtr = nullptr;
874  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
875 
876  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
877  EventContext* eventContext( m_eventSlots[si].eventContext );
878  if ( !eventContext )
879  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
880 
881  ++m_algosInFlight;
882  // prepare a scheduler action to run once the algorithm is executed
883  auto promote2ExecutedClosure = std::bind(&ForwardSchedulerSvc::promoteToExecuted,
884  this,
885  iAlgo,
886  eventContext->slot(),
887  ialgoPtr,
888  eventContext);
889  // Avoid to use tbb if the pool size is 1 and run in this thread
890  if (-100 != m_threadPoolSize) {
891 
892  // this parent task is needed to promote an Algorithm as EXECUTED,
893  // it will be started as soon as the child task (see below) is completed
894  tbb::task* triggerAlgoStateUpdate = new(tbb::task::allocate_root())
895  enqueueSchedulerActionTask(this, promote2ExecutedClosure);
896  // setting parent's refcount to 1 is made here only for consistency
897  // (in this case since it is not scheduled explicitly and there it has only one child task)
898  triggerAlgoStateUpdate->set_ref_count(1);
899  // the child task that executes an Algorithm
900  tbb::task* algoTask = new(triggerAlgoStateUpdate->allocate_child())
901  AlgoExecutionTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
902  // schedule the algoTask
903  tbb::task::enqueue( *algoTask);
904 
905  } else {
906  AlgoExecutionTask theTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
907  theTask.execute();
908  promote2ExecutedClosure();
909  }
910 
911  if ( msgLevel( MSG::DEBUG ) )
912  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
913  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
914 
915  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
916 
917  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
918 
919  if (updateSc.isSuccess())
920  if (msgLevel(MSG::VERBOSE))
921  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
922  << si << endmsg;
923  return updateSc;
924  } else {
925  if ( msgLevel( MSG::DEBUG ) )
926  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
927  return sc;
928  }
929 }
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
Gaudi::Property< unsigned int > m_maxAlgosInFlight
STL class.
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
T bind(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvent ( EventContext eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.

Definition at line 435 of file ForwardSchedulerSvc.cpp.

435  {
436 
437  if ( m_first ) {
438  m_first = false;
439  }
440 
441  if ( !eventContext ) {
442  fatal() << "Event context is nullptr" << endmsg;
443  return StatusCode::FAILURE;
444  }
445 
446  if ( m_freeSlots.load() == 0 ) {
447  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
448  return StatusCode::FAILURE;
449  }
450 
451  // no problem as push new event is only called from one thread (event loop manager)
452  m_freeSlots--;
453 
454  auto action = [this, eventContext]() -> StatusCode {
455  // Event processing slot forced to be the same as the wb slot
456  const unsigned int thisSlotNum = eventContext->slot();
457  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
458  if ( !thisSlot.complete ) {
459  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
460  return StatusCode::FAILURE;
461  }
462 
463  debug() << "Executing event " << eventContext->evt() << " on slot "
464  << thisSlotNum << endmsg;
465  thisSlot.reset( eventContext );
466 
467  return this->updateStates( thisSlotNum );
468  }; // end of lambda
469 
470  // Kick off the scheduling!
471  if ( msgLevel( MSG::VERBOSE ) ) {
472  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
473  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
474  }
475  m_actionsQueue.push( action );
476 
477  return StatusCode::SUCCESS;
478 }
ContextID_t slot() const
Definition: EventContext.h:40
StatusCode updateStates(int si=-1)
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 481 of file ForwardSchedulerSvc.cpp.

481  {
482  StatusCode sc;
483  for ( auto context : eventContexts ) {
484  sc = pushNewEvent( context );
485  if ( sc != StatusCode::SUCCESS ) return sc;
486  }
487  return sc;
488 }
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode ForwardSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event, if not available just return a failure.

Definition at line 536 of file ForwardSchedulerSvc.cpp.

536  {
537  if ( m_finishedEvents.try_pop( eventContext ) ) {
538  if ( msgLevel( MSG::DEBUG ) )
539  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
540  << endmsg;
541  m_freeSlots++;
542  return StatusCode::SUCCESS;
543  }
544  return StatusCode::FAILURE;
545 }
ContextID_t slot() const
Definition: EventContext.h:40
ContextEvt_t evt() const
Definition: EventContext.h:39
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::updateStates ( int  si = -1)
private

Loop on algorithm in the slots and promote them to successive states (-1 means all slots, while empty string means skipping an update of the Control Flow state)

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. To check if the event is finished the algorithm checks if:

  • No algorithms have been signed off by the control flow
  • No algorithms have been signed off by the data flow
  • No algorithms have been scheduled

Definition at line 599 of file ForwardSchedulerSvc.cpp.

599  {
600 
601  m_updateNeeded = true;
602 
603  // Fill a map of initial state / action using closures.
604  // done to update the states w/o several if/elses
605  // Posterchild for constexpr with gcc4.7 onwards!
606  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
607  statesTransitions = {
608  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
609  this,
610  std::placeholders::_1,
611  std::placeholders::_2)},
612  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
613  this,
614  std::placeholders::_1,
615  std::placeholders::_2)}
616  };*/
617 
618  StatusCode global_sc( StatusCode::FAILURE, true );
619 
620  // Sort from the oldest to the newest event
621  // Prepare a vector of pointers to the slots to avoid copies
622  std::vector<EventSlot*> eventSlotsPtrs;
623 
624  // Consider all slots if si <0 or just one otherwise
625  if ( si < 0 ) {
626  const int eventsSlotsSize( m_eventSlots.size() );
627  eventSlotsPtrs.reserve( eventsSlotsSize );
628  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
629  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
630  }
631  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
632  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
633  } else {
634  eventSlotsPtrs.push_back( &m_eventSlots[si] );
635  }
636 
637  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
638  int iSlot = thisSlotPtr->eventContext->slot();
639 
640  // Cache the states of the algos to improve readability and performance
641  auto& thisSlot = m_eventSlots[iSlot];
642  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
643 
644  // Take care of the control ready update
645  m_efManager.updateEventState( thisAlgsStates, thisSlot.controlFlowState );
646 
647  // DF note: all this this is a loop over all algs and applies CR->DR and DR->SCHD transistions
648  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
649  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
650  if (algState==AlgsExecutionStates::ERROR)
651  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
652  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
653  partial_sc=StatusCode::SUCCESS;
654  for (auto state_transition = statesTransitions.find(algState);
655  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
656  state_transition++){
657  partial_sc = state_transition->second(iAlgo,iSlot);
658  if (partial_sc.isFailure()){
659  verbose() << "Could not apply transition from "
660  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
661  << " for algorithm " << index2algname(iAlgo)
662  << " on processing slot " << iSlot << endmsg;
663  }
664  else{global_sc=partial_sc;}
665  } // end loop on transitions
666  }*/ // end loop on algos
667 
668  StatusCode partial_sc( StatusCode::FAILURE, true );
669  // first update CONTROLREADY to DATAREADY
670  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::CONTROLREADY );
671  it != thisAlgsStates.end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
672 
673  uint algIndex = *it;
674  partial_sc = promoteToDataReady(algIndex, iSlot);
675  if (partial_sc.isFailure())
676  if (msgLevel(MSG::VERBOSE))
677  verbose() << "Could not apply transition from "
678  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
679  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
680  }
681 
682  // now update DATAREADY to SCHEDULED
683  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
684  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
685  uint algIndex = *it;
686 
687  partial_sc = promoteToScheduled( algIndex, iSlot );
688 
689  if (msgLevel(MSG::VERBOSE))
690  if (partial_sc.isFailure())
691  verbose() << "Could not apply transition from "
692  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
693  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
694  }
695 
696  // Not complete because this would mean that the slot is already free!
697  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
698  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
699  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
700  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
701 
702  thisSlot.complete = true;
703  // if the event did not fail, add it to the finished events
704  // otherwise it is taken care of in the error handling already
705  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
706  m_finishedEvents.push(thisSlot.eventContext);
707  if (msgLevel(MSG::DEBUG))
708  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
709  << thisSlot.eventContext->slot() << ")." << endmsg;
710  }
711  // now let's return the fully evaluated result of the control flow
712  if ( msgLevel( MSG::DEBUG ) ) {
714  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
715  debug() << ss.str() << endmsg;
716  }
717 
718  thisSlot.eventContext = nullptr;
719  } else {
720  StatusCode eventStalledSC = isStalled(iSlot);
721  if (! eventStalledSC.isSuccess()) {
722  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
723  eventFailed(thisSlot.eventContext).ignore();
724  }
725  }
726  } // end loop on slots
727 
728  verbose() << "States Updated." << endmsg;
729 
730  return global_sc;
731 }
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
ContextID_t slot() const
Definition: EventContext.h:40
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T end(T...args)
ContextEvt_t evt() const
Definition: EventContext.h:39
T push_back(T...args)
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T str(T...args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
bool m_updateNeeded
Keep track of update actions scheduled.
T size(T...args)
STL class.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
T begin(T...args)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Iterator begin(State kind)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T sort(T...args)
void ignore() const
Definition: StatusCode.h:106
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
T reserve(T...args)
static std::map< State, std::string > stateNames
Iterator end(State kind)

Member Data Documentation

tbb::concurrent_bounded_queue<action> ForwardSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 216 of file ForwardSchedulerSvc.h.

SmartIF<IAlgExecStateSvc> ForwardSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 179 of file ForwardSchedulerSvc.h.

std::unordered_map<std::string, unsigned int> ForwardSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 155 of file ForwardSchedulerSvc.h.

std::vector<std::string> ForwardSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 161 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::vector<std::vector<std::string> > > ForwardSchedulerSvc::m_algosDependencies
private
Initial value:
{
this, "AlgosDependencies", {}, "[[deprecated]]"}

Definition at line 123 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_algosInFlight = 0
private

Number of algoritms presently in flight.

Definition at line 184 of file ForwardSchedulerSvc.h.

SmartIF<IAlgResourcePool> ForwardSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 208 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_checkDeps
private
Initial value:
{this, "CheckDependencies", false,
"Runtime check of Algorithm Data Dependencies"}

Definition at line 125 of file ForwardSchedulerSvc.h.

concurrency::PrecedenceRulesGraph* ForwardSchedulerSvc::m_efg
private

Definition at line 282 of file ForwardSchedulerSvc.h.

concurrency::ExecutionFlowManager ForwardSchedulerSvc::m_efManager
private

Member to take care of the control flow.

Definition at line 236 of file ForwardSchedulerSvc.h.

std::vector<EventSlot> ForwardSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 167 of file ForwardSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> ForwardSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 173 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_first = true
private

Definition at line 241 of file ForwardSchedulerSvc.h.

std::atomic_int ForwardSchedulerSvc::m_freeSlots
private

Atomic to account for asyncronous updates by the scheduler wrt the rest.

Definition at line 170 of file ForwardSchedulerSvc.h.

std::atomic<ActivationState> ForwardSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 146 of file ForwardSchedulerSvc.h.

Gaudi::Property<unsigned int> ForwardSchedulerSvc::m_maxAlgosInFlight
private
Initial value:
{this, "MaxAlgosInFlight", 1,
"[[deprecated]] Taken from the whiteboard"}

Definition at line 121 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_maxEventsInFlight
private
Initial value:
{this, "MaxEventsInFlight", 0,
"Maximum number of event processed simultaneously"}

Definition at line 116 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_showControlFlow
private
Initial value:
{this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences"}

Definition at line 134 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_showDataDeps
private
Initial value:
{this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms"}

Definition at line 130 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_showDataFlow
private
Initial value:
{this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms"}

Definition at line 132 of file ForwardSchedulerSvc.h.

std::mutex ForwardSchedulerSvc::m_ssMut
staticprivate

Definition at line 273 of file ForwardSchedulerSvc.h.

std::list< ForwardSchedulerSvc::SchedulerState > ForwardSchedulerSvc::m_sState
staticprivate

Definition at line 272 of file ForwardSchedulerSvc.h.

std::thread ForwardSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 149 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_threadPoolSize
private
Initial value:
{this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 118 of file ForwardSchedulerSvc.h.

SmartIF<IThreadPoolSvc> ForwardSchedulerSvc::m_threadPoolSvc
private

Definition at line 239 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_updateNeeded = true
private

Keep track of update actions scheduled.

Definition at line 204 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 127 of file ForwardSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> ForwardSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 164 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 120 of file ForwardSchedulerSvc.h.


The documentation for this class was generated from the following files: