ForwardSchedulerSvc Class Reference

The SchedulerSvc implements the IScheduler interface. More...

#include <GaudiKernel/ForwardSchedulerSvc.h>

Inheritance diagram for ForwardSchedulerSvc:
Collaboration diagram for ForwardSchedulerSvc:

Classes

class  SchedulerState
 

Public Member Functions

 ForwardSchedulerSvc (const std::string &name, ISvcLocator *svc)
 Constructor. More...
 
 ~ForwardSchedulerSvc ()
 Destructor. More...
 
virtual StatusCode initialize ()
 Initialise. More...
 
virtual StatusCode finalize ()
 Finalise. More...
 
virtual StatusCode pushNewEvent (EventContext *eventContext)
 Make an event available to the scheduler. More...
 
virtual StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts)
 
virtual StatusCode popFinishedEvent (EventContext *&eventContext)
 Blocks until an event is availble. More...
 
virtual StatusCode tryPopFinishedEvent (EventContext *&eventContext)
 Try to fetch an event from the scheduler. More...
 
virtual unsigned int freeSlots ()
 Get free slots number. More...
 
void addAlg (Algorithm *, EventContext *, pthread_t)
 
bool delAlg (Algorithm *)
 
void dumpState ()
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::stringgetInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
- Public Member Functions inherited from Service
const std::stringname () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Initialize Service. More...
 
StatusCode sysStop () override
 Initialize Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Re-initialize the Service. More...
 
StatusCode setProperty (const Property &p) override
 
StatusCode setProperty (const std::string &s) override
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 
StatusCode getProperty (Property *p) const override
 
const PropertygetProperty (const std::string &name) const override
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 
const std::vector< Property * > & getProperties () const override
 
bool hasProperty (const std::string &name) const override
 
template<class TYPE >
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property form the value More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
PropertydeclareProperty (const std::string &name, T &property, const std::string &doc="none") const
 Declare the named property. More...
 
PropertydeclareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="") const
 Declare remote named properties. More...
 
template<class T >
StatusCode declarePrivateTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used Private tool. More...
 
template<class T >
StatusCode declarePublicTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used Public tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service.May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
- Public Member Functions inherited from extend_interfaces< Interfaces...>
 ~extend_interfaces () override=default
 Virtual destructor. More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
typedef std::function< StatusCode()> action
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::stringindex2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1, const std::string &algo_name=std::string())
 Loop on algorithm in the slots and promote them to successive states (-1 means all slots, while empty string means skipping an update of the Control Flow state) More...
 
StatusCode promoteToControlReady (unsigned int iAlgo, int si)
 Algorithm promotion: Accepted by the control flow. More...
 
StatusCode promoteToDataReady (unsigned int iAlgo, int si)
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo)
 The call to this method is triggered only from within the AlgoExecutionTask. More...
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 
void dumpState (std::ostringstream &)
 

Private Attributes

std::atomic< ActivationStatem_isActive
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::stringm_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IHiveWhiteBoardm_whiteboard
 A shortcut to the whiteboard. More...
 
std::string m_whiteboardSvcName
 The whiteboard name. More...
 
std::vector< EventSlotm_eventSlots
 Vector of events slots. More...
 
int m_maxEventsInFlight
 Maximum number of event processed simultaneously. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asyncronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
unsigned int m_maxAlgosInFlight
 Maximum number of simultaneous algorithms. More...
 
unsigned int m_algosInFlight
 Number of algoritms presently in flight. More...
 
bool m_updateNeeded
 Keep track of update actions scheduled. More...
 
SmartIF< IAlgResourcePoolm_algResourcePool
 Cache for the algorithm resource pool. More...
 
std::vector< std::vector< std::string > > m_algosDependencies
 DEPRECATED! More...
 
int m_threadPoolSize
 Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose. More...
 
tbb::concurrent_bounded_queue< actionm_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
concurrency::ExecutionFlowManager m_efManager
 Member to take care of the control flow. More...
 
bool m_CFNext
 
bool m_DFNext
 
bool m_simulateExecution
 
std::string m_optimizationMode
 
bool m_dumpIntraEventDynamics
 
SmartIF< IThreadPoolSvcm_threadPoolSvc
 
bool m_first
 
bool m_checkDeps
 

Static Private Attributes

static std::list< SchedulerStatem_sState
 
static std::mutex m_ssMut
 

Friends

class AlgoExecutionTask
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces...>
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
typedef Gaudi::PluginService::Factory< IService *, const std::string &, ISvcLocator * > Factory
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces...>
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids...>::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
IntegerProperty m_outputLevel = MSG::NIL
 Service output level. More...
 
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 

Detailed Description

The SchedulerSvc implements the IScheduler interface.

It manages all the execution states of the algorithms and interacts with the TBB runtime for the algorithm tasks submission. A state machine takes care of the tracking of the execution state of the algorithms. This is a forward scheduler: algorithms are scheduled for execution as soon as their data dependencies are available in the whiteboard.

Algorithms management

The activate() method runs in a separate thread. It checks a TBB concurrent bounded queue of closures in a loop via the Pop method. This allows not to use a cpu entirely to check the presence of new actions to be taken. In other words, the asynchronous actions are serialised via the actions queue. Once a task terminates, a call to the promoteToExecuted method will be pushed into the actions queue. The promoteToExecuted method also triggers a call to the updateStates method, which brushes all algorithms, checking if their state can be changed. It's indeed possible that upon termination of an algorithm, the control flow and/or the data flow allow the submission of more algorithms.

Algorithms dependencies

There are two ways of declaring algorithms dependencies. One which is only temporarly available to ease developments consists in declaring them through AlgosDependencies property as a list of list. The order of these sublist must be the same one of the algorithms in the TopAlg list. The second one consists in declaring the data dependencies directly within the algorithms via data object handles.

Events management

The scheduler accepts events to be processed (in the form of eventContexts) and releases processed events. This flow is implemented through three methods:

  • pushNewEvent: to make an event available to the scheduler.
  • tryPopFinishedEvent: to retrieve an event from the scheduler
  • popFinishedEvent: to retrieve an event from the scheduler (blocking)

Please refer to the full documentation of the methods for more details.

Author
Danilo Piparo
Benedikt Hegner
Version
1.1

Definition at line 73 of file ForwardSchedulerSvc.h.

Member Typedef Documentation

Definition at line 203 of file ForwardSchedulerSvc.h.

Member Enumeration Documentation

Enumerator
INACTIVE 
ACTIVE 
FAILURE 

Definition at line 106 of file ForwardSchedulerSvc.h.

Constructor & Destructor Documentation

ForwardSchedulerSvc::ForwardSchedulerSvc ( const std::string name,
ISvcLocator svc 
)

Constructor.

Deprecated

Definition at line 37 of file ForwardSchedulerSvc.cpp.

37  :
38  base_class(name,svcLoc),
40  m_algosInFlight(0),
41  m_updateNeeded(true),
42  m_first(true), m_checkDeps(false)
43 
44 {
45  declareProperty("MaxEventsInFlight", m_maxEventsInFlight = 0 );
46  declareProperty("ThreadPoolSize", m_threadPoolSize = -1 );
47  declareProperty("WhiteboardSvc", m_whiteboardSvcName = "EventDataSvc" );
48  declareProperty("MaxAlgosInFlight", m_maxAlgosInFlight = 0, "Taken from the whiteboard. Deprecated" );
49  // XXX: CF tests. Temporary property to switch between ControlFlow implementations
50  declareProperty("useGraphFlowManagement", m_CFNext = false );
51  declareProperty("DataFlowManagerNext", m_DFNext = false );
52  declareProperty("SimulateExecution", m_simulateExecution = false );
53  declareProperty("Optimizer", m_optimizationMode = "",
54  "The following modes are currently available: PCE, COD, DRE, E" );
55  declareProperty("DumpIntraEventDynamics", m_dumpIntraEventDynamics = false,
56  "Dump intra-event concurrency dynamics to csv file" );
57 
59  declareProperty("AlgosDependencies", m_algosDependencies);
60 
61  declareProperty("CheckDependencies", m_checkDeps = false);
62 
63 }
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
extends base_class
Typedef to this class.
Definition: extends.h:14
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::string m_whiteboardSvcName
The whiteboard name.
bool m_updateNeeded
Keep track of update actions scheduled.
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
Property * declareProperty(const std::string &name, T &property, const std::string &doc="none") const
Declare the named property.
Definition: Service.h:215
std::vector< std::vector< std::string > > m_algosDependencies
DEPRECATED!
ForwardSchedulerSvc::~ForwardSchedulerSvc ( )

Destructor.

Definition at line 66 of file ForwardSchedulerSvc.cpp.

66 {}

Member Function Documentation

void ForwardSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)

Definition at line 295 of file ForwardSchedulerSvc.cpp.

295  {
296 
297  debug() << "ForwardSchedulerSvc::activate()" << endmsg;
298 
300  error() << "problems initializing ThreadPoolSvc" << endmsg;
302  return;
303  }
304 
305 
306  // Wait for actions pushed into the queue by finishing tasks.
307  action thisAction;
309 
310  m_isActive = ACTIVE;
311 
312  // Continue to wait if the scheduler is running or there is something to do
313  info() << "Start checking the actionsQueue" << endmsg;
314  while(m_isActive == ACTIVE or m_actionsQueue.size()!=0){
315  m_actionsQueue.pop(thisAction);
316  sc = thisAction();
317  if (sc!=StatusCode::SUCCESS)
318  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
319  else
320  verbose() << "Action succeeded." << endmsg;
321  }
322 
323 }
virtual StatusCode initPool(const int &poolSize)=0
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
SmartIF< IThreadPoolSvc > m_threadPoolSvc
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::function< StatusCode()> action
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void ForwardSchedulerSvc::addAlg ( Algorithm a,
EventContext e,
pthread_t  t 
)

Definition at line 1002 of file ForwardSchedulerSvc.cpp.

1002  {
1003 
1005  m_sState.push_back(SchedulerState(a,e,t));
1006 
1007 }
static std::list< SchedulerState > m_sState
def lock(file)
Definition: locker.py:16
static std::mutex m_ssMut
unsigned int ForwardSchedulerSvc::algname2index ( const std::string algoname)
inlineprivate

Convert a name to an integer.

Definition at line 357 of file ForwardSchedulerSvc.cpp.

357  {
358  unsigned int index = m_algname_index_map[algoname];
359  return index;
360 }
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
StatusCode ForwardSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.

Definition at line 333 of file ForwardSchedulerSvc.cpp.

333  {
334 
335  if (m_isActive == ACTIVE){
336  // Drain the scheduler
338  this));
339  // This would be the last action
341  }
342 
343  return StatusCode::SUCCESS;
344 }
StatusCode m_drain()
Drain the actions present in the queue.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
bool ForwardSchedulerSvc::delAlg ( Algorithm a)

Definition at line 1011 of file ForwardSchedulerSvc.cpp.

1011  {
1012 
1014 
1015  for (std::list<SchedulerState>::iterator itr = m_sState.begin();
1016  itr != m_sState.end(); ++itr) {
1017  if (*itr == a) {
1018  m_sState.erase(itr);
1019  return true;
1020  }
1021  }
1022 
1023  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1024  return false;
1025 }
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:820
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
static std::list< SchedulerState > m_sState
def lock(file)
Definition: locker.py:16
STL class.
static std::mutex m_ssMut
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void ForwardSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

The dependencies of each algo are printed and the missing ones specified.

Definition at line 754 of file ForwardSchedulerSvc.cpp.

754  {
755 
756  // To have just one big message
757  std::ostringstream outputMessageStream;
758 
759  outputMessageStream
760  << "============================== Execution Task State ============================="
761  << std::endl;
762  dumpState(outputMessageStream);
763 
764  outputMessageStream
765  << std::endl
766  << "============================== Scheduler State ================================="
767  << std::endl;
768 
769  int slotCount = -1;
770  for (auto thisSlot : m_eventSlots){
771  slotCount++;
772  if ( thisSlot.complete )
773  continue;
774 
775  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
776  << " event: " << thisSlot.eventContext->evt()
777  << " -----------"<< std::endl;
778 
779  if ( 0 > iSlot or iSlot == slotCount) {
780  outputMessageStream << "Algorithms states:" << std::endl;
781 
782  const DataObjIDColl& wbSlotContent ( thisSlot.dataFlowMgr.content() );
783  for (unsigned int algoIdx=0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
784  outputMessageStream << " o " << index2algname(algoIdx)
785  << " [" << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]]
786  << "] Data deps: ";
787  DataObjIDColl deps (thisSlot.dataFlowMgr.dataDependencies(algoIdx));
788  const int depsSize=deps.size();
789  if (depsSize==0)
790  outputMessageStream << " none";
791 
792  DataObjIDColl missing;
793  for (auto d: deps) {
794  outputMessageStream << d << " ";
795  if ( wbSlotContent.find(d) == wbSlotContent.end() ) {
796  // outputMessageStream << "[missing] ";
797  missing.insert(d);
798  }
799  }
800 
801  if (! missing.empty()) {
802  outputMessageStream << ". The following are missing: ";
803  for (auto d: missing) {
804  outputMessageStream << d << " ";
805  }
806  }
807 
808  outputMessageStream << std::endl;
809  }
810 
811  // Snapshot of the WhiteBoard
812  outputMessageStream << "\nWhiteboard contents: "<< std::endl;
813  for (auto& product : wbSlotContent )
814  outputMessageStream << " o " << product << std::endl;
815 
816  // Snapshot of the ControlFlow
817  outputMessageStream << "\nControl Flow:" << std::endl;
818  std::stringstream cFlowStateStringStream;
819  m_efManager.printEventState(cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState,0);
820 
821  outputMessageStream << cFlowStateStringStream.str() << std::endl;
822  }
823  }
824 
825  outputMessageStream
826  << "=================================== END ======================================"
827  << std::endl;
828 
829  info() << "Dumping Scheduler State " << std::endl
830  << outputMessageStream.str() << endmsg;
831 
832 }
T empty(T...args)
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
T endl(T...args)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
T str(T...args)
T insert(T...args)
T size(T...args)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static std::map< State, std::string > stateNames
void ForwardSchedulerSvc::dumpState ( )

Definition at line 1041 of file ForwardSchedulerSvc.cpp.

1041  {
1042 
1044 
1045  std::ostringstream ost;
1046  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1047  dumpState(ost);
1048 
1049  info() << ost.str() << endmsg;
1050 
1051 }
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
T endl(T...args)
static std::list< SchedulerState > m_sState
def lock(file)
Definition: locker.py:16
static std::mutex m_ssMut
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void ForwardSchedulerSvc::dumpState ( std::ostringstream ost)
private

Definition at line 1029 of file ForwardSchedulerSvc.cpp.

1029  {
1030 
1032 
1033  for (auto it : m_sState) {
1034  ost << " " << it << std::endl;
1035  }
1036 
1037 }
T endl(T...args)
static std::list< SchedulerState > m_sState
def lock(file)
Definition: locker.py:16
static std::mutex m_ssMut
StatusCode ForwardSchedulerSvc::eventFailed ( EventContext eventContext)
private

Method to check if an event failed and take appropriate actions.

It can be possible that an event fails.

In this case this method is called. It dumps the state of the scheduler, drains the actions (without executing them) and events in the queues and returns a failure.

Definition at line 496 of file ForwardSchedulerSvc.cpp.

496  {
497 
498  // Set the number of slots available to an error code
499  m_freeSlots.store(0);
500 
501  fatal() << "*** Event " << eventContext->evt() << " on slot "
502  << eventContext->slot() << " failed! ***" << endmsg;
503 
504  dumpSchedulerState(-1);
505 
506  // Empty queue and deactivate the service
507  action thisAction;
508  while(m_actionsQueue.try_pop(thisAction)){};
509  deactivate();
510 
511  // Push into the finished events queue the failed context
512  EventContext* thisEvtContext;
513  while(m_finishedEvents.try_pop(thisEvtContext)) { m_finishedEvents.push(thisEvtContext); };
514  m_finishedEvents.push(eventContext);
515 
516  return StatusCode::FAILURE;
517 
518 }
StatusCode deactivate()
Deactivate scheduler.
ContextID_t slot() const
Definition: EventContext.h:41
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::function< StatusCode()> action
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::finalize ( )
virtual

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 266 of file ForwardSchedulerSvc.cpp.

266  {
267 
269  if (!sc.isSuccess())
270  warning () << "Base class could not be finalized" << endmsg;
271 
272  sc = deactivate();
273  if (!sc.isSuccess())
274  warning () << "Scheduler could not be deactivated" << endmsg;
275 
276  info() << "Joining Scheduler thread" << endmsg;
277  m_thread.join();
278 
279  //m_efManager.getExecutionFlowGraph()->dumpExecutionPlan();
280 
281  return sc;
282 
283  }
StatusCode deactivate()
Deactivate scheduler.
StatusCode finalize() override
Definition: Service.cpp:193
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
std::thread m_thread
The thread in which the activate function runs.
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
unsigned int ForwardSchedulerSvc::freeSlots ( )
virtual

Get free slots number.

Definition at line 433 of file ForwardSchedulerSvc.cpp.

433  {
434  return std::max(m_freeSlots.load(),0);
435 }
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T max(T...args)
const std::string & ForwardSchedulerSvc::index2algname ( unsigned int  index)
inlineprivate

Convert an integer to a name.

Definition at line 351 of file ForwardSchedulerSvc.cpp.

351  {
352  return m_algname_vect[index];
353 }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
StatusCode ForwardSchedulerSvc::initialize ( )
virtual

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 74 of file ForwardSchedulerSvc.cpp.

74  {
75 
76  // Initialise mother class (read properties, ...)
78  if (!sc.isSuccess())
79  warning () << "Base class could not be initialized" << endmsg;
80 
81  // Get hold of the TBBSvc. This should initialize the thread pool
82  m_threadPoolSvc = serviceLocator()->service("ThreadPoolSvc");
83  if (!m_threadPoolSvc.isValid()) {
84  fatal() << "Error retrieving ThreadPoolSvc" << endreq;
85  return StatusCode::FAILURE;
86  }
87 
88 
89  // Activate the scheduler in another thread.
90  info() << "Activating scheduler in a separate thread" << endmsg;
92  this));
93 
94  while(m_isActive != ACTIVE) {
95  if (m_isActive == FAILURE) {
96  fatal() << "Terminating initialization" << endmsg;
97  return StatusCode::FAILURE;
98  } else {
99  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
100  sleep(1);
101  }
102  }
103 
104  // Get the algo resource pool
105  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
106  if (!m_algResourcePool.isValid()) {
107  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
108  return StatusCode::FAILURE;
109  }
110 
111  // Get Whiteboard
113  if (!m_whiteboard.isValid()) {
114  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard."
115  << endmsg;
116  return StatusCode::FAILURE;
117  }
118 
119  // Check the MaxEventsInFlight parameters and react
120  // Deprecated for the moment
121  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
122  if (m_maxEventsInFlight!=0){
123  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
124  << "Please migrate your code options files." << endmsg;
125 
126  if (m_maxEventsInFlight != (int)numberOfWBSlots){
127  warning() << "In addition, the number of events in flight ("
128  << m_maxEventsInFlight << ") differs from the slots in the whiteboard ("
129  << numberOfWBSlots << "). Setting the number of events in flight to "
130  << numberOfWBSlots << endmsg;
131  }
132  }
133 
134  // Align the two quantities
135  m_maxEventsInFlight = numberOfWBSlots;
136 
137  // Set the number of free slots
139 
140  if (m_algosDependencies.size() != 0) {
141  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
142  << " FIX your job options #####" << endmsg;
143  }
144 
145  // Get the list of algorithms
147  const unsigned int algsNumber = algos.size();
148  info() << "Found " << algsNumber << " algorithms" << endmsg;
149 
150  /* Dependencies
151  1) Look for handles in algo, if none
152  2) Assume none are required
153  */
154 
155  DataObjIDColl globalInp, globalOutp;
156 
157  info() << "Data Dependencies for Algorithms:";
158 
160  for (IAlgorithm* ialgoPtr : algos) {
161  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
162  if (nullptr == algoPtr)
163  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
164 
165  info() << "\n " << algoPtr->name();
166 
167  // FIXME
168  DataObjIDColl algoDependencies;
169  if (!algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty()) {
170  for (auto id : algoPtr->inputDataObjs()) {
171  info() << "\n o INPUT " << id;
172  algoDependencies.insert(id);
173  globalInp.insert(id);
174  }
175  for (auto id : algoPtr->outputDataObjs()) {
176  info() << "\n o OUTPUT " << id;
177  globalOutp.insert(id);
178  }
179  } else {
180  info() << "\n none";
181  }
182  m_algosDependencies.emplace_back(algoDependencies);
183  }
184  info() << endmsg;
185 
186  // Fill the containers to convert algo names to index
187  m_algname_vect.reserve(algsNumber);
188  unsigned int index=0;
189  for (IAlgorithm* algo : algos){
190  const std::string& name = algo->name();
191  m_algname_index_map[name]=index;
193  index++;
194  }
195 
196 
197  // Check if we have unmet global input dependencies
198  if (m_checkDeps) {
199  DataObjIDColl unmetDep;
200  for (auto o : globalInp) {
201  if (globalOutp.find(o) == globalOutp.end()) {
202  unmetDep.insert(o);
203  }
204  }
205 
206  if (unmetDep.size() > 0) {
207  fatal() << "The following unmet INPUT data dependencies were found: ";
208  for (auto &o : unmetDep) {
209  fatal() << "\n o " << o << " required by Algorithm: ";
210  for (size_t i =0; i<m_algosDependencies.size(); ++i) {
211  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
212  fatal() << "\n * " << m_algname_vect[i];
213  }
214  }
215  }
216  fatal() << endmsg;
217  return StatusCode::FAILURE;
218  } else {
219  info() << "No unmet INPUT data dependencies were found" << endmsg;
220  }
221  }
222 
223  // prepare the control flow part
224  if (m_CFNext) m_DFNext = true; //force usage of new data flow machinery when new control flow is used
225  if (!m_CFNext && !m_optimizationMode.empty()) {
226  fatal() << "Execution optimization is only available with the graph-based execution flow management" << endmsg;
227  return StatusCode::FAILURE;
228  }
229  const AlgResourcePool* algPool =
230  dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
233  unsigned int controlFlowNodeNumber =
235 
236  // Shortcut for the message service
237  SmartIF<IMessageSvc> messageSvc (serviceLocator());
238  if (!messageSvc.isValid())
239  error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
240 
241  m_eventSlots.assign(m_maxEventsInFlight,EventSlot(m_algosDependencies,algsNumber,
242  controlFlowNodeNumber,messageSvc));
243  std::for_each(m_eventSlots.begin(),m_eventSlots.end(),
244  [](EventSlot& slot){slot.complete=true;});
245 
246  // Clearly inform about the level of concurrency
247  info() << "Concurrency level information:" << endmsg;
248  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
249  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
250  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
251 
252  // Simulating execution flow by only analyzing the graph topology and logic
253  if (m_simulateExecution) {
254  auto vis = concurrency::RunSimulator(0);
256  }
257 
258  return sc;
259 
260 }
void simulateExecutionFlow(IGraphVisitor &visitor) const
StatusCode initialize() override
Definition: Service.cpp:68
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:324
T empty(T...args)
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
const DataObjIDColl & inputDataObjs() const
Definition: Algorithm.h:599
SmartIF< IThreadPoolSvc > m_threadPoolSvc
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
virtual size_t getNumberOfStores()=0
Get the number of 'slots'.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
STL class.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:820
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:78
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:319
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::string m_whiteboardSvcName
The whiteboard name.
T bind(T...args)
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:25
T insert(T...args)
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:74
T find(T...args)
T size(T...args)
STL class.
void activate()
Activate scheduler.
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
const DataObjIDColl & outputDataObjs() const
Definition: Algorithm.h:600
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager It greps the topalg list and the index map for the algo names...
T for_each(T...args)
list i
Definition: ana.py:128
STL class.
std::vector< std::vector< std::string > > m_algosDependencies
DEPRECATED!
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
T reserve(T...args)
T emplace_back(T...args)
StatusCode ForwardSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check if we are in present of a stall condition for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.

Definition at line 729 of file ForwardSchedulerSvc.cpp.

729  {
730  // Get the slot
731  EventSlot& thisSlot = m_eventSlots[iSlot];
732 
733  if (m_actionsQueue.empty() &&
734  m_algosInFlight == 0 &&
736 
737  info() << "About to declare a stall" << endmsg;
738  fatal() << "*** Stall detected! ***\n" << endmsg;
739  dumpSchedulerState(iSlot);
740  //throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
741 
742  return StatusCode::FAILURE;
743  }
744  return StatusCode::SUCCESS;
745 }
bool algsPresent(State state) const
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 441 of file ForwardSchedulerSvc.cpp.

441  {
442 
443  unsigned int slotNum=0;
444  for (auto& thisSlot : m_eventSlots){
445  if (not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete){
446  updateStates(slotNum);
447  }
448  slotNum++;
449  }
450  return StatusCode::SUCCESS;
451 }
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
StatusCode ForwardSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
virtual

Blocks until an event is availble.

Get a finished event or block until one becomes available.

Definition at line 457 of file ForwardSchedulerSvc.cpp.

457  {
458  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
459  if (m_freeSlots.load() == m_maxEventsInFlight or
460  m_isActive == INACTIVE) {
461  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
462  // << " active: " << m_isActive << endmsg;
463  return StatusCode::FAILURE;
464  } else {
465  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
466  // << " active: " << m_isActive << endmsg;
467  m_finishedEvents.pop(eventContext);
468  m_freeSlots++;
469  debug() << "Popped slot " << eventContext->slot() << "(event "
470  << eventContext->evt() << ")" << endmsg;
471  return StatusCode::SUCCESS;
472  }
473 }
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToControlReady ( unsigned int  iAlgo,
int  si 
)
private

Algorithm promotion: Accepted by the control flow.

Definition at line 836 of file ForwardSchedulerSvc.cpp.

836  {
837 
838  // Do the control flow
839  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
840  if (sc.isSuccess())
841  if (msgLevel(MSG::VERBOSE))
842  verbose() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY on slot "
843  << si << endmsg;
844 
845  return sc;
846 
847 }
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToDataReady ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 851 of file ForwardSchedulerSvc.cpp.

851  {
852 
853  StatusCode sc;
854  if (!m_DFNext) {
855  sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun(iAlgo);
856  } else {
858  }
859 
861  if (sc == StatusCode::SUCCESS)
862  updateSc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::DATAREADY);
863 
864  if (updateSc.isSuccess())
865  if (msgLevel(MSG::VERBOSE))
866  verbose() << "Promoting " << index2algname(iAlgo) << " to DATAREADY on slot "
867  << si<< endmsg;
868 
869  return updateSc;
870 
871 }
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo 
)
private

The call to this method is triggered only from within the AlgoExecutionTask.

Definition at line 928 of file ForwardSchedulerSvc.cpp.

928  {
929 
930  // Put back the instance
931  Algorithm* castedAlgo = dynamic_cast<Algorithm*>(algo); // DP: expose context getter in IAlgo?
932  if (!castedAlgo)
933  fatal() << "The casting did not succeed!" << endmsg;
934  EventContext* eventContext = castedAlgo->getContext();
935 
936  // Check if the execution failed
937  if (eventContext->evtFail())
938  eventFailed(eventContext);
939 
940  StatusCode sc = m_algResourcePool->releaseAlgorithm(algo->name(),algo);
941 
942  if (!sc.isSuccess()) {
943  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot()
944  << "] " << "Instance of algorithm " << algo->name()
945  << " could not be properly put back." << endmsg;
946  return StatusCode::FAILURE;
947  }
948 
949  m_algosInFlight--;
950 
951  EventSlot& thisSlot = m_eventSlots[si];
952  // XXX: CF tests
953  if (!m_DFNext) {
954  // Update the catalog: some new products may be there
955  m_whiteboard->selectStore(eventContext->slot()).ignore();
956 
957  // update prods in the dataflow
958  // DP: Handles could be used. Just update what the algo wrote
959  DataObjIDColl new_products;
960  m_whiteboard->getNewDataObjects(new_products).ignore();
961  for (const auto& new_product : new_products)
962  if (msgLevel(MSG::DEBUG))
963  debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
964  thisSlot.dataFlowMgr.updateDataObjectsCatalog(new_products);
965  }
966 
967  if (msgLevel(MSG::DEBUG))
968  debug() << "Algorithm " << algo->name() << " executed in slot " << si
969  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
970 
971  // Limit number of updates
972  if (m_CFNext) m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
973  if (m_updateNeeded) {
974  // Schedule an update of the status of the algorithms
975  auto updateAction = std::bind(&ForwardSchedulerSvc::updateStates, this, -1, algo->name());
976  m_actionsQueue.push(updateAction);
977  m_updateNeeded = false;
978  }
979 
980  if (msgLevel(MSG::DEBUG))
981  debug() << "Trying to handle execution result of " << index2algname(iAlgo)
982  << " on slot " << si << endmsg;
983  State state;
984  if (algo->filterPassed()) {
985  state = State::EVTACCEPTED;
986  } else {
987  state = State::EVTREJECTED;
988  }
989 
990  sc = thisSlot.algsStates.updateState(iAlgo,state);
991 
992  if (sc.isSuccess())
993  if (msgLevel(MSG::VERBOSE))
994  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
996 
997  return sc;
998 }
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
ContextID_t slot() const
Definition: EventContext.h:41
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
EventContext * getContext() const
get the context
Definition: Algorithm.h:571
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
bool evtFail() const
Definition: EventContext.h:43
ContextEvt_t evt() const
Definition: EventContext.h:40
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate an given 'slot' for all subsequent calls within the same thread id.
unsigned int m_algosInFlight
Number of algoritms presently in flight.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registred in store.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:74
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void ignore() const
Definition: StatusCode.h:108
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode ForwardSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode ForwardSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 875 of file ForwardSchedulerSvc.cpp.

875  {
876 
878  return StatusCode::FAILURE;
879 
880  const std::string& algName(index2algname(iAlgo));
881 
882  IAlgorithm* ialgoPtr=nullptr;
883  StatusCode sc ( m_algResourcePool->acquireAlgorithm(algName,ialgoPtr) );
884 
885  if (sc.isSuccess()) {
886  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
887  EventContext* eventContext ( m_eventSlots[si].eventContext );
888  if (!eventContext)
889  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si<< ")" << endmsg;
890 
891  algoPtr->setContext(m_eventSlots[si].eventContext);
892  ++m_algosInFlight;
893  // Avoid to use tbb if the pool size is 1 and run in this thread
894  if (-100 != m_threadPoolSize) {
895  tbb::task* t = new( tbb::task::allocate_root() ) AlgoExecutionTask(ialgoPtr, iAlgo, serviceLocator(), this);
896  tbb::task::enqueue( *t);
897  } else {
898  AlgoExecutionTask theTask(ialgoPtr, iAlgo, serviceLocator(), this);
899  theTask.execute();
900  }
901 
902  if (msgLevel(MSG::DEBUG))
903  debug() << "Algorithm " << algName << " was submitted on event "
904  << eventContext->evt() << " in slot " << si
905  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
906 
907  StatusCode updateSc ( m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::SCHEDULED) );
908 
910 
911  if (updateSc.isSuccess())
912  if (msgLevel(MSG::VERBOSE))
913  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
914  << si << endmsg;
915  return updateSc;
916  } else {
917  if (msgLevel(MSG::DEBUG))
918  debug() << "Could not acquire instance for algorithm " << index2algname(iAlgo) << " on slot " << si << endmsg;
919  return sc;
920  }
921 
922 }
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:324
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:574
STL class.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:25
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:74
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
friend class AlgoExecutionTask
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvent ( EventContext eventContext)
virtual

Make an event available to the scheduler.

Add event to the scheduler.

There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.

Definition at line 370 of file ForwardSchedulerSvc.cpp.

370  {
371 
372  if (m_first) {
373  m_first = false;
374  }
375 
376  if (!eventContext){
377  fatal() << "Event context is nullptr" << endmsg;
378  return StatusCode::FAILURE;
379  }
380 
381  if (m_freeSlots.load() == 0) {
382  if (msgLevel(MSG::DEBUG))
383  debug() << "A free processing slot could not be found." << endmsg;
384  return StatusCode::FAILURE;
385  }
386 
387  //no problem as push new event is only called from one thread (event loop manager)
388  m_freeSlots--;
389 
390  auto action = [this,eventContext] () -> StatusCode {
391  // Event processing slot forced to be the same as the wb slot
392  const unsigned int thisSlotNum = eventContext->slot();
393  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
394  if (!thisSlot.complete) {
395  fatal() << "The slot " << thisSlotNum
396  << " is supposed to be a finished event but it's not" << endmsg;
397  return StatusCode::FAILURE;
398  }
399 
400  info() << "Executing event " << eventContext->evt() << " on slot "
401  << thisSlotNum << endmsg;
402  thisSlot.reset(eventContext);
403  // XXX: CF tests
404  if (m_CFNext) {
405  auto vis = concurrency::Trigger(thisSlotNum);
407  }
408 
409  return this->updateStates(thisSlotNum);
410  }; // end of lambda
411 
412  // Kick off the scheduling!
413  if (msgLevel(MSG::VERBOSE)) {
414  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
415  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
416  }
417  m_actionsQueue.push(action);
418 
419  return StatusCode::SUCCESS;
420 }
ContextID_t slot() const
Definition: EventContext.h:41
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
A visitor, performing full top-down traversals of a graph.
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::function< StatusCode()> action
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
virtual

Definition at line 423 of file ForwardSchedulerSvc.cpp.

423  {
424  StatusCode sc;
425  for (auto context : eventContexts){
426  sc = pushNewEvent(context);
427  if (sc != StatusCode::SUCCESS) return sc;
428  }
429  return sc;
430 }
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode ForwardSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
virtual

Try to fetch an event from the scheduler.

Try to get a finished event, if not available just return a failure.

Definition at line 479 of file ForwardSchedulerSvc.cpp.

479  {
480  if (m_finishedEvents.try_pop(eventContext)) {
481  if (msgLevel(MSG::DEBUG))
482  debug() << "Try Pop successful slot " << eventContext->slot()
483  << "(event " << eventContext->evt() << ")" << endmsg;
484  m_freeSlots++;
485  return StatusCode::SUCCESS;
486  }
487  return StatusCode::FAILURE;
488 }
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::updateStates ( int  si = -1,
const std::string algo_name = std::string() 
)
private

Loop on algorithm in the slots and promote them to successive states (-1 means all slots, while empty string means skipping an update of the Control Flow state)

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. To check if the event is finished the algorithm checks if:

  • No algorithms have been signed off by the control flow
  • No algorithms have been signed off by the data flow
  • No algorithms have been scheduled

Definition at line 534 of file ForwardSchedulerSvc.cpp.

534  {
535 
536  m_updateNeeded=true;
537 
538  // Fill a map of initial state / action using closures.
539  // done to update the states w/o several if/elses
540  // Posterchild for constexpr with gcc4.7 onwards!
541  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
542  statesTransitions = {
543  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
544  this,
545  std::placeholders::_1,
546  std::placeholders::_2)},
547  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
548  this,
549  std::placeholders::_1,
550  std::placeholders::_2)}
551  };*/
552 
553  StatusCode global_sc(StatusCode::FAILURE,true);
554 
555  // Sort from the oldest to the newest event
556  // Prepare a vector of pointers to the slots to avoid copies
557  std::vector<EventSlot*> eventSlotsPtrs;
558 
559  // Consider all slots if si <0 or just one otherwise
560  if (si<0) {
561  const int eventsSlotsSize(m_eventSlots.size());
562  eventSlotsPtrs.reserve(eventsSlotsSize);
563  for (auto slotIt=m_eventSlots.begin();slotIt!=m_eventSlots.end();slotIt++) {
564  if (!slotIt->complete)
565  eventSlotsPtrs.push_back(&(*slotIt));
566  }
567  std::sort(eventSlotsPtrs.begin(),
568  eventSlotsPtrs.end(),
569  [](EventSlot* a, EventSlot* b) {return a->eventContext->evt() < b->eventContext->evt();});
570  } else {
571  eventSlotsPtrs.push_back(&m_eventSlots[si]);
572  }
573 
574  for (EventSlot* thisSlotPtr : eventSlotsPtrs) {
575  int iSlot = thisSlotPtr->eventContext->slot();
576 
577  // Cache the states of the algos to improve readability and performance
578  auto& thisSlot = m_eventSlots[iSlot];
579  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
580 
581  // Take care of the control ready update
582  // XXX: CF tests
583  if (!m_CFNext) {
584  m_efManager.updateEventState(thisAlgsStates,thisSlot.controlFlowState);
585  } else {
586  if (!algo_name.empty())
587  m_efManager.updateDecision(algo_name,iSlot,thisAlgsStates,thisSlot.controlFlowState);
588  }
589 
590 
591  //DF note: all this this is a loop over all algs and applies CR->DR and DR->SCHD transistions
592  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
593  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
594  if (algState==AlgsExecutionStates::ERROR)
595  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
596  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
597  partial_sc=StatusCode::SUCCESS;
598  for (auto state_transition = statesTransitions.find(algState);
599  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
600  state_transition++){
601  partial_sc = state_transition->second(iAlgo,iSlot);
602  if (partial_sc.isFailure()){
603  verbose() << "Could not apply transition from "
604  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
605  << " for algorithm " << index2algname(iAlgo)
606  << " on processing slot " << iSlot << endmsg;
607  }
608  else{global_sc=partial_sc;}
609  } // end loop on transitions
610  }*/ // end loop on algos
611 
612 
613  StatusCode partial_sc(StatusCode::FAILURE,true);
614  //first update CONTROLREADY to DATAREADY
615  if (!m_CFNext) {
616  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::CONTROLREADY);
617  it != thisAlgsStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it) {
618 
619  uint algIndex = *it;
620  partial_sc = promoteToDataReady(algIndex, iSlot);
621  if (partial_sc.isFailure())
622  if (msgLevel(MSG::DEBUG))
623  verbose() << "Could not apply transition from "
624  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
625  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
626  }
627  }
628 
629  //now update DATAREADY to SCHEDULED
630  if (!m_optimizationMode.empty()) {
631  auto comp_nodes = [this] (const uint& i,const uint& j) {
634  };
636  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::DATAREADY);
637  it != thisAlgsStates.end(AlgsExecutionStates::State::DATAREADY); ++it)
638  buffer.push(*it);
639  /*std::stringstream s;
640  auto buffer2 = buffer;
641  while (!buffer2.empty()) {
642  s << m_efManager.getExecutionFlowGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
643  buffer2.pop();
644  }
645  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
646 
647  while (!buffer.empty()) {
648  partial_sc = promoteToScheduled(buffer.top(), iSlot);
649  if (partial_sc.isFailure())
650  if (msgLevel(MSG::DEBUG))
651  verbose() << "Could not apply transition from "
652  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
653  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
654  buffer.pop();
655  }
656 
657  } else {
658  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::DATAREADY);
659  it != thisAlgsStates.end(AlgsExecutionStates::State::DATAREADY); ++it) {
660  uint algIndex = *it;
661  partial_sc = promoteToScheduled(algIndex, iSlot);
662  if (partial_sc.isFailure())
663  if (msgLevel(MSG::DEBUG))
664  verbose() << "Could not apply transition from "
665  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
666  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
667 
668  }
669  }
670 
673  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY)
674  << ", " << thisAlgsStates.sizeOfSubset(State::DATAREADY)
675  << ", " << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << "\n";
676  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
677  : std::to_string(tbb::task_scheduler_init::default_num_threads());
678  std::ofstream myfile;
679  myfile.open("IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app);
680  myfile << s.str();
681  myfile.close();
682  }
683 
684 
685  // Not complete because this would mean that the slot is already free!
686  if (!thisSlot.complete &&
687  m_efManager.rootDecisionResolved(thisSlot.controlFlowState) &&
688  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::CONTROLREADY) &&
689  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::DATAREADY) &&
690  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::SCHEDULED)) {
691 
692  thisSlot.complete=true;
693  // if the event did not fail, add it to the finished events
694  // otherwise it is taken care of in the error handling already
695  if (!thisSlot.eventContext->evtFail()) {
696  m_finishedEvents.push(thisSlot.eventContext);
697  if (msgLevel(MSG::DEBUG))
698  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
699  << thisSlot.eventContext->slot() << ")." << endmsg;
700  }
701  // now let's return the fully evaluated result of the control flow
702  if (msgLevel(MSG::DEBUG)) {
704  m_efManager.printEventState(ss, thisSlot.algsStates, thisSlot.controlFlowState,0);
705  debug() << ss.str() << endmsg;
706  }
707 
708  thisSlot.eventContext= nullptr;
709  } else {
710  StatusCode eventStalledSC = isStalled(iSlot);
711  if (! eventStalledSC.isSuccess())
712  eventFailed(thisSlot.eventContext);
713  }
714  } // end loop on slots
715 
716  verbose() << "States Updated." << endmsg;
717 
718  return global_sc;
719 }
T empty(T...args)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T open(T...args)
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
ContextID_t slot() const
Definition: EventContext.h:41
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
T to_string(T...args)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T end(T...args)
size_t sizeOfSubset(State state) const
ContextEvt_t evt() const
Definition: EventContext.h:40
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
T push_back(T...args)
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
STL class.
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T close(T...args)
T str(T...args)
bool m_updateNeeded
Keep track of update actions scheduled.
T size(T...args)
STL class.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
T begin(T...args)
Iterator begin(State kind)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T sort(T...args)
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
list i
Definition: ana.py:128
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
T reserve(T...args)
static std::map< State, std::string > stateNames
Iterator end(State kind)

Friends And Related Function Documentation

friend class AlgoExecutionTask
friend

Definition at line 222 of file ForwardSchedulerSvc.h.

Member Data Documentation

tbb::concurrent_bounded_queue<action> ForwardSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 206 of file ForwardSchedulerSvc.h.

std::unordered_map<std::string,unsigned int> ForwardSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 130 of file ForwardSchedulerSvc.h.

std::vector<std::string> ForwardSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 136 of file ForwardSchedulerSvc.h.

std::vector<std::vector<std::string> > ForwardSchedulerSvc::m_algosDependencies
private

DEPRECATED!

Definition at line 193 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_algosInFlight
private

Number of algoritms presently in flight.

Definition at line 166 of file ForwardSchedulerSvc.h.

SmartIF<IAlgResourcePool> ForwardSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 190 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_CFNext
private

Definition at line 211 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_checkDeps
private

Definition at line 279 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_DFNext
private

Definition at line 213 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_dumpIntraEventDynamics
private

Definition at line 219 of file ForwardSchedulerSvc.h.

concurrency::ExecutionFlowManager ForwardSchedulerSvc::m_efManager
private

Member to take care of the control flow.

Definition at line 209 of file ForwardSchedulerSvc.h.

std::vector<EventSlot> ForwardSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 145 of file ForwardSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> ForwardSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 154 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_first
private

Definition at line 227 of file ForwardSchedulerSvc.h.

std::atomic_int ForwardSchedulerSvc::m_freeSlots
private

Atomic to account for asyncronous updates by the scheduler wrt the rest.

Definition at line 151 of file ForwardSchedulerSvc.h.

std::atomic<ActivationState> ForwardSchedulerSvc::m_isActive
private

Flag to track if the scheduler is active or not.

Definition at line 121 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_maxAlgosInFlight
private

Maximum number of simultaneous algorithms.

Definition at line 163 of file ForwardSchedulerSvc.h.

int ForwardSchedulerSvc::m_maxEventsInFlight
private

Maximum number of event processed simultaneously.

Definition at line 148 of file ForwardSchedulerSvc.h.

std::string ForwardSchedulerSvc::m_optimizationMode
private

Definition at line 217 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_simulateExecution
private

Definition at line 215 of file ForwardSchedulerSvc.h.

std::mutex ForwardSchedulerSvc::m_ssMut
staticprivate

Definition at line 269 of file ForwardSchedulerSvc.h.

std::list< ForwardSchedulerSvc::SchedulerState > ForwardSchedulerSvc::m_sState
staticprivate

Definition at line 268 of file ForwardSchedulerSvc.h.

std::thread ForwardSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 124 of file ForwardSchedulerSvc.h.

int ForwardSchedulerSvc::m_threadPoolSize
private

Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose.

Definition at line 199 of file ForwardSchedulerSvc.h.

SmartIF<IThreadPoolSvc> ForwardSchedulerSvc::m_threadPoolSvc
private

Definition at line 225 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_updateNeeded
private

Keep track of update actions scheduled.

Definition at line 186 of file ForwardSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> ForwardSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 139 of file ForwardSchedulerSvc.h.

std::string ForwardSchedulerSvc::m_whiteboardSvcName
private

The whiteboard name.

Definition at line 142 of file ForwardSchedulerSvc.h.


The documentation for this class was generated from the following files: