All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
ForwardSchedulerSvc.cpp
Go to the documentation of this file.
1 // Framework includes
2 #include "GaudiKernel/SvcFactory.h"
3 #include "GaudiKernel/IAlgorithm.h"
4 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
5 #include <GaudiAlg/GaudiAlgorithm.h>
6 #include <GaudiKernel/IDataManagerSvc.h>
7 #include "tbb/task.h"
8 
9 // C++
10 #include <unordered_set>
11 #include <algorithm>
12 #include <map>
13 #include <sstream>
14 #include <queue>
15 
16 // Local
17 #include "ForwardSchedulerSvc.h"
18 #include "AlgoExecutionTask.h"
19 #include "AlgResourcePool.h"
20 #include "EFGraphVisitors.h"
21 
22 // External libs
23 // DP waiting for the TBB service
24 #include "tbb/task_scheduler_init.h"
25 
26 // Instantiation of a static factory class used by clients to create instances of this service
28 
29 //===========================================================================
30 // Infrastructure methods
31 
33  base_class(name,svcLoc),
34  m_isActive(false),
35  m_algosInFlight(0),
36  m_updateNeeded(true)
37 {
38  declareProperty("MaxEventsInFlight", m_maxEventsInFlight = 0 );
39  declareProperty("ThreadPoolSize", m_threadPoolSize = -1 );
40  declareProperty("WhiteboardSvc", m_whiteboardSvcName = "EventDataSvc" );
41  // Will disappear when dependencies are properly propagated into the C++ code of the algos
42  declareProperty("AlgosDependencies", m_algosDependencies);
43  declareProperty("MaxAlgosInFlight", m_maxAlgosInFlight = 0, "Taken from the whiteboard. Deprecated" );
44  // XXX: CF tests. Temporary property to switch between ControlFlow implementations
45  declareProperty("useGraphFlowManagement", m_CFNext = false );
46  declareProperty("DataFlowManagerNext", m_DFNext = false );
47  declareProperty("SimulateExecution", m_simulateExecution = false );
48  declareProperty("Optimizer", m_optimizationMode = "", "The following modes are currently available: PCE, COD, DRE, E" );
49  declareProperty("DumpIntraEventDynamics", m_dumpIntraEventDynamics = false, "Dump intra-event concurrency dynamics to csv file" );
50 }
51 
52 //---------------------------------------------------------------------------
54 //---------------------------------------------------------------------------
55 
62 
63  // Initialise mother class (read properties, ...)
65  if (!sc.isSuccess())
66  warning () << "Base class could not be initialized" << endmsg;
67 
68  // Get the algo resource pool
69  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
71  error() << "Error retrieving AlgoResourcePool" << endmsg;
72 
73  // Get Whiteboard
74  m_whiteboard = serviceLocator()->service(m_whiteboardSvcName);
75  if (!m_whiteboard.isValid())
76  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
77 
78  // Check the MaxEventsInFlight parameters and react
79  // Deprecated for the moment
80  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
81  if (m_maxEventsInFlight!=0){
82  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
83  << "Please migrate your code options files." << endmsg;
84 
85  if (m_maxEventsInFlight != (int)numberOfWBSlots){
86  warning() << "In addition, the number of events in flight ("
87  << m_maxEventsInFlight << ") differs from the slots in the whiteboard ("
88  << numberOfWBSlots << "). Setting the number of events in flight to "
89  << numberOfWBSlots << endmsg;
90  }
91  }
92 
93  // Align the two quantities
94  m_maxEventsInFlight = numberOfWBSlots;
95 
96  // Set the number of free slots
98 
99  // Get the list of algorithms
100  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
101  const unsigned int algsNumber = algos.size();
102  info() << "Found " << algsNumber << " algorithms" << endmsg;
103 
104  const unsigned int algosDependenciesSize=m_algosDependencies.size();
105  info() << "Algodependecies size is " << algosDependenciesSize << endmsg;
106 
107  /* Dependencies
108  0) Read deps from config file
109  1) Look for handles in algo, if none
110  2) Assume none are required
111  */
112  if (algosDependenciesSize == 0) {
113  for (IAlgorithm* ialgoPtr : algos) {
114  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
115  if (nullptr == algoPtr)
116  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
117 
118 #pragma GCC diagnostic push
119 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
120  const std::vector<MinimalDataObjectHandle*>& algoHandles(algoPtr->handles());
121 #pragma GCC diagnostic pop
122  std::vector<std::string> algoDependencies;
123  if (!algoHandles.empty()) {
124  info() << "Algorithm " << algoPtr->name() << " data dependencies:" << endmsg;
125  for (MinimalDataObjectHandle* handlePtr : algoHandles) {
126  if (handlePtr->isValid()) {
127  if (handlePtr->accessType() == MinimalDataObjectHandle::AccessType::READ) {
128  const std::string& productName = handlePtr->dataProductName();
129  info() << " o READ Handle found for product " << productName << endmsg;
130  algoDependencies.emplace_back(productName);
131 
132  //just for info output alternative locations
133  if (handlePtr->alternativeDataProductNames().size() != 0) {
134  info() << "\t\t alternative locations";
135  for (auto s : handlePtr->alternativeDataProductNames())
136  info() << " " << s;
137  info() << endmsg;
138  }
139  } else {
140  //output WRITE handles just for info
141  info() << " o WRITE Handle found for product " << handlePtr->dataProductName() << endmsg;
142  }
143  }
144  }
145  } else {
146  info() << "Algorithm " << algoPtr->name() << " has no data dependencies." << endmsg;
147  }
148  m_algosDependencies.emplace_back(algoDependencies);
149  }
150  } else {
151  if (algsNumber != algosDependenciesSize){
152  error() << "number of Algorithms is different from size of Data Dependency list!" << endmsg;
153  return StatusCode::FAILURE;
154  }
155  }
156 
157  // Fill the containers to convert algo names to index
158  m_algname_vect.reserve(algsNumber);
159  unsigned int index=0;
160  for (IAlgorithm* algo : algos){
161  const std::string& name = algo->name();
162  m_algname_index_map[name]=index;
163  m_algname_vect.emplace_back(name);
164  index++;
165  }
166 
167  // prepare the control flow part
168  if (m_CFNext) m_DFNext = true; //force usage of new data flow machinery when new control flow is used
169  if (!m_CFNext && !m_optimizationMode.empty()) {
170  fatal() << "Execution optimization is only available with the graph-based execution flow management" << endmsg;
171  return StatusCode::FAILURE;
172  }
173  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
175  unsigned int controlFlowNodeNumber = m_efManager.getExecutionFlowGraph()->getControlFlowNodeCounter();
176  // Shortcut for the message service
177  SmartIF<IMessageSvc> messageSvc (serviceLocator());
178  if (!messageSvc.isValid())
179  error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
180 
181  m_eventSlots.assign(m_maxEventsInFlight,EventSlot(m_algosDependencies,algsNumber,controlFlowNodeNumber,messageSvc));
182  std::for_each(m_eventSlots.begin(),m_eventSlots.end(),[](EventSlot& slot){slot.complete=true;});
183 
184  // Clearly inform about the level of concurrency
185  info() << "Concurrency level information:" << endmsg;
186  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
187  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
188  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
189 
190  // Simulating execution flow by only analyzing the graph topology and logic
191  if (m_simulateExecution) {
192  auto vis = concurrency::RunSimulator(0);
194  }
195 
196  // Activate the scheduler in another thread.
197  info() << "Activating scheduler in a separate thread" << endmsg;
198  m_thread = std::thread (std::bind(&ForwardSchedulerSvc::activate,
199  this));
200 
201  return sc;
202 
203 }
204 //---------------------------------------------------------------------------
205 
210 
212  if (!sc.isSuccess())
213  warning () << "Base class could not be finalized" << endmsg;
214 
215  sc = deactivate();
216  if (!sc.isSuccess())
217  warning () << "Scheduler could not be deactivated" << endmsg;
218 
219  info() << "Joining Scheduler thread" << endmsg;
220  m_thread.join();
221 
222  //m_efManager.getExecutionFlowGraph()->dumpExecutionPlan();
223 
224  return sc;
225 
226  }
227 //---------------------------------------------------------------------------
239 
240  // Now it's running
241  m_isActive=true;
242 
244  tbb::task_scheduler_init* TBBSchedInit = nullptr;
245 
246  // -100 prevents the creation of the pool and the scheduler directly executes
247  // the tasks.
248  if (-100 != m_threadPoolSize) {
249  debug() << "Initialising a TBB thread pool of requested size " << m_threadPoolSize << endmsg;
250  // Leave -1 in case selected, increment otherwise
251  int thePoolSize = m_threadPoolSize;
252  if (thePoolSize == -1)
253  debug() << "...default TBB thread pool size amounts to " << tbb::task_scheduler_init::default_num_threads()<< endmsg;
254  if (thePoolSize != -1)
255  thePoolSize += 1;
256  TBBSchedInit = new tbb::task_scheduler_init (thePoolSize);
257  } else {
258  debug() << "Thread pool size is one. Pool not initialised." << endmsg;
259  }
260  // Wait for actions pushed into the queue by finishing tasks.
261  action thisAction;
263 
264  // Continue to wait if the scheduler is running or there is something to do
265  info() << "Start checking the actionsQueue" << endmsg;
266  while(m_isActive or m_actionsQueue.size()!=0){
267  m_actionsQueue.pop(thisAction);
268  sc = thisAction();
269  if (sc!=StatusCode::SUCCESS)
270  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
271  else
272  verbose() << "Action succeeded." << endmsg;
273  }
274 
275  if (TBBSchedInit)
276  delete TBBSchedInit;
277 }
278 
279 //---------------------------------------------------------------------------
280 
288 
289  if (m_isActive){
290  // Drain the scheduler
292  this));
293  // This would be the last action
294  m_actionsQueue.push([this]() -> StatusCode {m_isActive=false;return StatusCode::SUCCESS;});
295  }
296 
297  return StatusCode::SUCCESS;
298 }
299 
300 //===========================================================================
301 
302 //===========================================================================
303 // Utils and shortcuts
304 
305 inline const std::string& ForwardSchedulerSvc::index2algname (unsigned int index) {
306  return m_algname_vect[index];
307 }
308 
309 //---------------------------------------------------------------------------
310 
311 inline unsigned int ForwardSchedulerSvc::algname2index(const std::string& algoname) {
312  unsigned int index = m_algname_index_map[algoname];
313  return index;
314 }
315 
316 //===========================================================================
317 // EventSlot management
325 
326  if (!eventContext){
327  fatal() << "Event context is nullptr" << endmsg;
328  return StatusCode::FAILURE;
329  }
330 
331  if (m_freeSlots.load() == 0) {
332  if (msgLevel(MSG::DEBUG))
333  debug() << "A free processing slot could not be found." << endmsg;
334  return StatusCode::FAILURE;
335  }
336 
337  //no problem as push new event is only called from one thread (event loop manager)
338  m_freeSlots--;
339 
340  auto action = [this,eventContext] () -> StatusCode {
341  // Event processing slot forced to be the same as the wb slot
342  const unsigned int thisSlotNum = eventContext->slot();
343  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
344  if (!thisSlot.complete)
345  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
346  info() << "A free processing slot was found." << endmsg;
347  thisSlot.reset(eventContext);
348  // XXX: CF tests
349  if (m_CFNext) {
350  auto vis = concurrency::Trigger(thisSlotNum);
352  }
353 
354  return this->updateStates(thisSlotNum);
355  }; // end of lambda
356 
357  // Kick off the scheduling!
358  if (msgLevel(MSG::VERBOSE)) {
359  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
360  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
361  }
362  m_actionsQueue.push(action);
363 
364  return StatusCode::SUCCESS;
365 }
366 
367 //---------------------------------------------------------------------------
368 StatusCode ForwardSchedulerSvc::pushNewEvents(std::vector<EventContext*>& eventContexts){
369  StatusCode sc;
370  for (auto context : eventContexts){
371  sc = pushNewEvent(context);
372  if (sc != StatusCode::SUCCESS) return sc;
373  }
374  return sc;
375 }
376 
377 //---------------------------------------------------------------------------
379  return std::max(m_freeSlots.load(),0);
380 }
381 
382 //---------------------------------------------------------------------------
387 
388  unsigned int slotNum=0;
389  for (auto& thisSlot : m_eventSlots){
390  if (not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete){
391  updateStates(slotNum);
392  }
393  slotNum++;
394  }
395  return StatusCode::SUCCESS;
396 }
397 
398 //---------------------------------------------------------------------------
403  if (m_freeSlots.load() == m_maxEventsInFlight or
404  !m_isActive) {
405  return StatusCode::FAILURE;
406  } else {
407  m_finishedEvents.pop(eventContext);
408  m_freeSlots++;
409  debug() << "Popped slot " << eventContext->slot() << "(event "
410  << eventContext->evt() << ")" << endmsg;
411  return StatusCode::SUCCESS;
412  }
413 }
414 
415 //---------------------------------------------------------------------------
420  if (m_finishedEvents.try_pop(eventContext)) {
421  if (msgLevel(MSG::DEBUG))
422  debug() << "Try Pop successful slot " << eventContext->slot()
423  << "(event " << eventContext->evt() << ")" << endmsg;
424  m_freeSlots++;
425  return StatusCode::SUCCESS;
426  }
427  return StatusCode::FAILURE;
428 }
429 
430 //---------------------------------------------------------------------------
437 
438  // Set the number of slots available to an error code
439  m_freeSlots.store(0);
440 
441  fatal() << "*** Event " << eventContext->evt() << " on slot "
442  << eventContext->slot() << " failed! ***" << endmsg;
443 
444  //dumpSchedulerState(-1);
445 
446  // Empty queue and deactivate the service
447  action thisAction;
448  while(m_actionsQueue.try_pop(thisAction)){};
449  deactivate();
450 
451  // Push into the finished events queue the failed context
452  EventContext* thisEvtContext;
453  while(m_finishedEvents.try_pop(thisEvtContext)) { m_finishedEvents.push(thisEvtContext); };
454  m_finishedEvents.push(eventContext);
455 
456  return StatusCode::FAILURE;
457 
458 }
459 
460 //===========================================================================
461 
462 //===========================================================================
463 // States Management
464 
474 StatusCode ForwardSchedulerSvc::updateStates(int si, const std::string& algo_name){
475 
476  m_updateNeeded=true;
477 
478  // Fill a map of initial state / action using closures.
479  // done to update the states w/o several if/elses
480  // Posterchild for constexpr with gcc4.7 onwards!
481  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
482  statesTransitions = {
483  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
484  this,
485  std::placeholders::_1,
486  std::placeholders::_2)},
487  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
488  this,
489  std::placeholders::_1,
490  std::placeholders::_2)}
491  };*/
492 
493  StatusCode global_sc(StatusCode::FAILURE);
494  StatusCode partial_sc;
495 
496  // Sort from the oldest to the newest event
497  // Prepare a vector of pointers to the slots to avoid copies
498  std::vector<EventSlot*> eventSlotsPtrs;
499 
500  // Consider all slots if si <0 or just one otherwise
501  if (si<0) {
502  const int eventsSlotsSize(m_eventSlots.size());
503  eventSlotsPtrs.reserve(eventsSlotsSize);
504  for (auto slotIt=m_eventSlots.begin();slotIt!=m_eventSlots.end();slotIt++) {
505  if (!slotIt->complete)
506  eventSlotsPtrs.push_back(&(*slotIt));
507  }
508  std::sort(eventSlotsPtrs.begin(),
509  eventSlotsPtrs.end(),
510  [](EventSlot* a, EventSlot* b) {return a->eventContext->evt() < b->eventContext->evt();});
511  } else {
512  eventSlotsPtrs.push_back(&m_eventSlots[si]);
513  }
514 
515  for (EventSlot* thisSlotPtr : eventSlotsPtrs) {
516  int iSlot = thisSlotPtr->eventContext->slot();
517 
518  // Cache the states of the algos to improve readability and performance
519  auto& thisSlot = m_eventSlots[iSlot];
520  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
521 
522  // Take care of the control ready update
523  // XXX: CF tests
524  if (!m_CFNext) {
525  m_efManager.updateEventState(thisAlgsStates,thisSlot.controlFlowState);
526  } else {
527  if (!algo_name.empty())
528  m_efManager.updateDecision(algo_name,iSlot,thisAlgsStates,thisSlot.controlFlowState);
529  }
530 
531 
532  //DF note: all this this is a loop over all algs and applies CR->DR and DR->SCHD transistions
533  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
534  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
535  if (algState==AlgsExecutionStates::ERROR)
536  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
537  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
538  partial_sc=StatusCode::SUCCESS;
539  for (auto state_transition = statesTransitions.find(algState);
540  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
541  state_transition++){
542  partial_sc = state_transition->second(iAlgo,iSlot);
543  if (partial_sc.isFailure()){
544  debug() << "Could not apply transition from "
545  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
546  << " for algorithm " << index2algname(iAlgo)
547  << " on processing slot " << iSlot << endmsg;
548  }
549  else{global_sc=partial_sc;}
550  } // end loop on transitions
551  }*/ // end loop on algos
552 
553 
554  StatusCode partial_sc;
555  //first update CONTROLREADY to DATAREADY
556  if (!m_CFNext) {
557  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::CONTROLREADY);
558  it != thisAlgsStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it) {
559 
560  uint algIndex = *it;
561  partial_sc = promoteToDataReady(algIndex, iSlot);
562  if (partial_sc.isFailure())
563  if (msgLevel(MSG::DEBUG))
564  debug() << "Could not apply transition from "
565  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
566  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
567  }
568  }
569 
570  //now update DATAREADY to SCHEDULED
571  if (!m_optimizationMode.empty()) {
572  auto comp_nodes = [this] (const uint& i,const uint& j) {
575  };
576  std::priority_queue<uint,std::vector<uint>,std::function<bool(const uint&,const uint&)>> buffer(comp_nodes,std::vector<uint>());
577  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::DATAREADY);
578  it != thisAlgsStates.end(AlgsExecutionStates::State::DATAREADY); ++it)
579  buffer.push(*it);
580  /*std::stringstream s;
581  auto buffer2 = buffer;
582  while (!buffer2.empty()) {
583  s << m_efManager.getExecutionFlowGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
584  buffer2.pop();
585  }
586  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
587 
588  while (!buffer.empty()) {
589  partial_sc = promoteToScheduled(buffer.top(), iSlot);
590  if (partial_sc.isFailure())
591  if (msgLevel(MSG::DEBUG))
592  debug() << "Could not apply transition from "
593  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
594  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
595  buffer.pop();
596  }
597 
598  } else {
599  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::DATAREADY);
600  it != thisAlgsStates.end(AlgsExecutionStates::State::DATAREADY); ++it) {
601  uint algIndex = *it;
602  partial_sc = promoteToScheduled(algIndex, iSlot);
603  if (partial_sc.isFailure())
604  if (msgLevel(MSG::DEBUG))
605  debug() << "Could not apply transition from "
606  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
607  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
608 
609  }
610  }
611 
613  std::stringstream s;
614  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY)
615  << ", " << thisAlgsStates.sizeOfSubset(State::DATAREADY)
616  << ", " << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << "\n";
617  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
618  : std::to_string(tbb::task_scheduler_init::default_num_threads());
619  std::ofstream myfile;
620  myfile.open("IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app);
621  myfile << s.str();
622  myfile.close();
623  }
624 
625 
626  // Not complete because this would mean that the slot is already free!
627  if (!thisSlot.complete &&
628  m_efManager.rootDecisionResolved(thisSlot.controlFlowState) &&
629  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::CONTROLREADY) &&
630  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::DATAREADY) &&
631  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::SCHEDULED)) {
632 
633  thisSlot.complete=true;
634  // if the event did not fail, add it to the finished events
635  // otherwise it is taken care of in the error handling already
636  if (!thisSlot.eventContext->evtFail()) {
637  m_finishedEvents.push(thisSlot.eventContext);
638  if (msgLevel(MSG::DEBUG))
639  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
640  << thisSlot.eventContext->slot() << ")." << endmsg;
641  }
642  // now let's return the fully evaluated result of the control flow
643  if (msgLevel(MSG::DEBUG)) {
644  std::stringstream ss;
645  m_efManager.printEventState(ss, thisSlot.algsStates, thisSlot.controlFlowState,0);
646  debug() << ss.str() << endmsg;
647  }
648 
649  thisSlot.eventContext= nullptr;
650  } else {
651  StatusCode eventStalledSC = isStalled(iSlot);
652  if (! eventStalledSC.isSuccess())
653  eventFailed(thisSlot.eventContext);
654  }
655  } // end loop on slots
656 
657  verbose() << "States Updated." << endmsg;
658 
659  return global_sc;
660 }
661 
662 //---------------------------------------------------------------------------
663 
671  // Get the slot
672  EventSlot& thisSlot = m_eventSlots[iSlot];
673 
674  if (m_actionsQueue.empty() &&
675  m_algosInFlight == 0 &&
677 
678  info() << "About to declare a stall" << endmsg;
679  fatal() << "*** Stall detected! ***\n" << endmsg;
680  dumpSchedulerState(iSlot);
681  //throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
682 
683  return StatusCode::FAILURE;
684  }
685  return StatusCode::SUCCESS;
686 }
687 
688 //---------------------------------------------------------------------------
689 
696 
697  // To have just one big message
698  std::stringstream outputMessageStream;
699 
700  int slotCount = -1;
701  for (auto thisSlot : m_eventSlots){
702  slotCount++;
703  outputMessageStream.str(std::string());
704  if ( thisSlot.complete )
705  continue;
706 
707  outputMessageStream << "Dump of Scheduler State for slot " << thisSlot.eventContext->evt() << std::endl;
708 
709  if ( 0 > iSlot or iSlot == slotCount) {
710  outputMessageStream << "Algorithms states for event " << thisSlot.eventContext->evt() << std::endl;
711 
712  const std::vector<std::string>& wbSlotContent ( thisSlot.dataFlowMgr.content() );
713  for (unsigned int algoIdx=0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
714  outputMessageStream << " o " << index2algname(algoIdx)
715  << " was in state " << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]]
716  << ". Its data dependencies are ";
717  std::vector<std::string> deps (thisSlot.dataFlowMgr.dataDependencies(algoIdx));
718  const int depsSize=deps.size();
719  if (depsSize==0)
720  outputMessageStream << " none.";
721 
722  for (int i=0;i<depsSize;++i)
723  outputMessageStream << deps[i] << (i==(depsSize-1) ? "" :", ");
724 
725  // Now list what dependencies were available
726  // With std::algorithms, move the ones which are missing at the beginning of the vector
727  std::vector<std::string>::iterator missinngDepsEndIt =
728  std::remove_if(deps.begin(), // from the beginning of the deps
729  deps.end(), // to their end
730  [&wbSlotContent] (std::string dep) { // remove if this lambda returns true
731  return std::count(wbSlotContent.begin(),wbSlotContent.end(),dep)!=0; //look for dep in wb content
732  });
733 
734  if (deps.begin() != missinngDepsEndIt) {
735  outputMessageStream << ". The following are missing: ";
736  for (std::vector<std::string>::iterator missingDep=deps.begin();missingDep!=missinngDepsEndIt;++missingDep)
737  outputMessageStream << *missingDep << (missingDep==(missinngDepsEndIt-1)?"":", ");
738  }
739 
740  outputMessageStream << std::endl;
741  }
742 
743  fatal() << outputMessageStream.str() << endmsg;
744  outputMessageStream.str(std::string());
745 
746  // Snapshot of the WhiteBoard
747  outputMessageStream << "The content of the whiteboard for this event was:\n";
748  for (auto& product : wbSlotContent )
749  outputMessageStream << " o " << product << std::endl;
750 
751  fatal() << outputMessageStream.str()<< endmsg;
752  outputMessageStream.str(std::string());
753 
754  // Snapshot of the ControlFlow
755  outputMessageStream << "The status of the control flow for this event was:\n";
756  std::stringstream cFlowStateStringStream;
757  m_efManager.printEventState(cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState,0);
758 
759  outputMessageStream << cFlowStateStringStream.str();
760 
761  fatal() << outputMessageStream.str() << endmsg;
762  }
763  }
764 
765 }
766 
767 //---------------------------------------------------------------------------
768 
770 
771  // Do the control flow
772  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
773  if (sc.isSuccess())
774  if (msgLevel(MSG::DEBUG))
775  debug() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY" << endmsg;
776 
777  return sc;
778 
779 }
780 
781 //---------------------------------------------------------------------------
782 
784 
785  StatusCode sc;
786  if (!m_DFNext) {
787  sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun(iAlgo);
788  } else {
790  }
791 
793  if (sc == StatusCode::SUCCESS)
794  updateSc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::DATAREADY);
795 
796  if (updateSc.isSuccess())
797  if (msgLevel(MSG::DEBUG))
798  debug() << "Promoting " << index2algname(iAlgo) << " to DATAREADY" << endmsg;
799 
800  return updateSc;
801 
802 }
803 
804 //---------------------------------------------------------------------------
805 
807 
809  return StatusCode::FAILURE;
810 
811  const std::string& algName(index2algname(iAlgo));
812 
813  IAlgorithm* ialgoPtr=nullptr;
814  StatusCode sc ( m_algResourcePool->acquireAlgorithm(algName,ialgoPtr) );
815 
816  if (sc.isSuccess()) {
817  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
818  EventContext* eventContext ( m_eventSlots[si].eventContext );
819  if (!eventContext)
820  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si<< ")" << endmsg;
821 
822  algoPtr->setContext(m_eventSlots[si].eventContext);
823  ++m_algosInFlight;
824  // Avoid to use tbb if the pool size is 1 and run in this thread
825  if (-100 != m_threadPoolSize) {
826  tbb::task* t = new( tbb::task::allocate_root() ) AlgoExecutionTask(ialgoPtr, iAlgo, serviceLocator(), this);
827  tbb::task::enqueue( *t);
828  } else {
829  AlgoExecutionTask theTask(ialgoPtr, iAlgo, serviceLocator(), this);
830  theTask.execute();
831  }
832 
833  if (msgLevel(MSG::DEBUG))
834  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt()
835  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
836 
837  StatusCode updateSc ( m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::SCHEDULED) );
838 
839  if (updateSc.isSuccess())
840  if (msgLevel(MSG::DEBUG))
841  debug() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED" << endmsg;
842  return updateSc;
843  } else {
844  if (msgLevel(MSG::DEBUG))
845  debug() << "Could not acquire instance for algorithm " << index2algname(iAlgo) << " on slot " << si << endmsg;
846  return sc;
847  }
848 
849 }
850 
851 //---------------------------------------------------------------------------
856 
857  // Put back the instance
858  Algorithm* castedAlgo = dynamic_cast<Algorithm*>(algo); // DP: expose context getter in IAlgo?
859  if (!castedAlgo)
860  fatal() << "The casting did not succeed!" << endmsg;
861  EventContext* eventContext = castedAlgo->getContext();
862 
863  // Check if the execution failed
864  if (eventContext->evtFail())
865  eventFailed(eventContext);
866 
867  StatusCode sc = m_algResourcePool->releaseAlgorithm(algo->name(),algo);
868 
869  if (!sc.isSuccess()) {
870  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
871  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
872  return StatusCode::FAILURE;
873  }
874 
875  m_algosInFlight--;
876 
877  EventSlot& thisSlot = m_eventSlots[si];
878  // XXX: CF tests
879  if (!m_DFNext) {
880  // Update the catalog: some new products may be there
881  m_whiteboard->selectStore(eventContext->slot()).ignore();
882 
883  // update prods in the dataflow
884  // DP: Handles could be used. Just update what the algo wrote
885  std::vector<std::string> new_products;
886  m_whiteboard->getNewDataObjects(new_products).ignore();
887  for (const auto& new_product : new_products)
888  if (msgLevel(MSG::DEBUG))
889  debug() << "Found in WB: " << new_product << endmsg;
890  thisSlot.dataFlowMgr.updateDataObjectsCatalog(new_products);
891  }
892 
893  if (msgLevel(MSG::DEBUG))
894  debug() << "Algorithm " << algo->name() << " executed. Algorithms scheduled are " << m_algosInFlight << endmsg;
895 
896  // Limit number of updates
897  if (m_CFNext) m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
898  if (m_updateNeeded) {
899  // Schedule an update of the status of the algorithms
900  auto updateAction = std::bind(&ForwardSchedulerSvc::updateStates, this, -1, algo->name());
901  m_actionsQueue.push(updateAction);
902  m_updateNeeded = false;
903  }
904 
905  if (msgLevel(MSG::DEBUG))
906  debug() << "Trying to handle execution result of " << index2algname(iAlgo) << "." << endmsg;
907  State state;
908  if (algo->filterPassed()) {
909  state = State::EVTACCEPTED;
910  } else {
911  state = State::EVTREJECTED;
912  }
913 
914  sc = thisSlot.algsStates.updateState(iAlgo,state);
915 
916  if (sc.isSuccess())
917  if (msgLevel(MSG::DEBUG))
918  debug() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
920 
921  return sc;
922 }
923 
924 //===========================================================================
StatusCode deactivate()
Deactivate scheduler.
bool algsPresent(State state) const
void simulateExecutionFlow(IGraphVisitor &visitor) const
virtual StatusCode getNewDataObjects(std::vector< std::string > &products)=0
Get the latest new data objects registered in the store.
StatusCode initialize() override
Definition: Service.cpp:63
string to_string(const T &value)
Definition: mergesort.cpp:40
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
virtual StatusCode initialize()
Initialise.
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode finalize() override
Definition: Service.cpp:188
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
bool m_isActive
Flag to track if the scheduler is active or not.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
STL namespace.
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
virtual StatusCode popFinishedEvent(EventContext *&eventContext)
Blocks until an event is available.
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
virtual size_t getNumberOfStores()=0
Get the number of 'slots'.
virtual tbb::task * execute()
The SchedulerSvc implements the IScheduler interface.
size_t sizeOfSubset(State state) const
A visitor, performing full top-down traversals of a graph.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:22
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
bool evtFail() const
Definition: EventContext.h:40
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo)
The call to this method is triggered only from within the AlgoExecutionTask.
virtual StatusCode tryPopFinishedEvent(EventContext *&eventContext)
Try to fetch an event from the scheduler.
StatusCode m_drain()
Drain the actions present in the queue.
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:556
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:919
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
void updateDataObjectsCatalog(const std::vector< std::string > &newProducts)
Update the catalog of available products in the slot.
~ForwardSchedulerSvc()
Destructor.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
unsigned int m_algosInFlight
Number of algorithms presently in flight.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::string m_whiteboardSvcName
The whiteboard name.
long int evt() const
Definition: EventContext.h:37
virtual StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts)
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:354
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithms in the slots and promote them to successive states (-1 means all slots...
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:23
virtual unsigned int freeSlots()
Get free slots number.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
bool m_updateNeeded
Keep track of update actions scheduled.
std::function< StatusCode()> action
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
void activate()
Activate scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
Iterator begin(State kind)
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
ID_type slot() const
Definition: EventContext.h:38
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
virtual StatusCode finalize()
Finalise.
friend class AlgoExecutionTask
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
void ignore() const
Definition: StatusCode.h:108
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
list i
Definition: ana.py:128
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
std::vector< std::vector< std::string > > m_algosDependencies
Ugly, will disappear when the deps are declared only within the C++ code of the algos.
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
static std::map< State, std::string > stateNames
EventContext * getContext()
get the context
Definition: Algorithm.h:553
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)