The Gaudi Framework  master (37c0b60a)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2024 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "AvalancheSchedulerSvc.h"
12 #include "AlgTask.h"
13 #include "FiberManager.h"
14 #include "ThreadPoolSvc.h"
15 
16 // Framework includes
17 #include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
20 #include <GaudiKernel/IAlgorithm.h>
24 
25 // C++
26 #include <algorithm>
27 #include <fstream>
28 #include <map>
29 #include <queue>
30 #include <regex>
31 #include <semaphore>
32 #include <sstream>
33 #include <string_view>
34 #include <thread>
35 #include <unordered_set>
36 
37 // External libs
38 #include <boost/algorithm/string.hpp>
39 #include <boost/thread.hpp>
40 #include <boost/tokenizer.hpp>
41 
42 // Instantiation of a static factory class used by clients to create instances of this service
44 
45 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
46 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
47 
48 namespace {
49  struct DataObjIDSorter {
50  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
51  };
52 
53  // Sort a DataObjIDColl in a well-defined, reproducible manner.
54  // Used for making debugging dumps.
55  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
57  v.reserve( coll.size() );
58  for ( const DataObjID& id : coll ) v.push_back( &id );
59  std::sort( v.begin(), v.end(), DataObjIDSorter() );
60  return v;
61  }
62 
63  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
64  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
65  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
66  }
67 } // namespace
68 
69 //---------------------------------------------------------------------------
70 
78 
79  // Initialise mother class (read properties, ...)
81  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
82 
83  // Get hold of the TBBSvc. This should initialize the thread pool
84  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
85  if ( !m_threadPoolSvc.isValid() ) {
86  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
87  return StatusCode::FAILURE;
88  }
89  auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
90  if ( !castTPS ) {
91  fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
92  return StatusCode::FAILURE;
93  }
94  m_arena = castTPS->getArena();
95  if ( !m_arena ) {
96  fatal() << "Cannot find valid TBB task_arena" << endmsg;
97  return StatusCode::FAILURE;
98  }
99 
100  // Activate the scheduler in another thread.
101  info() << "Activating scheduler in a separate thread" << endmsg;
102  std::binary_semaphore fiber_manager_initalized{ 0 };
103  m_thread = std::thread( [this, &fiber_manager_initalized]() {
104  // Initialize FiberManager
105  this->m_fiberManager = std::make_unique<FiberManager>( this->m_numOffloadThreads.value() );
106  fiber_manager_initalized.release();
107  this->activate();
108  } );
109  // Wait for initialization to complete
110  fiber_manager_initalized.acquire();
111 
112  while ( m_isActive != ACTIVE ) {
113  if ( m_isActive == FAILURE ) {
114  fatal() << "Terminating initialization" << endmsg;
115  return StatusCode::FAILURE;
116  } else {
117  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
118  sleep( 1 );
119  }
120  }
121 
122  if ( m_enableCondSvc ) {
123  // Get hold of the CondSvc
124  m_condSvc = serviceLocator()->service( "CondSvc" );
125  if ( !m_condSvc.isValid() ) {
126  warning() << "No CondSvc found, or not enabled. "
127  << "Will not manage CondAlgorithms" << endmsg;
128  m_enableCondSvc = false;
129  }
130  }
131 
132  // Get the algo resource pool
133  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
134  if ( !m_algResourcePool.isValid() ) {
135  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
136  return StatusCode::FAILURE;
137  }
138 
139  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
140  if ( !m_algExecStateSvc.isValid() ) {
141  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
142  return StatusCode::FAILURE;
143  }
144 
145  // Get Whiteboard
147  if ( !m_whiteboard.isValid() ) {
148  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
149  return StatusCode::FAILURE;
150  }
151 
152  // Set the MaxEventsInFlight parameters from the number of WB stores
153  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
154 
155  // Set the number of free slots
157 
158  // Get the list of algorithms
159  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
160  const unsigned int algsNumber = algos.size();
161  if ( algsNumber != 0 ) {
162  info() << "Found " << algsNumber << " algorithms" << endmsg;
163  } else {
164  error() << "No algorithms found" << endmsg;
165  return StatusCode::FAILURE;
166  }
167 
168  /* Dependencies
169  1) Look for handles in algo, if none
170  2) Assume none are required
171  */
172 
173  DataObjIDColl globalInp, globalOutp;
174 
175  // figure out all outputs
176  std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
177  for ( IAlgorithm* ialgoPtr : algos ) {
178  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
179  if ( !algoPtr ) {
180  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
181  return StatusCode::FAILURE;
182  }
183 
184  DataObjIDColl algoOutputs;
185  for ( auto id : algoPtr->outputDataObjs() ) {
186  globalOutp.insert( id );
187  algoOutputs.insert( id );
188  }
189  algosOutputDependenciesMap[algoPtr->name()] = algoOutputs;
190  }
191 
192  std::ostringstream ostdd;
193  ostdd << "Data Dependencies for Algorithms:";
194 
195  std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
196  for ( IAlgorithm* ialgoPtr : algos ) {
197  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
198  if ( nullptr == algoPtr ) {
199  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
200  << ": this will result in a crash." << endmsg;
201  return StatusCode::FAILURE;
202  }
203 
204  DataObjIDColl i1, i2;
205  DHHVisitor avis( i1, i2 );
206  algoPtr->acceptDHVisitor( &avis );
207 
208  ostdd << "\n " << algoPtr->name();
209 
210  auto write_owners = [&avis, &ostdd]( const DataObjID& id ) {
211  auto owners = avis.owners_names_of( id );
212  if ( !owners.empty() ) { GaudiUtils::operator<<( ostdd << ' ', owners ); }
213  };
214 
215  DataObjIDColl algoDependencies;
216  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
217  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
218  DataObjID id = *idp;
219  ostdd << "\n o INPUT " << id;
220  write_owners( id );
221  algoDependencies.insert( id );
222  globalInp.insert( id );
223  }
224  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
225  ostdd << "\n o OUTPUT " << *id;
226  write_owners( *id );
227  if ( id->key().find( ":" ) != std::string::npos ) {
228  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
229  << endmsg;
230  m_showDataDeps = true;
231  }
232  }
233  } else {
234  ostdd << "\n none";
235  }
236  algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
237  }
238 
239  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
240 
241  // If requested, dump a graph of the data dependencies in a .dot or .md file
242  if ( not m_dataDepsGraphFile.empty() ) {
243  if ( dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
244  return StatusCode::FAILURE;
245  }
246  }
247 
248  // Check if we have unmet global input dependencies, and, optionally, heal them
249  // WARNING: this step must be done BEFORE the Precedence Service is initialized
250  DataObjIDColl unmetDepInp, unusedOutp;
251  if ( m_checkDeps || m_checkOutput ) {
252  std::set<std::string> requiredInputKeys;
253  for ( auto o : globalInp ) {
254  // track aliases
255  // (assuming there should be no items with different class and same key corresponding to different objects)
256  requiredInputKeys.insert( o.key() );
257  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
258  }
259  if ( m_checkOutput ) {
260  for ( auto o : globalOutp ) {
261  if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
262  // check ignores
263  bool ignored{};
264  for ( const std::string& algoName : m_checkOutputIgnoreList ) {
265  auto it = algosOutputDependenciesMap.find( algoName );
266  if ( it != algosOutputDependenciesMap.end() ) {
267  if ( it->second.find( o ) != it->second.end() ) {
268  ignored = true;
269  break;
270  }
271  }
272  }
273  if ( !ignored ) { unusedOutp.insert( o ); }
274  }
275  }
276  }
277  }
278 
279  if ( m_checkDeps ) {
280  if ( unmetDepInp.size() > 0 ) {
281 
282  auto printUnmet = [&]( auto msg ) {
283  for ( const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
284  msg << " o " << *o << " required by Algorithm: " << endmsg;
285 
286  for ( const auto& p : algosInputDependenciesMap )
287  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
288  }
289  };
290 
291  if ( !m_useDataLoader.empty() ) {
292 
293  // Find the DataLoader Alg
294  IAlgorithm* dataLoaderAlg( nullptr );
295  for ( IAlgorithm* algo : algos )
296  if ( m_useDataLoader == algo->name() ) {
297  dataLoaderAlg = algo;
298  break;
299  }
300 
301  if ( dataLoaderAlg == nullptr ) {
302  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
303  << "\" found, and unmet INPUT dependencies "
304  << "detected:" << endmsg;
305  printUnmet( fatal() );
306  return StatusCode::FAILURE;
307  }
308 
309  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
310  << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
311  printUnmet( info() );
312 
313  // Set the property Load of DataLoader Alg
314  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
315  if ( !dataAlg ) {
316  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
317  << endmsg;
318  return StatusCode::FAILURE;
319  }
320 
321  for ( auto& id : unmetDepInp ) {
322  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
323  << dataLoaderAlg->name() << endmsg;
325  }
326 
327  } else {
328  fatal() << "Auto DataLoading not requested, "
329  << "and the following unmet INPUT dependencies were found:" << endmsg;
330  printUnmet( fatal() );
331  return StatusCode::FAILURE;
332  }
333 
334  } else {
335  info() << "No unmet INPUT data dependencies were found" << endmsg;
336  }
337  }
338 
339  if ( m_checkOutput ) {
340  if ( unusedOutp.size() > 0 ) {
341 
342  auto printUnusedOutp = [&]( auto msg ) {
343  for ( const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
344  msg << " o " << *o << " produced by Algorithm: " << endmsg;
345 
346  for ( const auto& p : algosOutputDependenciesMap )
347  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
348  }
349  };
350 
351  fatal() << "The following unused OUTPUT items were found:" << endmsg;
352  printUnusedOutp( fatal() );
353  return StatusCode::FAILURE;
354  } else {
355  info() << "No unused OUTPUT items were found" << endmsg;
356  }
357  }
358 
359  // Get the precedence service
360  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
361  if ( !m_precSvc.isValid() ) {
362  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
363  return StatusCode::FAILURE;
364  }
365  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
366  if ( !precSvc ) {
367  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
368  return StatusCode::FAILURE;
369  }
370 
371  // Fill the containers to convert algo names to index
372  m_algname_vect.resize( algsNumber );
373  for ( IAlgorithm* algo : algos ) {
374  const std::string& name = algo->name();
375  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
378  }
379 
380  // Shortcut for the message service
381  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
382  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
383 
385  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
386  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
387  m_eventSlots.back().complete = true;
388  }
389 
390  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
391 
392  // Clearly inform about the level of concurrency
393  info() << "Concurrency level information:" << endmsg;
394  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
395  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
396  info() << " o Fiber thread pool size: " << m_numOffloadThreads << endmsg;
397 
398  // Inform about task scheduling prescriptions
399  info() << "Task scheduling settings:" << endmsg;
400  info() << " o Avalanche generation mode: "
401  << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
402  info() << " o Preemptive scheduling of CPU-blocking tasks: "
404  ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
405  : "disabled" )
406  << endmsg;
407  info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
408 
409  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
410 
411  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
412 
413  // Simulate execution flow
414  if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
415 
416  return sc;
417 }
418 //---------------------------------------------------------------------------
419 
424 
426  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
427 
428  sc = deactivate();
429  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
430 
431  debug() << "Deleting FiberManager" << endmsg;
433 
434  info() << "Joining Scheduler thread" << endmsg;
435  m_thread.join();
436 
437  // Final error check after thread pool termination
438  if ( m_isActive == FAILURE ) {
439  error() << "problems in scheduler thread" << endmsg;
440  return StatusCode::FAILURE;
441  }
442 
443  return sc;
444 }
445 //---------------------------------------------------------------------------
446 
458 
459  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
460 
461  if ( m_threadPoolSvc->initPool( m_threadPoolSize, m_maxParallelismExtra ).isFailure() ) {
462  error() << "problems initializing ThreadPoolSvc" << endmsg;
464  return;
465  }
466 
467  // Wait for actions pushed into the queue by finishing tasks.
468  action thisAction;
470 
471  m_isActive = ACTIVE;
472 
473  // Continue to wait if the scheduler is running or there is something to do
474  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
475  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
476  m_actionsQueue.pop( thisAction );
477  sc = thisAction();
478  ON_VERBOSE {
479  if ( sc.isFailure() )
480  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
481  else
482  verbose() << "Action succeeded." << endmsg;
483  }
484  else sc.ignore();
485 
486  // If all queued actions have been processed, update the slot states
487  if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
488  sc = iterate();
489  ON_VERBOSE {
490  if ( sc.isFailure() )
491  verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
492  else
493  verbose() << "Iteration succeeded." << endmsg;
494  }
495  else sc.ignore();
496  }
497  }
498 
499  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
500  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
501  error() << "Problems terminating thread pool" << endmsg;
503  }
504 }
505 
506 //---------------------------------------------------------------------------
507 
515 
516  if ( m_isActive == ACTIVE ) {
517 
518  // Set the number of slots available to an error code
519  m_freeSlots.store( 0 );
520 
521  // Empty queue
522  action thisAction;
523  while ( m_actionsQueue.try_pop( thisAction ) ) {};
524 
525  // This would be the last action
526  m_actionsQueue.push( [this]() -> StatusCode {
527  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
529  return StatusCode::SUCCESS;
530  } );
531  }
532 
533  return StatusCode::SUCCESS;
534 }
535 
536 //---------------------------------------------------------------------------
537 
538 // EventSlot management
546 
547  if ( !eventContext ) {
548  fatal() << "Event context is nullptr" << endmsg;
549  return StatusCode::FAILURE;
550  }
551 
552  if ( m_freeSlots.load() == 0 ) {
553  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
554  return StatusCode::FAILURE;
555  }
556 
557  // no problem as push new event is only called from one thread (event loop manager)
558  --m_freeSlots;
559 
560  auto action = [this, eventContext]() -> StatusCode {
561  // Event processing slot forced to be the same as the wb slot
562  const unsigned int thisSlotNum = eventContext->slot();
563  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
564  if ( !thisSlot.complete ) {
565  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
566  return StatusCode::FAILURE;
567  }
568 
569  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
570  thisSlot.reset( eventContext );
571 
572  // Result status code:
574 
575  // promote to CR and DR the initial set of algorithms
576  Cause cs = { Cause::source::Root, "RootDecisionHub" };
577  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
578  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
579  result = StatusCode::FAILURE;
580  }
581 
582  if ( this->iterate().isFailure() ) {
583  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
584  result = StatusCode::FAILURE;
585  }
586 
587  return result;
588  }; // end of lambda
589 
590  // Kick off scheduling
591  ON_VERBOSE {
592  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
593  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
594  }
595 
596  m_actionsQueue.push( action );
597 
598  return StatusCode::SUCCESS;
599 }
600 
601 //---------------------------------------------------------------------------
602 
604  StatusCode sc;
605  for ( auto context : eventContexts ) {
606  sc = pushNewEvent( context );
607  if ( sc != StatusCode::SUCCESS ) return sc;
608  }
609  return sc;
610 }
611 
612 //---------------------------------------------------------------------------
613 
614 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
615 
616 //---------------------------------------------------------------------------
617 
619 
620 //---------------------------------------------------------------------------
625 
626  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
627  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
628  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
629  // << " active: " << m_isActive << endmsg;
630  return StatusCode::FAILURE;
631  } else {
632  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
633  // << " active: " << m_isActive << endmsg;
634  m_finishedEvents.pop( eventContext );
635  ++m_freeSlots;
636  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
637  return StatusCode::SUCCESS;
638  }
639 }
640 
641 //---------------------------------------------------------------------------
646 
647  if ( m_finishedEvents.try_pop( eventContext ) ) {
648  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
649  << endmsg;
650  ++m_freeSlots;
651  return StatusCode::SUCCESS;
652  }
653  return StatusCode::FAILURE;
654 }
655 
656 //--------------------------------------------------------------------------
657 
666 
667  StatusCode global_sc( StatusCode::SUCCESS );
668 
669  // Retry algorithms
670  const size_t retries = m_retryQueue.size();
671  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
672  TaskSpec retryTS = std::move( m_retryQueue.front() );
673  m_retryQueue.pop();
674  global_sc = schedule( std::move( retryTS ) );
675  }
676 
677  // Loop over all slots
678  OccupancySnapshot nextSnap;
679  auto now = std::chrono::system_clock::now();
680  for ( EventSlot& thisSlot : m_eventSlots ) {
681 
682  // Ignore slots without a valid context (relevant when populating scheduler for first time)
683  if ( !thisSlot.eventContext ) continue;
684 
685  int iSlot = thisSlot.eventContext->slot();
686 
687  // Cache the states of the algorithms to improve readability and performance
688  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
689 
690  StatusCode partial_sc = StatusCode::FAILURE;
691 
692  // Make an occupancy snapshot
695 
696  // Initialise snapshot
697  if ( nextSnap.states.empty() ) {
698  nextSnap.time = now;
699  nextSnap.states.resize( m_eventSlots.size() );
700  }
701 
702  // Store alg states
703  std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
704  slotStateTotals.resize( AState::MAXVALUE );
705  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
706  slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
707  }
708 
709  // Add subslot alg states
710  for ( auto& subslot : thisSlot.allSubSlots ) {
711  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
712  slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
713  }
714  }
715  }
716 
717  // Perform DR->SCHEDULED
718  const auto& drAlgs = thisAlgsStates.algsInState( AState::DATAREADY );
719  for ( uint algIndex : drAlgs ) {
720  const std::string& algName{ index2algname( algIndex ) };
721  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
722  bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
723 
724  partial_sc =
725  schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
726 
727  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
728  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
729  << " on processing slot " << iSlot << endmsg;
730  }
731 
732  // Check for algorithms ready in sub-slots
733  for ( auto& subslot : thisSlot.allSubSlots ) {
734  const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
735  for ( uint algIndex : drAlgsSubSlot ) {
736  const std::string& algName{ index2algname( algIndex ) };
737  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
738  bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
739  partial_sc =
740  schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
741  }
742  }
743 
744  if ( m_dumpIntraEventDynamics ) {
746  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
747  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
748  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
751  std::ofstream myfile;
752  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
753  myfile << s.str();
754  myfile.close();
755  }
756 
757  // Not complete because this would mean that the slot is already free!
758  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
759  !thisSlot.algsStates.containsAny(
760  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
761  !subSlotAlgsInStates( thisSlot,
762  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
763  !thisSlot.complete ) {
764 
765  thisSlot.complete = true;
766  // if the event did not fail, add it to the finished events
767  // otherwise it is taken care of in the error handling
768  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
769  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
770  << thisSlot.eventContext->slot() << ")." << endmsg;
771  m_finishedEvents.push( thisSlot.eventContext.release() );
772  }
773 
774  // now let's return the fully evaluated result of the control flow
775  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
776 
777  thisSlot.eventContext.reset( nullptr );
778 
779  } else if ( isStalled( thisSlot ) ) {
780  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
781  eventFailed( thisSlot.eventContext.get() ); // can't release yet
782  }
783  partial_sc.ignore();
784  } // end loop on slots
785 
786  // Process snapshot
787  if ( !nextSnap.states.empty() ) {
788  m_lastSnapshot = nextSnap.time;
789  m_snapshotCallback( std::move( nextSnap ) );
790  }
791 
792  ON_VERBOSE verbose() << "Iteration done." << endmsg;
793  m_needsUpdate.store( false );
794  return global_sc;
795 }
796 
797 //---------------------------------------------------------------------------
798 // Update algorithm state and, optionally, revise states of other downstream algorithms
799 StatusCode AvalancheSchedulerSvc::revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate ) {
800  StatusCode sc;
801  auto slotIndex = contextPtr->slot();
802  EventSlot& slot = m_eventSlots[slotIndex];
803  Cause cs = { Cause::source::Task, index2algname( iAlgo ) };
804 
805  if ( contextPtr->usesSubSlot() ) {
806  // Sub-slot
807  auto subSlotIndex = contextPtr->subSlot();
808  EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
809 
810  sc = subSlot.algsStates.set( iAlgo, state );
811 
812  if ( sc.isSuccess() ) {
813  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
814  << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
815  // Revise states of algorithms downstream the precedence graph
816  if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
817  }
818  } else {
819  // Event level (standard behaviour)
820  sc = slot.algsStates.set( iAlgo, state );
821 
822  if ( sc.isSuccess() ) {
823  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
824  << ", event:" << contextPtr->evt() << "]" << endmsg;
825  // Revise states of algorithms downstream the precedence graph
826  if ( iterate ) sc = m_precSvc->iterate( slot, cs );
827  }
828  }
829  return sc;
830 }
831 
832 //---------------------------------------------------------------------------
833 
840 bool AvalancheSchedulerSvc::isStalled( const EventSlot& slot ) const {
841 
842  if ( !slot.algsStates.containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
843  !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
844 
845  error() << "*** Stall detected, event context: " << slot.eventContext.get() << endmsg;
846 
847  return true;
848  }
849  return false;
850 }
851 
852 //---------------------------------------------------------------------------
853 
859  const uint slotIdx = eventContext->slot();
860 
861  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
862 
863  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
864 
865  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
866  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
867 
868  // Push into the finished events queue the failed context
869  m_eventSlots[slotIdx].complete = true;
870  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
871 }
872 
873 //---------------------------------------------------------------------------
874 
880 
881  // To have just one big message
882  std::ostringstream outputMS;
883 
884  outputMS << "Dumping scheduler state\n"
885  << "=========================================================================================\n"
886  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
887  << "=========================================================================================\n\n";
888 
889  //===========================================================================
890 
891  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
892  << "------------------\n\n";
893 
894  // Figure if TimelineSvc is available (used below to detect threads IDs)
895  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
896  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
897  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
898  } else {
899 
900  // Figure optimal printout layout
901  size_t indt( 0 );
902  for ( auto& slot : m_eventSlots ) {
903 
904  const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
905  for ( uint algIndex : schedAlgs ) {
906  if ( index2algname( algIndex ).length() > indt ) indt = index2algname( algIndex ).length();
907  }
908  }
909 
910  // Figure the last running schedule across all slots
911  for ( auto& slot : m_eventSlots ) {
912 
913  const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
914  for ( uint algIndex : schedAlgs ) {
915 
916  const std::string& algoName{ index2algname( algIndex ) };
917 
918  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
919  << slot.eventContext->slot();
920 
921  // Try to get POSIX threads IDs the currently running tasks are scheduled to
922  if ( timelineSvc.isValid() ) {
923  TimelineEvent te{};
924  te.algorithm = algoName;
925  te.slot = slot.eventContext->slot();
926  te.event = slot.eventContext->evt();
927 
928  if ( timelineSvc->getTimelineEvent( te ) )
929  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
930  else
931  outputMS << " thread.id: [unknown]"; // this means a task has just
932  // been signed off as SCHEDULED,
933  // but has not been assigned to a thread yet
934  // (i.e., not running yet)
935  }
936  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
937  }
938  }
939  }
940 
941  //===========================================================================
942 
943  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
944  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
945 
946  int slotCount = -1;
947  bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( { AState::ERROR } ) ||
948  subSlotAlgsInStates( m_eventSlots[iSlot], { AState::ERROR } )
949  : false;
950 
951  for ( auto& slot : m_eventSlots ) {
952  ++slotCount;
953  if ( slot.complete ) continue;
954 
955  outputMS << "[ slot: "
956  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
957  << ", event: "
958  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" );
959 
960  if ( slot.eventContext->eventID().isValid() ) { outputMS << ", eventID: " << slot.eventContext->eventID(); }
961  outputMS << " ]:\n\n";
962 
963  if ( 0 > iSlot || iSlot == slotCount ) {
964 
965  // If an alg has thrown an error then it's not a failure of the CF/DF graph
966  if ( wasAlgError ) {
967  outputMS << "ERROR alg(s):";
968  int errorCount = 0;
969  const auto& errorAlgs = slot.algsStates.algsInState( AState::ERROR );
970  for ( uint algIndex : errorAlgs ) {
971  outputMS << " " << index2algname( algIndex );
972  ++errorCount;
973  }
974  if ( errorCount == 0 ) outputMS << " in subslot(s)";
975  outputMS << "\n\n";
976  } else {
977  // Snapshot of the Control Flow and FSM states
978  outputMS << m_precSvc->printState( slot ) << "\n";
979  }
980 
981  // Mention sub slots (this is expensive if the number of sub-slots is high)
982  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
983  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
984  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
985  for ( auto& ss : slot.allSubSlots ) {
986  outputMS << "[ slot: " << slotID << ", sub-slot: "
987  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
988  << ", entry: " << ss.entryPoint << ", event: "
989  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
990  << " ]:\n\n";
991  if ( wasAlgError ) {
992  outputMS << "ERROR alg(s):";
993  const auto& errorAlgs = ss.algsStates.algsInState( AState::ERROR );
994  for ( uint algIndex : errorAlgs ) { outputMS << " " << index2algname( algIndex ); }
995  outputMS << "\n\n";
996  } else {
997  // Snapshot of the Control Flow and FSM states in sub slot
998  outputMS << m_precSvc->printState( ss ) << "\n";
999  }
1000  }
1001  }
1002  }
1003  }
1004 
1005  //===========================================================================
1006 
1007  if ( 0 <= iSlot && !wasAlgError ) {
1008  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1009  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1010  }
1011 
1012  outputMS << "\n=========================================================================================\n"
1013  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1014  << "=========================================================================================\n\n";
1015 
1016  info() << outputMS.str() << endmsg;
1017 }
1018 
1019 //---------------------------------------------------------------------------
1020 
1022 
1023  // Check if a free Algorithm instance is available
1024  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
1025 
1026  // If an instance is available, proceed to scheduling
1027  StatusCode sc;
1028  if ( getAlgSC.isSuccess() ) {
1029 
1030  // Decide how to schedule the task and schedule it
1031  if ( -100 != m_threadPoolSize ) {
1032 
1033  // Cache values before moving the TaskSpec further
1034  unsigned int algIndex{ ts.algIndex };
1035  std::string_view algName( ts.algName );
1036  unsigned int algRank{ ts.algRank };
1037  bool asynchronous{ ts.asynchronous };
1038  int slotIndex{ ts.slotIndex };
1039  EventContext* contextPtr{ ts.contextPtr };
1040 
1041  if ( asynchronous ) {
1042  // Add to asynchronous scheduled queue
1044 
1045  // Schedule task
1046  m_fiberManager->schedule( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1047  }
1048 
1049  if ( !asynchronous ) {
1050  // Add the algorithm to the scheduled queue
1051  m_scheduledQueue.push( std::move( ts ) );
1052 
1053  // Prepare a TBB task that will execute the Algorithm according to the above queued specs
1054  m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1055  ++m_algosInFlight;
1056  }
1057  sc = revise( algIndex, contextPtr, AState::SCHEDULED );
1058 
1059  ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
1060  << ", rank:" << algRank << ", asynchronous:" << ( asynchronous ? "yes" : "no" )
1061  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1063  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1064  : "" )
1065  << endmsg;
1066 
1067  } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
1068  // Beojan: I don't think this bit works. ts hasn't been pushed into any queue so AlgTask won't retrieve it
1069  ++m_algosInFlight;
1070  sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
1071  AlgTask( this, serviceLocator(), m_algExecStateSvc, ts.asynchronous )();
1072  --m_algosInFlight;
1073  }
1074  } else { // if no Algorithm instance available, retry later
1075 
1076  sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1077  // Add the algorithm to the retry queue
1078  m_retryQueue.push( std::move( ts ) );
1079  }
1080 
1082 
1083  return sc;
1084 }
1085 
1086 //---------------------------------------------------------------------------
1087 
1092 
1093  Gaudi::Hive::setCurrentContext( ts.contextPtr );
1094 
1095  --m_algosInFlight;
1096 
1097  const AlgExecState& algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
1098  AState state = algstate.execStatus().isSuccess()
1099  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1100  : AState::ERROR;
1101 
1102  // Update algorithm state and revise the downstream states
1103  auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1104 
1105  ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1106  << ", rank:" << ts.algRank << ", asynchronous:" << ( ts.asynchronous ? "yes" : "no" )
1107  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1109  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1110  : "" )
1111  << endmsg;
1112 
1113  // Prompt a call to updateStates
1114  m_needsUpdate.store( true );
1115  return sc;
1116 }
1117 
1118 //---------------------------------------------------------------------------
1119 
1120 // Method to inform the scheduler about event views
1121 
1123  std::unique_ptr<EventContext> viewContext ) {
1124  // Prevent view nesting
1125  if ( sourceContext->usesSubSlot() ) {
1126  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1127  return StatusCode::FAILURE;
1128  }
1129 
1130  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1131 
1132  // It's not possible to create an std::functional from a move-capturing lambda
1133  // So, we have to release the unique pointer
1134  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1135  &nodeName]() -> StatusCode {
1136  // Attach the sub-slot to the top-level slot
1137  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1138 
1139  if ( viewContextPtr ) {
1140  // Re-create the unique pointer
1141  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1142  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1143  return StatusCode::SUCCESS;
1144  } else {
1145  // Disable the view node if there are no views
1146  topSlot.disableSubSlots( nodeName );
1147  return StatusCode::SUCCESS;
1148  }
1149  };
1150 
1151  m_actionsQueue.push( std::move( action ) );
1152 
1153  return StatusCode::SUCCESS;
1154 }
1155 
1156 //---------------------------------------------------------------------------
1157 
1158 // Sample occupancy at fixed interval (ms)
1159 // Negative value to deactivate, 0 to snapshot every change
1160 // Each sample, apply the callback function to the result
1161 
1162 void AvalancheSchedulerSvc::recordOccupancy( int samplePeriod, std::function<void( OccupancySnapshot )> callback ) {
1163 
1164  auto action = [this, samplePeriod, callback = std::move( callback )]() -> StatusCode {
1165  if ( samplePeriod < 0 ) {
1167  } else {
1170  }
1171  return StatusCode::SUCCESS;
1172  };
1173 
1174  m_actionsQueue.push( std::move( action ) );
1175 }
1176 
1178  const std::map<std::string, DataObjIDColl>& outDeps ) const {
1179  // Both maps should have the same algorithm entries
1180  assert( inDeps.size() == outDeps.size() );
1181 
1182  // Check file extension
1183  enum class FileType : short { UNKNOWN, DOT, MD };
1184  std::regex fileExtensionRegexDot( ".dot$" );
1185  std::regex fileExtensionRegexMd( ".md$" );
1186 
1187  std::string fileName = m_dataDepsGraphFile.value();
1188  FileType fileExtension = FileType::UNKNOWN;
1189  if ( std::regex_search( m_dataDepsGraphFile.value(), fileExtensionRegexDot ) ) {
1190  fileExtension = FileType::DOT;
1191  } else if ( std::regex_search( m_dataDepsGraphFile.value(), fileExtensionRegexMd ) ) {
1192  fileExtension = FileType::MD;
1193  } else {
1194  fileExtension = FileType::DOT;
1195  fileName = fileName + ".dot";
1196  }
1197  info() << "Dumping data dependencies graph to file: " << fileName << endmsg;
1198 
1199  std::string startGraph = "";
1200  std::string stopGraph = "";
1201  // define functions
1202  std::function<std::string( const std::string&, const std::string& )> defineAlg;
1203  std::function<std::string( const DataObjID& )> defineObj;
1204  std::function<std::string( const DataObjID&, const std::string& )> defineInput;
1205  std::function<std::string( const std::string&, const DataObjID& )> defineOutput;
1206 
1207  if ( fileExtension == FileType::DOT ) {
1208  // .dot file
1209  startGraph = "digraph datadeps {\nrankdir=\"LR\";\n\n";
1210  stopGraph = "\n}\n";
1211 
1212  defineAlg = []( const std::string& alg, const std::string& idx ) -> std::string {
1213  return "Alg_" + idx + " [label=\"" + alg + "\";shape=box];\n";
1214  };
1215 
1216  defineObj = []( const DataObjID& obj ) -> std::string {
1217  return "obj_" + std::to_string( obj.hash() ) + " [label=\"" + obj.key() + "\"];\n";
1218  };
1219 
1220  defineInput = []( const DataObjID& obj, const std::string& alg ) -> std::string {
1221  return "obj_" + std::to_string( obj.hash() ) + " -> " + "Alg_" + alg + ";\n";
1222  };
1223 
1224  defineOutput = []( const std::string& alg, const DataObjID& obj ) -> std::string {
1225  return "Alg_" + alg + " -> " + "obj_" + std::to_string( obj.hash() ) + ";\n";
1226  };
1227  } else {
1228  // .md file
1229  startGraph = "```mermaid\ngraph LR;\n\n";
1230  stopGraph = "\n```\n";
1231 
1232  defineAlg = []( const std::string& alg, const std::string& idx ) -> std::string {
1233  return "Alg_" + idx + "{{" + alg + "}}\n";
1234  };
1235 
1236  defineObj = []( const DataObjID& obj ) -> std::string {
1237  return "obj_" + std::to_string( obj.hash() ) + ">" + obj.key() + "]\n";
1238  };
1239 
1240  defineInput = []( const DataObjID& obj, const std::string& alg ) -> std::string {
1241  return "obj_" + std::to_string( obj.hash() ) + " --> " + "Alg_" + alg + "\n";
1242  };
1243 
1244  defineOutput = []( const std::string& alg, const DataObjID& obj ) -> std::string {
1245  return "Alg_" + alg + " --> " + "obj_" + std::to_string( obj.hash() ) + "\n";
1246  };
1247  } // fileExtension
1248 
1249  std::ofstream dataDepthGraphFile( m_dataDepsGraphFile.value(), std::ofstream::out );
1250  dataDepthGraphFile << startGraph;
1251 
1252  // define algs and objects
1253  std::set<std::size_t> definedObjects;
1254 
1255  // Regex for selection of algs and objects
1256  std::regex algNameRegex( m_dataDepsGraphAlgoPattern.value() );
1257  std::regex objNameRegex( m_dataDepsGraphObjectPattern.value() );
1258 
1259  // inDeps and outDeps should have the same entries
1260  std::size_t algoIndex = 0ul;
1261  for ( const auto& [name, ideps] : inDeps ) {
1262  if ( not std::regex_search( name, algNameRegex ) ) continue;
1263  dataDepthGraphFile << defineAlg( name, std::to_string( algoIndex ) );
1264 
1265  // inputs
1266  for ( const auto& dep : ideps ) {
1267  if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1268 
1269  const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1270  if ( inserted ) dataDepthGraphFile << defineObj( dep );
1271 
1272  dataDepthGraphFile << defineInput( dep, std::to_string( algoIndex ) );
1273  } // loop on ideps
1274 
1275  const auto& odeps = outDeps.at( name );
1276  for ( const auto& dep : odeps ) {
1277  if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1278 
1279  const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1280  if ( inserted ) dataDepthGraphFile << defineObj( dep );
1281 
1282  dataDepthGraphFile << defineOutput( std::to_string( algoIndex ), dep );
1283  } // loop on odeps
1284 
1285  ++algoIndex;
1286  } // loop on inDeps
1287 
1288  // end the file
1289  dataDepthGraphFile << stopGraph;
1290  dataDepthGraphFile.close();
1291 
1292  return StatusCode::SUCCESS;
1293 }
IOTest.evt
evt
Definition: IOTest.py:107
EventSlot::eventContext
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:83
AvalancheSchedulerSvc::m_whiteboard
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Definition: AvalancheSchedulerSvc.h:267
Gaudi::Hive::setCurrentContext
GAUDI_API void setCurrentContext(const EventContext *ctx)
Definition: ThreadLocalContext.cpp:41
PrecedenceSvc
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:31
std::vector::resize
T resize(T... args)
Gaudi::Details::PropertyBase::name
const std::string name() const
property name
Definition: PropertyBase.h:39
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
AvalancheSchedulerSvc::m_useDataLoader
Gaudi::Property< std::string > m_useDataLoader
Definition: AvalancheSchedulerSvc.h:207
std::string
STL class.
AvalancheSchedulerSvc::TaskSpec
Struct to hold entries in the alg queues.
Definition: AvalancheSchedulerSvc.h:322
AvalancheSchedulerSvc::finalize
StatusCode finalize() override
Finalise.
Definition: AvalancheSchedulerSvc.cpp:423
std::list< IAlgorithm * >
Gaudi::Algorithm::acceptDHVisitor
void acceptDHVisitor(IDataHandleVisitor *) const override
Definition: Algorithm.cpp:186
Read.app
app
Definition: Read.py:36
std::move
T move(T... args)
Gaudi::Algorithm::name
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:526
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
AvalancheSchedulerSvc::m_optimizationMode
Gaudi::Property< std::string > m_optimizationMode
Definition: AvalancheSchedulerSvc.h:185
std::unordered_set< DataObjID, DataObjID_Hasher >
std::vector::reserve
T reserve(T... args)
ON_VERBOSE
#define ON_VERBOSE
Definition: AvalancheSchedulerSvc.cpp:46
AvalancheSchedulerSvc::ACTIVE
@ ACTIVE
Definition: AvalancheSchedulerSvc.h:163
concurrency::PrecedenceRulesGraph::getControlFlowNodeCounter
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
Definition: PrecedenceRulesGraph.h:659
gaudirun.s
string s
Definition: gaudirun.py:346
std::vector
STL class.
std::unordered_set::find
T find(T... args)
std::unordered_set::size
T size(T... args)
AvalancheSchedulerSvc::iterate
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Definition: AvalancheSchedulerSvc.cpp:665
EventSlot
Class representing an event slot.
Definition: EventSlot.h:24
AlgsExecutionStates
Definition: AlgsExecutionStates.h:38
DataHandleHolderBase::addDependency
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Definition: DataHandleHolderBase.h:86
std::chrono::duration
GaudiMP.FdsRegistry.msg
msg
Definition: FdsRegistry.py:19
AvalancheSchedulerSvc::m_scheduledAsynchronousQueue
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
Definition: AvalancheSchedulerSvc.h:359
AvalancheSchedulerSvc::m_lastSnapshot
std::chrono::system_clock::time_point m_lastSnapshot
Definition: AvalancheSchedulerSvc.h:167
PrecedenceSvc::getRules
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:75
std::stringstream
STL class.
std::unique_ptr::get
T get(T... args)
EventStatus::Success
@ Success
Definition: IAlgExecStateSvc.h:72
std::unique_ptr::release
T release(T... args)
IAlgorithm::type
virtual const std::string & type() const =0
The type of the algorithm.
ConcurrencyFlags.h
EventContext::usesSubSlot
bool usesSubSlot() const
Definition: EventContext.h:53
AvalancheSchedulerSvc::m_dataDepsGraphAlgoPattern
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
Definition: AvalancheSchedulerSvc.h:228
std::vector::back
T back(T... args)
std::function
std::any_of
T any_of(T... args)
AvalancheSchedulerSvc::m_scheduledQueue
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
Definition: AvalancheSchedulerSvc.h:358
AvalancheSchedulerSvc::m_fiberManager
std::unique_ptr< FiberManager > m_fiberManager
Definition: AvalancheSchedulerSvc.h:370
AvalancheSchedulerSvc::schedule
StatusCode schedule(TaskSpec &&)
Definition: AvalancheSchedulerSvc.cpp:1021
AvalancheSchedulerSvc::m_showControlFlow
Gaudi::Property< bool > m_showControlFlow
Definition: AvalancheSchedulerSvc.h:218
AvalancheSchedulerSvc::m_needsUpdate
std::atomic< bool > m_needsUpdate
Definition: AvalancheSchedulerSvc.h:363
DHHVisitor
Definition: DataHandleHolderVisitor.h:21
GaudiPartProp.tests.id
id
Definition: tests.py:111
std::sort
T sort(T... args)
AvalancheSchedulerSvc::m_enableCondSvc
Gaudi::Property< bool > m_enableCondSvc
Definition: AvalancheSchedulerSvc.h:210
AvalancheSchedulerSvc::deactivate
StatusCode deactivate()
Deactivate scheduler.
Definition: AvalancheSchedulerSvc.cpp:514
std::unique_ptr::reset
T reset(T... args)
CommonMessaging< implements< IService, IProperty, IStateful > >::msgLevel
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
Definition: CommonMessaging.h:148
Service::finalize
StatusCode finalize() override
Definition: Service.cpp:222
ThreadPoolSvc.h
AvalancheSchedulerSvc::m_eventSlots
std::vector< EventSlot > m_eventSlots
Vector of events slots.
Definition: AvalancheSchedulerSvc.h:270
Gaudi::DataHandle::Writer
@ Writer
Definition: DataHandle.h:40
concurrency::AlgorithmNode::getAlgoIndex
unsigned int getAlgoIndex() const
Get algorithm index.
Definition: PrecedenceRulesGraph.h:520
AvalancheSchedulerSvc::m_numOffloadThreads
Gaudi::Property< int > m_numOffloadThreads
Definition: AvalancheSchedulerSvc.h:192
AvalancheSchedulerSvc::m_arena
tbb::task_arena * m_arena
Definition: AvalancheSchedulerSvc.h:369
AvalancheSchedulerSvc::m_algExecStateSvc
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Definition: AvalancheSchedulerSvc.h:279
EventSlot::complete
bool complete
Flags completion of the event.
Definition: EventSlot.h:89
DataObjID::fullKey
std::string fullKey() const
combination of the key and the ClassName, mostly for debugging
Definition: DataObjID.cpp:99
std::hex
T hex(T... args)
AvalancheSchedulerSvc::FAILURE
@ FAILURE
Definition: AvalancheSchedulerSvc.h:163
AvalancheSchedulerSvc::m_condSvc
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
Definition: AvalancheSchedulerSvc.h:282
AvalancheSchedulerSvc::eventFailed
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Definition: AvalancheSchedulerSvc.cpp:858
ManySmallAlgs.alg
alg
Definition: ManySmallAlgs.py:81
TimelineEvent
Definition: ITimelineSvc.h:23
AvalancheSchedulerSvc::m_threadPoolSize
Gaudi::Property< int > m_threadPoolSize
Definition: AvalancheSchedulerSvc.h:170
DHHVisitor::owners_names_of
std::vector< std::string > owners_names_of(const DataObjID &id, bool with_main=false) const
Definition: DataHandleHolderVisitor.cpp:82
EventSlot::addSubSlot
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Definition: EventSlot.h:61
EventStatus::AlgStall
@ AlgStall
Definition: IAlgExecStateSvc.h:72
AvalancheSchedulerSvc::m_dataDepsGraphObjectPattern
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
Definition: AvalancheSchedulerSvc.h:233
AvalancheSchedulerSvc::m_maxEventsInFlight
size_t m_maxEventsInFlight
Definition: AvalancheSchedulerSvc.h:372
SmartIF::isValid
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:72
AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
Definition: AvalancheSchedulerSvc.h:180
GaudiUtils::operator<<
std::ostream & operator<<(std::ostream &s, const std::pair< T1, T2 > &p)
Serialize an std::pair in a python like format. E.g. "(1, 2)".
Definition: SerializeSTL.h:90
Service::name
const std::string & name() const override
Retrieve name of the service
Definition: Service.cpp:332
StatusCode
Definition: StatusCode.h:65
std::thread
STL class.
AlgTask.h
ITimelineSvc
Definition: ITimelineSvc.h:37
std::vector::at
T at(T... args)
IAlgorithm
Definition: IAlgorithm.h:38
std::atomic::load
T load(T... args)
std::thread::hardware_concurrency
T hardware_concurrency(T... args)
std::ofstream
STL class.
AvalancheSchedulerSvc::m_maxParallelismExtra
Gaudi::Property< int > m_maxParallelismExtra
Definition: AvalancheSchedulerSvc.h:175
compareRootHistos.ts
ts
Definition: compareRootHistos.py:488
EventContext::slot
ContextID_t slot() const
Definition: EventContext.h:51
AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
Definition: AvalancheSchedulerSvc.h:189
FiberManager::schedule
void schedule(F &&func)
Schedule work to run on the asynchronous pool.
Definition: FiberManager.h:54
Io::UNKNOWN
@ UNKNOWN
Definition: IFileMgr.h:156
Gaudi::Algorithm
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:90
AvalancheSchedulerSvc::m_whiteboardSvcName
Gaudi::Property< std::string > m_whiteboardSvcName
Definition: AvalancheSchedulerSvc.h:179
AvalancheSchedulerSvc
Definition: AvalancheSchedulerSvc.h:114
AvalancheSchedulerSvc::m_checkOutput
Gaudi::Property< bool > m_checkOutput
Definition: AvalancheSchedulerSvc.h:198
EventSlot::reset
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:49
DataHandleHolderVisitor.h
Gaudi::Property::value
const ValueType & value() const
Definition: Property.h:237
std::to_string
T to_string(T... args)
EventSlot::disableSubSlots
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Definition: EventSlot.h:78
AlgExecState::execStatus
const StatusCode & execStatus() const
Definition: IAlgExecStateSvc.h:42
std::ofstream::close
T close(T... args)
AvalancheSchedulerSvc::m_simulateExecution
Gaudi::Property< bool > m_simulateExecution
Definition: AvalancheSchedulerSvc.h:182
AvalancheSchedulerSvc::recordOccupancy
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms) Negative value to deactivate, 0 to snapshot every change Each...
Definition: AvalancheSchedulerSvc.cpp:1162
AvalancheSchedulerSvc::index2algname
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Definition: AvalancheSchedulerSvc.h:258
Algorithm.h
EventSlot::allSubSlots
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:100
AvalancheSchedulerSvc::AState
AlgsExecutionStates::State AState
Definition: AvalancheSchedulerSvc.h:160
AvalancheSchedulerSvc::INACTIVE
@ INACTIVE
Definition: AvalancheSchedulerSvc.h:163
std::ofstream::open
T open(T... args)
SmartIF< IMessageSvc >
genconfuser.verbose
verbose
Definition: genconfuser.py:28
AvalancheSchedulerSvc::m_algosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
Definition: AvalancheSchedulerSvc.h:285
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202
std::map
STL class.
AvalancheSchedulerSvc::tryPopFinishedEvent
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Definition: AvalancheSchedulerSvc.cpp:645
AvalancheSchedulerSvc::scheduleEventView
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Definition: AvalancheSchedulerSvc.cpp:1122
AvalancheSchedulerSvc::m_algResourcePool
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Definition: AvalancheSchedulerSvc.h:314
AvalancheSchedulerSvc::freeSlots
unsigned int freeSlots() override
Get free slots number.
Definition: AvalancheSchedulerSvc.cpp:614
std::regex
Cause::source::Root
@ Root
AvalancheSchedulerSvc::m_showDataDeps
Gaudi::Property< bool > m_showDataDeps
Definition: AvalancheSchedulerSvc.h:212
AvalancheSchedulerSvc::m_maxAlgosInFlight
size_t m_maxAlgosInFlight
Definition: AvalancheSchedulerSvc.h:373
DataObjID
Definition: DataObjID.h:47
std::regex_search
T regex_search(T... args)
AvalancheSchedulerSvc::dumpState
void dumpState() override
Dump scheduler state for all slots.
Definition: AvalancheSchedulerSvc.cpp:618
AvalancheSchedulerSvc::initialize
StatusCode initialize() override
Initialise.
Definition: AvalancheSchedulerSvc.cpp:77
AlgsExecutionStates::containsAny
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
Definition: AlgsExecutionStates.h:75
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:139
std::chrono::duration::min
T min(T... args)
std::ostringstream
STL class.
ON_DEBUG
#define ON_DEBUG
Definition: AvalancheSchedulerSvc.cpp:45
StatusCode::isFailure
bool isFailure() const
Definition: StatusCode.h:129
ThreadLocalContext.h
std::vector::emplace_back
T emplace_back(T... args)
concurrency::PrecedenceRulesGraph::getAlgorithmNode
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
Definition: PrecedenceRulesGraph.h:651
AvalancheSchedulerSvc::m_dumpIntraEventDynamics
Gaudi::Property< bool > m_dumpIntraEventDynamics
Definition: AvalancheSchedulerSvc.h:187
AlgsExecutionStates::set
StatusCode set(unsigned int iAlgo, State newState)
Definition: AlgsExecutionStates.cpp:23
AvalancheSchedulerSvc::m_retryQueue
std::queue< TaskSpec > m_retryQueue
Definition: AvalancheSchedulerSvc.h:360
MSG::VERBOSE
@ VERBOSE
Definition: IMessageSvc.h:25
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
EventContext::subSlot
ContextID_t subSlot() const
Definition: EventContext.h:52
Cause::source::Task
@ Task
SmartIF::get
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:86
DataHandleHolderBase::outputDataObjs
const DataObjIDColl & outputDataObjs() const override
Definition: DataHandleHolderBase.h:84
AvalancheSchedulerSvc::m_snapshotInterval
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Definition: AvalancheSchedulerSvc.h:166
std::vector::begin
T begin(T... args)
Gaudi::Decays::valid
bool valid(Iterator begin, Iterator end)
check the validness of the trees or nodes
Definition: Nodes.h:36
std
STL namespace.
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
std::unordered_set::insert
T insert(T... args)
AvalancheSchedulerSvc::m_threadPoolSvc
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Definition: AvalancheSchedulerSvc.h:368
FiberManager.h
MSG::ERROR
@ ERROR
Definition: IMessageSvc.h:25
AvalancheSchedulerSvc::m_dataDepsGraphFile
Gaudi::Property< std::string > m_dataDepsGraphFile
Definition: AvalancheSchedulerSvc.h:223
EventContext
Definition: EventContext.h:34
AlgsExecutionStates::State
State
Execution states of the algorithms Must have contiguous integer values 0, 1...
Definition: AlgsExecutionStates.h:42
TimelineEvent::algorithm
std::string algorithm
Definition: ITimelineSvc.h:31
Gaudi::Property::toString
std::string toString() const override
value -> string
Definition: Property.h:415
AvalancheSchedulerSvc::revise
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
Definition: AvalancheSchedulerSvc.cpp:799
AlgExecState::filterPassed
bool filterPassed() const
Definition: IAlgExecStateSvc.h:40
AvalancheSchedulerSvc::activate
void activate()
Activate scheduler.
Definition: AvalancheSchedulerSvc.cpp:457
AvalancheSchedulerSvc::m_actionsQueue
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
Definition: AvalancheSchedulerSvc.h:319
std::unordered_set::empty
T empty(T... args)
AvalancheSchedulerSvc::m_algname_index_map
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Definition: AvalancheSchedulerSvc.h:255
Properties.v
v
Definition: Properties.py:122
AvalancheSchedulerSvc::m_checkDeps
Gaudi::Property< bool > m_checkDeps
Definition: AvalancheSchedulerSvc.h:196
AvalancheSchedulerSvc::isStalled
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
Definition: AvalancheSchedulerSvc.cpp:840
AvalancheSchedulerSvc::AlgTask
friend class AlgTask
Definition: AvalancheSchedulerSvc.h:116
std::ostringstream::str
T str(T... args)
std::atomic::store
T store(T... args)
std::size_t
SerializeSTL.h
DataObjID::hash
std::size_t hash() const
Definition: DataObjID.h:69
DataHandleHolderBase::inputDataObjs
const DataObjIDColl & inputDataObjs() const override
Definition: DataHandleHolderBase.h:83
AvalancheSchedulerSvc::m_thread
std::thread m_thread
The thread in which the activate function runs.
Definition: AvalancheSchedulerSvc.h:249
std::vector::end
T end(T... args)
AvalancheSchedulerSvc::pushNewEvents
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
Definition: AvalancheSchedulerSvc.cpp:603
AvalancheSchedulerSvc::m_showDataFlow
Gaudi::Property< bool > m_showDataFlow
Definition: AvalancheSchedulerSvc.h:215
IAlgorithm.h
AlgExecState
Definition: IAlgExecStateSvc.h:36
AvalancheSchedulerSvc::m_checkOutputIgnoreList
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
Definition: AvalancheSchedulerSvc.h:200
std::setw
T setw(T... args)
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
std::max
T max(T... args)
AvalancheSchedulerSvc::signoff
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
Definition: AvalancheSchedulerSvc.cpp:1091
AlgsExecutionStates::sizeOfSubset
size_t sizeOfSubset(State state) const
Definition: AlgsExecutionStates.h:89
AvalancheSchedulerSvc::m_freeSlots
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
Definition: AvalancheSchedulerSvc.h:273
compareRootHistos.state
state
Definition: compareRootHistos.py:496
AvalancheSchedulerSvc::m_blockingAlgosInFlight
unsigned int m_blockingAlgosInFlight
Number of algorithms presently in flight.
Definition: AvalancheSchedulerSvc.h:288
AvalancheSchedulerSvc::m_snapshotCallback
std::function< void(OccupancySnapshot)> m_snapshotCallback
Definition: AvalancheSchedulerSvc.h:168
AvalancheSchedulerSvc::pushNewEvent
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Definition: AvalancheSchedulerSvc.cpp:545
AvalancheSchedulerSvc::dumpGraphFile
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
Definition: AvalancheSchedulerSvc.cpp:1177
AvalancheSchedulerSvc::popFinishedEvent
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Definition: AvalancheSchedulerSvc.cpp:624
AlgsExecutionStates::algsInState
const boost::container::flat_set< int > algsInState(State state) const
Definition: AlgsExecutionStates.h:83
std::unique_ptr< EventContext >
EventSlot::algsStates
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:85
Cause
Definition: PrecedenceRulesGraph.h:396
AvalancheSchedulerSvc::m_precSvc
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Definition: AvalancheSchedulerSvc.h:264
AvalancheSchedulerSvc::m_isActive
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
Definition: AvalancheSchedulerSvc.h:246
AvalancheSchedulerSvc::m_finishedEvents
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
Definition: AvalancheSchedulerSvc.h:276
std::set< std::string >
AvalancheSchedulerSvc::m_algname_vect
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Definition: AvalancheSchedulerSvc.h:261
GPUAvalancheSchedulerSimpleTest.threads
threads
Definition: GPUAvalancheSchedulerSimpleTest.py:57
EventContext::evt
ContextEvt_t evt() const
Definition: EventContext.h:50
AvalancheSchedulerSvc::dumpSchedulerState
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Definition: AvalancheSchedulerSvc.cpp:879
IDataManagerSvc.h
std::thread::join
T join(T... args)
Gaudi::ParticleProperties::index
size_t index(const Gaudi::ParticleProperty *property, const Gaudi::Interfaces::IParticlePropertySvc *service)
helper utility for mapping of Gaudi::ParticleProperty object into non-negative integral sequential id...
Definition: IParticlePropertySvc.cpp:39
Service::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
Definition: Service.cpp:335
AvalancheSchedulerSvc.h
PrepareBase.out
out
Definition: PrepareBase.py:20
ThreadPoolSvc
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:38
std::initializer_list
gaudirun.callback
callback
Definition: gaudirun.py:202
std::chrono::system_clock::now
T now(T... args)