Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  master (d98a2936)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "AvalancheSchedulerSvc.h"
12 #include "AlgTask.h"
13 #include "FiberManager.h"
14 #include "GraphDumper.h"
15 #include "ThreadPoolSvc.h"
16 
17 // Framework includes
18 #include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
21 #include <GaudiKernel/IAlgorithm.h>
25 
26 // C++
27 #include <algorithm>
28 #include <fstream>
29 #include <map>
30 #include <queue>
31 #include <regex>
32 #include <semaphore>
33 #include <sstream>
34 #include <string_view>
35 #include <thread>
36 #include <unordered_set>
37 
38 // External libs
39 #include <boost/algorithm/string.hpp>
40 #include <boost/thread.hpp>
41 #include <boost/tokenizer.hpp>
42 
43 // Instantiation of a static factory class used by clients to create instances of this service
45 
46 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
47 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
48 
49 namespace {
50  struct DataObjIDSorter {
51  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
52  };
53 
54  // Sort a DataObjIDColl in a well-defined, reproducible manner.
55  // Used for making debugging dumps.
56  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
57  std::vector<const DataObjID*> v;
58  v.reserve( coll.size() );
59  for ( const DataObjID& id : coll ) v.push_back( &id );
60  std::sort( v.begin(), v.end(), DataObjIDSorter() );
61  return v;
62  }
63 
64  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
65  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
66  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
67  }
68 } // namespace
69 
70 //---------------------------------------------------------------------------
71 
79 
80  // Initialise mother class (read properties, ...)
82  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
83 
84  // Get hold of the TBBSvc. This should initialize the thread pool
85  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
86  if ( !m_threadPoolSvc.isValid() ) {
87  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
88  return StatusCode::FAILURE;
89  }
90  auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
91  if ( !castTPS ) {
92  fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
93  return StatusCode::FAILURE;
94  }
95  m_arena = castTPS->getArena();
96  if ( !m_arena ) {
97  fatal() << "Cannot find valid TBB task_arena" << endmsg;
98  return StatusCode::FAILURE;
99  }
100 
101  // Activate the scheduler in another thread.
102  info() << "Activating scheduler in a separate thread" << endmsg;
103  std::binary_semaphore fiber_manager_initalized{ 0 };
104  m_thread = std::thread( [this, &fiber_manager_initalized]() {
105  // Initialize FiberManager
106  this->m_fiberManager = std::make_unique<FiberManager>( this->m_numOffloadThreads.value() );
107  fiber_manager_initalized.release();
108  this->activate();
109  } );
110  // Wait for initialization to complete
111  fiber_manager_initalized.acquire();
112 
113  while ( m_isActive != ACTIVE ) {
114  if ( m_isActive == FAILURE ) {
115  fatal() << "Terminating initialization" << endmsg;
116  return StatusCode::FAILURE;
117  } else {
118  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
119  sleep( 1 );
120  }
121  }
122 
123  if ( m_enableCondSvc ) {
124  // Get hold of the CondSvc
125  m_condSvc = serviceLocator()->service( "CondSvc" );
126  if ( !m_condSvc.isValid() ) {
127  warning() << "No CondSvc found, or not enabled. "
128  << "Will not manage CondAlgorithms" << endmsg;
129  m_enableCondSvc = false;
130  }
131  }
132 
133  // Get the algo resource pool
134  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
135  if ( !m_algResourcePool.isValid() ) {
136  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
137  return StatusCode::FAILURE;
138  }
139 
140  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
141  if ( !m_algExecStateSvc.isValid() ) {
142  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
143  return StatusCode::FAILURE;
144  }
145 
146  // Get Whiteboard
148  if ( !m_whiteboard.isValid() ) {
149  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
150  return StatusCode::FAILURE;
151  }
152 
153  // Set the MaxEventsInFlight parameters from the number of WB stores
154  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
155 
156  // Set the number of free slots
158 
159  // Get the list of algorithms
160  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
161  const unsigned int algsNumber = algos.size();
162  if ( algsNumber != 0 ) {
163  info() << "Found " << algsNumber << " algorithms" << endmsg;
164  } else {
165  error() << "No algorithms found" << endmsg;
166  return StatusCode::FAILURE;
167  }
168 
169  /* Dependencies
170  1) Look for handles in algo, if none
171  2) Assume none are required
172  */
173 
174  DataObjIDColl globalInp, globalOutp;
175 
176  // figure out all outputs
177  std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
178  for ( IAlgorithm* ialgoPtr : algos ) {
179  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
180  if ( !algoPtr ) {
181  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
182  return StatusCode::FAILURE;
183  }
184 
185  DataObjIDColl algoOutputs;
186  for ( auto id : algoPtr->outputDataObjs() ) {
187  globalOutp.insert( id );
188  algoOutputs.insert( id );
189  }
190  algosOutputDependenciesMap[algoPtr->name()] = algoOutputs;
191  }
192 
193  std::ostringstream ostdd;
194  ostdd << "Data Dependencies for Algorithms:";
195 
196  std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
197  for ( IAlgorithm* ialgoPtr : algos ) {
198  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
199  if ( nullptr == algoPtr ) {
200  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
201  << ": this will result in a crash." << endmsg;
202  return StatusCode::FAILURE;
203  }
204 
205  DataObjIDColl i1, i2;
206  DHHVisitor avis( i1, i2 );
207  algoPtr->acceptDHVisitor( &avis );
208 
209  ostdd << "\n " << algoPtr->name();
210 
211  auto write_owners = [&avis, &ostdd]( const DataObjID& id ) {
212  auto owners = avis.owners_names_of( id );
213  if ( !owners.empty() ) { GaudiUtils::operator<<( ostdd << ' ', owners ); }
214  };
215 
216  DataObjIDColl algoDependencies;
217  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
218  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
219  DataObjID id = *idp;
220  ostdd << "\n o INPUT " << id;
221  write_owners( id );
222  algoDependencies.insert( id );
223  globalInp.insert( id );
224  }
225  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
226  ostdd << "\n o OUTPUT " << *id;
227  write_owners( *id );
228  if ( id->key().find( ":" ) != std::string::npos ) {
229  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
230  << endmsg;
231  m_showDataDeps = true;
232  }
233  }
234  } else {
235  ostdd << "\n none";
236  }
237  algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
238  }
239 
240  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
241 
242  // If requested, dump a graph of the data dependencies in a .dot or .md file
243  if ( not m_dataDepsGraphFile.empty() ) {
244  if ( dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
245  return StatusCode::FAILURE;
246  }
247  }
248 
249  // Check if we have unmet global input dependencies, and, optionally, heal them
250  // WARNING: this step must be done BEFORE the Precedence Service is initialized
251  DataObjIDColl unmetDepInp, unusedOutp;
252  if ( m_checkDeps || m_checkOutput ) {
253  std::set<std::string> requiredInputKeys;
254  for ( auto o : globalInp ) {
255  // track aliases
256  // (assuming there should be no items with different class and same key corresponding to different objects)
257  requiredInputKeys.insert( o.key() );
258  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
259  }
260  if ( m_checkOutput ) {
261  for ( auto o : globalOutp ) {
262  if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
263  // check ignores
264  bool ignored{};
265  for ( const std::string& algoName : m_checkOutputIgnoreList ) {
266  auto it = algosOutputDependenciesMap.find( algoName );
267  if ( it != algosOutputDependenciesMap.end() ) {
268  if ( it->second.find( o ) != it->second.end() ) {
269  ignored = true;
270  break;
271  }
272  }
273  }
274  if ( !ignored ) { unusedOutp.insert( o ); }
275  }
276  }
277  }
278  }
279 
280  if ( m_checkDeps ) {
281  if ( unmetDepInp.size() > 0 ) {
282 
283  auto printUnmet = [&]( auto msg ) {
284  for ( const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
285  msg << " o " << *o << " required by Algorithm: " << endmsg;
286 
287  for ( const auto& p : algosInputDependenciesMap )
288  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
289  }
290  };
291 
292  if ( !m_useDataLoader.empty() ) {
293 
294  // Find the DataLoader Alg
295  IAlgorithm* dataLoaderAlg( nullptr );
296  for ( IAlgorithm* algo : algos )
297  if ( m_useDataLoader == algo->name() ) {
298  dataLoaderAlg = algo;
299  break;
300  }
301 
302  if ( dataLoaderAlg == nullptr ) {
303  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
304  << "\" found, and unmet INPUT dependencies "
305  << "detected:" << endmsg;
306  printUnmet( fatal() );
307  return StatusCode::FAILURE;
308  }
309 
310  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
311  << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
312  printUnmet( info() );
313 
314  // Set the property Load of DataLoader Alg
315  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
316  if ( !dataAlg ) {
317  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
318  << endmsg;
319  return StatusCode::FAILURE;
320  }
321 
322  for ( auto& id : unmetDepInp ) {
323  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
324  << dataLoaderAlg->name() << endmsg;
326  }
327 
328  } else {
329  fatal() << "Auto DataLoading not requested, "
330  << "and the following unmet INPUT dependencies were found:" << endmsg;
331  printUnmet( fatal() );
332  return StatusCode::FAILURE;
333  }
334 
335  } else {
336  info() << "No unmet INPUT data dependencies were found" << endmsg;
337  }
338  }
339 
340  if ( m_checkOutput ) {
341  if ( unusedOutp.size() > 0 ) {
342 
343  auto printUnusedOutp = [&]( auto msg ) {
344  for ( const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
345  msg << " o " << *o << " produced by Algorithm: " << endmsg;
346 
347  for ( const auto& p : algosOutputDependenciesMap )
348  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
349  }
350  };
351 
352  fatal() << "The following unused OUTPUT items were found:" << endmsg;
353  printUnusedOutp( fatal() );
354  return StatusCode::FAILURE;
355  } else {
356  info() << "No unused OUTPUT items were found" << endmsg;
357  }
358  }
359 
360  // Get the precedence service
361  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
362  if ( !m_precSvc.isValid() ) {
363  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
364  return StatusCode::FAILURE;
365  }
366  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
367  if ( !precSvc ) {
368  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
369  return StatusCode::FAILURE;
370  }
371 
372  // Fill the containers to convert algo names to index
373  m_algname_vect.resize( algsNumber );
374  for ( IAlgorithm* algo : algos ) {
375  const std::string& name = algo->name();
376  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
378  m_algname_vect.at( index ) = name;
379  }
380 
381  // Shortcut for the message service
382  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
383  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
384 
386  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
387  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
388  m_eventSlots.back().complete = true;
389  }
390 
391  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
392 
393  // Clearly inform about the level of concurrency
394  info() << "Concurrency level information:" << endmsg;
395  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
396  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
397  info() << " o Fiber thread pool size: " << m_numOffloadThreads << endmsg;
398 
399  // Inform about task scheduling prescriptions
400  info() << "Task scheduling settings:" << endmsg;
401  info() << " o Avalanche generation mode: "
402  << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
403  info() << " o Preemptive scheduling of CPU-blocking tasks: "
405  ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
406  : "disabled" )
407  << endmsg;
408  info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
409 
410  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
411 
412  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
413 
414  // Simulate execution flow
415  if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
416 
417  return sc;
418 }
419 //---------------------------------------------------------------------------
420 
425 
427  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
428 
429  sc = deactivate();
430  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
431 
432  debug() << "Deleting FiberManager" << endmsg;
433  m_fiberManager.reset();
434 
435  info() << "Joining Scheduler thread" << endmsg;
436  m_thread.join();
437 
438  // Final error check after thread pool termination
439  if ( m_isActive == FAILURE ) {
440  error() << "problems in scheduler thread" << endmsg;
441  return StatusCode::FAILURE;
442  }
443 
444  return sc;
445 }
446 //---------------------------------------------------------------------------
447 
459 
460  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
461 
462  if ( m_threadPoolSvc->initPool( m_threadPoolSize, m_maxParallelismExtra ).isFailure() ) {
463  error() << "problems initializing ThreadPoolSvc" << endmsg;
465  return;
466  }
467 
468  // Wait for actions pushed into the queue by finishing tasks.
469  action thisAction;
471 
472  m_isActive = ACTIVE;
473 
474  // Continue to wait if the scheduler is running or there is something to do
475  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
476  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
477  m_actionsQueue.pop( thisAction );
478  sc = thisAction();
479  ON_VERBOSE {
480  if ( sc.isFailure() )
481  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
482  else
483  verbose() << "Action succeeded." << endmsg;
484  }
485  else sc.ignore();
486 
487  // If all queued actions have been processed, update the slot states
488  if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
489  sc = iterate();
490  ON_VERBOSE {
491  if ( sc.isFailure() )
492  verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
493  else
494  verbose() << "Iteration succeeded." << endmsg;
495  }
496  else sc.ignore();
497  }
498  }
499 
500  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
501  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
502  error() << "Problems terminating thread pool" << endmsg;
504  }
505 }
506 
507 //---------------------------------------------------------------------------
508 
516 
517  if ( m_isActive == ACTIVE ) {
518 
519  // Set the number of slots available to an error code
520  m_freeSlots.store( 0 );
521 
522  // Empty queue
523  action thisAction;
524  while ( m_actionsQueue.try_pop( thisAction ) ) {};
525 
526  // This would be the last action
527  m_actionsQueue.push( [this]() -> StatusCode {
528  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
530  return StatusCode::SUCCESS;
531  } );
532  }
533 
534  return StatusCode::SUCCESS;
535 }
536 
537 //---------------------------------------------------------------------------
538 
539 // EventSlot management
547 
548  if ( !eventContext ) {
549  fatal() << "Event context is nullptr" << endmsg;
550  return StatusCode::FAILURE;
551  }
552 
553  if ( m_freeSlots.load() == 0 ) {
554  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
555  return StatusCode::FAILURE;
556  }
557 
558  // no problem as push new event is only called from one thread (event loop manager)
559  --m_freeSlots;
560 
561  auto action = [this, eventContext]() -> StatusCode {
562  // Event processing slot forced to be the same as the wb slot
563  const unsigned int thisSlotNum = eventContext->slot();
564  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
565  if ( !thisSlot.complete ) {
566  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
567  return StatusCode::FAILURE;
568  }
569 
570  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
571  thisSlot.reset( eventContext );
572 
573  // Result status code:
575 
576  // promote to CR and DR the initial set of algorithms
577  Cause cs = { Cause::source::Root, "RootDecisionHub" };
578  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
579  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
580  result = StatusCode::FAILURE;
581  }
582 
583  if ( this->iterate().isFailure() ) {
584  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
585  result = StatusCode::FAILURE;
586  }
587 
588  return result;
589  }; // end of lambda
590 
591  // Kick off scheduling
592  ON_VERBOSE {
593  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
594  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
595  }
596 
597  m_actionsQueue.push( action );
598 
599  return StatusCode::SUCCESS;
600 }
601 
602 //---------------------------------------------------------------------------
603 
604 StatusCode AvalancheSchedulerSvc::pushNewEvents( std::vector<EventContext*>& eventContexts ) {
605  StatusCode sc;
606  for ( auto context : eventContexts ) {
607  sc = pushNewEvent( context );
608  if ( sc != StatusCode::SUCCESS ) return sc;
609  }
610  return sc;
611 }
612 
613 //---------------------------------------------------------------------------
614 
615 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
616 
617 //---------------------------------------------------------------------------
618 
620 
621 //---------------------------------------------------------------------------
626 
627  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
628  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
629  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
630  // << " active: " << m_isActive << endmsg;
631  return StatusCode::FAILURE;
632  } else {
633  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
634  // << " active: " << m_isActive << endmsg;
635  m_finishedEvents.pop( eventContext );
636  ++m_freeSlots;
637  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
638  return StatusCode::SUCCESS;
639  }
640 }
641 
642 //---------------------------------------------------------------------------
647 
648  if ( m_finishedEvents.try_pop( eventContext ) ) {
649  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
650  << endmsg;
651  ++m_freeSlots;
652  return StatusCode::SUCCESS;
653  }
654  return StatusCode::FAILURE;
655 }
656 
657 //--------------------------------------------------------------------------
658 
667 
668  StatusCode global_sc( StatusCode::SUCCESS );
669 
670  // Retry algorithms
671  const size_t retries = m_retryQueue.size();
672  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
673  TaskSpec retryTS = std::move( m_retryQueue.front() );
674  m_retryQueue.pop();
675  global_sc = schedule( std::move( retryTS ) );
676  }
677 
678  // Loop over all slots
679  OccupancySnapshot nextSnap;
680  auto now = std::chrono::system_clock::now();
681  for ( EventSlot& thisSlot : m_eventSlots ) {
682 
683  // Ignore slots without a valid context (relevant when populating scheduler for first time)
684  if ( !thisSlot.eventContext ) continue;
685 
686  int iSlot = thisSlot.eventContext->slot();
687 
688  // Cache the states of the algorithms to improve readability and performance
689  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
690 
691  StatusCode partial_sc = StatusCode::FAILURE;
692 
693  // Make an occupancy snapshot
694  if ( m_snapshotInterval != std::chrono::duration<int64_t, std::milli>::min() &&
696 
697  // Initialise snapshot
698  if ( nextSnap.states.empty() ) {
699  nextSnap.time = now;
700  nextSnap.states.resize( m_eventSlots.size() );
701  }
702 
703  // Store alg states
704  std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
705  slotStateTotals.resize( AState::MAXVALUE );
706  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
707  slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
708  }
709 
710  // Add subslot alg states
711  for ( auto& subslot : thisSlot.allSubSlots ) {
712  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
713  slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
714  }
715  }
716  }
717 
718  // Perform DR->SCHEDULED
719  const auto& drAlgs = thisAlgsStates.algsInState( AState::DATAREADY );
720  for ( uint algIndex : drAlgs ) {
721  const std::string& algName{ index2algname( algIndex ) };
722  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
723  bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
724 
725  partial_sc =
726  schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
727 
728  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
729  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
730  << " on processing slot " << iSlot << endmsg;
731  }
732 
733  // Check for algorithms ready in sub-slots
734  for ( auto& subslot : thisSlot.allSubSlots ) {
735  const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
736  for ( uint algIndex : drAlgsSubSlot ) {
737  const std::string& algName{ index2algname( algIndex ) };
738  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
739  bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
740  partial_sc =
741  schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
742  }
743  }
744 
745  if ( m_dumpIntraEventDynamics ) {
746  std::stringstream s;
747  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
748  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
749  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
750  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
751  : std::to_string( std::thread::hardware_concurrency() );
752  std::ofstream myfile;
753  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
754  myfile << s.str();
755  myfile.close();
756  }
757 
758  // Not complete because this would mean that the slot is already free!
759  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
760  !thisSlot.algsStates.containsAny(
761  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
762  !subSlotAlgsInStates( thisSlot,
763  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
764  !thisSlot.complete ) {
765 
766  thisSlot.complete = true;
767  // if the event did not fail, add it to the finished events
768  // otherwise it is taken care of in the error handling
769  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
770  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
771  << thisSlot.eventContext->slot() << ")." << endmsg;
772  m_finishedEvents.push( thisSlot.eventContext.release() );
773  }
774 
775  // now let's return the fully evaluated result of the control flow
776  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
777 
778  thisSlot.eventContext.reset( nullptr );
779 
780  } else if ( isStalled( thisSlot ) ) {
781  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
782  eventFailed( thisSlot.eventContext.get() ); // can't release yet
783  }
784  partial_sc.ignore();
785  } // end loop on slots
786 
787  // Process snapshot
788  if ( !nextSnap.states.empty() ) {
789  m_lastSnapshot = nextSnap.time;
790  m_snapshotCallback( std::move( nextSnap ) );
791  }
792 
793  ON_VERBOSE verbose() << "Iteration done." << endmsg;
794  m_needsUpdate.store( false );
795  return global_sc;
796 }
797 
798 //---------------------------------------------------------------------------
799 // Update algorithm state and, optionally, revise states of other downstream algorithms
800 StatusCode AvalancheSchedulerSvc::revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate ) {
801  StatusCode sc;
802  auto slotIndex = contextPtr->slot();
803  EventSlot& slot = m_eventSlots[slotIndex];
804  Cause cs = { Cause::source::Task, index2algname( iAlgo ) };
805 
806  if ( contextPtr->usesSubSlot() ) {
807  // Sub-slot
808  auto subSlotIndex = contextPtr->subSlot();
809  EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
810 
811  sc = subSlot.algsStates.set( iAlgo, state );
812 
813  if ( sc.isSuccess() ) {
814  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
815  << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
816  // Revise states of algorithms downstream the precedence graph
817  if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
818  }
819  } else {
820  // Event level (standard behaviour)
821  sc = slot.algsStates.set( iAlgo, state );
822 
823  if ( sc.isSuccess() ) {
824  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
825  << ", event:" << contextPtr->evt() << "]" << endmsg;
826  // Revise states of algorithms downstream the precedence graph
827  if ( iterate ) sc = m_precSvc->iterate( slot, cs );
828  }
829  }
830  return sc;
831 }
832 
833 //---------------------------------------------------------------------------
834 
841 bool AvalancheSchedulerSvc::isStalled( const EventSlot& slot ) const {
842 
843  if ( !slot.algsStates.containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
844  !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
845 
846  error() << "*** Stall detected, event context: " << slot.eventContext.get() << endmsg;
847 
848  return true;
849  }
850  return false;
851 }
852 
853 //---------------------------------------------------------------------------
854 
860  const uint slotIdx = eventContext->slot();
861 
862  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
863 
864  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
865 
866  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
867  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
868 
869  // Push into the finished events queue the failed context
870  m_eventSlots[slotIdx].complete = true;
871  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
872 }
873 
874 //---------------------------------------------------------------------------
875 
881
  // Writes a snapshot of the scheduler state as a single info() message, accumulated in outputMS:
  //  1. the SCHEDULED tasks of every slot with event/slot, thread id and execution state
  //     (only when TimelineSvc is available and enabled);
  //  2. for every incomplete slot a header line and, for the target slot (all slots when iSlot < 0),
  //     either the algorithms in ERROR or the precedence-service state dump (sub-slots optionally too);
  //  3. when iSlot >= 0 and no algorithm of that slot erred, its algorithm execution states.
  // NOTE(review): iSlot >= 0 is used directly as an index into m_eventSlots without a bounds check;
  // callers must pass either a negative value or a valid slot index.
882  // To have just one big message
883  std::ostringstream outputMS;
884
885  outputMS << "Dumping scheduler state\n"
886  << "=========================================================================================\n"
887  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
888  << "=========================================================================================\n\n";
889
890  //===========================================================================
891
892  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
893  << "------------------\n\n";
894
895  // Figure if TimelineSvc is available (used below to detect threads IDs)
896  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
897  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
898  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
899  } else {
900
901  // Figure optimal printout layout
  // indt: length of the longest SCHEDULED algorithm name over all slots, used as the task column width
902  size_t indt( 0 );
903  for ( auto& slot : m_eventSlots ) {
904
905  const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
906  for ( uint algIndex : schedAlgs ) {
907  if ( index2algname( algIndex ).length() > indt ) indt = index2algname( algIndex ).length();
908  }
909  }
910
911  // Figure the last running schedule across all slots
912  for ( auto& slot : m_eventSlots ) {
913
914  const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
915  for ( uint algIndex : schedAlgs ) {
916
917  const std::string& algoName{ index2algname( algIndex ) };
918
919  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
920  << slot.eventContext->slot();
921
922  // Try to get POSIX threads IDs the currently running tasks are scheduled to
  // NOTE(review): always true here, this branch is only reached when timelineSvc is valid
923  if ( timelineSvc.isValid() ) {
924  TimelineEvent te{};
925  te.algorithm = algoName;
926  te.slot = slot.eventContext->slot();
927  te.event = slot.eventContext->evt();
928
929  if ( timelineSvc->getTimelineEvent( te ) )
930  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
931  else
932  outputMS << " thread.id: [unknown]"; // this means a task has just
933  // been signed off as SCHEDULED,
934  // but has not been assigned to a thread yet
935  // (i.e., not running yet)
936  }
937  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
938  }
939  }
940  }
941
942  //===========================================================================
943
944  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
945  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
946
  // slotCount is the index of the slot currently visited, compared against iSlot below
947  int slotCount = -1;
  // True when the target slot (or one of its sub-slots) has an algorithm in ERROR: in that case the failing
  // algorithms are listed instead of the control-flow/FSM snapshot. Always false when dumping all slots.
948  bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( { AState::ERROR } ) ||
949  subSlotAlgsInStates( m_eventSlots[iSlot], { AState::ERROR } )
950  : false;
951
952  for ( auto& slot : m_eventSlots ) {
953  ++slotCount;
954  if ( slot.complete ) continue;
955
  // The slot header is printed for every incomplete slot; the details only for the selected one(s)
956  outputMS << "[ slot: "
957  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
958  << ", event: "
959  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" );
960
961  if ( slot.eventContext->eventID().isValid() ) { outputMS << ", eventID: " << slot.eventContext->eventID(); }
962  outputMS << " ]:\n\n";
963
964  if ( 0 > iSlot || iSlot == slotCount ) {
965
966  // If an alg has thrown an error then it's not a failure of the CF/DF graph
967  if ( wasAlgError ) {
968  outputMS << "ERROR alg(s):";
969  int errorCount = 0;
970  const auto& errorAlgs = slot.algsStates.algsInState( AState::ERROR );
971  for ( uint algIndex : errorAlgs ) {
972  outputMS << " " << index2algname( algIndex );
973  ++errorCount;
974  }
  // No ERROR algorithm in the top-level slot itself: the error was found in a sub-slot
975  if ( errorCount == 0 ) outputMS << " in subslot(s)";
976  outputMS << "\n\n";
977  } else {
978  // Snapshot of the Control Flow and FSM states
979  outputMS << m_precSvc->printState( slot ) << "\n";
980  }
981
982  // Mention sub slots (this is expensive if the number of sub-slots is high)
983  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
984  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
985  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
986  for ( auto& ss : slot.allSubSlots ) {
987  outputMS << "[ slot: " << slotID << ", sub-slot: "
988  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
989  << ", entry: " << ss.entryPoint << ", event: "
990  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
991  << " ]:\n\n";
992  if ( wasAlgError ) {
993  outputMS << "ERROR alg(s):";
994  const auto& errorAlgs = ss.algsStates.algsInState( AState::ERROR );
995  for ( uint algIndex : errorAlgs ) { outputMS << " " << index2algname( algIndex ); }
996  outputMS << "\n\n";
997  } else {
998  // Snapshot of the Control Flow and FSM states in sub slot
999  outputMS << m_precSvc->printState( ss ) << "\n";
1000  }
1001  }
1002  }
1003  }
1004  }
1005
1006  //===========================================================================
1007
  // Execution states are only dumped for a single target slot, and only when no algorithm erred there
1008  if ( 0 <= iSlot && !wasAlgError ) {
1009  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1010  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1011  }
1012
1013  outputMS << "\n=========================================================================================\n"
1014  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1015  << "=========================================================================================\n\n";
1016
1017  info() << outputMS.str() << endmsg;
1018 }
1019 
1020 //---------------------------------------------------------------------------
1021 
1023
  // Schedules the task described by ts. If an algorithm instance can be acquired from the resource pool,
  // the task is queued for execution (or run inline when m_threadPoolSize == -100) and its state is
  // revised to SCHEDULED; otherwise its state is revised to RESOURCELESS and the task is pushed onto
  // m_retryQueue. Returns the StatusCode of the state revision.
  // NOTE(review): ts is moved from on most paths; callers must not use it after this call.
1024  // Check if a free Algorithm instance is available
1025  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
1026
1027  // If an instance is available, proceed to scheduling
1028  StatusCode sc;
1029  if ( getAlgSC.isSuccess() ) {
1030
1031  // Decide how to schedule the task and schedule it
1032  if ( -100 != m_threadPoolSize ) {
1033
1034  // Cache values before moving the TaskSpec further
  // (ts is moved into a queue below; revise() and the debug message only use these copies afterwards)
  // NOTE(review): algName is a view of ts.algName. If that member owns its characters, the view may not
  // stay valid once ts has been moved and the task picked up by a worker - confirm TaskSpec::algName's type.
1035  unsigned int algIndex{ ts.algIndex };
1036  std::string_view algName( ts.algName );
1037  unsigned int algRank{ ts.algRank };
1038  bool asynchronous{ ts.asynchronous };
1039  int slotIndex{ ts.slotIndex };
1040  EventContext* contextPtr{ ts.contextPtr };
1041
  // Asynchronous tasks are handed to the fiber manager, all other tasks are enqueued in the TBB arena
1042  if ( asynchronous ) {
1043  // Add to asynchronous scheduled queue
1044  m_scheduledAsynchronousQueue.push( std::move( ts ) );
1045
1046  // Schedule task
1047  m_fiberManager->schedule( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1048  }
1049
1050  if ( !asynchronous ) {
1051  // Add the algorithm to the scheduled queue
1052  m_scheduledQueue.push( std::move( ts ) );
1053
1054  // Prepare a TBB task that will execute the Algorithm according to the above queued specs
1055  m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1056  ++m_algosInFlight;
1057  }
1058  sc = revise( algIndex, contextPtr, AState::SCHEDULED );
1059
1060  ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
1061  << ", rank:" << algRank << ", asynchronous:" << ( asynchronous ? "yes" : "no" )
1062  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1064  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1065  : "" )
1066  << endmsg;
1067
1068  } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
1069  // Beojan: I don't think this bit works. ts hasn't been pushed into any queue so AlgTask won't retrieve it
  // NOTE(review): unlike the branch above, ts is not pushed onto m_scheduledQueue or
  // m_scheduledAsynchronousQueue before AlgTask is invoked; confirm whether this inline mode is still usable.
1070  ++m_algosInFlight;
1071  sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
1072  AlgTask( this, serviceLocator(), m_algExecStateSvc, ts.asynchronous )();
1073  --m_algosInFlight;
1074  }
1075  } else { // if no Algorithm instance available, retry later
1076
1077  sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1078  // Add the algorithm to the retry queue
1079  m_retryQueue.push( std::move( ts ) );
1080  }
1081
1083
1084  return sc;
1085 }
1086 
1087 //---------------------------------------------------------------------------
1088 
1093
  // Reports an executed task back to the scheduler:
  //  - makes ts.contextPtr the current context;
  //  - decrements m_algosInFlight;
  //  - translates the algorithm's execution state into EVTACCEPTED / EVTREJECTED (successful execution,
  //    according to the filter decision) or ERROR (failed execution);
  //  - revises the states with iterate = true;
  //  - sets m_needsUpdate.
  // Returns the StatusCode of the state revision.
1094  Gaudi::Hive::setCurrentContext( ts.contextPtr );
1095
  // NOTE(review): decremented for every task. Confirm this balances the increments done at scheduling time,
  // in particular for asynchronous tasks and for the -100 inline mode.
1096  --m_algosInFlight;
1097
1098  const AlgExecStateRef algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
1099  AState state = algstate.execStatus().isSuccess()
1100  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1101  : AState::ERROR;
1102
1103  // Update algorithm state and revise the downstream states
1104  auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1105
1106  ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1107  << ", rank:" << ts.algRank << ", asynchronous:" << ( ts.asynchronous ? "yes" : "no" )
1108  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1110  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1111  : "" )
1112  << endmsg;
1113
1114  // Prompt a call to updateStates
1115  m_needsUpdate.store( true );
1116  return sc;
1117 }
1118 
1119 //---------------------------------------------------------------------------
1120 
1121 // Method to inform the scheduler about event views
1122 
1123 StatusCode AvalancheSchedulerSvc::scheduleEventView( const EventContext* sourceContext, const std::string& nodeName,
1124  std::unique_ptr<EventContext> viewContext ) {
1125  // Prevent view nesting
1126  if ( sourceContext->usesSubSlot() ) {
1127  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1128  return StatusCode::FAILURE;
1129  }
1130 
1131  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1132 
1133  // It's not possible to create an std::functional from a move-capturing lambda
1134  // So, we have to release the unique pointer
1135  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1136  &nodeName]() -> StatusCode {
1137  // Attach the sub-slot to the top-level slot
1138  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1139 
1140  if ( viewContextPtr ) {
1141  // Re-create the unique pointer
1142  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1143  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1144  return StatusCode::SUCCESS;
1145  } else {
1146  // Disable the view node if there are no views
1147  topSlot.disableSubSlots( nodeName );
1148  return StatusCode::SUCCESS;
1149  }
1150  };
1151 
1152  m_actionsQueue.push( std::move( action ) );
1153 
1154  return StatusCode::SUCCESS;
1155 }
1156 
1157 //---------------------------------------------------------------------------
1158 
1159 // Sample occupancy at fixed interval (ms)
1160 // Negative value to deactivate, 0 to snapshot every change
1161 // Each sample, apply the callback function to the result
1162 
1163 void AvalancheSchedulerSvc::recordOccupancy( int samplePeriod, std::function<void( OccupancySnapshot )> callback ) {
1164 
1165  auto action = [this, samplePeriod, callback = std::move( callback )]() -> StatusCode {
1166  if ( samplePeriod < 0 ) {
1167  this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min();
1168  } else {
1169  this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>( samplePeriod );
1170  m_snapshotCallback = std::move( callback );
1171  }
1172  return StatusCode::SUCCESS;
1173  };
1174 
1175  m_actionsQueue.push( std::move( action ) );
1176 }
1177 
// Fills the data-dependency graph `g` from the per-algorithm input (inDeps) and output (outDeps) data-object
// collections and logs the name of the file it is written to.
//  - Nodes: one per selected algorithm (id "Alg_<n>", n counting selected algorithms only) and one per
//    selected data object (id "obj_<DataObjID::hash()>", labelled with DataObjID::key()).
//  - Edges: object -> algorithm for inputs and algorithm -> object for outputs (assuming addEdge takes
//    the source node first, then the target - confirm against GraphDumper.h).
//  - Algorithms are selected with std::regex_search of m_dataDepsGraphAlgoPattern on the algorithm name;
//    objects with m_dataDepsGraphObjectPattern on DataObjID::fullKey().
// Always returns SUCCESS.
// NOTE(review): the std::regex constructors throw std::regex_error if either pattern is invalid, and
// outDeps.at() throws std::out_of_range if an algorithm of inDeps is missing from outDeps (the assert only
// compares sizes and is compiled out with NDEBUG).
1178 StatusCode AvalancheSchedulerSvc::dumpGraphFile( const std::map<std::string, DataObjIDColl>& inDeps,
1179  const std::map<std::string, DataObjIDColl>& outDeps ) const {
1180  // Both maps should have the same algorithm entries
1181  assert( inDeps.size() == outDeps.size() );
1182
1184  info() << "Dumping data dependencies graph to file: " << g.fileName() << endmsg;
1185
1186  // define algs and objects
  // Hashes of data objects already added as nodes, so that each object node is created only once.
  // NOTE(review): objects are identified by hash only; two DataObjIDs with equal hashes share one node.
1187  std::set<std::size_t> definedObjects;
1188
1189  // Regex for selection of algs and objects
1190  std::regex algNameRegex( m_dataDepsGraphAlgoPattern.value() );
1191  std::regex objNameRegex( m_dataDepsGraphObjectPattern.value() );
1192
1193  // inDeps and outDeps should have the same entries
1194  std::size_t algoIndex = 0ul;
1195  for ( const auto& [algName, ideps] : inDeps ) {
1196  if ( not std::regex_search( algName, algNameRegex ) ) continue;
1197  std::string algIndex = "Alg_" + std::to_string( algoIndex );
1198  g.addNode( algName, algIndex );
1199
1200  // inputs
1201  for ( const auto& dep : ideps ) {
1202  if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1203
1204  const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1205  std::string objIndex = "obj_" + std::to_string( dep.hash() );
1206  if ( inserted ) g.addNode( dep.key(), objIndex );
1207
1208  g.addEdge( dep.key(), objIndex, algName, algIndex );
1209  } // loop on ideps
1210
1211  const auto& odeps = outDeps.at( algName );
1212  for ( const auto& dep : odeps ) {
1213  if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1214
1215  const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1216  std::string objIndex = "obj_" + std::to_string( dep.hash() );
1217  if ( inserted ) g.addNode( dep.key(), objIndex );
1218
1219  g.addEdge( algName, algIndex, dep.key(), objIndex );
1220  } // loop on odeps
1221
  // Only reached for selected algorithms, so the "Alg_<n>" ids are consecutive
1222  ++algoIndex;
1223  } // loop on inDeps
1224
1225  return StatusCode::SUCCESS;
1226 }
IOTest.evt
evt
Definition: IOTest.py:107
EventSlot::eventContext
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:82
AvalancheSchedulerSvc::m_whiteboard
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Definition: AvalancheSchedulerSvc.h:266
Gaudi::Hive::setCurrentContext
GAUDI_API void setCurrentContext(const EventContext *ctx)
Definition: ThreadLocalContext.cpp:41
PrecedenceSvc
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:30
MSG::hex
MsgStream & hex(MsgStream &log)
Definition: MsgStream.h:258
GraphDumper.h
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
AlgExecStateRef
wrapper on an Algorithm state.
Definition: IAlgExecStateSvc.h:32
AvalancheSchedulerSvc::m_useDataLoader
Gaudi::Property< std::string > m_useDataLoader
Definition: AvalancheSchedulerSvc.h:206
AvalancheSchedulerSvc::TaskSpec
Struct to hold entries in the alg queues.
Definition: AvalancheSchedulerSvc.h:321
AvalancheSchedulerSvc::finalize
StatusCode finalize() override
Finalise.
Definition: AvalancheSchedulerSvc.cpp:424
Gaudi::Algorithm::acceptDHVisitor
void acceptDHVisitor(IDataHandleVisitor *) const override
Definition: Algorithm.cpp:183
Read.app
app
Definition: Read.py:36
Gaudi::Algorithm::name
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:524
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
AlgExecStateRef::filterPassed
bool filterPassed() const
Definition: IAlgExecStateSvc.h:148
GaudiPartProp.decorators.std
std
Definition: decorators.py:32
AvalancheSchedulerSvc::m_optimizationMode
Gaudi::Property< std::string > m_optimizationMode
Definition: AvalancheSchedulerSvc.h:184
ON_VERBOSE
#define ON_VERBOSE
Definition: AvalancheSchedulerSvc.cpp:47
AvalancheSchedulerSvc::ACTIVE
@ ACTIVE
Definition: AvalancheSchedulerSvc.h:162
concurrency::PrecedenceRulesGraph::getControlFlowNodeCounter
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
Definition: PrecedenceRulesGraph.h:660
gaudirun.s
string s
Definition: gaudirun.py:346
AvalancheSchedulerSvc::iterate
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Definition: AvalancheSchedulerSvc.cpp:666
EventSlot
Class representing an event slot.
Definition: EventSlot.h:23
AlgsExecutionStates
Definition: AlgsExecutionStates.h:37
DataHandleHolderBase::addDependency
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Definition: DataHandleHolderBase.h:85
GaudiMP.FdsRegistry.msg
msg
Definition: FdsRegistry.py:19
AvalancheSchedulerSvc::m_scheduledAsynchronousQueue
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
Definition: AvalancheSchedulerSvc.h:358
AvalancheSchedulerSvc::m_lastSnapshot
std::chrono::system_clock::time_point m_lastSnapshot
Definition: AvalancheSchedulerSvc.h:166
PrecedenceSvc::getRules
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:74
EventStatus::Success
@ Success
Definition: IAlgExecStateSvc.h:75
IAlgorithm::type
virtual const std::string & type() const =0
The type of the algorithm.
ConcurrencyFlags.h
EventContext::usesSubSlot
bool usesSubSlot() const
Definition: EventContext.h:53
AvalancheSchedulerSvc::m_dataDepsGraphAlgoPattern
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
Definition: AvalancheSchedulerSvc.h:227
Gaudi::Hive::Graph
utilities to dump graphs in different formats
Definition: GraphDumper.h:30
AvalancheSchedulerSvc::m_scheduledQueue
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
Definition: AvalancheSchedulerSvc.h:357
AvalancheSchedulerSvc::m_fiberManager
std::unique_ptr< FiberManager > m_fiberManager
Definition: AvalancheSchedulerSvc.h:369
AvalancheSchedulerSvc::schedule
StatusCode schedule(TaskSpec &&)
Definition: AvalancheSchedulerSvc.cpp:1022
AvalancheSchedulerSvc::m_showControlFlow
Gaudi::Property< bool > m_showControlFlow
Definition: AvalancheSchedulerSvc.h:217
AvalancheSchedulerSvc::m_needsUpdate
std::atomic< bool > m_needsUpdate
Definition: AvalancheSchedulerSvc.h:362
DHHVisitor
Definition: DataHandleHolderVisitor.h:20
GaudiPartProp.tests.id
id
Definition: tests.py:111
AvalancheSchedulerSvc::m_enableCondSvc
Gaudi::Property< bool > m_enableCondSvc
Definition: AvalancheSchedulerSvc.h:209
AvalancheSchedulerSvc::deactivate
StatusCode deactivate()
Deactivate scheduler.
Definition: AvalancheSchedulerSvc.cpp:515
CommonMessaging< implements< IService, IProperty, IStateful > >::msgLevel
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
Definition: CommonMessaging.h:147
Service::finalize
StatusCode finalize() override
Definition: Service.cpp:223
Gaudi::Property::name
std::string name
Definition: Property.h:60
ThreadPoolSvc.h
AvalancheSchedulerSvc::m_eventSlots
std::vector< EventSlot > m_eventSlots
Vector of events slots.
Definition: AvalancheSchedulerSvc.h:269
Gaudi::DataHandle::Writer
@ Writer
Definition: DataHandle.h:39
concurrency::AlgorithmNode::getAlgoIndex
unsigned int getAlgoIndex() const
Get algorithm index.
Definition: PrecedenceRulesGraph.h:521
AvalancheSchedulerSvc::m_numOffloadThreads
Gaudi::Property< int > m_numOffloadThreads
Definition: AvalancheSchedulerSvc.h:191
AvalancheSchedulerSvc::m_arena
tbb::task_arena * m_arena
Definition: AvalancheSchedulerSvc.h:368
AvalancheSchedulerSvc::m_algExecStateSvc
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Definition: AvalancheSchedulerSvc.h:278
EventSlot::complete
bool complete
Flags completion of the event.
Definition: EventSlot.h:88
DataObjID::fullKey
std::string fullKey() const
combination of the key and the ClassName, mostly for debugging
Definition: DataObjID.cpp:103
AvalancheSchedulerSvc::FAILURE
@ FAILURE
Definition: AvalancheSchedulerSvc.h:162
AvalancheSchedulerSvc::m_condSvc
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
Definition: AvalancheSchedulerSvc.h:281
AvalancheSchedulerSvc::eventFailed
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Definition: AvalancheSchedulerSvc.cpp:859
TimelineEvent
Definition: TimelineEvent.h:16
AvalancheSchedulerSvc::m_threadPoolSize
Gaudi::Property< int > m_threadPoolSize
Definition: AvalancheSchedulerSvc.h:169
DHHVisitor::owners_names_of
std::vector< std::string > owners_names_of(const DataObjID &id, bool with_main=false) const
Definition: DataHandleHolderVisitor.cpp:82
EventSlot::addSubSlot
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Definition: EventSlot.h:60
EventStatus::AlgStall
@ AlgStall
Definition: IAlgExecStateSvc.h:75
AvalancheSchedulerSvc::m_dataDepsGraphObjectPattern
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
Definition: AvalancheSchedulerSvc.h:232
AvalancheSchedulerSvc::m_maxEventsInFlight
size_t m_maxEventsInFlight
Definition: AvalancheSchedulerSvc.h:371
SmartIF::isValid
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:69
AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
Definition: AvalancheSchedulerSvc.h:179
GaudiUtils::operator<<
std::ostream & operator<<(std::ostream &s, const std::pair< T1, T2 > &p)
Serialize an std::pair in a python like format. E.g. "(1, 2)".
Definition: SerializeSTL.h:101
Service::name
const std::string & name() const override
Retrieve name of the service
Definition: Service.cpp:333
StatusCode
Definition: StatusCode.h:64
AlgTask.h
ITimelineSvc
Definition: ITimelineSvc.h:20
IAlgorithm
Definition: IAlgorithm.h:36
gaudirun.g
dictionary g
Definition: gaudirun.py:582
AvalancheSchedulerSvc::m_maxParallelismExtra
Gaudi::Property< int > m_maxParallelismExtra
Definition: AvalancheSchedulerSvc.h:174
MSG::dec
MsgStream & dec(MsgStream &log)
Definition: MsgStream.h:254
compareRootHistos.ts
ts
Definition: compareRootHistos.py:488
EventContext::slot
ContextID_t slot() const
Definition: EventContext.h:51
AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
Definition: AvalancheSchedulerSvc.h:188
Gaudi::Algorithm
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:87
AvalancheSchedulerSvc::m_whiteboardSvcName
Gaudi::Property< std::string > m_whiteboardSvcName
Definition: AvalancheSchedulerSvc.h:178
AvalancheSchedulerSvc
Definition: AvalancheSchedulerSvc.h:113
AvalancheSchedulerSvc::m_checkOutput
Gaudi::Property< bool > m_checkOutput
Definition: AvalancheSchedulerSvc.h:197
EventSlot::reset
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:48
DataHandleHolderVisitor.h
Gaudi::Property::value
const ValueType & value() const
Definition: Property.h:229
EventSlot::disableSubSlots
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Definition: EventSlot.h:77
AvalancheSchedulerSvc::m_simulateExecution
Gaudi::Property< bool > m_simulateExecution
Definition: AvalancheSchedulerSvc.h:181
AvalancheSchedulerSvc::recordOccupancy
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms) Negative value to deactivate, 0 to snapshot every change Each...
Definition: AvalancheSchedulerSvc.cpp:1163
AvalancheSchedulerSvc::index2algname
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Definition: AvalancheSchedulerSvc.h:257
Algorithm.h
EventSlot::allSubSlots
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:99
AvalancheSchedulerSvc::AState
AlgsExecutionStates::State AState
Definition: AvalancheSchedulerSvc.h:159
AvalancheSchedulerSvc::INACTIVE
@ INACTIVE
Definition: AvalancheSchedulerSvc.h:162
SmartIF< IMessageSvc >
genconfuser.verbose
verbose
Definition: genconfuser.py:28
AvalancheSchedulerSvc::m_algosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
Definition: AvalancheSchedulerSvc.h:284
DataObjIDColl
std::unordered_set< DataObjID, DataObjID_Hasher > DataObjIDColl
Definition: DataObjID.h:122
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:198
AvalancheSchedulerSvc::tryPopFinishedEvent
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Definition: AvalancheSchedulerSvc.cpp:646
AvalancheSchedulerSvc::scheduleEventView
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Definition: AvalancheSchedulerSvc.cpp:1123
AvalancheSchedulerSvc::m_algResourcePool
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Definition: AvalancheSchedulerSvc.h:313
AvalancheSchedulerSvc::freeSlots
unsigned int freeSlots() override
Get free slots number.
Definition: AvalancheSchedulerSvc.cpp:615
Cause::source::Root
@ Root
AvalancheSchedulerSvc::m_showDataDeps
Gaudi::Property< bool > m_showDataDeps
Definition: AvalancheSchedulerSvc.h:211
AvalancheSchedulerSvc::m_maxAlgosInFlight
size_t m_maxAlgosInFlight
Definition: AvalancheSchedulerSvc.h:372
DataObjID
Definition: DataObjID.h:47
AvalancheSchedulerSvc::dumpState
void dumpState() override
Dump scheduler state for all slots.
Definition: AvalancheSchedulerSvc.cpp:619
AvalancheSchedulerSvc::initialize
StatusCode initialize() override
Initialise.
Definition: AvalancheSchedulerSvc.cpp:78
AlgsExecutionStates::containsAny
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
Definition: AlgsExecutionStates.h:74
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:139
ON_DEBUG
#define ON_DEBUG
Definition: AvalancheSchedulerSvc.cpp:46
StatusCode::isFailure
bool isFailure() const
Definition: StatusCode.h:129
ThreadLocalContext.h
concurrency::PrecedenceRulesGraph::getAlgorithmNode
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
Definition: PrecedenceRulesGraph.h:652
AvalancheSchedulerSvc::m_dumpIntraEventDynamics
Gaudi::Property< bool > m_dumpIntraEventDynamics
Definition: AvalancheSchedulerSvc.h:186
AlgsExecutionStates::set
StatusCode set(unsigned int iAlgo, State newState)
Definition: AlgsExecutionStates.cpp:23
AvalancheSchedulerSvc::m_retryQueue
std::queue< TaskSpec > m_retryQueue
Definition: AvalancheSchedulerSvc.h:359
MSG::VERBOSE
@ VERBOSE
Definition: IMessageSvc.h:22
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:99
EventContext::subSlot
ContextID_t subSlot() const
Definition: EventContext.h:52
Cause::source::Task
@ Task
SmartIF::get
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:82
DataHandleHolderBase::outputDataObjs
const DataObjIDColl & outputDataObjs() const override
Definition: DataHandleHolderBase.h:83
AvalancheSchedulerSvc::m_snapshotInterval
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Definition: AvalancheSchedulerSvc.h:165
Gaudi::Decays::valid
bool valid(Iterator begin, Iterator end)
check the validness of the trees or nodes
Definition: Nodes.h:36
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:45
AvalancheSchedulerSvc::m_threadPoolSvc
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Definition: AvalancheSchedulerSvc.h:367
FiberManager.h
MSG::ERROR
@ ERROR
Definition: IMessageSvc.h:22
AvalancheSchedulerSvc::m_dataDepsGraphFile
Gaudi::Property< std::string > m_dataDepsGraphFile
Definition: AvalancheSchedulerSvc.h:222
EventContext
Definition: EventContext.h:34
AlgsExecutionStates::State
State
Execution states of the algorithms Must have contiguous integer values 0, 1...
Definition: AlgsExecutionStates.h:41
TimelineEvent::algorithm
std::string algorithm
Definition: TimelineEvent.h:24
Gaudi::Property::toString
std::string toString() const override
value -> string
Definition: Property.h:406
AvalancheSchedulerSvc::revise
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
Definition: AvalancheSchedulerSvc.cpp:800
AvalancheSchedulerSvc::activate
void activate()
Activate scheduler.
Definition: AvalancheSchedulerSvc.cpp:458
AvalancheSchedulerSvc::m_actionsQueue
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
Definition: AvalancheSchedulerSvc.h:318
AvalancheSchedulerSvc::m_algname_index_map
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Definition: AvalancheSchedulerSvc.h:254
Properties.v
v
Definition: Properties.py:122
AvalancheSchedulerSvc::m_checkDeps
Gaudi::Property< bool > m_checkDeps
Definition: AvalancheSchedulerSvc.h:195
AvalancheSchedulerSvc::isStalled
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
Definition: AvalancheSchedulerSvc.cpp:841
AvalancheSchedulerSvc::AlgTask
friend class AlgTask
Definition: AvalancheSchedulerSvc.h:115
SerializeSTL.h
DataHandleHolderBase::inputDataObjs
const DataObjIDColl & inputDataObjs() const override
Definition: DataHandleHolderBase.h:82
AvalancheSchedulerSvc::m_thread
std::thread m_thread
The thread in which the activate function runs.
Definition: AvalancheSchedulerSvc.h:248
AvalancheSchedulerSvc::pushNewEvents
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
Definition: AvalancheSchedulerSvc.cpp:604
AvalancheSchedulerSvc::m_showDataFlow
Gaudi::Property< bool > m_showDataFlow
Definition: AvalancheSchedulerSvc.h:214
IAlgorithm.h
AvalancheSchedulerSvc::m_checkOutputIgnoreList
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
Definition: AvalancheSchedulerSvc.h:199
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:100
AvalancheSchedulerSvc::signoff
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
Definition: AvalancheSchedulerSvc.cpp:1092
AlgsExecutionStates::sizeOfSubset
size_t sizeOfSubset(State state) const
Definition: AlgsExecutionStates.h:88
AvalancheSchedulerSvc::m_freeSlots
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
Definition: AvalancheSchedulerSvc.h:272
compareRootHistos.state
state
Definition: compareRootHistos.py:496
AvalancheSchedulerSvc::m_blockingAlgosInFlight
unsigned int m_blockingAlgosInFlight
Number of algorithms presently in flight.
Definition: AvalancheSchedulerSvc.h:287
AvalancheSchedulerSvc::m_snapshotCallback
std::function< void(OccupancySnapshot)> m_snapshotCallback
Definition: AvalancheSchedulerSvc.h:167
AvalancheSchedulerSvc::pushNewEvent
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Definition: AvalancheSchedulerSvc.cpp:546
AvalancheSchedulerSvc::action
std::function< StatusCode()> action
Definition: AvalancheSchedulerSvc.h:160
AvalancheSchedulerSvc::dumpGraphFile
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
Definition: AvalancheSchedulerSvc.cpp:1178
AvalancheSchedulerSvc::popFinishedEvent
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Definition: AvalancheSchedulerSvc.cpp:625
AlgsExecutionStates::algsInState
const boost::container::flat_set< int > algsInState(State state) const
Definition: AlgsExecutionStates.h:82
EventSlot::algsStates
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:84
Cause
Definition: PrecedenceRulesGraph.h:397
AvalancheSchedulerSvc::m_precSvc
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Definition: AvalancheSchedulerSvc.h:263
AvalancheSchedulerSvc::m_isActive
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
Definition: AvalancheSchedulerSvc.h:245
AvalancheSchedulerSvc::m_finishedEvents
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
Definition: AvalancheSchedulerSvc.h:275
AvalancheSchedulerSvc::m_algname_vect
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Definition: AvalancheSchedulerSvc.h:260
GPUAvalancheSchedulerSimpleTest.threads
threads
Definition: GPUAvalancheSchedulerSimpleTest.py:57
EventContext::evt
ContextEvt_t evt() const
Definition: EventContext.h:50
AvalancheSchedulerSvc::dumpSchedulerState
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Definition: AvalancheSchedulerSvc.cpp:880
IDataManagerSvc.h
Gaudi::ParticleProperties::index
size_t index(const Gaudi::ParticleProperty *property, const Gaudi::Interfaces::IParticlePropertySvc *service)
helper utility for mapping of Gaudi::ParticleProperty object into non-negative integral sequential id...
Definition: IParticlePropertySvc.cpp:39
Service::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
Definition: Service.cpp:336
AvalancheSchedulerSvc.h
ThreadPoolSvc
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:37
AlgExecStateRef::execStatus
const StatusCode & execStatus() const
Definition: IAlgExecStateSvc.h:155
gaudirun.callback
callback
Definition: gaudirun.py:202