The Gaudi Framework  v33r1 (b1225454)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "AvalancheSchedulerSvc.h"
12 
13 #include "AlgoExecutionTask.h"
14 #include "IOBoundAlgTask.h"
15 
16 // Framework includes
19 #include "GaudiKernel/IAlgorithm.h"
22 #include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
23 
24 // C++
25 #include <algorithm>
26 #include <map>
27 #include <queue>
28 #include <sstream>
29 #include <thread>
30 #include <unordered_set>
31 
32 // External libs
33 #include "boost/algorithm/string.hpp"
34 #include "boost/thread.hpp"
35 #include "boost/tokenizer.hpp"
36 // DP waiting for the TBB service
37 #include "tbb/tbb_stddef.h"
38 #if TBB_INTERFACE_VERSION_MAJOR < 12
39 # include "tbb/task_scheduler_init.h"
40 #endif // TBB_INTERFACE_VERSION_MAJOR < 12
41 
42 // Instantiation of a static factory class used by clients to create instances of this service
44 
// Logging guards: the statement that follows is executed only when the
// output level enables DEBUG (resp. VERBOSE) messages. They call msgLevel()
// unqualified, so they can only be used where msgLevel() is in scope.
// NOTE: each expands to an unbraced `if`, so an `else` written after the
// guarded statement binds to the macro's `if` (code in this file uses
// `ON_VERBOSE { ... } else sc.ignore();` on purpose).
45 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
46 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
47 
48 namespace {
49  struct DataObjIDSorter {
50  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
51  };
52 
53  // Sort a DataObjIDColl in a well-defined, reproducible manner.
54  // Used for making debugging dumps.
55  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
57  v.reserve( coll.size() );
58  for ( const DataObjID& id : coll ) v.push_back( &id );
59  std::sort( v.begin(), v.end(), DataObjIDSorter() );
60  return v;
61  }
62 
63  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
64  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
65  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
66  }
67 } // namespace
68 
69 //---------------------------------------------------------------------------
70 
78 
79  // Initialise mother class (read properties, ...)
81  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
82 
83  // Get hold of the TBBSvc. This should initialize the thread pool
84  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
85  if ( !m_threadPoolSvc.isValid() ) {
86  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
87  return StatusCode::FAILURE;
88  }
89 
90  // Activate the scheduler in another thread.
91  info() << "Activating scheduler in a separate thread" << endmsg;
92  m_thread = std::thread( [this]() { this->activate(); } );
93 
94  while ( m_isActive != ACTIVE ) {
95  if ( m_isActive == FAILURE ) {
96  fatal() << "Terminating initialization" << endmsg;
97  return StatusCode::FAILURE;
98  } else {
99  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
100  sleep( 1 );
101  }
102  }
103 
104  if ( m_enableCondSvc ) {
105  // Get hold of the CondSvc
106  m_condSvc = serviceLocator()->service( "CondSvc" );
107  if ( !m_condSvc.isValid() ) {
108  warning() << "No CondSvc found, or not enabled. "
109  << "Will not manage CondAlgorithms" << endmsg;
110  m_enableCondSvc = false;
111  }
112  }
113 
114  // Get the algo resource pool
115  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
116  if ( !m_algResourcePool.isValid() ) {
117  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
118  return StatusCode::FAILURE;
119  }
120 
121  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
122  if ( !m_algExecStateSvc.isValid() ) {
123  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
124  return StatusCode::FAILURE;
125  }
126 
127  // Get Whiteboard
129  if ( !m_whiteboard.isValid() ) {
130  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
131  return StatusCode::FAILURE;
132  }
133 
134  // Get dedicated scheduler for I/O-bound algorithms
135  if ( m_useIOBoundAlgScheduler ) {
138  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
139  }
140 
141  // Set the MaxEventsInFlight parameters from the number of WB stores
143 
144  // Set the number of free slots
146 
147  // Get the list of algorithms
149  const unsigned int algsNumber = algos.size();
150  if ( algsNumber != 0 ) {
151  info() << "Found " << algsNumber << " algorithms" << endmsg;
152  } else {
153  error() << "No algorithms found" << endmsg;
154  return StatusCode::FAILURE;
155  }
156 
157  /* Dependencies
158  1) Look for handles in algo, if none
159  2) Assume none are required
160  */
161 
162  DataObjIDColl globalInp, globalOutp;
163 
164  // figure out all outputs
165  for ( IAlgorithm* ialgoPtr : algos ) {
166  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
167  if ( !algoPtr ) {
168  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
169  return StatusCode::FAILURE;
170  }
171  for ( auto id : algoPtr->outputDataObjs() ) globalOutp.insert( id );
172  }
173 
174  std::ostringstream ostdd;
175  ostdd << "Data Dependencies for Algorithms:";
176 
177  std::map<std::string, DataObjIDColl> algosDependenciesMap;
178  for ( IAlgorithm* ialgoPtr : algos ) {
179  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
180  if ( nullptr == algoPtr ) {
181  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
182  << ": this will result in a crash." << endmsg;
183  return StatusCode::FAILURE;
184  }
185 
186  ostdd << "\n " << algoPtr->name();
187 
188  DataObjIDColl algoDependencies;
189  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
190  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
191  DataObjID id = *idp;
192  ostdd << "\n o INPUT " << id;
193  if ( id.key().find( ":" ) != std::string::npos ) {
194  ostdd << " contains alternatives which require resolution...\n";
195  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
196  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
197  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
198  } );
199  if ( itok != tokens.end() ) {
200  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
201  id.updateKey( *itok );
202  } else {
203  error() << "failed to find alternate in global output list"
204  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
205  m_showDataDeps = true;
206  }
207  }
208  algoDependencies.insert( id );
209  globalInp.insert( id );
210  }
211  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
212  ostdd << "\n o OUTPUT " << *id;
213  if ( id->key().find( ":" ) != std::string::npos ) {
214  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
215  << endmsg;
216  m_showDataDeps = true;
217  }
218  }
219  } else {
220  ostdd << "\n none";
221  }
222  algosDependenciesMap[algoPtr->name()] = algoDependencies;
223  }
224 
225  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
226 
227  // Check if we have unmet global input dependencies, and, optionally, heal them
228  // WARNING: this step must be done BEFORE the Precedence Service is initialized
229  if ( m_checkDeps ) {
230  DataObjIDColl unmetDep;
231  for ( auto o : globalInp )
232  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
233 
234  if ( unmetDep.size() > 0 ) {
235 
236  std::ostringstream ost;
237  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
238  ost << "\n o " << *o << " required by Algorithm: ";
239 
240  for ( const auto& p : algosDependenciesMap )
241  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
242  }
243 
244  if ( !m_useDataLoader.empty() ) {
245 
246  // Find the DataLoader Alg
247  IAlgorithm* dataLoaderAlg( nullptr );
248  for ( IAlgorithm* algo : algos )
249  if ( algo->name() == m_useDataLoader ) {
250  dataLoaderAlg = algo;
251  break;
252  }
253 
254  if ( dataLoaderAlg == nullptr ) {
255  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
256  << "\" found, and unmet INPUT dependencies "
257  << "detected:\n"
258  << ost.str() << endmsg;
259  return StatusCode::FAILURE;
260  }
261 
262  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
263  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
264 
265  // Set the property Load of DataLoader Alg
266  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
267  if ( !dataAlg ) {
268  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
269  << endmsg;
270  return StatusCode::FAILURE;
271  }
272 
273  for ( auto& id : unmetDep ) {
274  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
275  << dataLoaderAlg->name() << endmsg;
277  }
278 
279  } else {
280  fatal() << "Auto DataLoading not requested, "
281  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
282  return StatusCode::FAILURE;
283  }
284 
285  } else {
286  info() << "No unmet INPUT data dependencies were found" << endmsg;
287  }
288  }
289 
290  // Get the precedence service
291  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
292  if ( !m_precSvc.isValid() ) {
293  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
294  return StatusCode::FAILURE;
295  }
296  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
297  if ( !precSvc ) {
298  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
299  return StatusCode::FAILURE;
300  }
301 
302  // Fill the containers to convert algo names to index
303  m_algname_vect.resize( algsNumber );
304  for ( IAlgorithm* algo : algos ) {
305  const std::string& name = algo->name();
306  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
307  m_algname_index_map[name] = index;
308  m_algname_vect.at( index ) = name;
309  }
310 
311  // Shortcut for the message service
312  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
313  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
314 
316  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
317  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
318  m_eventSlots.back().complete = true;
319  }
320 
321  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
322 
323  // Clearly inform about the level of concurrency
324  info() << "Concurrency level information:" << endmsg;
325  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
326  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
327 
329 
331 
332  // Simulate execution flow
334 
335  return sc;
336 }
337 //---------------------------------------------------------------------------
338 
343 
345  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
346 
347  sc = deactivate();
348  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
349 
350  info() << "Joining Scheduler thread" << endmsg;
351  m_thread.join();
352 
353  // Final error check after thread pool termination
354  if ( m_isActive == FAILURE ) {
355  error() << "problems in scheduler thread" << endmsg;
356  return StatusCode::FAILURE;
357  }
358 
359  return sc;
360 }
361 //---------------------------------------------------------------------------
362 
374 
375  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
376 
378  error() << "problems initializing ThreadPoolSvc" << endmsg;
380  return;
381  }
382 
383  // Wait for actions pushed into the queue by finishing tasks.
384  action thisAction;
386 
387  m_isActive = ACTIVE;
388 
389  // Continue to wait if the scheduler is running or there is something to do
390  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
391  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
392  m_actionsQueue.pop( thisAction );
393  sc = thisAction();
394  ON_VERBOSE {
395  if ( sc.isFailure() )
396  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
397  else
398  verbose() << "Action succeeded." << endmsg;
399  }
400  else sc.ignore();
401 
402  // If all queued actions have been processed, update the slot states
403  if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
404  sc = updateStates();
405  ON_VERBOSE {
406  if ( sc.isFailure() )
407  verbose() << "updateStates did not succeed (which is not bad per se)." << endmsg;
408  else
409  verbose() << "updateStates succeeded." << endmsg;
410  }
411  else sc.ignore();
412  }
413  }
414 
415  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
417  error() << "Problems terminating thread pool" << endmsg;
419  }
420 }
421 
422 //---------------------------------------------------------------------------
423 
431 
432  if ( m_isActive == ACTIVE ) {
433 
434  // Set the number of slots available to an error code
435  m_freeSlots.store( 0 );
436 
437  // Empty queue
438  action thisAction;
439  while ( m_actionsQueue.try_pop( thisAction ) ) {};
440 
441  // This would be the last action
442  m_actionsQueue.push( [this]() -> StatusCode {
443  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
445  return StatusCode::SUCCESS;
446  } );
447  }
448 
449  return StatusCode::SUCCESS;
450 }
451 
452 //---------------------------------------------------------------------------
453 
454 // EventSlot management
462 
463  if ( !eventContext ) {
464  fatal() << "Event context is nullptr" << endmsg;
465  return StatusCode::FAILURE;
466  }
467 
468  if ( m_freeSlots.load() == 0 ) {
469  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
470  return StatusCode::FAILURE;
471  }
472 
473  // no problem as push new event is only called from one thread (event loop manager)
474  --m_freeSlots;
475 
476  auto action = [this, eventContext]() -> StatusCode {
477  // Event processing slot forced to be the same as the wb slot
478  const unsigned int thisSlotNum = eventContext->slot();
479  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
480  if ( !thisSlot.complete ) {
481  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
482  return StatusCode::FAILURE;
483  }
484 
485  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
486  thisSlot.reset( eventContext );
487 
488  // Result status code:
490 
491  // promote to CR and DR the initial set of algorithms
492  Cause cs = {Cause::source::Root, "RootDecisionHub"};
493  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
494  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
495  result = StatusCode::FAILURE;
496  }
497 
498  if ( this->updateStates().isFailure() ) {
499  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
500  result = StatusCode::FAILURE;
501  }
502 
503  return result;
504  }; // end of lambda
505 
506  // Kick off the scheduling!
507  ON_VERBOSE {
508  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
509  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
510  }
511 
512  m_actionsQueue.push( action );
513 
514  return StatusCode::SUCCESS;
515 }
516 
517 //---------------------------------------------------------------------------
518 
520  StatusCode sc;
521  for ( auto context : eventContexts ) {
522  sc = pushNewEvent( context );
523  if ( sc != StatusCode::SUCCESS ) return sc;
524  }
525  return sc;
526 }
527 
528 //---------------------------------------------------------------------------
529 
530 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
531 
532 //---------------------------------------------------------------------------
537 
538  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
539  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
540  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
541  // << " active: " << m_isActive << endmsg;
542  return StatusCode::FAILURE;
543  } else {
544  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
545  // << " active: " << m_isActive << endmsg;
546  m_finishedEvents.pop( eventContext );
547  ++m_freeSlots;
548  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
549  return StatusCode::SUCCESS;
550  }
551 }
552 
553 //---------------------------------------------------------------------------
558 
559  if ( m_finishedEvents.try_pop( eventContext ) ) {
560  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
561  << endmsg;
562  ++m_freeSlots;
563  return StatusCode::SUCCESS;
564  }
565  return StatusCode::FAILURE;
566 }
567 
568 //--------------------------------------------------------------------------
569 // States Management
570 
581 
582  StatusCode global_sc( StatusCode::SUCCESS );
583 
584  // Retry algs
585  AlgQueueEntry queuePop;
586  const size_t retries = m_retryQueue.size();
587  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
588 
589  queuePop = m_retryQueue.front();
590  m_retryQueue.pop();
591 
592  global_sc = enqueue( queuePop.algIndex, queuePop.slotIndex, queuePop.contextPtr );
593  }
594 
595  // Loop over all slots
596  for ( EventSlot& thisSlot : m_eventSlots ) {
597 
598  // Ignore slots without a valid context (relevant when populating scheduler for first time)
599  if ( !thisSlot.eventContext ) continue;
600 
601  int iSlot = thisSlot.eventContext->slot();
602 
603  // Cache the states of the algos to improve readability and performance
604  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
605 
606  StatusCode partial_sc( StatusCode::FAILURE, true );
607 
608  // Perform DR->SCHEDULED
609  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
610  uint algIndex = *it;
611 
612  bool IOBound = false;
613  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
614 
615  if ( !IOBound )
616  partial_sc = enqueue( algIndex, iSlot, thisSlot.eventContext.get() );
617  else
618  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlot.eventContext.get() );
619 
620  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
621  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
622  << " on processing slot " << iSlot << endmsg;
623  }
624 
625  // Check for algorithms ready in sub-slots
626  for ( auto& subslot : thisSlot.allSubSlots ) {
627  auto& subslotStates = subslot.algsStates;
628  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
629  uint algIndex{*it};
630  partial_sc = enqueue( algIndex, iSlot, subslot.eventContext.get() );
631  // The following verbosity is expensive when the number of sub-slots is high
632  /*ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
633  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
634  << " on processing subslot " << subslot.eventContext->slot() << endmsg;*/
635  }
636  }
637 
638  if ( m_dumpIntraEventDynamics ) {
640  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
641  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
642  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
644 #if TBB_INTERFACE_VERSION_MAJOR < 12
645  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
646 #else
648 #endif // TBB_INTERFACE_VERSION_MAJOR < 12
649  std::ofstream myfile;
650  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
651  myfile << s.str();
652  myfile.close();
653  }
654 
655  // Not complete because this would mean that the slot is already free!
656  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
657  !thisSlot.algsStates.containsAny(
658  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
659  !subSlotAlgsInStates( thisSlot,
660  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
661  !thisSlot.complete ) {
662 
663  thisSlot.complete = true;
664  // if the event did not fail, add it to the finished events
665  // otherwise it is taken care of in the error handling
666  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
667  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
668  << thisSlot.eventContext->slot() << ")." << endmsg;
669  m_finishedEvents.push( thisSlot.eventContext.release() );
670  }
671 
672  // now let's return the fully evaluated result of the control flow
673  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
674 
675  thisSlot.eventContext.reset( nullptr );
676 
677  } else if ( isStalled( thisSlot ) ) {
678  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
679  eventFailed( thisSlot.eventContext.get() ); // can't release yet
680  }
681  partial_sc.ignore();
682  } // end loop on slots
683 
684  ON_VERBOSE verbose() << "States Updated." << endmsg;
685  m_needsUpdate.store( false );
686  return global_sc;
687 }
688 
689 //---------------------------------------------------------------------------
690 // Update states in the appropriate event slot
692  bool iterate ) {
693  StatusCode updateSc;
694  EventSlot& thisSlot = m_eventSlots[contextPtr->slot()];
695  Cause cs = {Cause::source::Task, index2algname( iAlgo )};
696  if ( contextPtr->usesSubSlot() ) {
697  // Sub-slot
698  size_t const subSlotIndex = contextPtr->subSlot();
699  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
700  if ( updateSc.isSuccess() && iterate ) updateSc = m_precSvc->iterate( thisSlot.allSubSlots[subSlotIndex], cs );
701  } else {
702  // Event level (standard behaviour)
703  updateSc = thisSlot.algsStates.set( iAlgo, state );
704  if ( updateSc.isSuccess() && iterate ) updateSc = m_precSvc->iterate( thisSlot, cs );
705  }
706  return updateSc;
707 }
708 
709 //---------------------------------------------------------------------------
710 
717 bool AvalancheSchedulerSvc::isStalled( const EventSlot& slot ) const {
718 
719  if ( !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
720  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
721 
722  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
723 
724  return true;
725  }
726  return false;
727 }
728 
729 //---------------------------------------------------------------------------
730 
736  const uint slotIdx = eventContext->slot();
737 
738  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
739 
740  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
741 
742  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
744 
745  // Push into the finished events queue the failed context
746  m_eventSlots[slotIdx].complete = true;
747  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
748 }
749 
750 //---------------------------------------------------------------------------
751 
757 
758  // To have just one big message
759  std::ostringstream outputMS;
760 
761  outputMS << "Dumping scheduler state\n"
762  << "=========================================================================================\n"
763  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
764  << "=========================================================================================\n\n";
765 
766  //===========================================================================
767 
768  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
769  << "------------------\n\n";
770 
771  // Figure if TimelineSvc is available (used below to detect threads IDs)
772  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
773  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
774  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
775  } else {
776 
777  // Figure optimal printout layout
778  size_t indt( 0 );
779  for ( auto& slot : m_eventSlots )
780  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
781  if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
782 
783  // Figure the last running schedule across all slots
784  for ( auto& slot : m_eventSlots ) {
785  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
786  ++it ) {
787 
788  const std::string algoName{index2algname( *it )};
789 
790  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
791  << slot.eventContext->slot();
792 
793  // Try to get POSIX threads IDs the currently running tasks are scheduled to
794  if ( timelineSvc.isValid() ) {
795  TimelineEvent te{};
796  te.algorithm = algoName;
797  te.slot = slot.eventContext->slot();
798  te.event = slot.eventContext->evt();
799 
800  if ( timelineSvc->getTimelineEvent( te ) )
801  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
802  else
803  outputMS << " thread.id: [unknown]"; // this means a task has just
804  // been signed off as SCHEDULED,
805  // but has not been assigned to a thread yet
806  // (i.e., not running yet)
807  }
808  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
809  }
810  }
811  }
812 
813  //===========================================================================
814 
815  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
816  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
817 
818  int slotCount = -1;
819  for ( auto& slot : m_eventSlots ) {
820  ++slotCount;
821  if ( slot.complete ) continue;
822 
823  outputMS << "[ slot: "
824  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
825  << " event: "
826  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
827  << " ]:\n\n";
828 
829  if ( 0 > iSlot || iSlot == slotCount ) {
830 
831  // Snapshot of the Control Flow and FSM states
832  outputMS << m_precSvc->printState( slot ) << "\n";
833 
834  // Mention sub slots (this is expensive if the number of sub-slots is high)
835  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
836  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
837  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
838  for ( auto& ss : slot.allSubSlots ) {
839  outputMS << "[ slot: " << slotID << ", sub-slot: "
840  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
841  << ", entry: " << ss.entryPoint << ", event: "
842  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
843  << " ]:\n\n";
844  outputMS << m_precSvc->printState( ss ) << "\n";
845  }
846  }
847  }
848  }
849 
850  //===========================================================================
851 
852  if ( 0 <= iSlot ) {
853  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
854  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
855  }
856 
857  outputMS << "\n=========================================================================================\n"
858  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
859  << "=========================================================================================\n\n";
860 
861  info() << outputMS.str() << endmsg;
862 }
863 
864 //---------------------------------------------------------------------------
865 
866 StatusCode AvalancheSchedulerSvc::enqueue( unsigned int iAlgo, int si, EventContext* eventContext ) {
867 
868  // Use the algorithm rank to sort the queue
869  const std::string& algName( index2algname( iAlgo ) );
870  unsigned int rank = 0;
871  if ( !m_optimizationMode.empty() ) { rank = m_precSvc->getPriority( algName ); }
872 
873  // Get algorithm pointer
874  IAlgorithm* iAlgoPtr = nullptr;
875  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( algName, iAlgoPtr ) );
876 
877  // Check if the algorithm is available
878  AState state;
879  if ( getAlgSC.isSuccess() ) {
880 
881  // Add the algorithm to the scheduled queue
882  m_scheduledQueue.push( {iAlgo, si, eventContext, rank, iAlgoPtr} );
883  ++m_algosInFlight;
884 
885  // Avoid to use tbb if the pool size is 1 and run in this thread
886  if ( -100 != m_threadPoolSize ) {
887 
888  // the child task that executes an Algorithm
889  tbb::task* algoTask =
890  new ( tbb::task::allocate_root() ) AlgoExecutionTask( this, serviceLocator(), m_algExecStateSvc );
891  // schedule the algoTask
892  tbb::task::enqueue( *algoTask );
893  } else {
894 
896  theTask.execute();
897  }
898 
899  ON_DEBUG debug() << "Algorithm " << index2algname( iAlgo ) << " was submitted on event " << eventContext->evt()
900  << " in slot " << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
901 
902  state = AState::SCHEDULED;
903  } else {
904 
905  // Add the algorithm to the retry queue
906  m_retryQueue.push( {iAlgo, si, eventContext, rank, nullptr} );
907 
908  state = AState::RESOURCELESS;
909  }
910 
911  // Update alg state
912  StatusCode updateSc = setAlgState( iAlgo, eventContext, state );
913 
915 
916  if ( updateSc.isSuccess() )
917  ON_VERBOSE verbose() << "Promoting " << index2algname( iAlgo ) << " to " << state << " on slot " << si << endmsg;
918  return updateSc;
919 }
920 
921 //---------------------------------------------------------------------------
922 
923 StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, int si, EventContext* eventContext ) {
924 
926 
927  // bool IOBound = m_precSvc->isBlocking(algName);
928 
929  const std::string& algName( index2algname( iAlgo ) );
930  IAlgorithm* ialgoPtr = nullptr;
931  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
932 
933  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
934 
936  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
937  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
938  return this->AvalancheSchedulerSvc::promoteToAsyncExecuted( iAlgo, eventContext->slot(), ialgoPtr,
939  eventContext );
940  } );
941  return StatusCode::SUCCESS;
942  };
943  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
944  // tbb::task)? it seems it works..
945 
946  // FIXME - The memory allocation here is causing memory leaks as detected by the gcc leak sanitizer
947  //
948  // clang-format off
949  // Direct leak of 224 byte(s) in 7 object(s) allocated from:
950  // #0 0x7fc0cb524da8 in operator new(unsigned long) /afs/cern.ch/cms/CAF/CMSCOMM/COMM_ECAL/dkonst/GCC/build/contrib/gcc-8.2.0/src/gcc/8.2.0/libsanitizer/lsan/lsan_interceptors.cc:229
951  // #1 0x7fc0ba979f7b in function<AvalancheSchedulerSvc::promoteToAsyncScheduled(unsigned int, int, EventContext*)::<lambda()> > /cvmfs/lhcb.cern.ch/lib/lcg/releases/gcc/8.2.0-3fa06/x86_64-centos7/include/c++/8.2.0/bits/std_function.h:249
952  // #2 0x7fc0ba97d181 in AvalancheSchedulerSvc::promoteToAsyncScheduled(unsigned int, int, EventContext*) ../GaudiHive/src/AvalancheSchedulerSvc.cpp:969
953  // #3 0x7fc0ba98354d in AvalancheSchedulerSvc::updateStates(int, int, int, int) ../GaudiHive/src/AvalancheSchedulerSvc.cpp:660
954  // clang-format on
955  //
956  // These leaks are currently suppressed in Gaudi/job/Gaudi-LSan.supp - remove entry there to reactivate
957  //
958  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
959  IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
960  if ( sc = m_IOBoundAlgScheduler->push( *theTask ); !sc ) return sc;
961  //
962  // FIXME
963 
964  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
965  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
966 
967  // Update alg state
968  StatusCode updateSc = setAlgState( iAlgo, eventContext, AState::SCHEDULED );
969 
970  ON_VERBOSE if ( updateSc.isSuccess() ) verbose()
971  << "[Asynchronous] Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
972  return updateSc;
973  } else {
974  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
975  << " on slot " << si << endmsg;
976  return sc;
977  }
978 }
979 
980 //---------------------------------------------------------------------------
981 
985 StatusCode AvalancheSchedulerSvc::promoteToExecuted( unsigned int iAlgo, int si, EventContext* eventContext ) {
986 
987  const std::string& algName( index2algname( iAlgo ) );
988 
989  Gaudi::Hive::setCurrentContext( eventContext );
990 
991  --m_algosInFlight;
992 
993  ON_DEBUG debug() << "Trying to handle execution result of " << algName << " on slot " << si << endmsg;
994 
995  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algName, *eventContext );
996  AState state = algstate.execStatus().isSuccess()
997  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
998  : AState::ERROR;
999 
1000  // Update alg state and iterate PrecedenceSvc
1001  StatusCode sc = setAlgState( iAlgo, eventContext, state, true );
1002 
1003  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1004  << "Promoting " << algName << " on slot " << si << " to " << state << endmsg;
1005 
1006  ON_DEBUG debug() << "Algorithm " << algName << " executed in slot " << si << ". Algorithms scheduled are "
1007  << m_algosInFlight << endmsg;
1008 
1009  // Prompt a call to updateStates
1010  m_needsUpdate.store( true );
1011  return sc;
1012 }
1013 
1014 //---------------------------------------------------------------------------
1015 
1020  EventContext* eventContext ) {
1021  Gaudi::Hive::setCurrentContext( eventContext );
1022  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1023 
1024  if ( sc.isFailure() ) {
1025  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1026  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1027  return StatusCode::FAILURE;
1028  }
1029 
1031 
1032  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1033  << endmsg;
1034 
1035  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1036  AState state = algstate.execStatus().isSuccess()
1037  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1038  : AState::ERROR;
1039 
1040  // Update alg state and iterate PrecedenceSvc
1041  sc = setAlgState( iAlgo, eventContext, state, true );
1042 
1043  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1044  << "[Asynchronous] Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1045 
1046  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1047  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1048 
1049  // Prompt a call to updateStates
1050  m_needsUpdate.store( true );
1051  return sc;
1052 }
1053 
1054 //---------------------------------------------------------------------------
1055 
1056 // Method to inform the scheduler about event views
1057 
1059  std::unique_ptr<EventContext> viewContext ) {
1060  // Prevent view nesting
1061  if ( sourceContext->usesSubSlot() ) {
1062  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1063  return StatusCode::FAILURE;
1064  }
1065 
1066  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1067 
1068  // It's not possible to create an std::functional from a move-capturing lambda
1069  // So, we have to release the unique pointer
1070  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1071  &nodeName]() -> StatusCode {
1072  // Attach the sub-slot to the top-level slot
1073  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1074 
1075  if ( viewContextPtr ) {
1076  // Re-create the unique pointer
1077  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1078  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1079  return StatusCode::SUCCESS;
1080  } else {
1081  // Disable the view node if there are no views
1082  topSlot.disableSubSlots( nodeName );
1083  return StatusCode::SUCCESS;
1084  }
1085  };
1086 
1087  m_actionsQueue.push( std::move( action ) );
1088 
1089  return StatusCode::SUCCESS;
1090 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
Gaudi::Property< bool > m_showDataFlow
#define ON_DEBUG
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
Wrapper around I/O-bound Gaudi-algorithms.
EventContext * contextPtr
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
StatusCode initialize() override
Definition: Service.cpp:70
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:287
Class representing an event slot.
Definition: EventSlot.h:24
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
ContextID_t slot() const
Definition: EventContext.h:51
T empty(T... args)
Gaudi::Property< std::string > m_whiteboardSvcName
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
T open(T... args)
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using the graph index.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
ContextEvt_t evt() const
Definition: EventContext.h:50
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Definition: EventSlot.h:78
StatusCode finalize() override
Definition: Service.cpp:174
virtual const std::string & type() const =0
The type of the algorithm.
Gaudi::Property< bool > m_dumpIntraEventDynamics
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Gaudi::Property< bool > m_showDataDeps
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Definition: EventSlot.h:61
std::atomic< bool > m_needsUpdate
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:31
T to_string(T... args)
T store(T... args)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:72
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
std::string algorithm
Definition: ITimelineSvc.h:31
bool filterPassed() const
void activate()
Activate scheduler.
T end(T... args)
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:100
Gaudi::Property< std::string > m_optimizationMode
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
Struct to hold entries in the alg queues.
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
T hardware_concurrency(T... args)
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:86
This class represents an entry point to all the event specific data.
Definition: EventContext.h:34
STL class.
unsigned int algIndex
std::string fullKey() const
Definition: DataObjID.cpp:99
T release(T... args)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
T setw(T... args)
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
T resize(T... args)
tbb::task * execute() override
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
#define DECLARE_COMPONENT(type)
const unsigned int & getAlgoIndex() const
Get algorithm index.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
T at(T... args)
virtual StatusCode terminatePool()=0
Finalize the thread pool.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:86
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:284
unsigned int m_algosInFlight
Number of algorithms presently in flight.
T push_back(T... args)
STL class.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Gaudi::Property< bool > m_verboseSubSlots
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
T join(T... args)
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:61
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
T close(T... args)
const StatusCode & execStatus() const
size_t sizeOfSubset(State state) const
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
T str(T... args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
StatusCode finalize() override
Finalise.
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
Gaudi::Property< int > m_threadPoolSize
virtual void dumpDataFlow() const =0
bool isSuccess() const
Definition: StatusCode.h:365
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
T max(T... args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:38
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
T move(T... args)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
ContextID_t subSlot() const
Definition: EventContext.h:52
T get(T... args)
T insert(T... args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & outputDataObjs() const override
T find_if(T... args)
T size(T... args)
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:168
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
Gaudi::Property< bool > m_simulateExecution
StatusCode setAlgState(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
StatusCode updateStates()
Loop on algorithm in the slots and promote them to successive states.
T begin(T... args)
int slotIndex
Iterator begin(State kind)
T any_of(T... args)
Gaudi::Property< bool > m_enableCondSvc
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:89
virtual uint getPriority(const std::string &) const =0
Get task priority.
T back(T... args)
string s
Definition: gaudirun.py:328
StatusCode enqueue(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
constexpr static const auto FAILURE
Definition: StatusCode.h:101
T hex(T... args)
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
Definition: EventSlot.h:89
T sort(T... args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
bool isFailure() const
Definition: StatusCode.h:145
virtual StatusCode push(IAlgTask &task)=0
tbb::concurrent_priority_queue< AlgQueueEntry, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
const DataObjIDColl & inputDataObjs() const override
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:49
virtual void dumpControlFlow() const =0
Dump precedence rules.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
std::queue< AlgQueueEntry > m_retryQueue
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual const std::string printState(EventSlot &) const =0
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:83
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
T load(T... args)
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:73
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:549
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:85
T reserve(T... args)
StatusCode promoteToExecuted(unsigned int iAlgo, int si, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
T emplace_back(T... args)
std::thread m_thread
The thread in which the activate function runs.
bool usesSubSlot() const
Definition: EventContext.h:53
#define ON_VERBOSE
Iterator end(State kind)