ForwardSchedulerSvc.cpp
Go to the documentation of this file.
1 // Framework includes
3 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
9 
10 // Local
11 #include "AlgResourcePool.h"
12 #include "AlgoExecutionTask.h"
13 #include "EFGraphVisitors.h"
14 #include "ForwardSchedulerSvc.h"
15 #include "IOBoundAlgTask.h"
16 
17 // C++
18 #include <algorithm>
19 #include <map>
20 #include <queue>
21 #include <sstream>
22 #include <unordered_set>
23 
24 // Local
25 #include "ForwardSchedulerSvc.h"
26 #include "AlgoExecutionTask.h"
27 #include "AlgResourcePool.h"
28 #include "EFGraphVisitors.h"
29 #include "IOBoundAlgTask.h"
30 
31 // External libs
32 #include "boost/thread.hpp"
33 #include "boost/tokenizer.hpp"
34 #include "boost/algorithm/string.hpp"
35 #include "tbb/task.h"
36 // DP waiting for the TBB service
37 #include "tbb/task_scheduler_init.h"
38 
41 
42 // Instantiation of a static factory class used by clients to create instances of this service
44 
45 //===========================================================================
46 // Infrastructure methods
47 
48 
// Initialisation of the scheduler service. In order, the visible code:
//  - retrieves the ThreadPoolSvc and starts activate() on m_thread, then polls
//    m_isActive once per second until it reads ACTIVE (or aborts on FAILURE);
//  - retrieves the AlgResourcePool, AlgExecStateSvc, the whiteboard and, if
//    m_useIOBoundAlgScheduler is set, the I/O-bound algorithm scheduler;
//  - aligns m_maxEventsInFlight and m_freeSlots with the number of whiteboard stores;
//  - collects the input/output data objects of every algorithm, resolving
//    ':'-separated alternative input keys against the global output list;
//  - fills the name <-> index containers and, if m_checkDeps is set, fails on
//    inputs that no algorithm produces;
//  - initialises m_efManager and creates the event slots (all marked complete).
// Returns StatusCode::FAILURE on the explicit error paths, otherwise the status
// returned by m_efManager.initialize().
// NOTE(review): the signature line and the declaration of `sc` are not part of
// this listing (the listing numbering jumps from 48 to 54 and from 56 to 58).
54 {
55 
56  // Initialise mother class (read properties, ...)
58  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
59 
60  // Get hold of the TBBSvc. This should initialize the thread pool
61  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
62  if ( !m_threadPoolSvc.isValid() ) {
63  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
64  return StatusCode::FAILURE;
65  }
66 
67  // Activate the scheduler in another thread.
68  info() << "Activating scheduler in a separate thread" << endmsg;
69  m_thread = std::thread( std::bind( &ForwardSchedulerSvc::activate, this ) );
70 
 // Poll the activation flag written by activate(); one log line and one second of sleep per iteration.
71  while ( m_isActive != ACTIVE ) {
72  if ( m_isActive == FAILURE ) {
73  fatal() << "Terminating initialization" << endmsg;
74  return StatusCode::FAILURE;
75  } else {
76  info() << "Waiting for ForwardSchedulerSvc to activate" << endmsg;
77  sleep( 1 );
78  }
79  }
80 
81  // Get the algo resource pool
82  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
83  if ( !m_algResourcePool.isValid() ) {
84  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
85  return StatusCode::FAILURE;
86  }
87 
88  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
89  if (!m_algExecStateSvc.isValid()) {
90  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
91  return StatusCode::FAILURE;
92  }
93 
94  // Get Whiteboard
95  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
96  if ( !m_whiteboard.isValid() ) {
97  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
98  return StatusCode::FAILURE;
99  }
100 
101  // Check the MaxEventsInFlight parameters and react
102  // Deprecated for the moment
103  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
104  if ( m_maxEventsInFlight != 0 ) {
105  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
106  << "Please migrate your code options files." << endmsg;
107 
108  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
109  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
110  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
111  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
112  }
113  }
114 
115  // Get dedicated scheduler for I/O-bound algorithms
 // NOTE(review): unlike the other service retrievals above, a failure here is
 // only logged; execution continues without returning StatusCode::FAILURE.
116  if ( m_useIOBoundAlgScheduler ) {
117  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
118  if ( !m_IOBoundAlgScheduler.isValid() )
119  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
120  }
121  // Align the two quantities
122  m_maxEventsInFlight = numberOfWBSlots;
123 
124  // Set the number of free slots
125  m_freeSlots = m_maxEventsInFlight;
126 
127  if ( m_algosDependencies.size() != 0 ) {
128  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
129  << " FIX your job options #####" << endmsg;
130  }
131 
132  // Get the list of algorithms
133  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
134  const unsigned int algsNumber = algos.size();
135  info() << "Found " << algsNumber << " algorithms" << endmsg;
136 
137  /* Dependencies
138  1) Look for handles in algo, if none
139  2) Assume none are required
140  */
141 
142  DataObjIDColl globalInp, globalOutp;
143 
144  // figure out all outputs
 // NOTE(review): when the dynamic_cast fails the error is only logged; algoPtr
 // is still dereferenced in the inner loop below.
145  for (IAlgorithm* ialgoPtr : algos) {
146  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
147  if (!algoPtr) {
148  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
149  }
150  for (auto id : algoPtr->outputDataObjs()) {
151  auto r = globalOutp.insert(id);
152  if (!r.second) {
153  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" << endmsg;
154  }
155  }
156  }
157  info() << "outputs:\n" ;
158  for (const auto& i : globalOutp ) {
159  info() << i << '\n' ;
160  }
161  info() << endmsg;
162 
163 
164 
165  info() << "Data Dependencies for Algorithms:";
166 
 // NOTE(review): this local carries the member-style name m_algosDependencies,
 // which is already used above before this declaration; presumably it shadows a
 // member/property of the same name from here on — confirm against the header.
167  std::vector<DataObjIDColl> m_algosDependencies;
168  for ( IAlgorithm* ialgoPtr : algos ) {
169  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
170  if ( nullptr == algoPtr )
171  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
172 
173  info() << "\n " << algoPtr->name();
174 
175  // FIXME
176  DataObjIDColl algoDependencies;
177  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
178  for ( auto id : algoPtr->inputDataObjs() ) {
179  info() << "\n o INPUT " << id;
 // A ':' in the key marks a list of alternatives: pick the first token that
 // some algorithm declares as output and rewrite the key of this local copy.
180  if (id.key().find(":")!=std::string::npos) {
181  info() << " contains alternatives which require resolution... " << endmsg;
182  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(),boost::char_separator<char>{":"}};
183  auto itok = std::find_if( tokens.begin(), tokens.end(),
184  [&](const std::string& t) {
185  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
186  } );
187  if (itok!=tokens.end()) {
188  info() << "found matching output for " << *itok << " -- updating scheduler info" << endmsg;
189  id.updateKey(*itok);
190  } else {
191  error() << "failed to find alternate in global output list" << endmsg;
192  }
193  }
194  algoDependencies.insert( id );
195  globalInp.insert( id );
196  }
197  for ( auto id : algoPtr->outputDataObjs() ) {
198  info() << "\n o OUTPUT " << id;
199  if (id.key().find(":")!=std::string::npos) {
200  info() << " alternatives are NOT allowed for outputs..." << endmsg;
201  }
202  }
203  } else {
204  info() << "\n none";
205  }
206  m_algosDependencies.emplace_back( algoDependencies );
207  }
208  info() << endmsg;
209 
210  // Fill the containers to convert algo names to index
211  m_algname_vect.reserve( algsNumber );
212  unsigned int index = 0;
213  for ( IAlgorithm* algo : algos ) {
214  const std::string& name = algo->name();
215  m_algname_index_map[name] = index;
216  m_algname_vect.emplace_back( name );
217  index++;
218  }
219 
220  // Check if we have unmet global input dependencies
221  if ( m_checkDeps ) {
222  DataObjIDColl unmetDep;
223  for ( auto o : globalInp ) {
224  if ( globalOutp.find( o ) == globalOutp.end() ) {
225  unmetDep.insert( o );
226  }
227  }
228 
229  if ( unmetDep.size() > 0 ) {
230  fatal() << "The following unmet INPUT data dependencies were found: ";
231  for ( auto& o : unmetDep ) {
232  fatal() << "\n o " << o << " required by Algorithm: ";
233  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
234  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
235  fatal() << "\n * " << m_algname_vect[i];
236  }
237  }
238  }
239  fatal() << endmsg;
240  return StatusCode::FAILURE;
241  } else {
242  info() << "No unmet INPUT data dependencies were found" << endmsg;
243  }
244  }
245 
246  // prepare the control flow part
247  if ( m_CFNext ) m_DFNext = true; // force usage of new data flow machinery when new control flow is used
248  if ( !m_CFNext && !m_optimizationMode.empty() ) {
249  fatal() << "Execution optimization is only available with the graph-based execution flow management" << endmsg;
250  return StatusCode::FAILURE;
251  }
 // NOTE(review): the result of this dynamic_cast is used without a null check,
 // and m_eventSlots is handed to m_efManager before it is filled further below.
252  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
253  sc =
254  m_efManager.initialize( algPool->getExecutionFlowGraph(), m_algname_index_map, m_eventSlots, m_optimizationMode );
255  unsigned int controlFlowNodeNumber = m_efManager.getExecutionFlowGraph()->getControlFlowNodeCounter();
256 
257  // Shortcut for the message service
258  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
259  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
260 
 // One slot per event in flight; every slot starts out flagged as complete.
261  m_eventSlots.assign( m_maxEventsInFlight,
262  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
263  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
264 
265  // Clearly inform about the level of concurrency
266  info() << "Concurrency level information:" << endmsg;
267  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
268  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
269  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
270 
271  // Simulating execution flow by only analyzing the graph topology and logic
272  if ( m_simulateExecution ) {
273  auto vis = concurrency::RunSimulator( 0 );
274  m_efManager.simulateExecutionFlow( vis );
275  }
276 
277  return sc;
278 }
279 //---------------------------------------------------------------------------
280 
// Finalisation of the scheduler service: asks the scheduler to deactivate,
// joins the scheduler thread (m_thread) and reports an error if the thread
// left m_isActive in the FAILURE state. Otherwise returns the status of
// deactivate().
// NOTE(review): the signature line and the declaration of `sc` are not part of
// this listing (listing lines 281-284 and 287 are missing).
285 {
286 
288  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
289 
290  sc = deactivate();
291  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
292 
 // Wait for the scheduler thread to finish before inspecting its final state.
293  info() << "Joining Scheduler thread" << endmsg;
294  m_thread.join();
295 
296  // Final error check after thread pool termination
297  if ( m_isActive == FAILURE ) {
298  error() << "problems in scheduler thread" << endmsg;
299  return StatusCode::FAILURE;
300  }
301 
302  // m_efManager.getExecutionFlowGraph()->dumpExecutionPlan();
303 
304  return sc;
305 }
306 //---------------------------------------------------------------------------
// Body of the scheduler thread: sets m_isActive to ACTIVE and then keeps
// popping actions from m_actionsQueue and executing them for as long as the
// scheduler is ACTIVE or the queue is not empty.
// NOTE(review): the signature line and several statements are not part of this
// listing (listing lines 323, 325, 331, 347 and 349 are missing), so the
// conditions guarding the two error() messages below cannot be seen here.
318 {
319 
320  if (msgLevel(MSG::DEBUG))
321  debug() << "ForwardSchedulerSvc::activate()" << endmsg;
322 
324  error() << "problems initializing ThreadPoolSvc" << endmsg;
326  return;
327  }
328 
329  // Wait for actions pushed into the queue by finishing tasks.
330  action thisAction;
332 
333  m_isActive = ACTIVE;
334 
335  // Continue to wait if the scheduler is running or there is something to do
336  info() << "Start checking the actionsQueue" << endmsg;
337  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
 // presumably pop() blocks until an action is available — confirm against the queue type
338  m_actionsQueue.pop( thisAction );
339  sc = thisAction();
340  if ( sc != StatusCode::SUCCESS )
341  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
342  else
343  verbose() << "Action succeeded." << endmsg;
344  }
345 
346  info() << "Terminating thread-pool resources" << endmsg;
348  error() << "Problems terminating thread pool" << endmsg;
350  }
351 }
352 
353 //---------------------------------------------------------------------------
354 
// Requests the deactivation of the scheduler: when the scheduler is ACTIVE, a
// final action is pushed into m_actionsQueue. Always returns
// StatusCode::SUCCESS.
// NOTE(review): the signature line and two statements are not part of this
// listing (listing lines 366 and 369 are missing); the statement inside the
// pushed action that actually changes the state is therefore not visible.
362 {
363 
364  if ( m_isActive == ACTIVE ) {
365  // Drain the scheduler
367  // This would be the last action
368  m_actionsQueue.push( [this]() -> StatusCode {
370  return StatusCode::SUCCESS;
371  } );
372  }
373 
374  return StatusCode::SUCCESS;
375 }
376 
377 //===========================================================================
378 
379 //===========================================================================
380 // Utils and shortcuts
381 
382 inline const std::string& ForwardSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
383 
384 //---------------------------------------------------------------------------
385 
386 inline unsigned int ForwardSchedulerSvc::algname2index( const std::string& algoname )
387 {
388  unsigned int index = m_algname_index_map[algoname];
389  return index;
390 }
391 
392 //===========================================================================
393 // EventSlot management
// Accepts a new event for processing: rejects a null context or the absence of
// free slots with StatusCode::FAILURE, otherwise decrements m_freeSlots and
// pushes into m_actionsQueue an action that resets the event slot matching the
// context's slot number and then calls updateStates() for it.
// NOTE(review): the signature line is not part of this listing (presumably
// pushNewEvent( EventContext* eventContext ), judging from the call in the
// following block); listing line 434 inside the m_CFNext branch is missing too.
401 {
402 
 // As far as this block shows, m_first is only cleared here and has no other effect.
403  if ( m_first ) {
404  m_first = false;
405  }
406 
407  if ( !eventContext ) {
408  fatal() << "Event context is nullptr" << endmsg;
409  return StatusCode::FAILURE;
410  }
411 
412  if ( m_freeSlots.load() == 0 ) {
413  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
414  return StatusCode::FAILURE;
415  }
416 
417  // no problem as push new event is only called from one thread (event loop manager)
418  m_freeSlots--;
419 
 // The action captures the context pointer by value; it is executed later by
 // whoever pops it from m_actionsQueue.
420  auto action = [this, eventContext]() -> StatusCode {
421  // Event processing slot forced to be the same as the wb slot
422  const unsigned int thisSlotNum = eventContext->slot();
423  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
424  if ( !thisSlot.complete ) {
425  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
426  return StatusCode::FAILURE;
427  }
428 
429  info() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
430  thisSlot.reset( eventContext );
431  // XXX: CF tests
432  if ( m_CFNext ) {
433  auto vis = concurrency::Trigger( thisSlotNum );
435  }
436 
437  return this->updateStates( thisSlotNum );
438  }; // end of lambda
439 
440  // Kick off the scheduling!
441  if ( msgLevel( MSG::VERBOSE ) ) {
442  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
443  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
444  }
445  m_actionsQueue.push( action );
446 
447  return StatusCode::SUCCESS;
448 }
449 
450 //---------------------------------------------------------------------------
// Pushes every context of eventContexts through pushNewEvent(), stopping at
// the first one that does not return StatusCode::SUCCESS and returning that
// status. If all succeed, the status of the last push is returned; for an
// empty collection the default-constructed StatusCode is returned.
// NOTE(review): the signature line is not part of this listing (listing line
// 451 is missing).
452 {
453  StatusCode sc;
454  for ( auto context : eventContexts ) {
455  sc = pushNewEvent( context );
456  if ( sc != StatusCode::SUCCESS ) return sc;
457  }
458  return sc;
459 }
460 
461 //---------------------------------------------------------------------------
462 unsigned int ForwardSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
463 
464 //---------------------------------------------------------------------------
// Walks over all event slots and calls updateStates() for every slot that is
// not complete and whose algorithms have not all been executed. The status
// returned by updateStates() is not inspected; the function always returns
// StatusCode::SUCCESS.
// NOTE(review): the signature line is not part of this listing (listing lines
// 465-468 are missing), so the function's name cannot be confirmed from here.
469 {
470 
 // slotNum tracks the position of thisSlot inside m_eventSlots.
471  unsigned int slotNum = 0;
472  for ( auto& thisSlot : m_eventSlots ) {
473  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
474  updateStates( slotNum );
475  }
476  slotNum++;
477  }
478  return StatusCode::SUCCESS;
479 }
480 
481 //---------------------------------------------------------------------------
// Retrieves a finished event context from m_finishedEvents and frees its slot
// (m_freeSlots is incremented). Returns StatusCode::FAILURE without touching
// the queue when all slots are free or the scheduler is INACTIVE.
// NOTE(review): the signature line is not part of this listing (listing lines
// 482-485 are missing); pop() presumably blocks until an event is available —
// confirm against the queue type.
486 {
487  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
488  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
489  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
490  // << " active: " << m_isActive << endmsg;
491  return StatusCode::FAILURE;
492  } else {
493  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
494  // << " active: " << m_isActive << endmsg;
495  m_finishedEvents.pop( eventContext );
496  m_freeSlots++;
497  if (msgLevel(MSG::DEBUG))
498  debug() << "Popped slot " << eventContext->slot() << "(event "
499  << eventContext->evt() << ")" << endmsg;
500  return StatusCode::SUCCESS;
501  }
502 }
503 
504 //---------------------------------------------------------------------------
// Non-waiting counterpart of the pop above: tries to take a finished event
// context from m_finishedEvents. On success the slot is freed (m_freeSlots is
// incremented) and StatusCode::SUCCESS is returned; otherwise
// StatusCode::FAILURE is returned and m_freeSlots is left unchanged.
// NOTE(review): the signature line is not part of this listing (listing lines
// 505-508 are missing).
509 {
510  if ( m_finishedEvents.try_pop( eventContext ) ) {
511  if ( msgLevel( MSG::DEBUG ) )
512  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
513  << endmsg;
514  m_freeSlots++;
515  return StatusCode::SUCCESS;
516  }
517  return StatusCode::FAILURE;
518 }
519 
520 //---------------------------------------------------------------------------
// Error path for a failed event: sets m_freeSlots to 0, logs the failure and
// the algorithm execution state of the event, dumps the scheduler state for
// all slots, discards every pending action, requests deactivation and finally
// pushes the failed context into m_finishedEvents. Always returns
// StatusCode::FAILURE.
// NOTE(review): the signature line is not part of this listing (listing lines
// 521-526 are missing).
527 {
528 
529  // Set the number of slots available to an error code
530  m_freeSlots.store( 0 );
531 
532  fatal() << "*** Event " << eventContext->evt() << " on slot "
533  << eventContext->slot() << " failed! ***" << endmsg;
534 
535  std::ostringstream ost;
536  m_algExecStateSvc->dump(ost, *eventContext);
537 
538  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
539  << ":\n" << ost.str() << endmsg;
540 
541  dumpSchedulerState(-1);
542 
543  // Empty queue and deactivate the service
544  action thisAction;
545  while ( m_actionsQueue.try_pop( thisAction ) ) {
546  };
547  deactivate();
548 
549  // Push into the finished events queue the failed context
 // NOTE(review): every context popped by this loop is pushed straight back into
 // the same queue; if try_pop()/push() have ordinary queue semantics the loop
 // cannot terminate once the queue holds at least one element — confirm.
550  EventContext* thisEvtContext;
551  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
552  m_finishedEvents.push( thisEvtContext );
553  };
554  m_finishedEvents.push( eventContext );
555 
556  return StatusCode::FAILURE;
557 }
558 
559 //===========================================================================
560 
561 //===========================================================================
562 // States Management
563 
// Core state update of the scheduler. For the slot `si` (or, when si < 0, for
// every incomplete slot ordered by increasing event number) the visible code:
//  - updates the control flow state through m_efManager;
//  - when m_CFNext is not set, tries to promote CONTROLREADY algorithms to DATAREADY;
//  - tries to schedule DATAREADY algorithms, going through a priority queue
//    when m_optimizationMode is not empty;
//  - marks the slot complete and publishes the event in m_finishedEvents once
//    the root decision is resolved and no algorithm is CONTROLREADY, DATAREADY
//    or SCHEDULED; otherwise checks the slot for a stall via isStalled().
// NOTE(review): the signature line and several statements are not part of this
// listing (e.g. listing lines 668-671, 704-705, 727-728, 743, 745, 749 and 776
// are missing). In the lines that are visible, global_sc is initialised to
// FAILURE and never reassigned before being returned; the only assignment sits
// in commented-out code.
574 {
575 
576  m_updateNeeded = true;
577 
578  // Fill a map of initial state / action using closures.
579  // done to update the states w/o several if/elses
580  // Posterchild for constexpr with gcc4.7 onwards!
581  /*const std::map<AlgsExecutionStates::State, std::function<StatusCode(unsigned int iAlgo, int si)>>
582  statesTransitions = {
583  {AlgsExecutionStates::CONTROLREADY, std::bind(&ForwardSchedulerSvc::promoteToDataReady,
584  this,
585  std::placeholders::_1,
586  std::placeholders::_2)},
587  {AlgsExecutionStates::DATAREADY, std::bind(&ForwardSchedulerSvc::promoteToScheduled,
588  this,
589  std::placeholders::_1,
590  std::placeholders::_2)}
591  };*/
592 
593  StatusCode global_sc( StatusCode::FAILURE, true );
594 
595  // Sort from the oldest to the newest event
596  // Prepare a vector of pointers to the slots to avoid copies
597  std::vector<EventSlot*> eventSlotsPtrs;
598 
599  // Consider all slots if si <0 or just one otherwise
600  if ( si < 0 ) {
601  const int eventsSlotsSize( m_eventSlots.size() );
602  eventSlotsPtrs.reserve( eventsSlotsSize );
603  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
604  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
605  }
606  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
607  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
608  } else {
609  eventSlotsPtrs.push_back( &m_eventSlots[si] );
610  }
611 
612  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
 // The slot index is read back from the slot's own event context.
613  int iSlot = thisSlotPtr->eventContext->slot();
614 
615  // Cache the states of the algos to improve readability and performance
616  auto& thisSlot = m_eventSlots[iSlot];
617  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
618 
619  // Take care of the control ready update
620  // XXX: CF tests
621  if ( !m_CFNext ) {
622  m_efManager.updateEventState( thisAlgsStates, thisSlot.controlFlowState );
623  } else {
624  if ( !algo_name.empty() )
625  m_efManager.updateDecision( algo_name, iSlot, thisAlgsStates, thisSlot.controlFlowState );
626  }
627 
628  // DF note: all this is a loop over all algs and applies CR->DR and DR->SCHD transitions
629  /*for (unsigned int iAlgo=0;iAlgo<m_algname_vect.size();++iAlgo){
630  const AlgsExecutionStates::State& algState = thisAlgsStates[iAlgo];
631  if (algState==AlgsExecutionStates::ERROR)
632  error() << " Algo " << index2algname(iAlgo) << " is in ERROR state." << endmsg;
633  // Loop on state transitions from the one suited to algo state up to the one for SCHEDULED.
634  partial_sc=StatusCode::SUCCESS;
635  for (auto state_transition = statesTransitions.find(algState);
636  state_transition!=statesTransitions.end() && partial_sc.isSuccess();
637  state_transition++){
638  partial_sc = state_transition->second(iAlgo,iSlot);
639  if (partial_sc.isFailure()){
640  verbose() << "Could not apply transition from "
641  << AlgsExecutionStates::stateNames[thisAlgsStates[iAlgo]]
642  << " for algorithm " << index2algname(iAlgo)
643  << " on processing slot " << iSlot << endmsg;
644  }
645  else{global_sc=partial_sc;}
646  } // end loop on transitions
647  }*/ // end loop on algos
648 
649  StatusCode partial_sc( StatusCode::FAILURE, true );
650  // first update CONTROLREADY to DATAREADY
651  if ( !m_CFNext ) {
652  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::CONTROLREADY );
653  it != thisAlgsStates.end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
654 
655  uint algIndex = *it;
656  partial_sc = promoteToDataReady(algIndex, iSlot);
657  if (partial_sc.isFailure())
658  if (msgLevel(MSG::VERBOSE))
659  verbose() << "Could not apply transition from "
660  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::CONTROLREADY]
661  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
662  }
663  }
664 
665  // now update DATAREADY to SCHEDULED
 // With an optimisation mode set, DATAREADY algorithms are first collected in
 // `buffer` and then scheduled in the order given by buffer.top().
 // NOTE(review): the body of comp_nodes and the declaration of `buffer` are not
 // part of this listing.
666  if ( !m_optimizationMode.empty() ) {
667  auto comp_nodes = [this]( const uint& i, const uint& j ) {
670  };
672  comp_nodes, std::vector<uint>() );
673  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
674  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
675  buffer.push( *it );
676  /*std::stringstream s;
677  auto buffer2 = buffer;
678  while (!buffer2.empty()) {
679  s << m_efManager.getExecutionFlowGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
680  buffer2.pop();
681  }
682  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
683 
684  /*while (!buffer.empty()) {
685  partial_sc = promoteToScheduled(buffer.top(), iSlot);
686  if (partial_sc.isFailure()) {
687  if (msgLevel(MSG::VERBOSE))
688  verbose() << "Could not apply transition from "
689  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
690  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
691  if (m_useIOBoundAlgScheduler) {
692  partial_sc = promoteToAsyncScheduled(buffer.top(), iSlot);
693  if (msgLevel(MSG::VERBOSE))
694  if (partial_sc.isFailure())
695  verbose() << "[Asynchronous] Could not apply transition from "
696  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
697  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
698  }
699  }
700  buffer.pop();
701  }*/
702  while ( !buffer.empty() ) {
 // NOTE(review): the statement(s) that may set IOBound to true are not part of this listing.
703  bool IOBound = false;
706 
707  if ( !IOBound )
708  partial_sc = promoteToScheduled( buffer.top(), iSlot );
709  else
710  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot );
711 
712  if (msgLevel(MSG::VERBOSE))
713  if (partial_sc.isFailure())
714  verbose() << "Could not apply transition from "
715  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
716  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
717 
718  buffer.pop();
719  }
720 
721  } else {
722  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
723  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
724  uint algIndex = *it;
725 
726  bool IOBound = false;
729 
730  if ( !IOBound )
731  partial_sc = promoteToScheduled( algIndex, iSlot );
732  else
733  partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
734 
735  if (msgLevel(MSG::VERBOSE))
736  if (partial_sc.isFailure())
737  verbose() << "Could not apply transition from "
738  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
739  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
740  }
741  }
742 
 // Appends one CSV record with the sizes of the state subsets to a per-thread-count file.
 // NOTE(review): the condition opening this block and the declaration of `s`
 // are not part of this listing.
744  auto now = std::chrono::system_clock::now();
746  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY) << ", "
747  << thisAlgsStates.sizeOfSubset(State::DATAREADY) << ", "
748  << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << ", "
750  << "\n";
751  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
752  : std::to_string(tbb::task_scheduler_init::default_num_threads());
753  std::ofstream myfile;
754  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
755  myfile << s.str();
756  myfile.close();
757  }
758 
759  // Not complete because this would mean that the slot is already free!
760  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
761  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
762  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
763  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
764 
765  thisSlot.complete = true;
766  // if the event did not fail, add it to the finished events
767  // otherwise it is taken care of in the error handling already
768  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
769  m_finishedEvents.push(thisSlot.eventContext);
770  if (msgLevel(MSG::DEBUG))
771  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
772  << thisSlot.eventContext->slot() << ")." << endmsg;
773  }
774  // now let's return the fully evaluated result of the control flow
775  if ( msgLevel( MSG::DEBUG ) ) {
777  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
778  debug() << ss.str() << endmsg;
779  }
780 
 // The slot no longer refers to the event context once the event is complete.
781  thisSlot.eventContext = nullptr;
782  } else {
783  StatusCode eventStalledSC = isStalled(iSlot);
784  if (! eventStalledSC.isSuccess()) {
785  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
786  eventFailed(thisSlot.eventContext).ignore();
787  }
788  }
789  } // end loop on slots
790 
791  verbose() << "States Updated." << endmsg;
792 
793  return global_sc;
794 }
795 
796 //---------------------------------------------------------------------------
797 
// Stall check for the slot iSlot: when the stall condition holds, the stall is
// logged, the scheduler state of the slot is dumped and StatusCode::FAILURE is
// returned; otherwise StatusCode::SUCCESS is returned.
// NOTE(review): the signature line and the second half of the `if` condition
// are not part of this listing (listing lines 798-804 and 810 are missing).
// The visible part of the condition requires an empty action queue and no
// algorithm (regular or I/O-bound) in flight.
805 {
806  // Get the slot
807  EventSlot& thisSlot = m_eventSlots[iSlot];
808 
809  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
811 
812  info() << "About to declare a stall" << endmsg;
813  fatal() << "*** Stall detected! ***\n" << endmsg;
814  dumpSchedulerState( iSlot );
815  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
816 
817  return StatusCode::FAILURE;
818  }
819  return StatusCode::SUCCESS;
820 }
821 
822 //---------------------------------------------------------------------------
823 
// Builds one large diagnostic message and emits it through info(): first the
// output of dumpState(), then, for every slot that is not complete, a header
// line with slot and event number. For the slot matching iSlot (or for all
// incomplete slots when iSlot is negative) it adds the state of each
// algorithm with its data dependencies (listing those missing from the slot's
// data flow manager content), the whiteboard contents and the control flow state.
// NOTE(review): the signature line is not part of this listing (listing lines
// 824-829 are missing). The range-for below takes each slot by value, so every
// iteration works on a copy of the EventSlot.
830 {
831 
832  // To have just one big message
833  std::ostringstream outputMessageStream;
834 
835  outputMessageStream << "============================== Execution Task State ============================="
836  << std::endl;
837  dumpState( outputMessageStream );
838 
839  outputMessageStream << std::endl
840  << "============================== Scheduler State ================================="
841  << std::endl;
842 
 // slotCount is the position of thisSlot in m_eventSlots (incremented before use).
843  int slotCount = -1;
844  for ( auto thisSlot : m_eventSlots ) {
845  slotCount++;
846  if ( thisSlot.complete ) continue;
847 
848  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
849  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
850 
851  if ( 0 > iSlot or iSlot == slotCount ) {
852  outputMessageStream << "Algorithms states:" << std::endl;
853 
854  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
855  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
856  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
857  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
858  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
859  const int depsSize = deps.size();
860  if ( depsSize == 0 ) outputMessageStream << " none";
861 
 // Dependencies not found in the slot content are collected and listed separately.
862  DataObjIDColl missing;
863  for ( auto d : deps ) {
864  outputMessageStream << d << " ";
865  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
866  // outputMessageStream << "[missing] ";
867  missing.insert( d );
868  }
869  }
870 
871  if ( !missing.empty() ) {
872  outputMessageStream << ". The following are missing: ";
873  for ( auto d : missing ) {
874  outputMessageStream << d << " ";
875  }
876  }
877 
878  outputMessageStream << std::endl;
879  }
880 
881  // Snapshot of the WhiteBoard
882  outputMessageStream << "\nWhiteboard contents: " << std::endl;
883  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
884 
885  // Snapshot of the ControlFlow
886  outputMessageStream << "\nControl Flow:" << std::endl;
887  std::stringstream cFlowStateStringStream;
888  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
889 
890  outputMessageStream << cFlowStateStringStream.str() << std::endl;
891  }
892  }
893 
894  outputMessageStream << "=================================== END ======================================" << std::endl;
895 
896  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
897 }
898 
899 //---------------------------------------------------------------------------
900 
// Moves algorithm iAlgo of slot si to the CONTROLREADY state and returns the
// status of that state update; a verbose message is printed on success.
// NOTE(review): the signature line is not part of this listing (listing line
// 901 is missing).
902 {
903 
904  // Do the control flow
905  StatusCode sc = m_eventSlots[si].algsStates.updateState(iAlgo,AlgsExecutionStates::CONTROLREADY);
906  if (sc.isSuccess())
907  if (msgLevel(MSG::VERBOSE))
908  verbose() << "Promoting " << index2algname(iAlgo) << " to CONTROLREADY on slot "
909  << si << endmsg;
910 
911  return sc;
912 }
913 
914 //---------------------------------------------------------------------------
915 
// Moves algorithm iAlgo of slot si to the DATAREADY state if its data
// dependencies allow it. Returns the status of the state update, or
// StatusCode::FAILURE when the preceding check did not succeed.
// NOTE(review): the signature line and the statement of the m_DFNext branch
// are not part of this listing (listing lines 916 and 923 are missing), so in
// that branch the value given to `sc` cannot be seen here.
917 {
918 
919  StatusCode sc;
920  if ( !m_DFNext ) {
921  sc = m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
922  } else {
924  }
925 
 // The state is only updated when the check above reported SUCCESS.
926  StatusCode updateSc( StatusCode::FAILURE );
927  if ( sc == StatusCode::SUCCESS )
928  updateSc = m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::DATAREADY );
929 
930  if (updateSc.isSuccess())
931  if (msgLevel(MSG::VERBOSE))
932  verbose() << "Promoting " << index2algname(iAlgo) << " to DATAREADY on slot "
933  << si<< endmsg;
934 
935  return updateSc;
936 }
937 
938 //---------------------------------------------------------------------------
939 
// Tries to schedule algorithm iAlgo for the event in slot si: acquires an
// algorithm instance from the resource pool, wraps it in an AlgoExecutionTask
// (enqueued in TBB, or executed in the calling thread), increments
// m_algosInFlight and moves the algorithm to the SCHEDULED state. Returns the
// status of the state update, or the status of the failed acquisition.
// NOTE(review): the signature line and two statements are not part of this
// listing (listing lines 940, 943 and 963 are missing).
941 {
942 
944 
945  const std::string& algName( index2algname( iAlgo ) );
946  IAlgorithm* ialgoPtr = nullptr;
947  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
948 
949  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
 // NOTE(review): a null event context is only logged; the pointer is still
 // handed to the task and dereferenced in the debug message below.
950  EventContext* eventContext( m_eventSlots[si].eventContext );
951  if ( !eventContext )
952  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
953 
954  ++m_algosInFlight;
955  // Avoid to use tbb if the pool size is 1 and run in this thread
 // NOTE(review): the condition below compares against -100, not 1 as the
 // comment above states — confirm which sentinel is intended.
956  if (-100 != m_threadPoolSize) {
957  tbb::task* t = new( tbb::task::allocate_root() )
958  AlgoExecutionTask(ialgoPtr, iAlgo, eventContext, serviceLocator(),
959  this, m_algExecStateSvc);
960  tbb::task::enqueue( *t);
961  } else {
962  AlgoExecutionTask theTask(ialgoPtr, iAlgo, eventContext,
964  theTask.execute();
965  }
966 
967  if ( msgLevel( MSG::DEBUG ) )
968  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
969  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
970 
971  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
972 
973  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
974 
975  if (updateSc.isSuccess())
976  if (msgLevel(MSG::VERBOSE))
977  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
978  << si << endmsg;
979  return updateSc;
980  } else {
981  if ( msgLevel( MSG::DEBUG ) )
982  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
983  return sc;
984  }
985 }
986 
987 //---------------------------------------------------------------------------
988 
990 {
991 
993 
994  // bool IOBound = m_efManager.getExecutionFlowGraph()->getAlgorithmNode(algName)->isIOBound();
995 
996  const std::string& algName( index2algname( iAlgo ) );
997  IAlgorithm* ialgoPtr = nullptr;
998  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
999 
1000  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
1001  EventContext* eventContext( m_eventSlots[si].eventContext );
1002  if ( !eventContext )
1003  fatal() << "[Asynchronous] Event context for algorithm " << algName << " is a nullptr (slot " << si << ")"
1004  << endmsg;
1005 
1007  // Can we use the tbb-based overloaded new operator for a "custom" task (an algorithm wrapper, not derived from tbb::task)? It seems to work.
1008  IOBoundAlgTask* theTask = new( tbb::task::allocate_root() )
1009  IOBoundAlgTask(ialgoPtr, iAlgo, eventContext, serviceLocator(),
1010  this, m_algExecStateSvc);
1011  m_IOBoundAlgScheduler->push(*theTask);
1012 
1013  if (msgLevel(MSG::DEBUG))
1014  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event "
1015  << eventContext->evt() << " in slot " << si
1016  << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1017 
1018  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
1019 
1020  if (updateSc.isSuccess())
1021  if (msgLevel(MSG::VERBOSE))
1022  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo)
1023  << " to SCHEDULED on slot " << si << endmsg;
1024  return updateSc;
1025  } else {
1026  if ( msgLevel( MSG::DEBUG ) )
1027  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
1028  << si << endmsg;
1029  return sc;
1030  }
1031 }
1032 
1033 //---------------------------------------------------------------------------
1038  EventContext* eventContext )
1039 {
1040 
1041  // Put back the instance
1042  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1043  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
1044  // EventContext* eventContext = castedAlgo->getContext();
1045 
1046  // Check if the execution failed
1047  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1048  eventFailed(eventContext).ignore();
1049 
1050  Gaudi::Hive::setCurrentContext(eventContext);
1051  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1052 
1053  if ( !sc.isSuccess() ) {
1054  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1055  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1056  return StatusCode::FAILURE;
1057  }
1058 
1059  m_algosInFlight--;
1060 
1061  EventSlot& thisSlot = m_eventSlots[si];
1062  // XXX: CF tests
1063  if ( !m_DFNext ) {
1064  // Update the catalog: some new products may be there
1065  m_whiteboard->selectStore( eventContext->slot() ).ignore();
1066 
1067  // update prods in the dataflow
1068  // DP: Handles could be used. Just update what the algo wrote
1069  DataObjIDColl new_products;
1070  m_whiteboard->getNewDataObjects( new_products ).ignore();
1071  for ( const auto& new_product : new_products )
1072  if ( msgLevel( MSG::DEBUG ) ) debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
1073  thisSlot.dataFlowMgr.updateDataObjectsCatalog( new_products );
1074  }
1075 
1076  if ( msgLevel( MSG::DEBUG ) )
1077  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1078  << m_algosInFlight << endmsg;
1079 
1080  // Limit number of updates
1081  if ( m_CFNext )
1082  m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
1083  if ( m_updateNeeded ) {
1084  // Schedule an update of the status of the algorithms
1085  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1, algo->name() );
1086  m_actionsQueue.push( updateAction );
1087  m_updateNeeded = false;
1088  }
1089 
1090  if ( msgLevel( MSG::DEBUG ) )
1091  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
1092  State state;
1093  if ( algo->filterPassed() ) {
1094  state = State::EVTACCEPTED;
1095  } else {
1096  state = State::EVTREJECTED;
1097  }
1098 
1099  sc = thisSlot.algsStates.updateState( iAlgo, state );
1100 
1101  if (sc.isSuccess())
1102  if (msgLevel(MSG::VERBOSE))
1103  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
1105 
1106  return sc;
1107 }
1108 
1109 //---------------------------------------------------------------------------
1114  EventContext* eventContext )
1115 {
1116 
1117  // Put back the instance
1118  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1119  if ( !castedAlgo ) fatal() << "[Asynchronous] The casting did not succeed!" << endmsg;
1120  // EventContext* eventContext = castedAlgo->getContext();
1121 
1122  // Check if the execution failed
1123  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1124  eventFailed(eventContext).ignore();
1125 
1126  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1127 
1128  if ( !sc.isSuccess() ) {
1129  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1130  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1131  return StatusCode::FAILURE;
1132  }
1133 
1135 
1136  EventSlot& thisSlot = m_eventSlots[si];
1137  // XXX: CF tests
1138  if ( !m_DFNext ) {
1139  // Update the catalog: some new products may be there
1140  m_whiteboard->selectStore( eventContext->slot() ).ignore();
1141 
1142  // update prods in the dataflow
1143  // DP: Handles could be used. Just update what the algo wrote
1144  DataObjIDColl new_products;
1145  m_whiteboard->getNewDataObjects(new_products).ignore();
1146  for (const auto& new_product : new_products)
1147  if (msgLevel(MSG::DEBUG))
1148  debug() << "Found in WB [" << si << "]: " << new_product << endmsg;
1149  thisSlot.dataFlowMgr.updateDataObjectsCatalog(new_products);
1150  }
1151 
1152  if (msgLevel(MSG::DEBUG))
1153  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1154  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1155 
1156  // Limit number of updates
1157  if ( m_CFNext )
1158  m_updateNeeded = true; // XXX: CF tests: with the new CF traversal the if clause below has to be removed
1159  if ( m_updateNeeded ) {
1160  // Schedule an update of the status of the algorithms
1161  auto updateAction = std::bind( &ForwardSchedulerSvc::updateStates, this, -1, algo->name() );
1162  m_actionsQueue.push( updateAction );
1163  m_updateNeeded = false;
1164  }
1165 
1166  if (msgLevel(MSG::DEBUG))
1167  debug() << "[Asynchronous] Trying to handle execution result of "
1168  << index2algname(iAlgo) << " on slot " << si << endmsg;
1169  State state;
1170  if ( algo->filterPassed() ) {
1171  state = State::EVTACCEPTED;
1172  } else {
1173  state = State::EVTREJECTED;
1174  }
1175 
1176  sc = thisSlot.algsStates.updateState( iAlgo, state );
1177 
1178  if (sc.isSuccess())
1179  if (msgLevel(MSG::VERBOSE))
1180  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo) << " on slot "
1181  << si << " to " << AlgsExecutionStates::stateNames[state] << endmsg;
1182 
1183  return sc;
1184 }
1185 
1186 //===========================================================================
1188 {
1189 
1191  m_sState.push_back( SchedulerState( a, e, t ) );
1192 }
1193 
1194 //===========================================================================
1196 {
1197 
1199 
1200  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1201  if ( *itr == a ) {
1202  m_sState.erase( itr );
1203  return true;
1204  }
1205  }
1206 
1207  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1208  return false;
1209 }
1210 
1211 //===========================================================================
1213 {
1214 
1216 
1217  for ( auto it : m_sState ) {
1218  ost << " " << it << std::endl;
1219  }
1220 }
1221 
1222 //===========================================================================
1224 {
1225 
1227 
1228  std::ostringstream ost;
1229  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1230  dumpState( ost );
1231 
1232  info() << ost.str() << endmsg;
1233 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
StatusCode deactivate()
Deactivate scheduler.
bool algsPresent(State state) const
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode initialize() override
Definition: Service.cpp:64
Gaudi::Property< bool > m_CFNext
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
T empty(T...args)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T open(T...args)
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:727
StatusCode finalize() override
Definition: Service.cpp:174
ContextID_t slot() const
Definition: EventContext.h:41
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
const DataObjIDColl & inputDataObjs() const override
Definition: Algorithm.h:433
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
Header file for class GaudiAlgorithm.
T to_string(T...args)
StatusCode finalize() override
Finalise.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T endl(T...args)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
SmartIF< IThreadPoolSvc > m_threadPoolSvc
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
T duration_cast(T...args)
T end(T...args)
The SchedulerSvc implements the IScheduler interface.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
size_t sizeOfSubset(State state) const
A visitor, performing full top-down traversals of a graph.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:84
bool isIOBound() const
Check if algorithm is I/O-bound.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode m_drain()
Drain the actions present in the queue.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
Gaudi::Property< std::string > m_optimizationMode
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:40
STL class.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
void addAlg(Algorithm *, EventContext *, pthread_t)
T push_back(T...args)
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
STL class.
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
Definition: EventSlot.h:41
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
T join(T...args)
static std::list< SchedulerState > m_sState
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T close(T...args)
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
const DataObjIDColl & outputDataObjs() const override
Definition: Algorithm.h:434
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
unsigned int freeSlots() override
Get free slots number.
T bind(T...args)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:242
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
Gaudi::Property< int > m_maxEventsInFlight
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
GAUDI_API void setCurrentContext(const EventContext *ctx)
Gaudi::Property< bool > m_DFNext
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
T insert(T...args)
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
Gaudi::Property< bool > m_useIOBoundAlgScheduler
STL class.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
T begin(T...args)
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Iterator begin(State kind)
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
Gaudi::Property< bool > m_dumpIntraEventDynamics
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T sort(T...args)
friend class AlgoExecutionTask
virtual StatusCode push(IAlgTask &task)=0
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
T for_each(T...args)
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
Gaudi::Property< int > m_threadPoolSize
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using the graph index.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
const std::chrono::system_clock::time_point getInitTime() const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
T reserve(T...args)
static std::map< State, std::string > stateNames
T emplace_back(T...args)
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)