The Gaudi Framework  v29r0 (ff2e7097)
ForwardSchedulerSvc.cpp
Go to the documentation of this file.
1 // Local
2 #include "ForwardSchedulerSvc.h"
3 #include "AlgResourcePool.h"
4 #include "AlgoExecutionTask.h"
5 
6 // Framework includes
8 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
11 #include "GaudiKernel/IAlgorithm.h"
13 #include "GaudiKernel/SvcFactory.h"
15 
16 // C++
17 #include <algorithm>
18 #include <map>
19 #include <queue>
20 #include <sstream>
21 #include <unordered_set>
22 
23 // External libs
24 #include "boost/algorithm/string.hpp"
25 #include "boost/thread.hpp"
26 #include "boost/tokenizer.hpp"
27 // DP waiting for the TBB service
28 #include "tbb/task_scheduler_init.h"
29 
32 
33 // Instantiation of a static factory class used by clients to create instances of this service
35 
36 //===========================================================================
37 // Infrastructure methods
38 
39 
45 {
46 
47  // Initialise mother class (read properties, ...)
49  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
50 
51  // Get hold of the TBBSvc. This should initialize the thread pool
52  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
53  if ( !m_threadPoolSvc.isValid() ) {
54  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
55  return StatusCode::FAILURE;
56  }
57 
58  // Activate the scheduler in another thread.
59  info() << "Activating scheduler in a separate thread" << endmsg;
60  m_thread = std::thread( std::bind( &ForwardSchedulerSvc::activate, this ) );
61 
62  while ( m_isActive != ACTIVE ) {
63  if ( m_isActive == FAILURE ) {
64  fatal() << "Terminating initialization" << endmsg;
65  return StatusCode::FAILURE;
66  } else {
67  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
68  sleep( 1 );
69  }
70  }
71 
72  // Get the algo resource pool
73  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
74  if ( !m_algResourcePool.isValid() ) {
75  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
76  return StatusCode::FAILURE;
77  }
78 
79  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
80  if ( !m_algExecStateSvc.isValid() ) {
81  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
82  return StatusCode::FAILURE;
83  }
84 
85  // Get Whiteboard
86  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
87  if ( !m_whiteboard.isValid() ) {
88  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
89  return StatusCode::FAILURE;
90  }
91 
92  // Check the MaxEventsInFlight parameters and react
93  // Deprecated for the moment
94  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
95  if ( m_maxEventsInFlight != 0 ) {
96  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
97  << "Please migrate your code options files." << endmsg;
98 
99  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
100  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
101  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
102  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
103  }
104  }
105 
106  // set global concurrency flags
108 
109  // Align the two quantities
110  m_maxEventsInFlight = numberOfWBSlots;
111 
112  // Set the number of free slots
113  m_freeSlots = m_maxEventsInFlight;
114 
115  if ( m_algosDependencies.size() != 0 ) {
116  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
117  << " FIX your job options #####" << endmsg;
118  }
119 
120  // Get the list of algorithms
121  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
122  const unsigned int algsNumber = algos.size();
123  info() << "Found " << algsNumber << " algorithms" << endmsg;
124 
125  /* Dependencies
126  1) Look for handles in algo, if none
127  2) Assume none are required
128  */
129 
130  DataObjIDColl globalInp, globalOutp;
131 
132  // figure out all outputs
133  for ( IAlgorithm* ialgoPtr : algos ) {
134  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
135  if ( !algoPtr ) {
136  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
137  }
138  for ( auto id : algoPtr->outputDataObjs() ) {
139  auto r = globalOutp.insert( id );
140  if ( !r.second ) {
141  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths "
142  "though, or control flow may guarantee only one runs...!"
143  << endmsg;
144  }
145  }
146  }
147 
148  std::ostringstream ostdd;
149  ostdd << "Data Dependencies for Algorithms:";
150 
151  std::vector<DataObjIDColl> m_algosDependencies;
152  for ( IAlgorithm* ialgoPtr : algos ) {
153  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
154  if ( nullptr == algoPtr ) {
155  fatal() << "Could not convert IAlgorithm into Algorithm for " << ialgoPtr->name()
156  << ": this will result in a crash." << endmsg;
157  return StatusCode::FAILURE;
158  }
159 
160  ostdd << "\n " << algoPtr->name();
161 
162  DataObjIDColl algoDependencies;
163  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
164  for ( auto id : algoPtr->inputDataObjs() ) {
165  ostdd << "\n o INPUT " << id;
166  if ( id.key().find( ":" ) != std::string::npos ) {
167  ostdd << " contains alternatives which require resolution...\n";
168  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
169  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
170  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
171  } );
172  if ( itok != tokens.end() ) {
173  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
174  id.updateKey( *itok );
175  } else {
176  error() << "failed to find alternate in global output list"
177  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
178  m_showDataDeps = true;
179  }
180  }
181  algoDependencies.insert( id );
182  globalInp.insert( id );
183  }
184  for ( auto id : algoPtr->outputDataObjs() ) {
185  ostdd << "\n o OUTPUT " << id;
186  if ( id.key().find( ":" ) != std::string::npos ) {
187  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << id << endmsg;
188  m_showDataDeps = true;
189  }
190  }
191  } else {
192  ostdd << "\n none";
193  }
194  m_algosDependencies.emplace_back( algoDependencies );
195  }
196 
197  if ( m_showDataDeps ) {
198  info() << ostdd.str() << endmsg;
199  }
200 
201  // Fill the containers to convert algo names to index
202  m_algname_vect.reserve( algsNumber );
203  unsigned int index = 0;
204  IAlgorithm* dataLoaderAlg( nullptr );
205  for ( IAlgorithm* algo : algos ) {
206  const std::string& name = algo->name();
207  m_algname_index_map[name] = index;
208  m_algname_vect.emplace_back( name );
209  if ( algo->name() == m_useDataLoader ) {
210  dataLoaderAlg = algo;
211  }
212  index++;
213  }
214 
215  // Check if we have unmet global input dependencies
216  if ( m_checkDeps ) {
217  DataObjIDColl unmetDep;
218  for ( auto o : globalInp ) {
219  if ( globalOutp.find( o ) == globalOutp.end() ) {
220  unmetDep.insert( o );
221  }
222  }
223 
224  if ( unmetDep.size() > 0 ) {
225 
226  std::ostringstream ost;
227  for ( auto& o : unmetDep ) {
228  ost << "\n o " << o << " required by Algorithm: ";
229  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
230  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
231  ost << "\n * " << m_algname_vect[i];
232  }
233  }
234  }
235 
236  if ( m_useDataLoader != "" ) {
237  // Find the DataLoader Alg
238  if ( dataLoaderAlg == nullptr ) {
239  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
240  << "\" found, and unmet INPUT dependencies "
241  << "detected:\n"
242  << ost.str() << endmsg;
243  return StatusCode::FAILURE;
244  }
245 
246  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
247  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
248 
249  // Set the property Load of DataLoader Alg
250  Algorithm* dataAlg = dynamic_cast<Algorithm*>( dataLoaderAlg );
251  if ( !dataAlg ) {
252  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Algorithm" << endmsg;
253  return StatusCode::FAILURE;
254  }
255 
256  for ( auto& id : unmetDep ) {
257  debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
258  << endmsg;
260  }
261 
262  } else {
263  fatal() << "Auto DataLoading not requested, "
264  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
265  return StatusCode::FAILURE;
266  }
267 
268  } else {
269  info() << "No unmet INPUT data dependencies were found" << endmsg;
270  }
271  }
272 
273  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
274  m_efManager.initialize( algPool->getCFGraph(), m_algname_index_map );
275  unsigned int controlFlowNodeNumber = m_efManager.getCFGraph()->getControlFlowNodeCounter();
276 
277  // Shortcut for the message service
278  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
279  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
280 
281  m_eventSlots.assign( m_maxEventsInFlight,
282  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
283  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
284 
285  // Clearly inform about the level of concurrency
286  info() << "Concurrency level information:" << endmsg;
287  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
288  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
289  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
290 
291  m_efg = algPool->getCFGraph();
292 
293  if ( m_showControlFlow ) {
294  info() << std::endl << "========== Algorithm and Sequence Configuration ==========" << std::endl << std::endl;
295  info() << m_efg->dumpControlFlow() << endmsg;
296  }
297 
298  if ( m_showDataFlow ) {
299  warning() << "A 1-level data flow dump requested, but this feature is not supported"
300  << " by the ForwardScheduler any more. Use the AvalancheScheduler"
301  << " to dump as 1-level data flow, so the complete data flow graph." << endmsg;
302  }
303 
304  return sc;
305 }
306 //---------------------------------------------------------------------------
307 
312 {
313 
315  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
316 
317  sc = deactivate();
318  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
319 
320  info() << "Joining Scheduler thread" << endmsg;
321  m_thread.join();
322 
323  // Final error check after thread pool termination
324  if ( m_isActive == FAILURE ) {
325  error() << "problems in scheduler thread" << endmsg;
326  return StatusCode::FAILURE;
327  }
328 
329  return sc;
330 }
331 //---------------------------------------------------------------------------
343 {
344 
345  if ( msgLevel( MSG::DEBUG ) ) debug() << "ForwardSchedulerSvc::activate()" << endmsg;
346 
348  error() << "problems initializing ThreadPoolSvc" << endmsg;
350  return;
351  }
352 
353  // Wait for actions pushed into the queue by finishing tasks.
354  action thisAction;
356 
357  m_isActive = ACTIVE;
358 
359  // Continue to wait if the scheduler is running or there is something to do
360  info() << "Start checking the actionsQueue" << endmsg;
361  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
362  m_actionsQueue.pop( thisAction );
363  sc = thisAction();
364  if ( sc != StatusCode::SUCCESS )
365  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
366  else
367  verbose() << "Action succeeded." << endmsg;
368  }
369 
370  info() << "Terminating thread-pool resources" << endmsg;
372  error() << "Problems terminating thread pool" << endmsg;
374  }
375 }
376 
377 //---------------------------------------------------------------------------
378 
386 {
387 
388  if ( m_isActive == ACTIVE ) {
389  // Drain the scheduler
391  // This would be the last action
392  m_actionsQueue.push( [this]() -> StatusCode {
394  return StatusCode::SUCCESS;
395  } );
396  }
397 
398  return StatusCode::SUCCESS;
399 }
400 
401 //===========================================================================
402 
403 //===========================================================================
404 // Utils and shortcuts
405 
406 inline const std::string& ForwardSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
407 
408 //---------------------------------------------------------------------------
409 
410 inline unsigned int ForwardSchedulerSvc::algname2index( const std::string& algoname )
411 {
412  unsigned int index = m_algname_index_map[algoname];
413  return index;
414 }
415 
416 //===========================================================================
417 // EventSlot management
425 {
426 
427  if ( m_first ) {
428  m_first = false;
429  }
430 
431  if ( !eventContext ) {
432  fatal() << "Event context is nullptr" << endmsg;
433  return StatusCode::FAILURE;
434  }
435 
436  if ( m_freeSlots.load() == 0 ) {
437  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
438  return StatusCode::FAILURE;
439  }
440 
441  // no problem as push new event is only called from one thread (event loop manager)
442  m_freeSlots--;
443 
444  auto action = [this, eventContext]() -> StatusCode {
445  // Event processing slot forced to be the same as the wb slot
446  const unsigned int thisSlotNum = eventContext->slot();
447  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
448  if ( !thisSlot.complete ) {
449  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
450  return StatusCode::FAILURE;
451  }
452 
453  debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
454  thisSlot.reset( eventContext );
455 
456  return this->updateStates( thisSlotNum );
457  }; // end of lambda
458 
459  // Kick off the scheduling!
460  if ( msgLevel( MSG::VERBOSE ) ) {
461  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
462  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
463  }
464  m_actionsQueue.push( action );
465 
466  return StatusCode::SUCCESS;
467 }
468 
469 //---------------------------------------------------------------------------
471 {
472  StatusCode sc;
473  for ( auto context : eventContexts ) {
474  sc = pushNewEvent( context );
475  if ( sc != StatusCode::SUCCESS ) return sc;
476  }
477  return sc;
478 }
479 
480 //---------------------------------------------------------------------------
481 unsigned int ForwardSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
482 
483 //---------------------------------------------------------------------------
488 {
489  unsigned int slotNum = 0;
490  for ( auto& thisSlot : m_eventSlots ) {
491  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
492  updateStates( slotNum );
493  }
494  slotNum++;
495  }
496  return StatusCode::SUCCESS;
497 }
498 
499 //---------------------------------------------------------------------------
504 {
505  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
506  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
507  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
508  // << " active: " << m_isActive << endmsg;
509  return StatusCode::FAILURE;
510  } else {
511  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
512  // << " active: " << m_isActive << endmsg;
513  m_finishedEvents.pop( eventContext );
514  m_freeSlots++;
515  if ( msgLevel( MSG::DEBUG ) )
516  debug() << "Popped slot " << eventContext->slot() << "(event " << eventContext->evt() << ")" << endmsg;
517  return StatusCode::SUCCESS;
518  }
519 }
520 
521 //---------------------------------------------------------------------------
526 {
527  if ( m_finishedEvents.try_pop( eventContext ) ) {
528  if ( msgLevel( MSG::DEBUG ) )
529  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
530  << endmsg;
531  m_freeSlots++;
532  return StatusCode::SUCCESS;
533  }
534  return StatusCode::FAILURE;
535 }
536 
537 //---------------------------------------------------------------------------
544 {
545 
546  // Set the number of slots available to an error code
547  m_freeSlots.store( 0 );
548 
549  fatal() << "*** Event " << eventContext->evt() << " on slot " << eventContext->slot() << " failed! ***" << endmsg;
550 
551  std::ostringstream ost;
552  m_algExecStateSvc->dump( ost, *eventContext );
553 
554  info() << "Dumping Alg Exec State for slot " << eventContext->slot() << ":\n" << ost.str() << endmsg;
555 
556  dumpSchedulerState( -1 );
557 
558  // Empty queue and deactivate the service
559  action thisAction;
560  while ( m_actionsQueue.try_pop( thisAction ) ) {
561  };
562  deactivate();
563 
564  // Push into the finished events queue the failed context
565  EventContext* thisEvtContext;
566  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
567  m_finishedEvents.push( thisEvtContext );
568  };
569  m_finishedEvents.push( eventContext );
570 
571  return StatusCode::FAILURE;
572 }
573 
574 //===========================================================================
575 
576 //===========================================================================
577 // States Management
578 
589 {
590 
591  m_updateNeeded = true;
592 
593  // Fill a map of initial state / action using closures.
594  // done to update the states w/o several if/elses
595  // Posterchild for constexpr with gcc4.7 onwards!
596  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
597  statesTransitions = {
598  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
599  this,
600  std::placeholders::_1,
601  std::placeholders::_2)},
602  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
603  this,
604  std::placeholders::_1,
605  std::placeholders::_2)}
606  };*/
607 
608  StatusCode global_sc( StatusCode::FAILURE, true );
609 
610  // Sort from the oldest to the newest event
611  // Prepare a vector of pointers to the slots to avoid copies
612  std::vector<EventSlot*> eventSlotsPtrs;
613 
614  // Consider all slots if si <0 or just one otherwise
615  if ( si < 0 ) {
616  const int eventsSlotsSize( m_eventSlots.size() );
617  eventSlotsPtrs.reserve( eventsSlotsSize );
618  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
619  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
620  }
621  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
622  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
623  } else {
624  eventSlotsPtrs.push_back( &m_eventSlots[si] );
625  }
626 
627  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
628  int iSlot = thisSlotPtr->eventContext->slot();
629 
630  // Cache the states of the algos to improve readability and performance
631  auto& thisSlot = m_eventSlots[iSlot];
632  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
633 
634  // Take care of the control ready update
635  m_efManager.updateEventState( thisAlgsStates, thisSlot.controlFlowState );
636 
637  // DF note: all this this is a loop over all algs and applies CR->DR and DR->SCHD transistions
638  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
639  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
640  if (algState==AlgsExecutionStates::ERROR)
641  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
642  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
643  partial_sc=StatusCode::SUCCESS;
644  for (auto state_transition = statesTransitions.find(algState);
645  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
646  state_transition++){
647  partial_sc = state_transition->second(iAlgo,iSlot);
648  if (partial_sc.isFailure()){
649  verbose() << "Could not apply transition from "
650  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
651  << " for algorithm " << index2algname(iAlgo)
652  << " on processing slot " << iSlot << endmsg;
653  }
654  else{global_sc=partial_sc;}
655  } // end loop on transitions
656  }*/ // end loop on algos
657 
658  StatusCode partial_sc( StatusCode::FAILURE, true );
659  // first update CONTROLREADY to DATAREADY
660  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::CONTROLREADY );
661  it != thisAlgsStates.end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
662 
663  uint algIndex = *it;
664  partial_sc = promoteToDataReady( algIndex, iSlot );
665  if ( partial_sc.isFailure() )
666  if ( msgLevel( MSG::VERBOSE ) )
667  verbose() << "Could not apply transition from "
668  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY] << " for algorithm "
669  << index2algname( algIndex ) << " on processing slot " << iSlot << endmsg;
670  }
671 
672  // now update DATAREADY to SCHEDULED
673  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
674  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
675  uint algIndex = *it;
676 
677  partial_sc = promoteToScheduled( algIndex, iSlot );
678 
679  if ( msgLevel( MSG::VERBOSE ) )
680  if ( partial_sc.isFailure() )
681  verbose() << "Could not apply transition from "
682  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
683  << index2algname( algIndex ) << " on processing slot " << iSlot << endmsg;
684  }
685 
686  // Not complete because this would mean that the slot is already free!
687  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
688  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
689  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
690  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
691 
692  thisSlot.complete = true;
693  // if the event did not fail, add it to the finished events
694  // otherwise it is taken care of in the error handling already
695  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
696  m_finishedEvents.push( thisSlot.eventContext );
697  if ( msgLevel( MSG::DEBUG ) )
698  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot " << thisSlot.eventContext->slot()
699  << ")." << endmsg;
700  }
701  // now let's return the fully evaluated result of the control flow
702  if ( msgLevel( MSG::DEBUG ) ) {
704  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
705  debug() << ss.str() << endmsg;
706  }
707 
708  thisSlot.eventContext = nullptr;
709  } else {
710  StatusCode eventStalledSC = isStalled( iSlot );
711  if ( !eventStalledSC.isSuccess() ) {
712  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
713  eventFailed( thisSlot.eventContext ).ignore();
714  }
715  }
716  } // end loop on slots
717 
718  verbose() << "States Updated." << endmsg;
719 
720  return global_sc;
721 }
722 
723 //---------------------------------------------------------------------------
724 
732 {
733  // Get the slot
734  EventSlot& thisSlot = m_eventSlots[iSlot];
735 
736  if ( m_actionsQueue.empty() && m_algosInFlight == 0 &&
738 
739  info() << "About to declare a stall" << endmsg;
740  fatal() << "*** Stall detected! ***\n" << endmsg;
741  dumpSchedulerState( iSlot );
742  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
743 
744  return StatusCode::FAILURE;
745  }
746  return StatusCode::SUCCESS;
747 }
748 
749 //---------------------------------------------------------------------------
750 
757 {
758 
759  // To have just one big message
760  std::ostringstream outputMessageStream;
761 
762  outputMessageStream << "============================== Execution Task State ============================="
763  << std::endl;
764  dumpState( outputMessageStream );
765 
766  outputMessageStream << std::endl
767  << "============================== Scheduler State ================================="
768  << std::endl;
769 
770  int slotCount = -1;
771  for ( auto thisSlot : m_eventSlots ) {
772  slotCount++;
773  if ( thisSlot.complete ) continue;
774 
775  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
776  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
777 
778  if ( 0 > iSlot or iSlot == slotCount ) {
779  outputMessageStream << "Algorithms states:" << std::endl;
780 
781  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
782  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
783  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
784  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
785  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
786  const int depsSize = deps.size();
787  if ( depsSize == 0 ) outputMessageStream << " none";
788 
789  DataObjIDColl missing;
790  for ( auto d : deps ) {
791  outputMessageStream << d << " ";
792  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
793  // outputMessageStream << "[missing] ";
794  missing.insert( d );
795  }
796  }
797 
798  if ( !missing.empty() ) {
799  outputMessageStream << ". The following are missing: ";
800  for ( auto d : missing ) {
801  outputMessageStream << d << " ";
802  }
803  }
804 
805  outputMessageStream << std::endl;
806  }
807 
808  // Snapshot of the WhiteBoard
809  outputMessageStream << "\nWhiteboard contents: " << std::endl;
810  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
811 
812  // Snapshot of the ControlFlow
813  outputMessageStream << "\nControl Flow:" << std::endl;
814  std::stringstream cFlowStateStringStream;
815  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
816 
817  outputMessageStream << cFlowStateStringStream.str() << std::endl;
818  }
819  }
820 
821  outputMessageStream << "=================================== END ======================================" << std::endl;
822 
823  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
824 }
825 
826 //---------------------------------------------------------------------------
827 
829 {
830 
831  // Do the control flow
832  StatusCode sc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::CONTROLREADY );
833  if ( sc.isSuccess() )
834  if ( msgLevel( MSG::VERBOSE ) )
835  verbose() << "Promoting " << index2algname( iAlgo ) << " to CONTROLREADY on slot " << si << endmsg;
836 
837  return sc;
838 }
839 
840 //---------------------------------------------------------------------------
841 
843 {
844 
845  StatusCode sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
846 
847  StatusCode updateSc( StatusCode::FAILURE );
848  if ( sc == StatusCode::SUCCESS )
849  updateSc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::DATAREADY );
850 
851  if ( updateSc.isSuccess() )
852  if ( msgLevel( MSG::VERBOSE ) )
853  verbose() << "Promoting " << index2algname( iAlgo ) << " to DATAREADY on slot " << si << endmsg;
854 
855  return updateSc;
856 }
857 
858 //---------------------------------------------------------------------------
859 
861 {
862 
864 
865  const std::string& algName( index2algname( iAlgo ) );
866  IAlgorithm* ialgoPtr = nullptr;
867  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
868 
869  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
870  EventContext* eventContext( m_eventSlots[si].eventContext );
871  if ( !eventContext )
872  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
873 
874  ++m_algosInFlight;
875  // prepare a scheduler action to run once the algorithm is executed
876  auto promote2ExecutedClosure =
877  std::bind( &ForwardSchedulerSvc::promoteToExecuted, this, iAlgo, eventContext->slot(), ialgoPtr, eventContext );
878  // Avoid to use tbb if the pool size is 1 and run in this thread
879  if ( -100 != m_threadPoolSize ) {
880 
881  // this parent task is needed to promote an Algorithm as EXECUTED,
882  // it will be started as soon as the child task (see below) is completed
883  tbb::task* triggerAlgoStateUpdate =
884  new ( tbb::task::allocate_root() ) enqueueSchedulerActionTask( this, promote2ExecutedClosure );
885  // setting parent's refcount to 1 is made here only for consistency
886  // (in this case since it is not scheduled explicitly and there it has only one child task)
887  triggerAlgoStateUpdate->set_ref_count( 1 );
888  // the child task that executes an Algorithm
889  tbb::task* algoTask = new ( triggerAlgoStateUpdate->allocate_child() )
890  AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
891  // schedule the algoTask
892  tbb::task::enqueue( *algoTask );
893 
894  } else {
895  AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
896  theTask.execute();
897  promote2ExecutedClosure();
898  }
899 
900  if ( msgLevel( MSG::DEBUG ) )
901  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
902  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
903 
904  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
905 
906  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
907 
908  if ( updateSc.isSuccess() )
909  if ( msgLevel( MSG::VERBOSE ) )
910  verbose() << "Promoting " << index2algname( iAlgo ) << " to SCHEDULED on slot " << si << endmsg;
911  return updateSc;
912  } else {
913  if ( msgLevel( MSG::DEBUG ) )
914  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
915  return sc;
916  }
917 }
918 
919 //---------------------------------------------------------------------------
920 
922  EventContext* eventContext )
923 {
924 
925  // Put back the instance
926  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
927  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
928  // EventContext* eventContext = castedAlgo->getContext();
929 
930  // Check if the execution failed
931  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
932 
933  Gaudi::Hive::setCurrentContext( eventContext );
934  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
935 
936  if ( !sc.isSuccess() ) {
937  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
938  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
939  return StatusCode::FAILURE;
940  }
941 
942  m_algosInFlight--;
943 
944  EventSlot& thisSlot = m_eventSlots[si];
945 
946  // Update the catalog: some new products may be there
947  m_whiteboard->selectStore( eventContext->slot() ).ignore();
948 
949  // update prods in the dataflow
950  // DP: Handles could be used. Just update what the algo wrote
951  DataObjIDColl new_products;
952  m_whiteboard->getNewDataObjects( new_products ).ignore();
953  for ( const auto& new_product : new_products )
954  if ( msgLevel( MSG::DEBUG ) ) debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
955  thisSlot.dataFlowMgr.updateDataObjectsCatalog( new_products );
956 
957  if ( msgLevel( MSG::DEBUG ) )
958  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
959  << m_algosInFlight << endmsg;
960 
961  // Limit number of updates
962  if ( m_updateNeeded ) {
963  // Schedule an update of the status of the algorithms
964  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1 );
965  m_actionsQueue.push( updateAction );
966  m_updateNeeded = false;
967  }
968 
969  if ( msgLevel( MSG::DEBUG ) )
970  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
971  State state;
972  if ( algo->filterPassed() ) {
973  state = State::EVTACCEPTED;
974  } else {
975  state = State::EVTREJECTED;
976  }
977 
978  sc = thisSlot.algsStates.updateState( iAlgo, state );
979 
980  if ( sc.isSuccess() )
981  if ( msgLevel( MSG::VERBOSE ) )
982  verbose() << "Promoting " << index2algname( iAlgo ) << " on slot " << si << " to "
984 
985  return sc;
986 }
987 
988 //===========================================================================
990 {
991 
993  m_sState.push_back( SchedulerState( a, e, t ) );
994 }
995 
996 //===========================================================================
998 {
999 
1001 
1002  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1003  if ( *itr == a ) {
1004  m_sState.erase( itr );
1005  return true;
1006  }
1007  }
1008 
1009  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1010  return false;
1011 }
1012 
1013 //===========================================================================
1015 {
1016 
1018 
1019  for ( auto it : m_sState ) {
1020  ost << " " << it << std::endl;
1021  }
1022 }
1023 
1024 //===========================================================================
1026 {
1027 
1029 
1030  std::ostringstream ost;
1031  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1032  dumpState( ost );
1033 
1034  info() << ost.str() << endmsg;
1035 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
StatusCode deactivate()
Deactivate scheduler.
bool algsPresent(State state) const
StatusCode initialize() override
Definition: Service.cpp:64
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
T empty(T...args)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:731
StatusCode finalize() override
Definition: Service.cpp:174
ContextID_t slot() const
Definition: EventContext.h:40
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
concurrency::recursive_CF::ExecutionFlowManager m_efManager
Member to take care of the control flow.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:75
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
Header file for class GaudiAlgorithm.
StatusCode updateStates(int si=-1)
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots).
StatusCode finalize() override
Finalise.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T endl(T...args)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
SmartIF< IThreadPoolSvc > m_threadPoolSvc
The SchedulerSvc implements the IScheduler interface.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode m_drain()
Drain the actions present in the queue.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
void addAlg(Algorithm *, EventContext *, pthread_t)
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
The AlgsExecutionStates class encodes the state machine for the execution of algorithms within a single event.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary for the index2name conversion.
T join(T...args)
static std::list< SchedulerState > m_sState
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
const DataObjIDColl & inputDataObjs() const override
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
unsigned int freeSlots() override
Get free slots number.
T bind(T...args)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:211
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
Gaudi::Property< int > m_maxEventsInFlight
concurrency::recursive_CF::ControlFlowGraph * getCFGraph() const
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
GAUDI_API void setCurrentContext(const EventContext *ctx)
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
STL class.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary for the name2index conversion.
void activate()
Activate scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:25
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
T begin(T...args)
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Iterator begin(State kind)
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
bool delAlg(Algorithm *)
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update states and decisions of algorithms.
T sort(T...args)
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
StatusCode initialize() override
void ignore() const
Definition: StatusCode.h:109
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
T for_each(T...args)
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
Gaudi::Property< int > m_threadPoolSize
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
T reserve(T...args)
static std::map< State, std::string > stateNames
T emplace_back(T...args)
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)