ForwardSchedulerSvc Class Reference

The ForwardSchedulerSvc implements the IScheduler interface. More...

#include <GaudiKernel/ForwardSchedulerSvc.h>


Classes

class  SchedulerState
 

Public Member Functions

 ~ForwardSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
void addAlg (Algorithm *, EventContext *, pthread_t)
 
bool delAlg (Algorithm *)
 
void dumpState () override
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Start Service. More...
 
StatusCode sysStop () override
 Stop Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Restart the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declarePrivateTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used Private tool. More...
 
template<class T >
StatusCode declarePublicTool (ToolHandle< T > &handle, std::string toolTypeAndName="", bool createIf=true)
 Declare used Public tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
 ~PropertyHolder () override=default
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none") const
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBase * declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property from another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property from the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBase & getProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandle< TYPE > &ref, const std::string &doc="none")
 Specializations for various GaudiHandles. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ServiceHandle< TYPE > &ref, const std::string &doc="none")
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandleArray< TYPE > &ref, const std::string &doc="none")
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ServiceHandleArray< TYPE > &ref, const std::string &doc="none")
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, DataObjectHandle< TYPE > &ref, const std::string &doc="none")
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStream & msgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStream & msgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStream & always () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStream & fatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStream & err () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & error () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & warning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStream & info () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStream & debug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStream & verbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStream & msg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
- Public Member Functions inherited from extend_interfaces< Interfaces... >
 ~extend_interfaces () override=default
 Virtual destructor. More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
typedef std::function< StatusCode()> action
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1, const std::string &algo_name=std::string())
 Loop over the algorithms in the slots and promote them to successive states (-1 means all slots; an empty string skips the update of the control flow state). More...
 
StatusCode promoteToControlReady (unsigned int iAlgo, int si)
 Algorithm promotion: Accepted by the control flow. More...
 
StatusCode promoteToDataReady (unsigned int iAlgo, int si)
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToAsyncScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the AlgoExecutionTask. More...
 
StatusCode promoteToAsyncExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the IOBoundAlgTask. More...
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 
void dumpState (std::ostringstream &)
 

Private Attributes

Gaudi::Property< int > m_maxEventsInFlight
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::string > m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
 
Gaudi::Property< unsigned int > m_maxAlgosInFlight
 
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
 
Gaudi::Property< bool > m_CFNext
 
Gaudi::Property< bool > m_DFNext
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::string > m_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_useIOBoundAlgScheduler
 
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
 
Gaudi::Property< bool > m_checkDeps {this, "CheckDependencies", false, "[[deprecated]]"}
 
std::atomic< ActivationState > m_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard. More...
 
SmartIF< IAccelerator > m_IOBoundAlgScheduler
 A shortcut to the IO-bound algorithm scheduler. More...
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asynchronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager. More...
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight. More...
 
unsigned int m_IOBoundAlgosInFlight
 Number of IO-bound algorithms presently in flight. More...
 
bool m_updateNeeded = true
 Keep track of update actions scheduled. More...
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
concurrency::ExecutionFlowManager m_efManager
 Member to take care of the control flow. More...
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
bool m_first = true
 

Static Private Attributes

static std::list< SchedulerState > m_sState
 
static std::mutex m_ssMut
 

Friends

class AlgoExecutionTask
 
class IOBoundAlgTask
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
typedef Gaudi::PluginService::Factory< IService *, const std::string &, ISvcLocator * > Factory
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBase * property (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

The ForwardSchedulerSvc implements the IScheduler interface.

It manages all the execution states of the algorithms and interacts with the TBB runtime for the algorithm tasks submission. A state machine takes care of the tracking of the execution state of the algorithms. This is a forward scheduler: algorithms are scheduled for execution as soon as their data dependencies are available in the whiteboard.
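The forward-scheduling rule can be illustrated with a minimal sketch. It assumes that data objects are identified by plain strings and that the whiteboard content is available as a set; the function name dataReady is hypothetical and is not part of the class.

```cpp
#include <algorithm>
#include <set>
#include <string>
#include <vector>

// Hypothetical helper: an algorithm is ready to be scheduled once every
// one of its declared inputs is present in the whiteboard.
bool dataReady( const std::vector<std::string>& inputs, const std::set<std::string>& whiteboard )
{
  return std::all_of( inputs.begin(), inputs.end(),
                      [&whiteboard]( const std::string& id ) { return whiteboard.count( id ) != 0; } );
}
```

An algorithm without inputs is ready immediately, since std::all_of returns true for an empty range.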

Algorithms management

The activate() method runs in a separate thread. In a loop, it retrieves closures from a TBB concurrent bounded queue with the blocking pop method, so the thread waits while the queue is empty instead of occupying a CPU core to poll for new actions. In other words, the asynchronous actions are serialised via the actions queue. Once a task terminates, a call to the promoteToExecuted method is pushed into the actions queue. The promoteToExecuted method also triggers a call to the updateStates method, which goes through all algorithms and checks whether their state can be changed: upon termination of an algorithm, the control flow and/or the data flow may allow more algorithms to be submitted.
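The serialisation through a queue of closures can be sketched with the standard library alone. This is an illustration under stated assumptions, not the class's implementation: ActionQueue is a hypothetical stand-in for tbb::concurrent_bounded_queue, std::function<bool()> replaces the action typedef (true standing in for StatusCode::SUCCESS), and runSerialised is an invented driver.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for tbb::concurrent_bounded_queue<action>.
class ActionQueue
{
public:
  using Action = std::function<bool()>; // true stands in for StatusCode::SUCCESS

  void push( Action a )
  {
    {
      std::lock_guard<std::mutex> guard( m_mutex );
      m_actions.push( std::move( a ) );
    }
    m_ready.notify_one();
  }

  // Blocks (waiting, not spinning) until an action is available.
  Action pop()
  {
    std::unique_lock<std::mutex> guard( m_mutex );
    m_ready.wait( guard, [this] { return !m_actions.empty(); } );
    Action a = std::move( m_actions.front() );
    m_actions.pop();
    return a;
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_ready;
  std::queue<Action> m_actions;
};

// Actions pushed from several threads are executed one at a time on a
// single consumer thread; returns the order in which they were executed.
std::vector<int> runSerialised( int nProducers )
{
  ActionQueue queue;
  std::vector<int> executed; // modified only by the consumer thread
  bool active = true;        // modified only by the consumer thread

  std::thread consumer( [&] {
    while ( active ) queue.pop()();
  } );

  std::vector<std::thread> producers;
  for ( int i = 0; i < nProducers; ++i ) {
    producers.emplace_back( [&queue, &executed, i] {
      queue.push( [&executed, i] { executed.push_back( i ); return true; } );
    } );
  }
  for ( auto& p : producers ) p.join();

  // The final action stops the loop, so everything queued before it has run.
  queue.push( [&active] { active = false; return true; } );
  consumer.join();
  return executed;
}
```

Because only the consumer thread executes the closures, the state they touch needs no additional locking. The order of the recorded indices depends on thread timing and is not fixed.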

Algorithms dependencies

There are two ways of declaring algorithm dependencies. The first, which is only temporarily available to ease development, consists in declaring them through the AlgosDependencies property as a list of lists. The order of the sublists must match the order of the algorithms in the TopAlg list. The second consists in declaring the data dependencies directly within the algorithms via data object handles.
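The positional pairing required by the list-of-lists form can be sketched as follows. The function pairByPosition and the choice to throw on a length mismatch are assumptions made for illustration; the text does not say how the scheduler reacts to a mismatch.

```cpp
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

using Dependencies = std::vector<std::string>;

// Hypothetical helper: the i-th sublist belongs to the i-th algorithm of TopAlg.
std::map<std::string, Dependencies> pairByPosition( const std::vector<std::string>& topAlg,
                                                    const std::vector<Dependencies>& algosDependencies )
{
  // Illustrative choice: reject lists of different length.
  if ( topAlg.size() != algosDependencies.size() )
    throw std::invalid_argument( "one dependency list is needed per algorithm" );

  std::map<std::string, Dependencies> result;
  for ( std::size_t i = 0; i < topAlg.size(); ++i ) result[topAlg[i]] = algosDependencies[i];
  return result;
}
```

Since the pairing is purely positional, reordering TopAlg without reordering the sublists silently assigns dependencies to the wrong algorithms, which is one reason to prefer data object handles.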

Events management

The scheduler accepts events to be processed (in the form of eventContexts) and releases processed events. This flow is implemented through three methods:

  • pushNewEvent: to make an event available to the scheduler.
  • tryPopFinishedEvent: to retrieve an event from the scheduler (non-blocking).
  • popFinishedEvent: to retrieve an event from the scheduler (blocking).

Please refer to the full documentation of the methods for more details.
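A typical driver loop around these methods might look like the sketch below. ToyScheduler is a hypothetical, single-threaded stand-in: it uses integer event numbers instead of EventContext pointers, returns bool instead of StatusCode, finishes every event immediately, and frees the slot when the event is popped. None of this is taken from the real implementation.

```cpp
#include <deque>
#include <vector>

// Hypothetical toy scheduler exposing the same three concepts:
// free slots, pushing new events and non-blocking retrieval.
class ToyScheduler
{
public:
  explicit ToyScheduler( unsigned int slots ) : m_freeSlots( slots ) {}

  unsigned int freeSlots() const { return m_freeSlots; }

  bool pushNewEvent( int evt )
  {
    if ( m_freeSlots == 0 ) return false;
    --m_freeSlots;
    m_finished.push_back( evt ); // a real scheduler finishes events asynchronously
    return true;
  }

  bool tryPopFinishedEvent( int& evt )
  {
    if ( m_finished.empty() ) return false;
    evt = m_finished.front();
    m_finished.pop_front();
    ++m_freeSlots; // assumption of this toy: popping frees the slot
    return true;
  }

private:
  unsigned int m_freeSlots;
  std::deque<int> m_finished;
};

// Feeds nEvents events to a scheduler that has at least one slot.
std::vector<int> processAll( ToyScheduler& sched, int nEvents )
{
  std::vector<int> done;
  int next = 0;
  while ( static_cast<int>( done.size() ) < nEvents ) {
    // Fill every free slot with a new event.
    while ( next < nEvents && sched.freeSlots() > 0 ) sched.pushNewEvent( next++ );
    // Collect whatever has finished, without blocking.
    int evt = 0;
    while ( sched.tryPopFinishedEvent( evt ) ) done.push_back( evt );
  }
  return done;
}
```

With a real, asynchronous scheduler the inner collection loop would usually start with one blocking popFinishedEvent call, so that the driver does not spin while all slots are busy.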

Author
Danilo Piparo
Benedikt Hegner
Version
1.1

Definition at line 84 of file ForwardSchedulerSvc.h.

Member Typedef Documentation

Definition at line 235 of file ForwardSchedulerSvc.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

ForwardSchedulerSvc::~ForwardSchedulerSvc ( )
override default

Destructor.

Member Function Documentation

void ForwardSchedulerSvc::activate ( )
private

Activate the scheduler.

From this moment on the queue of actions is checked. The checking stops once the m_isActive flag is no longer ACTIVE and the queue is empty. This guarantees that all actions are executed and that no stall is created. The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html). The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas).

Definition at line 279 of file ForwardSchedulerSvc.cpp.

void ForwardSchedulerSvc::activate()
{
  if ( msgLevel( MSG::DEBUG ) )
    debug() << "ForwardSchedulerSvc::activate()" << endmsg;

  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
    error() << "problems initializing ThreadPoolSvc" << endmsg;
    m_isActive = FAILURE;
    return;
  }

  // Wait for actions pushed into the queue by finishing tasks.
  action thisAction;
  StatusCode sc( StatusCode::SUCCESS );

  m_isActive = ACTIVE;

  // Continue to wait if the scheduler is running or there is something to do
  info() << "Start checking the actionsQueue" << endmsg;
  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
    m_actionsQueue.pop( thisAction );
    sc = thisAction();
    if ( sc != StatusCode::SUCCESS )
      verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
    else
      verbose() << "Action succeeded." << endmsg;
  }

  info() << "Terminating thread-pool resources" << endmsg;
  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
    error() << "Problems terminating thread pool" << endmsg;
    m_isActive = FAILURE;
  }
}

void ForwardSchedulerSvc::addAlg ( Algorithm * a, EventContext * e, pthread_t t )

Definition at line 1152 of file ForwardSchedulerSvc.cpp.

void ForwardSchedulerSvc::addAlg( Algorithm* a, EventContext* e, pthread_t t )
{
  std::lock_guard<std::mutex> lock( m_ssMut );
  m_sState.push_back( SchedulerState( a, e, t ) );
}

unsigned int ForwardSchedulerSvc::algname2index ( const std::string & algoname )
inline private

Convert a name to an integer.
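Note that operator[] on a std::unordered_map default-inserts a value for a missing key, so looking up an unknown name through it yields index 0 and grows the map. The sketch below shows name/index bookkeeping with a checked lookup instead. The class AlgNameIndex and its methods are hypothetical and only illustrate the idea; they are not part of ForwardSchedulerSvc.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical bookkeeping of the two conversions:
// a map for name -> index and a vector for index -> name.
class AlgNameIndex
{
public:
  // Registers a name (if new) and returns its index.
  unsigned int add( const std::string& name )
  {
    auto it = m_index.find( name );
    if ( it != m_index.end() ) return it->second;
    const unsigned int idx = static_cast<unsigned int>( m_names.size() );
    m_names.push_back( name );
    m_index.emplace( name, idx );
    return idx;
  }

  // Checked lookups: both throw std::out_of_range for unknown input.
  unsigned int index( const std::string& name ) const { return m_index.at( name ); }
  const std::string& name( unsigned int idx ) const { return m_names.at( idx ); }

  std::size_t size() const { return m_index.size(); }

private:
  std::unordered_map<std::string, unsigned int> m_index;
  std::vector<std::string> m_names;
};
```

The checked variant trades a little speed for a clear error when a name was never registered.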

Definition at line 348 of file ForwardSchedulerSvc.cpp.

inline unsigned int ForwardSchedulerSvc::algname2index( const std::string& algoname )
{
  unsigned int index = m_algname_index_map[algoname];
  return index;
}

StatusCode ForwardSchedulerSvc::deactivate ( )
private

Deactivate the scheduler.

Two actions are pushed into the queue: 1) drain the scheduler until all events are finished; 2) flip the status flag m_isActive to INACTIVE. This second action is the last one to be executed by the scheduler.

Definition at line 323 of file ForwardSchedulerSvc.cpp.

StatusCode ForwardSchedulerSvc::deactivate()
{
  if ( m_isActive == ACTIVE ) {
    // Drain the scheduler
    m_actionsQueue.push( std::bind( &ForwardSchedulerSvc::m_drain, this ) );
    // This would be the last action
    m_actionsQueue.push( [this]() -> StatusCode {
      m_isActive = INACTIVE;
      return StatusCode::SUCCESS;
    } );
  }

  return StatusCode::SUCCESS;
}

bool ForwardSchedulerSvc::delAlg ( Algorithm * a )

Definition at line 1160 of file ForwardSchedulerSvc.cpp.

bool ForwardSchedulerSvc::delAlg( Algorithm* a )
{
  std::lock_guard<std::mutex> lock( m_ssMut );

  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
    if ( *itr == a ) {
      m_sState.erase( itr );
      return true;
    }
  }

  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
  return false;
}

void ForwardSchedulerSvc::dumpSchedulerState ( int iSlot )
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

The dependencies of each algo are printed and the missing ones specified.

Definition at line 791 of file ForwardSchedulerSvc.cpp.

void ForwardSchedulerSvc::dumpSchedulerState( int iSlot )
{
  // To have just one big message
  std::ostringstream outputMessageStream;

  outputMessageStream << "============================== Execution Task State ============================="
                      << std::endl;
  dumpState( outputMessageStream );

  outputMessageStream << std::endl
                      << "============================== Scheduler State ================================="
                      << std::endl;

  int slotCount = -1;
  for ( auto thisSlot : m_eventSlots ) {
    slotCount++;
    if ( thisSlot.complete ) continue;

    outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
                        << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;

    if ( 0 > iSlot or iSlot == slotCount ) {
      outputMessageStream << "Algorithms states:" << std::endl;

      const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
      for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
        outputMessageStream << " o " << index2algname( algoIdx ) << " ["
                            << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
        DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
        const int depsSize = deps.size();
        if ( depsSize == 0 ) outputMessageStream << " none";

        DataObjIDColl missing;
        for ( auto d : deps ) {
          outputMessageStream << d << " ";
          if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
            // outputMessageStream << "[missing] ";
            missing.insert( d );
          }
        }

        if ( !missing.empty() ) {
          outputMessageStream << ". The following are missing: ";
          for ( auto d : missing ) {
            outputMessageStream << d << " ";
          }
        }

        outputMessageStream << std::endl;
      }

      // Snapshot of the WhiteBoard
      outputMessageStream << "\nWhiteboard contents: " << std::endl;
      for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;

      // Snapshot of the ControlFlow
      outputMessageStream << "\nControl Flow:" << std::endl;
      std::stringstream cFlowStateStringStream;
      m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );

      outputMessageStream << cFlowStateStringStream.str() << std::endl;
    }
  }

  outputMessageStream << "=================================== END ======================================" << std::endl;

  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
}

void ForwardSchedulerSvc::dumpState ( )
override

Definition at line 1188 of file ForwardSchedulerSvc.cpp.

void ForwardSchedulerSvc::dumpState()
{
  std::lock_guard<std::mutex> lock( m_ssMut );

  std::ostringstream ost;
  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
  dumpState( ost );

  info() << ost.str() << endmsg;
}

void ForwardSchedulerSvc::dumpState ( std::ostringstream & ost )
private

Definition at line 1177 of file ForwardSchedulerSvc.cpp.

void ForwardSchedulerSvc::dumpState( std::ostringstream& ost )
{
  std::lock_guard<std::mutex> lock( m_ssMut );

  for ( auto it : m_sState ) {
    ost << " " << it << std::endl;
  }
}

StatusCode ForwardSchedulerSvc::eventFailed ( EventContext * eventContext )
private

Method to check if an event failed and take appropriate actions.

An event may fail, in which case this method is called. It dumps the state of the scheduler, drains the actions (without executing them) and the events in the queues, and returns a failure.

Definition at line 488 of file ForwardSchedulerSvc.cpp.

StatusCode ForwardSchedulerSvc::eventFailed( EventContext* eventContext )
{
  // Set the number of slots available to an error code
  m_freeSlots.store( 0 );

  fatal() << "*** Event " << eventContext->evt() << " on slot "
          << eventContext->slot() << " failed! ***" << endmsg;

  std::ostringstream ost;
  m_algExecStateSvc->dump( ost, *eventContext );

  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
         << ":\n" << ost.str() << endmsg;

  dumpSchedulerState( -1 );

  // Empty queue and deactivate the service
  action thisAction;
  while ( m_actionsQueue.try_pop( thisAction ) ) {
  };
  deactivate();

  // Push into the finished events queue the failed context
  EventContext* thisEvtContext;
  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
    m_finishedEvents.push( thisEvtContext );
  };
  m_finishedEvents.push( eventContext );

  return StatusCode::FAILURE;
}

StatusCode ForwardSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 246 of file ForwardSchedulerSvc.cpp.

StatusCode ForwardSchedulerSvc::finalize()
{
  StatusCode sc( Service::finalize() );
  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;

  sc = deactivate();
  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;

  info() << "Joining Scheduler thread" << endmsg;
  m_thread.join();

  // Final error check after thread pool termination
  if ( m_isActive == FAILURE ) {
    error() << "problems in scheduler thread" << endmsg;
    return StatusCode::FAILURE;
  }

  // m_efManager.getExecutionFlowGraph()->dumpExecutionPlan();

  return sc;
}

unsigned int ForwardSchedulerSvc::freeSlots ( )
override

Get free slots number.
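The counter is a signed atomic while the method returns an unsigned value, so the result is clamped at zero. The sketch below isolates that clamp; the free function reportedFreeSlots is a hypothetical name used only for illustration.

```cpp
#include <algorithm>
#include <atomic>

// Hypothetical free-function version of the clamp: a negative counter is
// reported as zero instead of wrapping around to a huge unsigned value.
unsigned int reportedFreeSlots( const std::atomic_int& freeSlots )
{
  return static_cast<unsigned int>( std::max( freeSlots.load(), 0 ) );
}
```

Without std::max, converting a negative int to unsigned int would yield a very large number, which a caller could mistake for many free slots.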

Definition at line 424 of file ForwardSchedulerSvc.cpp.

424 { return std::max( m_freeSlots.load(), 0 ); }
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
T max(T...args)
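The clamp in freeSlots() can be illustrated with a minimal sketch. It assumes that the signed atomic counter may transiently drop below zero while other threads update it; the SlotCounter type and its member name are hypothetical and not part of Gaudi.

```cpp
#include <algorithm>
#include <atomic>

// Hypothetical stand-in for the scheduler's m_freeSlots bookkeeping.
struct SlotCounter {
  // Signed on purpose: assumed to be able to dip below zero for a short time.
  std::atomic_int freeSlotsRaw{0};

  // Same expression as in the listing above: never report a negative count.
  unsigned int freeSlots() const { return std::max( freeSlotsRaw.load(), 0 ); }
};
```

Without the clamp, converting a negative int to unsigned int would report a very large number of free slots.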
const std::string & ForwardSchedulerSvc::index2algname ( unsigned int  index)
inlineprivate

Convert an integer to a name.

Definition at line 344 of file ForwardSchedulerSvc.cpp.

344 { return m_algname_vect[index]; }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
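The name/index bookkeeping can be sketched with the same two container types (a vector for index to name, an unordered map for name to index). AlgNameIndex and its methods are hypothetical names for this illustration. Unlike the one-line index2algname() above, the sketch uses at(), which throws on an invalid index instead of reading out of bounds.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper pairing the two containers used for name <-> index lookups.
class AlgNameIndex {
public:
  // Register a name and return the index assigned to it.
  unsigned int add( const std::string& name ) {
    const unsigned int index = static_cast<unsigned int>( m_names.size() );
    m_names.push_back( name );
    m_indices[name] = index;
    return index;
  }

  // Index -> name; at() throws std::out_of_range for an invalid index.
  const std::string& nameOf( unsigned int index ) const { return m_names.at( index ); }

  // Name -> index; at() throws std::out_of_range for an unknown name.
  unsigned int indexOf( const std::string& name ) const { return m_indices.at( name ); }

private:
  std::vector<std::string>                      m_names;
  std::unordered_map<std::string, unsigned int> m_indices;
};
```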
StatusCode ForwardSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition, the list of algorithms is acquired from the AlgResourcePool.

Definition at line 52 of file ForwardSchedulerSvc.cpp.

53 {
54 
55  // Initialise mother class (read properties, ...)
57  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
58 
59  // Get hold of the TBBSvc. This should initialize the thread pool
60  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
61  if ( !m_threadPoolSvc.isValid() ) {
62  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
63  return StatusCode::FAILURE;
64  }
65 
66  // Activate the scheduler in another thread.
67  info() << "Activating scheduler in a separate thread" << endmsg;
69 
70  while ( m_isActive != ACTIVE ) {
71  if ( m_isActive == FAILURE ) {
72  fatal() << "Terminating initialization" << endmsg;
73  return StatusCode::FAILURE;
74  } else {
75  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
76  sleep( 1 );
77  }
78  }
79 
80  // Get the algo resource pool
81  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
82  if ( !m_algResourcePool.isValid() ) {
83  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
84  return StatusCode::FAILURE;
85  }
86 
87  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
88  if (!m_algExecStateSvc.isValid()) {
89  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
90  return StatusCode::FAILURE;
91  }
92 
93  // Get Whiteboard
95  if ( !m_whiteboard.isValid() ) {
96  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
97  return StatusCode::FAILURE;
98  }
99 
100  // Check the MaxEventsInFlight parameters and react
101  // Deprecated for the moment
102  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
103  if ( m_maxEventsInFlight != 0 ) {
104  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
105  << "Please migrate your code options files." << endmsg;
106 
107  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
108  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
109  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
110  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
111  }
112  }
113 
114  // Get dedicated scheduler for I/O-bound algorithms
115  if ( m_useIOBoundAlgScheduler ) {
118  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
119  }
120  // Align the two quantities
121  m_maxEventsInFlight = numberOfWBSlots;
122 
123  // Set the number of free slots
125 
126  if ( m_algosDependencies.size() != 0 ) {
127  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
128  << " FIX your job options #####" << endmsg;
129  }
130 
131  // Get the list of algorithms
133  const unsigned int algsNumber = algos.size();
134  info() << "Found " << algsNumber << " algorithms" << endmsg;
135 
136  /* Dependencies
137  1) Look for handles in algo, if none
138  2) Assume none are required
139  */
140 
141  DataObjIDColl globalInp, globalOutp;
142 
143  info() << "Data Dependencies for Algorithms:";
144 
146  for ( IAlgorithm* ialgoPtr : algos ) {
147  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
148  if ( nullptr == algoPtr )
149  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
150 
151  info() << "\n " << algoPtr->name();
152 
153  // FIXME
154  DataObjIDColl algoDependencies;
155  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
156  for ( auto id : algoPtr->inputDataObjs() ) {
157  info() << "\n o INPUT " << id;
158  algoDependencies.insert( id );
159  globalInp.insert( id );
160  }
161  for ( auto id : algoPtr->outputDataObjs() ) {
162  info() << "\n o OUTPUT " << id;
163  globalOutp.insert( id );
164  }
165  } else {
166  info() << "\n none";
167  }
168  m_algosDependencies.emplace_back( algoDependencies );
169  }
170  info() << endmsg;
171 
172  // Fill the containers to convert algo names to index
173  m_algname_vect.reserve( algsNumber );
174  unsigned int index = 0;
175  for ( IAlgorithm* algo : algos ) {
176  const std::string& name = algo->name();
177  m_algname_index_map[name] = index;
179  index++;
180  }
181 
182  // Check if we have unmet global input dependencies
183  if ( m_checkDeps ) {
184  DataObjIDColl unmetDep;
185  for ( auto o : globalInp ) {
186  if ( globalOutp.find( o ) == globalOutp.end() ) {
187  unmetDep.insert( o );
188  }
189  }
190 
191  if ( unmetDep.size() > 0 ) {
192  fatal() << "The following unmet INPUT data dependencies were found: ";
193  for ( auto& o : unmetDep ) {
194  fatal() << "\n o " << o << " required by Algorithm: ";
195  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
196  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
197  fatal() << "\n * " << m_algname_vect[i];
198  }
199  }
200  }
201  fatal() << endmsg;
202  return StatusCode::FAILURE;
203  } else {
204  info() << "No unmet INPUT data dependencies were found" << endmsg;
205  }
206  }
207 
208  // prepare the control flow part
209  if ( m_CFNext ) m_DFNext = true; // force usage of new data flow machinery when new control flow is used
210  if ( !m_CFNext && !m_optimizationMode.empty() ) {
211  fatal() << "Execution optimization is only available with the graph-based execution flow management" << endmsg;
212  return StatusCode::FAILURE;
213  }
214  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
215  sc =
217  unsigned int controlFlowNodeNumber = m_efManager.getExecutionFlowGraph()->getControlFlowNodeCounter();
218 
219  // Shortcut for the message service
220  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
221  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
222 
223  m_eventSlots.assign( m_maxEventsInFlight,
224  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
225  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
226 
227  // Clearly inform about the level of concurrency
228  info() << "Concurrency level information:" << endmsg;
229  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
230  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
231  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
232 
233  // Simulating execution flow by only analyzing the graph topology and logic
234  if ( m_simulateExecution ) {
235  auto vis = concurrency::RunSimulator( 0 );
237  }
238 
239  return sc;
240 }
void simulateExecutionFlow(IGraphVisitor &visitor) const
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
StatusCode initialize() override
Definition: Service.cpp:64
Gaudi::Property< bool > m_CFNext
T empty(T...args)
Gaudi::Property< bool > m_simulateExecution
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:725
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
const DataObjIDColl & inputDataObjs() const override
Definition: Algorithm.h:455
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< bool > m_checkDeps
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
Gaudi::Property< std::string > m_optimizationMode
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:78
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Gaudi::Property< std::string > m_whiteboardSvcName
const DataObjIDColl & outputDataObjs() const override
Definition: Algorithm.h:456
T bind(T...args)
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
Gaudi::Property< int > m_maxEventsInFlight
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
Gaudi::Property< bool > m_DFNext
T insert(T...args)
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
T size(T...args)
Gaudi::Property< bool > m_useIOBoundAlgScheduler
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
T for_each(T...args)
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
T reserve(T...args)
T emplace_back(T...args)
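The unmet-dependency check performed during initialize() boils down to a set difference between all declared inputs and all declared outputs. The sketch below reproduces that idea with plain strings instead of DataObjID; the AlgoDeps struct and the unmetInputs() function are hypothetical names introduced only for this illustration.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical, simplified description of one algorithm's data dependencies.
struct AlgoDeps {
  std::string           name;
  std::set<std::string> inputs;
  std::set<std::string> outputs;
};

// Return every input that no algorithm in the list produces.
std::set<std::string> unmetInputs( const std::vector<AlgoDeps>& algos ) {
  std::set<std::string> globalInp, globalOutp;
  for ( const auto& a : algos ) {
    globalInp.insert( a.inputs.begin(), a.inputs.end() );
    globalOutp.insert( a.outputs.begin(), a.outputs.end() );
  }

  std::set<std::string> unmet;
  for ( const auto& id : globalInp ) {
    if ( globalOutp.find( id ) == globalOutp.end() ) unmet.insert( id );
  }
  return unmet;
}
```

A non-empty result corresponds to the case in which the listing above reports the unmet INPUT dependencies and returns a failure.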
StatusCode ForwardSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check whether a stall condition is present for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.

Definition at line 766 of file ForwardSchedulerSvc.cpp.

767 {
768  // Get the slot
769  EventSlot& thisSlot = m_eventSlots[iSlot];
770 
771  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
773 
774  info() << "About to declare a stall" << endmsg;
775  fatal() << "*** Stall detected! ***\n" << endmsg;
776  dumpSchedulerState( iSlot );
777  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
778 
779  return StatusCode::FAILURE;
780  }
781  return StatusCode::SUCCESS;
782 }
bool algsPresent(State state) const
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
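The stall condition described above can be written as a single predicate. This is a sketch under assumptions: SchedulerSnapshot is a hypothetical struct, and its last field models the "no algorithm has all of its dependencies satisfied" part of the description, because the corresponding line of the listing is not shown on this page.

```cpp
#include <cstddef>

// Hypothetical snapshot of the quantities the stall check looks at.
struct SchedulerSnapshot {
  std::size_t  queuedActions;        // closures waiting in the actions queue
  unsigned int algosInFlight;        // algorithms currently running
  unsigned int ioBoundAlgosInFlight; // I/O-bound algorithms currently running
  bool         anyAlgoDataReady;     // assumed: some algorithm has all its inputs
};

// True when nothing is queued, nothing is running and nothing can be started.
bool isStalled( const SchedulerSnapshot& s ) {
  return s.queuedActions == 0 && s.algosInFlight == 0 && s.ioBoundAlgosInFlight == 0 && !s.anyAlgoDataReady;
}
```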
StatusCode ForwardSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 430 of file ForwardSchedulerSvc.cpp.

431 {
432 
433  unsigned int slotNum = 0;
434  for ( auto& thisSlot : m_eventSlots ) {
435  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
436  updateStates( slotNum );
437  }
438  slotNum++;
439  }
440  return StatusCode::SUCCESS;
441 }
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
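The drain loop can be sketched independently of the scheduler. MiniSlot and drain() are hypothetical names; the update callback stands in for updateStates( slotNum ), and the return value (number of slots updated) is an addition made here for testability.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical, reduced event slot: only the two flags the drain loop inspects.
struct MiniSlot {
  bool allAlgsExecuted;
  bool complete;
};

// Visit every slot and call update(slotIndex) for those that still have work.
// Returns the number of slots that were updated.
template <typename UpdateFn>
std::size_t drain( const std::vector<MiniSlot>& slots, UpdateFn update ) {
  std::size_t updated = 0;
  for ( std::size_t slotNum = 0; slotNum < slots.size(); ++slotNum ) {
    if ( !slots[slotNum].allAlgsExecuted && !slots[slotNum].complete ) {
      update( slotNum );
      ++updated;
    }
  }
  return updated;
}
```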
StatusCode ForwardSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 447 of file ForwardSchedulerSvc.cpp.

448 {
449  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
450  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
451  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
452  // << " active: " << m_isActive << endmsg;
453  return StatusCode::FAILURE;
454  } else {
455  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
456  // << " active: " << m_isActive << endmsg;
457  m_finishedEvents.pop( eventContext );
458  m_freeSlots++;
459  if (msgLevel(MSG::DEBUG))
460  debug() << "Popped slot " << eventContext->slot() << "(event "
461  << eventContext->evt() << ")" << endmsg;
462  return StatusCode::SUCCESS;
463  }
464 }
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
Gaudi::Property< int > m_maxEventsInFlight
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
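The guard at the top of popFinishedEvent() (fail when every slot is free or the scheduler is inactive, otherwise pop and free one slot) can be modelled in a single thread. FinishedEvents is a hypothetical type; it uses std::queue and plain integers instead of the concurrent queue of EventContext pointers, so it cannot block and returns false on an empty queue instead.

```cpp
#include <queue>

// Hypothetical single-threaded model of the finished-events bookkeeping.
struct FinishedEvents {
  int             maxEventsInFlight = 1;
  int             freeSlots         = 1;
  bool            active            = true;
  std::queue<int> finished; // event numbers standing in for EventContext*

  // Returns false when no event is in flight or the scheduler is inactive.
  // Otherwise hands out the next finished event and frees its slot.
  bool pop( int& evt ) {
    if ( freeSlots == maxEventsInFlight || !active ) return false;
    if ( finished.empty() ) return false; // the real queue would block here
    evt = finished.front();
    finished.pop();
    ++freeSlots;
    return true;
  }
};
```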
StatusCode ForwardSchedulerSvc::promoteToAsyncExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm * algo,
EventContext * eventContext 
)
private

The call to this method is triggered only from within the IOBoundAlgTask.

Definition at line 1078 of file ForwardSchedulerSvc.cpp.

1080 {
1081 
1082  // Put back the instance
1083  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1084  if ( !castedAlgo ) fatal() << "[Asynchronous] The casting did not succeed!" << endmsg;
1085  // EventContext* eventContext = castedAlgo->getContext();
1086 
1087  // Check if the execution failed
1088  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1089  eventFailed(eventContext).ignore();
1090 
1091  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1092 
1093  if ( !sc.isSuccess() ) {
1094  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1095  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1096  return StatusCode::FAILURE;
1097  }
1098 
1100 
1101  EventSlot& thisSlot = m_eventSlots[si];
1102  // XXX: CF tests
1103  if ( !m_DFNext ) {
1104  // Update the catalog: some new products may be there
1105  m_whiteboard->selectStore( eventContext->slot() ).ignore();
1106 
1107  // update prods in the dataflow
1108  // DP: Handles could be used. Just update what the algo wrote
1109  DataObjIDColl new_products;
1110  m_whiteboard->getNewDataObjects(new_products).ignore();
1111  for (const auto& new_product : new_products)
1112  if (msgLevel(MSG::DEBUG))
1113  debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
1114  thisSlot.dataFlowMgr.updateDataObjectsCatalog(new_products);
1115  }
1116 
1117  if (msgLevel(MSG::DEBUG))
1118  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1119  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1120 
1121  // Limit number of updates
1122  if ( m_CFNext )
1123  m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
1124  if ( m_updateNeeded ) {
1125  // Schedule an update of the status of the algorithms
1126  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1, algo->name() );
1127  m_actionsQueue.push( updateAction );
1128  m_updateNeeded = false;
1129  }
1130 
1131  if (msgLevel(MSG::DEBUG))
1132  debug() << "[Asynchronous] Trying to handle execution result of "
1133  << index2algname(iAlgo) << " on slot " << si << endmsg;
1134  State state;
1135  if ( algo->filterPassed() ) {
1136  state = State::EVTACCEPTED;
1137  } else {
1138  state = State::EVTREJECTED;
1139  }
1140 
1141  sc = thisSlot.algsStates.updateState( iAlgo, state );
1142 
1143  if (sc.isSuccess())
1144  if (msgLevel(MSG::VERBOSE))
1145  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo) << " on slot "
1146  << si << " to " << AlgsExecutionStates::stateNames[state] << endmsg;
1147 
1148  return sc;
1149 }
Gaudi::Property< bool > m_CFNext
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
virtual const EventStatus::Status & eventStatus() const =0
ContextID_t slot() const
Definition: EventContext.h:41
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
ContextEvt_t evt() const
Definition: EventContext.h:40
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
Gaudi::Property< bool > m_DFNext
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
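The "limit number of updates" step binds updateStates into a closure, pushes it onto the actions queue, and clears a flag so that further completions do not queue duplicates. A minimal single-threaded sketch of that pattern follows. UpdateQueue is a hypothetical class, it uses std::queue instead of the TBB queue, and it assumes that running the update re-arms the flag, which this page does not show.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <string>

// Hypothetical model of the actions queue plus the flag that limits updates.
class UpdateQueue {
public:
  using action = std::function<int()>;

  // Queue an update closure only if one is needed, then clear the flag.
  bool scheduleUpdate( const std::string& algoName ) {
    if ( !m_updateNeeded ) return false;
    m_actions.push( std::bind( &UpdateQueue::updateStates, this, -1, algoName ) );
    m_updateNeeded = false;
    return true;
  }

  // Execute every queued closure in FIFO order.
  void runAll() {
    while ( !m_actions.empty() ) {
      action next = m_actions.front();
      m_actions.pop();
      next();
    }
  }

  std::size_t pending() const { return m_actions.size(); }
  int         updatesRun() const { return m_updatesRun; }

private:
  // Stand-in for the real state update; the arguments are ignored here.
  int updateStates( int /*slot*/, const std::string& /*algoName*/ ) {
    ++m_updatesRun;
    m_updateNeeded = true; // assumption: the update re-arms the flag
    return 0;
  }

  std::queue<action> m_actions;
  bool               m_updateNeeded = true;
  int                m_updatesRun   = 0;
};
```

With this arrangement, several algorithms finishing in quick succession trigger one pending update instead of one per algorithm.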
StatusCode ForwardSchedulerSvc::promoteToAsyncScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 953 of file ForwardSchedulerSvc.cpp.

954 {
955 
957 
958  // bool IOBound = m_efManager.getExecutionFlowGraph()->getAlgorithmNode(algName)->isIOBound();
959 
960  const std::string& algName( index2algname( iAlgo ) );
961  IAlgorithm* ialgoPtr = nullptr;
962  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
963 
964  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
965  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr ); // DP: expose the setter of the context?
966  EventContext* eventContext( m_eventSlots[si].eventContext );
967  if ( !eventContext )
968  fatal() << "[Asynchronous] Event context for algorithm " << algName << " is a nullptr (slot " << si << ")"
969  << endmsg;
970 
971  algoPtr->setContext( m_eventSlots[si].eventContext );
973  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from tbb::task)? it seems it works..
974  IOBoundAlgTask* theTask = new( tbb::task::allocate_root() )
975  IOBoundAlgTask(ialgoPtr, iAlgo, eventContext, serviceLocator(),
976  this, m_algExecStateSvc);
977  m_IOBoundAlgScheduler->push(*theTask);
978 
979  if (msgLevel(MSG::DEBUG))
980  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event "
981  << eventContext->evt() << " in slot " << si
982  << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
983 
984  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
985 
986  if (updateSc.isSuccess())
987  if (msgLevel(MSG::VERBOSE))
988  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo)
989  << " to SCHEDULED on slot " << si << endmsg;
990  return updateSc;
991  } else {
992  if ( msgLevel( MSG::DEBUG ) )
993  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
994  << si << endmsg;
995  return sc;
996  }
997 }
Wrapper around I/O-bound Gaudi-algorithms.
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
void setContext(const EventContext *context) override
set the context
Definition: Algorithm.h:438
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
virtual StatusCode push(IAlgTask &task)=0
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
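The acquire-then-schedule shape of promoteToAsyncScheduled() can be sketched with a toy resource pool. MiniAlgPool and tryPromoteToScheduled() are hypothetical names. The in-flight counter increment is an assumption, since the line that updates the counter is not shown in the listing above.

```cpp
#include <map>
#include <string>

// Hypothetical pool handing out a limited number of instances per algorithm name.
class MiniAlgPool {
public:
  void declare( const std::string& name, unsigned int instances ) { m_available[name] = instances; }

  // Non-blocking acquire: fails immediately when no instance is free.
  bool acquire( const std::string& name ) {
    auto it = m_available.find( name );
    if ( it == m_available.end() || it->second == 0 ) return false;
    --it->second;
    return true;
  }

  // Give an instance back so that it can be acquired again.
  void release( const std::string& name ) { ++m_available[name]; }

private:
  std::map<std::string, unsigned int> m_available;
};

// Promotion succeeds only if an instance could be acquired; on failure
// nothing is changed.
bool tryPromoteToScheduled( MiniAlgPool& pool, const std::string& name, unsigned int& inFlight ) {
  if ( !pool.acquire( name ) ) return false;
  ++inFlight; // assumption: the in-flight counter is bumped on submission
  return true;
}
```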
StatusCode ForwardSchedulerSvc::promoteToControlReady ( unsigned int  iAlgo,
int  si 
)
private

Algorithm promotion: Accepted by the control flow.

Definition at line 863 of file ForwardSchedulerSvc.cpp.

864 {
865 
866  // Do the control flow
867  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
868  if (sc.isSuccess())
869  if (msgLevel(MSG::VERBOSE))
870  verbose() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY on slot "
871  << si << endmsg;
872 
873  return sc;
874 }
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
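Each promoteTo... method delegates to updateState() and reports whether the transition was accepted. The sketch below shows one way such a per-slot state table could validate transitions. The INITIAL state, the allowed-transition chain, and the MiniAlgStates class are assumptions made for illustration and do not reproduce AlgsExecutionStates.

```cpp
#include <vector>

// Hypothetical subset of algorithm execution states (INITIAL is assumed).
enum class State { INITIAL, CONTROLREADY, DATAREADY, SCHEDULED, EVTACCEPTED, EVTREJECTED };

// Assumption for this sketch: a state may only advance along this chain.
bool transitionAllowed( State from, State to ) {
  switch ( from ) {
  case State::INITIAL:
    return to == State::CONTROLREADY;
  case State::CONTROLREADY:
    return to == State::DATAREADY;
  case State::DATAREADY:
    return to == State::SCHEDULED;
  case State::SCHEDULED:
    return to == State::EVTACCEPTED || to == State::EVTREJECTED;
  default:
    return false;
  }
}

// Per-slot state table: one entry per algorithm index.
class MiniAlgStates {
public:
  explicit MiniAlgStates( unsigned int nAlgos ) : m_states( nAlgos, State::INITIAL ) {}

  // Returns true and stores the new state only for an allowed transition.
  bool updateState( unsigned int iAlgo, State newState ) {
    if ( iAlgo >= m_states.size() || !transitionAllowed( m_states[iAlgo], newState ) ) return false;
    m_states[iAlgo] = newState;
    return true;
  }

  State operator[]( unsigned int iAlgo ) const { return m_states.at( iAlgo ); }

private:
  std::vector<State> m_states;
};
```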
StatusCode ForwardSchedulerSvc::promoteToDataReady ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 878 of file ForwardSchedulerSvc.cpp.

879 {
880 
881  StatusCode sc;
882  if ( !m_DFNext ) {
883  sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
884  } else {
886  }
887 
888  StatusCode updateSc( StatusCode::FAILURE );
889  if ( sc == StatusCode::SUCCESS )
890  updateSc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::DATAREADY );
891 
892  if (updateSc.isSuccess())
893  if (msgLevel(MSG::VERBOSE))
894  verbose() << "Promoting " << index2algname(iAlgo) << " to DATAREADY on slot "
895  << si<< endmsg;
896 
897  return updateSc;
898 }
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Gaudi::Property< bool > m_DFNext
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
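Promotion to DATAREADY hinges on a single question: are all inputs of the algorithm already available in the slot? A minimal sketch of such a check, using strings in place of DataObjID and a hypothetical function name:

```cpp
#include <set>
#include <string>

// True when every required input is already present in the catalog of
// products available in the slot.
bool dataDependenciesSatisfied( const std::set<std::string>& required, const std::set<std::string>& available ) {
  for ( const auto& id : required ) {
    if ( available.find( id ) == available.end() ) return false;
  }
  return true;
}
```

An algorithm without declared inputs is trivially ready under this check.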
StatusCode ForwardSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm * algo,
EventContext * eventContext 
)
private

The call to this method is triggered only from within the AlgoExecutionTask.

Definition at line 1003 of file ForwardSchedulerSvc.cpp.

1005 {
1006 
1007  // Put back the instance
1008  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1009  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
1010  // EventContext* eventContext = castedAlgo->getContext();
1011 
1012  // Check if the execution failed
1013  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1014  eventFailed(eventContext).ignore();
1015 
1016  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1017 
1018  if ( !sc.isSuccess() ) {
1019  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1020  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1021  return StatusCode::FAILURE;
1022  }
1023 
1024  m_algosInFlight--;
1025 
1026  EventSlot& thisSlot = m_eventSlots[si];
1027  // XXX: CF tests
1028  if ( !m_DFNext ) {
1029  // Update the catalog: some new products may be there
1030  m_whiteboard->selectStore( eventContext->slot() ).ignore();
1031 
1032  // update prods in the dataflow
1033  // DP: Handles could be used. Just update what the algo wrote
1034  DataObjIDColl new_products;
1035  m_whiteboard->getNewDataObjects( new_products ).ignore();
1036  for ( const auto& new_product : new_products )
1037  if ( msgLevel( MSG::DEBUG ) ) debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
1038  thisSlot.dataFlowMgr.updateDataObjectsCatalog( new_products );
1039  }
1040 
1041  if ( msgLevel( MSG::DEBUG ) )
1042  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1043  << m_algosInFlight << endmsg;
1044 
1045  // Limit number of updates
1046  if ( m_CFNext )
1047  m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
1048  if ( m_updateNeeded ) {
1049  // Schedule an update of the status of the algorithms
1050  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1, algo->name() );
1051  m_actionsQueue.push( updateAction );
1052  m_updateNeeded = false;
1053  }
1054 
1055  if ( msgLevel( MSG::DEBUG ) )
1056  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
1057  State state;
1058  if ( algo->filterPassed() ) {
1059  state = State::EVTACCEPTED;
1060  } else {
1061  state = State::EVTREJECTED;
1062  }
1063 
1064  sc = thisSlot.algsStates.updateState( iAlgo, state );
1065 
1066  if (sc.isSuccess())
1067  if (msgLevel(MSG::VERBOSE))
1068  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
1070 
1071  return sc;
1072 }
Gaudi::Property< bool > m_CFNext
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
virtual const EventStatus::Status & eventStatus() const =0
ContextID_t slot() const
Definition: EventContext.h:41
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
ContextEvt_t evt() const
Definition: EventContext.h:40
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T bind(T...args)
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
Gaudi::Property< bool > m_DFNext
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode ForwardSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode ForwardSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 902 of file ForwardSchedulerSvc.cpp.

903 {
904 
906 
907  const std::string& algName( index2algname( iAlgo ) );
908  IAlgorithm* ialgoPtr = nullptr;
909  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
910 
911  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
912  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr ); // DP: expose the setter of the context?
913  EventContext* eventContext( m_eventSlots[si].eventContext );
914  if ( !eventContext )
915  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
916 
917  algoPtr->setContext( m_eventSlots[si].eventContext );
918  ++m_algosInFlight;
919  // Avoid to use tbb if the pool size is 1 and run in this thread
920  if (-100 != m_threadPoolSize) {
921  tbb::task* t = new( tbb::task::allocate_root() )
922  AlgoExecutionTask(ialgoPtr, iAlgo, eventContext, serviceLocator(),
923  this, m_algExecStateSvc);
924  tbb::task::enqueue( *t);
925  } else {
926  AlgoExecutionTask theTask(ialgoPtr, iAlgo, eventContext,
928  theTask.execute();
929  }
930 
931  if ( msgLevel( MSG::DEBUG ) )
932  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
933  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
934 
935  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
936 
937  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
938 
939  if (updateSc.isSuccess())
940  if (msgLevel(MSG::VERBOSE))
941  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
942  << si << endmsg;
943  return updateSc;
944  } else {
945  if ( msgLevel( MSG::DEBUG ) )
946  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
947  return sc;
948  }
949 }
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
void setContext(const EventContext *context) override
set the context
Definition: Algorithm.h:438
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
Gaudi::Property< unsigned int > m_maxAlgosInFlight
STL class.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
friend class AlgoExecutionTask
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvent ( EventContext *  eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two possible cases:

  • No slot is free: StatusCode::FAILURE is returned.
  • At least one slot is free: an action that resets the slot and kicks off its update is queued.
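
The two cases above can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the Gaudi implementation: `MiniScheduler`, `Status`, and the plain `std::queue` are hypothetical stand-ins for the service, StatusCode, and the concurrent action queue.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <queue>

// Hypothetical stand-in for StatusCode; not the Gaudi type.
enum class Status { SUCCESS, FAILURE };

// Hypothetical, single-threaded model of the slot accounting.
struct MiniScheduler {
  std::atomic_int                     freeSlots;
  std::queue<std::function<Status()>> actions; // stand-in for the action queue

  explicit MiniScheduler( int nSlots ) : freeSlots( nSlots ) {
    assert( nSlots >= 0 ); // a negative slot count makes no sense in this sketch
  }

  Status pushNewEvent( int eventNumber ) {
    // Case 1: no slot is free, so the caller is told to retry later.
    if ( freeSlots.load() == 0 ) return Status::FAILURE;
    // Case 2: reserve a slot and defer the real work to a queued closure.
    --freeSlots;
    actions.push( [eventNumber]() { return eventNumber >= 0 ? Status::SUCCESS : Status::FAILURE; } );
    return Status::SUCCESS;
  }
};
```

Note that a SUCCESS return only means the action was queued; the work itself runs later, when the closure is taken from the queue and executed.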

Definition at line 362 of file ForwardSchedulerSvc.cpp.

363 {
364 
365  if ( m_first ) {
366  m_first = false;
367  }
368 
369  if ( !eventContext ) {
370  fatal() << "Event context is nullptr" << endmsg;
371  return StatusCode::FAILURE;
372  }
373 
374  if ( m_freeSlots.load() == 0 ) {
375  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
376  return StatusCode::FAILURE;
377  }
378 
379  // no problem as push new event is only called from one thread (event loop manager)
380  m_freeSlots--;
381 
382  auto action = [this, eventContext]() -> StatusCode {
383  // Event processing slot forced to be the same as the wb slot
384  const unsigned int thisSlotNum = eventContext->slot();
385  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
386  if ( !thisSlot.complete ) {
387  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
388  return StatusCode::FAILURE;
389  }
390 
391  info() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
392  thisSlot.reset( eventContext );
393  // XXX: CF tests
394  if ( m_CFNext ) {
395  auto vis = concurrency::Trigger( thisSlotNum );
397  }
398 
399  return this->updateStates( thisSlotNum );
400  }; // end of lambda
401 
402  // Kick off the scheduling!
403  if ( msgLevel( MSG::VERBOSE ) ) {
404  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
405  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
406  }
407  m_actionsQueue.push( action );
408 
409  return StatusCode::SUCCESS;
410 }
Gaudi::Property< bool > m_CFNext
ContextID_t slot() const
Definition: EventContext.h:41
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
A visitor, performing full top-down traversals of a graph.
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
std::function< StatusCode()> action
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 413 of file ForwardSchedulerSvc.cpp.

414 {
415  StatusCode sc;
416  for ( auto context : eventContexts ) {
417  sc = pushNewEvent( context );
418  if ( sc != StatusCode::SUCCESS ) return sc;
419  }
420  return sc;
421 }
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode ForwardSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, return a failure immediately instead of blocking.
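
The non-blocking behaviour can be illustrated with a small stand-alone sketch. It assumes a mutex-protected `std::queue` in place of the TBB queue used by the service; `FinishedQueue`, `push`, and `tryPop` are hypothetical names, and events are represented by plain integers.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <queue>

// Hypothetical stand-in for the queue of finished events.
struct FinishedQueue {
  std::mutex      m;
  std::queue<int> finished;
  std::atomic_int freeSlots{0};

  void push( int evt ) {
    assert( evt >= 0 ); // event numbers are assumed non-negative in this sketch
    std::lock_guard<std::mutex> lock( m );
    finished.push( evt );
  }

  // Returns false at once when nothing is available; never waits.
  bool tryPop( int& evt ) {
    std::lock_guard<std::mutex> lock( m );
    if ( finished.empty() ) return false; // output argument is left untouched
    evt = finished.front();
    finished.pop();
    ++freeSlots; // handing the event back frees its slot
    return true;
  }
};
```

A caller would typically poll `tryPop` between other tasks and fall back to a blocking pop only when it has nothing else to do.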

Definition at line 470 of file ForwardSchedulerSvc.cpp.

471 {
472  if ( m_finishedEvents.try_pop( eventContext ) ) {
473  if ( msgLevel( MSG::DEBUG ) )
474  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
475  << endmsg;
476  m_freeSlots++;
477  return StatusCode::SUCCESS;
478  }
479  return StatusCode::FAILURE;
480 }
ContextID_t slot() const
Definition: EventContext.h:41
ContextEvt_t evt() const
Definition: EventContext.h:40
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode ForwardSchedulerSvc::updateStates ( int  si = -1,
const std::string &  algo_name = std::string() 
)
private

Loop over the algorithms in the slots and promote them to successive states (-1 means all slots, while an empty string means skipping the update of the control flow state).

Update the state of the algorithms.

The oldest events are checked before the newest in order to reduce the event backlog. An event is considered finished when none of its algorithms remain in an intermediate state, that is, when:

  • No algorithms have been signed off by the control flow (none in the CONTROLREADY state)
  • No algorithms have been signed off by the data flow (none in the DATAREADY state)
  • No algorithms have been scheduled (none in the SCHEDULED state)
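
The ordering and the completion test described above can be sketched as follows. This is a simplified illustration, not the Gaudi implementation: `Slot`, `oldestFirst`, and `isFinished` are hypothetical names, and the per-state counters stand in for AlgsExecutionStates. The `rootDecisionResolved` flag mirrors the additional condition that appears in the listing of updateStates.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical, flattened view of one event slot.
struct Slot {
  int  evt;                  // event number
  bool complete;             // slot already released
  bool rootDecisionResolved; // control flow has reached a final decision
  int  nControlReady;        // algorithms in CONTROLREADY
  int  nDataReady;           // algorithms in DATAREADY
  int  nScheduled;           // algorithms in SCHEDULED
};

// Collect pointers to the incomplete slots, oldest event first (no copies of the slots).
std::vector<Slot*> oldestFirst( std::vector<Slot>& slots ) {
  std::vector<Slot*> ptrs;
  ptrs.reserve( slots.size() );
  for ( auto& s : slots )
    if ( !s.complete ) ptrs.push_back( &s );
  std::sort( ptrs.begin(), ptrs.end(), []( const Slot* a, const Slot* b ) { return a->evt < b->evt; } );
  return ptrs;
}

// An event is finished when nothing is pending in any intermediate state.
bool isFinished( const Slot& s ) {
  assert( s.nControlReady >= 0 && s.nDataReady >= 0 && s.nScheduled >= 0 ); // counters are never negative
  return !s.complete && s.rootDecisionResolved && s.nControlReady == 0 && s.nDataReady == 0 && s.nScheduled == 0;
}
```

Sorting pointers rather than the slots themselves keeps the slot vector stable, so a slot index remains valid while the update is in progress.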

Definition at line 535 of file ForwardSchedulerSvc.cpp.

536 {
537 
538  m_updateNeeded = true;
539 
540  // Fill a map of initial state / action using closures.
541  // done to update the states w/o several if/elses
542  // Posterchild for constexpr with gcc4.7 onwards!
543  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
544  statesTransitions = {
545  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
546  this,
547  std::placeholders::_1,
548  std::placeholders::_2)},
549  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
550  this,
551  std::placeholders::_1,
552  std::placeholders::_2)}
553  };*/
554 
555  StatusCode global_sc( StatusCode::FAILURE, true );
556 
557  // Sort from the oldest to the newest event
558  // Prepare a vector of pointers to the slots to avoid copies
559  std::vector<EventSlot*> eventSlotsPtrs;
560 
561  // Consider all slots if si <0 or just one otherwise
562  if ( si < 0 ) {
563  const int eventsSlotsSize( m_eventSlots.size() );
564  eventSlotsPtrs.reserve( eventsSlotsSize );
565  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
566  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
567  }
568  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
569  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
570  } else {
571  eventSlotsPtrs.push_back( &m_eventSlots[si] );
572  }
573 
574  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
575  int iSlot = thisSlotPtr->eventContext->slot();
576 
577  // Cache the states of the algos to improve readability and performance
578  auto& thisSlot = m_eventSlots[iSlot];
579  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
580 
581  // Take care of the control ready update
582  // XXX: CF tests
583  if ( !m_CFNext ) {
584  m_efManager.updateEventState( thisAlgsStates, thisSlot.controlFlowState );
585  } else {
586  if ( !algo_name.empty() )
587  m_efManager.updateDecision( algo_name, iSlot, thisAlgsStates, thisSlot.controlFlowState );
588  }
589 
590  // DF note: this is a loop over all algs that applies the CR->DR and DR->SCHD transitions
591  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
592  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
593  if (algState==AlgsExecutionStates::ERROR)
594  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
595  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
596  partial_sc=StatusCode::SUCCESS;
597  for (auto state_transition = statesTransitions.find(algState);
598  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
599  state_transition++){
600  partial_sc = state_transition->second(iAlgo,iSlot);
601  if (partial_sc.isFailure()){
602  verbose() << "Could not apply transition from "
603  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
604  << " for algorithm " << index2algname(iAlgo)
605  << " on processing slot " << iSlot << endmsg;
606  }
607  else{global_sc=partial_sc;}
608  } // end loop on transitions
609  }*/ // end loop on algos
610 
611  StatusCode partial_sc( StatusCode::FAILURE, true );
612  // first update CONTROLREADY to DATAREADY
613  if ( !m_CFNext ) {
614  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::CONTROLREADY );
615  it != thisAlgsStates.end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
616 
617  uint algIndex = *it;
618  partial_sc = promoteToDataReady(algIndex, iSlot);
619  if (partial_sc.isFailure())
620  if (msgLevel(MSG::VERBOSE))
621  verbose() << "Could not apply transition from "
622  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
623  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
624  }
625  }
626 
627  // now update DATAREADY to SCHEDULED
628  if ( !m_optimizationMode.empty() ) {
629  auto comp_nodes = [this]( const uint& i, const uint& j ) {
632  };
634  comp_nodes, std::vector<uint>() );
635  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
636  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
637  buffer.push( *it );
638  /*std::stringstream s;
639  auto buffer2 = buffer;
640  while (!buffer2.empty()) {
641  s << m_efManager.getExecutionFlowGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
642  buffer2.pop();
643  }
644  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
645 
646  /*while (!buffer.empty()) {
647  partial_sc = promoteToScheduled(buffer.top(), iSlot);
648  if (partial_sc.isFailure()) {
649  if (msgLevel(MSG::VERBOSE))
650  verbose() << "Could not apply transition from "
651  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
652  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
653  if (m_useIOBoundAlgScheduler) {
654  partial_sc = promoteToAsyncScheduled(buffer.top(), iSlot);
655  if (msgLevel(MSG::VERBOSE))
656  if (partial_sc.isFailure())
657  verbose() << "[Asynchronous] Could not apply transition from "
658  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
659  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
660  }
661  }
662  buffer.pop();
663  }*/
664  while ( !buffer.empty() ) {
665  bool IOBound = false;
667  IOBound = m_efManager.getExecutionFlowGraph()->getAlgorithmNode( index2algname( buffer.top() ) )->isIOBound();
668 
669  if ( !IOBound )
670  partial_sc = promoteToScheduled( buffer.top(), iSlot );
671  else
672  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot );
673 
674  if (msgLevel(MSG::VERBOSE))
675  if (partial_sc.isFailure())
676  verbose() << "Could not apply transition from "
677  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
678  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
679 
680  buffer.pop();
681  }
682 
683  } else {
684  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
685  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
686  uint algIndex = *it;
687 
688  bool IOBound = false;
691 
692  if ( !IOBound )
693  partial_sc = promoteToScheduled( algIndex, iSlot );
694  else
695  partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
696 
697  if (msgLevel(MSG::VERBOSE))
698  if (partial_sc.isFailure())
699  verbose() << "Could not apply transition from "
700  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
701  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
702  }
703  }
704 
708  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY) << ", "
709  << thisAlgsStates.sizeOfSubset(State::DATAREADY) << ", "
710  << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << ", "
712  << "\n";
713  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
714  : std::to_string(tbb::task_scheduler_init::default_num_threads());
715  std::ofstream myfile;
716  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
717  myfile << s.str();
718  myfile.close();
719  }
720 
721  // Not complete because this would mean that the slot is already free!
722  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
723  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
724  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
725  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
726 
727  thisSlot.complete = true;
728  // if the event did not fail, add it to the finished events
729  // otherwise it is taken care of in the error handling already
730  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
731  m_finishedEvents.push(thisSlot.eventContext);
732  if (msgLevel(MSG::DEBUG))
733  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
734  << thisSlot.eventContext->slot() << ")." << endmsg;
735  }
736  // now let's return the fully evaluated result of the control flow
737  if ( msgLevel( MSG::DEBUG ) ) {
739  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
740  debug() << ss.str() << endmsg;
741  }
742 
743  thisSlot.eventContext = nullptr;
744  } else {
745  StatusCode eventStalledSC = isStalled(iSlot);
746  if (! eventStalledSC.isSuccess()) {
747  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
748  eventFailed(thisSlot.eventContext).ignore();
749  }
750  }
751  } // end loop on slots
752 
753  verbose() << "States Updated." << endmsg;
754 
755  return global_sc;
756 }
Gaudi::Property< bool > m_CFNext
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
T empty(T...args)
virtual void setEventStatus(const EventStatus::Status &sc)=0
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T open(T...args)
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
virtual const EventStatus::Status & eventStatus() const =0
ContextID_t slot() const
Definition: EventContext.h:41
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
T to_string(T...args)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T duration_cast(T...args)
T end(T...args)
size_t sizeOfSubset(State state) const
bool isIOBound() const
Check if algorithm is I/O-bound.
Gaudi::Property< std::string > m_optimizationMode
ContextEvt_t evt() const
Definition: EventContext.h:40
T push_back(T...args)
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
STL class.
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T close(T...args)
T str(T...args)
T count(T...args)
bool m_updateNeeded
Keep track of update actions scheduled.
T size(T...args)
Gaudi::Property< bool > m_useIOBoundAlgScheduler
STL class.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
T begin(T...args)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Iterator begin(State kind)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
Gaudi::Property< bool > m_dumpIntraEventDynamics
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T sort(T...args)
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
void ignore() const
Definition: StatusCode.h:106
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
Gaudi::Property< int > m_threadPoolSize
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using the graph index.
const std::chrono::system_clock::time_point getInitTime() const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
T reserve(T...args)
static std::map< State, std::string > stateNames
Iterator end(State kind)

Friends And Related Function Documentation

friend class AlgoExecutionTask
friend

Definition at line 244 of file ForwardSchedulerSvc.h.

friend class IOBoundAlgTask
friend

Definition at line 245 of file ForwardSchedulerSvc.h.

Member Data Documentation

tbb::concurrent_bounded_queue<action> ForwardSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 238 of file ForwardSchedulerSvc.h.
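
As a rough illustration of a queue of closures that are stored and later picked for execution, the sketch below drains such a queue in FIFO order. It is an assumption-laden simplification: `Status`, `Action`, and `drain` are hypothetical names, and a plain `std::queue` replaces the concurrent bounded queue, so no thread safety is implied.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical stand-in for StatusCode; not the Gaudi type.
enum class Status { SUCCESS, FAILURE };
using Action = std::function<Status()>;

// Execute the queued closures in FIFO order and
// return how many of them reported FAILURE.
int drain( std::queue<Action>& q ) {
  int failures = 0;
  while ( !q.empty() ) {
    Action a = std::move( q.front() );
    q.pop();
    assert( a && "queued action must be callable" );
    if ( a() != Status::SUCCESS ) ++failures;
  }
  return failures;
}
```

Because every state change is wrapped in a closure and executed by a single consumer, the state touched by the closures does not need its own locking in this model.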

SmartIF<IAlgExecStateSvc> ForwardSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 193 of file ForwardSchedulerSvc.h.

std::unordered_map<std::string, unsigned int> ForwardSchedulerSvc::m_algname_index_map
private

Map holding the information needed for the name2index conversion.

Definition at line 166 of file ForwardSchedulerSvc.h.

std::vector<std::string> ForwardSchedulerSvc::m_algname_vect
private

Vector holding the information needed for the index2name conversion.

Definition at line 172 of file ForwardSchedulerSvc.h.
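
The pairing of a name-to-index map with an index-to-name vector can be sketched as below. This is only an illustration of the bookkeeping idea: `AlgNameIndex`, `add`, and `algname2index` are hypothetical names, and how the service actually fills its two containers is not shown in this section.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical two-way lookup between algorithm names and dense indices.
struct AlgNameIndex {
  std::unordered_map<std::string, unsigned int> nameToIndex;
  std::vector<std::string>                      indexToName;

  // Register a name once; repeated registration returns the existing index.
  unsigned int add( const std::string& name ) {
    auto it = nameToIndex.find( name );
    if ( it != nameToIndex.end() ) return it->second;
    const unsigned int idx = static_cast<unsigned int>( indexToName.size() );
    indexToName.push_back( name );
    nameToIndex.emplace( name, idx );
    return idx;
  }

  // Throws std::out_of_range for an unknown name.
  unsigned int algname2index( const std::string& name ) const { return nameToIndex.at( name ); }

  const std::string& index2algname( unsigned int index ) const {
    assert( index < indexToName.size() ); // index must come from add()
    return indexToName[index];
  }
};
```

Dense integer indices let per-algorithm state live in plain vectors, while the map is only consulted when a name has to be translated.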

Gaudi::Property<std::vector<std::vector<std::string> > > ForwardSchedulerSvc::m_algosDependencies
private
Initial value:
{
this, "AlgosDependencies", {}, "[[deprecated]]"}

Definition at line 144 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 198 of file ForwardSchedulerSvc.h.

SmartIF<IAlgResourcePool> ForwardSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 228 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_CFNext
private
Initial value:
{this, "useGraphFlowManagement", false,
"Temporary property to switch between ControlFlow implementations"}

Definition at line 130 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_checkDeps {this, "CheckDependencies", false, "[[deprecated]]"}
private

Definition at line 146 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_DFNext
private
Initial value:
{this, "DataFlowManagerNext", false,
"Temporary property to switch between DataFlow implementations"}

Definition at line 133 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file"}

Definition at line 140 of file ForwardSchedulerSvc.h.

concurrency::ExecutionFlowManager ForwardSchedulerSvc::m_efManager
private

Member to take care of the control flow.

Definition at line 241 of file ForwardSchedulerSvc.h.

std::vector<EventSlot> ForwardSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 181 of file ForwardSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> ForwardSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 187 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_first = true
private

Definition at line 250 of file ForwardSchedulerSvc.h.

std::atomic_int ForwardSchedulerSvc::m_freeSlots
private

Atomic to account for asynchronous updates by the scheduler with respect to the rest.

Definition at line 184 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_IOBoundAlgosInFlight
private

Number of I/O-bound algorithms presently in flight.

Definition at line 201 of file ForwardSchedulerSvc.h.

SmartIF<IAccelerator> ForwardSchedulerSvc::m_IOBoundAlgScheduler
private

A shortcut to the I/O-bound algorithm scheduler.

Definition at line 178 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
private

Definition at line 123 of file ForwardSchedulerSvc.h.

std::atomic<ActivationState> ForwardSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 157 of file ForwardSchedulerSvc.h.

Gaudi::Property<unsigned int> ForwardSchedulerSvc::m_maxAlgosInFlight
private
Initial value:
{this, "MaxAlgosInFlight", 1,
"[[deprecated]] Taken from the whiteboard"}

Definition at line 124 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_maxEventsInFlight
private
Initial value:
{this, "MaxEventsInFlight", 0,
"Maximum number of event processed simultaneously"}

Definition at line 117 of file ForwardSchedulerSvc.h.

Gaudi::Property<unsigned int> ForwardSchedulerSvc::m_maxIOBoundAlgosInFlight
private
Initial value:
{this, "MaxIOBoundAlgosInFlight", 0,
"Maximum number of simultaneous I/O-bound algorithms"}

Definition at line 127 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_optimizationMode
private
Initial value:
{this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E"}

Definition at line 138 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution"}

Definition at line 135 of file ForwardSchedulerSvc.h.

std::mutex ForwardSchedulerSvc::m_ssMut
static private

Definition at line 282 of file ForwardSchedulerSvc.h.

std::list< ForwardSchedulerSvc::SchedulerState > ForwardSchedulerSvc::m_sState
static private

Definition at line 281 of file ForwardSchedulerSvc.h.

std::thread ForwardSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 160 of file ForwardSchedulerSvc.h.

Gaudi::Property<int> ForwardSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 119 of file ForwardSchedulerSvc.h.

SmartIF<IThreadPoolSvc> ForwardSchedulerSvc::m_threadPoolSvc
private

Definition at line 248 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_updateNeeded = true
private

Keep track of update actions scheduled.

Definition at line 224 of file ForwardSchedulerSvc.h.

Gaudi::Property<bool> ForwardSchedulerSvc::m_useIOBoundAlgScheduler
private
Initial value:
{this, "PreemptiveIOBoundTasks", false,
"Turn on preemptive way of scheduling of I/O-bound algorithms"}

Definition at line 142 of file ForwardSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> ForwardSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 175 of file ForwardSchedulerSvc.h.

Gaudi::Property<std::string> ForwardSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 122 of file ForwardSchedulerSvc.h.


The documentation for this class was generated from the following files: