ForwardSchedulerSvc Class Reference

The SchedulerSvc implements the IScheduler interface. More...

#include <GaudiKernel/ForwardSchedulerSvc.h>

Inheritance diagram for ForwardSchedulerSvc:
Collaboration diagram for ForwardSchedulerSvc:

Public Member Functions

 ForwardSchedulerSvc (const std::string &name, ISvcLocator *svc)
 Constructor. More...
 
 ~ForwardSchedulerSvc ()
 Destructor. More...
 
virtual StatusCode initialize ()
 Initialise. More...
 
virtual StatusCode finalize ()
 Finalise. More...
 
virtual StatusCode pushNewEvent (EventContext *eventContext)
 Make an event available to the scheduler. More...
 
virtual StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts)
 
virtual StatusCode popFinishedEvent (EventContext *&eventContext)
 Blocks until an event is availble. More...
 
virtual StatusCode tryPopFinishedEvent (EventContext *&eventContext)
 Try to fetch an event from the scheduler. More...
 
virtual unsigned int freeSlots ()
 Get free slots number. More...
 
- Public Member Functions inherited from extends< BASE, Interfaces >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
- Public Member Functions inherited from extend_interfaces< Interfaces...>
 ~extend_interfaces () override=default
 Virtual destructor. More...
 
 ~extend_interfaces () override=default
 Virtual destructor. More...
 

Private Types

typedef std::function< StatusCode()> action
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode eventFailed (EventContext *eventContext)
 Method to check if an event failed and take appropriate actions. More...
 
StatusCode updateStates (int si=-1, const std::string &algo_name=std::string())
 Loop on algorithm in the slots and promote them to successive states (-1 means all slots, while empty string means skipping an update of the Control Flow state) More...
 
StatusCode promoteToControlReady (unsigned int iAlgo, int si)
 Algorithm promotion: Accepted by the control flow. More...
 
StatusCode promoteToDataReady (unsigned int iAlgo, int si)
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo)
 The call to this method is triggered only from within the AlgoExecutionTask. More...
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
StatusCode isStalled (int si)
 Check if the scheduling is in a stall. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 
StatusCode m_drain ()
 Drain the actions present in the queue. More...
 

Private Attributes

bool m_isActive
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IHiveWhiteBoardm_whiteboard
 A shortcut to the whiteboard. More...
 
std::string m_whiteboardSvcName
 The whiteboard name. More...
 
std::vector< EventSlotm_eventSlots
 Vector of events slots. More...
 
int m_maxEventsInFlight
 Maximum number of event processed simultaneously. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asyncronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
unsigned int m_maxAlgosInFlight
 Maximum number of simultaneous algorithms. More...
 
unsigned int m_algosInFlight
 Number of algoritms presently in flight. More...
 
bool m_updateNeeded
 Keep track of update actions scheduled. More...
 
SmartIF< IAlgResourcePoolm_algResourcePool
 Cache for the algorithm resource pool. More...
 
std::vector< std::vector< std::string > > m_algosDependencies
 Ugly, will disappear when the deps are declared only within the C++ code of the algos. More...
 
int m_threadPoolSize
 Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose. More...
 
tbb::concurrent_bounded_queue< actionm_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
concurrency::ExecutionFlowManager m_efManager
 Member to take care of the control flow. More...
 
bool m_CFNext
 
bool m_DFNext
 
bool m_simulateExecution
 
std::string m_optimizationMode
 
bool m_dumpIntraEventDynamics
 

Friends

class AlgoExecutionTask
 

Additional Inherited Members

- Public Types inherited from extends< BASE, Interfaces >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces...>
 Typedef to the base of this class. More...
 
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces...>
 Typedef to the base of this class. More...
 
- Public Types inherited from extend_interfaces< Interfaces...>
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids...>::type
 take union of the ext_iids of all Interfaces... More...
 
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids...>::type
 take union of the ext_iids of all Interfaces... More...
 

Detailed Description

The SchedulerSvc implements the IScheduler interface.

It manages all the execution states of the algorithms and interacts with the TBB runtime for the algorithm tasks submission. A state machine takes care of the tracking of the execution state of the algorithms. This is a forward scheduler: algorithms are scheduled for execution as soon as their data dependencies are available in the whiteboard.

Algorithms management

The activate() method runs in a separate thread. It checks a TBB concurrent bounded queue of closures in a loop via the Pop method. This allows not to use a cpu entirely to check the presence of new actions to be taken. In other words, the asynchronous actions are serialised via the actions queue. Once a task terminates, a call to the promoteToExecuted method will be pushed into the actions queue. The promoteToExecuted method also triggers a call to the updateStates method, which brushes all algorithms, checking if their state can be changed. It's indeed possible that upon termination of an algorithm, the control flow and/or the data flow allow the submission of more algorithms.

Algorithms dependencies

There are two ways of declaring algorithms dependencies. One which is only temporarly available to ease developments consists in declaring them through AlgosDependencies property as a list of list. The order of these sublist must be the same one of the algorithms in the TopAlg list. The second one consists in declaring the data dependencies directly within the algorithms via data object handles.

Events management

The scheduler accepts events to be processed (in the form of eventContexts) and releases processed events. This flow is implemented through three methods:

  • pushNewEvent: to make an event available to the scheduler.
  • tryPopFinishedEvent: to retrieve an event from the scheduler
  • popFinishedEvent: to retrieve an event from the scheduler (blocking)

Please refer to the full documentation of the methods for more details.

Author
Danilo Piparo
Benedikt Hegner
Version
1.1

Definition at line 72 of file ForwardSchedulerSvc.h.

Member Typedef Documentation

typedef std::function<StatusCode ()> ForwardSchedulerSvc::action
private

Definition at line 195 of file ForwardSchedulerSvc.h.

Constructor & Destructor Documentation

ForwardSchedulerSvc::ForwardSchedulerSvc ( const std::string &  name,
ISvcLocator svc 
)

Constructor.

Definition at line 32 of file ForwardSchedulerSvc.cpp.

32  :
33  base_class(name,svcLoc),
34  m_isActive(false),
35  m_algosInFlight(0),
36  m_updateNeeded(true)
37 {
38  declareProperty("MaxEventsInFlight", m_maxEventsInFlight = 0 );
39  declareProperty("ThreadPoolSize", m_threadPoolSize = -1 );
40  declareProperty("WhiteboardSvc", m_whiteboardSvcName = "EventDataSvc" );
41  // Will disappear when dependencies are properly propagated into the C++ code of the algos
42  declareProperty("AlgosDependencies", m_algosDependencies);
43  declareProperty("MaxAlgosInFlight", m_maxAlgosInFlight = 0, "Taken from the whiteboard. Deprecated" );
44  // XXX: CF tests. Temporary property to switch between ControlFlow implementations
45  declareProperty("useGraphFlowManagement", m_CFNext = false );
46  declareProperty("DataFlowManagerNext", m_DFNext = false );
47  declareProperty("SimulateExecution", m_simulateExecution = false );
48  declareProperty("Optimizer", m_optimizationMode = "", "The following modes are currently available: PCE, COD, DRE, E" );
49  declareProperty("DumpIntraEventDynamics", m_dumpIntraEventDynamics = false, "Dump intra-event concurrency dynamics to csv file" );
50 }
bool m_isActive
Flag to track if the scheduler is active or not.
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
extends base_class
Typedef to this class.
Definition: extends.h:14
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::string m_whiteboardSvcName
The whiteboard name.
bool m_updateNeeded
Keep track of update actions scheduled.
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
std::vector< std::vector< std::string > > m_algosDependencies
Ugly, will disappear when the deps are declared only within the C++ code of the algos.
ForwardSchedulerSvc::~ForwardSchedulerSvc ( )

Destructor.

Definition at line 53 of file ForwardSchedulerSvc.cpp.

53 {}

Member Function Documentation

void ForwardSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)

Activate a pool

Definition at line 238 of file ForwardSchedulerSvc.cpp.

238  {
239 
240  // Now it's running
241  m_isActive=true;
242 
244  tbb::task_scheduler_init* TBBSchedInit = nullptr;
245 
246  // -100 prevents the creation of the pool and the scheduler directly executes
247  // the tasks.
248  if (-100 != m_threadPoolSize) {
249  debug() << "Initialising a TBB thread pool of requested size " << m_threadPoolSize << endmsg;
250  // Leave -1 in case selected, increment otherwise
251  int thePoolSize = m_threadPoolSize;
252  if (thePoolSize == -1)
253  debug() << "...default TBB thread pool size amounts to " << tbb::task_scheduler_init::default_num_threads()<< endmsg;
254  if (thePoolSize != -1)
255  thePoolSize += 1;
256  TBBSchedInit = new tbb::task_scheduler_init (thePoolSize);
257  } else {
258  debug() << "Thread pool size is one. Pool not initialised." << endmsg;
259  }
260  // Wait for actions pushed into the queue by finishing tasks.
261  action thisAction;
263 
264  // Continue to wait if the scheduler is running or there is something to do
265  info() << "Start checking the actionsQueue" << endmsg;
266  while(m_isActive or m_actionsQueue.size()!=0){
267  m_actionsQueue.pop(thisAction);
268  sc = thisAction();
269  if (sc!=StatusCode::SUCCESS)
270  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
271  else
272  verbose() << "Action succeeded." << endmsg;
273  }
274 
275  if (TBBSchedInit)
276  delete TBBSchedInit;
277 }
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
bool m_isActive
Flag to track if the scheduler is active or not.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::function< StatusCode()> action
unsigned int ForwardSchedulerSvc::algname2index ( const std::string &  algoname)
inlineprivate

Convert a name to an integer.

Definition at line 311 of file ForwardSchedulerSvc.cpp.

311  {
312  unsigned int index = m_algname_index_map[algoname];
313  return index;
314 }
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
StatusCode ForwardSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.

Definition at line 287 of file ForwardSchedulerSvc.cpp.

287  {
288 
289  if (m_isActive){
290  // Drain the scheduler
292  this));
293  // This would be the last action
294  m_actionsQueue.push([this]() -> StatusCode {m_isActive=false;return StatusCode::SUCCESS;});
295  }
296 
297  return StatusCode::SUCCESS;
298 }
bool m_isActive
Flag to track if the scheduler is active or not.
StatusCode m_drain()
Drain the actions present in the queue.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void ForwardSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

The dependencies of each algo are printed and the missing ones specified.

Definition at line 695 of file ForwardSchedulerSvc.cpp.

695  {
696 
697  // To have just one big message
698  std::stringstream outputMessageStream;
699 
700  int slotCount = -1;
701  for (auto thisSlot : m_eventSlots){
702  slotCount++;
703  outputMessageStream.str(std::string());
704  if ( thisSlot.complete )
705  continue;
706 
707  outputMessageStream << "Dump of Scheduler State for slot " << thisSlot.eventContext->evt() << std::endl;
708 
709  if ( 0 > iSlot or iSlot == slotCount) {
710  outputMessageStream << "Algorithms states for event " << thisSlot.eventContext->evt() << std::endl;
711 
712  const std::vector<std::string>& wbSlotContent ( thisSlot.dataFlowMgr.content() );
713  for (unsigned int algoIdx=0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
714  outputMessageStream << " o " << index2algname(algoIdx)
715  << " was in state " << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]]
716  << ". Its data dependencies are ";
717  std::vector<std::string> deps (thisSlot.dataFlowMgr.dataDependencies(algoIdx));
718  const int depsSize=deps.size();
719  if (depsSize==0)
720  outputMessageStream << " none.";
721 
722  for (int i=0;i<depsSize;++i)
723  outputMessageStream << deps[i] << (i==(depsSize-1) ? "" :", ");
724 
725  // Now list what dependencies were available
726  // With std::algorithms, move the ones which are missing at the beginning of the vector
727  std::vector<std::string>::iterator missinngDepsEndIt =
728  std::remove_if(deps.begin(), // from the beginning of the deps
729  deps.end(), // to their end
730  [&wbSlotContent] (std::string dep) { // remove if this lambda returns true
731  return std::count(wbSlotContent.begin(),wbSlotContent.end(),dep)!=0; //look for dep in wb content
732  });
733 
734  if (deps.begin() != missinngDepsEndIt) {
735  outputMessageStream << ". The following are missing: ";
736  for (std::vector<std::string>::iterator missingDep=deps.begin();missingDep!=missinngDepsEndIt;++missingDep)
737  outputMessageStream << *missingDep << (missingDep==(missinngDepsEndIt-1)?"":", ");
738  }
739 
740  outputMessageStream << std::endl;
741  }
742 
743  fatal() << outputMessageStream.str() << endmsg;
744  outputMessageStream.str(std::string());
745 
746  // Snapshot of the WhiteBoard
747  outputMessageStream << "The content of the whiteboard for this event was:\n";
748  for (auto& product : wbSlotContent )
749  outputMessageStream << " o " << product << std::endl;
750 
751  fatal() << outputMessageStream.str()<< endmsg;
752  outputMessageStream.str(std::string());
753 
754  // Snapshot of the ControlFlow
755  outputMessageStream << "The status of the control flow for this event was:\n";
756  std::stringstream cFlowStateStringStream;
757  m_efManager.printEventState(cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState,0);
758 
759  outputMessageStream << cFlowStateStringStream.str();
760 
761  fatal() << outputMessageStream.str() << endmsg;
762  }
763  }
764 
765 }
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
std::vector< EventSlot > m_eventSlots
Vector of events slots.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
list i
Definition: ana.py:128
static std::map< State, std::string > stateNames
StatusCode ForwardSchedulerSvc::eventFailed ( EventContext eventContext)
private

Method to check if an event failed and take appropriate actions.

It can be possible that an event fails.

In this case this method is called. It dumps the state of the scheduler, drains the actions (without executing them) and events in the queues and returns a failure.

Definition at line 436 of file ForwardSchedulerSvc.cpp.

436  {
437 
438  // Set the number of slots available to an error code
439  m_freeSlots.store(0);
440 
441  fatal() << "*** Event " << eventContext->evt() << " on slot "
442  << eventContext->slot() << " failed! ***" << endmsg;
443 
444  //dumpSchedulerState(-1);
445 
446  // Empty queue and deactivate the service
447  action thisAction;
448  while(m_actionsQueue.try_pop(thisAction)){};
449  deactivate();
450 
451  // Push into the finished events queue the failed context
452  EventContext* thisEvtContext;
453  while(m_finishedEvents.try_pop(thisEvtContext)) { m_finishedEvents.push(thisEvtContext); };
454  m_finishedEvents.push(eventContext);
455 
456  return StatusCode::FAILURE;
457 
458 }
StatusCode deactivate()
Deactivate scheduler.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
This class represents an entry point to all the event specific data.
Definition: EventContext.h:22
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
long int evt() const
Definition: EventContext.h:37
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::function< StatusCode()> action
ID_type slot() const
Definition: EventContext.h:38
StatusCode ForwardSchedulerSvc::finalize ( )
virtual

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 209 of file ForwardSchedulerSvc.cpp.

209  {
210 
212  if (!sc.isSuccess())
213  warning () << "Base class could not be finalized" << endmsg;
214 
215  sc = deactivate();
216  if (!sc.isSuccess())
217  warning () << "Scheduler could not be deactivated" << endmsg;
218 
219  info() << "Joining Scheduler thread" << endmsg;
220  m_thread.join();
221 
222  //m_efManager.getExecutionFlowGraph()->dumpExecutionPlan();
223 
224  return sc;
225 
226  }
StatusCode deactivate()
Deactivate scheduler.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode finalize() override
Definition: Service.cpp:188
std::thread m_thread
The thread in which the activate function runs.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
unsigned int ForwardSchedulerSvc::freeSlots ( )
virtual

Get free slots number.

Definition at line 378 of file ForwardSchedulerSvc.cpp.

378  {
379  return std::max(m_freeSlots.load(),0);
380 }
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
const std::string & ForwardSchedulerSvc::index2algname ( unsigned int  index)
inlineprivate

Convert an integer to a name.

Definition at line 305 of file ForwardSchedulerSvc.cpp.

305  {
306  return m_algname_vect[index];
307 }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
StatusCode ForwardSchedulerSvc::initialize ( )
virtual

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 61 of file ForwardSchedulerSvc.cpp.

61  {
62 
63  // Initialise mother class (read properties, ...)
65  if (!sc.isSuccess())
66  warning () << "Base class could not be initialized" << endmsg;
67 
68  // Get the algo resource pool
69  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
71  error() << "Error retrieving AlgoResourcePool" << endmsg;
72 
73  // Get Whiteboard
74  m_whiteboard = serviceLocator()->service(m_whiteboardSvcName);
75  if (!m_whiteboard.isValid())
76  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
77 
78  // Check the MaxEventsInFlight parameters and react
79  // Deprecated for the moment
80  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
81  if (m_maxEventsInFlight!=0){
82  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
83  << "Please migrate your code options files." << endmsg;
84 
85  if (m_maxEventsInFlight != (int)numberOfWBSlots){
86  warning() << "In addition, the number of events in flight ("
87  << m_maxEventsInFlight << ") differs from the slots in the whiteboard ("
88  << numberOfWBSlots << "). Setting the number of events in flight to "
89  << numberOfWBSlots << endmsg;
90  }
91  }
92 
93  // Align the two quantities
94  m_maxEventsInFlight = numberOfWBSlots;
95 
96  // Set the number of free slots
98 
99  // Get the list of algorithms
100  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
101  const unsigned int algsNumber = algos.size();
102  info() << "Found " << algsNumber << " algorithms" << endmsg;
103 
104  const unsigned int algosDependenciesSize=m_algosDependencies.size();
105  info() << "Algodependecies size is " << algosDependenciesSize << endmsg;
106 
107  /* Dependencies
108  0) Read deps from config file
109  1) Look for handles in algo, if none
110  2) Assume none are required
111  */
112  if (algosDependenciesSize == 0) {
113  for (IAlgorithm* ialgoPtr : algos) {
114  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
115  if (nullptr == algoPtr)
116  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
117 
118 #pragma GCC diagnostic push
119 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
120  const std::vector<MinimalDataObjectHandle*>& algoHandles(algoPtr->handles());
121 #pragma GCC diagnostic pop
122  std::vector<std::string> algoDependencies;
123  if (!algoHandles.empty()) {
124  info() << "Algorithm " << algoPtr->name() << " data dependencies:" << endmsg;
125  for (MinimalDataObjectHandle* handlePtr : algoHandles) {
126  if (handlePtr->isValid()) {
127  if (handlePtr->accessType() == MinimalDataObjectHandle::AccessType::READ) {
128  const std::string& productName = handlePtr->dataProductName();
129  info() << " o READ Handle found for product " << productName << endmsg;
130  algoDependencies.emplace_back(productName);
131 
132  //just for info output alternative locations
133  if (handlePtr->alternativeDataProductNames().size() != 0) {
134  info() << "\t\t alternative locations";
135  for (auto s : handlePtr->alternativeDataProductNames())
136  info() << " " << s;
137  info() << endmsg;
138  }
139  } else {
140  //output WRITE handles just for info
141  info() << " o WRITE Handle found for product " << handlePtr->dataProductName() << endmsg;
142  }
143  }
144  }
145  } else {
146  info() << "Algorithm " << algoPtr->name() << " has no data dependencies." << endmsg;
147  }
148  m_algosDependencies.emplace_back(algoDependencies);
149  }
150  } else {
151  if (algsNumber != algosDependenciesSize){
152  error() << "number of Algorithms is different from size of Data Dependency list!" << endmsg;
153  return StatusCode::FAILURE;
154  }
155  }
156 
157  // Fill the containers to convert algo names to index
158  m_algname_vect.reserve(algsNumber);
159  unsigned int index=0;
160  for (IAlgorithm* algo : algos){
161  const std::string& name = algo->name();
162  m_algname_index_map[name]=index;
163  m_algname_vect.emplace_back(name);
164  index++;
165  }
166 
167  // prepare the control flow part
168  if (m_CFNext) m_DFNext = true; //force usage of new data flow machinery when new control flow is used
169  if (!m_CFNext && !m_optimizationMode.empty()) {
170  fatal() << "Execution optimization is only available with the graph-based execution flow management" << endmsg;
171  return StatusCode::FAILURE;
172  }
173  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
175  unsigned int controlFlowNodeNumber = m_efManager.getExecutionFlowGraph()->getControlFlowNodeCounter();
176  // Shortcut for the message service
177  SmartIF<IMessageSvc> messageSvc (serviceLocator());
178  if (!messageSvc.isValid())
179  error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
180 
181  m_eventSlots.assign(m_maxEventsInFlight,EventSlot(m_algosDependencies,algsNumber,controlFlowNodeNumber,messageSvc));
182  std::for_each(m_eventSlots.begin(),m_eventSlots.end(),[](EventSlot& slot){slot.complete=true;});
183 
184  // Clearly inform about the level of concurrency
185  info() << "Concurrency level information:" << endmsg;
186  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
187  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
188  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
189 
190  // Simulating execution flow by only analyzing the graph topology and logic
191  if (m_simulateExecution) {
192  auto vis = concurrency::RunSimulator(0);
194  }
195 
196  // Activate the scheduler in another thread.
197  info() << "Activating scheduler in a separate thread" << endmsg;
198  m_thread = std::thread (std::bind(&ForwardSchedulerSvc::activate,
199  this));
200 
201  return sc;
202 
203 }
void simulateExecutionFlow(IGraphVisitor &visitor) const
StatusCode initialize() override
Definition: Service.cpp:63
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
virtual size_t getNumberOfStores()=0
Get the number of 'slots'.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:919
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::string m_whiteboardSvcName
The whiteboard name.
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:23
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
void activate()
Activate scheduler.
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager It greps the topalg list and the index map for the algo names...
std::vector< std::vector< std::string > > m_algosDependencies
Ugly, will disappear when the deps are declared only within the C++ code of the algos.
StatusCode ForwardSchedulerSvc::isStalled ( int  iSlot)
private

Check if the scheduling is in a stall.

Check if we are in present of a stall condition for a particular slot.

This is the case when no actions are present in the actionsQueue, no algorithm is in flight and no algorithm has all of its dependencies satisfied.

Definition at line 670 of file ForwardSchedulerSvc.cpp.

670  {
671  // Get the slot
672  EventSlot& thisSlot = m_eventSlots[iSlot];
673 
674  if (m_actionsQueue.empty() &&
675  m_algosInFlight == 0 &&
677 
678  info() << "About to declare a stall" << endmsg;
679  fatal() << "*** Stall detected! ***\n" << endmsg;
680  dumpSchedulerState(iSlot);
681  //throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
682 
683  return StatusCode::FAILURE;
684  }
685  return StatusCode::SUCCESS;
686 }
bool algsPresent(State state) const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
StatusCode ForwardSchedulerSvc::m_drain ( )
private

Drain the actions present in the queue.

Update the states for all slots until nothing is left to do.

Definition at line 386 of file ForwardSchedulerSvc.cpp.

386  {
387 
388  unsigned int slotNum=0;
389  for (auto& thisSlot : m_eventSlots){
390  if (not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete){
391  updateStates(slotNum);
392  }
393  slotNum++;
394  }
395  return StatusCode::SUCCESS;
396 }
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
StatusCode ForwardSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
virtual

Blocks until an event is availble.

Get a finished event or block until one becomes available.

Definition at line 402 of file ForwardSchedulerSvc.cpp.

402  {
403  if (m_freeSlots.load() == m_maxEventsInFlight or
404  !m_isActive) {
405  return StatusCode::FAILURE;
406  } else {
407  m_finishedEvents.pop(eventContext);
408  m_freeSlots++;
409  debug() << "Popped slot " << eventContext->slot() << "(event "
410  << eventContext->evt() << ")" << endmsg;
411  return StatusCode::SUCCESS;
412  }
413 }
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
bool m_isActive
Flag to track if the scheduler is active or not.
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
long int evt() const
Definition: EventContext.h:37
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
ID_type slot() const
Definition: EventContext.h:38
StatusCode ForwardSchedulerSvc::promoteToControlReady ( unsigned int  iAlgo,
int  si 
)
private

Algorithm promotion: Accepted by the control flow.

Definition at line 769 of file ForwardSchedulerSvc.cpp.

769  {
770 
771  // Do the control flow
772  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
773  if (sc.isSuccess())
774  if (msgLevel(MSG::DEBUG))
775  debug() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY" << endmsg;
776 
777  return sc;
778 
779 }
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
StatusCode ForwardSchedulerSvc::promoteToDataReady ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 783 of file ForwardSchedulerSvc.cpp.

783  {
784 
785  StatusCode sc;
786  if (!m_DFNext) {
787  sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun(iAlgo);
788  } else {
790  }
791 
793  if (sc == StatusCode::SUCCESS)
794  updateSc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::DATAREADY);
795 
796  if (updateSc.isSuccess())
797  if (msgLevel(MSG::DEBUG))
798  debug() << "Promoting " << index2algname(iAlgo) << " to DATAREADY" << endmsg;
799 
800  return updateSc;
801 
802 }
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
StatusCode ForwardSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo 
)
private

The call to this method is triggered only from within the AlgoExecutionTask.

Definition at line 855 of file ForwardSchedulerSvc.cpp.

855  {
856 
857  // Put back the instance
858  Algorithm* castedAlgo = dynamic_cast<Algorithm*>(algo); // DP: expose context getter in IAlgo?
859  if (!castedAlgo)
860  fatal() << "The casting did not succeed!" << endmsg;
861  EventContext* eventContext = castedAlgo->getContext();
862 
863  // Check if the execution failed
864  if (eventContext->evtFail())
865  eventFailed(eventContext);
866 
867  StatusCode sc = m_algResourcePool->releaseAlgorithm(algo->name(),algo);
868 
869  if (!sc.isSuccess()) {
870  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
871  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
872  return StatusCode::FAILURE;
873  }
874 
875  m_algosInFlight--;
876 
877  EventSlot& thisSlot = m_eventSlots[si];
878  // XXX: CF tests
879  if (!m_DFNext) {
880  // Update the catalog: some new products may be there
881  m_whiteboard->selectStore(eventContext->slot()).ignore();
882 
883  // update prods in the dataflow
884  // DP: Handles could be used. Just update what the algo wrote
885  std::vector<std::string> new_products;
886  m_whiteboard->getNewDataObjects(new_products).ignore();
887  for (const auto& new_product : new_products)
888  if (msgLevel(MSG::DEBUG))
889  debug() << "Found in WB: " << new_product << endmsg;
890  thisSlot.dataFlowMgr.updateDataObjectsCatalog(new_products);
891  }
892 
893  if (msgLevel(MSG::DEBUG))
894  debug() << "Algorithm " << algo->name() << " executed. Algorithms scheduled are " << m_algosInFlight << endmsg;
895 
896  // Limit number of updates
897  if (m_CFNext) m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
898  if (m_updateNeeded) {
899  // Schedule an update of the status of the algorithms
900  auto updateAction = std::bind(&ForwardSchedulerSvc::updateStates, this, -1, algo->name());
901  m_actionsQueue.push(updateAction);
902  m_updateNeeded = false;
903  }
904 
905  if (msgLevel(MSG::DEBUG))
906  debug() << "Trying to handle execution result of " << index2algname(iAlgo) << "." << endmsg;
907  State state;
908  if (algo->filterPassed()) {
909  state = State::EVTACCEPTED;
910  } else {
911  state = State::EVTREJECTED;
912  }
913 
914  sc = thisSlot.algsStates.updateState(iAlgo,state);
915 
916  if (sc.isSuccess())
917  if (msgLevel(MSG::DEBUG))
918  debug() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
920 
921  return sc;
922 }
virtual StatusCode getNewDataObjects(std::vector< std::string > &products)=0
Get the latest new data objects registred in store.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
This class represents an entry point to all the event specific data.
Definition: EventContext.h:22
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
bool evtFail() const
Definition: EventContext.h:40
void updateDataObjectsCatalog(const std::vector< std::string > &newProducts)
Update the catalog of available products in the slot.
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate an given 'slot' for all subsequent calls within the same thread id.
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
long int evt() const
Definition: EventContext.h:37
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
ID_type slot() const
Definition: EventContext.h:38
Class representing the event slot.
Definition: EventSlot.h:11
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
void ignore() const
Definition: StatusCode.h:108
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
static std::map< State, std::string > stateNames
EventContext * getContext()
get the context
Definition: Algorithm.h:553
StatusCode updateState(unsigned int iAlgo, State newState)
StatusCode ForwardSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode ForwardSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si 
)
private

Definition at line 806 of file ForwardSchedulerSvc.cpp.

806  {
807 
809  return StatusCode::FAILURE;
810 
811  const std::string& algName(index2algname(iAlgo));
812 
813  IAlgorithm* ialgoPtr=nullptr;
814  StatusCode sc ( m_algResourcePool->acquireAlgorithm(algName,ialgoPtr) );
815 
816  if (sc.isSuccess()) {
817  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
818  EventContext* eventContext ( m_eventSlots[si].eventContext );
819  if (!eventContext)
820  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si<< ")" << endmsg;
821 
822  algoPtr->setContext(m_eventSlots[si].eventContext);
823  ++m_algosInFlight;
824  // Avoid to use tbb if the pool size is 1 and run in this thread
825  if (-100 != m_threadPoolSize) {
826  tbb::task* t = new( tbb::task::allocate_root() ) AlgoExecutionTask(ialgoPtr, iAlgo, serviceLocator(), this);
827  tbb::task::enqueue( *t);
828  } else {
829  AlgoExecutionTask theTask(ialgoPtr, iAlgo, serviceLocator(), this);
830  theTask.execute();
831  }
832 
833  if (msgLevel(MSG::DEBUG))
834  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt()
835  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
836 
837  StatusCode updateSc ( m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::SCHEDULED) );
838 
839  if (updateSc.isSuccess())
840  if (msgLevel(MSG::DEBUG))
841  debug() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED" << endmsg;
842  return updateSc;
843  } else {
844  if (msgLevel(MSG::DEBUG))
845  debug() << "Could not acquire instance for algorithm " << index2algname(iAlgo) << " on slot " << si << endmsg;
846  return sc;
847  }
848 
849 }
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:22
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:556
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
unsigned int m_algosInFlight
Number of algoritms presently in flight.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:23
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
friend class AlgoExecutionTask
StatusCode ForwardSchedulerSvc::pushNewEvent ( EventContext eventContext)
virtual

Make an event available to the scheduler.

Add event to the scheduler.

There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.

Definition at line 324 of file ForwardSchedulerSvc.cpp.

324  {
325 
326  if (!eventContext){
327  fatal() << "Event context is nullptr" << endmsg;
328  return StatusCode::FAILURE;
329  }
330 
331  if (m_freeSlots.load() == 0) {
332  if (msgLevel(MSG::DEBUG))
333  debug() << "A free processing slot could not be found." << endmsg;
334  return StatusCode::FAILURE;
335  }
336 
337  //no problem as push new event is only called from one thread (event loop manager)
338  m_freeSlots--;
339 
340  auto action = [this,eventContext] () -> StatusCode {
341  // Event processing slot forced to be the same as the wb slot
342  const unsigned int thisSlotNum = eventContext->slot();
343  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
344  if (!thisSlot.complete)
345  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
346  info() << "A free processing slot was found." << endmsg;
347  thisSlot.reset(eventContext);
348  // XXX: CF tests
349  if (m_CFNext) {
350  auto vis = concurrency::Trigger(thisSlotNum);
352  }
353 
354  return this->updateStates(thisSlotNum);
355  }; // end of lambda
356 
357  // Kick off the scheduling!
358  if (msgLevel(MSG::VERBOSE)) {
359  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
360  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
361  }
362  m_actionsQueue.push(action);
363 
364  return StatusCode::SUCCESS;
365 }
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
A visitor, performing full top-down traversals of a graph.
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::function< StatusCode()> action
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
ID_type slot() const
Definition: EventContext.h:38
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
StatusCode ForwardSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
virtual

Definition at line 368 of file ForwardSchedulerSvc.cpp.

368  {
369  StatusCode sc;
370  for (auto context : eventContexts){
371  sc = pushNewEvent(context);
372  if (sc != StatusCode::SUCCESS) return sc;
373  }
374  return sc;
375 }
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode ForwardSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
virtual

Try to fetch an event from the scheduler.

Try to get a finished event, if not available just return a failure.

Definition at line 419 of file ForwardSchedulerSvc.cpp.

419  {
420  if (m_finishedEvents.try_pop(eventContext)) {
421  if (msgLevel(MSG::DEBUG))
422  debug() << "Try Pop successful slot " << eventContext->slot()
423  << "(event " << eventContext->evt() << ")" << endmsg;
424  m_freeSlots++;
425  return StatusCode::SUCCESS;
426  }
427  return StatusCode::FAILURE;
428 }
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
long int evt() const
Definition: EventContext.h:37
ID_type slot() const
Definition: EventContext.h:38
StatusCode ForwardSchedulerSvc::updateStates ( int  si = -1,
const std::string &  algo_name = std::string() 
)
private

Loop on algorithm in the slots and promote them to successive states (-1 means all slots, while empty string means skipping an update of the Control Flow state)

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. To check if the event is finished the algorithm checks if:

  • No algorithms have been signed off by the control flow
  • No algorithms have been signed off by the data flow
  • No algorithms have been scheduled

Definition at line 474 of file ForwardSchedulerSvc.cpp.

474  {
475 
476  m_updateNeeded=true;
477 
478  // Fill a map of initial state / action using closures.
479  // done to update the states w/o several if/elses
480  // Posterchild for constexpr with gcc4.7 onwards!
481  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
482  statesTransitions = {
483  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
484  this,
485  std::placeholders::_1,
486  std::placeholders::_2)},
487  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
488  this,
489  std::placeholders::_1,
490  std::placeholders::_2)}
491  };*/
492 
493  StatusCode global_sc(StatusCode::FAILURE);
494  StatusCode partial_sc;
495 
496  // Sort from the oldest to the newest event
497  // Prepare a vector of pointers to the slots to avoid copies
498  std::vector<EventSlot*> eventSlotsPtrs;
499 
500  // Consider all slots if si <0 or just one otherwise
501  if (si<0) {
502  const int eventsSlotsSize(m_eventSlots.size());
503  eventSlotsPtrs.reserve(eventsSlotsSize);
504  for (auto slotIt=m_eventSlots.begin();slotIt!=m_eventSlots.end();slotIt++) {
505  if (!slotIt->complete)
506  eventSlotsPtrs.push_back(&(*slotIt));
507  }
508  std::sort(eventSlotsPtrs.begin(),
509  eventSlotsPtrs.end(),
510  [](EventSlot* a, EventSlot* b) {return a->eventContext->evt() < b->eventContext->evt();});
511  } else {
512  eventSlotsPtrs.push_back(&m_eventSlots[si]);
513  }
514 
515  for (EventSlot* thisSlotPtr : eventSlotsPtrs) {
516  int iSlot = thisSlotPtr->eventContext->slot();
517 
518  // Cache the states of the algos to improve readability and performance
519  auto& thisSlot = m_eventSlots[iSlot];
520  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
521 
522  // Take care of the control ready update
523  // XXX: CF tests
524  if (!m_CFNext) {
525  m_efManager.updateEventState(thisAlgsStates,thisSlot.controlFlowState);
526  } else {
527  if (!algo_name.empty())
528  m_efManager.updateDecision(algo_name,iSlot,thisAlgsStates,thisSlot.controlFlowState);
529  }
530 
531 
532  //DF note: all this this is a loop over all algs and applies CR->DR and DR->SCHD transistions
533  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
534  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
535  if (algState==AlgsExecutionStates::ERROR)
536  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
537  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
538  partial_sc=StatusCode::SUCCESS;
539  for (auto state_transition = statesTransitions.find(algState);
540  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
541  state_transition++){
542  partial_sc = state_transition->second(iAlgo,iSlot);
543  if (partial_sc.isFailure()){
544  debug() << "Could not apply transition from "
545  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
546  << " for algorithm " << index2algname(iAlgo)
547  << " on processing slot " << iSlot << endmsg;
548  }
549  else{global_sc=partial_sc;}
550  } // end loop on transitions
551  }*/ // end loop on algos
552 
553 
554  StatusCode partial_sc;
555  //first update CONTROLREADY to DATAREADY
556  if (!m_CFNext) {
557  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::CONTROLREADY);
558  it != thisAlgsStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it) {
559 
560  uint algIndex = *it;
561  partial_sc = promoteToDataReady(algIndex, iSlot);
562  if (partial_sc.isFailure())
563  if (msgLevel(MSG::DEBUG))
564  debug() << "Could not apply transition from "
565  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
566  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
567  }
568  }
569 
570  //now update DATAREADY to SCHEDULED
571  if (!m_optimizationMode.empty()) {
572  auto comp_nodes = [this] (const uint& i,const uint& j) {
575  };
576  std::priority_queue<uint,std::vector<uint>,std::function<bool(const uint&,const uint&)>> buffer(comp_nodes,std::vector<uint>());
577  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::DATAREADY);
578  it != thisAlgsStates.end(AlgsExecutionStates::State::DATAREADY); ++it)
579  buffer.push(*it);
580  /*std::stringstream s;
581  auto buffer2 = buffer;
582  while (!buffer2.empty()) {
583  s << m_efManager.getExecutionFlowGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
584  buffer2.pop();
585  }
586  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
587 
588  while (!buffer.empty()) {
589  partial_sc = promoteToScheduled(buffer.top(), iSlot);
590  if (partial_sc.isFailure())
591  if (msgLevel(MSG::DEBUG))
592  debug() << "Could not apply transition from "
593  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
594  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
595  buffer.pop();
596  }
597 
598  } else {
599  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::DATAREADY);
600  it != thisAlgsStates.end(AlgsExecutionStates::State::DATAREADY); ++it) {
601  uint algIndex = *it;
602  partial_sc = promoteToScheduled(algIndex, iSlot);
603  if (partial_sc.isFailure())
604  if (msgLevel(MSG::DEBUG))
605  debug() << "Could not apply transition from "
606  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
607  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
608 
609  }
610  }
611 
613  std::stringstream s;
614  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY)
615  << ", " << thisAlgsStates.sizeOfSubset(State::DATAREADY)
616  << ", " << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << "\n";
617  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
618  : std::to_string(tbb::task_scheduler_init::default_num_threads());
619  std::ofstream myfile;
620  myfile.open("IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app);
621  myfile << s.str();
622  myfile.close();
623  }
624 
625 
626  // Not complete because this would mean that the slot is already free!
627  if (!thisSlot.complete &&
628  m_efManager.rootDecisionResolved(thisSlot.controlFlowState) &&
629  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::CONTROLREADY) &&
630  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::DATAREADY) &&
631  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::SCHEDULED)) {
632 
633  thisSlot.complete=true;
634  // if the event did not fail, add it to the finished events
635  // otherwise it is taken care of in the error handling already
636  if (!thisSlot.eventContext->evtFail()) {
637  m_finishedEvents.push(thisSlot.eventContext);
638  if (msgLevel(MSG::DEBUG))
639  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
640  << thisSlot.eventContext->slot() << ")." << endmsg;
641  }
642  // now let's return the fully evaluated result of the control flow
643  if (msgLevel(MSG::DEBUG)) {
644  std::stringstream ss;
645  m_efManager.printEventState(ss, thisSlot.algsStates, thisSlot.controlFlowState,0);
646  debug() << ss.str() << endmsg;
647  }
648 
649  thisSlot.eventContext= nullptr;
650  } else {
651  StatusCode eventStalledSC = isStalled(iSlot);
652  if (! eventStalledSC.isSuccess())
653  eventFailed(thisSlot.eventContext);
654  }
655  } // end loop on slots
656 
657  verbose() << "States Updated." << endmsg;
658 
659  return global_sc;
660 }
string to_string(const T &value)
Definition: mergesort.cpp:40
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
size_t sizeOfSubset(State state) const
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
long int evt() const
Definition: EventContext.h:37
bool m_updateNeeded
Keep track of update actions scheduled.
Iterator begin(State kind)
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
list i
Definition: ana.py:128
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
static std::map< State, std::string > stateNames
Iterator end(State kind)

Friends And Related Function Documentation

friend class AlgoExecutionTask
friend

Definition at line 214 of file ForwardSchedulerSvc.h.

Member Data Documentation

tbb::concurrent_bounded_queue<action> ForwardSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 198 of file ForwardSchedulerSvc.h.

std::unordered_map<std::string,unsigned int> ForwardSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 122 of file ForwardSchedulerSvc.h.

std::vector<std::string> ForwardSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 128 of file ForwardSchedulerSvc.h.

std::vector<std::vector<std::string> > ForwardSchedulerSvc::m_algosDependencies
private

Ugly, will disappear when the deps are declared only within the C++ code of the algos.

Definition at line 185 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_algosInFlight
private

Number of algoritms presently in flight.

Definition at line 158 of file ForwardSchedulerSvc.h.

SmartIF<IAlgResourcePool> ForwardSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 182 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_CFNext
private

Definition at line 203 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_DFNext
private

Definition at line 205 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_dumpIntraEventDynamics
private

Definition at line 211 of file ForwardSchedulerSvc.h.

concurrency::ExecutionFlowManager ForwardSchedulerSvc::m_efManager
private

Member to take care of the control flow.

Definition at line 201 of file ForwardSchedulerSvc.h.

std::vector<EventSlot> ForwardSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 137 of file ForwardSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> ForwardSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 146 of file ForwardSchedulerSvc.h.

std::atomic_int ForwardSchedulerSvc::m_freeSlots
private

Atomic to account for asyncronous updates by the scheduler wrt the rest.

Definition at line 143 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_isActive
private

Flag to track if the scheduler is active or not.

Definition at line 113 of file ForwardSchedulerSvc.h.

unsigned int ForwardSchedulerSvc::m_maxAlgosInFlight
private

Maximum number of simultaneous algorithms.

Definition at line 155 of file ForwardSchedulerSvc.h.

int ForwardSchedulerSvc::m_maxEventsInFlight
private

Maximum number of event processed simultaneously.

Definition at line 140 of file ForwardSchedulerSvc.h.

std::string ForwardSchedulerSvc::m_optimizationMode
private

Definition at line 209 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_simulateExecution
private

Definition at line 207 of file ForwardSchedulerSvc.h.

std::thread ForwardSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 116 of file ForwardSchedulerSvc.h.

int ForwardSchedulerSvc::m_threadPoolSize
private

Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose.

Definition at line 191 of file ForwardSchedulerSvc.h.

bool ForwardSchedulerSvc::m_updateNeeded
private

Keep track of update actions scheduled.

Definition at line 178 of file ForwardSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> ForwardSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 131 of file ForwardSchedulerSvc.h.

std::string ForwardSchedulerSvc::m_whiteboardSvcName
private

The whiteboard name.

Definition at line 134 of file ForwardSchedulerSvc.h.


The documentation for this class was generated from the following files: