The Gaudi Framework  v36r1 (3e2fb5a8)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "AvalancheSchedulerSvc.h"
12 #include "AlgTask.h"
13 #include "ThreadPoolSvc.h"
14 
15 // Framework includes
18 #include "GaudiKernel/IAlgorithm.h"
21 #include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
22 
23 // C++
24 #include <algorithm>
25 #include <map>
26 #include <queue>
27 #include <sstream>
28 #include <string_view>
29 #include <thread>
30 #include <unordered_set>
31 
32 // External libs
33 #include "boost/algorithm/string.hpp"
34 #include "boost/thread.hpp"
35 #include "boost/tokenizer.hpp"
36 // DP waiting for the TBB service
37 #include "tbb/tbb_stddef.h"
38 
39 // Instantiation of a static factory class used by clients to create instances of this service
41 
42 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
43 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
44 
45 namespace {
46  struct DataObjIDSorter {
47  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
48  };
49 
50  // Sort a DataObjIDColl in a well-defined, reproducible manner.
51  // Used for making debugging dumps.
52  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
54  v.reserve( coll.size() );
55  for ( const DataObjID& id : coll ) v.push_back( &id );
56  std::sort( v.begin(), v.end(), DataObjIDSorter() );
57  return v;
58  }
59 
60  bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
61  return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
62  [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
63  }
64 } // namespace
65 
66 //---------------------------------------------------------------------------
67 
75 
76  // Initialise mother class (read properties, ...)
78  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
79 
80  // Get hold of the TBBSvc. This should initialize the thread pool
81  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
82  if ( !m_threadPoolSvc.isValid() ) {
83  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
84  return StatusCode::FAILURE;
85  }
86  auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
87  if ( !castTPS ) {
88  fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
89  return StatusCode::FAILURE;
90  }
91  m_arena = castTPS->getArena();
92  if ( !m_arena ) {
93  fatal() << "Cannot find valid TBB task_arena" << endmsg;
94  return StatusCode::FAILURE;
95  }
96 
97  // Activate the scheduler in another thread.
98  info() << "Activating scheduler in a separate thread" << endmsg;
99  m_thread = std::thread( [this]() { this->activate(); } );
100 
101  while ( m_isActive != ACTIVE ) {
102  if ( m_isActive == FAILURE ) {
103  fatal() << "Terminating initialization" << endmsg;
104  return StatusCode::FAILURE;
105  } else {
106  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
107  sleep( 1 );
108  }
109  }
110 
111  if ( m_enableCondSvc ) {
112  // Get hold of the CondSvc
113  m_condSvc = serviceLocator()->service( "CondSvc" );
114  if ( !m_condSvc.isValid() ) {
115  warning() << "No CondSvc found, or not enabled. "
116  << "Will not manage CondAlgorithms" << endmsg;
117  m_enableCondSvc = false;
118  }
119  }
120 
121  // Get the algo resource pool
122  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
123  if ( !m_algResourcePool.isValid() ) {
124  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
125  return StatusCode::FAILURE;
126  }
127 
128  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
129  if ( !m_algExecStateSvc.isValid() ) {
130  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
131  return StatusCode::FAILURE;
132  }
133 
134  // Get Whiteboard
136  if ( !m_whiteboard.isValid() ) {
137  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
138  return StatusCode::FAILURE;
139  }
140 
141  // Set the MaxEventsInFlight parameters from the number of WB stores
142  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
143 
144  // Set the number of free slots
146 
147  // Get the list of algorithms
148  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
149  const unsigned int algsNumber = algos.size();
150  if ( algsNumber != 0 ) {
151  info() << "Found " << algsNumber << " algorithms" << endmsg;
152  } else {
153  error() << "No algorithms found" << endmsg;
154  return StatusCode::FAILURE;
155  }
156 
157  /* Dependencies
158  1) Look for handles in algo, if none
159  2) Assume none are required
160  */
161 
162  DataObjIDColl globalInp, globalOutp;
163 
164  // figure out all outputs
165  for ( IAlgorithm* ialgoPtr : algos ) {
166  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
167  if ( !algoPtr ) {
168  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
169  return StatusCode::FAILURE;
170  }
171  for ( auto id : algoPtr->outputDataObjs() ) globalOutp.insert( id );
172  }
173 
174  std::ostringstream ostdd;
175  ostdd << "Data Dependencies for Algorithms:";
176 
177  std::map<std::string, DataObjIDColl> algosDependenciesMap;
178  for ( IAlgorithm* ialgoPtr : algos ) {
179  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
180  if ( nullptr == algoPtr ) {
181  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
182  << ": this will result in a crash." << endmsg;
183  return StatusCode::FAILURE;
184  }
185 
186  ostdd << "\n " << algoPtr->name();
187 
188  DataObjIDColl algoDependencies;
189  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
190  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
191  DataObjID id = *idp;
192  ostdd << "\n o INPUT " << id;
193  if ( id.key().find( ":" ) != std::string::npos ) {
194  ostdd << " contains alternatives which require resolution...\n";
195  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
196  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
197  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
198  } );
199  if ( itok != tokens.end() ) {
200  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
201  id.updateKey( *itok );
202  } else {
203  error() << "failed to find alternate in global output list"
204  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
205  m_showDataDeps = true;
206  }
207  }
208  algoDependencies.insert( id );
209  globalInp.insert( id );
210  }
211  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
212  ostdd << "\n o OUTPUT " << *id;
213  if ( id->key().find( ":" ) != std::string::npos ) {
214  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
215  << endmsg;
216  m_showDataDeps = true;
217  }
218  }
219  } else {
220  ostdd << "\n none";
221  }
222  algosDependenciesMap[algoPtr->name()] = algoDependencies;
223  }
224 
225  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
226 
227  // Check if we have unmet global input dependencies, and, optionally, heal them
228  // WARNING: this step must be done BEFORE the Precedence Service is initialized
229  if ( m_checkDeps ) {
230  DataObjIDColl unmetDep;
231  for ( auto o : globalInp )
232  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
233 
234  if ( unmetDep.size() > 0 ) {
235 
236  auto printUnmet = [&]( auto msg ) {
237  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
238  msg << " o " << *o << " required by Algorithm: " << endmsg;
239 
240  for ( const auto& p : algosDependenciesMap )
241  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
242  }
243  };
244 
245  if ( !m_useDataLoader.empty() ) {
246 
247  // Find the DataLoader Alg
248  IAlgorithm* dataLoaderAlg( nullptr );
249  for ( IAlgorithm* algo : algos )
250  if ( algo->name() == m_useDataLoader ) {
251  dataLoaderAlg = algo;
252  break;
253  }
254 
255  if ( dataLoaderAlg == nullptr ) {
256  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
257  << "\" found, and unmet INPUT dependencies "
258  << "detected:" << endmsg;
259  printUnmet( fatal() );
260  return StatusCode::FAILURE;
261  }
262 
263  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
264  << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
265  printUnmet( info() );
266 
267  // Set the property Load of DataLoader Alg
268  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
269  if ( !dataAlg ) {
270  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
271  << endmsg;
272  return StatusCode::FAILURE;
273  }
274 
275  for ( auto& id : unmetDep ) {
276  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
277  << dataLoaderAlg->name() << endmsg;
279  }
280 
281  } else {
282  fatal() << "Auto DataLoading not requested, "
283  << "and the following unmet INPUT dependencies were found:" << endmsg;
284  printUnmet( fatal() );
285  return StatusCode::FAILURE;
286  }
287 
288  } else {
289  info() << "No unmet INPUT data dependencies were found" << endmsg;
290  }
291  }
292 
293  // Get the precedence service
294  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
295  if ( !m_precSvc.isValid() ) {
296  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
297  return StatusCode::FAILURE;
298  }
299  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
300  if ( !precSvc ) {
301  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
302  return StatusCode::FAILURE;
303  }
304 
305  // Fill the containers to convert algo names to index
306  m_algname_vect.resize( algsNumber );
307  for ( IAlgorithm* algo : algos ) {
308  const std::string& name = algo->name();
309  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
310  m_algname_index_map[name] = index;
311  m_algname_vect.at( index ) = name;
312  }
313 
314  // Shortcut for the message service
315  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
316  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
317 
318  m_eventSlots.reserve( m_maxEventsInFlight );
319  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
320  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
321  m_eventSlots.back().complete = true;
322  }
323 
324  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
325 
326  // Clearly inform about the level of concurrency
327  info() << "Concurrency level information:" << endmsg;
328  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
329  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
330 
331  // Inform about task scheduling prescriptions
332  info() << "Task scheduling settings:" << endmsg;
333  info() << " o Avalanche generation mode: "
334  << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
335  info() << " o Preemptive scheduling of CPU-blocking tasks: "
336  << ( m_enablePreemptiveBlockingTasks
337  ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
338  : "disabled" )
339  << endmsg;
340  info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
341 
342  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
343 
344  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
345 
346  // Simulate execution flow
347  if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
348 
349  return sc;
350 }
351 //---------------------------------------------------------------------------
352 
357 
359  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
360 
361  sc = deactivate();
362  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
363 
364  info() << "Joining Scheduler thread" << endmsg;
365  m_thread.join();
366 
367  // Final error check after thread pool termination
368  if ( m_isActive == FAILURE ) {
369  error() << "problems in scheduler thread" << endmsg;
370  return StatusCode::FAILURE;
371  }
372 
373  return sc;
374 }
375 //---------------------------------------------------------------------------
376 
388 
389  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
390 
391  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
392  error() << "problems initializing ThreadPoolSvc" << endmsg;
394  return;
395  }
396 
397  // Wait for actions pushed into the queue by finishing tasks.
398  action thisAction;
400 
401  m_isActive = ACTIVE;
402 
403  // Continue to wait if the scheduler is running or there is something to do
404  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
405  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
406  m_actionsQueue.pop( thisAction );
407  sc = thisAction();
408  ON_VERBOSE {
409  if ( sc.isFailure() )
410  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
411  else
412  verbose() << "Action succeeded." << endmsg;
413  }
414  else sc.ignore();
415 
416  // If all queued actions have been processed, update the slot states
417  if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
418  sc = iterate();
419  ON_VERBOSE {
420  if ( sc.isFailure() )
421  verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
422  else
423  verbose() << "Iteration succeeded." << endmsg;
424  }
425  else sc.ignore();
426  }
427  }
428 
429  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
430  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
431  error() << "Problems terminating thread pool" << endmsg;
433  }
434 }
435 
436 //---------------------------------------------------------------------------
437 
445 
446  if ( m_isActive == ACTIVE ) {
447 
448  // Set the number of slots available to an error code
449  m_freeSlots.store( 0 );
450 
451  // Empty queue
452  action thisAction;
453  while ( m_actionsQueue.try_pop( thisAction ) ) {};
454 
455  // This would be the last action
456  m_actionsQueue.push( [this]() -> StatusCode {
457  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
459  return StatusCode::SUCCESS;
460  } );
461  }
462 
463  return StatusCode::SUCCESS;
464 }
465 
466 //---------------------------------------------------------------------------
467 
468 // EventSlot management
476 
477  if ( !eventContext ) {
478  fatal() << "Event context is nullptr" << endmsg;
479  return StatusCode::FAILURE;
480  }
481 
482  if ( m_freeSlots.load() == 0 ) {
483  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
484  return StatusCode::FAILURE;
485  }
486 
487  // no problem as push new event is only called from one thread (event loop manager)
488  --m_freeSlots;
489 
490  auto action = [this, eventContext]() -> StatusCode {
491  // Event processing slot forced to be the same as the wb slot
492  const unsigned int thisSlotNum = eventContext->slot();
493  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
494  if ( !thisSlot.complete ) {
495  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
496  return StatusCode::FAILURE;
497  }
498 
499  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
500  thisSlot.reset( eventContext );
501 
502  // Result status code:
504 
505  // promote to CR and DR the initial set of algorithms
506  Cause cs = {Cause::source::Root, "RootDecisionHub"};
507  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
508  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
509  result = StatusCode::FAILURE;
510  }
511 
512  if ( this->iterate().isFailure() ) {
513  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
514  result = StatusCode::FAILURE;
515  }
516 
517  return result;
518  }; // end of lambda
519 
520  // Kick off scheduling
521  ON_VERBOSE {
522  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
523  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
524  }
525 
526  m_actionsQueue.push( action );
527 
528  return StatusCode::SUCCESS;
529 }
530 
531 //---------------------------------------------------------------------------
532 
534  StatusCode sc;
535  for ( auto context : eventContexts ) {
536  sc = pushNewEvent( context );
537  if ( sc != StatusCode::SUCCESS ) return sc;
538  }
539  return sc;
540 }
541 
542 //---------------------------------------------------------------------------
543 
544 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
545 
546 //---------------------------------------------------------------------------
551 
552  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
553  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
554  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
555  // << " active: " << m_isActive << endmsg;
556  return StatusCode::FAILURE;
557  } else {
558  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
559  // << " active: " << m_isActive << endmsg;
560  m_finishedEvents.pop( eventContext );
561  ++m_freeSlots;
562  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
563  return StatusCode::SUCCESS;
564  }
565 }
566 
567 //---------------------------------------------------------------------------
572 
573  if ( m_finishedEvents.try_pop( eventContext ) ) {
574  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
575  << endmsg;
576  ++m_freeSlots;
577  return StatusCode::SUCCESS;
578  }
579  return StatusCode::FAILURE;
580 }
581 
582 //--------------------------------------------------------------------------
583 
592 
593  StatusCode global_sc( StatusCode::SUCCESS );
594 
595  // Retry algorithms
596  const size_t retries = m_retryQueue.size();
597  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
598  TaskSpec retryTS = std::move( m_retryQueue.front() );
599  m_retryQueue.pop();
600  global_sc = schedule( std::move( retryTS ) );
601  }
602 
603  // Loop over all slots
604  OccupancySnapshot nextSnap;
605  auto now = std::chrono::system_clock::now();
606  for ( EventSlot& thisSlot : m_eventSlots ) {
607 
608  // Ignore slots without a valid context (relevant when populating scheduler for first time)
609  if ( !thisSlot.eventContext ) continue;
610 
611  int iSlot = thisSlot.eventContext->slot();
612 
613  // Cache the states of the algorithms to improve readability and performance
614  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
615 
616  StatusCode partial_sc = StatusCode::FAILURE;
617 
618  // Make an occupancy snapshot
621 
622  // Initialise snapshot
623  if ( nextSnap.states.empty() ) {
624  nextSnap.time = now;
625  nextSnap.states.resize( m_eventSlots.size() );
626  }
627 
628  // Store alg states
629  std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
630  slotStateTotals.resize( AState::MAXVALUE );
631  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
632  slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
633  }
634 
635  // Add subslot alg states
636  for ( auto& subslot : thisSlot.allSubSlots ) {
637  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
638  slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
639  }
640  }
641  }
642 
643  // Perform DR->SCHEDULED
644  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
645  uint algIndex{*it};
646  const std::string& algName{index2algname( algIndex )};
647  unsigned int rank{m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName )};
648  bool blocking{m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false};
649 
650  partial_sc =
651  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
652 
653  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
654  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
655  << " on processing slot " << iSlot << endmsg;
656  }
657 
658  // Check for algorithms ready in sub-slots
659  for ( auto& subslot : thisSlot.allSubSlots ) {
660  auto& subslotStates = subslot.algsStates;
661  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
662  uint algIndex{*it};
663  const std::string& algName{index2algname( algIndex )};
664  unsigned int rank{m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName )};
665  bool blocking{m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false};
666  partial_sc =
667  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
668  }
669  }
670 
671  if ( m_dumpIntraEventDynamics ) {
673  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
674  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
675  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
678  std::ofstream myfile;
679  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
680  myfile << s.str();
681  myfile.close();
682  }
683 
684  // Not complete because this would mean that the slot is already free!
685  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
686  !thisSlot.algsStates.containsAny(
687  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
688  !subSlotAlgsInStates( thisSlot,
689  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
690  !thisSlot.complete ) {
691 
692  thisSlot.complete = true;
693  // if the event did not fail, add it to the finished events
694  // otherwise it is taken care of in the error handling
695  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
696  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
697  << thisSlot.eventContext->slot() << ")." << endmsg;
698  m_finishedEvents.push( thisSlot.eventContext.release() );
699  }
700 
701  // now let's return the fully evaluated result of the control flow
702  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
703 
704  thisSlot.eventContext.reset( nullptr );
705 
706  } else if ( isStalled( thisSlot ) ) {
707  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
708  eventFailed( thisSlot.eventContext.get() ); // can't release yet
709  }
710  partial_sc.ignore();
711  } // end loop on slots
712 
713  // Process snapshot
714  if ( !nextSnap.states.empty() ) {
715  m_lastSnapshot = nextSnap.time;
716  m_snapshotCallback( std::move( nextSnap ) );
717  }
718 
719  ON_VERBOSE verbose() << "Iteration done." << endmsg;
720  m_needsUpdate.store( false );
721  return global_sc;
722 }
723 
724 //---------------------------------------------------------------------------
725 // Update algorithm state and, optionally, revise states of other downstream algorithms
726 StatusCode AvalancheSchedulerSvc::revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate ) {
727  StatusCode sc;
728  auto slotIndex = contextPtr->slot();
729  EventSlot& slot = m_eventSlots[slotIndex];
730  Cause cs = {Cause::source::Task, index2algname( iAlgo )};
731 
732  if ( UNLIKELY( contextPtr->usesSubSlot() ) ) {
733  // Sub-slot
734  auto subSlotIndex = contextPtr->subSlot();
735  EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
736 
737  sc = subSlot.algsStates.set( iAlgo, state );
738 
739  if ( LIKELY( sc.isSuccess() ) ) {
740  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
741  << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
742  // Revise states of algorithms downstream the precedence graph
743  if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
744  }
745  } else {
746  // Event level (standard behaviour)
747  sc = slot.algsStates.set( iAlgo, state );
748 
749  if ( LIKELY( sc.isSuccess() ) ) {
750  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
751  << ", event:" << contextPtr->evt() << "]" << endmsg;
752  // Revise states of algorithms downstream the precedence graph
753  if ( iterate ) sc = m_precSvc->iterate( slot, cs );
754  }
755  }
756  return sc;
757 }
758 
759 //---------------------------------------------------------------------------
760 
767 bool AvalancheSchedulerSvc::isStalled( const EventSlot& slot ) const {
768 
769  if ( !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
770  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
771 
772  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
773 
774  return true;
775  }
776  return false;
777 }
778 
779 //---------------------------------------------------------------------------
780 
786  const uint slotIdx = eventContext->slot();
787 
788  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
789 
790  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
791 
792  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
793  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
794 
795  // Push into the finished events queue the failed context
796  m_eventSlots[slotIdx].complete = true;
797  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
798 }
799 
800 //---------------------------------------------------------------------------
801 
807 
808  // To have just one big message
809  std::ostringstream outputMS;
810 
811  outputMS << "Dumping scheduler state\n"
812  << "=========================================================================================\n"
813  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
814  << "=========================================================================================\n\n";
815 
816  //===========================================================================
817 
818  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
819  << "------------------\n\n";
820 
821  // Figure if TimelineSvc is available (used below to detect threads IDs)
822  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
823  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
824  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
825  } else {
826 
827  // Figure optimal printout layout
828  size_t indt( 0 );
829  for ( auto& slot : m_eventSlots )
830  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
831  if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
832 
833  // Figure the last running schedule across all slots
834  for ( auto& slot : m_eventSlots ) {
835  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
836  ++it ) {
837 
838  const std::string& algoName{index2algname( *it )};
839 
840  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
841  << slot.eventContext->slot();
842 
843  // Try to get POSIX threads IDs the currently running tasks are scheduled to
844  if ( timelineSvc.isValid() ) {
845  TimelineEvent te{};
846  te.algorithm = algoName;
847  te.slot = slot.eventContext->slot();
848  te.event = slot.eventContext->evt();
849 
850  if ( timelineSvc->getTimelineEvent( te ) )
851  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
852  else
853  outputMS << " thread.id: [unknown]"; // this means a task has just
854  // been signed off as SCHEDULED,
855  // but has not been assigned to a thread yet
856  // (i.e., not running yet)
857  }
858  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
859  }
860  }
861  }
862 
863  //===========================================================================
864 
865  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
866  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
867 
868  int slotCount = -1;
869  bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( {AState::ERROR} ) ||
870  subSlotAlgsInStates( m_eventSlots[iSlot], {AState::ERROR} )
871  : false;
872 
873  for ( auto& slot : m_eventSlots ) {
874  ++slotCount;
875  if ( slot.complete ) continue;
876 
877  outputMS << "[ slot: "
878  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
879  << " event: "
880  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
881  << " ]:\n\n";
882 
883  if ( 0 > iSlot || iSlot == slotCount ) {
884 
885  // If an alg has thrown an error then it's not a failure of the CF/DF graph
886  if ( wasAlgError ) {
887  outputMS << "ERROR alg(s):";
888  int errorCount = 0;
889  for ( auto it = slot.algsStates.begin( AState::ERROR ); it != slot.algsStates.end( AState::ERROR ); ++it ) {
890  outputMS << " " << index2algname( *it );
891  ++errorCount;
892  }
893  if ( errorCount == 0 ) outputMS << " in subslot(s)";
894  outputMS << "\n\n";
895  } else {
896  // Snapshot of the Control Flow and FSM states
897  outputMS << m_precSvc->printState( slot ) << "\n";
898  }
899 
900  // Mention sub slots (this is expensive if the number of sub-slots is high)
901  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
902  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
903  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
904  for ( auto& ss : slot.allSubSlots ) {
905  outputMS << "[ slot: " << slotID << ", sub-slot: "
906  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
907  << ", entry: " << ss.entryPoint << ", event: "
908  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
909  << " ]:\n\n";
910  if ( wasAlgError ) {
911  outputMS << "ERROR alg(s):";
912  for ( auto it = ss.algsStates.begin( AState::ERROR ); it != ss.algsStates.end( AState::ERROR ); ++it ) {
913  outputMS << " " << index2algname( *it );
914  }
915  outputMS << "\n\n";
916  } else {
917  outputMS << m_precSvc->printState( ss ) << "\n";
918  }
919  }
920  }
921  }
922  }
923 
924  //===========================================================================
925 
926  if ( 0 <= iSlot && !wasAlgError ) {
927  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
928  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
929  }
930 
931  outputMS << "\n=========================================================================================\n"
932  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
933  << "=========================================================================================\n\n";
934 
935  info() << outputMS.str() << endmsg;
936 }
937 
938 //---------------------------------------------------------------------------
939 
941 
943  m_retryQueue.push( std::move( ts ) );
944  return StatusCode::SUCCESS;
945  }
946 
947  // Check if a free Algorithm instance is available
948  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
949 
950  // If an instance is available, proceed to scheduling
951  StatusCode sc;
952  if ( LIKELY( getAlgSC.isSuccess() ) ) {
953 
954  // Decide how to schedule the task and schedule it
955  if ( LIKELY( -100 != m_threadPoolSize ) ) {
956 
957  // Cache values before moving the TaskSpec further
958  unsigned int algIndex{ts.algIndex};
959  std::string_view algName( ts.algName );
960  unsigned int algRank{ts.algRank};
961  bool blocking{ts.blocking};
962  int slotIndex{ts.slotIndex};
963  EventContext* contextPtr{ts.contextPtr};
964 
965  if ( LIKELY( !blocking ) ) {
966  // Add the algorithm to the scheduled queue
967  m_scheduledQueue.push( std::move( ts ) );
968 
969  // Prepare a TBB task that will execute the Algorithm according to the above queued specs
970  m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, false ) );
971  ++m_algosInFlight;
972 
973  } else { // schedule blocking algorithm in independent thread
975 
976  // Schedule the blocking task in an independent thread
978  std::thread _t( AlgTask( this, serviceLocator(), m_algExecStateSvc, true ) );
979  _t.detach();
980 
981  } // end scheduling blocking Algorithm
982 
983  sc = revise( algIndex, contextPtr, AState::SCHEDULED );
984 
985  ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
986  << ", rank:" << algRank << ", blocking:" << ( blocking ? "yes" : "no" )
987  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
989  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
990  : "" )
991  << endmsg;
992 
993  } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
994  ++m_algosInFlight;
995  sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
996  AlgTask( this, serviceLocator(), m_algExecStateSvc, false )();
997  --m_algosInFlight;
998  }
999  } else { // if no Algorithm instance available, retry later
1000 
1001  sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1002  // Add the algorithm to the retry queue
1003  m_retryQueue.push( std::move( ts ) );
1004  }
1005 
1007 
1008  return sc;
1009 }
1010 
1011 //---------------------------------------------------------------------------
1012 
1017 
1018  Gaudi::Hive::setCurrentContext( ts.contextPtr );
1019 
1020  if ( LIKELY( !ts.blocking ) )
1021  --m_algosInFlight;
1022  else
1024 
1025  const AlgExecState& algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
1026  AState state = algstate.execStatus().isSuccess()
1027  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1028  : AState::ERROR;
1029 
1030  // Update algorithm state and revise the downstream states
1031  auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1032 
1033  ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1034  << ", rank:" << ts.algRank << ", blocking:" << ( ts.blocking ? "yes" : "no" )
1035  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1037  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1038  : "" )
1039  << endmsg;
1040 
1041  // Prompt a call to updateStates
1042  m_needsUpdate.store( true );
1043  return sc;
1044 }
1045 
1046 //---------------------------------------------------------------------------
1047 
1048 // Method to inform the scheduler about event views
1049 
1051  std::unique_ptr<EventContext> viewContext ) {
1052  // Prevent view nesting
1053  if ( sourceContext->usesSubSlot() ) {
1054  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1055  return StatusCode::FAILURE;
1056  }
1057 
1058  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1059 
1060  // It's not possible to create a std::function from a move-capturing lambda (std::function requires a copyable callable)
1061  // So, we have to release the unique pointer
1062  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1063  &nodeName]() -> StatusCode {
1064  // Attach the sub-slot to the top-level slot
1065  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1066 
1067  if ( viewContextPtr ) {
1068  // Re-create the unique pointer
1069  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1070  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1071  return StatusCode::SUCCESS;
1072  } else {
1073  // Disable the view node if there are no views
1074  topSlot.disableSubSlots( nodeName );
1075  return StatusCode::SUCCESS;
1076  }
1077  };
1078 
1079  m_actionsQueue.push( std::move( action ) );
1080 
1081  return StatusCode::SUCCESS;
1082 }
1083 
1084 //---------------------------------------------------------------------------
1085 
1086 // Sample occupancy at fixed interval (ms)
1087 // Negative value to deactivate, 0 to snapshot every change
1088 // Each sample, apply the callback function to the result
1089 
1090 void AvalancheSchedulerSvc::recordOccupancy( int samplePeriod, std::function<void( OccupancySnapshot )> callback ) {
1091 
1092  auto action = [this, samplePeriod, callback{std::move( callback )}]() -> StatusCode {
1093  if ( samplePeriod < 0 ) {
1095  } else {
1098  }
1099  return StatusCode::SUCCESS;
1100  };
1101 
1102  m_actionsQueue.push( std::move( action ) );
1103 }
IOTest.evt
evt
Definition: IOTest.py:105
EventSlot::eventContext
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:83
AvalancheSchedulerSvc::m_whiteboard
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Definition: AvalancheSchedulerSvc.h:231
Gaudi::Hive::setCurrentContext
GAUDI_API void setCurrentContext(const EventContext *ctx)
Definition: ThreadLocalContext.cpp:41
PrecedenceSvc
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:31
std::vector::resize
T resize(T... args)
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
std::string
STL class.
AvalancheSchedulerSvc::TaskSpec
Struct to hold entries in the alg queues.
Definition: AvalancheSchedulerSvc.h:286
AvalancheSchedulerSvc::finalize
StatusCode finalize() override
Finalise.
Definition: AvalancheSchedulerSvc.cpp:356
std::list< IAlgorithm * >
Read.app
app
Definition: Read.py:35
std::move
T move(T... args)
Gaudi::Algorithm::name
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:542
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:355
AvalancheSchedulerSvc::m_optimizationMode
Gaudi::Property< std::string > m_optimizationMode
Definition: AvalancheSchedulerSvc.h:176
std::unordered_set< DataObjID, DataObjID_Hasher >
ON_VERBOSE
#define ON_VERBOSE
Definition: AvalancheSchedulerSvc.cpp:43
AvalancheSchedulerSvc::ACTIVE
@ ACTIVE
Definition: AvalancheSchedulerSvc.h:158
concurrency::PrecedenceRulesGraph::getControlFlowNodeCounter
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
Definition: PrecedenceRulesGraph.h:674
gaudirun.s
string s
Definition: gaudirun.py:328
std::vector
STL class.
std::find_if
T find_if(T... args)
std::unordered_set::size
T size(T... args)
AvalancheSchedulerSvc::iterate
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Definition: AvalancheSchedulerSvc.cpp:591
EventSlot
Class representing an event slot.
Definition: EventSlot.h:24
AlgsExecutionStates
Definition: AlgsExecutionStates.h:36
DataHandleHolderBase::addDependency
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Definition: DataHandleHolderBase.h:86
std::chrono::duration
GaudiMP.FdsRegistry.msg
msg
Definition: FdsRegistry.py:18
AvalancheSchedulerSvc::m_lastSnapshot
std::chrono::system_clock::time_point m_lastSnapshot
Definition: AvalancheSchedulerSvc.h:162
PrecedenceSvc::getRules
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:73
std::stringstream
STL class.
std::unique_ptr::get
T get(T... args)
EventStatus::Success
@ Success
Definition: IAlgExecStateSvc.h:73
std::unique_ptr::release
T release(T... args)
ConcurrencyFlags.h
EventContext::usesSubSlot
bool usesSubSlot() const
Definition: EventContext.h:53
AlgsExecutionStates::end
Iterator end(State kind)
Definition: AlgsExecutionStates.h:117
std::function
std::any_of
T any_of(T... args)
AvalancheSchedulerSvc::m_scheduledQueue
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
Definition: AvalancheSchedulerSvc.h:322
AvalancheSchedulerSvc::schedule
StatusCode schedule(TaskSpec &&)
Definition: AvalancheSchedulerSvc.cpp:940
compareRootHistos.ts
tuple ts
Definition: compareRootHistos.py:460
AvalancheSchedulerSvc::m_needsUpdate
std::atomic< bool > m_needsUpdate
Definition: AvalancheSchedulerSvc.h:327
AtlasMCRecoScenario.threads
int threads
Definition: AtlasMCRecoScenario.py:23
std::sort
T sort(T... args)
AvalancheSchedulerSvc::m_enableCondSvc
Gaudi::Property< bool > m_enableCondSvc
Definition: AvalancheSchedulerSvc.h:188
AvalancheSchedulerSvc::deactivate
StatusCode deactivate()
Deactivate scheduler.
Definition: AvalancheSchedulerSvc.cpp:444
CommonMessaging< implements< IService, IProperty, IStateful > >::msgLevel
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
Definition: CommonMessaging.h:148
Service::finalize
StatusCode finalize() override
Definition: Service.cpp:222
ThreadPoolSvc.h
AvalancheSchedulerSvc::m_eventSlots
std::vector< EventSlot > m_eventSlots
Vector of events slots.
Definition: AvalancheSchedulerSvc.h:234
Gaudi::DataHandle::Writer
@ Writer
Definition: DataHandle.h:40
std::thread::detach
T detach(T... args)
AvalancheSchedulerSvc::m_arena
tbb::task_arena * m_arena
Definition: AvalancheSchedulerSvc.h:333
AvalancheSchedulerSvc::m_algExecStateSvc
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Definition: AvalancheSchedulerSvc.h:243
EventSlot::complete
bool complete
Flags completion of the event.
Definition: EventSlot.h:89
DataObjID::fullKey
std::string fullKey() const
combination of the key and the ClassName, mostly for debugging
Definition: DataObjID.cpp:99
std::hex
T hex(T... args)
AvalancheSchedulerSvc::FAILURE
@ FAILURE
Definition: AvalancheSchedulerSvc.h:158
AvalancheSchedulerSvc::m_condSvc
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
Definition: AvalancheSchedulerSvc.h:246
AvalancheSchedulerSvc::eventFailed
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Definition: AvalancheSchedulerSvc.cpp:785
TimelineEvent
Definition: ITimelineSvc.h:23
AvalancheSchedulerSvc::m_threadPoolSize
Gaudi::Property< int > m_threadPoolSize
Definition: AvalancheSchedulerSvc.h:165
bug_34121.t
t
Definition: bug_34121.py:30
EventSlot::addSubSlot
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Definition: EventSlot.h:61
EventStatus::AlgStall
@ AlgStall
Definition: IAlgExecStateSvc.h:73
AvalancheSchedulerSvc::m_maxEventsInFlight
size_t m_maxEventsInFlight
Definition: AvalancheSchedulerSvc.h:334
SmartIF::isValid
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:72
AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
Definition: AvalancheSchedulerSvc.h:171
TimingHistograms.name
name
Definition: TimingHistograms.py:23
StatusCode
Definition: StatusCode.h:65
std::thread
STL class.
AlgTask.h
ITimelineSvc
Definition: ITimelineSvc.h:37
IAlgorithm
Definition: IAlgorithm.h:38
std::atomic::load
T load(T... args)
AlgsExecutionStates::begin
Iterator begin(State kind)
Definition: AlgsExecutionStates.h:116
std::thread::hardware_concurrency
T hardware_concurrency(T... args)
std::ofstream
STL class.
EventContext::slot
ContextID_t slot() const
Definition: EventContext.h:51
AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
Definition: AvalancheSchedulerSvc.h:180
LIKELY
#define LIKELY(x)
Definition: Kernel.h:105
Gaudi::Algorithm
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:90
AvalancheSchedulerSvc::m_whiteboardSvcName
Gaudi::Property< std::string > m_whiteboardSvcName
Definition: AvalancheSchedulerSvc.h:170
AvalancheSchedulerSvc
Definition: AvalancheSchedulerSvc.h:112
EventSlot::reset
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:49
DataHandleHolderVisitor.h
std::to_string
T to_string(T... args)
EventSlot::disableSubSlots
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Definition: EventSlot.h:78
AlgExecState::execStatus
const StatusCode & execStatus() const
Definition: IAlgExecStateSvc.h:43
std::ofstream::close
T close(T... args)
AvalancheSchedulerSvc::m_scheduledBlockingQueue
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledBlockingQueue
Definition: AvalancheSchedulerSvc.h:323
AvalancheSchedulerSvc::recordOccupancy
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms). Negative value to deactivate, 0 to snapshot every change. Each...
Definition: AvalancheSchedulerSvc.cpp:1090
AvalancheSchedulerSvc::index2algname
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Definition: AvalancheSchedulerSvc.h:222
Algorithm.h
EventSlot::allSubSlots
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:100
AvalancheSchedulerSvc::AState
AlgsExecutionStates::State AState
Definition: AvalancheSchedulerSvc.h:155
AvalancheSchedulerSvc::INACTIVE
@ INACTIVE
Definition: AvalancheSchedulerSvc.h:158
std::ofstream::open
T open(T... args)
SmartIF< IMessageSvc >
genconfuser.verbose
verbose
Definition: genconfuser.py:29
AvalancheSchedulerSvc::m_algosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
Definition: AvalancheSchedulerSvc.h:249
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:203
std::map
STL class.
AvalancheSchedulerSvc::tryPopFinishedEvent
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Definition: AvalancheSchedulerSvc.cpp:571
AvalancheSchedulerSvc::scheduleEventView
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Definition: AvalancheSchedulerSvc.cpp:1050
AvalancheSchedulerSvc::m_algResourcePool
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Definition: AvalancheSchedulerSvc.h:278
AvalancheSchedulerSvc::freeSlots
unsigned int freeSlots() override
Get free slots number.
Definition: AvalancheSchedulerSvc.cpp:544
Cause::source::Root
@ Root
AvalancheSchedulerSvc::m_showDataDeps
Gaudi::Property< bool > m_showDataDeps
Definition: AvalancheSchedulerSvc.h:190
DataObjID
Definition: DataObjID.h:47
AvalancheSchedulerSvc::initialize
StatusCode initialize() override
Initialise.
Definition: AvalancheSchedulerSvc.cpp:74
AlgsExecutionStates::containsAny
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
Definition: AlgsExecutionStates.h:63
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:156
std::chrono::duration::min
T min(T... args)
HistoDumpEx.v
v
Definition: HistoDumpEx.py:27
std::ostringstream
STL class.
ON_DEBUG
#define ON_DEBUG
Definition: AvalancheSchedulerSvc.cpp:42
StatusCode::isFailure
bool isFailure() const
Definition: StatusCode.h:142
ThreadLocalContext.h
concurrency::PrecedenceRulesGraph::getAlgorithmNode
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using the graph index.
Definition: PrecedenceRulesGraph.h:666
AvalancheSchedulerSvc::m_dumpIntraEventDynamics
Gaudi::Property< bool > m_dumpIntraEventDynamics
Definition: AvalancheSchedulerSvc.h:178
AlgsExecutionStates::set
StatusCode set(unsigned int iAlgo, State newState)
Definition: AlgsExecutionStates.cpp:23
AvalancheSchedulerSvc::m_retryQueue
std::queue< TaskSpec > m_retryQueue
Definition: AvalancheSchedulerSvc.h:324
MSG::VERBOSE
@ VERBOSE
Definition: IMessageSvc.h:25
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
EventContext::subSlot
ContextID_t subSlot() const
Definition: EventContext.h:52
Cause::source::Task
@ Task
SmartIF::get
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:86
DataHandleHolderBase::outputDataObjs
const DataObjIDColl & outputDataObjs() const override
Definition: DataHandleHolderBase.h:84
compareRootHistos.state
def state
Definition: compareRootHistos.py:468
AvalancheSchedulerSvc::m_snapshotInterval
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Definition: AvalancheSchedulerSvc.h:161
std::vector::begin
T begin(T... args)
std
STL namespace.
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
std::unordered_set::insert
T insert(T... args)
AvalancheSchedulerSvc::m_threadPoolSvc
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Definition: AvalancheSchedulerSvc.h:332
MSG::ERROR
@ ERROR
Definition: IMessageSvc.h:25
EventContext
Definition: EventContext.h:34
AlgsExecutionStates::State
State
Execution states of the algorithms. Must have contiguous integer values 0, 1...
Definition: AlgsExecutionStates.h:40
concurrency::AlgorithmNode::getAlgoIndex
const unsigned int & getAlgoIndex() const
Get algorithm index.
Definition: PrecedenceRulesGraph.h:525
TimelineEvent::algorithm
std::string algorithm
Definition: ITimelineSvc.h:31
AvalancheSchedulerSvc::revise
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
Definition: AvalancheSchedulerSvc.cpp:726
AlgExecState::filterPassed
bool filterPassed() const
Definition: IAlgExecStateSvc.h:41
AvalancheSchedulerSvc::activate
void activate()
Activate scheduler.
Definition: AvalancheSchedulerSvc.cpp:387
AvalancheSchedulerSvc::m_actionsQueue
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
Definition: AvalancheSchedulerSvc.h:283
std::unordered_set::empty
T empty(T... args)
AvalancheSchedulerSvc::isStalled
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
Definition: AvalancheSchedulerSvc.cpp:767
AvalancheSchedulerSvc::AlgTask
friend class AlgTask
Definition: AvalancheSchedulerSvc.h:114
std::atomic::store
T store(T... args)
DataHandleHolderBase::inputDataObjs
const DataObjIDColl & inputDataObjs() const override
Definition: DataHandleHolderBase.h:83
AvalancheSchedulerSvc::m_thread
std::thread m_thread
The thread in which the activate function runs.
Definition: AvalancheSchedulerSvc.h:213
std::vector::end
T end(T... args)
AvalancheSchedulerSvc::pushNewEvents
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
Definition: AvalancheSchedulerSvc.cpp:533
IAlgorithm.h
AlgExecState
Definition: IAlgExecStateSvc.h:37
std::setw
T setw(T... args)
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
std::max
T max(T... args)
UNLIKELY
#define UNLIKELY(x)
Definition: Kernel.h:106
AvalancheSchedulerSvc::signoff
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
Definition: AvalancheSchedulerSvc.cpp:1016
AlgsExecutionStates::sizeOfSubset
size_t sizeOfSubset(State state) const
Definition: AlgsExecutionStates.h:77
AvalancheSchedulerSvc::m_freeSlots
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
Definition: AvalancheSchedulerSvc.h:237
AvalancheSchedulerSvc::m_blockingAlgosInFlight
unsigned int m_blockingAlgosInFlight
Number of blocking algorithms presently in flight (executed off the TBB runtime).
Definition: AvalancheSchedulerSvc.h:252
AvalancheSchedulerSvc::m_snapshotCallback
std::function< void(OccupancySnapshot)> m_snapshotCallback
Definition: AvalancheSchedulerSvc.h:163
AvalancheSchedulerSvc::pushNewEvent
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Definition: AvalancheSchedulerSvc.cpp:475
AvalancheSchedulerSvc::popFinishedEvent
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Definition: AvalancheSchedulerSvc.cpp:550
std::unique_ptr< EventContext >
ProduceConsume.key
key
Definition: ProduceConsume.py:52
EventSlot::algsStates
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:85
Cause
Definition: PrecedenceRulesGraph.h:399
AvalancheSchedulerSvc::m_precSvc
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Definition: AvalancheSchedulerSvc.h:228
AvalancheSchedulerSvc::m_isActive
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
Definition: AvalancheSchedulerSvc.h:210
AvalancheSchedulerSvc::m_finishedEvents
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
Definition: AvalancheSchedulerSvc.h:240
EventContext::evt
ContextEvt_t evt() const
Definition: EventContext.h:50
AvalancheSchedulerSvc::dumpSchedulerState
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Definition: AvalancheSchedulerSvc.cpp:806
IDataManagerSvc.h
std::thread::join
T join(T... args)
Service::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
Definition: Service.cpp:335
AvalancheSchedulerSvc.h
ThreadPoolSvc
A service which initializes a TBB thread pool.
Definition: ThreadPoolSvc.h:38
std::initializer_list
gaudirun.callback
callback
Definition: gaudirun.py:194
std::chrono::system_clock::now
T now(T... args)