The Gaudi Framework  v28r2p1 (f1a77ff4)
ForwardSchedulerSvc Class Reference

The ForwardSchedulerSvc implements the IScheduler interface. More...

#include <GaudiKernel/ForwardSchedulerSvc.h>


Classes

struct  enqueueSchedulerActionTask
 
class  SchedulerState
 

Public Member Functions

 ~ForwardSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
void addAlg (Algorithm *, EventContext *, pthread_t)
 
bool delAlg (Algorithm *)
 
void dumpState () override
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Start Service. More...
 
StatusCode sysStop () override
 Stop Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Re-initialize the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declarePrivateTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used Private tool. More...
 
template<class T >
StatusCode declarePublicTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used Public tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
 ~PropertyHolder () override=default
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none") const
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBase * declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property from another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property from the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBase & getProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, GaudiHandleBase &ref, const std::string &doc="none")
 Specializations for various GaudiHandles. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, GaudiHandleArrayBase &ref, const std::string &doc="none")
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, DataObjectHandleBase &ref, const std::string &doc="none")
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStream & msgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStream & msgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStream & always () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStream & fatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStream & err () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & error () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & warning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStream & info () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStream & debug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStream & verbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStream & msg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
- Public Member Functions inherited from extend_interfaces< Interfaces... >
 ~extend_interfaces () override=default
 Virtual destructor. More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1)
 Loop over the algorithms in the slots and promote them to successive states (-1 means all slots, while an empty string means skipping an update of the Control Flow state) More...
 
StatusCode promoteToControlReady (unsigned int iAlgo, int si)
 Algorithm promotion: Accepted by the control flow. More...
 
StatusCode promoteToDataReady (unsigned int iAlgo, int si)
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 
void dumpState (std::ostringstream &)
 

Private Attributes

Gaudi::Property< int > m_maxEventsInFlight
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::string > m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< unsigned int > m_maxAlgosInFlight
 
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
 
Gaudi::Property< bool > m_checkDeps
 
Gaudi::Property< std::string > m_useDataLoader
 
std::atomic< ActivationState > m_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard. More...
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asynchronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager. More...
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight. More...
 
bool m_updateNeeded = true
 Keep track of update actions scheduled. More...
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
concurrency::ExecutionFlowManager m_efManager
 Member to take care of the control flow. More...
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
bool m_first = true
 

Static Private Attributes

static std::list< SchedulerState > m_sState
 
static std::mutex m_ssMut
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
typedef Gaudi::PluginService::Factory< IService *, const std::string &, ISvcLocator * > Factory
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBase * property (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

The ForwardSchedulerSvc implements the IScheduler interface.

It manages all the execution states of the algorithms and interacts with the TBB runtime to submit algorithm tasks. A state machine tracks the execution state of each algorithm.
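
The state tracking can be illustrated with a minimal sketch. This is not the scheduler's own code: the state names simply mirror the promoteTo... methods of this class, while the AlgState enum, canPromote and SlotStates are hypothetical names, and the strictly linear ordering of the states is an assumption made for the example.

```cpp
#include <vector>

// Hypothetical per-algorithm states, named after the promoteTo* methods.
enum class AlgState { Initial, ControlReady, DataReady, Scheduled, Executed, Finished };

// Assumption: a state may only advance by exactly one step, in declaration order.
inline bool canPromote(AlgState from, AlgState to) {
  return static_cast<int>(to) == static_cast<int>(from) + 1;
}

// One state per algorithm for a single event slot.
class SlotStates {
public:
  explicit SlotStates(unsigned int nAlgos) : m_states(nAlgos, AlgState::Initial) {}

  // Returns true and updates the state only if the transition is allowed.
  bool promote(unsigned int iAlgo, AlgState to) {
    if (iAlgo >= m_states.size() || !canPromote(m_states[iAlgo], to)) return false;
    m_states[iAlgo] = to;
    return true;
  }

  AlgState state(unsigned int iAlgo) const { return m_states.at(iAlgo); }

private:
  std::vector<AlgState> m_states;
};
```

Rejecting a transition instead of asserting keeps the sketch close to the StatusCode-returning style of the promoteTo... methods, where a failed promotion is a normal outcome.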

This is a forward scheduler: an algorithm is scheduled for execution as soon as its data dependencies are available in the whiteboard, and as soon as a worker thread becomes available. The scheduler has no means for automatic intra-event throughput maximization. Consider the AvalancheScheduler if you need those.

Algorithms management

The activate() method runs in a separate thread. In a loop, it takes closures from a TBB concurrent bounded queue with the blocking pop method, so the thread sleeps while the queue is empty instead of keeping a CPU core busy polling for new actions. In other words, the asynchronous actions are serialised through the actions queue. When a task terminates, a call to the promoteToExecuted method is pushed into the actions queue. The promoteToExecuted method also triggers a call to the updateStates method, which sweeps over all algorithms and checks whether their state can be changed: the termination of an algorithm may allow the control flow and/or the data flow to submit further algorithms.
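
The pattern described above can be sketched with the standard library alone. The sketch below is an illustration under assumptions, not the actual implementation: ActionQueue is a hypothetical stand-in for tbb::concurrent_bounded_queue, actions return bool rather than StatusCode, and runActions is a made-up driver.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical stand-in for the TBB queue: pop() blocks until an item arrives.
class ActionQueue {
public:
  using Action = std::function<bool()>;

  void push(Action a) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_queue.push(std::move(a));
    }
    m_cv.notify_one();
  }

  Action pop() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait(lock, [this] { return !m_queue.empty(); });
    Action a = std::move(m_queue.front());
    m_queue.pop();
    return a;
  }

  bool empty() const {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_queue.empty();
  }

private:
  mutable std::mutex m_mutex;
  std::condition_variable m_cv;
  std::queue<Action> m_queue;
};

// Runs the consumer loop in its own thread and returns how many actions ran.
inline int runActions(int nActions) {
  ActionQueue queue;
  std::atomic<bool> active{true};
  int executed = 0; // modified only by the consumer thread, read after join()

  std::thread consumer([&] {
    // Keep going while active, or while work is still queued.
    while (active || !queue.empty()) {
      queue.pop()(); // blocks instead of spinning when the queue is empty
    }
  });

  for (int i = 0; i < nActions; ++i) queue.push([&] { ++executed; return true; });
  // The last action flips the flag, so everything queued before it has already run.
  queue.push([&] { active = false; return true; });

  consumer.join();
  return executed;
}
```

Because every state change is funnelled through one consumer thread, the actions themselves need no locking among each other; only the queue is synchronised.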

Algorithms dependencies

There are two ways of declaring algorithm dependencies. The first, which is only temporarily available to ease development, is to declare them through the AlgosDependencies property as a list of lists. The order of the sublists must match the order of the algorithms in the TopAlg list. The second is to declare the data dependencies directly within the algorithms via data object handles.
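
As a rough illustration of how a list of lists of dependencies can drive scheduling, the sketch below checks which algorithms have all of their inputs present. The helper names isDataReady and dataReadyAlgos are hypothetical, and the whiteboard is modelled as a plain set of strings, which is an assumption made only for this example.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical helper: true when every input of an algorithm is already available.
inline bool isDataReady(const std::vector<std::string>& inputs,
                        const std::set<std::string>& whiteboard) {
  for (const auto& in : inputs)
    if (whiteboard.count(in) == 0) return false;
  return true;
}

// Returns the indices of the algorithms whose inputs are all available.
// dependencies[i] lists the inputs of the i-th algorithm, in TopAlg order.
inline std::vector<unsigned int> dataReadyAlgos(
    const std::vector<std::vector<std::string>>& dependencies,
    const std::set<std::string>& whiteboard) {
  std::vector<unsigned int> ready;
  for (unsigned int i = 0; i < dependencies.size(); ++i)
    if (isDataReady(dependencies[i], whiteboard)) ready.push_back(i);
  return ready;
}
```

An algorithm with an empty sublist has no inputs and is therefore ready immediately, which is why the positional match with the TopAlg list matters.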

Events management

The scheduler accepts events to be processed (in the form of eventContexts) and releases processed events. This flow is implemented through three methods:

  • pushNewEvent: makes an event available to the scheduler.
  • tryPopFinishedEvent: retrieves a finished event from the scheduler without blocking.
  • popFinishedEvent: retrieves a finished event from the scheduler, blocking until one is available.

Please refer to the full documentation of the methods for more details.
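
The call pattern of these methods can be sketched with toy stand-ins. Nothing below is a Gaudi type: ToyEventContext, ToyScheduler and processAll are hypothetical, the methods return bool instead of StatusCode, and the toy scheduler "finishes" each event as soon as it is pushed.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Toy stand-ins, NOT the Gaudi types: just enough to show the call pattern.
struct ToyEventContext { int evt; };

class ToyScheduler {
public:
  explicit ToyScheduler(unsigned int slots) : m_free(slots) {}

  // Accept an event if a slot is free; this toy marks it finished immediately.
  bool pushNewEvent(ToyEventContext* ctx) {
    if (m_free == 0) return false;
    --m_free;
    m_finished.push_back(ctx);
    return true;
  }

  // Non-blocking: returns false when no finished event is waiting.
  bool tryPopFinishedEvent(ToyEventContext*& ctx) {
    if (m_finished.empty()) return false;
    ctx = m_finished.front();
    m_finished.pop_front();
    ++m_free; // retrieving the event releases its slot
    return true;
  }

  unsigned int freeSlots() const { return m_free; }

private:
  unsigned int m_free;
  std::deque<ToyEventContext*> m_finished;
};

// Driver: fill the free slots, then collect finished events, until all are done.
inline std::size_t processAll(ToyScheduler& sched, std::vector<ToyEventContext>& events) {
  std::size_t next = 0, done = 0;
  while (done < events.size()) {
    const std::size_t before = next + done;
    while (next < events.size() && sched.freeSlots() > 0) sched.pushNewEvent(&events[next++]);
    ToyEventContext* finished = nullptr;
    while (sched.tryPopFinishedEvent(finished)) ++done;
    if (next + done == before) break; // no progress (e.g. zero slots): give up
  }
  return done;
}
```

A driver built on the blocking popFinishedEvent would wait inside the call when all slots are busy, instead of looping over the non-blocking variant as this toy does.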

Author
Danilo Piparo
Benedikt Hegner
Version
1.1

Definition at line 83 of file ForwardSchedulerSvc.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

ForwardSchedulerSvc::~ForwardSchedulerSvc ( )
override default

Destructor.

Member Function Documentation

void ForwardSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on the queue of actions is checked. The checking stops only when the m_isActive flag is no longer ACTIVE and the queue is empty. This guarantees that all actions are executed and that no stall is created. The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html). The scheduler is initialised here because this method runs in a separate thread and spawns the tasks (through the execution of the lambdas).

Definition at line 330 of file ForwardSchedulerSvc.cpp.

330  {
331 
332  if (msgLevel(MSG::DEBUG))
333  debug() << "ForwardSchedulerSvc::activate()" << endmsg;
334 
335  if (m_threadPoolSvc->initPool(m_threadPoolSize).isFailure()) {
336  error() << "problems initializing ThreadPoolSvc" << endmsg;
337  m_isActive = FAILURE;
338  return;
339  }
340 
341  // Wait for actions pushed into the queue by finishing tasks.
342  action thisAction;
343  StatusCode sc( StatusCode::SUCCESS );
344 
345  m_isActive = ACTIVE;
346 
347  // Continue to wait if the scheduler is running or there is something to do
348  info() << "Start checking the actionsQueue" << endmsg;
349  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
350  m_actionsQueue.pop( thisAction );
351  sc = thisAction();
352  if ( sc != StatusCode::SUCCESS )
353  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
354  else
355  verbose() << "Action succeeded." << endmsg;
356  }
357 
358  info() << "Terminating thread-pool resources" << endmsg;
359  if (m_threadPoolSvc->terminatePool().isFailure()) {
360  error() << "Problems terminating thread pool" << endmsg;
361  m_isActive = FAILURE;
362  }
363 }
void ForwardSchedulerSvc::addAlg ( Algorithm * a, EventContext * e, pthread_t t )

Definition at line 977 of file ForwardSchedulerSvc.cpp.

977  {
978 
980  m_sState.push_back( SchedulerState( a, e, t ) );
981 }
unsigned int ForwardSchedulerSvc::algname2index ( const std::string & algoname )
inline private

Convert a name to an integer.

Definition at line 399 of file ForwardSchedulerSvc.cpp.

399  {
400  unsigned int index = m_algname_index_map[algoname];
401  return index;
402 }
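
One detail of the listing above is worth noting: operator[] on a std::unordered_map inserts a value-initialised entry for a missing key, so an unknown algorithm name yields index 0 and silently grows the map. The following sketch shows a lookup based on find() that reports unknown names instead. AlgNameIndex and its methods are hypothetical names introduced for illustration, not part of this class.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical two-way mapping between algorithm names and indices.
class AlgNameIndex {
public:
  // Register a name and return its index; re-registering returns the existing index.
  unsigned int add(const std::string& name) {
    auto it = m_index.find(name);
    if (it != m_index.end()) return it->second;
    const unsigned int idx = static_cast<unsigned int>(m_names.size());
    m_names.push_back(name);
    m_index.emplace(name, idx);
    return idx;
  }

  // find() instead of operator[]: an unknown name is reported, not mapped to 0.
  bool index(const std::string& name, unsigned int& idx) const {
    auto it = m_index.find(name);
    if (it == m_index.end()) return false;
    idx = it->second;
    return true;
  }

  // at() throws std::out_of_range for an invalid index.
  const std::string& name(unsigned int idx) const { return m_names.at(idx); }

private:
  std::unordered_map<std::string, unsigned int> m_index;
  std::vector<std::string> m_names;
};
```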
StatusCode ForwardSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

Two actions are pushed into the queue: 1) drain the scheduler until all events are finished; 2) flip the status flag m_isActive to INACTIVE. This second action is the last one to be executed by the scheduler.

Definition at line 373 of file ForwardSchedulerSvc.cpp.

373  {
374 
375  if ( m_isActive == ACTIVE ) {
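376  // Drain the scheduler
377  m_actionsQueue.push( std::bind( &ForwardSchedulerSvc::m_drain, this ) );
378  // This would be the last action
379  m_actionsQueue.push( [this]() -> StatusCode {
380  m_isActive = INACTIVE;
381  return StatusCode::SUCCESS;
382  } );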
383  }
384 
385  return StatusCode::SUCCESS;
386 }
bool ForwardSchedulerSvc::delAlg ( Algorithm * a )

Definition at line 984 of file ForwardSchedulerSvc.cpp.

984  {
985 
987 
988  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
989  if ( *itr == a ) {
990  m_sState.erase( itr );
991  return true;
992  }
993  }
994 
995  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
996  return false;
997 }
void ForwardSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

The dependencies of each algo are printed and the missing ones specified.

Definition at line 741 of file ForwardSchedulerSvc.cpp.

741  {
742 
743  // To have just one big message
744  std::ostringstream outputMessageStream;
745 
746  outputMessageStream << "============================== Execution Task State ============================="
747  << std::endl;
748  dumpState( outputMessageStream );
749 
750  outputMessageStream << std::endl
751  << "============================== Scheduler State ================================="
752  << std::endl;
753 
754  int slotCount = -1;
755  for ( auto thisSlot : m_eventSlots ) {
756  slotCount++;
757  if ( thisSlot.complete ) continue;
758 
759  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
760  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
761 
762  if ( 0 > iSlot or iSlot == slotCount ) {
763  outputMessageStream << "Algorithms states:" << std::endl;
764 
765  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
766  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
767  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
768  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
769  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
770  const int depsSize = deps.size();
771  if ( depsSize == 0 ) outputMessageStream << " none";
772 
773  DataObjIDColl missing;
774  for ( auto d : deps ) {
775  outputMessageStream << d << " ";
776  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
777  // outputMessageStream << "[missing] ";
778  missing.insert( d );
779  }
780  }
781 
782  if ( !missing.empty() ) {
783  outputMessageStream << ". The following are missing: ";
784  for ( auto d : missing ) {
785  outputMessageStream << d << " ";
786  }
787  }
788 
789  outputMessageStream << std::endl;
790  }
791 
792  // Snapshot of the WhiteBoard
793  outputMessageStream << "\nWhiteboard contents: " << std::endl;
794  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
795 
796  // Snapshot of the ControlFlow
797  outputMessageStream << "\nControl Flow:" << std::endl;
798  std::stringstream cFlowStateStringStream;
799  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
800 
801  outputMessageStream << cFlowStateStringStream.str() << std::endl;
802  }
803  }
804 
805  outputMessageStream << "=================================== END ======================================" << std::endl;
806 
807  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
808 }
void ForwardSchedulerSvc::dumpState ( )
override

Definition at line 1010 of file ForwardSchedulerSvc.cpp.

1010  {
1011 
1013 
1014  std::ostringstream ost;
1015  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1016  dumpState( ost );
1017 
1018  info() << ost.str() << endmsg;
1019 }
void ForwardSchedulerSvc::dumpState ( std::ostringstream & ost )
private

Definition at line 1000 of file ForwardSchedulerSvc.cpp.

1000  {
1001 
1003 
1004  for ( auto it : m_sState ) {
1005  ost << " " << it << std::endl;
1006  }
1007 }
StatusCode ForwardSchedulerSvc::eventFailed ( EventContext * eventContext )
private

Method to check if an event failed and take appropriate actions.

An event may fail.

In that case this method is called. It dumps the state of the scheduler, drains the actions (without executing them) and the events in the queues, and returns a failure.

Definition at line 529 of file ForwardSchedulerSvc.cpp.

529  {
530 
531  // Set the number of slots available to an error code
532  m_freeSlots.store( 0 );
533 
534  fatal() << "*** Event " << eventContext->evt() << " on slot "
535  << eventContext->slot() << " failed! ***" << endmsg;
536 
537  std::ostringstream ost;
538  m_algExecStateSvc->dump(ost, *eventContext);
539 
540  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
541  << ":\n" << ost.str() << endmsg;
542 
543  dumpSchedulerState(-1);
544 
545  // Empty queue and deactivate the service
546  action thisAction;
547  while ( m_actionsQueue.try_pop( thisAction ) ) {
548  };
549  deactivate();
550 
551  // Push into the finished events queue the failed context
552  EventContext* thisEvtContext;
553  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
554  m_finishedEvents.push( thisEvtContext );
555  };
556  m_finishedEvents.push( eventContext );
557 
558  return StatusCode::FAILURE;
559 }
StatusCode ForwardSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 298 of file ForwardSchedulerSvc.cpp.

298  {
299 
300  StatusCode sc( Service::finalize() );
301  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
302 
303  sc = deactivate();
304  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
305 
306  info() << "Joining Scheduler thread" << endmsg;
307  m_thread.join();
308 
309  // Final error check after thread pool termination
310  if ( m_isActive == FAILURE ) {
311  error() << "problems in scheduler thread" << endmsg;
312  return StatusCode::FAILURE;
313  }
314 
315  // m_efManager.getPrecedenceRulesGraph()->dumpExecutionPlan();
316 
317  return sc;
318 }
unsigned int ForwardSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 467 of file ForwardSchedulerSvc.cpp.

467  {
468  return std::max( m_freeSlots.load(), 0 );
469 }
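
The clamp in the listing above matters because the return type is unsigned: a negative int converted to unsigned int would wrap around to a very large number of "free" slots. A minimal sketch of the same idea follows; SlotCounter, acquire and release are hypothetical names, and whether the real counter ever goes negative is not stated here.

```cpp
#include <algorithm>
#include <atomic>

// Hypothetical counter: may dip below zero internally, but readers never see that.
class SlotCounter {
public:
  explicit SlotCounter(int slots) : m_free(slots) {}

  void acquire() { --m_free; } // atomic decrement
  void release() { ++m_free; } // atomic increment

  // Clamp at zero before converting to unsigned, as freeSlots() does.
  unsigned int freeSlots() const {
    return static_cast<unsigned int>(std::max(m_free.load(), 0));
  }

private:
  std::atomic_int m_free;
};
```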
const std::string & ForwardSchedulerSvc::index2algname ( unsigned int  index)
inline private

Convert an integer to a name.

Definition at line 393 of file ForwardSchedulerSvc.cpp.

393  {
394  return m_algname_vect[index];
395 }
StatusCode ForwardSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 44 of file ForwardSchedulerSvc.cpp.

44  {
45 
46  // Initialise mother class (read properties, ...)
48  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
49 
50  // Get hold of the TBBSvc. This should initialize the thread pool
51  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
52  if ( !m_threadPoolSvc.isValid() ) {
53  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
54  return StatusCode::FAILURE;
55  }
56 
57  // Activate the scheduler in another thread.
58  info() << "Activating scheduler in a separate thread" << endmsg;
60 
61  while ( m_isActive != ACTIVE ) {
62  if ( m_isActive == FAILURE ) {
63  fatal() << "Terminating initialization" << endmsg;
64  return StatusCode::FAILURE;
65  } else {
66  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
67  sleep( 1 );
68  }
69  }
70 
71  // Get the algo resource pool
72  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
73  if ( !m_algResourcePool.isValid() ) {
74  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
75  return StatusCode::FAILURE;
76  }
77 
78  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
79  if (!m_algExecStateSvc.isValid()) {
80  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
81  return StatusCode::FAILURE;
82  }
83 
84  // Get Whiteboard
86  if ( !m_whiteboard.isValid() ) {
87  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
88  return StatusCode::FAILURE;
89  }
90 
91  // Check the MaxEventsInFlight parameters and react
92  // Deprecated for the moment
93  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
94  if ( m_maxEventsInFlight != 0 ) {
95  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
96  << "Please migrate your code options files." << endmsg;
97 
98  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
99  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
100  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
101  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
102  }
103  }
104 
105  // set global concurrency flags
107 
108  // Align the two quantities
109  m_maxEventsInFlight = numberOfWBSlots;
110 
111  // Set the number of free slots
113 
114  if ( m_algosDependencies.size() != 0 ) {
115  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
116  << " FIX your job options #####" << endmsg;
117  }
118 
119  // Get the list of algorithms
121  const unsigned int algsNumber = algos.size();
122  info() << "Found " << algsNumber << " algorithms" << endmsg;
123 
124  /* Dependencies
125  1) Look for handles in algo, if none
126  2) Assume none are required
127  */
128 
129  DataObjIDColl globalInp, globalOutp;
130 
131  // figure out all outputs
132  for (IAlgorithm* ialgoPtr : algos) {
133  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
134  if (!algoPtr) {
135  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
136  }
137  for (auto id : algoPtr->outputDataObjs()) {
138  auto r = globalOutp.insert(id);
139  if (!r.second) {
140  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" << endmsg;
141  }
142  }
143  }
144  info() << "outputs:\n" ;
145  for (const auto& i : globalOutp ) {
146  info() << i << '\n' ;
147  }
148  info() << endmsg;
149 
150 
151 
152  info() << "Data Dependencies for Algorithms:";
153 
155  for ( IAlgorithm* ialgoPtr : algos ) {
156  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
157  if ( nullptr == algoPtr )
158  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
159 
160  info() << "\n " << algoPtr->name();
161 
162  // FIXME
163  DataObjIDColl algoDependencies;
164  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
165  for ( auto id : algoPtr->inputDataObjs() ) {
166  info() << "\n o INPUT " << id;
167  if (id.key().find(":")!=std::string::npos) {
168  info() << " contains alternatives which require resolution... " << endmsg;
169  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(),boost::char_separator<char>{":"}};
170  auto itok = std::find_if( tokens.begin(), tokens.end(),
171  [&](const std::string& t) {
172  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
173  } );
174  if (itok!=tokens.end()) {
175  info() << "found matching output for " << *itok << " -- updating scheduler info" << endmsg;
176  id.updateKey(*itok);
177  } else {
178  error() << "failed to find alternate in global output list" << endmsg;
179  }
180  }
181  algoDependencies.insert( id );
182  globalInp.insert( id );
183  }
184  for ( auto id : algoPtr->outputDataObjs() ) {
185  info() << "\n o OUTPUT " << id;
186  if (id.key().find(":")!=std::string::npos) {
187  info() << " alternatives are NOT allowed for outputs..." << endmsg;
188  }
189  }
190  } else {
191  info() << "\n none";
192  }
193  m_algosDependencies.emplace_back( algoDependencies );
194  }
195  info() << endmsg;
196 
197  // Fill the containers to convert algo names to index
198  m_algname_vect.reserve( algsNumber );
199  unsigned int index = 0;
200  IAlgorithm* dataLoaderAlg( nullptr );
201  for ( IAlgorithm* algo : algos ) {
202  const std::string& name = algo->name();
203  m_algname_index_map[name] = index;
205  if (algo->name() == m_useDataLoader) {
206  dataLoaderAlg = algo;
207  }
208  index++;
209  }
210 
211  // Check if we have unmet global input dependencies
212  if ( m_checkDeps ) {
213  DataObjIDColl unmetDep;
214  for ( auto o : globalInp ) {
215  if ( globalOutp.find( o ) == globalOutp.end() ) {
216  unmetDep.insert( o );
217  }
218  }
219 
220  if ( unmetDep.size() > 0 ) {
221 
222  std::ostringstream ost;
223  for ( auto& o : unmetDep ) {
224  ost << "\n o " << o << " required by Algorithm: ";
225  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
226  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
227  ost << "\n * " << m_algname_vect[i];
228  }
229  }
230  }
231 
232  if ( m_useDataLoader != "" ) {
233  // Find the DataLoader Alg
234  if (dataLoaderAlg == nullptr) {
235  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
236  << "\" found, and unmet INPUT dependencies "
237  << "detected:\n" << ost.str() << endmsg;
238  return StatusCode::FAILURE;
239  }
240 
241  info() << "Will attribute the following unmet INPUT dependencies to \""
242  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
243  << "\" Algorithm"
244  << ost.str() << endmsg;
245 
246  // Set the property Load of DataLoader Alg
247  Algorithm *dataAlg = dynamic_cast<Algorithm*>(dataLoaderAlg);
248  if ( !dataAlg ) {
249  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value()
250  << "\" IAlg to Algorithm" << endmsg;
251  return StatusCode::FAILURE;
252  }
253 
254  for (auto& id : unmetDep) {
255  debug() << "adding OUTPUT dep \"" << id << "\" to "
256  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
257  << endmsg;
259  }
260 
261  } else {
262  fatal() << "Auto DataLoading not requested, "
263  << "and the following unmet INPUT dependencies were found:"
264  << ost.str() << endmsg;
265  return StatusCode::FAILURE;
266  }
267 
268  } else {
269  info() << "No unmet INPUT data dependencies were found" << endmsg;
270  }
271  }
272 
273  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
275  unsigned int controlFlowNodeNumber = m_efManager.getPrecedenceRulesGraph()->getControlFlowNodeCounter();
276 
277  // Shortcut for the message service
278  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
279  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
280 
282  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
283  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
284 
285  // Clearly inform about the level of concurrency
286  info() << "Concurrency level information:" << endmsg;
287  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
288  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
289  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
290 
291  return sc;
292 }
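The listing above resolves input keys that contain ":" by picking the first alternative that appears in the global output list. The following sketch shows the same idea with plain strings and the standard library only; resolveAlternative is a hypothetical name, and std::getline replaces the boost::tokenizer used in the listing. Empty tokens are skipped, which matches the default behaviour of boost::char_separator.

```cpp
#include <cassert>
#include <set>
#include <sstream>
#include <string>

// Resolve a key of the form "A:B:C" to the first alternative that is a known
// output. Returns an empty string when no alternative matches.
// Plain strings stand in for DataObjID (an assumption of this sketch).
std::string resolveAlternative( const std::string& key, const std::set<std::string>& outputs ) {
  std::istringstream stream( key );
  std::string token;
  while ( std::getline( stream, token, ':' ) ) {
    if ( !token.empty() && outputs.count( token ) != 0 ) return token;
  }
  return {};
}
```

A key without ":" is treated as a single alternative, so the function can be applied to every input uniformly.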
StatusCode initialize() override
Definition: Service.cpp:64
T empty(T...args)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:715
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
virtual concurrency::PrecedenceRulesGraph * getPRGraph() const
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
const DataObjIDColl & outputDataObjs() const override
SmartIF< IThreadPoolSvc > m_threadPoolSvc
StatusCode initialize(PrecedenceRulesGraph *graph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
T end(T...args)
Gaudi::Property< bool > m_checkDeps
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:78
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector holding the information needed for the index2name conversion.
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const DataObjIDColl & inputDataObjs() const override
Gaudi::Property< std::string > m_whiteboardSvcName
T bind(T...args)
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
Gaudi::Property< int > m_maxEventsInFlight
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
T size(T...args)
T assign(T...args)
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map holding the information needed for the name2index conversion.
void activate()
Activate scheduler.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Check whether the smart pointer is valid.
Definition: SmartIF.h:62
T begin(T...args)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
PrecedenceRulesGraph * getPrecedenceRulesGraph() const
Get the flow graph instance.
T for_each(T...args)
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
T reserve(T...args)
T emplace_back(T...args)
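The unmet-dependency check in initialize() amounts to a set difference: every global input that no algorithm declares as an output is collected and then either attributed to the data loader or reported as fatal. A minimal sketch of the collection step, with std::set<std::string> standing in for DataObjIDColl and a hypothetical function name:

```cpp
#include <cassert>
#include <set>
#include <string>

using KeySet = std::set<std::string>;  // stand-in for DataObjIDColl (assumption)

// Return every input that does not appear among the declared outputs.
KeySet unmetDependencies( const KeySet& inputs, const KeySet& outputs ) {
  KeySet unmet;
  for ( const auto& key : inputs ) {
    if ( outputs.find( key ) == outputs.end() ) unmet.insert( key );
  }
  return unmet;
}
```

An empty result corresponds to the "No unmet INPUT data dependencies were found" branch of the listing.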
StatusCode ForwardSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check whether a stall condition is present for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.
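The three conditions can be read as a single predicate. The sketch below uses a hypothetical SchedulerSnapshot struct to hold them; in the real service the values come from the actions queue, the in-flight counter and the slot's algorithm states, and a stall is reported as StatusCode::FAILURE rather than a bool.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical snapshot of the three quantities the stall check looks at.
struct SchedulerSnapshot {
  std::size_t queuedActions;    // actions waiting in the queue
  unsigned int algosInFlight;   // algorithms currently running
  unsigned int dataReadyAlgos;  // algorithms with all dependencies satisfied
};

// A stall: nothing queued, nothing running, nothing ready to run.
bool isStalled( const SchedulerSnapshot& s ) {
  return s.queuedActions == 0 && s.algosInFlight == 0 && s.dataReadyAlgos == 0;
}
```

Any one non-zero quantity means the scheduler can still make progress, so all three must be zero for a stall.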

Definition at line 717 of file ForwardSchedulerSvc.cpp.

717  {
718  // Get the slot
719  EventSlot& thisSlot = m_eventSlots[iSlot];
720 
721  if ( m_actionsQueue.empty() && m_algosInFlight == 0 &&
723 
724  info() << "About to declare a stall" << endmsg;
725  fatal() << "*** Stall detected! ***\n" << endmsg;
726  dumpSchedulerState( iSlot );
727  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
728 
729  return StatusCode::FAILURE;
730  }
731  return StatusCode::SUCCESS;
732 }
bool algsPresent(State state) const
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
unsigned int m_algosInFlight
Number of algorithms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 475 of file ForwardSchedulerSvc.cpp.

475  {
476  unsigned int slotNum = 0;
477  for ( auto& thisSlot : m_eventSlots ) {
478  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
479  updateStates( slotNum );
480  }
481  slotNum++;
482  }
483  return StatusCode::SUCCESS;
484 }
StatusCode updateStates(int si=-1)
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots...
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode ForwardSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 490 of file ForwardSchedulerSvc.cpp.

490  {
491  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
492  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
493  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
494  // << " active: " << m_isActive << endmsg;
495  return StatusCode::FAILURE;
496  } else {
497  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
498  // << " active: " << m_isActive << endmsg;
499  m_finishedEvents.pop( eventContext );
500  m_freeSlots++;
501  if (msgLevel(MSG::DEBUG))
502  debug() << "Popped slot " << eventContext->slot() << "(event "
503  << eventContext->evt() << ")" << endmsg;
504  return StatusCode::SUCCESS;
505  }
506 }
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
Gaudi::Property< int > m_maxEventsInFlight
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToControlReady ( unsigned int  iAlgo,
int  si 
)
private

Algorithm promotion: Accepted by the control flow.

Definition at line 812 of file ForwardSchedulerSvc.cpp.

812  {
813 
814  // Do the control flow
815  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
816  if (sc.isSuccess())
817  if (msgLevel(MSG::VERBOSE))
818  verbose() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY on slot "
819  << si << endmsg;
820 
821  return sc;
822 }
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToDataReady ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 826 of file ForwardSchedulerSvc.cpp.

826  {
827 
828  StatusCode sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
829 
830  StatusCode updateSc( StatusCode::FAILURE );
831  if ( sc == StatusCode::SUCCESS )
832  updateSc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::DATAREADY );
833 
834  if (updateSc.isSuccess())
835  if (msgLevel(MSG::VERBOSE))
836  verbose() << "Promoting " << index2algname(iAlgo) << " to DATAREADY on slot "
837  << si<< endmsg;
838 
839  return updateSc;
840 }
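The promotion above is a two-step operation: first ask the data flow manager whether the algorithm can run, then update the state only if it can. The sketch below mimics this with a hypothetical AlgRecord type. It assumes that "can run" means every input is already in the catalogue of available products, and that the promotion is only valid from CONTROLREADY; neither detail is confirmed by this page.

```cpp
#include <cassert>
#include <set>
#include <string>

enum class State { CONTROLREADY, DATAREADY };

// Hypothetical per-algorithm record: current state plus the inputs it needs.
struct AlgRecord {
  State state = State::CONTROLREADY;
  std::set<std::string> inputs;
};

// Promote to DATAREADY only when every input is already available.
// Returns true when the promotion happened, false otherwise.
bool promoteToDataReady( AlgRecord& alg, const std::set<std::string>& available ) {
  if ( alg.state != State::CONTROLREADY ) return false;  // assumed precondition
  for ( const auto& in : alg.inputs ) {
    if ( available.count( in ) == 0 ) return false;  // a dependency is missing
  }
  alg.state = State::DATAREADY;
  return true;
}
```

As in the listing, a failed check leaves the state untouched, so the promotion can simply be retried after new products arrive.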
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm * algo,
EventContext * eventContext 
)
private

Definition at line 909 of file ForwardSchedulerSvc.cpp.

910  {
911 
912  // Put back the instance
913  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
914  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
915  // EventContext* eventContext = castedAlgo->getContext();
916 
917  // Check if the execution failed
918  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
919  eventFailed(eventContext).ignore();
920 
921  Gaudi::Hive::setCurrentContext(eventContext);
922  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
923 
924  if ( !sc.isSuccess() ) {
925  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
926  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
927  return StatusCode::FAILURE;
928  }
929 
930  m_algosInFlight--;
931 
932  EventSlot& thisSlot = m_eventSlots[si];
933 
934  // Update the catalog: some new products may be there
935  m_whiteboard->selectStore( eventContext->slot() ).ignore();
936 
937  // update prods in the dataflow
938  // DP: Handles could be used. Just update what the algo wrote
939  DataObjIDColl new_products;
940  m_whiteboard->getNewDataObjects( new_products ).ignore();
941  for ( const auto& new_product : new_products )
942  if ( msgLevel( MSG::DEBUG ) ) debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
943  thisSlot.dataFlowMgr.updateDataObjectsCatalog( new_products );
944 
945  if ( msgLevel( MSG::DEBUG ) )
946  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
947  << m_algosInFlight << endmsg;
948 
949  // Limit number of updates
950  if ( m_updateNeeded ) {
951  // Schedule an update of the status of the algorithms
952  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1);
953  m_actionsQueue.push( updateAction );
954  m_updateNeeded = false;
955  }
956 
957  if ( msgLevel( MSG::DEBUG ) )
958  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
959  State state;
960  if ( algo->filterPassed() ) {
961  state = State::EVTACCEPTED;
962  } else {
963  state = State::EVTREJECTED;
964  }
965 
966  sc = thisSlot.algsStates.updateState( iAlgo, state );
967 
968  if (sc.isSuccess())
969  if (msgLevel(MSG::VERBOSE))
970  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
972 
973  return sc;
974 }
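The "Limit number of updates" step in the listing queues an update only if none is already pending. A minimal single-threaded sketch of that coalescing pattern follows. The UpdateCoalescer type is hypothetical, and the sketch assumes that running the update re-arms the flag; this page does not show where m_updateNeeded is set back to true.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical miniature of the "limit number of updates" logic.
struct UpdateCoalescer {
  std::queue<std::function<void()>> actions;
  bool updateNeeded = true;
  int updatesRun = 0;

  // Called after each algorithm finishes: queue at most one pending update.
  void requestUpdate() {
    if ( !updateNeeded ) return;
    actions.push( [this] {
      ++updatesRun;
      updateNeeded = true;  // assumption: the update itself re-arms the flag
    } );
    updateNeeded = false;
  }

  // Run everything queued so far.
  void drain() {
    while ( !actions.empty() ) {
      auto action = std::move( actions.front() );
      actions.pop();
      action();
    }
  }
};
```

Three requests in a row leave a single action in the queue, so the state update runs once rather than three times.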
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
ContextID_t slot() const
Definition: EventContext.h:41
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
StatusCode updateStates(int si=-1)
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots...
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
ContextEvt_t evt() const
Definition: EventContext.h:40
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
GAUDI_API void setCurrentContext(const EventContext *ctx)
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode ForwardSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode ForwardSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 844 of file ForwardSchedulerSvc.cpp.

844  {
845 
847 
848  const std::string& algName( index2algname( iAlgo ) );
849  IAlgorithm* ialgoPtr = nullptr;
850  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
851 
852  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
853  EventContext* eventContext( m_eventSlots[si].eventContext );
854  if ( !eventContext )
855  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
856 
857  ++m_algosInFlight;
858  // prepare a scheduler action to run once the algorithm is executed
859  auto promote2ExecutedClosure = std::bind(&ForwardSchedulerSvc::promoteToExecuted,
860  this,
861  iAlgo,
862  eventContext->slot(),
863  ialgoPtr,
864  eventContext);
865  // Avoid to use tbb if the pool size is 1 and run in this thread
866  if (-100 != m_threadPoolSize) {
867 
868  // this parent task is needed to promote an Algorithm as EXECUTED,
869  // it will be started as soon as the child task (see below) is completed
870  tbb::task* triggerAlgoStateUpdate = new(tbb::task::allocate_root())
871  enqueueSchedulerActionTask(this, promote2ExecutedClosure);
872  // setting parent's refcount to 1 is made here only for consistency
873  // (in this case since it is not scheduled explicitly and there it has only one child task)
874  triggerAlgoStateUpdate->set_ref_count(1);
875  // the child task that executes an Algorithm
876  tbb::task* algoTask = new(triggerAlgoStateUpdate->allocate_child())
877  AlgoExecutionTask(ialgoPtr, iAlgo, eventContext, serviceLocator(), m_algExecStateSvc);
878  // schedule the algoTask
879  tbb::task::enqueue( *algoTask);
880 
881  } else {
882  AlgoExecutionTask theTask(ialgoPtr, iAlgo, eventContext, serviceLocator(), m_algExecStateSvc);
883  theTask.execute();
884  promote2ExecutedClosure();
885  }
886 
887  if ( msgLevel( MSG::DEBUG ) )
888  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
889  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
890 
891  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
892 
893  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
894 
895  if (updateSc.isSuccess())
896  if (msgLevel(MSG::VERBOSE))
897  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
898  << si << endmsg;
899  return updateSc;
900  } else {
901  if ( msgLevel( MSG::DEBUG ) )
902  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
903  return sc;
904  }
905 }
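The key mechanism in the listing is the closure created with std::bind: the call to promoteToExecuted is prepared when the algorithm is submitted and executed later, once the algorithm has run. The sketch below shows only that closure mechanism with a hypothetical MiniScheduler class. It queues the closure immediately instead of enqueueing it from a TBB task on completion, and it does not run any algorithm.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical miniature scheduler; only the closure mechanics are of interest.
class MiniScheduler {
public:
  using Action = std::function<bool()>;

  // Bind the follow-up call now, run it later from the action queue.
  void schedule( unsigned int iAlgo, int slot ) {
    ++m_inFlight;
    m_actions.push( std::bind( &MiniScheduler::markExecuted, this, iAlgo, slot ) );
  }

  // Pop and run one queued action; returns false when the queue is empty.
  bool runOne() {
    if ( m_actions.empty() ) return false;
    Action action = std::move( m_actions.front() );
    m_actions.pop();
    return action();
  }

  unsigned int inFlight() const { return m_inFlight; }
  const std::vector<std::string>& log() const { return m_log; }

private:
  // Stand-in for promoteToExecuted: release the in-flight count and record it.
  bool markExecuted( unsigned int iAlgo, int slot ) {
    --m_inFlight;
    m_log.push_back( "alg " + std::to_string( iAlgo ) + " done on slot " + std::to_string( slot ) );
    return true;
  }

  std::queue<Action> m_actions;
  std::vector<std::string> m_log;
  unsigned int m_inFlight = 0;
};
```

Because the arguments are captured at bind time, the code that later drains the queue needs no knowledge of which algorithm or slot an action refers to.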
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
Gaudi::Property< unsigned int > m_maxAlgosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
T bind(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvent ( EventContext * eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two possible cases:
1) No slot is free. A StatusCode::FAILURE is returned.
2) At least one slot is free. An action which resets the slot and kicks off its update is queued.
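The two cases can be sketched as follows. MiniSlots is a hypothetical, single-threaded miniature: the real service uses an atomic counter and a concurrent queue, and its queued action also triggers updateStates for the slot.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical miniature of the slot bookkeeping; names are illustrative only.
struct MiniSlots {
  std::vector<bool> complete;                 // one flag per slot, true = reusable
  std::queue<std::function<bool()>> actions;  // deferred work for the scheduler
  int freeSlots;

  explicit MiniSlots( int n ) : complete( n, true ), freeSlots( n ) {}

  // Case 1: no free slot, refuse. Case 2: reserve a slot and queue its reset.
  // The caller must pass a slot index smaller than the number of slots.
  bool pushNewEvent( unsigned int slot ) {
    if ( freeSlots == 0 ) return false;
    --freeSlots;
    actions.push( [this, slot] {
      if ( !complete[slot] ) return false;  // slot still holds an unfinished event
      complete[slot] = false;               // "reset": the slot now holds a live event
      return true;
    } );
    return true;
  }
};
```

Note that the slot is only reserved by the call itself; the reset happens later, when the queued action is executed.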

Definition at line 412 of file ForwardSchedulerSvc.cpp.

412  {
413 
414  if ( m_first ) {
415  m_first = false;
416  }
417 
418  if ( !eventContext ) {
419  fatal() << "Event context is nullptr" << endmsg;
420  return StatusCode::FAILURE;
421  }
422 
423  if ( m_freeSlots.load() == 0 ) {
424  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
425  return StatusCode::FAILURE;
426  }
427 
428  // no problem as push new event is only called from one thread (event loop manager)
429  m_freeSlots--;
430 
431  auto action = [this, eventContext]() -> StatusCode {
432  // Event processing slot forced to be the same as the wb slot
433  const unsigned int thisSlotNum = eventContext->slot();
434  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
435  if ( !thisSlot.complete ) {
436  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
437  return StatusCode::FAILURE;
438  }
439 
440  info() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
441  thisSlot.reset( eventContext );
442 
443  return this->updateStates( thisSlotNum );
444  }; // end of lambda
445 
446  // Kick off the scheduling!
447  if ( msgLevel( MSG::VERBOSE ) ) {
448  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
449  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
450  }
451  m_actionsQueue.push( action );
452 
453  return StatusCode::SUCCESS;
454 }
ContextID_t slot() const
Definition: EventContext.h:41
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
StatusCode updateStates(int si=-1)
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots...
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 457 of file ForwardSchedulerSvc.cpp.

457  {
458  StatusCode sc;
459  for ( auto context : eventContexts ) {
460  sc = pushNewEvent( context );
461  if ( sc != StatusCode::SUCCESS ) return sc;
462  }
463  return sc;
464 }
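
The loop above stops at the first context that fails to push, so contexts pushed before the failure stay with the scheduler and the remaining ones are never submitted. The sketch below reproduces that early-return behaviour with generic types. Status and pushAll are hypothetical names, and the explicit Success initialisation is an assumption of the sketch (the original relies on the default-constructed StatusCode).

```cpp
#include <vector>

// Hypothetical stand-in for StatusCode.
enum class Status { Success, Failure };

// Push items one by one and return the first non-success status.
// Items after the failing one are not attempted.
template <typename T, typename PushOne>
Status pushAll( const std::vector<T>& items, PushOne pushOne ) {
  Status sc = Status::Success; // assumption: an empty batch counts as success
  for ( const T& item : items ) {
    sc = pushOne( item );
    if ( sc != Status::Success ) return sc;
  }
  return sc;
}
```

A caller that needs all-or-nothing semantics would have to track which items were accepted, since the returned status alone does not say how far the loop got.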
StatusCode ForwardSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, return a failure immediately instead of blocking.

Definition at line 512 of file ForwardSchedulerSvc.cpp.

512  {
513  if ( m_finishedEvents.try_pop( eventContext ) ) {
514  if ( msgLevel( MSG::DEBUG ) )
515  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
516  << endmsg;
517  m_freeSlots++;
518  return StatusCode::SUCCESS;
519  }
520  return StatusCode::FAILURE;
521 }
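
Because tryPopFinishedEvent never blocks, a caller can use it to collect whatever has finished so far and then carry on with other work. The following sketch shows that polling pattern with standard-library types only. Context, FinishedEvents and drainFinished are hypothetical stand-ins; the mutex-guarded deque replaces tbb::concurrent_bounded_queue purely to keep the example self-contained.

```cpp
#include <atomic>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical stand-in for EventContext.
struct Context {
  int slot;
  int evt;
};

// Simplified finished-events queue with a free-slot counter.
class FinishedEvents {
public:
  void push( Context* c ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    m_queue.push_back( c );
  }

  // Non-blocking: returns false at once if nothing is ready.
  // On success the slot is counted as free again.
  bool tryPop( Context*& out ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_queue.empty() ) return false;
    out = m_queue.front();
    m_queue.pop_front();
    ++m_freeSlots;
    return true;
  }

  int freeSlots() const { return m_freeSlots.load(); }

private:
  std::mutex           m_mutex;
  std::deque<Context*> m_queue;
  std::atomic_int      m_freeSlots{0};
};

// Caller side: collect every event that is finished right now.
inline int drainFinished( FinishedEvents& finished, std::vector<Context*>& done ) {
  int      n   = 0;
  Context* ctx = nullptr;
  while ( finished.tryPop( ctx ) ) {
    done.push_back( ctx );
    ++n;
  }
  return n;
}
```

As in the listing, the output pointer is left untouched when the pop fails, so callers should check the return value before using it.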
StatusCode ForwardSchedulerSvc::updateStates ( int  si = -1)
private

Loop over the algorithms in the slots and promote them to successive states (-1 means all slots, while an empty string means skipping an update of the control flow state).

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. An event that is not yet marked complete is considered finished when the root control flow decision is resolved and the slot holds:

  • No algorithms signed off by the control flow (state CONTROLREADY)
  • No algorithms signed off by the data flow (state DATAREADY)
  • No scheduled algorithms (state SCHEDULED)
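
The finish test described above can be written as a single predicate. The sketch below is a simplified model, not the scheduler's own types: SlotView, its per-state counters and eventJustFinished are hypothetical, and the State enum lists only a subset of states for illustration.

```cpp
#include <array>
#include <cstddef>

// Hypothetical, reduced set of algorithm states.
enum State : std::size_t { INITIAL, CONTROLREADY, DATAREADY, SCHEDULED, EVTACCEPTED, N_STATES };

// Hypothetical flattened view of one event slot.
struct SlotView {
  bool complete             = false; // slot already handed back
  bool rootDecisionResolved = false; // control flow has a final answer
  std::array<unsigned, N_STATES> algsInState{}; // algorithms per state
};

// True when the event has finished but the slot is not yet marked complete:
// nothing is waiting on control flow, data flow, or execution.
inline bool eventJustFinished( const SlotView& s ) {
  return !s.complete && s.rootDecisionResolved &&
         s.algsInState[CONTROLREADY] == 0 &&
         s.algsInState[DATAREADY] == 0 &&
         s.algsInState[SCHEDULED] == 0;
}
```

The !complete guard matters because a slot that is already complete has been freed; treating it as newly finished would push the same event twice.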

Definition at line 575 of file ForwardSchedulerSvc.cpp.

575  {
576 
577  m_updateNeeded = true;
578 
579  // Fill a map of initial state / action using closures.
580  // done to update the states w/o several if/elses
581  // Posterchild for constexpr with gcc4.7 onwards!
582  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
583  statesTransitions = {
584  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
585  this,
586  std::placeholders::_1,
587  std::placeholders::_2)},
588  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
589  this,
590  std::placeholders::_1,
591  std::placeholders::_2)}
592  };*/
593 
594  StatusCode global_sc( StatusCode::FAILURE, true );
595 
596  // Sort from the oldest to the newest event
597  // Prepare a vector of pointers to the slots to avoid copies
598  std::vector<EventSlot*> eventSlotsPtrs;
599 
600  // Consider all slots if si <0 or just one otherwise
601  if ( si < 0 ) {
602  const int eventsSlotsSize( m_eventSlots.size() );
603  eventSlotsPtrs.reserve( eventsSlotsSize );
604  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
605  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
606  }
607  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
608  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
609  } else {
610  eventSlotsPtrs.push_back( &m_eventSlots[si] );
611  }
612 
613  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
614  int iSlot = thisSlotPtr->eventContext->slot();
615 
616  // Cache the states of the algos to improve readability and performance
617  auto& thisSlot = m_eventSlots[iSlot];
618  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
619 
620  // Take care of the control ready update
621  m_efManager.updateEventState( thisAlgsStates, thisSlot.controlFlowState );
622 
623  // DF note: all this is a loop over all algs and applies CR->DR and DR->SCHD transitions
624  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
625  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
626  if (algState==AlgsExecutionStates::ERROR)
627  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
628  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
629  partial_sc=StatusCode::SUCCESS;
630  for (auto state_transition = statesTransitions.find(algState);
631  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
632  state_transition++){
633  partial_sc = state_transition->second(iAlgo,iSlot);
634  if (partial_sc.isFailure()){
635  verbose() << "Could not apply transition from "
636  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
637  << " for algorithm " << index2algname(iAlgo)
638  << " on processing slot " << iSlot << endmsg;
639  }
640  else{global_sc=partial_sc;}
641  } // end loop on transitions
642  }*/ // end loop on algos
643 
644  StatusCode partial_sc( StatusCode::FAILURE, true );
645  // first update CONTROLREADY to DATAREADY
646  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::CONTROLREADY );
647  it != thisAlgsStates.end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
648 
649  uint algIndex = *it;
650  partial_sc = promoteToDataReady(algIndex, iSlot);
651  if (partial_sc.isFailure())
652  if (msgLevel(MSG::VERBOSE))
653  verbose() << "Could not apply transition from "
654  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
655  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
656  }
657 
658  // now update DATAREADY to SCHEDULED
659  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
660  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
661  uint algIndex = *it;
662 
663  partial_sc = promoteToScheduled( algIndex, iSlot );
664 
665  if (msgLevel(MSG::VERBOSE))
666  if (partial_sc.isFailure())
667  verbose() << "Could not apply transition from "
668  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
669  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
670  }
671 
672  // Not complete because this would mean that the slot is already free!
673  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
674  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
675  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
676  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
677 
678  thisSlot.complete = true;
679  // if the event did not fail, add it to the finished events
680  // otherwise it is taken care of in the error handling already
681  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
682  m_finishedEvents.push(thisSlot.eventContext);
683  if (msgLevel(MSG::DEBUG))
684  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
685  << thisSlot.eventContext->slot() << ")." << endmsg;
686  }
687  // now let's return the fully evaluated result of the control flow
688  if ( msgLevel( MSG::DEBUG ) ) {
689  std::stringstream ss;
690  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
691  debug() << ss.str() << endmsg;
692  }
693 
694  thisSlot.eventContext = nullptr;
695  } else {
696  StatusCode eventStalledSC = isStalled(iSlot);
697  if (! eventStalledSC.isSuccess()) {
698  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
699  eventFailed(thisSlot.eventContext).ignore();
700  }
701  }
702  } // end loop on slots
703 
704  verbose() << "States Updated." << endmsg;
705 
706  return global_sc;
707 }
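
When called for all slots, the listing above collects pointers to the incomplete slots and sorts them by event number so that older events are serviced first. A minimal sketch of that step in isolation, assuming a hypothetical Slot type that carries only the event number and the completion flag:

```cpp
#include <algorithm>
#include <vector>

// Hypothetical stand-in for EventSlot, reduced to what the ordering needs.
struct Slot {
  int  evt;      // event number held by the slot
  bool complete; // true if the slot is free
};

// Return pointers to the incomplete slots, oldest event first.
// Sorting pointers avoids copying the slots themselves.
inline std::vector<Slot*> slotsOldestFirst( std::vector<Slot>& slots ) {
  std::vector<Slot*> ptrs;
  ptrs.reserve( slots.size() );
  for ( Slot& s : slots ) {
    if ( !s.complete ) ptrs.push_back( &s );
  }
  std::sort( ptrs.begin(), ptrs.end(),
             []( const Slot* a, const Slot* b ) { return a->evt < b->evt; } );
  return ptrs;
}
```

The returned pointers stay valid only while the slot vector is not resized, which is consistent with a slot vector that is sized once and then reused.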

Member Data Documentation

tbb::concurrent_bounded_queue<action> ForwardSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 209 of file ForwardSchedulerSvc.h.

SmartIF<IAlgExecStateSvc> ForwardSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 172 of file ForwardSchedulerSvc.h.

std::unordered_map<std::string, unsigned int> ForwardSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 148 of file ForwardSchedulerSvc.h.

std::vector<std::string> ForwardSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 154 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::vector<std::vector<std::string> > > ForwardSchedulerSvc::m_algosDependencies
private
Initial value:
{
this, "AlgosDependencies", {}, "[[deprecated]]"}

Definition at line 123 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 177 of file ForwardSchedulerSvc.h.

SmartIF<IAlgResourcePool> ForwardSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 201 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_checkDeps
private
Initial value:
{this, "CheckDependencies", false,
"Runtime check of Algorithm Data Dependencies"}

Definition at line 125 of file ForwardSchedulerSvc.h.

concurrency::ExecutionFlowManager ForwardSchedulerSvc::m_efManager
private

Member to take care of the control flow.

Definition at line 229 of file ForwardSchedulerSvc.h.

std::vector<EventSlot> ForwardSchedulerSvc::m_eventSlots
private

Vector of event slots.

Definition at line 160 of file ForwardSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> ForwardSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 166 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_first = true
private

Definition at line 234 of file ForwardSchedulerSvc.h.

std::atomic_int ForwardSchedulerSvc::m_freeSlots
private

Atomic, to account for asynchronous updates by the scheduler with respect to the rest.

Definition at line 163 of file ForwardSchedulerSvc.h.

std::atomic<ActivationState> ForwardSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 139 of file ForwardSchedulerSvc.h.

Gaudi::Property<unsigned int> ForwardSchedulerSvc::m_maxAlgosInFlight
private
Initial value:
{this, "MaxAlgosInFlight", 1,
"[[deprecated]] Taken from the whiteboard"}

Definition at line 121 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_maxEventsInFlight
private
Initial value:
{this, "MaxEventsInFlight", 0,
"Maximum number of event processed simultaneously"}

Definition at line 116 of file ForwardSchedulerSvc.h.

std::mutex ForwardSchedulerSvc::m_ssMut
static private

Definition at line 266 of file ForwardSchedulerSvc.h.

std::list< ForwardSchedulerSvc::SchedulerState > ForwardSchedulerSvc::m_sState
static private

Definition at line 265 of file ForwardSchedulerSvc.h.

std::thread ForwardSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 142 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_threadPoolSize
private
Initial value:
{this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 118 of file ForwardSchedulerSvc.h.

SmartIF<IThreadPoolSvc> ForwardSchedulerSvc::m_threadPoolSvc
private

Definition at line 232 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_updateNeeded = true
private

Keep track of update actions scheduled.

Definition at line 197 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 127 of file ForwardSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> ForwardSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 157 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 120 of file ForwardSchedulerSvc.h.


The documentation for this class was generated from the following files: