The Gaudi Framework  v28r3 (cc1cf868)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 #include "AlgResourcePool.h"
3 #include "AlgoExecutionTask.h"
4 #include "PRGraphVisitors.h"
5 #include "IOBoundAlgTask.h"
6 
7 // Framework includes
9 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
11 #include "GaudiKernel/IAlgorithm.h"
13 #include "GaudiKernel/SvcFactory.h"
16 
17 // C++
18 #include <algorithm>
19 #include <map>
20 #include <queue>
21 #include <sstream>
22 #include <unordered_set>
23 
24 // External libs
25 #include "boost/thread.hpp"
26 #include "boost/tokenizer.hpp"
27 #include "boost/algorithm/string.hpp"
28 // DP waiting for the TBB service
29 #include "tbb/task_scheduler_init.h"
30 
33 
34 // Instantiation of a static factory class used by clients to create instances of this service
36 
37 
38 namespace {
39 struct DataObjIDSorter
40 {
41  bool operator() (const DataObjID* a, const DataObjID* b)
42  {
43  return a->fullKey() < b->fullKey();
44  }
45 };
46 
47 // Sort a DataObjIDColl in a well-defined, reproducible manner.
48 // Used for making debugging dumps.
49 std::vector<const DataObjID*> sortedDataObjIDColl (const DataObjIDColl& coll)
50 {
52  v.reserve( coll.size() );
53  for( const DataObjID& id : coll )
54  v.push_back( &id );
55  std::sort( v.begin(), v.end(), DataObjIDSorter() );
56  return v;
57 }
58 
59 
60 }
61 
62 //===========================================================================
63 // Infrastructure methods
64 
71 
72  // Initialise mother class (read properties, ...)
74  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
75 
76  // Get hold of the TBBSvc. This should initialize the thread pool
77  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
78  if ( !m_threadPoolSvc.isValid() ) {
79  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
80  return StatusCode::FAILURE;
81  }
82 
83  // Activate the scheduler in another thread.
84  info() << "Activating scheduler in a separate thread" << endmsg;
86 
87  while ( m_isActive != ACTIVE ) {
88  if ( m_isActive == FAILURE ) {
89  fatal() << "Terminating initialization" << endmsg;
90  return StatusCode::FAILURE;
91  } else {
92  info() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
93  sleep( 1 );
94  }
95  }
96 
97  // Get the algo resource pool
98  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
99  if ( !m_algResourcePool.isValid() ) {
100  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
101  return StatusCode::FAILURE;
102  }
103 
104  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
105  if (!m_algExecStateSvc.isValid()) {
106  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
107  return StatusCode::FAILURE;
108  }
109 
110  // Get Whiteboard
111  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
112  if ( !m_whiteboard.isValid() ) {
113  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
114  return StatusCode::FAILURE;
115  }
116 
117  // Get dedicated scheduler for I/O-bound algorithms
118  if ( m_useIOBoundAlgScheduler ) {
119  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
120  if ( !m_IOBoundAlgScheduler.isValid() )
121  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
122  }
123 
124  // Set the MaxEventsInFlight parameters from the number of WB stores
125  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
126 
127  // Set the number of free slots
128  m_freeSlots = m_maxEventsInFlight;
129 
130  // set global concurrency flags
132 
133  // Get the list of algorithms
134  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
135  const unsigned int algsNumber = algos.size();
136  info() << "Found " << algsNumber << " algorithms" << endmsg;
137 
138  /* Dependencies
139  1) Look for handles in algo, if none
140  2) Assume none are required
141  */
142 
143  DataObjIDColl globalInp, globalOutp;
144 
145  // figure out all outputs
146  for (IAlgorithm* ialgoPtr : algos) {
147  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
148  if (!algoPtr) {
149  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
150  }
151  for (auto id : algoPtr->outputDataObjs()) {
152  auto r = globalOutp.insert(id);
153  if (!r.second) {
154  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" << endmsg;
155  }
156  }
157  }
158 
159  std::ostringstream ostdd;
160  ostdd << "Data Dependencies for Algorithms:";
161 
162  std::vector<DataObjIDColl> m_algosDependencies;
163  for ( IAlgorithm* ialgoPtr : algos ) {
164  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
165  if ( nullptr == algoPtr ) {
166  fatal() << "Could not convert IAlgorithm into Algorithm for "
167  << ialgoPtr->name()
168  << ": this will result in a crash." << endmsg;
169  return StatusCode::FAILURE;
170  }
171 
172  ostdd << "\n " << algoPtr->name();
173 
174  DataObjIDColl algoDependencies;
175  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
176  for ( const DataObjID* idp : sortedDataObjIDColl (algoPtr->inputDataObjs()) ) {
177  DataObjID id = *idp;
178  ostdd << "\n o INPUT " << id;
179  if (id.key().find(":")!=std::string::npos) {
180  ostdd << " contains alternatives which require resolution...\n";
181  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(),boost::char_separator<char>{":"}};
182  auto itok = std::find_if( tokens.begin(), tokens.end(),
183  [&](const std::string& t) {
184  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
185  } );
186  if (itok!=tokens.end()) {
187  ostdd << "found matching output for " << *itok
188  << " -- updating scheduler info\n";
189  id.updateKey(*itok);
190  } else {
191  error() << "failed to find alternate in global output list"
192  << " for id: " << id << " in Alg " << algoPtr->name()
193  << endmsg;
194  m_showDataDeps = true;
195  }
196  }
197  algoDependencies.insert( id );
198  globalInp.insert( id );
199  }
200  for ( const DataObjID* id : sortedDataObjIDColl (algoPtr->outputDataObjs()) ) {
201  ostdd << "\n o OUTPUT " << *id;
202  if (id->key().find(":")!=std::string::npos) {
203  error() << " in Alg " << algoPtr->name()
204  << " alternatives are NOT allowed for outputs! id: "
205  << *id << endmsg;
206  m_showDataDeps = true;
207  }
208  }
209  } else {
210  ostdd << "\n none";
211  }
212  m_algosDependencies.emplace_back( algoDependencies );
213  }
214 
215  if ( m_showDataDeps ) {
216  info() << ostdd.str() << endmsg;
217  }
218 
219  // Fill the containers to convert algo names to index
220  m_algname_vect.reserve( algsNumber );
221  unsigned int index = 0;
222  IAlgorithm* dataLoaderAlg( nullptr );
223  for ( IAlgorithm* algo : algos ) {
224  const std::string& name = algo->name();
225  m_algname_index_map[name] = index;
226  m_algname_vect.emplace_back( name );
227  if (algo->name() == m_useDataLoader) {
228  dataLoaderAlg = algo;
229  }
230  index++;
231  }
232 
233  // Check if we have unmet global input dependencies
234  if ( m_checkDeps ) {
235  DataObjIDColl unmetDep;
236  for ( auto o : globalInp ) {
237  if ( globalOutp.find( o ) == globalOutp.end() ) {
238  unmetDep.insert( o );
239  }
240  }
241 
242  if ( unmetDep.size() > 0 ) {
243 
244  std::ostringstream ost;
245  for ( const DataObjID* o : sortedDataObjIDColl (unmetDep) ) {
246  ost << "\n o " << *o << " required by Algorithm: ";
247  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
248  if ( m_algosDependencies[i].find( *o ) != m_algosDependencies[i].end() ) {
249  ost << "\n * " << m_algname_vect[i];
250  }
251  }
252  }
253 
254  if ( m_useDataLoader != "" ) {
255  // Find the DataLoader Alg
256  if (dataLoaderAlg == nullptr) {
257  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
258  << "\" found, and unmet INPUT dependencies "
259  << "detected:\n" << ost.str() << endmsg;
260  return StatusCode::FAILURE;
261  }
262 
263  info() << "Will attribute the following unmet INPUT dependencies to \""
264  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
265  << "\" Algorithm"
266  << ost.str() << endmsg;
267 
268  // Set the property Load of DataLoader Alg
269  Algorithm *dataAlg = dynamic_cast<Algorithm*>(dataLoaderAlg);
270  if ( !dataAlg ) {
271  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value()
272  << "\" IAlg to Algorithm" << endmsg;
273  return StatusCode::FAILURE;
274  }
275 
276  for (auto& id : unmetDep) {
277  debug() << "adding OUTPUT dep \"" << id << "\" to "
278  << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
279  << endmsg;
281  }
282 
283  } else {
284  fatal() << "Auto DataLoading not requested, "
285  << "and the following unmet INPUT dependencies were found:"
286  << ost.str() << endmsg;
287  return StatusCode::FAILURE;
288  }
289 
290  } else {
291  info() << "No unmet INPUT data dependencies were found" << endmsg;
292  }
293  }
294 
295  // prepare the control flow part
296  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
297  if ( !algPool ) {
298  fatal() << "Unable to dcast algResourcePool" << endmsg;
299  return StatusCode::FAILURE;
300  }
301  sc = m_efManager.initialize( algPool->getPRGraph(), m_algname_index_map, m_eventSlots, m_optimizationMode );
302  unsigned int controlFlowNodeNumber = m_efManager.getPrecedenceRulesGraph()->getControlFlowNodeCounter();
303 
304  // Shortcut for the message service
305  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
306  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
307 
308  m_eventSlots.assign( m_maxEventsInFlight,
309  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
310  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
311 
312  if (m_threadPoolSize > 1) {
313  m_maxAlgosInFlight = (size_t) m_threadPoolSize;
314  }
315 
316  // Clearly inform about the level of concurrency
317  info() << "Concurrency level information:" << endmsg;
318  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
319  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
320 
321  m_efg = algPool->getPRGraph();
322 
323  if (m_showControlFlow) {
324  info() << std::endl
325  << "========== Algorithm and Sequence Configuration =========="
326  << std::endl << std::endl;
327  info() << m_efg->dumpControlFlow() << endmsg;
328  }
329 
330  if (m_showDataFlow) {
331  info() << std::endl
332  << "======================= Data Flow ========================"
333  << std::endl;
334  info() << m_efg->dumpDataFlow() << endmsg;
335  }
336 
337  // Simulating execution flow by only analyzing the graph topology and logic
338  if ( m_simulateExecution ) {
339  auto vis = concurrency::RunSimulator( m_eventSlots[0] );
340  m_efManager.simulateExecutionFlow( vis );
341  }
342 
343  return sc;
344 }
345 //---------------------------------------------------------------------------
346 
351 
353  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
354 
355  sc = deactivate();
356  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
357 
358  info() << "Joining Scheduler thread" << endmsg;
359  m_thread.join();
360 
361  // Final error check after thread pool termination
362  if ( m_isActive == FAILURE ) {
363  error() << "problems in scheduler thread" << endmsg;
364  return StatusCode::FAILURE;
365  }
366 
367  // m_efManager.getPrecedenceRulesGraph()->dumpExecutionPlan();
368 
369  return sc;
370 }
371 //---------------------------------------------------------------------------
383 
384  if (msgLevel(MSG::DEBUG))
385  debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
386 
387  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
388  error() << "problems initializing ThreadPoolSvc" << endmsg;
389  m_isActive = FAILURE;
390  return;
391  }
392 
393  // Wait for actions pushed into the queue by finishing tasks.
394  action thisAction;
396 
397  m_isActive = ACTIVE;
398 
399  // Continue to wait if the scheduler is running or there is something to do
400  info() << "Start checking the actionsQueue" << endmsg;
401  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
402  m_actionsQueue.pop( thisAction );
403  sc = thisAction();
404  if ( sc != StatusCode::SUCCESS )
405  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
406  else
407  verbose() << "Action succeeded." << endmsg;
408  }
409 
410  info() << "Terminating thread-pool resources" << endmsg;
411  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
412  error() << "Problems terminating thread pool" << endmsg;
413  m_isActive = FAILURE;
414  }
415 }
416 
417 //---------------------------------------------------------------------------
418 
426 
427  if ( m_isActive == ACTIVE ) {
428  // Drain the scheduler
429  m_actionsQueue.push( std::bind( &AvalancheSchedulerSvc::m_drain, this ) );
430  // This would be the last action
431  m_actionsQueue.push( [this]() -> StatusCode {
432  m_isActive = INACTIVE;
433  return StatusCode::SUCCESS;
434  } );
435  }
436 
437  return StatusCode::SUCCESS;
438 }
439 
440 //===========================================================================
441 
442 //===========================================================================
443 // Utils and shortcuts
444 
445 inline const std::string& AvalancheSchedulerSvc::index2algname( unsigned int index ) {
446  return m_algname_vect[index];
447 }
448 
449 //---------------------------------------------------------------------------
450 
451 inline unsigned int AvalancheSchedulerSvc::algname2index( const std::string& algoname ) {
452  unsigned int index = m_algname_index_map[algoname];
453  return index;
454 }
455 
456 //===========================================================================
457 // EventSlot management
465 
466  if ( m_first ) {
467  m_first = false;
468  }
469 
470  if ( !eventContext ) {
471  fatal() << "Event context is nullptr" << endmsg;
472  return StatusCode::FAILURE;
473  }
474 
475  if ( m_freeSlots.load() == 0 ) {
476  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
477  return StatusCode::FAILURE;
478  }
479 
480  // no problem as push new event is only called from one thread (event loop manager)
481  m_freeSlots--;
482 
483  auto action = [this, eventContext]() -> StatusCode {
484  // Event processing slot forced to be the same as the wb slot
485  const unsigned int thisSlotNum = eventContext->slot();
486  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
487  if ( !thisSlot.complete ) {
488  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
489  return StatusCode::FAILURE;
490  }
491 
492  debug() << "Executing event " << eventContext->evt() << " on slot "
493  << thisSlotNum << endmsg;
494  thisSlot.reset( eventContext );
495 
496  // promote to CR and DR the initial set of algorithms
497  auto vis = concurrency::Supervisor( m_eventSlots[thisSlotNum] );
498  m_efManager.touchReadyAlgorithms( vis );
499 
500  return this->updateStates( thisSlotNum );
501  }; // end of lambda
502 
503  // Kick off the scheduling!
504  if ( msgLevel( MSG::VERBOSE ) ) {
505  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
506  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
507  }
508  m_actionsQueue.push( action );
509 
510  return StatusCode::SUCCESS;
511 }
512 
513 //---------------------------------------------------------------------------
515  StatusCode sc;
516  for ( auto context : eventContexts ) {
517  sc = pushNewEvent( context );
518  if ( sc != StatusCode::SUCCESS ) return sc;
519  }
520  return sc;
521 }
522 
523 //---------------------------------------------------------------------------
525  return std::max( m_freeSlots.load(), 0 );
526 }
527 
528 //---------------------------------------------------------------------------
533 
534  unsigned int slotNum = 0;
535  for ( auto& thisSlot : m_eventSlots ) {
536  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
537  updateStates( slotNum );
538  }
539  slotNum++;
540  }
541  return StatusCode::SUCCESS;
542 }
543 
544 //---------------------------------------------------------------------------
549  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
550  if ( m_freeSlots.load() == (int) m_maxEventsInFlight or
551  m_isActive == INACTIVE ) {
552  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
553  // << " active: " << m_isActive << endmsg;
554  return StatusCode::FAILURE;
555  } else {
556  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
557  // << " active: " << m_isActive << endmsg;
558  m_finishedEvents.pop( eventContext );
559  m_freeSlots++;
560  if (msgLevel(MSG::DEBUG))
561  debug() << "Popped slot " << eventContext->slot() << "(event "
562  << eventContext->evt() << ")" << endmsg;
563  return StatusCode::SUCCESS;
564  }
565 }
566 
567 //---------------------------------------------------------------------------
572  if ( m_finishedEvents.try_pop( eventContext ) ) {
573  if ( msgLevel( MSG::DEBUG ) )
574  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
575  << endmsg;
576  m_freeSlots++;
577  return StatusCode::SUCCESS;
578  }
579  return StatusCode::FAILURE;
580 }
581 
582 //---------------------------------------------------------------------------
589 
590  // Set the number of slots available to an error code
591  m_freeSlots.store( 0 );
592 
593  fatal() << "*** Event " << eventContext->evt() << " on slot "
594  << eventContext->slot() << " failed! ***" << endmsg;
595 
596  std::ostringstream ost;
597  m_algExecStateSvc->dump(ost, *eventContext);
598 
599  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
600  << ":\n" << ost.str() << endmsg;
601 
602  dumpSchedulerState(-1);
603 
604  // Empty queue and deactivate the service
605  action thisAction;
606  while ( m_actionsQueue.try_pop( thisAction ) ) {
607  };
608  deactivate();
609 
610  // Push into the finished events queue the failed context
611  EventContext* thisEvtContext;
612  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
613  m_finishedEvents.push( thisEvtContext );
614  };
615  m_finishedEvents.push( eventContext );
616 
617  return StatusCode::FAILURE;
618 }
619 
620 //===========================================================================
621 
622 //===========================================================================
623 // States Management
624 
635 
636  m_updateNeeded = true;
637 
638  StatusCode global_sc( StatusCode::FAILURE, true );
639 
640  // Sort from the oldest to the newest event
641  // Prepare a vector of pointers to the slots to avoid copies
642  std::vector<EventSlot*> eventSlotsPtrs;
643 
644  // Consider all slots if si <0 or just one otherwise
645  if ( si < 0 ) {
646  const int eventsSlotsSize( m_eventSlots.size() );
647  eventSlotsPtrs.reserve( eventsSlotsSize );
648  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
649  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
650  }
651  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
652  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
653  } else {
654  eventSlotsPtrs.push_back( &m_eventSlots[si] );
655  }
656 
657  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
658  int iSlot = thisSlotPtr->eventContext->slot();
659 
660  // Cache the states of the algos to improve readability and performance
661  auto& thisSlot = m_eventSlots[iSlot];
662  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
663 
664  // Take care of the control ready update
665  if ( !algo_name.empty() )
666  m_efManager.updateDecision( algo_name, iSlot, thisAlgsStates, thisSlot.controlFlowState );
667 
668  StatusCode partial_sc( StatusCode::FAILURE, true );
669  // first update CONTROLREADY to DATAREADY
670 
671  // now update DATAREADY to SCHEDULED
672  if ( !m_optimizationMode.empty() ) {
673  auto comp_nodes = [this]( const uint& i, const uint& j ) {
674  return ( m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode( index2algname( i ) )->getRank() <
675  m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode( index2algname( j ) )->getRank() );
676  };
678  comp_nodes, std::vector<uint>() );
679  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
680  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
681  buffer.push( *it );
682  /*std::stringstream s;
683  auto buffer2 = buffer;
684  while (!buffer2.empty()) {
685  s << m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
686  buffer2.pop();
687  }
688  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
689 
690  /*while (!buffer.empty()) {
691  partial_sc = promoteToScheduled(buffer.top(), iSlot);
692  if (partial_sc.isFailure()) {
693  if (msgLevel(MSG::VERBOSE))
694  verbose() << "Could not apply transition from "
695  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
696  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
697  if (m_useIOBoundAlgScheduler) {
698  partial_sc = promoteToAsyncScheduled(buffer.top(), iSlot);
699  if (msgLevel(MSG::VERBOSE))
700  if (partial_sc.isFailure())
701  verbose() << "[Asynchronous] Could not apply transition from "
702  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
703  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
704  }
705  }
706  buffer.pop();
707  }*/
708  while ( !buffer.empty() ) {
709  bool IOBound = false;
710  if ( m_useIOBoundAlgScheduler )
711  IOBound = m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode( index2algname( buffer.top() ) )->isIOBound();
712 
713  if ( !IOBound )
714  partial_sc = promoteToScheduled( buffer.top(), iSlot );
715  else
716  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot );
717 
718  if (msgLevel(MSG::VERBOSE))
719  if (partial_sc.isFailure())
720  verbose() << "Could not apply transition from "
721  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
722  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
723 
724  buffer.pop();
725  }
726 
727  } else {
728  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
729  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
730  uint algIndex = *it;
731 
732  bool IOBound = false;
733  if ( m_useIOBoundAlgScheduler )
734  IOBound = m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode( index2algname( algIndex ) )->isIOBound();
735 
736  if ( !IOBound )
737  partial_sc = promoteToScheduled( algIndex, iSlot );
738  else
739  partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
740 
741  if (msgLevel(MSG::VERBOSE))
742  if (partial_sc.isFailure())
743  verbose() << "Could not apply transition from "
744  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
745  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
746  }
747  }
748 
749  if (m_dumpIntraEventDynamics) {
750  auto now = std::chrono::system_clock::now();
752  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY) << ", "
753  << thisAlgsStates.sizeOfSubset(State::DATAREADY) << ", "
754  << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << ", "
755  << std::chrono::duration_cast<std::chrono::nanoseconds> (now - m_efManager.getPrecedenceRulesGraph()->getInitTime()).count()
756  << "\n";
757  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
758  : std::to_string(tbb::task_scheduler_init::default_num_threads());
759  std::ofstream myfile;
760  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
761  myfile << s.str();
762  myfile.close();
763  }
764 
765  // Not complete because this would mean that the slot is already free!
766  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
767  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
768  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
769  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
770 
771  thisSlot.complete = true;
772  // if the event did not fail, add it to the finished events
773  // otherwise it is taken care of in the error handling already
774  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
775  m_finishedEvents.push(thisSlot.eventContext);
776  if (msgLevel(MSG::DEBUG))
777  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
778  << thisSlot.eventContext->slot() << ")." << endmsg;
779  }
780  // now let's return the fully evaluated result of the control flow
781  if ( msgLevel( MSG::DEBUG ) ) {
783  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
784  debug() << ss.str() << endmsg;
785  }
786 
787  thisSlot.eventContext = nullptr;
788  } else {
789  StatusCode eventStalledSC = isStalled(iSlot);
790  if (! eventStalledSC.isSuccess()) {
791  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
792  eventFailed(thisSlot.eventContext).ignore();
793  }
794  }
795  } // end loop on slots
796 
797  verbose() << "States Updated." << endmsg;
798 
799  return global_sc;
800 }
801 
802 //---------------------------------------------------------------------------
803 
811  // Get the slot
812  EventSlot& thisSlot = m_eventSlots[iSlot];
813 
814  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
816 
817  info() << "About to declare a stall" << endmsg;
818  fatal() << "*** Stall detected! ***\n" << endmsg;
819  dumpSchedulerState( iSlot );
820  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
821 
822  return StatusCode::FAILURE;
823  }
824  return StatusCode::SUCCESS;
825 }
826 
827 //---------------------------------------------------------------------------
828 
835 
836  // To have just one big message
837  std::ostringstream outputMessageStream;
838 
839  outputMessageStream << "============================== Execution Task State ============================="
840  << std::endl;
841  dumpState( outputMessageStream );
842 
843  outputMessageStream << std::endl
844  << "============================== Scheduler State ================================="
845  << std::endl;
846 
847  int slotCount = -1;
848  for ( auto thisSlot : m_eventSlots ) {
849  slotCount++;
850  if ( thisSlot.complete ) continue;
851 
852  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
853  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
854 
855  if ( 0 > iSlot or iSlot == slotCount ) {
856  outputMessageStream << "Algorithms states:" << std::endl;
857 
858  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
859  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
860  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
861  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
862  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
863  const int depsSize = deps.size();
864  if ( depsSize == 0 ) outputMessageStream << " none";
865 
866  DataObjIDColl missing;
867  for ( auto d : deps ) {
868  outputMessageStream << d << " ";
869  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
870  // outputMessageStream << "[missing] ";
871  missing.insert( d );
872  }
873  }
874 
875  if ( !missing.empty() ) {
876  outputMessageStream << ". The following are missing: ";
877  for ( auto d : missing ) {
878  outputMessageStream << d << " ";
879  }
880  }
881 
882  outputMessageStream << std::endl;
883  }
884 
885  // Snapshot of the WhiteBoard
886  outputMessageStream << "\nWhiteboard contents: " << std::endl;
887  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
888 
889  // Snapshot of the ControlFlow
890  outputMessageStream << "\nControl Flow:" << std::endl;
891  std::stringstream cFlowStateStringStream;
892  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
893 
894  outputMessageStream << cFlowStateStringStream.str() << std::endl;
895  }
896  }
897 
898  outputMessageStream << "=================================== END ======================================" << std::endl;
899 
900  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
901 }
902 
903 //---------------------------------------------------------------------------
904 
906 
907  if ( m_algosInFlight == m_maxAlgosInFlight ) return StatusCode::FAILURE;
908 
909  const std::string& algName( index2algname( iAlgo ) );
910  IAlgorithm* ialgoPtr = nullptr;
911  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
912 
913  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
914  EventContext* eventContext( m_eventSlots[si].eventContext );
915  if ( !eventContext ) {
916  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
917  return StatusCode::FAILURE;
918  }
919 
920  ++m_algosInFlight;
921  auto promote2ExecutedClosure = std::bind(&AvalancheSchedulerSvc::promoteToExecuted,
922  this,
923  iAlgo,
924  eventContext->slot(),
925  ialgoPtr,
926  eventContext);
927  // Avoid to use tbb if the pool size is 1 and run in this thread
928  if (-100 != m_threadPoolSize) {
929 
930  // this parent task is needed to promote an Algorithm as EXECUTED,
931  // it will be started as soon as the child task (see below) is completed
932  tbb::task* triggerAlgoStateUpdate = new(tbb::task::allocate_root())
933  enqueueSchedulerActionTask(this, promote2ExecutedClosure);
934  // setting parent's refcount to 1 is made here only for consistency
935  // (in this case since it is not scheduled explicitly and there it has only one child task)
936  triggerAlgoStateUpdate->set_ref_count(1);
937  // the child task that executes an Algorithm
938  tbb::task* algoTask = new(triggerAlgoStateUpdate->allocate_child())
939  AlgoExecutionTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
940  // schedule the algoTask
941  tbb::task::enqueue( *algoTask);
942 
943  } else {
944  AlgoExecutionTask theTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
945  theTask.execute();
946  promote2ExecutedClosure();
947  }
948 
949  if ( msgLevel( MSG::DEBUG ) )
950  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
951  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
952 
953  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
954 
955  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
956 
957  if (updateSc.isSuccess())
958  if (msgLevel(MSG::VERBOSE))
959  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
960  << si << endmsg;
961  return updateSc;
962  } else {
963  if ( msgLevel( MSG::DEBUG ) )
964  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
965  return sc;
966  }
967 }
968 
969 //---------------------------------------------------------------------------
970 
972 
973  if ( m_IOBoundAlgosInFlight == m_maxIOBoundAlgosInFlight ) return StatusCode::FAILURE;
974 
975  // bool IOBound = m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode(algName)->isIOBound();
976 
977  const std::string& algName( index2algname( iAlgo ) );
978  IAlgorithm* ialgoPtr = nullptr;
979  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
980 
981  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
982  EventContext* eventContext( m_eventSlots[si].eventContext );
983  if ( !eventContext ) {
984  fatal() << "[Asynchronous] Event context for algorithm " << algName << " is a nullptr (slot " << si << ")"
985  << endmsg;
986  return StatusCode::FAILURE;
987  }
988 
989  ++m_IOBoundAlgosInFlight;
990  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from tbb::task)? it seems it works..
991  IOBoundAlgTask* theTask = new( tbb::task::allocate_root() )
992  IOBoundAlgTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
993  m_IOBoundAlgScheduler->push(*theTask);
994 
995  if (msgLevel(MSG::DEBUG))
996  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event "
997  << eventContext->evt() << " in slot " << si
998  << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
999 
1000  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
1001 
1002  if (updateSc.isSuccess())
1003  if (msgLevel(MSG::VERBOSE))
1004  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo)
1005  << " to SCHEDULED on slot " << si << endmsg;
1006  return updateSc;
1007  } else {
1008  if ( msgLevel( MSG::DEBUG ) )
1009  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
1010  << si << endmsg;
1011  return sc;
1012  }
1013 }
1014 
1015 //---------------------------------------------------------------------------
1020  EventContext* eventContext ) {
1021  // Put back the instance
1022  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1023  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
1024  // EventContext* eventContext = castedAlgo->getContext();
1025 
1026  // Check if the execution failed
1027  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1028  eventFailed(eventContext).ignore();
1029 
1030  Gaudi::Hive::setCurrentContext(eventContext);
1031  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1032 
1033  if ( !sc.isSuccess() ) {
1034  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1035  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1036  return StatusCode::FAILURE;
1037  }
1038 
1039  m_algosInFlight--;
1040 
1041  EventSlot& thisSlot = m_eventSlots[si];
1042 
1043  if ( msgLevel( MSG::DEBUG ) )
1044  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1045  << m_algosInFlight << endmsg;
1046 
1047  // Schedule an update of the status of the algorithms
1048  auto updateAction = std::bind( &AvalancheSchedulerSvc::updateStates, this, -1, algo->name() );
1049  m_actionsQueue.push( updateAction );
1050  m_updateNeeded = false;
1051 
1052  if ( msgLevel( MSG::DEBUG ) )
1053  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
1054  State state;
1055  if ( algo->filterPassed() ) {
1056  state = State::EVTACCEPTED;
1057  } else {
1058  state = State::EVTREJECTED;
1059  }
1060 
1061  sc = thisSlot.algsStates.updateState( iAlgo, state );
1062 
1063  if (sc.isSuccess())
1064  if (msgLevel(MSG::VERBOSE))
1065  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
1067 
1068  return sc;
1069 }
1070 
1071 //---------------------------------------------------------------------------
1076  EventContext* eventContext ) {
1077  // Put back the instance
1078  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1079  if ( !castedAlgo ) fatal() << "[Asynchronous] The casting did not succeed!" << endmsg;
1080  // EventContext* eventContext = castedAlgo->getContext();
1081 
1082  // Check if the execution failed
1083  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1084  eventFailed(eventContext).ignore();
1085 
1086  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1087 
1088  if ( !sc.isSuccess() ) {
1089  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1090  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1091  return StatusCode::FAILURE;
1092  }
1093 
1094  m_IOBoundAlgosInFlight--;
1095 
1096  EventSlot& thisSlot = m_eventSlots[si];
1097 
1098  if (msgLevel(MSG::DEBUG))
1099  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1100  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1101 
1102  // Schedule an update of the status of the algorithms
1103  auto updateAction = std::bind( &AvalancheSchedulerSvc::updateStates, this, -1, algo->name() );
1104  m_actionsQueue.push( updateAction );
1105  m_updateNeeded = false;
1106 
1107  if (msgLevel(MSG::DEBUG))
1108  debug() << "[Asynchronous] Trying to handle execution result of "
1109  << index2algname(iAlgo) << " on slot " << si << endmsg;
1110  State state;
1111  if ( algo->filterPassed() ) {
1112  state = State::EVTACCEPTED;
1113  } else {
1114  state = State::EVTREJECTED;
1115  }
1116 
1117  sc = thisSlot.algsStates.updateState( iAlgo, state );
1118 
1119  if (sc.isSuccess())
1120  if (msgLevel(MSG::VERBOSE))
1121  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo) << " on slot "
1122  << si << " to " << AlgsExecutionStates::stateNames[state] << endmsg;
1123 
1124  return sc;
1125 }
1126 
1127 //===========================================================================
1129 {
1130 
1131  std::lock_guard<std::mutex> lock( m_ssMut );
1132  m_sState.push_back( SchedulerState( a, e, t ) );
1133 }
1134 
1135 //===========================================================================
1137 {
1138 
1139  std::lock_guard<std::mutex> lock( m_ssMut );
1140 
1141  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1142  if ( *itr == a ) {
1143  m_sState.erase( itr );
1144  return true;
1145  }
1146  }
1147 
1148  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1149  return false;
1150 }
1151 
1152 //===========================================================================
1154 {
1155 
1156  std::lock_guard<std::mutex> lock( m_ssMut );
1157 
1158  for ( auto it : m_sState ) {
1159  ost << " " << it << std::endl;
1160  }
1161 }
1162 
1163 //===========================================================================
1165 {
1166 
1167  std::lock_guard<std::mutex> lock( m_ssMut );
1168 
1169  std::ostringstream ost;
1170  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1171  dumpState( ost );
1172 
1173  info() << ost.str() << endmsg;
1174 }
bool algsPresent(State state) const
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
StatusCode initialize() override
Definition: Service.cpp:64
T empty(T...args)
T open(T...args)
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:750
virtual concurrency::PrecedenceRulesGraph * getPRGraph() const
StatusCode finalize() override
Definition: Service.cpp:174
ContextID_t slot() const
Definition: EventContext.h:40
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
Algorithm promotion.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
Header file for class GaudiAlgorithm.
T to_string(T...args)
T endl(T...args)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
T duration_cast(T...args)
void activate()
Activate scheduler.
T end(T...args)
size_t sizeOfSubset(State state) const
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:84
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
void addAlg(Algorithm *, EventContext *, pthread_t)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
T push_back(T...args)
static std::list< SchedulerState > m_sState
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is availble.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const DataObjIDColl & inputDataObjs() const override
T close(T...args)
T bind(T...args)
StatusCode finalize() override
Finalise.
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:213
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
GAUDI_API void setCurrentContext(const EventContext *ctx)
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T begin(T...args)
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
T sort(T...args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
State
Execution states of the algorithms.
T for_each(T...args)
std::string fullKey() const
Definition: DataObjID.cpp:64
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
T reserve(T...args)
static std::map< State, std::string > stateNames
T emplace_back(T...args)
StatusCode m_drain()
Drain the actions present in the queue.
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)