The Gaudi Framework  v28r3 (cc1cf868)
ForwardSchedulerSvc.cpp
Go to the documentation of this file.
1 // Local
2 #include "AlgResourcePool.h"
3 #include "AlgoExecutionTask.h"
4 #include "ForwardSchedulerSvc.h"
5 
6 // Framework includes
8 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
10 #include "GaudiKernel/IAlgorithm.h"
12 #include "GaudiKernel/SvcFactory.h"
15 
16 // C++
17 #include <algorithm>
18 #include <map>
19 #include <queue>
20 #include <sstream>
21 #include <unordered_set>
22 
23 // External libs
24 #include "boost/thread.hpp"
25 #include "boost/tokenizer.hpp"
26 #include "boost/algorithm/string.hpp"
27 // DP waiting for the TBB service
28 #include "tbb/task_scheduler_init.h"
29 
32 
33 // Instantiation of a static factory class used by clients to create instances of this service
35 
36 //===========================================================================
37 // Infrastructure methods
38 
39 
45 
46  // Initialise mother class (read properties, ...)
48  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
49 
50  // Get hold of the TBBSvc. This should initialize the thread pool
51  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
52  if ( !m_threadPoolSvc.isValid() ) {
53  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
54  return StatusCode::FAILURE;
55  }
56 
57  // Activate the scheduler in another thread.
58  info() << "Activating scheduler in a separate thread" << endmsg;
59  m_thread = std::thread( std::bind( &ForwardSchedulerSvc::activate, this ) );
60 
61  while ( m_isActive != ACTIVE ) {
62  if ( m_isActive == FAILURE ) {
63  fatal() << "Terminating initialization" << endmsg;
64  return StatusCode::FAILURE;
65  } else {
66  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
67  sleep( 1 );
68  }
69  }
70 
71  // Get the algo resource pool
72  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
73  if ( !m_algResourcePool.isValid() ) {
74  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
75  return StatusCode::FAILURE;
76  }
77 
78  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
79  if (!m_algExecStateSvc.isValid()) {
80  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
81  return StatusCode::FAILURE;
82  }
83 
84  // Get Whiteboard
85  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
86  if ( !m_whiteboard.isValid() ) {
87  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
88  return StatusCode::FAILURE;
89  }
90 
91  // Check the MaxEventsInFlight parameters and react
92  // Deprecated for the moment
93  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
94  if ( m_maxEventsInFlight != 0 ) {
95  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
96  << "Please migrate your code options files." << endmsg;
97 
98  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
99  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
100  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
101  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
102  }
103  }
104 
105  // set global concurrency flags
107 
108  // Align the two quantities
109  m_maxEventsInFlight = numberOfWBSlots;
110 
111  // Set the number of free slots
112  m_freeSlots = m_maxEventsInFlight;
113 
114  if ( m_algosDependencies.size() != 0 ) {
115  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
116  << " FIX your job options #####" << endmsg;
117  }
118 
119  // Get the list of algorithms
120  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
121  const unsigned int algsNumber = algos.size();
122  info() << "Found " << algsNumber << " algorithms" << endmsg;
123 
124  /* Dependencies
125  1) Look for handles in algo, if none
126  2) Assume none are required
127  */
128 
129  DataObjIDColl globalInp, globalOutp;
130 
131  // figure out all outputs
132  for (IAlgorithm* ialgoPtr : algos) {
133  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
134  if (!algoPtr) {
135  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
136  }
137  for (auto id : algoPtr->outputDataObjs()) {
138  auto r = globalOutp.insert(id);
139  if (!r.second) {
140  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" << endmsg;
141  }
142  }
143  }
144 
145  std::ostringstream ostdd;
146  ostdd << "Data Dependencies for Algorithms:";
147 
148  std::vector<DataObjIDColl> m_algosDependencies;
149  for ( IAlgorithm* ialgoPtr : algos ) {
150  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
151  if ( nullptr == algoPtr ) {
152  fatal() << "Could not convert IAlgorithm into Algorithm for "
153  << ialgoPtr->name()
154  << ": this will result in a crash." << endmsg;
155  return StatusCode::FAILURE;
156  }
157 
158  ostdd << "\n " << algoPtr->name();
159 
160  DataObjIDColl algoDependencies;
161  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
162  for ( auto id : algoPtr->inputDataObjs() ) {
163  ostdd << "\n o INPUT " << id;
164  if (id.key().find(":")!=std::string::npos) {
165  ostdd << " contains alternatives which require resolution...\n";
166  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(),boost::char_separator<char>{":"}};
167  auto itok = std::find_if( tokens.begin(), tokens.end(),
168  [&](const std::string& t) {
169  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
170  } );
171  if (itok!=tokens.end()) {
172  ostdd << "found matching output for " << *itok
173  << " -- updating scheduler info\n";
174  id.updateKey(*itok);
175  } else {
176  error() << "failed to find alternate in global output list"
177  << " for id: " << id << " in Alg " << algoPtr->name()
178  << endmsg;
179  m_showDataDeps = true;
180  }
181  }
182  algoDependencies.insert( id );
183  globalInp.insert( id );
184  }
185  for ( auto id : algoPtr->outputDataObjs() ) {
186  ostdd << "\n o OUTPUT " << id;
187  if (id.key().find(":")!=std::string::npos) {
188  error() << " in Alg " << algoPtr->name()
189  << " alternatives are NOT allowed for outputs! id: "
190  << id << endmsg;
191  m_showDataDeps = true;
192  }
193  }
194  } else {
195  ostdd << "\n none";
196  }
197  m_algosDependencies.emplace_back( algoDependencies );
198  }
199 
200  if ( m_showDataDeps ) {
201  info() << ostdd.str() << endmsg;
202  }
203 
204  // Fill the containers to convert algo names to index
205  m_algname_vect.reserve( algsNumber );
206  unsigned int index = 0;
207  IAlgorithm* dataLoaderAlg( nullptr );
208  for ( IAlgorithm* algo : algos ) {
209  const std::string& name = algo->name();
210  m_algname_index_map[name] = index;
211  m_algname_vect.emplace_back( name );
212  if (algo->name() == m_useDataLoader) {
213  dataLoaderAlg = algo;
214  }
215  index++;
216  }
217 
218  // Check if we have unmet global input dependencies
219  if ( m_checkDeps ) {
220  DataObjIDColl unmetDep;
221  for ( auto o : globalInp ) {
222  if ( globalOutp.find( o ) == globalOutp.end() ) {
223  unmetDep.insert( o );
224  }
225  }
226 
227  if ( unmetDep.size() > 0 ) {
228 
229  std::ostringstream ost;
230  for ( auto& o : unmetDep ) {
231  ost << "\n o " << o << " required by Algorithm: ";
232  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
233  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
234  ost << "\n * " << m_algname_vect[i];
235  }
236  }
237  }
238 
239  if ( m_useDataLoader != "" ) {
240  // Find the DataLoader Alg
241  if (dataLoaderAlg == nullptr) {
242  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
243  << "\" found, and unmet INPUT dependencies "
244  << "detected:\n" << ost.str() << endmsg;
245  return StatusCode::FAILURE;
246  }
247 
248  info() << "Will attribute the following unmet INPUT dependencies to \""
249  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
250  << "\" Algorithm"
251  << ost.str() << endmsg;
252 
253  // Set the property Load of DataLoader Alg
254  Algorithm *dataAlg = dynamic_cast<Algorithm*>(dataLoaderAlg);
255  if ( !dataAlg ) {
256  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value()
257  << "\" IAlg to Algorithm" << endmsg;
258  return StatusCode::FAILURE;
259  }
260 
261  for (auto& id : unmetDep) {
262  debug() << "adding OUTPUT dep \"" << id << "\" to "
263  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
264  << endmsg;
266  }
267 
268  } else {
269  fatal() << "Auto DataLoading not requested, "
270  << "and the following unmet INPUT dependencies were found:"
271  << ost.str() << endmsg;
272  return StatusCode::FAILURE;
273  }
274 
275  } else {
276  info() << "No unmet INPUT data dependencies were found" << endmsg;
277  }
278  }
279 
280  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
281  sc = m_efManager.initialize( algPool->getPRGraph(), m_algname_index_map);
282  unsigned int controlFlowNodeNumber = m_efManager.getPrecedenceRulesGraph()->getControlFlowNodeCounter();
283 
284  // Shortcut for the message service
285  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
286  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
287 
288  m_eventSlots.assign( m_maxEventsInFlight,
289  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
290  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
291 
292  // Clearly inform about the level of concurrency
293  info() << "Concurrency level information:" << endmsg;
294  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
295  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
296  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
297 
298  m_efg = algPool->getPRGraph();
299 
300  if (m_showControlFlow) {
301  info() << std::endl
302  << "========== Algorithm and Sequence Configuration =========="
303  << std::endl << std::endl;
304  info() << m_efg->dumpControlFlow() << endmsg;
305  }
306 
307  if (m_showDataFlow) {
308  info() << std::endl
309  << "======================= Data Flow ========================"
310  << std::endl;
311  info() << m_efg->dumpDataFlow() << endmsg;
312  }
313 
314  return sc;
315 }
316 //---------------------------------------------------------------------------
317 
322 
324  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
325 
326  sc = deactivate();
327  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
328 
329  info() << "Joining Scheduler thread" << endmsg;
330  m_thread.join();
331 
332  // Final error check after thread pool termination
333  if ( m_isActive == FAILURE ) {
334  error() << "problems in scheduler thread" << endmsg;
335  return StatusCode::FAILURE;
336  }
337 
338  // m_efManager.getPrecedenceRulesGraph()->dumpExecutionPlan();
339 
340  return sc;
341 }
342 //---------------------------------------------------------------------------
354 
355  if (msgLevel(MSG::DEBUG))
356  debug() << "ForwardSchedulerSvc::activate()" << endmsg;
357 
359  error() << "problems initializing ThreadPoolSvc" << endmsg;
361  return;
362  }
363 
364  // Wait for actions pushed into the queue by finishing tasks.
365  action thisAction;
367 
368  m_isActive = ACTIVE;
369 
370  // Continue to wait if the scheduler is running or there is something to do
371  info() << "Start checking the actionsQueue" << endmsg;
372  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
373  m_actionsQueue.pop( thisAction );
374  sc = thisAction();
375  if ( sc != StatusCode::SUCCESS )
376  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
377  else
378  verbose() << "Action succeeded." << endmsg;
379  }
380 
381  info() << "Terminating thread-pool resources" << endmsg;
383  error() << "Problems terminating thread pool" << endmsg;
385  }
386 }
387 
388 //---------------------------------------------------------------------------
389 
397 
398  if ( m_isActive == ACTIVE ) {
399  // Drain the scheduler
401  // This would be the last action
402  m_actionsQueue.push( [this]() -> StatusCode {
404  return StatusCode::SUCCESS;
405  } );
406  }
407 
408  return StatusCode::SUCCESS;
409 }
410 
411 //===========================================================================
412 
413 //===========================================================================
414 // Utils and shortcuts
415 
416 inline const std::string& ForwardSchedulerSvc::index2algname( unsigned int index ) {
417  return m_algname_vect[index];
418 }
419 
420 //---------------------------------------------------------------------------
421 
422 inline unsigned int ForwardSchedulerSvc::algname2index( const std::string& algoname ) {
423  unsigned int index = m_algname_index_map[algoname];
424  return index;
425 }
426 
427 //===========================================================================
428 // EventSlot management
436 
437  if ( m_first ) {
438  m_first = false;
439  }
440 
441  if ( !eventContext ) {
442  fatal() << "Event context is nullptr" << endmsg;
443  return StatusCode::FAILURE;
444  }
445 
446  if ( m_freeSlots.load() == 0 ) {
447  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
448  return StatusCode::FAILURE;
449  }
450 
451  // no problem as push new event is only called from one thread (event loop manager)
452  m_freeSlots--;
453 
454  auto action = [this, eventContext]() -> StatusCode {
455  // Event processing slot forced to be the same as the wb slot
456  const unsigned int thisSlotNum = eventContext->slot();
457  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
458  if ( !thisSlot.complete ) {
459  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
460  return StatusCode::FAILURE;
461  }
462 
463  debug() << "Executing event " << eventContext->evt() << " on slot "
464  << thisSlotNum << endmsg;
465  thisSlot.reset( eventContext );
466 
467  return this->updateStates( thisSlotNum );
468  }; // end of lambda
469 
470  // Kick off the scheduling!
471  if ( msgLevel( MSG::VERBOSE ) ) {
472  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
473  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
474  }
475  m_actionsQueue.push( action );
476 
477  return StatusCode::SUCCESS;
478 }
479 
480 //---------------------------------------------------------------------------
482  StatusCode sc;
483  for ( auto context : eventContexts ) {
484  sc = pushNewEvent( context );
485  if ( sc != StatusCode::SUCCESS ) return sc;
486  }
487  return sc;
488 }
489 
490 //---------------------------------------------------------------------------
492  return std::max( m_freeSlots.load(), 0 );
493 }
494 
495 //---------------------------------------------------------------------------
500  unsigned int slotNum = 0;
501  for ( auto& thisSlot : m_eventSlots ) {
502  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
503  updateStates( slotNum );
504  }
505  slotNum++;
506  }
507  return StatusCode::SUCCESS;
508 }
509 
510 //---------------------------------------------------------------------------
515  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
516  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
517  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
518  // << " active: " << m_isActive << endmsg;
519  return StatusCode::FAILURE;
520  } else {
521  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
522  // << " active: " << m_isActive << endmsg;
523  m_finishedEvents.pop( eventContext );
524  m_freeSlots++;
525  if (msgLevel(MSG::DEBUG))
526  debug() << "Popped slot " << eventContext->slot() << "(event "
527  << eventContext->evt() << ")" << endmsg;
528  return StatusCode::SUCCESS;
529  }
530 }
531 
532 //---------------------------------------------------------------------------
537  if ( m_finishedEvents.try_pop( eventContext ) ) {
538  if ( msgLevel( MSG::DEBUG ) )
539  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
540  << endmsg;
541  m_freeSlots++;
542  return StatusCode::SUCCESS;
543  }
544  return StatusCode::FAILURE;
545 }
546 
547 //---------------------------------------------------------------------------
554 
555  // Set the number of slots available to an error code
556  m_freeSlots.store( 0 );
557 
558  fatal() << "*** Event " << eventContext->evt() << " on slot "
559  << eventContext->slot() << " failed! ***" << endmsg;
560 
561  std::ostringstream ost;
562  m_algExecStateSvc->dump(ost, *eventContext);
563 
564  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
565  << ":\n" << ost.str() << endmsg;
566 
567  dumpSchedulerState(-1);
568 
569  // Empty queue and deactivate the service
570  action thisAction;
571  while ( m_actionsQueue.try_pop( thisAction ) ) {
572  };
573  deactivate();
574 
575  // Push into the finished events queue the failed context
576  EventContext* thisEvtContext;
577  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
578  m_finishedEvents.push( thisEvtContext );
579  };
580  m_finishedEvents.push( eventContext );
581 
582  return StatusCode::FAILURE;
583 }
584 
585 //===========================================================================
586 
587 //===========================================================================
588 // States Management
589 
600 
601  m_updateNeeded = true;
602 
603  // Fill a map of initial state / action using closures.
604  // done to update the states w/o several if/elses
605  // Posterchild for constexpr with gcc4.7 onwards!
606  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
607  statesTransitions = {
608  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
609  this,
610  std::placeholders::_1,
611  std::placeholders::_2)},
612  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
613  this,
614  std::placeholders::_1,
615  std::placeholders::_2)}
616  };*/
617 
618  StatusCode global_sc( StatusCode::FAILURE, true );
619 
620  // Sort from the oldest to the newest event
621  // Prepare a vector of pointers to the slots to avoid copies
622  std::vector<EventSlot*> eventSlotsPtrs;
623 
624  // Consider all slots if si <0 or just one otherwise
625  if ( si < 0 ) {
626  const int eventsSlotsSize( m_eventSlots.size() );
627  eventSlotsPtrs.reserve( eventsSlotsSize );
628  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
629  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
630  }
631  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
632  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
633  } else {
634  eventSlotsPtrs.push_back( &m_eventSlots[si] );
635  }
636 
637  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
638  int iSlot = thisSlotPtr->eventContext->slot();
639 
640  // Cache the states of the algos to improve readability and performance
641  auto& thisSlot = m_eventSlots[iSlot];
642  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
643 
644  // Take care of the control ready update
645  m_efManager.updateEventState( thisAlgsStates, thisSlot.controlFlowState );
646 
647  // DF note: all this this is a loop over all algs and applies CR->DR and DR->SCHD transistions
648  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
649  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
650  if (algState==AlgsExecutionStates::ERROR)
651  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
652  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
653  partial_sc=StatusCode::SUCCESS;
654  for (auto state_transition = statesTransitions.find(algState);
655  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
656  state_transition++){
657  partial_sc = state_transition->second(iAlgo,iSlot);
658  if (partial_sc.isFailure()){
659  verbose() << "Could not apply transition from "
660  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
661  << " for algorithm " << index2algname(iAlgo)
662  << " on processing slot " << iSlot << endmsg;
663  }
664  else{global_sc=partial_sc;}
665  } // end loop on transitions
666  }*/ // end loop on algos
667 
668  StatusCode partial_sc( StatusCode::FAILURE, true );
669  // first update CONTROLREADY to DATAREADY
670  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::CONTROLREADY );
671  it != thisAlgsStates.end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
672 
673  uint algIndex = *it;
674  partial_sc = promoteToDataReady(algIndex, iSlot);
675  if (partial_sc.isFailure())
676  if (msgLevel(MSG::VERBOSE))
677  verbose() << "Could not apply transition from "
678  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
679  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
680  }
681 
682  // now update DATAREADY to SCHEDULED
683  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
684  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
685  uint algIndex = *it;
686 
687  partial_sc = promoteToScheduled( algIndex, iSlot );
688 
689  if (msgLevel(MSG::VERBOSE))
690  if (partial_sc.isFailure())
691  verbose() << "Could not apply transition from "
692  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
693  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
694  }
695 
696  // Not complete because this would mean that the slot is already free!
697  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
698  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
699  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
700  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
701 
702  thisSlot.complete = true;
703  // if the event did not fail, add it to the finished events
704  // otherwise it is taken care of in the error handling already
705  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
706  m_finishedEvents.push(thisSlot.eventContext);
707  if (msgLevel(MSG::DEBUG))
708  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
709  << thisSlot.eventContext->slot() << ")." << endmsg;
710  }
711  // now let's return the fully evaluated result of the control flow
712  if ( msgLevel( MSG::DEBUG ) ) {
714  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
715  debug() << ss.str() << endmsg;
716  }
717 
718  thisSlot.eventContext = nullptr;
719  } else {
720  StatusCode eventStalledSC = isStalled(iSlot);
721  if (! eventStalledSC.isSuccess()) {
722  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
723  eventFailed(thisSlot.eventContext).ignore();
724  }
725  }
726  } // end loop on slots
727 
728  verbose() << "States Updated." << endmsg;
729 
730  return global_sc;
731 }
732 
733 //---------------------------------------------------------------------------
734 
742  // Get the slot
743  EventSlot& thisSlot = m_eventSlots[iSlot];
744 
745  if ( m_actionsQueue.empty() && m_algosInFlight == 0 &&
747 
748  info() << "About to declare a stall" << endmsg;
749  fatal() << "*** Stall detected! ***\n" << endmsg;
750  dumpSchedulerState( iSlot );
751  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
752 
753  return StatusCode::FAILURE;
754  }
755  return StatusCode::SUCCESS;
756 }
757 
758 //---------------------------------------------------------------------------
759 
766 
767  // To have just one big message
768  std::ostringstream outputMessageStream;
769 
770  outputMessageStream << "============================== Execution Task State ============================="
771  << std::endl;
772  dumpState( outputMessageStream );
773 
774  outputMessageStream << std::endl
775  << "============================== Scheduler State ================================="
776  << std::endl;
777 
778  int slotCount = -1;
779  for ( auto thisSlot : m_eventSlots ) {
780  slotCount++;
781  if ( thisSlot.complete ) continue;
782 
783  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
784  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
785 
786  if ( 0 > iSlot or iSlot == slotCount ) {
787  outputMessageStream << "Algorithms states:" << std::endl;
788 
789  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
790  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
791  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
792  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
793  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
794  const int depsSize = deps.size();
795  if ( depsSize == 0 ) outputMessageStream << " none";
796 
797  DataObjIDColl missing;
798  for ( auto d : deps ) {
799  outputMessageStream << d << " ";
800  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
801  // outputMessageStream << "[missing] ";
802  missing.insert( d );
803  }
804  }
805 
806  if ( !missing.empty() ) {
807  outputMessageStream << ". The following are missing: ";
808  for ( auto d : missing ) {
809  outputMessageStream << d << " ";
810  }
811  }
812 
813  outputMessageStream << std::endl;
814  }
815 
816  // Snapshot of the WhiteBoard
817  outputMessageStream << "\nWhiteboard contents: " << std::endl;
818  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
819 
820  // Snapshot of the ControlFlow
821  outputMessageStream << "\nControl Flow:" << std::endl;
822  std::stringstream cFlowStateStringStream;
823  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
824 
825  outputMessageStream << cFlowStateStringStream.str() << std::endl;
826  }
827  }
828 
829  outputMessageStream << "=================================== END ======================================" << std::endl;
830 
831  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
832 }
833 
834 //---------------------------------------------------------------------------
835 
837 
838  // Do the control flow
839  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
840  if (sc.isSuccess())
841  if (msgLevel(MSG::VERBOSE))
842  verbose() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY on slot "
843  << si << endmsg;
844 
845  return sc;
846 }
847 
848 //---------------------------------------------------------------------------
849 
851 
852  StatusCode sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
853 
854  StatusCode updateSc( StatusCode::FAILURE );
855  if ( sc == StatusCode::SUCCESS )
856  updateSc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::DATAREADY );
857 
858  if (updateSc.isSuccess())
859  if (msgLevel(MSG::VERBOSE))
860  verbose() << "Promoting " << index2algname(iAlgo) << " to DATAREADY on slot "
861  << si<< endmsg;
862 
863  return updateSc;
864 }
865 
866 //---------------------------------------------------------------------------
867 
869 
871 
872  const std::string& algName( index2algname( iAlgo ) );
873  IAlgorithm* ialgoPtr = nullptr;
874  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
875 
876  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
877  EventContext* eventContext( m_eventSlots[si].eventContext );
878  if ( !eventContext )
879  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
880 
881  ++m_algosInFlight;
882  // prepare a scheduler action to run once the algorithm is executed
883  auto promote2ExecutedClosure = std::bind(&ForwardSchedulerSvc::promoteToExecuted,
884  this,
885  iAlgo,
886  eventContext->slot(),
887  ialgoPtr,
888  eventContext);
889  // Avoid to use tbb if the pool size is 1 and run in this thread
890  if (-100 != m_threadPoolSize) {
891 
892  // this parent task is needed to promote an Algorithm as EXECUTED,
893  // it will be started as soon as the child task (see below) is completed
894  tbb::task* triggerAlgoStateUpdate = new(tbb::task::allocate_root())
895  enqueueSchedulerActionTask(this, promote2ExecutedClosure);
896  // setting parent's refcount to 1 is made here only for consistency
897  // (in this case since it is not scheduled explicitly and there it has only one child task)
898  triggerAlgoStateUpdate->set_ref_count(1);
899  // the child task that executes an Algorithm
900  tbb::task* algoTask = new(triggerAlgoStateUpdate->allocate_child())
901  AlgoExecutionTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
902  // schedule the algoTask
903  tbb::task::enqueue( *algoTask);
904 
905  } else {
906  AlgoExecutionTask theTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
907  theTask.execute();
908  promote2ExecutedClosure();
909  }
910 
911  if ( msgLevel( MSG::DEBUG ) )
912  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
913  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
914 
915  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
916 
917  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
918 
919  if (updateSc.isSuccess())
920  if (msgLevel(MSG::VERBOSE))
921  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
922  << si << endmsg;
923  return updateSc;
924  } else {
925  if ( msgLevel( MSG::DEBUG ) )
926  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
927  return sc;
928  }
929 }
930 
931 //---------------------------------------------------------------------------
932 
934  EventContext* eventContext ) {
935 
936  // Put back the instance
937  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
938  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
939  // EventContext* eventContext = castedAlgo->getContext();
940 
941  // Check if the execution failed
942  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
943  eventFailed(eventContext).ignore();
944 
945  Gaudi::Hive::setCurrentContext(eventContext);
946  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
947 
948  if ( !sc.isSuccess() ) {
949  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
950  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
951  return StatusCode::FAILURE;
952  }
953 
954  m_algosInFlight--;
955 
956  EventSlot& thisSlot = m_eventSlots[si];
957 
958  // Update the catalog: some new products may be there
959  m_whiteboard->selectStore( eventContext->slot() ).ignore();
960 
961  // update prods in the dataflow
962  // DP: Handles could be used. Just update what the algo wrote
963  DataObjIDColl new_products;
964  m_whiteboard->getNewDataObjects( new_products ).ignore();
965  for ( const auto& new_product : new_products )
966  if ( msgLevel( MSG::DEBUG ) ) debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
967  thisSlot.dataFlowMgr.updateDataObjectsCatalog( new_products );
968 
969  if ( msgLevel( MSG::DEBUG ) )
970  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
971  << m_algosInFlight << endmsg;
972 
973  // Limit number of updates
974  if ( m_updateNeeded ) {
975  // Schedule an update of the status of the algorithms
976  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1);
977  m_actionsQueue.push( updateAction );
978  m_updateNeeded = false;
979  }
980 
981  if ( msgLevel( MSG::DEBUG ) )
982  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
983  State state;
984  if ( algo->filterPassed() ) {
985  state = State::EVTACCEPTED;
986  } else {
987  state = State::EVTREJECTED;
988  }
989 
990  sc = thisSlot.algsStates.updateState( iAlgo, state );
991 
992  if (sc.isSuccess())
993  if (msgLevel(MSG::VERBOSE))
994  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
996 
997  return sc;
998 }
999 
1000 //===========================================================================
1002 
1004  m_sState.push_back( SchedulerState( a, e, t ) );
1005 }
1006 
1007 //===========================================================================
1009 
1011 
1012  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1013  if ( *itr == a ) {
1014  m_sState.erase( itr );
1015  return true;
1016  }
1017  }
1018 
1019  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1020  return false;
1021 }
1022 
1023 //===========================================================================
1025 
1027 
1028  for ( auto it : m_sState ) {
1029  ost << " " << it << std::endl;
1030  }
1031 }
1032 
1033 //===========================================================================
1035 
1037 
1038  std::ostringstream ost;
1039  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1040  dumpState( ost );
1041 
1042  info() << ost.str() << endmsg;
1043 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
StatusCode deactivate()
Deactivate scheduler.
bool algsPresent(State state) const
StatusCode initialize() override
Definition: Service.cpp:64
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
T empty(T...args)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:750
virtual concurrency::PrecedenceRulesGraph * getPRGraph() const
StatusCode finalize() override
Definition: Service.cpp:174
ContextID_t slot() const
Definition: EventContext.h:40
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
Header file for class GaudiAlgorithm.
StatusCode updateStates(int si=-1)
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots...
StatusCode finalize() override
Finalise.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T endl(T...args)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
SmartIF< IThreadPoolSvc > m_threadPoolSvc
The SchedulerSvc implements the IScheduler interface.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:84
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode m_drain()
Drain the actions present in the queue.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
void addAlg(Algorithm *, EventContext *, pthread_t)
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single event.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in store.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
T join(T...args)
static std::list< SchedulerState > m_sState
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const DataObjIDColl & inputDataObjs() const override
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
unsigned int freeSlots() override
Get free slots number.
T bind(T...args)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:213
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
Gaudi::Property< int > m_maxEventsInFlight
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
GAUDI_API void setCurrentContext(const EventContext *ctx)
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
STL class.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
T begin(T...args)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Iterator begin(State kind)
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T sort(T...args)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
T for_each(T...args)
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
T reserve(T...args)
static std::map< State, std::string > stateNames
T emplace_back(T...args)
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)