The Gaudi Framework  v33r2 (a6f0ec87)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "AvalancheSchedulerSvc.h"
12 #include "AlgTask.h"
13 
14 // Framework includes
17 #include "GaudiKernel/IAlgorithm.h"
19 #include "GaudiKernel/ITask.h"
21 #include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
22 
23 // C++
24 #include <algorithm>
25 #include <map>
26 #include <queue>
27 #include <sstream>
28 #include <string_view>
29 #include <thread>
30 #include <unordered_set>
31 
32 // External libs
33 #include "boost/algorithm/string.hpp"
34 #include "boost/thread.hpp"
35 #include "boost/tokenizer.hpp"
36 // DP waiting for the TBB service
37 #include "tbb/tbb_stddef.h"
38 #if TBB_INTERFACE_VERSION_MAJOR < 12
39 # include "tbb/task_scheduler_init.h"
40 #endif // TBB_INTERFACE_VERSION_MAJOR < 12
41 #include "tbb/task.h"
42 
43 // Instantiation of a static factory class used by clients to create instances of this service
45 
46 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
47 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
48 
49 namespace {
50  struct DataObjIDSorter {
51  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
52  };
53 
54  // Sort a DataObjIDColl in a well-defined, reproducible manner.
55  // Used for making debugging dumps.
56  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
58  v.reserve( coll.size() );
59  for ( const DataObjID& id : coll ) v.push_back( &id );
60  std::sort( v.begin(), v.end(), DataObjIDSorter() );
61  return v;
62  }
63 
64  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
65  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
66  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
67  }
68 } // namespace
69 
70 //---------------------------------------------------------------------------
71 
79 
80  // Initialise mother class (read properties, ...)
82  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
83 
84  // Get hold of the TBBSvc. This should initialize the thread pool
85  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
86  if ( !m_threadPoolSvc.isValid() ) {
87  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
88  return StatusCode::FAILURE;
89  }
90 
91  // Activate the scheduler in another thread.
92  info() << "Activating scheduler in a separate thread" << endmsg;
93  m_thread = std::thread( [this]() { this->activate(); } );
94 
95  while ( m_isActive != ACTIVE ) {
96  if ( m_isActive == FAILURE ) {
97  fatal() << "Terminating initialization" << endmsg;
98  return StatusCode::FAILURE;
99  } else {
100  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
101  sleep( 1 );
102  }
103  }
104 
105  if ( m_enableCondSvc ) {
106  // Get hold of the CondSvc
107  m_condSvc = serviceLocator()->service( "CondSvc" );
108  if ( !m_condSvc.isValid() ) {
109  warning() << "No CondSvc found, or not enabled. "
110  << "Will not manage CondAlgorithms" << endmsg;
111  m_enableCondSvc = false;
112  }
113  }
114 
115  // Get the algo resource pool
116  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
117  if ( !m_algResourcePool.isValid() ) {
118  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
119  return StatusCode::FAILURE;
120  }
121 
122  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
123  if ( !m_algExecStateSvc.isValid() ) {
124  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
125  return StatusCode::FAILURE;
126  }
127 
128  // Get Whiteboard
130  if ( !m_whiteboard.isValid() ) {
131  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
132  return StatusCode::FAILURE;
133  }
134 
135  // Set the MaxEventsInFlight parameters from the number of WB stores
137 
138  // Set the number of free slots
140 
141  // Get the list of algorithms
143  const unsigned int algsNumber = algos.size();
144  if ( algsNumber != 0 ) {
145  info() << "Found " << algsNumber << " algorithms" << endmsg;
146  } else {
147  error() << "No algorithms found" << endmsg;
148  return StatusCode::FAILURE;
149  }
150 
151  /* Dependencies
152  1) Look for handles in algo, if none
153  2) Assume none are required
154  */
155 
156  DataObjIDColl globalInp, globalOutp;
157 
158  // figure out all outputs
159  for ( IAlgorithm* ialgoPtr : algos ) {
160  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
161  if ( !algoPtr ) {
162  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
163  return StatusCode::FAILURE;
164  }
165  for ( auto id : algoPtr->outputDataObjs() ) globalOutp.insert( id );
166  }
167 
168  std::ostringstream ostdd;
169  ostdd << "Data Dependencies for Algorithms:";
170 
171  std::map<std::string, DataObjIDColl> algosDependenciesMap;
172  for ( IAlgorithm* ialgoPtr : algos ) {
173  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
174  if ( nullptr == algoPtr ) {
175  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
176  << ": this will result in a crash." << endmsg;
177  return StatusCode::FAILURE;
178  }
179 
180  ostdd << "\n " << algoPtr->name();
181 
182  DataObjIDColl algoDependencies;
183  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
184  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
185  DataObjID id = *idp;
186  ostdd << "\n o INPUT " << id;
187  if ( id.key().find( ":" ) != std::string::npos ) {
188  ostdd << " contains alternatives which require resolution...\n";
189  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
190  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
191  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
192  } );
193  if ( itok != tokens.end() ) {
194  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
195  id.updateKey( *itok );
196  } else {
197  error() << "failed to find alternate in global output list"
198  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
199  m_showDataDeps = true;
200  }
201  }
202  algoDependencies.insert( id );
203  globalInp.insert( id );
204  }
205  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
206  ostdd << "\n o OUTPUT " << *id;
207  if ( id->key().find( ":" ) != std::string::npos ) {
208  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
209  << endmsg;
210  m_showDataDeps = true;
211  }
212  }
213  } else {
214  ostdd << "\n none";
215  }
216  algosDependenciesMap[algoPtr->name()] = algoDependencies;
217  }
218 
219  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
220 
221  // Check if we have unmet global input dependencies, and, optionally, heal them
222  // WARNING: this step must be done BEFORE the Precedence Service is initialized
223  if ( m_checkDeps ) {
224  DataObjIDColl unmetDep;
225  for ( auto o : globalInp )
226  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
227 
228  if ( unmetDep.size() > 0 ) {
229 
230  std::ostringstream ost;
231  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232  ost << "\n o " << *o << " required by Algorithm: ";
233 
234  for ( const auto& p : algosDependenciesMap )
235  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
236  }
237 
238  if ( !m_useDataLoader.empty() ) {
239 
240  // Find the DataLoader Alg
241  IAlgorithm* dataLoaderAlg( nullptr );
242  for ( IAlgorithm* algo : algos )
243  if ( algo->name() == m_useDataLoader ) {
244  dataLoaderAlg = algo;
245  break;
246  }
247 
248  if ( dataLoaderAlg == nullptr ) {
249  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
250  << "\" found, and unmet INPUT dependencies "
251  << "detected:\n"
252  << ost.str() << endmsg;
253  return StatusCode::FAILURE;
254  }
255 
256  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
257  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
258 
259  // Set the property Load of DataLoader Alg
260  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
261  if ( !dataAlg ) {
262  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
263  << endmsg;
264  return StatusCode::FAILURE;
265  }
266 
267  for ( auto& id : unmetDep ) {
268  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
269  << dataLoaderAlg->name() << endmsg;
271  }
272 
273  } else {
274  fatal() << "Auto DataLoading not requested, "
275  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
276  return StatusCode::FAILURE;
277  }
278 
279  } else {
280  info() << "No unmet INPUT data dependencies were found" << endmsg;
281  }
282  }
283 
284  // Get the precedence service
285  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
286  if ( !m_precSvc.isValid() ) {
287  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
288  return StatusCode::FAILURE;
289  }
290  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
291  if ( !precSvc ) {
292  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
293  return StatusCode::FAILURE;
294  }
295 
296  // Fill the containers to convert algo names to index
297  m_algname_vect.resize( algsNumber );
298  for ( IAlgorithm* algo : algos ) {
299  const std::string& name = algo->name();
300  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
301  m_algname_index_map[name] = index;
302  m_algname_vect.at( index ) = name;
303  }
304 
305  // Shortcut for the message service
306  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
307  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
308 
310  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
311  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
312  m_eventSlots.back().complete = true;
313  }
314 
315  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
316 
317  // Clearly inform about the level of concurrency
318  info() << "Concurrency level information:" << endmsg;
319  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
320  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
321 
322  // Inform about task scheduling prescriptions
323  info() << "Task scheduling settings:" << endmsg;
324  info() << " o Avalanche generation mode: "
325  << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
326  info() << " o Preemptive scheduling of CPU-blocking tasks: "
328  ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
329  : "disabled" )
330  << endmsg;
331  info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
332 
334 
336 
337  // Simulate execution flow
339 
340  return sc;
341 }
342 //---------------------------------------------------------------------------
343 
348 
350  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
351 
352  sc = deactivate();
353  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
354 
355  info() << "Joining Scheduler thread" << endmsg;
356  m_thread.join();
357 
358  // Final error check after thread pool termination
359  if ( m_isActive == FAILURE ) {
360  error() << "problems in scheduler thread" << endmsg;
361  return StatusCode::FAILURE;
362  }
363 
364  return sc;
365 }
366 //---------------------------------------------------------------------------
367 
379 
380  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
381 
383  error() << "problems initializing ThreadPoolSvc" << endmsg;
385  return;
386  }
387 
388  // Wait for actions pushed into the queue by finishing tasks.
389  action thisAction;
391 
392  m_isActive = ACTIVE;
393 
394  // Continue to wait if the scheduler is running or there is something to do
395  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
396  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
397  m_actionsQueue.pop( thisAction );
398  sc = thisAction();
399  ON_VERBOSE {
400  if ( sc.isFailure() )
401  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
402  else
403  verbose() << "Action succeeded." << endmsg;
404  }
405  else sc.ignore();
406 
407  // If all queued actions have been processed, update the slot states
408  if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
409  sc = iterate();
410  ON_VERBOSE {
411  if ( sc.isFailure() )
412  verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
413  else
414  verbose() << "Iteration succeeded." << endmsg;
415  }
416  else sc.ignore();
417  }
418  }
419 
420  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
422  error() << "Problems terminating thread pool" << endmsg;
424  }
425 }
426 
427 //---------------------------------------------------------------------------
428 
436 
437  if ( m_isActive == ACTIVE ) {
438 
439  // Set the number of slots available to an error code
440  m_freeSlots.store( 0 );
441 
442  // Empty queue
443  action thisAction;
444  while ( m_actionsQueue.try_pop( thisAction ) ) {};
445 
446  // This would be the last action
447  m_actionsQueue.push( [this]() -> StatusCode {
448  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
450  return StatusCode::SUCCESS;
451  } );
452  }
453 
454  return StatusCode::SUCCESS;
455 }
456 
457 //---------------------------------------------------------------------------
458 
459 // EventSlot management
467 
468  if ( !eventContext ) {
469  fatal() << "Event context is nullptr" << endmsg;
470  return StatusCode::FAILURE;
471  }
472 
473  if ( m_freeSlots.load() == 0 ) {
474  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
475  return StatusCode::FAILURE;
476  }
477 
478  // no problem as push new event is only called from one thread (event loop manager)
479  --m_freeSlots;
480 
481  auto action = [this, eventContext]() -> StatusCode {
482  // Event processing slot forced to be the same as the wb slot
483  const unsigned int thisSlotNum = eventContext->slot();
484  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
485  if ( !thisSlot.complete ) {
486  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
487  return StatusCode::FAILURE;
488  }
489 
490  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
491  thisSlot.reset( eventContext );
492 
493  // Result status code:
495 
496  // promote to CR and DR the initial set of algorithms
497  Cause cs = {Cause::source::Root, "RootDecisionHub"};
498  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
499  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
500  result = StatusCode::FAILURE;
501  }
502 
503  if ( this->iterate().isFailure() ) {
504  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
505  result = StatusCode::FAILURE;
506  }
507 
508  return result;
509  }; // end of lambda
510 
511  // Kick off scheduling
512  ON_VERBOSE {
513  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
514  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
515  }
516 
517  m_actionsQueue.push( action );
518 
519  return StatusCode::SUCCESS;
520 }
521 
522 //---------------------------------------------------------------------------
523 
525  StatusCode sc;
526  for ( auto context : eventContexts ) {
527  sc = pushNewEvent( context );
528  if ( sc != StatusCode::SUCCESS ) return sc;
529  }
530  return sc;
531 }
532 
533 //---------------------------------------------------------------------------
534 
535 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
536 
537 //---------------------------------------------------------------------------
542 
543  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
544  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
545  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
546  // << " active: " << m_isActive << endmsg;
547  return StatusCode::FAILURE;
548  } else {
549  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
550  // << " active: " << m_isActive << endmsg;
551  m_finishedEvents.pop( eventContext );
552  ++m_freeSlots;
553  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
554  return StatusCode::SUCCESS;
555  }
556 }
557 
558 //---------------------------------------------------------------------------
563 
564  if ( m_finishedEvents.try_pop( eventContext ) ) {
565  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
566  << endmsg;
567  ++m_freeSlots;
568  return StatusCode::SUCCESS;
569  }
570  return StatusCode::FAILURE;
571 }
572 
573 //--------------------------------------------------------------------------
574 
583 
584  StatusCode global_sc( StatusCode::SUCCESS );
585 
586  // Retry algorithms
587  const size_t retries = m_retryQueue.size();
588  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
589  TaskSpec retryTS = std::move( m_retryQueue.front() );
590  m_retryQueue.pop();
591  global_sc = schedule( std::move( retryTS ) );
592  }
593 
594  // Loop over all slots
595  for ( EventSlot& thisSlot : m_eventSlots ) {
596 
597  // Ignore slots without a valid context (relevant when populating scheduler for first time)
598  if ( !thisSlot.eventContext ) continue;
599 
600  int iSlot = thisSlot.eventContext->slot();
601 
602  // Cache the states of the algorithms to improve readability and performance
603  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
604 
605  StatusCode partial_sc( StatusCode::FAILURE, true );
606 
607  // Perform DR->SCHEDULED
608  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
609  uint algIndex{*it};
610  const std::string& algName{index2algname( algIndex )};
611  unsigned int rank{m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName )};
612  bool blocking{m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false};
613 
614  partial_sc =
615  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
616 
617  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
618  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
619  << " on processing slot " << iSlot << endmsg;
620  }
621 
622  // Check for algorithms ready in sub-slots
623  for ( auto& subslot : thisSlot.allSubSlots ) {
624  auto& subslotStates = subslot.algsStates;
625  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
626  uint algIndex{*it};
627  const std::string& algName{index2algname( algIndex )};
628  unsigned int rank{m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName )};
629  bool blocking{m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false};
630  partial_sc =
631  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
632  }
633  }
634 
635  if ( m_dumpIntraEventDynamics ) {
637  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
638  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
639  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
641 #if TBB_INTERFACE_VERSION_MAJOR < 12
642  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
643 #else
645 #endif // TBB_INTERFACE_VERSION_MAJOR < 12
646  std::ofstream myfile;
647  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
648  myfile << s.str();
649  myfile.close();
650  }
651 
652  // Not complete because this would mean that the slot is already free!
653  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
654  !thisSlot.algsStates.containsAny(
655  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
656  !subSlotAlgsInStates( thisSlot,
657  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
658  !thisSlot.complete ) {
659 
660  thisSlot.complete = true;
661  // if the event did not fail, add it to the finished events
662  // otherwise it is taken care of in the error handling
663  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
664  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
665  << thisSlot.eventContext->slot() << ")." << endmsg;
666  m_finishedEvents.push( thisSlot.eventContext.release() );
667  }
668 
669  // now let's return the fully evaluated result of the control flow
670  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
671 
672  thisSlot.eventContext.reset( nullptr );
673 
674  } else if ( isStalled( thisSlot ) ) {
675  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
676  eventFailed( thisSlot.eventContext.get() ); // can't release yet
677  }
678  partial_sc.ignore();
679  } // end loop on slots
680 
681  ON_VERBOSE verbose() << "Iteration done." << endmsg;
682  m_needsUpdate.store( false );
683  return global_sc;
684 }
685 
686 //---------------------------------------------------------------------------
687 // Update algorithm state and, optionally, revise states of other downstream algorithms
688 StatusCode AvalancheSchedulerSvc::revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate ) {
689  StatusCode sc;
690  auto slotIndex = contextPtr->slot();
691  EventSlot& slot = m_eventSlots[slotIndex];
692  Cause cs = {Cause::source::Task, index2algname( iAlgo )};
693 
694  if ( UNLIKELY( contextPtr->usesSubSlot() ) ) {
695  // Sub-slot
696  auto subSlotIndex = contextPtr->subSlot();
697  EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
698 
699  sc = subSlot.algsStates.set( iAlgo, state );
700 
701  if ( LIKELY( sc.isSuccess() ) ) {
702  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
703  << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
704  // Revise states of algorithms downstream the precedence graph
705  if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
706  }
707  } else {
708  // Event level (standard behaviour)
709  sc = slot.algsStates.set( iAlgo, state );
710 
711  if ( LIKELY( sc.isSuccess() ) ) {
712  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
713  << ", event:" << contextPtr->evt() << "]" << endmsg;
714  // Revise states of algorithms downstream the precedence graph
715  if ( iterate ) sc = m_precSvc->iterate( slot, cs );
716  }
717  }
718  return sc;
719 }
720 
721 //---------------------------------------------------------------------------
722 
729 bool AvalancheSchedulerSvc::isStalled( const EventSlot& slot ) const {
730 
731  if ( !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
732  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
733 
734  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
735 
736  return true;
737  }
738  return false;
739 }
740 
741 //---------------------------------------------------------------------------
742 
748  const uint slotIdx = eventContext->slot();
749 
750  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
751 
752  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
753 
754  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
756 
757  // Push into the finished events queue the failed context
758  m_eventSlots[slotIdx].complete = true;
759  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
760 }
761 
762 //---------------------------------------------------------------------------
763 
769 
770  // To have just one big message
771  std::ostringstream outputMS;
772 
773  outputMS << "Dumping scheduler state\n"
774  << "=========================================================================================\n"
775  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
776  << "=========================================================================================\n\n";
777 
778  //===========================================================================
779 
780  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
781  << "------------------\n\n";
782 
783  // Figure if TimelineSvc is available (used below to detect threads IDs)
784  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
785  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
786  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
787  } else {
788 
789  // Figure optimal printout layout
790  size_t indt( 0 );
791  for ( auto& slot : m_eventSlots )
792  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
793  if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
794 
795  // Figure the last running schedule across all slots
796  for ( auto& slot : m_eventSlots ) {
797  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
798  ++it ) {
799 
800  const std::string& algoName{index2algname( *it )};
801 
802  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
803  << slot.eventContext->slot();
804 
805  // Try to get POSIX threads IDs the currently running tasks are scheduled to
806  if ( timelineSvc.isValid() ) {
807  TimelineEvent te{};
808  te.algorithm = algoName;
809  te.slot = slot.eventContext->slot();
810  te.event = slot.eventContext->evt();
811 
812  if ( timelineSvc->getTimelineEvent( te ) )
813  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
814  else
815  outputMS << " thread.id: [unknown]"; // this means a task has just
816  // been signed off as SCHEDULED,
817  // but has not been assigned to a thread yet
818  // (i.e., not running yet)
819  }
820  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
821  }
822  }
823  }
824 
825  //===========================================================================
826 
827  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
828  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
829 
830  int slotCount = -1;
831  for ( auto& slot : m_eventSlots ) {
832  ++slotCount;
833  if ( slot.complete ) continue;
834 
835  outputMS << "[ slot: "
836  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
837  << " event: "
838  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
839  << " ]:\n\n";
840 
841  if ( 0 > iSlot || iSlot == slotCount ) {
842 
843  // Snapshot of the Control Flow and FSM states
844  outputMS << m_precSvc->printState( slot ) << "\n";
845 
846  // Mention sub slots (this is expensive if the number of sub-slots is high)
847  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
848  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
849  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
850  for ( auto& ss : slot.allSubSlots ) {
851  outputMS << "[ slot: " << slotID << ", sub-slot: "
852  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
853  << ", entry: " << ss.entryPoint << ", event: "
854  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
855  << " ]:\n\n";
856  outputMS << m_precSvc->printState( ss ) << "\n";
857  }
858  }
859  }
860  }
861 
862  //===========================================================================
863 
864  if ( 0 <= iSlot ) {
865  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
866  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
867  }
868 
869  outputMS << "\n=========================================================================================\n"
870  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
871  << "=========================================================================================\n\n";
872 
873  info() << outputMS.str() << endmsg;
874 }
875 
876 //---------------------------------------------------------------------------
877 
879 
881  m_retryQueue.push( std::move( ts ) );
882  return StatusCode::SUCCESS;
883  }
884 
885  // Check if a free Algorithm instance is available
886  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
887 
888  // If an instance is available, proceed to scheduling
889  StatusCode sc;
890  if ( LIKELY( getAlgSC.isSuccess() ) ) {
891 
892  // Decide how to schedule the task and schedule it
893  if ( LIKELY( -100 != m_threadPoolSize ) ) {
894 
895  // Cache values before moving the TaskSpec further
896  unsigned int algIndex{ts.algIndex};
897  std::string_view algName( ts.algName );
898  unsigned int algRank{ts.algRank};
899  bool blocking{ts.blocking};
900  int slotIndex{ts.slotIndex};
901  EventContext* contextPtr{ts.contextPtr};
902 
903  if ( LIKELY( !blocking ) ) {
904  // Add the algorithm to the scheduled queue
905  m_scheduledQueue.push( std::move( ts ) );
906 
907  // Prepare a TBB task that will execute the Algorithm according to the above queued specs
908  auto algoTask =
909  new ( tbb::task::allocate_root() ) AlgTask<tbb::task>( this, serviceLocator(), m_algExecStateSvc );
910 
911  // schedule the task
912  tbb::task::enqueue( *algoTask );
913  ++m_algosInFlight;
914 
915  } else { // schedule blocking algorithm in independent thread
916 
917  // Prepare Gaudi task that will execute the Algorithm according to the above queued specs
918  auto algoTask = AlgTask<ITask>( std::move( ts ), this, serviceLocator(), m_algExecStateSvc );
919 
920  // Schedule the blocking task in an independent thread
922  std::thread _t( std::move( algoTask ) );
923  _t.detach();
924 
925  } // end scheduling blocking Algorithm
926 
927  sc = revise( algIndex, contextPtr, AState::SCHEDULED );
928 
929  ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
930  << ", rank:" << algRank << ", blocking:" << ( blocking ? "yes" : "no" )
931  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
933  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
934  : "" )
935  << endmsg;
936 
937  } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
939  ++m_algosInFlight;
940  sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
941  theTask();
942  --m_algosInFlight;
943  }
944  } else { // if no Algorithm instance available, retry later
945 
946  sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
947  // Add the algorithm to the retry queue
948  m_retryQueue.push( std::move( ts ) );
949  }
950 
952 
953  return sc;
954 }
955 
956 //---------------------------------------------------------------------------
957 
962 
963  Gaudi::Hive::setCurrentContext( ts.contextPtr );
964 
965  if ( LIKELY( !ts.blocking ) )
966  --m_algosInFlight;
967  else
969 
970  const AlgExecState& algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
971  AState state = algstate.execStatus().isSuccess()
972  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
973  : AState::ERROR;
974 
975  // Update algorithm state and revise the downstream states
976  auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
977 
978  ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
979  << ", rank:" << ts.algRank << ", blocking:" << ( ts.blocking ? "yes" : "no" )
980  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
982  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
983  : "" )
984  << endmsg;
985 
986  // Prompt a call to updateStates
987  m_needsUpdate.store( true );
988  return sc;
989 }
990 
991 //---------------------------------------------------------------------------
992 
993 // Method to inform the scheduler about event views
994 
996  std::unique_ptr<EventContext> viewContext ) {
997  // Prevent view nesting
998  if ( sourceContext->usesSubSlot() ) {
999  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1000  return StatusCode::FAILURE;
1001  }
1002 
1003  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1004 
1005  // It's not possible to create an std::functional from a move-capturing lambda
1006  // So, we have to release the unique pointer
1007  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1008  &nodeName]() -> StatusCode {
1009  // Attach the sub-slot to the top-level slot
1010  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1011 
1012  if ( viewContextPtr ) {
1013  // Re-create the unique pointer
1014  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1015  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1016  return StatusCode::SUCCESS;
1017  } else {
1018  // Disable the view node if there are no views
1019  topSlot.disableSubSlots( nodeName );
1020  return StatusCode::SUCCESS;
1021  }
1022  };
1023 
1024  m_actionsQueue.push( std::move( action ) );
1025 
1026  return StatusCode::SUCCESS;
1027 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
Gaudi::Property< bool > m_showDataFlow
#define ON_DEBUG
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Struct to hold entries in the alg queues.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
#define UNLIKELY(x)
Definition: Kernel.h:106
StatusCode initialize() override
Definition: Service.cpp:70
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:287
Class representing an event slot.
Definition: EventSlot.h:24
ContextID_t slot() const
Definition: EventContext.h:51
T empty(T... args)
Gaudi::Property< std::string > m_whiteboardSvcName
T open(T... args)
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
ContextEvt_t evt() const
Definition: EventContext.h:50
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Definition: EventSlot.h:78
StatusCode finalize() override
Definition: Service.cpp:174
virtual const std::string & type() const =0
The type of the algorithm.
Gaudi::Property< bool > m_dumpIntraEventDynamics
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Gaudi::Property< bool > m_showDataDeps
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Definition: EventSlot.h:61
std::queue< TaskSpec > m_retryQueue
std::atomic< bool > m_needsUpdate
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:31
T to_string(T... args)
T store(T... args)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:72
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
std::string algorithm
Definition: ITimelineSvc.h:31
bool filterPassed() const
void activate()
Activate scheduler.
T end(T... args)
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:100
Gaudi::Property< std::string > m_optimizationMode
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
T hardware_concurrency(T... args)
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:86
This class represents an entry point to all the event specific data.
Definition: EventContext.h:34
STL class.
std::string fullKey() const
Definition: DataObjID.cpp:99
virtual StatusCode acquireAlgorithm(std::string_view name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
T release(T... args)
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
T setw(T... args)
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
T resize(T... args)
Gaudi::Property< bool > m_checkDeps
STL class.
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
#define DECLARE_COMPONENT(type)
const unsigned int & getAlgoIndex() const
Get algorithm index.
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T at(T... args)
virtual StatusCode terminatePool()=0
Finalize the thread pool.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:86
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:284
unsigned int m_algosInFlight
Number of algorithms presently in flight.
T push_back(T... args)
STL class.
StatusCode schedule(TaskSpec &&)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Gaudi::Property< bool > m_verboseSubSlots
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
T join(T... args)
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:61
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
T close(T... args)
const StatusCode & execStatus() const
size_t sizeOfSubset(State state) const
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T str(T... args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
StatusCode finalize() override
Finalise.
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
Gaudi::Property< int > m_threadPoolSize
virtual void dumpDataFlow() const =0
bool isSuccess() const
Definition: StatusCode.h:366
SmartIF< IThreadPoolSvc > m_threadPoolSvc
T max(T... args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:38
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
T move(T... args)
#define LIKELY(x)
Definition: Kernel.h:105
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
ContextID_t subSlot() const
Definition: EventContext.h:52
T get(T... args)
T insert(T... args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & outputDataObjs() const override
T find_if(T... args)
T size(T... args)
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:168
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
STL class.
Gaudi::Property< bool > m_simulateExecution
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
T begin(T... args)
Iterator begin(State kind)
unsigned int m_blockingAlgosInFlight
Number of algorithms presently in flight.
T any_of(T... args)
Gaudi::Property< bool > m_enableCondSvc
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:89
virtual uint getPriority(const std::string &) const =0
Get task priority.
T back(T... args)
string s
Definition: gaudirun.py:328
constexpr static const auto FAILURE
Definition: StatusCode.h:101
T hex(T... args)
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
Definition: EventSlot.h:89
T sort(T... args)
T detach(T... args)
StatusCode deactivate()
Deactivate scheduler.
bool isFailure() const
Definition: StatusCode.h:145
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
const DataObjIDColl & inputDataObjs() const override
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:49
virtual void dumpControlFlow() const =0
Dump precedence rules.
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual const std::string printState(EventSlot &) const =0
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:83
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
T load(T... args)
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:73
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:549
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:85
T reserve(T... args)
T emplace_back(T... args)
std::thread m_thread
The thread in which the activate function runs.
bool usesSubSlot() const
Definition: EventContext.h:53
#define ON_VERBOSE
Iterator end(State kind)