All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
ForwardSchedulerSvc.cpp
Go to the documentation of this file.
1 // Framework includes
3 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
8 
9 // #include "tbb/task.h"
10 // #include "boost/thread.hpp"
11 
12 // // C++
13 // #include <unordered_set>
14 // #include <algorithm>
15 // #include <map>
16 // #include <sstream>
17 // #include <queue>
18 
19 // Local
20 #include "AlgResourcePool.h"
21 #include "AlgoExecutionTask.h"
22 #include "EFGraphVisitors.h"
23 #include "ForwardSchedulerSvc.h"
24 #include "IOBoundAlgTask.h"
25 
26 // C++
27 #include <algorithm>
28 #include <map>
29 #include <queue>
30 #include <sstream>
31 #include <unordered_set>
32 
33 // External libs
34 #include "boost/thread.hpp"
35 #include "tbb/task.h"
36 // DP waiting for the TBB service
37 #include "tbb/task_scheduler_init.h"
38 
41 
42 // Instantiation of a static factory class used by clients to create instances of this service
44 
45 //===========================================================================
46 // Infrastructure methods
53 {
54 
55  // Initialise mother class (read properties, ...)
57  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
58 
59  // Get hold of the TBBSvc. This should initialize the thread pool
60  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
61  if ( !m_threadPoolSvc.isValid() ) {
62  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
63  return StatusCode::FAILURE;
64  }
65 
66  // Activate the scheduler in another thread.
67  info() << "Activating scheduler in a separate thread" << endmsg;
68  m_thread = std::thread( std::bind( &ForwardSchedulerSvc::activate, this ) );
69 
70  while ( m_isActive != ACTIVE ) {
71  if ( m_isActive == FAILURE ) {
72  fatal() << "Terminating initialization" << endmsg;
73  return StatusCode::FAILURE;
74  } else {
75  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
76  sleep( 1 );
77  }
78  }
79 
80  // Get the algo resource pool
81  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
82  if ( !m_algResourcePool.isValid() ) {
83  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
84  return StatusCode::FAILURE;
85  }
86 
87  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
88  if (!m_algExecStateSvc.isValid()) {
89  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
90  return StatusCode::FAILURE;
91  }
92 
93  // Get Whiteboard
94  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
95  if ( !m_whiteboard.isValid() ) {
96  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
97  return StatusCode::FAILURE;
98  }
99 
100  // Check the MaxEventsInFlight parameters and react
101  // Deprecated for the moment
102  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
103  if ( m_maxEventsInFlight != 0 ) {
104  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
105  << "Please migrate your code options files." << endmsg;
106 
107  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
108  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
109  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
110  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
111  }
112  }
113 
114  // Get dedicated scheduler for I/O-bound algorithms
115  if ( m_useIOBoundAlgScheduler ) {
116  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
117  if ( !m_IOBoundAlgScheduler.isValid() )
118  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
119  }
120  // Align the two quantities
121  m_maxEventsInFlight = numberOfWBSlots;
122 
123  // Set the number of free slots
124  m_freeSlots = m_maxEventsInFlight;
125 
126  if ( m_algosDependencies.size() != 0 ) {
127  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
128  << " FIX your job options #####" << endmsg;
129  }
130 
131  // Get the list of algorithms
132  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
133  const unsigned int algsNumber = algos.size();
134  info() << "Found " << algsNumber << " algorithms" << endmsg;
135 
136  /* Dependencies
137  1) Look for handles in algo, if none
138  2) Assume none are required
139  */
140 
141  DataObjIDColl globalInp, globalOutp;
142 
143  info() << "Data Dependencies for Algorithms:";
144 
145  std::vector<DataObjIDColl> m_algosDependencies;
146  for ( IAlgorithm* ialgoPtr : algos ) {
147  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
148  if ( nullptr == algoPtr )
149  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
150 
151  info() << "\n " << algoPtr->name();
152 
153  // FIXME
154  DataObjIDColl algoDependencies;
155  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
156  for ( auto id : algoPtr->inputDataObjs() ) {
157  info() << "\n o INPUT " << id;
158  algoDependencies.insert( id );
159  globalInp.insert( id );
160  }
161  for ( auto id : algoPtr->outputDataObjs() ) {
162  info() << "\n o OUTPUT " << id;
163  globalOutp.insert( id );
164  }
165  } else {
166  info() << "\n none";
167  }
168  m_algosDependencies.emplace_back( algoDependencies );
169  }
170  info() << endmsg;
171 
172  // Fill the containers to convert algo names to index
173  m_algname_vect.reserve( algsNumber );
174  unsigned int index = 0;
175  for ( IAlgorithm* algo : algos ) {
176  const std::string& name = algo->name();
177  m_algname_index_map[name] = index;
178  m_algname_vect.emplace_back( name );
179  index++;
180  }
181 
182  // Check if we have unmet global input dependencies
183  if ( m_checkDeps ) {
184  DataObjIDColl unmetDep;
185  for ( auto o : globalInp ) {
186  if ( globalOutp.find( o ) == globalOutp.end() ) {
187  unmetDep.insert( o );
188  }
189  }
190 
191  if ( unmetDep.size() > 0 ) {
192  fatal() << "The following unmet INPUT data dependencies were found: ";
193  for ( auto& o : unmetDep ) {
194  fatal() << "\n o " << o << " required by Algorithm: ";
195  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
196  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
197  fatal() << "\n * " << m_algname_vect[i];
198  }
199  }
200  }
201  fatal() << endmsg;
202  return StatusCode::FAILURE;
203  } else {
204  info() << "No unmet INPUT data dependencies were found" << endmsg;
205  }
206  }
207 
208  // prepare the control flow part
209  if ( m_CFNext ) m_DFNext = true; // force usage of new data flow machinery when new control flow is used
210  if ( !m_CFNext && !m_optimizationMode.empty() ) {
211  fatal() << "Execution optimization is only available with the graph-based execution flow management" << endmsg;
212  return StatusCode::FAILURE;
213  }
214  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
215  sc =
216  m_efManager.initialize( algPool->getExecutionFlowGraph(), m_algname_index_map, m_eventSlots, m_optimizationMode );
217  unsigned int controlFlowNodeNumber = m_efManager.getExecutionFlowGraph()->getControlFlowNodeCounter();
218 
219  // Shortcut for the message service
220  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
221  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
222 
223  m_eventSlots.assign( m_maxEventsInFlight,
224  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
225  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
226 
227  // Clearly inform about the level of concurrency
228  info() << "Concurrency level information:" << endmsg;
229  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
230  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
231  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
232 
233  // Simulating execution flow by only analyzing the graph topology and logic
234  if ( m_simulateExecution ) {
235  auto vis = concurrency::RunSimulator( 0 );
236  m_efManager.simulateExecutionFlow( vis );
237  }
238 
239  return sc;
240 }
241 //---------------------------------------------------------------------------
242 
247 {
248 
250  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
251 
252  sc = deactivate();
253  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
254 
255  info() << "Joining Scheduler thread" << endmsg;
256  m_thread.join();
257 
258  // Final error check after thread pool termination
259  if ( m_isActive == FAILURE ) {
260  error() << "problems in scheduler thread" << endmsg;
261  return StatusCode::FAILURE;
262  }
263 
264  // m_efManager.getExecutionFlowGraph()->dumpExecutionPlan();
265 
266  return sc;
267 }
268 //---------------------------------------------------------------------------
280 {
281 
282  if (msgLevel(MSG::DEBUG))
283  debug() << "ForwardSchedulerSvc::activate()" << endmsg;
284 
286  error() << "problems initializing ThreadPoolSvc" << endmsg;
288  return;
289  }
290 
291  // Wait for actions pushed into the queue by finishing tasks.
292  action thisAction;
294 
295  m_isActive = ACTIVE;
296 
297  // Continue to wait if the scheduler is running or there is something to do
298  info() << "Start checking the actionsQueue" << endmsg;
299  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
300  m_actionsQueue.pop( thisAction );
301  sc = thisAction();
302  if ( sc != StatusCode::SUCCESS )
303  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
304  else
305  verbose() << "Action succeeded." << endmsg;
306  }
307 
308  info() << "Terminating thread-pool resources" << endmsg;
310  error() << "Problems terminating thread pool" << endmsg;
312  }
313 }
314 
315 //---------------------------------------------------------------------------
316 
324 {
325 
326  if ( m_isActive == ACTIVE ) {
327  // Drain the scheduler
329  // This would be the last action
330  m_actionsQueue.push( [this]() -> StatusCode {
332  return StatusCode::SUCCESS;
333  } );
334  }
335 
336  return StatusCode::SUCCESS;
337 }
338 
339 //===========================================================================
340 
341 //===========================================================================
342 // Utils and shortcuts
343 
344 inline const std::string& ForwardSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
345 
346 //---------------------------------------------------------------------------
347 
348 inline unsigned int ForwardSchedulerSvc::algname2index( const std::string& algoname )
349 {
350  unsigned int index = m_algname_index_map[algoname];
351  return index;
352 }
353 
354 //===========================================================================
355 // EventSlot management
363 {
364 
365  if ( m_first ) {
366  m_first = false;
367  }
368 
369  if ( !eventContext ) {
370  fatal() << "Event context is nullptr" << endmsg;
371  return StatusCode::FAILURE;
372  }
373 
374  if ( m_freeSlots.load() == 0 ) {
375  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
376  return StatusCode::FAILURE;
377  }
378 
379  // no problem as push new event is only called from one thread (event loop manager)
380  m_freeSlots--;
381 
382  auto action = [this, eventContext]() -> StatusCode {
383  // Event processing slot forced to be the same as the wb slot
384  const unsigned int thisSlotNum = eventContext->slot();
385  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
386  if ( !thisSlot.complete ) {
387  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
388  return StatusCode::FAILURE;
389  }
390 
391  info() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
392  thisSlot.reset( eventContext );
393  // XXX: CF tests
394  if ( m_CFNext ) {
395  auto vis = concurrency::Trigger( thisSlotNum );
397  }
398 
399  return this->updateStates( thisSlotNum );
400  }; // end of lambda
401 
402  // Kick off the scheduling!
403  if ( msgLevel( MSG::VERBOSE ) ) {
404  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
405  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
406  }
407  m_actionsQueue.push( action );
408 
409  return StatusCode::SUCCESS;
410 }
411 
412 //---------------------------------------------------------------------------
414 {
415  StatusCode sc;
416  for ( auto context : eventContexts ) {
417  sc = pushNewEvent( context );
418  if ( sc != StatusCode::SUCCESS ) return sc;
419  }
420  return sc;
421 }
422 
423 //---------------------------------------------------------------------------
424 unsigned int ForwardSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
425 
426 //---------------------------------------------------------------------------
431 {
432 
433  unsigned int slotNum = 0;
434  for ( auto& thisSlot : m_eventSlots ) {
435  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
436  updateStates( slotNum );
437  }
438  slotNum++;
439  }
440  return StatusCode::SUCCESS;
441 }
442 
443 //---------------------------------------------------------------------------
448 {
449  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
450  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
451  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
452  // << " active: " << m_isActive << endmsg;
453  return StatusCode::FAILURE;
454  } else {
455  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
456  // << " active: " << m_isActive << endmsg;
457  m_finishedEvents.pop( eventContext );
458  m_freeSlots++;
459  if (msgLevel(MSG::DEBUG))
460  debug() << "Popped slot " << eventContext->slot() << "(event "
461  << eventContext->evt() << ")" << endmsg;
462  return StatusCode::SUCCESS;
463  }
464 }
465 
466 //---------------------------------------------------------------------------
471 {
472  if ( m_finishedEvents.try_pop( eventContext ) ) {
473  if ( msgLevel( MSG::DEBUG ) )
474  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
475  << endmsg;
476  m_freeSlots++;
477  return StatusCode::SUCCESS;
478  }
479  return StatusCode::FAILURE;
480 }
481 
482 //---------------------------------------------------------------------------
489 {
490 
491  // Set the number of slots available to an error code
492  m_freeSlots.store( 0 );
493 
494  fatal() << "*** Event " << eventContext->evt() << " on slot "
495  << eventContext->slot() << " failed! ***" << endmsg;
496 
497  std::ostringstream ost;
498  m_algExecStateSvc->dump(ost, *eventContext);
499 
500  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
501  << ":\n" << ost.str() << endmsg;
502 
503  dumpSchedulerState(-1);
504 
505  // Empty queue and deactivate the service
506  action thisAction;
507  while ( m_actionsQueue.try_pop( thisAction ) ) {
508  };
509  deactivate();
510 
511  // Push into the finished events queue the failed context
512  EventContext* thisEvtContext;
513  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
514  m_finishedEvents.push( thisEvtContext );
515  };
516  m_finishedEvents.push( eventContext );
517 
518  return StatusCode::FAILURE;
519 }
520 
521 //===========================================================================
522 
523 //===========================================================================
524 // States Management
525 
536 {
537 
538  m_updateNeeded = true;
539 
540  // Fill a map of initial state / action using closures.
541  // done to update the states w/o several if/elses
542  // Posterchild for constexpr with gcc4.7 onwards!
543  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
544  statesTransitions = {
545  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
546  this,
547  std::placeholders::_1,
548  std::placeholders::_2)},
549  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
550  this,
551  std::placeholders::_1,
552  std::placeholders::_2)}
553  };*/
554 
555  StatusCode global_sc( StatusCode::FAILURE, true );
556 
557  // Sort from the oldest to the newest event
558  // Prepare a vector of pointers to the slots to avoid copies
559  std::vector<EventSlot*> eventSlotsPtrs;
560 
561  // Consider all slots if si <0 or just one otherwise
562  if ( si < 0 ) {
563  const int eventsSlotsSize( m_eventSlots.size() );
564  eventSlotsPtrs.reserve( eventsSlotsSize );
565  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
566  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
567  }
568  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
569  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
570  } else {
571  eventSlotsPtrs.push_back( &m_eventSlots[si] );
572  }
573 
574  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
575  int iSlot = thisSlotPtr->eventContext->slot();
576 
577  // Cache the states of the algos to improve readability and performance
578  auto& thisSlot = m_eventSlots[iSlot];
579  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
580 
581  // Take care of the control ready update
582  // XXX: CF tests
583  if ( !m_CFNext ) {
584  m_efManager.updateEventState( thisAlgsStates, thisSlot.controlFlowState );
585  } else {
586  if ( !algo_name.empty() )
587  m_efManager.updateDecision( algo_name, iSlot, thisAlgsStates, thisSlot.controlFlowState );
588  }
589 
590  // DF note: all this this is a loop over all algs and applies CR->DR and DR->SCHD transistions
591  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
592  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
593  if (algState==AlgsExecutionStates::ERROR)
594  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
595  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
596  partial_sc=StatusCode::SUCCESS;
597  for (auto state_transition = statesTransitions.find(algState);
598  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
599  state_transition++){
600  partial_sc = state_transition->second(iAlgo,iSlot);
601  if (partial_sc.isFailure()){
602  verbose() << "Could not apply transition from "
603  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
604  << " for algorithm " << index2algname(iAlgo)
605  << " on processing slot " << iSlot << endmsg;
606  }
607  else{global_sc=partial_sc;}
608  } // end loop on transitions
609  }*/ // end loop on algos
610 
611  StatusCode partial_sc( StatusCode::FAILURE, true );
612  // first update CONTROLREADY to DATAREADY
613  if ( !m_CFNext ) {
614  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::CONTROLREADY );
615  it != thisAlgsStates.end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
616 
617  uint algIndex = *it;
618  partial_sc = promoteToDataReady(algIndex, iSlot);
619  if (partial_sc.isFailure())
620  if (msgLevel(MSG::VERBOSE))
621  verbose() << "Could not apply transition from "
622  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
623  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
624  }
625  }
626 
627  // now update DATAREADY to SCHEDULED
628  if ( !m_optimizationMode.empty() ) {
629  auto comp_nodes = [this]( const uint& i, const uint& j ) {
632  };
634  comp_nodes, std::vector<uint>() );
635  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
636  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
637  buffer.push( *it );
638  /*std::stringstream s;
639  auto buffer2 = buffer;
640  while (!buffer2.empty()) {
641  s << m_efManager.getExecutionFlowGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
642  buffer2.pop();
643  }
644  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
645 
646  /*while (!buffer.empty()) {
647  partial_sc = promoteToScheduled(buffer.top(), iSlot);
648  if (partial_sc.isFailure()) {
649  if (msgLevel(MSG::VERBOSE))
650  verbose() << "Could not apply transition from "
651  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
652  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
653  if (m_useIOBoundAlgScheduler) {
654  partial_sc = promoteToAsyncScheduled(buffer.top(), iSlot);
655  if (msgLevel(MSG::VERBOSE))
656  if (partial_sc.isFailure())
657  verbose() << "[Asynchronous] Could not apply transition from "
658  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
659  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
660  }
661  }
662  buffer.pop();
663  }*/
664  while ( !buffer.empty() ) {
665  bool IOBound = false;
668 
669  if ( !IOBound )
670  partial_sc = promoteToScheduled( buffer.top(), iSlot );
671  else
672  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot );
673 
674  if (msgLevel(MSG::VERBOSE))
675  if (partial_sc.isFailure())
676  verbose() << "Could not apply transition from "
677  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
678  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
679 
680  buffer.pop();
681  }
682 
683  } else {
684  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
685  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
686  uint algIndex = *it;
687 
688  bool IOBound = false;
691 
692  if ( !IOBound )
693  partial_sc = promoteToScheduled( algIndex, iSlot );
694  else
695  partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
696 
697  if (msgLevel(MSG::VERBOSE))
698  if (partial_sc.isFailure())
699  verbose() << "Could not apply transition from "
700  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
701  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
702  }
703  }
704 
708  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY) << ", "
709  << thisAlgsStates.sizeOfSubset(State::DATAREADY) << ", "
710  << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << ", "
712  << "\n";
713  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
714  : std::to_string(tbb::task_scheduler_init::default_num_threads());
715  std::ofstream myfile;
716  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
717  myfile << s.str();
718  myfile.close();
719  }
720 
721  // Not complete because this would mean that the slot is already free!
722  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
723  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
724  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
725  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
726 
727  thisSlot.complete = true;
728  // if the event did not fail, add it to the finished events
729  // otherwise it is taken care of in the error handling already
730  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
731  m_finishedEvents.push(thisSlot.eventContext);
732  if (msgLevel(MSG::DEBUG))
733  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
734  << thisSlot.eventContext->slot() << ")." << endmsg;
735  }
736  // now let's return the fully evaluated result of the control flow
737  if ( msgLevel( MSG::DEBUG ) ) {
739  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
740  debug() << ss.str() << endmsg;
741  }
742 
743  thisSlot.eventContext = nullptr;
744  } else {
745  StatusCode eventStalledSC = isStalled(iSlot);
746  if (! eventStalledSC.isSuccess()) {
747  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
748  eventFailed(thisSlot.eventContext).ignore();
749  }
750  }
751  } // end loop on slots
752 
753  verbose() << "States Updated." << endmsg;
754 
755  return global_sc;
756 }
757 
758 //---------------------------------------------------------------------------
759 
767 {
768  // Get the slot
769  EventSlot& thisSlot = m_eventSlots[iSlot];
770 
771  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
773 
774  info() << "About to declare a stall" << endmsg;
775  fatal() << "*** Stall detected! ***\n" << endmsg;
776  dumpSchedulerState( iSlot );
777  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
778 
779  return StatusCode::FAILURE;
780  }
781  return StatusCode::SUCCESS;
782 }
783 
784 //---------------------------------------------------------------------------
785 
792 {
793 
794  // To have just one big message
795  std::ostringstream outputMessageStream;
796 
797  outputMessageStream << "============================== Execution Task State ============================="
798  << std::endl;
799  dumpState( outputMessageStream );
800 
801  outputMessageStream << std::endl
802  << "============================== Scheduler State ================================="
803  << std::endl;
804 
805  int slotCount = -1;
806  for ( auto thisSlot : m_eventSlots ) {
807  slotCount++;
808  if ( thisSlot.complete ) continue;
809 
810  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
811  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
812 
813  if ( 0 > iSlot or iSlot == slotCount ) {
814  outputMessageStream << "Algorithms states:" << std::endl;
815 
816  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
817  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
818  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
819  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
820  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
821  const int depsSize = deps.size();
822  if ( depsSize == 0 ) outputMessageStream << " none";
823 
824  DataObjIDColl missing;
825  for ( auto d : deps ) {
826  outputMessageStream << d << " ";
827  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
828  // outputMessageStream << "[missing] ";
829  missing.insert( d );
830  }
831  }
832 
833  if ( !missing.empty() ) {
834  outputMessageStream << ". The following are missing: ";
835  for ( auto d : missing ) {
836  outputMessageStream << d << " ";
837  }
838  }
839 
840  outputMessageStream << std::endl;
841  }
842 
843  // Snapshot of the WhiteBoard
844  outputMessageStream << "\nWhiteboard contents: " << std::endl;
845  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
846 
847  // Snapshot of the ControlFlow
848  outputMessageStream << "\nControl Flow:" << std::endl;
849  std::stringstream cFlowStateStringStream;
850  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
851 
852  outputMessageStream << cFlowStateStringStream.str() << std::endl;
853  }
854  }
855 
856  outputMessageStream << "=================================== END ======================================" << std::endl;
857 
858  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
859 }
860 
861 //---------------------------------------------------------------------------
862 
864 {
865 
866  // Do the control flow
867  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
868  if (sc.isSuccess())
869  if (msgLevel(MSG::VERBOSE))
870  verbose() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY on slot "
871  << si << endmsg;
872 
873  return sc;
874 }
875 
876 //---------------------------------------------------------------------------
877 
879 {
880 
881  StatusCode sc;
882  if ( !m_DFNext ) {
883  sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
884  } else {
886  }
887 
888  StatusCode updateSc( StatusCode::FAILURE );
889  if ( sc == StatusCode::SUCCESS )
890  updateSc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::DATAREADY );
891 
892  if (updateSc.isSuccess())
893  if (msgLevel(MSG::VERBOSE))
894  verbose() << "Promoting " << index2algname(iAlgo) << " to DATAREADY on slot "
895  << si<< endmsg;
896 
897  return updateSc;
898 }
899 
900 //---------------------------------------------------------------------------
901 
903 {
904 
906 
907  const std::string& algName( index2algname( iAlgo ) );
908  IAlgorithm* ialgoPtr = nullptr;
909  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
910 
911  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
912  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr ); // DP: expose the setter of the context?
913  EventContext* eventContext( m_eventSlots[si].eventContext );
914  if ( !eventContext )
915  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
916 
917  algoPtr->setContext( m_eventSlots[si].eventContext );
918  ++m_algosInFlight;
919  // Avoid to use tbb if the pool size is 1 and run in this thread
920  if (-100 != m_threadPoolSize) {
921  tbb::task* t = new( tbb::task::allocate_root() )
922  AlgoExecutionTask(ialgoPtr, iAlgo, eventContext, serviceLocator(),
923  this, m_algExecStateSvc);
924  tbb::task::enqueue( *t);
925  } else {
926  AlgoExecutionTask theTask(ialgoPtr, iAlgo, eventContext,
928  theTask.execute();
929  }
930 
931  if ( msgLevel( MSG::DEBUG ) )
932  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
933  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
934 
935  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
936 
937  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
938 
939  if (updateSc.isSuccess())
940  if (msgLevel(MSG::VERBOSE))
941  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
942  << si << endmsg;
943  return updateSc;
944  } else {
945  if ( msgLevel( MSG::DEBUG ) )
946  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
947  return sc;
948  }
949 }
950 
951 //---------------------------------------------------------------------------
952 
954 {
955 
957 
958  // bool IOBound = m_efManager.getExecutionFlowGraph()->getAlgorithmNode(algName)->isIOBound();
959 
960  const std::string& algName( index2algname( iAlgo ) );
961  IAlgorithm* ialgoPtr = nullptr;
962  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
963 
964  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
965  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr ); // DP: expose the setter of the context?
966  EventContext* eventContext( m_eventSlots[si].eventContext );
967  if ( !eventContext )
968  fatal() << "[Asynchronous] Event context for algorithm " << algName << " is a nullptr (slot " << si << ")"
969  << endmsg;
970 
971  algoPtr->setContext( m_eventSlots[si].eventContext );
973  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from tbb::task)? it seems it works..
974  IOBoundAlgTask* theTask = new( tbb::task::allocate_root() )
975  IOBoundAlgTask(ialgoPtr, iAlgo, eventContext, serviceLocator(),
976  this, m_algExecStateSvc);
977  m_IOBoundAlgScheduler->push(*theTask);
978 
979  if (msgLevel(MSG::DEBUG))
980  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event "
981  << eventContext->evt() << " in slot " << si
982  << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
983 
984  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
985 
986  if (updateSc.isSuccess())
987  if (msgLevel(MSG::VERBOSE))
988  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo)
989  << " to SCHEDULED on slot " << si << endmsg;
990  return updateSc;
991  } else {
992  if ( msgLevel( MSG::DEBUG ) )
993  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
994  << si << endmsg;
995  return sc;
996  }
997 }
998 
999 //---------------------------------------------------------------------------
1004  EventContext* eventContext )
1005 {
1006 
1007  // Put back the instance
1008  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1009  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
1010  // EventContext* eventContext = castedAlgo->getContext();
1011 
1012  // Check if the execution failed
1013  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1014  eventFailed(eventContext).ignore();
1015 
1016  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1017 
1018  if ( !sc.isSuccess() ) {
1019  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1020  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1021  return StatusCode::FAILURE;
1022  }
1023 
1024  m_algosInFlight--;
1025 
1026  EventSlot& thisSlot = m_eventSlots[si];
1027  // XXX: CF tests
1028  if ( !m_DFNext ) {
1029  // Update the catalog: some new products may be there
1030  m_whiteboard->selectStore( eventContext->slot() ).ignore();
1031 
1032  // update prods in the dataflow
1033  // DP: Handles could be used. Just update what the algo wrote
1034  DataObjIDColl new_products;
1035  m_whiteboard->getNewDataObjects( new_products ).ignore();
1036  for ( const auto& new_product : new_products )
1037  if ( msgLevel( MSG::DEBUG ) ) debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
1038  thisSlot.dataFlowMgr.updateDataObjectsCatalog( new_products );
1039  }
1040 
1041  if ( msgLevel( MSG::DEBUG ) )
1042  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1043  << m_algosInFlight << endmsg;
1044 
1045  // Limit number of updates
1046  if ( m_CFNext )
1047  m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
1048  if ( m_updateNeeded ) {
1049  // Schedule an update of the status of the algorithms
1050  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1, algo->name() );
1051  m_actionsQueue.push( updateAction );
1052  m_updateNeeded = false;
1053  }
1054 
1055  if ( msgLevel( MSG::DEBUG ) )
1056  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
1057  State state;
1058  if ( algo->filterPassed() ) {
1059  state = State::EVTACCEPTED;
1060  } else {
1061  state = State::EVTREJECTED;
1062  }
1063 
1064  sc = thisSlot.algsStates.updateState( iAlgo, state );
1065 
1066  if (sc.isSuccess())
1067  if (msgLevel(MSG::VERBOSE))
1068  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
1070 
1071  return sc;
1072 }
1073 
1074 //---------------------------------------------------------------------------
1079  EventContext* eventContext )
1080 {
1081 
1082  // Put back the instance
1083  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1084  if ( !castedAlgo ) fatal() << "[Asynchronous] The casting did not succeed!" << endmsg;
1085  // EventContext* eventContext = castedAlgo->getContext();
1086 
1087  // Check if the execution failed
1088  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1089  eventFailed(eventContext).ignore();
1090 
1091  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1092 
1093  if ( !sc.isSuccess() ) {
1094  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1095  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1096  return StatusCode::FAILURE;
1097  }
1098 
1100 
1101  EventSlot& thisSlot = m_eventSlots[si];
1102  // XXX: CF tests
1103  if ( !m_DFNext ) {
1104  // Update the catalog: some new products may be there
1105  m_whiteboard->selectStore( eventContext->slot() ).ignore();
1106 
1107  // update prods in the dataflow
1108  // DP: Handles could be used. Just update what the algo wrote
1109  DataObjIDColl new_products;
1110  m_whiteboard->getNewDataObjects(new_products).ignore();
1111  for (const auto& new_product : new_products)
1112  if (msgLevel(MSG::DEBUG))
1113  debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
1114  thisSlot.dataFlowMgr.updateDataObjectsCatalog(new_products);
1115  }
1116 
1117  if (msgLevel(MSG::DEBUG))
1118  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1119  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1120 
1121  // Limit number of updates
1122  if ( m_CFNext )
1123  m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
1124  if ( m_updateNeeded ) {
1125  // Schedule an update of the status of the algorithms
1126  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1, algo->name() );
1127  m_actionsQueue.push( updateAction );
1128  m_updateNeeded = false;
1129  }
1130 
1131  if (msgLevel(MSG::DEBUG))
1132  debug() << "[Asynchronous] Trying to handle execution result of "
1133  << index2algname(iAlgo) << " on slot " << si << endmsg;
1134  State state;
1135  if ( algo->filterPassed() ) {
1136  state = State::EVTACCEPTED;
1137  } else {
1138  state = State::EVTREJECTED;
1139  }
1140 
1141  sc = thisSlot.algsStates.updateState( iAlgo, state );
1142 
1143  if (sc.isSuccess())
1144  if (msgLevel(MSG::VERBOSE))
1145  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo) << " on slot "
1146  << si << " to " << AlgsExecutionStates::stateNames[state] << endmsg;
1147 
1148  return sc;
1149 }
1150 
1151 //===========================================================================
1153 {
1154 
1156  m_sState.push_back( SchedulerState( a, e, t ) );
1157 }
1158 
1159 //===========================================================================
1161 {
1162 
1164 
1165  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1166  if ( *itr == a ) {
1167  m_sState.erase( itr );
1168  return true;
1169  }
1170  }
1171 
1172  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1173  return false;
1174 }
1175 
1176 //===========================================================================
1178 {
1179 
1181 
1182  for ( auto it : m_sState ) {
1183  ost << " " << it << std::endl;
1184  }
1185 }
1186 
1187 //===========================================================================
1189 {
1190 
1192 
1193  std::ostringstream ost;
1194  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1195  dumpState( ost );
1196 
1197  info() << ost.str() << endmsg;
1198 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
StatusCode deactivate()
Deactivate scheduler.
bool algsPresent(State state) const
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode initialize() override
Definition: Service.cpp:64
Gaudi::Property< bool > m_CFNext
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
T empty(T...args)
virtual void setEventStatus(const EventStatus::Status &sc)=0
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T open(T...args)
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:725
StatusCode finalize() override
Definition: Service.cpp:174
virtual const EventStatus::Status & eventStatus() const =0
ContextID_t slot() const
Definition: EventContext.h:41
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
const DataObjIDColl & inputDataObjs() const override
Definition: Algorithm.h:455
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
Header file for class GaudiAlgorithm.
T to_string(T...args)
StatusCode finalize() override
Finalise.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T endl(T...args)
void setContext(const EventContext *context) override
set the context
Definition: Algorithm.h:438
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
unsigned int m_IOBoundAlgosInFlight
Number of algoritms presently in flight.
SmartIF< IThreadPoolSvc > m_threadPoolSvc
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
T duration_cast(T...args)
The SchedulerSvc implements the IScheduler interface.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
size_t sizeOfSubset(State state) const
A visitor, performing full top-down traversals of a graph.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:84
bool isIOBound() const
Check if algorithm is I/O-bound.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode m_drain()
Drain the actions present in the queue.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
Gaudi::Property< std::string > m_optimizationMode
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:40
STL class.
virtual void dump(std::ostringstream &ost) const =0
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
void addAlg(Algorithm *, EventContext *, pthread_t)
T push_back(T...args)
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
STL class.
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate an given &#39;slot&#39; for all subsequent calls within the same thread id.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
unsigned int m_algosInFlight
Number of algoritms presently in flight.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registred in store.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
T join(T...args)
static std::list< SchedulerState > m_sState
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T close(T...args)
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
const DataObjIDColl & outputDataObjs() const override
Definition: Algorithm.h:456
unsigned int freeSlots() override
Get free slots number.
T bind(T...args)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:242
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
Gaudi::Property< int > m_maxEventsInFlight
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
Gaudi::Property< bool > m_DFNext
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
T insert(T...args)
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
T size(T...args)
Gaudi::Property< bool > m_useIOBoundAlgScheduler
STL class.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
T begin(T...args)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Iterator begin(State kind)
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is availble.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
Gaudi::Property< bool > m_dumpIntraEventDynamics
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T sort(T...args)
friend class AlgoExecutionTask
virtual StatusCode push(IAlgTask &task)=0
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
T for_each(T...args)
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
Gaudi::Property< int > m_threadPoolSize
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
const std::chrono::system_clock::time_point getInitTime() const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
T reserve(T...args)
static std::map< State, std::string > stateNames
T emplace_back(T...args)
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)