ForwardSchedulerSvc Class Reference

The ForwardSchedulerSvc implements the IScheduler interface. More...

#include <GaudiKernel/ForwardSchedulerSvc.h>


Classes

class  SchedulerState
 

Public Member Functions

 ~ForwardSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until a finished event is available. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
void addAlg (Algorithm *, EventContext *, pthread_t)
 
bool delAlg (Algorithm *)
 
void dumpState () override
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Start Service. More...
 
StatusCode sysStop () override
 Stop Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Restart the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declarePrivateTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used Private tool. More...
 
template<class T >
StatusCode declarePublicTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used Public tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
 ~PropertyHolder () override=default
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none") const
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBase * declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property from another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property from the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBase & getProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, GaudiHandleBase &ref, const std::string &doc="none")
 Specializations for various GaudiHandles. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, GaudiHandleArrayBase &ref, const std::string &doc="none")
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, DataObjectHandleBase &ref, const std::string &doc="none")
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStream & msgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStream & msgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStream & always () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStream & fatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStream & err () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & error () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & warning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStream & info () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStream & debug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStream & verbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStream & msg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
- Public Member Functions inherited from extend_interfaces< Interfaces... >
 ~extend_interfaces () override=default
 Virtual destructor. More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
typedef std::function< StatusCode()> action
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1, const std::string &algo_name=std::string())
 Loop over the algorithms in the slots and promote them to their successive states (a slot index of -1 means all slots; an empty algorithm name skips the update of the control flow state). More...
 
StatusCode promoteToControlReady (unsigned int iAlgo, int si)
 Algorithm promotion: Accepted by the control flow. More...
 
StatusCode promoteToDataReady (unsigned int iAlgo, int si)
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToAsyncScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the AlgoExecutionTask. More...
 
StatusCode promoteToAsyncExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the IOBoundAlgTask. More...
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 
void dumpState (std::ostringstream &)
 

Private Attributes

Gaudi::Property< int > m_maxEventsInFlight
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::string > m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
 
Gaudi::Property< unsigned int > m_maxAlgosInFlight
 
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
 
Gaudi::Property< bool > m_CFNext
 
Gaudi::Property< bool > m_DFNext
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::string > m_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_useIOBoundAlgScheduler
 
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
 
Gaudi::Property< bool > m_checkDeps {this, "CheckDependencies", false, "[[deprecated]]"}
 
std::atomic< ActivationState > m_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard. More...
 
SmartIF< IAccelerator > m_IOBoundAlgScheduler
 A shortcut to IO-bound algorithm scheduler. More...
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asynchronous updates by the scheduler with respect to the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager. More...
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight. More...
 
unsigned int m_IOBoundAlgosInFlight = 0
 Number of I/O-bound algorithms presently in flight. More...
 
bool m_updateNeeded = true
 Keep track of update actions scheduled. More...
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
concurrency::ExecutionFlowManager m_efManager
 Member to take care of the control flow. More...
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
bool m_first = true
 

Static Private Attributes

static std::list< SchedulerState > m_sState
 
static std::mutex m_ssMut
 

Friends

class AlgoExecutionTask
 
class IOBoundAlgTask
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
typedef Gaudi::PluginService::Factory< IService *, const std::string &, ISvcLocator * > Factory
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBase * property (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

The ForwardSchedulerSvc implements the IScheduler interface.

It manages the execution states of all algorithms and submits the algorithm tasks to the TBB runtime. A state machine tracks the execution state of each algorithm. This is a forward scheduler: an algorithm is scheduled for execution as soon as its data dependencies are available in the whiteboard.

Algorithms management

The activate() method runs in a separate thread. It loops on a TBB concurrent bounded queue of closures, taking each one with the blocking pop method, so the thread sleeps while the queue is empty instead of keeping a CPU busy polling for new actions. In other words, the asynchronous actions are serialised through the actions queue. Once a task terminates, a call to the promoteToExecuted method is pushed into the actions queue. The promoteToExecuted method also triggers a call to the updateStates method, which scans all algorithms and checks whether their state can be changed: the termination of an algorithm may allow the control flow and/or the data flow to release more algorithms for submission.
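
The serialisation described above can be sketched with standard library primitives. This is a minimal illustration, not the Gaudi implementation: ActionLoop, runDemo and the bool return type are hypothetical, and a std::queue guarded by a mutex and a condition variable stands in for tbb::concurrent_bounded_queue. The loop condition mirrors the one used by activate(): keep going while the loop is active or while actions are still queued.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the scheduler's actions queue.
class ActionLoop {
public:
  using Action = std::function<bool()>;

  // Producer side: may be called from any thread.
  void push( Action a ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_actions.push( std::move( a ) );
    }
    m_cv.notify_one();
  }

  // Ask the loop to stop once everything queued before this call has run.
  void stop() {
    push( [this] { m_active = false; return true; } );
  }

  // Consumer side: meant to run in its own thread.
  void run() {
    m_active = true;
    while ( m_active || !empty() ) {
      Action next = pop(); // sleeps while the queue is empty
      next();
    }
  }

private:
  Action pop() {
    std::unique_lock<std::mutex> lock( m_mutex );
    m_cv.wait( lock, [this] { return !m_actions.empty(); } );
    Action a = std::move( m_actions.front() );
    m_actions.pop();
    return a;
  }

  bool empty() {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_actions.empty();
  }

  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::queue<Action> m_actions;
  bool m_active = false; // touched only by the consumer thread
};

// Pushes three actions from the calling thread and returns the order in
// which the consumer thread executed them.
std::vector<int> runDemo() {
  ActionLoop loop;
  std::vector<int> order;
  std::thread consumer( [&loop] { loop.run(); } );
  for ( int i = 0; i < 3; ++i ) {
    loop.push( [&order, i] { order.push_back( i ); return true; } );
  }
  loop.stop();
  consumer.join();
  return order;
}
```

Because every closure runs in the single consumer thread, the state it touches needs no further locking, which is the point of routing asynchronous actions through one queue.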

Algorithms dependencies

There are two ways of declaring algorithm dependencies. The first, available only temporarily to ease development, is to declare them through the AlgosDependencies property as a list of lists. The sublists must appear in the same order as the algorithms in the TopAlg list. The second is to declare the data dependencies directly within the algorithms via data object handles.
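
As a rough illustration of the list-of-lists layout and of the forward-scheduling rule, the sketch below keeps one sublist of input names per algorithm, in algorithm order, and reports which algorithms have all their inputs present. The names Whiteboard, dataReady and readyAlgorithms are hypothetical; the real scheduler tracks dependencies through its data flow manager.

```cpp
#include <cstddef>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical whiteboard: the set of data products already produced.
using Whiteboard = std::unordered_set<std::string>;

// True when every input of one algorithm is already in the whiteboard.
bool dataReady( const std::vector<std::string>& inputs, const Whiteboard& wb ) {
  for ( const auto& in : inputs ) {
    if ( wb.count( in ) == 0 ) return false;
  }
  return true;
}

// deps[i] is the dependency sublist of the i-th algorithm, so the outer
// order must match the order of the algorithm list.
std::vector<std::size_t> readyAlgorithms( const std::vector<std::vector<std::string>>& deps,
                                          const Whiteboard& wb ) {
  std::vector<std::size_t> ready;
  for ( std::size_t i = 0; i < deps.size(); ++i ) {
    if ( dataReady( deps[i], wb ) ) ready.push_back( i );
  }
  return ready;
}
```

An algorithm with an empty sublist has no data dependencies and is therefore ready immediately.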

Events management

The scheduler accepts events to be processed (in the form of eventContexts) and releases processed events. This flow is implemented through three methods:

  • pushNewEvent: to make an event available to the scheduler.
  • tryPopFinishedEvent: to retrieve a finished event from the scheduler without blocking.
  • popFinishedEvent: to retrieve a finished event from the scheduler, blocking until one is available.

Please refer to the full documentation of the methods for more details.
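
The push/pop flow can be sketched as a small event loop. Everything here is an assumption made for illustration: MockScheduler is a hypothetical synchronous stand-in that finishes an event as soon as it is pushed and frees the slot when the event is popped, EventContext is reduced to a single field, and bool replaces StatusCode. Only the method names come from the interface listed above.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

struct EventContext { int evt; }; // hypothetical, minimal

// Hypothetical synchronous mock: an event is "finished" as soon as it is pushed.
class MockScheduler {
public:
  explicit MockScheduler( unsigned int slots ) : m_free( slots ) {}

  unsigned int freeSlots() const { return m_free; }

  bool pushNewEvent( EventContext* ctx ) {
    if ( m_free == 0 ) return false; // no slot available
    --m_free;
    m_finished.push_back( ctx );
    return true;
  }

  bool tryPopFinishedEvent( EventContext*& ctx ) {
    if ( m_finished.empty() ) return false; // nothing finished yet
    ctx = m_finished.front();
    m_finished.pop_front();
    ++m_free; // in this mock the slot is freed when the event is retrieved
    return true;
  }

private:
  unsigned int m_free;
  std::deque<EventContext*> m_finished;
};

// Feeds events while slots are free and collects the finished ones.
// Assumes the scheduler was created with at least one slot.
std::size_t processAll( MockScheduler& scheduler, std::vector<EventContext>& events ) {
  std::size_t next = 0, done = 0;
  while ( done < events.size() ) {
    while ( next < events.size() && scheduler.freeSlots() > 0 ) {
      scheduler.pushNewEvent( &events[next++] );
    }
    EventContext* finished = nullptr;
    while ( scheduler.tryPopFinishedEvent( finished ) ) ++done;
  }
  return done;
}
```

With a real, asynchronous scheduler the inner drain would typically start with one blocking popFinishedEvent call, so that the loop sleeps instead of spinning while events are still in flight.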

Author
Danilo Piparo
Benedikt Hegner
Version
1.1

Definition at line 84 of file ForwardSchedulerSvc.h.

Member Typedef Documentation

typedef std::function< StatusCode()> ForwardSchedulerSvc::action
private

Definition at line 235 of file ForwardSchedulerSvc.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

ForwardSchedulerSvc::~ForwardSchedulerSvc ( )
override default

Destructor.

Member Function Documentation

void ForwardSchedulerSvc::activate ( )
private

Activate the scheduler.

From this moment on the queue of actions is checked. The checking stops when the m_isActive flag is no longer ACTIVE and the queue is empty. This guarantees that all actions are executed and that no stall is created. The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html). The pool is therefore initialised here, since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas).

Definition at line 317 of file ForwardSchedulerSvc.cpp.

{

  if ( msgLevel( MSG::DEBUG ) )
    debug() << "ForwardSchedulerSvc::activate()" << endmsg;

  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
    error() << "problems initializing ThreadPoolSvc" << endmsg;
    m_isActive = FAILURE;
    return;
  }

  // Wait for actions pushed into the queue by finishing tasks.
  action thisAction;
  StatusCode sc( StatusCode::SUCCESS );

  m_isActive = ACTIVE;

  // Continue to wait if the scheduler is running or there is something to do
  info() << "Start checking the actionsQueue" << endmsg;
  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
    m_actionsQueue.pop( thisAction );
    sc = thisAction();
    if ( sc != StatusCode::SUCCESS )
      verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
    else
      verbose() << "Action succeeded." << endmsg;
  }

  info() << "Terminating thread-pool resources" << endmsg;
  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
    error() << "Problems terminating thread pool" << endmsg;
    m_isActive = FAILURE;
  }
}
void ForwardSchedulerSvc::addAlg ( Algorithm * a, EventContext * e, pthread_t t )

Definition at line 1187 of file ForwardSchedulerSvc.cpp.

{
  std::lock_guard<std::mutex> lock( m_ssMut );
  m_sState.push_back( SchedulerState( a, e, t ) );
}
unsigned int ForwardSchedulerSvc::algname2index ( const std::string & algoname)
inline private

Convert a name to an integer.

Definition at line 386 of file ForwardSchedulerSvc.cpp.

{
  unsigned int index = m_algname_index_map[algoname];
  return index;
}
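
The two bookkeeping containers behind algname2index and index2algname can be sketched as follows. AlgIndex and its methods are hypothetical names. One deliberate difference from the listing above: std::unordered_map::operator[] inserts a value-initialised entry (index 0) when the name is unknown, so this sketch uses at(), which throws std::out_of_range instead.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper keeping a name -> index map and an index -> name vector in sync.
class AlgIndex {
public:
  // Registers a name and returns its index; an already known name keeps its index.
  unsigned int add( const std::string& name ) {
    auto it = m_index.find( name );
    if ( it != m_index.end() ) return it->second;
    const unsigned int idx = static_cast<unsigned int>( m_names.size() );
    m_names.push_back( name );
    m_index.emplace( name, idx );
    return idx;
  }

  // Name to index; throws std::out_of_range for an unknown name.
  unsigned int index( const std::string& name ) const { return m_index.at( name ); }

  // Index to name; throws std::out_of_range for an invalid index.
  const std::string& name( unsigned int idx ) const { return m_names.at( idx ); }

private:
  std::unordered_map<std::string, unsigned int> m_index;
  std::vector<std::string> m_names;
};
```

Indices are assigned in registration order, so the vector position of a name always equals the value stored for it in the map.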
StatusCode ForwardSchedulerSvc::deactivate ( )
private

Deactivate the scheduler.

Two actions are pushed into the queue: 1) drain the scheduler until all events are finished; 2) flip the status flag m_isActive to INACTIVE. The second action is the last one executed by the scheduler.

Definition at line 361 of file ForwardSchedulerSvc.cpp.

{

  if ( m_isActive == ACTIVE ) {
    // Drain the scheduler
    m_actionsQueue.push( std::bind( &ForwardSchedulerSvc::m_drain, this ) );
    // This would be the last action
    m_actionsQueue.push( [this]() -> StatusCode {
      m_isActive = INACTIVE;
      return StatusCode::SUCCESS;
    } );
  }

  return StatusCode::SUCCESS;
}
bool ForwardSchedulerSvc::delAlg ( Algorithm * a)

Definition at line 1195 of file ForwardSchedulerSvc.cpp.

{
  std::lock_guard<std::mutex> lock( m_ssMut );

  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
    if ( *itr == a ) {
      m_sState.erase( itr );
      return true;
    }
  }

  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
  return false;
}
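
The addAlg/delAlg pair amounts to a mutex-guarded registry of running algorithms. A minimal sketch of that pattern, with hypothetical names (RunningAlg, RunningRegistry) and entries identified by name rather than by Algorithm pointer:

```cpp
#include <algorithm>
#include <cstddef>
#include <list>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical record of one running algorithm.
struct RunningAlg {
  std::string name;
  int         slot;
};

// Hypothetical registry: every access takes the same mutex.
class RunningRegistry {
public:
  void add( RunningAlg alg ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    m_running.push_back( std::move( alg ) );
  }

  // Removes the first entry with the given name; returns false if none is found.
  bool remove( const std::string& name ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    auto it = std::find_if( m_running.begin(), m_running.end(),
                            [&name]( const RunningAlg& r ) { return r.name == name; } );
    if ( it == m_running.end() ) return false;
    m_running.erase( it );
    return true;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_running.size();
  }

private:
  mutable std::mutex    m_mutex;
  std::list<RunningAlg> m_running;
};
```

Note that std::mutex is not recursive: a member function that already holds the lock must not call another member function that locks the same mutex.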
void ForwardSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging: the state of the scheduler is printed so that it can be inspected.

The data dependencies of each algorithm are printed, and the missing ones are listed.

Definition at line 829 of file ForwardSchedulerSvc.cpp.

{

  // To have just one big message
  std::ostringstream outputMessageStream;

  outputMessageStream << "============================== Execution Task State ============================="
                      << std::endl;
  dumpState( outputMessageStream );

  outputMessageStream << std::endl
                      << "============================== Scheduler State ================================="
                      << std::endl;

  int slotCount = -1;
  for ( auto thisSlot : m_eventSlots ) {
    slotCount++;
    if ( thisSlot.complete ) continue;

    outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
                        << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;

    if ( 0 > iSlot or iSlot == slotCount ) {
      outputMessageStream << "Algorithms states:" << std::endl;

      const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
      for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
        outputMessageStream << " o " << index2algname( algoIdx ) << " ["
                            << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
        DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
        const int depsSize = deps.size();
        if ( depsSize == 0 ) outputMessageStream << " none";

        DataObjIDColl missing;
        for ( auto d : deps ) {
          outputMessageStream << d << " ";
          if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
            // outputMessageStream << "[missing] ";
            missing.insert( d );
          }
        }

        if ( !missing.empty() ) {
          outputMessageStream << ". The following are missing: ";
          for ( auto d : missing ) {
            outputMessageStream << d << " ";
          }
        }

        outputMessageStream << std::endl;
      }

      // Snapshot of the WhiteBoard
      outputMessageStream << "\nWhiteboard contents: " << std::endl;
      for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;

      // Snapshot of the ControlFlow
      outputMessageStream << "\nControl Flow:" << std::endl;
      std::stringstream cFlowStateStringStream;
      m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );

      outputMessageStream << cFlowStateStringStream.str() << std::endl;
    }
  }

  outputMessageStream << "=================================== END ======================================" << std::endl;

  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
}
void ForwardSchedulerSvc::dumpState ( )
override

Definition at line 1223 of file ForwardSchedulerSvc.cpp.

{
  std::lock_guard<std::mutex> lock( m_ssMut );

  std::ostringstream ost;
  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
  dumpState( ost );

  info() << ost.str() << endmsg;
}
void ForwardSchedulerSvc::dumpState ( std::ostringstream & ost)
private

Definition at line 1212 of file ForwardSchedulerSvc.cpp.

{
  std::lock_guard<std::mutex> lock( m_ssMut );

  for ( auto it : m_sState ) {
    ost << " " << it << std::endl;
  }
}
StatusCode ForwardSchedulerSvc::eventFailed ( EventContext * eventContext)
private

Method to check if an event failed and take appropriate actions.

This method is called when the processing of an event fails. It dumps the state of the scheduler, empties the actions queue without executing the actions, deactivates the scheduler, pushes the failed event context into the finished-events queue and returns a failure.

Definition at line 526 of file ForwardSchedulerSvc.cpp.

{

  // Set the number of slots available to an error code
  m_freeSlots.store( 0 );

  fatal() << "*** Event " << eventContext->evt() << " on slot "
          << eventContext->slot() << " failed! ***" << endmsg;

  std::ostringstream ost;
  m_algExecStateSvc->dump( ost, *eventContext );

  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
         << ":\n" << ost.str() << endmsg;

  dumpSchedulerState( -1 );

  // Empty queue and deactivate the service
  action thisAction;
  while ( m_actionsQueue.try_pop( thisAction ) ) {
  };
  deactivate();

  // Push into the finished events queue the failed context
  EventContext* thisEvtContext;
  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
    m_finishedEvents.push( thisEvtContext );
  };
  m_finishedEvents.push( eventContext );

  return StatusCode::FAILURE;
}
StatusCode ForwardSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 284 of file ForwardSchedulerSvc.cpp.

{

  StatusCode sc( Service::finalize() );
  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;

  sc = deactivate();
  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;

  info() << "Joining Scheduler thread" << endmsg;
  m_thread.join();

  // Final error check after thread pool termination
  if ( m_isActive == FAILURE ) {
    error() << "problems in scheduler thread" << endmsg;
    return StatusCode::FAILURE;
  }

  // m_efManager.getExecutionFlowGraph()->dumpExecutionPlan();

  return sc;
}
unsigned int ForwardSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 462 of file ForwardSchedulerSvc.cpp.

{ return std::max( m_freeSlots.load(), 0 ); }
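
The clamp in freeSlots() matters because the counter is a signed atomic while the return type is unsigned: a negative value converted to unsigned int would show up as a very large number of free slots. The sketch below illustrates this with a hypothetical SlotCounter class; whether the real counter can actually become negative is an assumption made for the example.

```cpp
#include <algorithm>
#include <atomic>

// Hypothetical counter of free event slots, updated from several threads.
class SlotCounter {
public:
  explicit SlotCounter( int slots ) : m_free( slots ) {}

  void acquire() { --m_free; } // a slot is taken
  void release() { ++m_free; } // a slot is given back

  // Raw signed value, which may be negative.
  int raw() const { return m_free.load(); }

  // Clamped view: never reports fewer than zero free slots.
  unsigned int freeSlots() const { return std::max( m_free.load(), 0 ); }

private:
  std::atomic_int m_free;
};
```

Callers that only ask whether a slot is available can rely on freeSlots() alone, while raw() remains useful for diagnostics.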
const std::string & ForwardSchedulerSvc::index2algname ( unsigned int  index)
inline private

Convert an integer to a name.

Definition at line 382 of file ForwardSchedulerSvc.cpp.

382 { return m_algname_vect[index]; }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
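
The index-to-name conversion relies on a vector filled in algorithm order, paired with a map for the opposite direction. A minimal sketch of that bookkeeping follows; AlgNameIndex is a hypothetical class, and unlike the operator[] access shown above it uses at(), which is an assumption made here to get bounds checking.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper holding both directions of the name <-> index mapping.
class AlgNameIndex {
public:
  explicit AlgNameIndex( const std::vector<std::string>& names )
  {
    m_names.reserve( names.size() );
    unsigned int index = 0;
    for ( const auto& name : names ) {
      m_indexOf[name] = index++;
      m_names.push_back( name );
    }
  }

  // Index to name; throws std::out_of_range instead of reading past the end.
  const std::string& name( unsigned int index ) const { return m_names.at( index ); }

  // Name to index; throws std::out_of_range for unknown names.
  unsigned int index( const std::string& name ) const { return m_indexOf.at( name ); }

private:
  std::vector<std::string>                      m_names;
  std::unordered_map<std::string, unsigned int> m_indexOf;
};
```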
StatusCode ForwardSchedulerSvc::initialize ( )
override

Initialise.

Besides some bookkeeping, the scheduler is activated here by running the activate() function in a new thread.

In addition, the list of algorithms is acquired from the AlgResourcePool.

Definition at line 53 of file ForwardSchedulerSvc.cpp.

54 {
55 
56  // Initialise mother class (read properties, ...)
58  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
59 
60  // Get hold of the TBBSvc. This should initialize the thread pool
61  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
62  if ( !m_threadPoolSvc.isValid() ) {
63  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
64  return StatusCode::FAILURE;
65  }
66 
67  // Activate the scheduler in another thread.
68  info() << "Activating scheduler in a separate thread" << endmsg;
70 
71  while ( m_isActive != ACTIVE ) {
72  if ( m_isActive == FAILURE ) {
73  fatal() << "Terminating initialization" << endmsg;
74  return StatusCode::FAILURE;
75  } else {
76  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
77  sleep( 1 );
78  }
79  }
80 
81  // Get the algo resource pool
82  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
83  if ( !m_algResourcePool.isValid() ) {
84  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
85  return StatusCode::FAILURE;
86  }
87 
88  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
89  if (!m_algExecStateSvc.isValid()) {
90  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
91  return StatusCode::FAILURE;
92  }
93 
94  // Get Whiteboard
96  if ( !m_whiteboard.isValid() ) {
97  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
98  return StatusCode::FAILURE;
99  }
100 
101  // Check the MaxEventsInFlight parameters and react
102  // Deprecated for the moment
103  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
104  if ( m_maxEventsInFlight != 0 ) {
105  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
106  << "Please migrate your code options files." << endmsg;
107 
108  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
109  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
110  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
111  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
112  }
113  }
114 
115  // Get dedicated scheduler for I/O-bound algorithms
116  if ( m_useIOBoundAlgScheduler ) {
119  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
120  }
121  // Align the two quantities
122  m_maxEventsInFlight = numberOfWBSlots;
123 
124  // Set the number of free slots
126 
127  if ( m_algosDependencies.size() != 0 ) {
128  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
129  << " FIX your job options #####" << endmsg;
130  }
131 
132  // Get the list of algorithms
134  const unsigned int algsNumber = algos.size();
135  info() << "Found " << algsNumber << " algorithms" << endmsg;
136 
137  /* Dependencies
138  1) Look for handles in algo, if none
139  2) Assume none are required
140  */
141 
142  DataObjIDColl globalInp, globalOutp;
143 
144  // figure out all outputs
145  for (IAlgorithm* ialgoPtr : algos) {
146  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
147  if (!algoPtr) {
148  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
149  }
150  for (auto id : algoPtr->outputDataObjs()) {
151  auto r = globalOutp.insert(id);
152  if (!r.second) {
153  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" << endmsg;
154  }
155  }
156  }
157  info() << "outputs:\n" ;
158  for (const auto& i : globalOutp ) {
159  info() << i << '\n' ;
160  }
161  info() << endmsg;
162 
163 
164 
165  info() << "Data Dependencies for Algorithms:";
166 
168  for ( IAlgorithm* ialgoPtr : algos ) {
169  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
170  if ( nullptr == algoPtr )
171  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
172 
173  info() << "\n " << algoPtr->name();
174 
175  // FIXME
176  DataObjIDColl algoDependencies;
177  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
178  for ( auto id : algoPtr->inputDataObjs() ) {
179  info() << "\n o INPUT " << id;
180  if (id.key().find(":")!=std::string::npos) {
181  info() << " contains alternatives which require resolution... " << endmsg;
182  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(),boost::char_separator<char>{":"}};
183  auto itok = std::find_if( tokens.begin(), tokens.end(),
184  [&](const std::string& t) {
185  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
186  } );
187  if (itok!=tokens.end()) {
188  info() << "found matching output for " << *itok << " -- updating scheduler info" << endmsg;
189  id.updateKey(*itok);
190  } else {
191  error() << "failed to find alternate in global output list" << endmsg;
192  }
193  }
194  algoDependencies.insert( id );
195  globalInp.insert( id );
196  }
197  for ( auto id : algoPtr->outputDataObjs() ) {
198  info() << "\n o OUTPUT " << id;
199  if (id.key().find(":")!=std::string::npos) {
200  info() << " alternatives are NOT allowed for outputs..." << endmsg;
201  }
202  }
203  } else {
204  info() << "\n none";
205  }
206  m_algosDependencies.emplace_back( algoDependencies );
207  }
208  info() << endmsg;
209 
210  // Fill the containers to convert algo names to index
211  m_algname_vect.reserve( algsNumber );
212  unsigned int index = 0;
213  for ( IAlgorithm* algo : algos ) {
214  const std::string& name = algo->name();
215  m_algname_index_map[name] = index;
217  index++;
218  }
219 
220  // Check if we have unmet global input dependencies
221  if ( m_checkDeps ) {
222  DataObjIDColl unmetDep;
223  for ( auto o : globalInp ) {
224  if ( globalOutp.find( o ) == globalOutp.end() ) {
225  unmetDep.insert( o );
226  }
227  }
228 
229  if ( unmetDep.size() > 0 ) {
230  fatal() << "The following unmet INPUT data dependencies were found: ";
231  for ( auto& o : unmetDep ) {
232  fatal() << "\n o " << o << " required by Algorithm: ";
233  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
234  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
235  fatal() << "\n * " << m_algname_vect[i];
236  }
237  }
238  }
239  fatal() << endmsg;
240  return StatusCode::FAILURE;
241  } else {
242  info() << "No unmet INPUT data dependencies were found" << endmsg;
243  }
244  }
245 
246  // prepare the control flow part
247  if ( m_CFNext ) m_DFNext = true; // force usage of new data flow machinery when new control flow is used
248  if ( !m_CFNext && !m_optimizationMode.empty() ) {
249  fatal() << "Execution optimization is only available with the graph-based execution flow management" << endmsg;
250  return StatusCode::FAILURE;
251  }
252  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
253  sc =
255  unsigned int controlFlowNodeNumber = m_efManager.getExecutionFlowGraph()->getControlFlowNodeCounter();
256 
257  // Shortcut for the message service
258  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
259  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
260 
261  m_eventSlots.assign( m_maxEventsInFlight,
262  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
263  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
264 
265  // Clearly inform about the level of concurrency
266  info() << "Concurrency level information:" << endmsg;
267  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
268  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
269  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
270 
271  // Simulating execution flow by only analyzing the graph topology and logic
272  if ( m_simulateExecution ) {
273  auto vis = concurrency::RunSimulator( 0 );
275  }
276 
277  return sc;
278 }
void simulateExecutionFlow(IGraphVisitor &visitor) const
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
StatusCode initialize() override
Definition: Service.cpp:64
Gaudi::Property< bool > m_CFNext
T empty(T...args)
Gaudi::Property< bool > m_simulateExecution
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:727
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
const DataObjIDColl & inputDataObjs() const override
Definition: Algorithm.h:433
SmartIF< IThreadPoolSvc > m_threadPoolSvc
T end(T...args)
Gaudi::Property< bool > m_checkDeps
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
Gaudi::Property< std::string > m_optimizationMode
STL class.
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:78
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Gaudi::Property< std::string > m_whiteboardSvcName
const DataObjIDColl & outputDataObjs() const override
Definition: Algorithm.h:434
T bind(T...args)
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
Gaudi::Property< int > m_maxEventsInFlight
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
Gaudi::Property< bool > m_DFNext
T insert(T...args)
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
T size(T...args)
Gaudi::Property< bool > m_useIOBoundAlgScheduler
STL class.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
T for_each(T...args)
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
T reserve(T...args)
T emplace_back(T...args)
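
Two steps of initialize() lend themselves to a small illustration: resolving an input key that lists alternatives separated by ':' against the global output list, and collecting the inputs that no algorithm produces. The sketch below uses plain strings instead of DataObjID and std::getline instead of boost::tokenizer; resolveAlternatives and unmetInputs are hypothetical names, and returning the unresolved key unchanged is an assumption of this sketch.

```cpp
#include <cassert>
#include <set>
#include <sstream>
#include <string>
#include <vector>

using KeySet = std::set<std::string>;

// Resolve an input key such as "A:B:C" to the first alternative that some
// algorithm produces. Keys without ':' are returned unchanged. If no
// alternative matches, the original key is returned so it shows up as unmet.
inline std::string resolveAlternatives( const std::string& key, const KeySet& globalOutputs )
{
  if ( key.find( ':' ) == std::string::npos ) return key;
  std::istringstream stream( key );
  std::string        token;
  while ( std::getline( stream, token, ':' ) ) {
    // Empty tokens are skipped, as a character-separator tokenizer would do.
    if ( !token.empty() && globalOutputs.count( token ) != 0 ) return token;
  }
  return key;
}

// Collect every resolved input that no algorithm declares as output.
inline KeySet unmetInputs( const std::vector<std::string>& inputs, const KeySet& globalOutputs )
{
  KeySet unmet;
  for ( const auto& in : inputs ) {
    const std::string resolved = resolveAlternatives( in, globalOutputs );
    if ( globalOutputs.count( resolved ) == 0 ) unmet.insert( resolved );
  }
  return unmet;
}
```

A non-empty result of unmetInputs corresponds to the case in which initialize() reports the unmet INPUT data dependencies and returns StatusCode::FAILURE.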
StatusCode ForwardSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check whether a stall condition is present for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.

Definition at line 804 of file ForwardSchedulerSvc.cpp.

805 {
806  // Get the slot
807  EventSlot& thisSlot = m_eventSlots[iSlot];
808 
809  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
811 
812  info() << "About to declare a stall" << endmsg;
813  fatal() << "*** Stall detected! ***\n" << endmsg;
814  dumpSchedulerState( iSlot );
815  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
816 
817  return StatusCode::FAILURE;
818  }
819  return StatusCode::SUCCESS;
820 }
bool algsPresent(State state) const
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
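
The stall condition described for isStalled() is a conjunction of observations about the scheduler and the slot. The sketch below expresses it as a pure predicate over a snapshot; SchedulerSnapshot and its fields are hypothetical and merely mirror the quantities named in the description.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical snapshot of the quantities the stall check inspects.
struct SchedulerSnapshot {
  std::size_t  queuedActions;        // closures waiting in the actions queue
  unsigned int algosInFlight;        // CPU-bound algorithms currently running
  unsigned int ioBoundAlgosInFlight; // I/O-bound algorithms currently running
  bool         anyAlgoDataReady;     // some algorithm in the slot has all its inputs
};

// True when nothing is queued, nothing is running and nothing is ready to run.
inline bool isStalled( const SchedulerSnapshot& s )
{
  return s.queuedActions == 0 && s.algosInFlight == 0 && s.ioBoundAlgosInFlight == 0 && !s.anyAlgoDataReady;
}
```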
StatusCode ForwardSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 468 of file ForwardSchedulerSvc.cpp.

469 {
470 
471  unsigned int slotNum = 0;
472  for ( auto& thisSlot : m_eventSlots ) {
473  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
474  updateStates( slotNum );
475  }
476  slotNum++;
477  }
478  return StatusCode::SUCCESS;
479 }
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
StatusCode ForwardSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 485 of file ForwardSchedulerSvc.cpp.

486 {
487  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
488  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
489  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
490  // << " active: " << m_isActive << endmsg;
491  return StatusCode::FAILURE;
492  } else {
493  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
494  // << " active: " << m_isActive << endmsg;
495  m_finishedEvents.pop( eventContext );
496  m_freeSlots++;
497  if (msgLevel(MSG::DEBUG))
498  debug() << "Popped slot " << eventContext->slot() << "(event "
499  << eventContext->evt() << ")" << endmsg;
500  return StatusCode::SUCCESS;
501  }
502 }
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
Gaudi::Property< int > m_maxEventsInFlight
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
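
popFinishedEvent() fails immediately when no event is in flight and blocks otherwise. The sketch below reproduces that contract with a mutex and a condition variable in place of tbb::concurrent_bounded_queue; FinishedEvents is a hypothetical class that tracks plain event numbers rather than EventContext pointers, and it leaves out the activation-state check.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical miniature of the finished-events bookkeeping.
class FinishedEvents {
public:
  explicit FinishedEvents( int maxInFlight ) : m_max( maxInFlight ), m_free( maxInFlight ) {}

  // Claim a slot for a new event; false when all slots are busy.
  bool startEvent()
  {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_free == 0 ) return false;
    --m_free;
    return true;
  }

  // Called when processing of an event has completed.
  void finishEvent( int eventId )
  {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_done.push_back( eventId );
    }
    m_cv.notify_one();
  }

  // Fails at once if no event is in flight, otherwise waits for one to finish.
  bool popFinished( int& eventId )
  {
    std::unique_lock<std::mutex> lock( m_mutex );
    if ( m_free == m_max ) return false; // nothing in flight: waiting would never end
    m_cv.wait( lock, [this] { return !m_done.empty(); } );
    eventId = m_done.front();
    m_done.pop_front();
    ++m_free; // the slot becomes available again
    return true;
  }

private:
  const int               m_max;
  int                     m_free;
  std::deque<int>         m_done;
  std::mutex              m_mutex;
  std::condition_variable m_cv;
};
```

The early return is what keeps a caller from blocking forever on an empty scheduler, which is the role of the comparison between m_freeSlots and m_maxEventsInFlight in the listing above.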
StatusCode ForwardSchedulerSvc::promoteToAsyncExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm *  algo,
EventContext *  eventContext 
)
private

The call to this method is triggered only from within the IOBoundAlgTask.

Definition at line 1113 of file ForwardSchedulerSvc.cpp.

1115 {
1116 
1117  // Put back the instance
1118  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1119  if ( !castedAlgo ) fatal() << "[Asynchronous] The casting did not succeed!" << endmsg;
1120  // EventContext* eventContext = castedAlgo->getContext();
1121 
1122  // Check if the execution failed
1123  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1124  eventFailed(eventContext).ignore();
1125 
1126  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1127 
1128  if ( !sc.isSuccess() ) {
1129  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1130  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1131  return StatusCode::FAILURE;
1132  }
1133 
1135 
1136  EventSlot& thisSlot = m_eventSlots[si];
1137  // XXX: CF tests
1138  if ( !m_DFNext ) {
1139  // Update the catalog: some new products may be there
1140  m_whiteboard->selectStore( eventContext->slot() ).ignore();
1141 
1142  // update prods in the dataflow
1143  // DP: Handles could be used. Just update what the algo wrote
1144  DataObjIDColl new_products;
1145  m_whiteboard->getNewDataObjects(new_products).ignore();
1146  for (const auto& new_product : new_products)
1147  if (msgLevel(MSG::DEBUG))
1148  debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
1149  thisSlot.dataFlowMgr.updateDataObjectsCatalog(new_products);
1150  }
1151 
1152  if (msgLevel(MSG::DEBUG))
1153  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1154  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1155 
1156  // Limit number of updates
1157  if ( m_CFNext )
1158  m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
1159  if ( m_updateNeeded ) {
1160  // Schedule an update of the status of the algorithms
1161  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1, algo->name() );
1162  m_actionsQueue.push( updateAction );
1163  m_updateNeeded = false;
1164  }
1165 
1166  if (msgLevel(MSG::DEBUG))
1167  debug() << "[Asynchronous] Trying to handle execution result of "
1168  << index2algname(iAlgo) << " on slot " << si << endmsg;
1169  State state;
1170  if ( algo->filterPassed() ) {
1171  state = State::EVTACCEPTED;
1172  } else {
1173  state = State::EVTREJECTED;
1174  }
1175 
1176  sc = thisSlot.algsStates.updateState( iAlgo, state );
1177 
1178  if (sc.isSuccess())
1179  if (msgLevel(MSG::VERBOSE))
1180  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo) << " on slot "
1181  << si << " to " << AlgsExecutionStates::stateNames[state] << endmsg;
1182 
1183  return sc;
1184 }
Gaudi::Property< bool > m_CFNext
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
ContextID_t slot() const
Definition: EventContext.h:41
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
ContextEvt_t evt() const
Definition: EventContext.h:40
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
Gaudi::Property< bool > m_DFNext
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode ForwardSchedulerSvc::promoteToAsyncScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 989 of file ForwardSchedulerSvc.cpp.

990 {
991 
993 
994  // bool IOBound = m_efManager.getExecutionFlowGraph()->getAlgorithmNode(algName)->isIOBound();
995 
996  const std::string& algName( index2algname( iAlgo ) );
997  IAlgorithm* ialgoPtr = nullptr;
998  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
999 
1000  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
1001  EventContext* eventContext( m_eventSlots[si].eventContext );
1002  if ( !eventContext )
1003  fatal() << "[Asynchronous] Event context for algorithm " << algName << " is a nullptr (slot " << si << ")"
1004  << endmsg;
1005 
1007  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from tbb::task)? it seems it works..
1008  IOBoundAlgTask* theTask = new( tbb::task::allocate_root() )
1009  IOBoundAlgTask(ialgoPtr, iAlgo, eventContext, serviceLocator(),
1010  this, m_algExecStateSvc);
1011  m_IOBoundAlgScheduler->push(*theTask);
1012 
1013  if (msgLevel(MSG::DEBUG))
1014  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event "
1015  << eventContext->evt() << " in slot " << si
1016  << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1017 
1018  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
1019 
1020  if (updateSc.isSuccess())
1021  if (msgLevel(MSG::VERBOSE))
1022  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo)
1023  << " to SCHEDULED on slot " << si << endmsg;
1024  return updateSc;
1025  } else {
1026  if ( msgLevel( MSG::DEBUG ) )
1027  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
1028  << si << endmsg;
1029  return sc;
1030  }
1031 }
Wrapper around I/O-bound Gaudi-algorithms.
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
unsigned int m_IOBoundAlgosInFlight
Number of algoritms presently in flight.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
STL class.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
virtual StatusCode push(IAlgTask &task)=0
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::promoteToControlReady ( unsigned int  iAlgo,
int  si 
)
private

Algorithm promotion: Accepted by the control flow.

Definition at line 901 of file ForwardSchedulerSvc.cpp.

902 {
903 
904  // Do the control flow
905  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
906  if (sc.isSuccess())
907  if (msgLevel(MSG::VERBOSE))
908  verbose() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY on slot "
909  << si << endmsg;
910 
911  return sc;
912 }
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
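
The promoteTo...() methods all funnel into updateState(), which may refuse a transition. The sketch below shows one way such a per-slot state table could enforce forward-only promotion. It is an assumption-laden illustration: MiniAlgStates is hypothetical, the state names follow those printed in the listings, and the exact set of transitions accepted by AlgsExecutionStates is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class State { INITIAL, CONTROLREADY, DATAREADY, SCHEDULED, EVTACCEPTED, EVTREJECTED };

// Hypothetical per-slot state table that only accepts forward transitions.
class MiniAlgStates {
public:
  explicit MiniAlgStates( std::size_t nAlgos ) : m_states( nAlgos, State::INITIAL ) {}

  // Returns true and records the new state if the transition is allowed.
  bool updateState( std::size_t iAlgo, State next )
  {
    if ( iAlgo >= m_states.size() ) return false;
    const State current = m_states[iAlgo];
    bool        allowed = false;
    switch ( next ) {
    case State::INITIAL:
      allowed = false; // nothing is ever promoted back to INITIAL
      break;
    case State::CONTROLREADY:
      allowed = ( current == State::INITIAL );
      break;
    case State::DATAREADY:
      allowed = ( current == State::CONTROLREADY );
      break;
    case State::SCHEDULED:
      allowed = ( current == State::DATAREADY );
      break;
    case State::EVTACCEPTED:
    case State::EVTREJECTED:
      allowed = ( current == State::SCHEDULED );
      break;
    }
    if ( allowed ) m_states[iAlgo] = next;
    return allowed;
  }

  State state( std::size_t iAlgo ) const { return m_states.at( iAlgo ); }

  // Final state chosen from the filter decision of the executed algorithm.
  static State finalState( bool filterPassed )
  {
    return filterPassed ? State::EVTACCEPTED : State::EVTREJECTED;
  }

private:
  std::vector<State> m_states;
};
```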
StatusCode ForwardSchedulerSvc::promoteToDataReady ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 916 of file ForwardSchedulerSvc.cpp.

917 {
918 
919  StatusCode sc;
920  if ( !m_DFNext ) {
921  sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
922  } else {
924  }
925 
926  StatusCode updateSc( StatusCode::FAILURE );
927  if ( sc == StatusCode::SUCCESS )
928  updateSc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::DATAREADY );
929 
930  if (updateSc.isSuccess())
931  if (msgLevel(MSG::VERBOSE))
932  verbose() << "Promoting " << index2algname(iAlgo) << " to DATAREADY on slot "
933  << si<< endmsg;
934 
935  return updateSc;
936 }
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Gaudi::Property< bool > m_DFNext
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
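
promoteToDataReady() promotes an algorithm only once its data dependencies are satisfied. For the catalog-based path, that check amounts to a set inclusion between the algorithm's inputs and the products seen so far in the slot. The sketch below assumes string keys; MiniDataFlow is hypothetical and only borrows the method names canAlgorithmRun and the catalog-update idea from the listings.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-slot data flow bookkeeping with string keys.
class MiniDataFlow {
public:
  // deps[i] holds the inputs required by algorithm i.
  explicit MiniDataFlow( std::vector<std::set<std::string>> deps ) : m_deps( std::move( deps ) ) {}

  // Record products that appeared in the event store.
  void updateCatalog( const std::set<std::string>& newProducts )
  {
    m_catalog.insert( newProducts.begin(), newProducts.end() );
  }

  // An algorithm can run when every required input is already in the catalog.
  bool canAlgorithmRun( std::size_t iAlgo ) const
  {
    const auto& needed = m_deps.at( iAlgo );
    // Both ranges are sorted std::set ranges, as std::includes requires.
    return std::includes( m_catalog.begin(), m_catalog.end(), needed.begin(), needed.end() );
  }

private:
  std::vector<std::set<std::string>> m_deps;
  std::set<std::string>              m_catalog;
};
```

An algorithm without declared inputs is runnable from the start in this sketch, since the empty set is included in any catalog.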
StatusCode ForwardSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm *  algo,
EventContext *  eventContext 
)
private

The call to this method is triggered only from within the AlgoExecutionTask.

Definition at line 1037 of file ForwardSchedulerSvc.cpp.

1039 {
1040 
1041  // Put back the instance
1042  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1043  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
1044  // EventContext* eventContext = castedAlgo->getContext();
1045 
1046  // Check if the execution failed
1047  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1048  eventFailed(eventContext).ignore();
1049 
1050  Gaudi::Hive::setCurrentContext(eventContext);
1051  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1052 
1053  if ( !sc.isSuccess() ) {
1054  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1055  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1056  return StatusCode::FAILURE;
1057  }
1058 
1059  m_algosInFlight--;
1060 
1061  EventSlot& thisSlot = m_eventSlots[si];
1062  // XXX: CF tests
1063  if ( !m_DFNext ) {
1064  // Update the catalog: some new products may be there
1065  m_whiteboard->selectStore( eventContext->slot() ).ignore();
1066 
1067  // update prods in the dataflow
1068  // DP: Handles could be used. Just update what the algo wrote
1069  DataObjIDColl new_products;
1070  m_whiteboard->getNewDataObjects( new_products ).ignore();
1071  for ( const auto& new_product : new_products )
1072  if ( msgLevel( MSG::DEBUG ) ) debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
1073  thisSlot.dataFlowMgr.updateDataObjectsCatalog( new_products );
1074  }
1075 
1076  if ( msgLevel( MSG::DEBUG ) )
1077  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1078  << m_algosInFlight << endmsg;
1079 
1080  // Limit number of updates
1081  if ( m_CFNext )
1082  m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
1083  if ( m_updateNeeded ) {
1084  // Schedule an update of the status of the algorithms
1085  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1, algo->name() );
1086  m_actionsQueue.push( updateAction );
1087  m_updateNeeded = false;
1088  }
1089 
1090  if ( msgLevel( MSG::DEBUG ) )
1091  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
1092  State state;
1093  if ( algo->filterPassed() ) {
1094  state = State::EVTACCEPTED;
1095  } else {
1096  state = State::EVTREJECTED;
1097  }
1098 
1099  sc = thisSlot.algsStates.updateState( iAlgo, state );
1100 
1101  if (sc.isSuccess())
1102  if (msgLevel(MSG::VERBOSE))
1103  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
1105 
1106  return sc;
1107 }
Gaudi::Property< bool > m_CFNext
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
ContextID_t slot() const
Definition: EventContext.h:41
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
ContextEvt_t evt() const
Definition: EventContext.h:40
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
GAUDI_API void setCurrentContext(const EventContext *ctx)
Gaudi::Property< bool > m_DFNext
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode ForwardSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode ForwardSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 940 of file ForwardSchedulerSvc.cpp.

941 {
942 
944 
945  const std::string& algName( index2algname( iAlgo ) );
946  IAlgorithm* ialgoPtr = nullptr;
947  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
948 
949  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
950  EventContext* eventContext( m_eventSlots[si].eventContext );
951  if ( !eventContext )
952  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
953 
954  ++m_algosInFlight;
955  // Avoid to use tbb if the pool size is 1 and run in this thread
956  if (-100 != m_threadPoolSize) {
957  tbb::task* t = new( tbb::task::allocate_root() )
958  AlgoExecutionTask(ialgoPtr, iAlgo, eventContext, serviceLocator(),
959  this, m_algExecStateSvc);
960  tbb::task::enqueue( *t);
961  } else {
962  AlgoExecutionTask theTask(ialgoPtr, iAlgo, eventContext,
964  theTask.execute();
965  }
966 
967  if ( msgLevel( MSG::DEBUG ) )
968  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
969  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
970 
971  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
972 
973  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
974 
975  if (updateSc.isSuccess())
976  if (msgLevel(MSG::VERBOSE))
977  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
978  << si << endmsg;
979  return updateSc;
980  } else {
981  if ( msgLevel( MSG::DEBUG ) )
982  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
983  return sc;
984  }
985 }
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
Gaudi::Property< unsigned int > m_maxAlgosInFlight
STL class.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
friend class AlgoExecutionTask
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvent ( EventContext *  eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two possible cases:

  1. No slot is free. A StatusCode::FAILURE is returned.
  2. At least one slot is free. An action which resets the slot and kicks off its update is queued.
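
The two cases can be illustrated with a minimal, single-threaded sketch. It is not the Gaudi implementation: `ToyScheduler`, `Status`, `started` and `drain` are hypothetical names, a plain `std::queue` stands in for the concurrent action queue, and the queued action only records the event number instead of resetting a slot and updating states.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical stand-in for a status code; not a Gaudi type.
enum class Status { Success, Failure };

struct ToyScheduler {
  std::atomic<int>                    freeSlots; // slots still available
  std::queue<std::function<Status()>> actions;   // deferred closures (assumption: single consumer)
  std::vector<int>                    started;   // event numbers whose action has run

  explicit ToyScheduler( int nSlots ) : freeSlots( nSlots ) { assert( nSlots >= 0 ); }

  // Case 1: no free slot, so report failure and queue nothing.
  // Case 2: reserve a slot and queue an action that starts the event later.
  Status pushNewEvent( int eventNumber ) {
    if ( freeSlots.load() == 0 ) return Status::Failure;
    --freeSlots;
    actions.push( [this, eventNumber]() -> Status {
      started.push_back( eventNumber );
      return Status::Success;
    } );
    return Status::Success;
  }

  // Run every queued action in FIFO order.
  void drain() {
    while ( !actions.empty() ) {
      actions.front()();
      actions.pop();
    }
  }
};
```

The point of the design is that the caller only reserves a slot and enqueues a closure; the slot itself is touched later, when the closure is executed by whoever consumes the queue.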

Definition at line 400 of file ForwardSchedulerSvc.cpp.

401 {
402 
403  if ( m_first ) {
404  m_first = false;
405  }
406 
407  if ( !eventContext ) {
408  fatal() << "Event context is nullptr" << endmsg;
409  return StatusCode::FAILURE;
410  }
411 
412  if ( m_freeSlots.load() == 0 ) {
413  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
414  return StatusCode::FAILURE;
415  }
416 
417  // no problem as push new event is only called from one thread (event loop manager)
418  m_freeSlots--;
419 
420  auto action = [this, eventContext]() -> StatusCode {
421  // Event processing slot forced to be the same as the wb slot
422  const unsigned int thisSlotNum = eventContext->slot();
423  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
424  if ( !thisSlot.complete ) {
425  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
426  return StatusCode::FAILURE;
427  }
428 
429  info() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
430  thisSlot.reset( eventContext );
431  // XXX: CF tests
432  if ( m_CFNext ) {
433  auto vis = concurrency::Trigger( thisSlotNum );
435  }
436 
437  return this->updateStates( thisSlotNum );
438  }; // end of lambda
439 
440  // Kick off the scheduling!
441  if ( msgLevel( MSG::VERBOSE ) ) {
442  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
443  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
444  }
445  m_actionsQueue.push( action );
446 
447  return StatusCode::SUCCESS;
448 }
Gaudi::Property< bool > m_CFNext
ContextID_t slot() const
Definition: EventContext.h:41
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
A visitor, performing full top-down traversals of a graph.
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
std::function< StatusCode()> action
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 451 of file ForwardSchedulerSvc.cpp.

452 {
453  StatusCode sc;
454  for ( auto context : eventContexts ) {
455  sc = pushNewEvent( context );
456  if ( sc != StatusCode::SUCCESS ) return sc;
457  }
458  return sc;
459 }
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode ForwardSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, just return a failure.
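
The non-blocking behaviour can be sketched as follows. This is an illustration under stated assumptions, not the scheduler's code: `ToyContext` and `FinishedEvents` are hypothetical names, and a mutex-guarded `std::queue` stands in for the concurrent queue used by the service.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Hypothetical stand-in for an event context.
struct ToyContext {
  int evt;
  int slot;
};

class FinishedEvents {
public:
  // Store a finished event; a null context is a programming error.
  void push( ToyContext* ctx ) {
    assert( ctx != nullptr );
    std::lock_guard<std::mutex> lock( m_mutex );
    m_items.push( ctx );
  }

  // Never blocks: returns false and leaves ctx untouched when nothing is queued.
  bool tryPop( ToyContext*& ctx ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_items.empty() ) return false;
    ctx = m_items.front();
    m_items.pop();
    return true;
  }

private:
  std::mutex              m_mutex;
  std::queue<ToyContext*> m_items;
};
```

A caller would typically poll `tryPop` in a loop and treat `false` as "nothing finished yet" rather than as an error.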

Definition at line 508 of file ForwardSchedulerSvc.cpp.

509 {
510  if ( m_finishedEvents.try_pop( eventContext ) ) {
511  if ( msgLevel( MSG::DEBUG ) )
512  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
513  << endmsg;
514  m_freeSlots++;
515  return StatusCode::SUCCESS;
516  }
517  return StatusCode::FAILURE;
518 }
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::updateStates ( int  si = -1,
const std::string &  algo_name = std::string() 
)
private

Loop over the algorithms in the slots and promote them to successive states (-1 means all slots, while an empty string means skipping an update of the Control Flow state).

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. To decide whether an event is finished, the algorithm checks that:

  • No algorithms have been signed off by the control flow
  • No algorithms have been signed off by the data flow
  • No algorithms have been scheduled
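
The ordering and the three conditions above can be sketched in a few lines. The names `AlgState`, `ToySlot`, `nothingPending`, `oldestFirst` and `exampleUsage` are hypothetical and the state set is simplified; the listing for this method additionally requires that the slot is not already complete and that the root control-flow decision has been resolved, which this sketch leaves out.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical, simplified stand-ins for the per-algorithm states and the event slot.
enum class AlgState { Initial, ControlReady, DataReady, Scheduled, Accepted, Rejected };

struct ToySlot {
  int                   evt;  // event number held by the slot
  std::vector<AlgState> algs; // one state per algorithm
};

// True when no algorithm is still ControlReady, DataReady or Scheduled.
bool nothingPending( const ToySlot& slot ) {
  return std::none_of( slot.algs.begin(), slot.algs.end(), []( AlgState s ) {
    return s == AlgState::ControlReady || s == AlgState::DataReady || s == AlgState::Scheduled;
  } );
}

// Return pointers to the slots ordered by increasing event number (oldest first).
std::vector<const ToySlot*> oldestFirst( const std::vector<ToySlot>& slots ) {
  std::vector<const ToySlot*> order;
  order.reserve( slots.size() );
  for ( const auto& s : slots ) order.push_back( &s );
  std::sort( order.begin(), order.end(),
             []( const ToySlot* a, const ToySlot* b ) { return a->evt < b->evt; } );
  return order;
}

// Usage: the slot holding event 3 is visited first, and only it has nothing pending.
void exampleUsage() {
  std::vector<ToySlot> slots = { { 7, { AlgState::Accepted, AlgState::Scheduled } },
                                 { 3, { AlgState::Accepted, AlgState::Rejected } } };
  auto order = oldestFirst( slots );
  assert( order.front()->evt == 3 );
  assert( nothingPending( *order.front() ) );
  assert( !nothingPending( *order.back() ) );
}
```

Sorting pointers rather than the slots themselves mirrors the stated intent of avoiding copies while still visiting the oldest event first.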

Definition at line 573 of file ForwardSchedulerSvc.cpp.

574 {
575 
576  m_updateNeeded = true;
577 
578  // Fill a map of initial state / action using closures.
579  // done to update the states w/o several if/elses
580  // Posterchild for constexpr with gcc4.7 onwards!
581  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
582  statesTransitions = {
583  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
584  this,
585  std::placeholders::_1,
586  std::placeholders::_2)},
587  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
588  this,
589  std::placeholders::_1,
590  std::placeholders::_2)}
591  };*/
592 
593  StatusCode global_sc( StatusCode::FAILURE, true );
594 
595  // Sort from the oldest to the newest event
596  // Prepare a vector of pointers to the slots to avoid copies
597  std::vector<EventSlot*> eventSlotsPtrs;
598 
599  // Consider all slots if si <0 or just one otherwise
600  if ( si < 0 ) {
601  const int eventsSlotsSize( m_eventSlots.size() );
602  eventSlotsPtrs.reserve( eventsSlotsSize );
603  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
604  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
605  }
606  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
607  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
608  } else {
609  eventSlotsPtrs.push_back( &m_eventSlots[si] );
610  }
611 
612  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
613  int iSlot = thisSlotPtr->eventContext->slot();
614 
615  // Cache the states of the algos to improve readability and performance
616  auto& thisSlot = m_eventSlots[iSlot];
617  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
618 
619  // Take care of the control ready update
620  // XXX: CF tests
621  if ( !m_CFNext ) {
622  m_efManager.updateEventState( thisAlgsStates, thisSlot.controlFlowState );
623  } else {
624  if ( !algo_name.empty() )
625  m_efManager.updateDecision( algo_name, iSlot, thisAlgsStates, thisSlot.controlFlowState );
626  }
627 
628  // DF note: all this is a loop over all algs and applies CR->DR and DR->SCHD transitions
629  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
630  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
631  if (algState==AlgsExecutionStates::ERROR)
632  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
633  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
634  partial_sc=StatusCode::SUCCESS;
635  for (auto state_transition = statesTransitions.find(algState);
636  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
637  state_transition++){
638  partial_sc = state_transition->second(iAlgo,iSlot);
639  if (partial_sc.isFailure()){
640  verbose() << "Could not apply transition from "
641  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
642  << " for algorithm " << index2algname(iAlgo)
643  << " on processing slot " << iSlot << endmsg;
644  }
645  else{global_sc=partial_sc;}
646  } // end loop on transitions
647  }*/ // end loop on algos
648 
649  StatusCode partial_sc( StatusCode::FAILURE, true );
650  // first update CONTROLREADY to DATAREADY
651  if ( !m_CFNext ) {
652  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::CONTROLREADY );
653  it != thisAlgsStates.end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
654 
655  uint algIndex = *it;
656  partial_sc = promoteToDataReady(algIndex, iSlot);
657  if (partial_sc.isFailure())
658  if (msgLevel(MSG::VERBOSE))
659  verbose() << "Could not apply transition from "
660  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
661  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
662  }
663  }
664 
665  // now update DATAREADY to SCHEDULED
666  if ( !m_optimizationMode.empty() ) {
667  auto comp_nodes = [this]( const uint& i, const uint& j ) {
670  };
672  comp_nodes, std::vector<uint>() );
673  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
674  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
675  buffer.push( *it );
676  /*std::stringstream s;
677  auto buffer2 = buffer;
678  while (!buffer2.empty()) {
679  s << m_efManager.getExecutionFlowGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
680  buffer2.pop();
681  }
682  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
683 
684  /*while (!buffer.empty()) {
685  partial_sc = promoteToScheduled(buffer.top(), iSlot);
686  if (partial_sc.isFailure()) {
687  if (msgLevel(MSG::VERBOSE))
688  verbose() << "Could not apply transition from "
689  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
690  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
691  if (m_useIOBoundAlgScheduler) {
692  partial_sc = promoteToAsyncScheduled(buffer.top(), iSlot);
693  if (msgLevel(MSG::VERBOSE))
694  if (partial_sc.isFailure())
695  verbose() << "[Asynchronous] Could not apply transition from "
696  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
697  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
698  }
699  }
700  buffer.pop();
701  }*/
702  while ( !buffer.empty() ) {
703  bool IOBound = false;
705  IOBound = m_efManager.getExecutionFlowGraph()->getAlgorithmNode( index2algname( buffer.top() ) )->isIOBound();
706 
707  if ( !IOBound )
708  partial_sc = promoteToScheduled( buffer.top(), iSlot );
709  else
710  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot );
711 
712  if (msgLevel(MSG::VERBOSE))
713  if (partial_sc.isFailure())
714  verbose() << "Could not apply transition from "
715  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
716  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
717 
718  buffer.pop();
719  }
720 
721  } else {
722  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
723  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
724  uint algIndex = *it;
725 
726  bool IOBound = false;
729 
730  if ( !IOBound )
731  partial_sc = promoteToScheduled( algIndex, iSlot );
732  else
733  partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
734 
735  if (msgLevel(MSG::VERBOSE))
736  if (partial_sc.isFailure())
737  verbose() << "Could not apply transition from "
738  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
739  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
740  }
741  }
742 
744  auto now = std::chrono::system_clock::now();
746  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY) << ", "
747  << thisAlgsStates.sizeOfSubset(State::DATAREADY) << ", "
748  << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << ", "
750  << "\n";
751  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
752  : std::to_string(tbb::task_scheduler_init::default_num_threads());
753  std::ofstream myfile;
754  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
755  myfile << s.str();
756  myfile.close();
757  }
758 
759  // Not complete because this would mean that the slot is already free!
760  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
761  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
762  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
763  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
764 
765  thisSlot.complete = true;
766  // if the event did not fail, add it to the finished events
767  // otherwise it is taken care of in the error handling already
768  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
769  m_finishedEvents.push(thisSlot.eventContext);
770  if (msgLevel(MSG::DEBUG))
771  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
772  << thisSlot.eventContext->slot() << ")." << endmsg;
773  }
774  // now let's return the fully evaluated result of the control flow
775  if ( msgLevel( MSG::DEBUG ) ) {
777  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
778  debug() << ss.str() << endmsg;
779  }
780 
781  thisSlot.eventContext = nullptr;
782  } else {
783  StatusCode eventStalledSC = isStalled(iSlot);
784  if (! eventStalledSC.isSuccess()) {
785  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
786  eventFailed(thisSlot.eventContext).ignore();
787  }
788  }
789  } // end loop on slots
790 
791  verbose() << "States Updated." << endmsg;
792 
793  return global_sc;
794 }
Gaudi::Property< bool > m_CFNext
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
T empty(T...args)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T open(T...args)
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
ContextID_t slot() const
Definition: EventContext.h:41
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
T to_string(T...args)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T duration_cast(T...args)
T end(T...args)
size_t sizeOfSubset(State state) const
bool isIOBound() const
Check if algorithm is I/O-bound.
Gaudi::Property< std::string > m_optimizationMode
ContextEvt_t evt() const
Definition: EventContext.h:40
T push_back(T...args)
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
STL class.
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T close(T...args)
T str(T...args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
T count(T...args)
bool m_updateNeeded
Keep track of update actions scheduled.
T size(T...args)
Gaudi::Property< bool > m_useIOBoundAlgScheduler
STL class.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
T begin(T...args)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Iterator begin(State kind)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
Gaudi::Property< bool > m_dumpIntraEventDynamics
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T sort(T...args)
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
void ignore() const
Definition: StatusCode.h:106
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
Gaudi::Property< int > m_threadPoolSize
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
const std::chrono::system_clock::time_point getInitTime() const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
T reserve(T...args)
static std::map< State, std::string > stateNames
Iterator end(State kind)

Friends And Related Function Documentation

friend class AlgoExecutionTask
friend

Definition at line 244 of file ForwardSchedulerSvc.h.

friend class IOBoundAlgTask
friend

Definition at line 245 of file ForwardSchedulerSvc.h.

Member Data Documentation

tbb::concurrent_bounded_queue<action> ForwardSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 238 of file ForwardSchedulerSvc.h.

SmartIF<IAlgExecStateSvc> ForwardSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 193 of file ForwardSchedulerSvc.h.

std::unordered_map<std::string, unsigned int> ForwardSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 166 of file ForwardSchedulerSvc.h.

std::vector<std::string> ForwardSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 172 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::vector<std::vector<std::string> > > ForwardSchedulerSvc::m_algosDependencies
private
Initial value:
{
this, "AlgosDependencies", {}, "[[deprecated]]"}

Definition at line 144 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 198 of file ForwardSchedulerSvc.h.

SmartIF<IAlgResourcePool> ForwardSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 228 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_CFNext
private
Initial value:
{this, "useGraphFlowManagement", false,
"Temporary property to switch between ControlFlow implementations"}

Definition at line 130 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_checkDeps {this, "CheckDependencies", false, "[[deprecated]]"}
private

Definition at line 146 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_DFNext
private
Initial value:
{this, "DataFlowManagerNext", false,
"Temporary property to switch between DataFlow implementations"}

Definition at line 133 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file"}

Definition at line 140 of file ForwardSchedulerSvc.h.

concurrency::ExecutionFlowManager ForwardSchedulerSvc::m_efManager
private

Member to take care of the control flow.

Definition at line 241 of file ForwardSchedulerSvc.h.

std::vector<EventSlot> ForwardSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 181 of file ForwardSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> ForwardSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 187 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_first = true
private

Definition at line 250 of file ForwardSchedulerSvc.h.

std::atomic_int ForwardSchedulerSvc::m_freeSlots
private

Atomic to account for asynchronous updates by the scheduler wrt the rest.

Definition at line 184 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_IOBoundAlgosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 201 of file ForwardSchedulerSvc.h.

SmartIF<IAccelerator> ForwardSchedulerSvc::m_IOBoundAlgScheduler
private

A shortcut to IO-bound algorithm scheduler.

Definition at line 178 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
private

Definition at line 123 of file ForwardSchedulerSvc.h.

std::atomic<ActivationState> ForwardSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 157 of file ForwardSchedulerSvc.h.

Gaudi::Property<unsigned int> ForwardSchedulerSvc::m_maxAlgosInFlight
private
Initial value:
{this, "MaxAlgosInFlight", 1,
"[[deprecated]] Taken from the whiteboard"}

Definition at line 124 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_maxEventsInFlight
private
Initial value:
{this, "MaxEventsInFlight", 0,
"Maximum number of event processed simultaneously"}

Definition at line 117 of file ForwardSchedulerSvc.h.

Gaudi::Property<unsigned int> ForwardSchedulerSvc::m_maxIOBoundAlgosInFlight
private
Initial value:
{this, "MaxIOBoundAlgosInFlight", 0,
"Maximum number of simultaneous I/O-bound algorithms"}

Definition at line 127 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_optimizationMode
private
Initial value:
{this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E"}

Definition at line 138 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution"}

Definition at line 135 of file ForwardSchedulerSvc.h.

std::mutex ForwardSchedulerSvc::m_ssMut
static private

Definition at line 282 of file ForwardSchedulerSvc.h.

std::list< ForwardSchedulerSvc::SchedulerState > ForwardSchedulerSvc::m_sState
static private

Definition at line 281 of file ForwardSchedulerSvc.h.

std::thread ForwardSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 160 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 119 of file ForwardSchedulerSvc.h.

SmartIF<IThreadPoolSvc> ForwardSchedulerSvc::m_threadPoolSvc
private

Definition at line 248 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_updateNeeded = true
private

Keep track of update actions scheduled.

Definition at line 224 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_useIOBoundAlgScheduler
private
Initial value:
{this, "PreemptiveIOBoundTasks", false,
"Turn on preemptive way of scheduling of I/O-bound algorithms"}

Definition at line 142 of file ForwardSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> ForwardSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 175 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 122 of file ForwardSchedulerSvc.h.


The documentation for this class was generated from the following files: