ForwardSchedulerSvc.cpp
Go to the documentation of this file.
1 // Framework includes
4 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
8 #include "tbb/task.h"
9 #include "boost/thread.hpp"
10 
11 // C++
12 #include <unordered_set>
13 #include <algorithm>
14 #include <map>
15 #include <sstream>
16 #include <queue>
17 
18 // Local
19 #include "ForwardSchedulerSvc.h"
20 #include "AlgoExecutionTask.h"
21 #include "AlgResourcePool.h"
22 #include "EFGraphVisitors.h"
23 
26 
27 // External libs
28 // DP waiting for the TBB service
29 #include "tbb/task_scheduler_init.h"
30 
31 // Instantiation of a static factory class used by clients to create instances of this service
33 
34 //===========================================================================
35 // Infrastructure methods
36 
38  base_class(name,svcLoc),
39  m_isActive(INACTIVE),
40  m_algosInFlight(0),
41  m_updateNeeded(true),
42  m_first(true), m_checkDeps(false)
43 
44 {
45  declareProperty("MaxEventsInFlight", m_maxEventsInFlight = 0 );
46  declareProperty("ThreadPoolSize", m_threadPoolSize = -1 );
47  declareProperty("WhiteboardSvc", m_whiteboardSvcName = "EventDataSvc" );
48  declareProperty("MaxAlgosInFlight", m_maxAlgosInFlight = 0, "Taken from the whiteboard. Deprecated" );
49  // XXX: CF tests. Temporary property to switch between ControlFlow implementations
50  declareProperty("useGraphFlowManagement", m_CFNext = false );
51  declareProperty("DataFlowManagerNext", m_DFNext = false );
52  declareProperty("SimulateExecution", m_simulateExecution = false );
53  declareProperty("Optimizer", m_optimizationMode = "",
54  "The following modes are currently available: PCE, COD, DRE, E" );
55  declareProperty("DumpIntraEventDynamics", m_dumpIntraEventDynamics = false,
56  "Dump intra-event concurrency dynamics to csv file" );
57 
59  declareProperty("AlgosDependencies", m_algosDependencies);
60 
61  declareProperty("CheckDependencies", m_checkDeps = false);
62 
63 }
64 
65 //---------------------------------------------------------------------------
67 //---------------------------------------------------------------------------
68 
75 
76  // Initialise mother class (read properties, ...)
78  if (!sc.isSuccess())
79  warning () << "Base class could not be initialized" << endmsg;
80 
81  // Get hold of the TBBSvc. This should initialize the thread pool
82  m_threadPoolSvc = serviceLocator()->service("ThreadPoolSvc");
83  if (!m_threadPoolSvc.isValid()) {
84  fatal() << "Error retrieving ThreadPoolSvc" << endreq;
85  return StatusCode::FAILURE;
86  }
87 
88 
89  // Activate the scheduler in another thread.
90  info() << "Activating scheduler in a separate thread" << endmsg;
92  this));
93 
94  while(m_isActive != ACTIVE) {
95  if (m_isActive == FAILURE) {
96  fatal() << "Terminating initialization" << endmsg;
97  return StatusCode::FAILURE;
98  } else {
99  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
100  sleep(1);
101  }
102  }
103 
104  // Get the algo resource pool
105  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
106  if (!m_algResourcePool.isValid()) {
107  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
108  return StatusCode::FAILURE;
109  }
110 
111  // Get Whiteboard
113  if (!m_whiteboard.isValid()) {
114  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard."
115  << endmsg;
116  return StatusCode::FAILURE;
117  }
118 
119  // Check the MaxEventsInFlight parameters and react
120  // Deprecated for the moment
121  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
122  if (m_maxEventsInFlight!=0){
123  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
124  << "Please migrate your code options files." << endmsg;
125 
126  if (m_maxEventsInFlight != (int)numberOfWBSlots){
127  warning() << "In addition, the number of events in flight ("
128  << m_maxEventsInFlight << ") differs from the slots in the whiteboard ("
129  << numberOfWBSlots << "). Setting the number of events in flight to "
130  << numberOfWBSlots << endmsg;
131  }
132  }
133 
134  // Align the two quantities
135  m_maxEventsInFlight = numberOfWBSlots;
136 
137  // Set the number of free slots
139 
140  if (m_algosDependencies.size() != 0) {
141  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
142  << " FIX your job options #####" << endmsg;
143  }
144 
145  // Get the list of algorithms
147  const unsigned int algsNumber = algos.size();
148  info() << "Found " << algsNumber << " algorithms" << endmsg;
149 
150  /* Dependencies
151  1) Look for handles in algo, if none
152  2) Assume none are required
153  */
154 
155  DataObjIDColl globalInp, globalOutp;
156 
157  info() << "Data Dependencies for Algorithms:";
158 
160  for (IAlgorithm* ialgoPtr : algos) {
161  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
162  if (nullptr == algoPtr)
163  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
164 
165  info() << "\n " << algoPtr->name();
166 
167  // FIXME
168  DataObjIDColl algoDependencies;
169  if (!algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty()) {
170  for (auto id : algoPtr->inputDataObjs()) {
171  info() << "\n o INPUT " << id;
172  algoDependencies.insert(id);
173  globalInp.insert(id);
174  }
175  for (auto id : algoPtr->outputDataObjs()) {
176  info() << "\n o OUTPUT " << id;
177  globalOutp.insert(id);
178  }
179  } else {
180  info() << "\n none";
181  }
182  m_algosDependencies.emplace_back(algoDependencies);
183  }
184  info() << endmsg;
185 
186  // Fill the containers to convert algo names to index
187  m_algname_vect.reserve(algsNumber);
188  unsigned int index=0;
189  for (IAlgorithm* algo : algos){
190  const std::string& name = algo->name();
191  m_algname_index_map[name]=index;
193  index++;
194  }
195 
196 
197  // Check if we have unmet global input dependencies
198  if (m_checkDeps) {
199  DataObjIDColl unmetDep;
200  for (auto o : globalInp) {
201  if (globalOutp.find(o) == globalOutp.end()) {
202  unmetDep.insert(o);
203  }
204  }
205 
206  if (unmetDep.size() > 0) {
207  fatal() << "The following unmet INPUT data dependencies were found: ";
208  for (auto &o : unmetDep) {
209  fatal() << "\n o " << o << " required by Algorithm: ";
210  for (size_t i =0; i<m_algosDependencies.size(); ++i) {
211  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
212  fatal() << "\n * " << m_algname_vect[i];
213  }
214  }
215  }
216  fatal() << endmsg;
217  return StatusCode::FAILURE;
218  } else {
219  info() << "No unmet INPUT data dependencies were found" << endmsg;
220  }
221  }
222 
223  // prepare the control flow part
224  if (m_CFNext) m_DFNext = true; //force usage of new data flow machinery when new control flow is used
225  if (!m_CFNext && !m_optimizationMode.empty()) {
226  fatal() << "Execution optimization is only available with the graph-based execution flow management" << endmsg;
227  return StatusCode::FAILURE;
228  }
229  const AlgResourcePool* algPool =
230  dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
233  unsigned int controlFlowNodeNumber =
235 
236  // Shortcut for the message service
237  SmartIF<IMessageSvc> messageSvc (serviceLocator());
238  if (!messageSvc.isValid())
239  error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
240 
241  m_eventSlots.assign(m_maxEventsInFlight,EventSlot(m_algosDependencies,algsNumber,
242  controlFlowNodeNumber,messageSvc));
243  std::for_each(m_eventSlots.begin(),m_eventSlots.end(),
244  [](EventSlot& slot){slot.complete=true;});
245 
246  // Clearly inform about the level of concurrency
247  info() << "Concurrency level information:" << endmsg;
248  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
249  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
250  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
251 
252  // Simulating execution flow by only analyzing the graph topology and logic
253  if (m_simulateExecution) {
254  auto vis = concurrency::RunSimulator(0);
256  }
257 
258  return sc;
259 
260 }
261 //---------------------------------------------------------------------------
262 
267 
269  if (!sc.isSuccess())
270  warning () << "Base class could not be finalized" << endmsg;
271 
272  sc = deactivate();
273  if (!sc.isSuccess())
274  warning () << "Scheduler could not be deactivated" << endmsg;
275 
276  info() << "Joining Scheduler thread" << endmsg;
277  m_thread.join();
278 
279  //m_efManager.getExecutionFlowGraph()->dumpExecutionPlan();
280 
281  return sc;
282 
283  }
284 //---------------------------------------------------------------------------
296 
297  debug() << "ForwardSchedulerSvc::activate()" << endmsg;
298 
300  error() << "problems initializing ThreadPoolSvc" << endmsg;
302  return;
303  }
304 
305 
306  // Wait for actions pushed into the queue by finishing tasks.
307  action thisAction;
309 
310  m_isActive = ACTIVE;
311 
312  // Continue to wait if the scheduler is running or there is something to do
313  info() << "Start checking the actionsQueue" << endmsg;
314  while(m_isActive == ACTIVE or m_actionsQueue.size()!=0){
315  m_actionsQueue.pop(thisAction);
316  sc = thisAction();
317  if (sc!=StatusCode::SUCCESS)
318  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
319  else
320  verbose() << "Action succeeded." << endmsg;
321  }
322 
323 }
324 
325 //---------------------------------------------------------------------------
326 
334 
335  if (m_isActive == ACTIVE){
336  // Drain the scheduler
338  this));
339  // This would be the last action
341  }
342 
343  return StatusCode::SUCCESS;
344 }
345 
346 //===========================================================================
347 
348 //===========================================================================
349 // Utils and shortcuts
350 
351 inline const std::string& ForwardSchedulerSvc::index2algname (unsigned int index) {
352  return m_algname_vect[index];
353 }
354 
355 //---------------------------------------------------------------------------
356 
357 inline unsigned int ForwardSchedulerSvc::algname2index(const std::string& algoname) {
358  unsigned int index = m_algname_index_map[algoname];
359  return index;
360 }
361 
362 //===========================================================================
363 // EventSlot management
371 
372  if (m_first) {
373  m_first = false;
374  }
375 
376  if (!eventContext){
377  fatal() << "Event context is nullptr" << endmsg;
378  return StatusCode::FAILURE;
379  }
380 
381  if (m_freeSlots.load() == 0) {
382  if (msgLevel(MSG::DEBUG))
383  debug() << "A free processing slot could not be found." << endmsg;
384  return StatusCode::FAILURE;
385  }
386 
387  //no problem as push new event is only called from one thread (event loop manager)
388  m_freeSlots--;
389 
390  auto action = [this,eventContext] () -> StatusCode {
391  // Event processing slot forced to be the same as the wb slot
392  const unsigned int thisSlotNum = eventContext->slot();
393  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
394  if (!thisSlot.complete) {
395  fatal() << "The slot " << thisSlotNum
396  << " is supposed to be a finished event but it's not" << endmsg;
397  return StatusCode::FAILURE;
398  }
399 
400  info() << "Executing event " << eventContext->evt() << " on slot "
401  << thisSlotNum << endmsg;
402  thisSlot.reset(eventContext);
403  // XXX: CF tests
404  if (m_CFNext) {
405  auto vis = concurrency::Trigger(thisSlotNum);
407  }
408 
409  return this->updateStates(thisSlotNum);
410  }; // end of lambda
411 
412  // Kick off the scheduling!
413  if (msgLevel(MSG::VERBOSE)) {
414  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
415  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
416  }
417  m_actionsQueue.push(action);
418 
419  return StatusCode::SUCCESS;
420 }
421 
422 //---------------------------------------------------------------------------
424  StatusCode sc;
425  for (auto context : eventContexts){
426  sc = pushNewEvent(context);
427  if (sc != StatusCode::SUCCESS) return sc;
428  }
429  return sc;
430 }
431 
432 //---------------------------------------------------------------------------
434  return std::max(m_freeSlots.load(),0);
435 }
436 
437 //---------------------------------------------------------------------------
442 
443  unsigned int slotNum=0;
444  for (auto& thisSlot : m_eventSlots){
445  if (not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete){
446  updateStates(slotNum);
447  }
448  slotNum++;
449  }
450  return StatusCode::SUCCESS;
451 }
452 
453 //---------------------------------------------------------------------------
458  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
459  if (m_freeSlots.load() == m_maxEventsInFlight or
460  m_isActive == INACTIVE) {
461  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
462  // << " active: " << m_isActive << endmsg;
463  return StatusCode::FAILURE;
464  } else {
465  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
466  // << " active: " << m_isActive << endmsg;
467  m_finishedEvents.pop(eventContext);
468  m_freeSlots++;
469  debug() << "Popped slot " << eventContext->slot() << "(event "
470  << eventContext->evt() << ")" << endmsg;
471  return StatusCode::SUCCESS;
472  }
473 }
474 
475 //---------------------------------------------------------------------------
480  if (m_finishedEvents.try_pop(eventContext)) {
481  if (msgLevel(MSG::DEBUG))
482  debug() << "Try Pop successful slot " << eventContext->slot()
483  << "(event " << eventContext->evt() << ")" << endmsg;
484  m_freeSlots++;
485  return StatusCode::SUCCESS;
486  }
487  return StatusCode::FAILURE;
488 }
489 
490 //---------------------------------------------------------------------------
497 
498  // Set the number of slots available to an error code
499  m_freeSlots.store(0);
500 
501  fatal() << "*** Event " << eventContext->evt() << " on slot "
502  << eventContext->slot() << " failed! ***" << endmsg;
503 
504  dumpSchedulerState(-1);
505 
506  // Empty queue and deactivate the service
507  action thisAction;
508  while(m_actionsQueue.try_pop(thisAction)){};
509  deactivate();
510 
511  // Push into the finished events queue the failed context
512  EventContext* thisEvtContext;
513  while(m_finishedEvents.try_pop(thisEvtContext)) { m_finishedEvents.push(thisEvtContext); };
514  m_finishedEvents.push(eventContext);
515 
516  return StatusCode::FAILURE;
517 
518 }
519 
520 //===========================================================================
521 
522 //===========================================================================
523 // States Management
524 
535 
536  m_updateNeeded=true;
537 
538  // Fill a map of initial state / action using closures.
539  // done to update the states w/o several if/elses
540  // Posterchild for constexpr with gcc4.7 onwards!
541  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
542  statesTransitions = {
543  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
544  this,
545  std::placeholders::_1,
546  std::placeholders::_2)},
547  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
548  this,
549  std::placeholders::_1,
550  std::placeholders::_2)}
551  };*/
552 
553  StatusCode global_sc(StatusCode::FAILURE,true);
554 
555  // Sort from the oldest to the newest event
556  // Prepare a vector of pointers to the slots to avoid copies
557  std::vector<EventSlot*> eventSlotsPtrs;
558 
559  // Consider all slots if si <0 or just one otherwise
560  if (si<0) {
561  const int eventsSlotsSize(m_eventSlots.size());
562  eventSlotsPtrs.reserve(eventsSlotsSize);
563  for (auto slotIt=m_eventSlots.begin();slotIt!=m_eventSlots.end();slotIt++) {
564  if (!slotIt->complete)
565  eventSlotsPtrs.push_back(&(*slotIt));
566  }
567  std::sort(eventSlotsPtrs.begin(),
568  eventSlotsPtrs.end(),
569  [](EventSlot* a, EventSlot* b) {return a->eventContext->evt() < b->eventContext->evt();});
570  } else {
571  eventSlotsPtrs.push_back(&m_eventSlots[si]);
572  }
573 
574  for (EventSlot* thisSlotPtr : eventSlotsPtrs) {
575  int iSlot = thisSlotPtr->eventContext->slot();
576 
577  // Cache the states of the algos to improve readability and performance
578  auto& thisSlot = m_eventSlots[iSlot];
579  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
580 
581  // Take care of the control ready update
582  // XXX: CF tests
583  if (!m_CFNext) {
584  m_efManager.updateEventState(thisAlgsStates,thisSlot.controlFlowState);
585  } else {
586  if (!algo_name.empty())
587  m_efManager.updateDecision(algo_name,iSlot,thisAlgsStates,thisSlot.controlFlowState);
588  }
589 
590 
591  //DF note: all this is a loop over all algs and applies CR->DR and DR->SCHD transitions
592  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
593  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
594  if (algState==AlgsExecutionStates::ERROR)
595  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
596  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
597  partial_sc=StatusCode::SUCCESS;
598  for (auto state_transition = statesTransitions.find(algState);
599  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
600  state_transition++){
601  partial_sc = state_transition->second(iAlgo,iSlot);
602  if (partial_sc.isFailure()){
603  verbose() << "Could not apply transition from "
604  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
605  << " for algorithm " << index2algname(iAlgo)
606  << " on processing slot " << iSlot << endmsg;
607  }
608  else{global_sc=partial_sc;}
609  } // end loop on transitions
610  }*/ // end loop on algos
611 
612 
613  StatusCode partial_sc(StatusCode::FAILURE,true);
614  //first update CONTROLREADY to DATAREADY
615  if (!m_CFNext) {
616  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::CONTROLREADY);
617  it != thisAlgsStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it) {
618 
619  uint algIndex = *it;
620  partial_sc = promoteToDataReady(algIndex, iSlot);
621  if (partial_sc.isFailure())
622  if (msgLevel(MSG::DEBUG))
623  verbose() << "Could not apply transition from "
624  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
625  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
626  }
627  }
628 
629  //now update DATAREADY to SCHEDULED
630  if (!m_optimizationMode.empty()) {
631  auto comp_nodes = [this] (const uint& i,const uint& j) {
634  };
636  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::DATAREADY);
637  it != thisAlgsStates.end(AlgsExecutionStates::State::DATAREADY); ++it)
638  buffer.push(*it);
639  /*std::stringstream s;
640  auto buffer2 = buffer;
641  while (!buffer2.empty()) {
642  s << m_efManager.getExecutionFlowGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
643  buffer2.pop();
644  }
645  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
646 
647  while (!buffer.empty()) {
648  partial_sc = promoteToScheduled(buffer.top(), iSlot);
649  if (partial_sc.isFailure())
650  if (msgLevel(MSG::DEBUG))
651  verbose() << "Could not apply transition from "
652  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
653  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
654  buffer.pop();
655  }
656 
657  } else {
658  for(auto it = thisAlgsStates.begin(AlgsExecutionStates::State::DATAREADY);
659  it != thisAlgsStates.end(AlgsExecutionStates::State::DATAREADY); ++it) {
660  uint algIndex = *it;
661  partial_sc = promoteToScheduled(algIndex, iSlot);
662  if (partial_sc.isFailure())
663  if (msgLevel(MSG::DEBUG))
664  verbose() << "Could not apply transition from "
665  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
666  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
667 
668  }
669  }
670 
673  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY)
674  << ", " << thisAlgsStates.sizeOfSubset(State::DATAREADY)
675  << ", " << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << "\n";
676  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
677  : std::to_string(tbb::task_scheduler_init::default_num_threads());
678  std::ofstream myfile;
679  myfile.open("IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app);
680  myfile << s.str();
681  myfile.close();
682  }
683 
684 
685  // Not complete because this would mean that the slot is already free!
686  if (!thisSlot.complete &&
687  m_efManager.rootDecisionResolved(thisSlot.controlFlowState) &&
688  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::CONTROLREADY) &&
689  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::DATAREADY) &&
690  !thisSlot.algsStates.algsPresent(AlgsExecutionStates::SCHEDULED)) {
691 
692  thisSlot.complete=true;
693  // if the event did not fail, add it to the finished events
694  // otherwise it is taken care of in the error handling already
695  if (!thisSlot.eventContext->evtFail()) {
696  m_finishedEvents.push(thisSlot.eventContext);
697  if (msgLevel(MSG::DEBUG))
698  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
699  << thisSlot.eventContext->slot() << ")." << endmsg;
700  }
701  // now let's return the fully evaluated result of the control flow
702  if (msgLevel(MSG::DEBUG)) {
704  m_efManager.printEventState(ss, thisSlot.algsStates, thisSlot.controlFlowState,0);
705  debug() << ss.str() << endmsg;
706  }
707 
708  thisSlot.eventContext= nullptr;
709  } else {
710  StatusCode eventStalledSC = isStalled(iSlot);
711  if (! eventStalledSC.isSuccess())
712  eventFailed(thisSlot.eventContext);
713  }
714  } // end loop on slots
715 
716  verbose() << "States Updated." << endmsg;
717 
718  return global_sc;
719 }
720 
721 //---------------------------------------------------------------------------
722 
730  // Get the slot
731  EventSlot& thisSlot = m_eventSlots[iSlot];
732 
733  if (m_actionsQueue.empty() &&
734  m_algosInFlight == 0 &&
736 
737  info() << "About to declare a stall" << endmsg;
738  fatal() << "*** Stall detected! ***\n" << endmsg;
739  dumpSchedulerState(iSlot);
740  //throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
741 
742  return StatusCode::FAILURE;
743  }
744  return StatusCode::SUCCESS;
745 }
746 
747 //---------------------------------------------------------------------------
748 
755 
756  // To have just one big message
757  std::ostringstream outputMessageStream;
758 
759  outputMessageStream
760  << "============================== Execution Task State ============================="
761  << std::endl;
762  dumpState(outputMessageStream);
763 
764  outputMessageStream
765  << std::endl
766  << "============================== Scheduler State ================================="
767  << std::endl;
768 
769  int slotCount = -1;
770  for (auto thisSlot : m_eventSlots){
771  slotCount++;
772  if ( thisSlot.complete )
773  continue;
774 
775  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
776  << " event: " << thisSlot.eventContext->evt()
777  << " -----------"<< std::endl;
778 
779  if ( 0 > iSlot or iSlot == slotCount) {
780  outputMessageStream << "Algorithms states:" << std::endl;
781 
782  const DataObjIDColl& wbSlotContent ( thisSlot.dataFlowMgr.content() );
783  for (unsigned int algoIdx=0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
784  outputMessageStream << " o " << index2algname(algoIdx)
785  << " [" << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]]
786  << "] Data deps: ";
787  DataObjIDColl deps (thisSlot.dataFlowMgr.dataDependencies(algoIdx));
788  const int depsSize=deps.size();
789  if (depsSize==0)
790  outputMessageStream << " none";
791 
792  DataObjIDColl missing;
793  for (auto d: deps) {
794  outputMessageStream << d << " ";
795  if ( wbSlotContent.find(d) == wbSlotContent.end() ) {
796  // outputMessageStream << "[missing] ";
797  missing.insert(d);
798  }
799  }
800 
801  if (! missing.empty()) {
802  outputMessageStream << ". The following are missing: ";
803  for (auto d: missing) {
804  outputMessageStream << d << " ";
805  }
806  }
807 
808  outputMessageStream << std::endl;
809  }
810 
811  // Snapshot of the WhiteBoard
812  outputMessageStream << "\nWhiteboard contents: "<< std::endl;
813  for (auto& product : wbSlotContent )
814  outputMessageStream << " o " << product << std::endl;
815 
816  // Snapshot of the ControlFlow
817  outputMessageStream << "\nControl Flow:" << std::endl;
818  std::stringstream cFlowStateStringStream;
819  m_efManager.printEventState(cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState,0);
820 
821  outputMessageStream << cFlowStateStringStream.str() << std::endl;
822  }
823  }
824 
825  outputMessageStream
826  << "=================================== END ======================================"
827  << std::endl;
828 
829  info() << "Dumping Scheduler State " << std::endl
830  << outputMessageStream.str() << endmsg;
831 
832 }
833 
834 //---------------------------------------------------------------------------
835 
837 
838  // Do the control flow
839  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
840  if (sc.isSuccess())
841  if (msgLevel(MSG::VERBOSE))
842  verbose() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY on slot "
843  << si << endmsg;
844 
845  return sc;
846 
847 }
848 
849 //---------------------------------------------------------------------------
850 
852 
853  StatusCode sc;
854  if (!m_DFNext) {
855  sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun(iAlgo);
856  } else {
858  }
859 
861  if (sc == StatusCode::SUCCESS)
862  updateSc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::DATAREADY);
863 
864  if (updateSc.isSuccess())
865  if (msgLevel(MSG::VERBOSE))
866  verbose() << "Promoting " << index2algname(iAlgo) << " to DATAREADY on slot "
867  << si<< endmsg;
868 
869  return updateSc;
870 
871 }
872 
873 //---------------------------------------------------------------------------
874 
876 
878  return StatusCode::FAILURE;
879 
880  const std::string& algName(index2algname(iAlgo));
881 
882  IAlgorithm* ialgoPtr=nullptr;
883  StatusCode sc ( m_algResourcePool->acquireAlgorithm(algName,ialgoPtr) );
884 
885  if (sc.isSuccess()) {
886  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
887  EventContext* eventContext ( m_eventSlots[si].eventContext );
888  if (!eventContext)
889  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si<< ")" << endmsg;
890 
891  algoPtr->setContext(m_eventSlots[si].eventContext);
892  ++m_algosInFlight;
893  // Avoid to use tbb if the pool size is 1 and run in this thread
894  if (-100 != m_threadPoolSize) {
895  tbb::task* t = new( tbb::task::allocate_root() ) AlgoExecutionTask(ialgoPtr, iAlgo, serviceLocator(), this);
896  tbb::task::enqueue( *t);
897  } else {
898  AlgoExecutionTask theTask(ialgoPtr, iAlgo, serviceLocator(), this);
899  theTask.execute();
900  }
901 
902  if (msgLevel(MSG::DEBUG))
903  debug() << "Algorithm " << algName << " was submitted on event "
904  << eventContext->evt() << " in slot " << si
905  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
906 
907  StatusCode updateSc ( m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::SCHEDULED) );
908 
910 
911  if (updateSc.isSuccess())
912  if (msgLevel(MSG::VERBOSE))
913  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
914  << si << endmsg;
915  return updateSc;
916  } else {
917  if (msgLevel(MSG::DEBUG))
918  debug() << "Could not acquire instance for algorithm " << index2algname(iAlgo) << " on slot " << si << endmsg;
919  return sc;
920  }
921 
922 }
923 
924 //---------------------------------------------------------------------------
929 
930  // Put back the instance
931  Algorithm* castedAlgo = dynamic_cast<Algorithm*>(algo); // DP: expose context getter in IAlgo?
932  if (!castedAlgo)
933  fatal() << "The casting did not succeed!" << endmsg;
934  EventContext* eventContext = castedAlgo->getContext();
935 
936  // Check if the execution failed
937  if (eventContext->evtFail())
938  eventFailed(eventContext);
939 
940  StatusCode sc = m_algResourcePool->releaseAlgorithm(algo->name(),algo);
941 
942  if (!sc.isSuccess()) {
943  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot()
944  << "] " << "Instance of algorithm " << algo->name()
945  << " could not be properly put back." << endmsg;
946  return StatusCode::FAILURE;
947  }
948 
949  m_algosInFlight--;
950 
951  EventSlot& thisSlot = m_eventSlots[si];
952  // XXX: CF tests
953  if (!m_DFNext) {
954  // Update the catalog: some new products may be there
955  m_whiteboard->selectStore(eventContext->slot()).ignore();
956 
957  // update prods in the dataflow
958  // DP: Handles could be used. Just update what the algo wrote
959  DataObjIDColl new_products;
960  m_whiteboard->getNewDataObjects(new_products).ignore();
961  for (const auto& new_product : new_products)
962  if (msgLevel(MSG::DEBUG))
963  debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
964  thisSlot.dataFlowMgr.updateDataObjectsCatalog(new_products);
965  }
966 
967  if (msgLevel(MSG::DEBUG))
968  debug() << "Algorithm " << algo->name() << " executed in slot " << si
969  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
970 
971  // Limit number of updates
972  if (m_CFNext) m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
973  if (m_updateNeeded) {
974  // Schedule an update of the status of the algorithms
975  auto updateAction = std::bind(&ForwardSchedulerSvc::updateStates, this, -1, algo->name());
976  m_actionsQueue.push(updateAction);
977  m_updateNeeded = false;
978  }
979 
980  if (msgLevel(MSG::DEBUG))
981  debug() << "Trying to handle execution result of " << index2algname(iAlgo)
982  << " on slot " << si << endmsg;
983  State state;
984  if (algo->filterPassed()) {
985  state = State::EVTACCEPTED;
986  } else {
987  state = State::EVTREJECTED;
988  }
989 
990  sc = thisSlot.algsStates.updateState(iAlgo,state);
991 
992  if (sc.isSuccess())
993  if (msgLevel(MSG::VERBOSE))
994  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
996 
997  return sc;
998 }
999 
1000 //===========================================================================
1001 void
1003 
1005  m_sState.push_back(SchedulerState(a,e,t));
1006 
1007 }
1008 
1009 //===========================================================================
1010 bool
1012 
1014 
1015  for (std::list<SchedulerState>::iterator itr = m_sState.begin();
1016  itr != m_sState.end(); ++itr) {
1017  if (*itr == a) {
1018  m_sState.erase(itr);
1019  return true;
1020  }
1021  }
1022 
1023  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1024  return false;
1025 }
1026 
1027 //===========================================================================
1028 void
1030 
1032 
1033  for (auto it : m_sState) {
1034  ost << " " << it << std::endl;
1035  }
1036 
1037 }
1038 
1039 //===========================================================================
1040 void
1042 
1044 
1045  std::ostringstream ost;
1046  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1047  dumpState(ost);
1048 
1049  info() << ost.str() << endmsg;
1050 
1051 }
1052 
1053 
virtual StatusCode initPool(const int &poolSize)=0
StatusCode deactivate()
Deactivate scheduler.
bool algsPresent(State state) const
void simulateExecutionFlow(IGraphVisitor &visitor) const
StatusCode initialize() override
Definition: Service.cpp:68
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:324
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
T empty(T...args)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
T open(T...args)
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
virtual StatusCode initialize()
Initialise.
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode finalize() override
Definition: Service.cpp:193
ContextID_t slot() const
Definition: EventContext.h:41
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
const DataObjIDColl & inputDataObjs() const
Definition: Algorithm.h:599
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
T to_string(T...args)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T endl(T...args)
STL namespace.
SmartIF< IThreadPoolSvc > m_threadPoolSvc
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
virtual StatusCode popFinishedEvent(EventContext *&eventContext)
Blocks until an event is available.
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
virtual size_t getNumberOfStores()=0
Get the number of 'slots'.
virtual tbb::task * execute()
The SchedulerSvc implements the IScheduler interface.
EventContext * getContext() const
get the context
Definition: Algorithm.h:571
size_t sizeOfSubset(State state) const
A visitor, performing full top-down traversals of a graph.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
bool evtFail() const
Definition: EventContext.h:43
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo)
The call to this method is triggered only from within the AlgoExecutionTask.
virtual StatusCode tryPopFinishedEvent(EventContext *&eventContext)
Try to fetch an event from the scheduler.
StatusCode m_drain()
Drain the actions present in the queue.
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:574
ContextEvt_t evt() const
Definition: EventContext.h:40
STL class.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:820
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
~ForwardSchedulerSvc()
Destructor.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
void addAlg(Algorithm *, EventContext *, pthread_t)
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:78
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:319
T push_back(T...args)
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
STL class.
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
T join(T...args)
static std::list< SchedulerState > m_sState
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::string m_whiteboardSvcName
The whiteboard name.
T close(T...args)
virtual StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts)
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
def lock(file)
Definition: locker.py:16
T str(T...args)
T bind(T...args)
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:361
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on the algorithms in the slots and promote them to successive states (-1 means all slots...
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:25
virtual unsigned int freeSlots()
Get free slots number.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
bool m_updateNeeded
Keep track of update actions scheduled.
T insert(T...args)
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:74
T find(T...args)
T size(T...args)
STL class.
void activate()
Activate scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
T begin(T...args)
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
Iterator begin(State kind)
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
virtual StatusCode finalize()
Finalise.
T sort(T...args)
const DataObjIDColl & outputDataObjs() const
Definition: Algorithm.h:600
friend class AlgoExecutionTask
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
void ignore() const
Definition: StatusCode.h:108
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
T for_each(T...args)
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
list i
Definition: ana.py:128
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using the graph index.
STL class.
std::vector< std::vector< std::string > > m_algosDependencies
DEPRECATED!
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
T reserve(T...args)
static std::map< State, std::string > stateNames
T emplace_back(T...args)
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)