The Gaudi Framework  master (34daa81a)
Loading...
Searching...
No Matches
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2026 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
12#include "AlgTask.h"
13#include "FiberManager.h"
14#include "GraphDumper.h"
15#include "ThreadPoolSvc.h"
16
17// Framework includes
18#include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
25
26// C++
27#include <algorithm>
28#include <fstream>
29#include <map>
30#include <queue>
31#include <regex>
32#include <semaphore>
33#include <sstream>
34#include <string_view>
35#include <thread>
36#include <unordered_set>
37
38// External libs
39#include <boost/algorithm/string.hpp>
40#include <boost/thread.hpp>
41#include <boost/tokenizer.hpp>
42
43// Instantiation of a static factory class used by clients to create instances of this service
45
46#define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
47#define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
48
49namespace {
50 struct DataObjIDSorter {
51 bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
52 };
53
54 // Sort a DataObjIDColl in a well-defined, reproducible manner.
55 // Used for making debugging dumps.
56 std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
57 std::vector<const DataObjID*> v;
58 v.reserve( coll.size() );
59 for ( const DataObjID& id : coll ) v.push_back( &id );
60 std::sort( v.begin(), v.end(), DataObjIDSorter() );
61 return v;
62 }
63
64 bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
65 return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
66 [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
67 }
68} // namespace
69
70//---------------------------------------------------------------------------
71
77
79
80 // Initialise mother class (read properties, ...)
82 if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
83
84 // Get hold of the TBBSvc. This should initialize the thread pool
85 m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
86 if ( !m_threadPoolSvc.isValid() ) {
87 fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
89 }
90 auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
91 if ( !castTPS ) {
92 fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
94 }
95 m_arena = castTPS->getArena();
96 if ( !m_arena ) {
97 fatal() << "Cannot find valid TBB task_arena" << endmsg;
99 }
100
101 // Activate the scheduler in another thread.
102 info() << "Activating scheduler in a separate thread" << endmsg;
103 std::binary_semaphore fiber_manager_initalized{ 0 };
104 m_thread = std::thread( [this, &fiber_manager_initalized]() {
105 // Initialize FiberManager
106 this->m_fiberManager = std::make_unique<FiberManager>( this->m_numOffloadThreads.value() );
107 fiber_manager_initalized.release();
108 this->activate();
109 } );
110 // Wait for initialization to complete
111 fiber_manager_initalized.acquire();
112
113 while ( m_isActive != ACTIVE ) {
114 if ( m_isActive == FAILURE ) {
115 fatal() << "Terminating initialization" << endmsg;
116 return StatusCode::FAILURE;
117 } else {
118 ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
119 sleep( 1 );
120 }
121 }
122
123 if ( m_enableCondSvc ) {
124 // Get hold of the CondSvc
125 m_condSvc = serviceLocator()->service( "CondSvc" );
126 if ( !m_condSvc.isValid() ) {
127 warning() << "No CondSvc found, or not enabled. "
128 << "Will not manage CondAlgorithms" << endmsg;
129 m_enableCondSvc = false;
130 }
131 }
132
133 // Get the algo resource pool
134 m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
135 if ( !m_algResourcePool.isValid() ) {
136 fatal() << "Error retrieving AlgoResourcePool" << endmsg;
137 return StatusCode::FAILURE;
138 }
139
140 m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
141 if ( !m_algExecStateSvc.isValid() ) {
142 fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
143 return StatusCode::FAILURE;
144 }
145
146 // Get Whiteboard
148 if ( !m_whiteboard.isValid() ) {
149 fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
150 return StatusCode::FAILURE;
151 }
152
153 // Set the MaxEventsInFlight parameters from the number of WB stores
154 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
155
156 // Set the number of free slots
158
159 // Get the list of algorithms
160 const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
161 const unsigned int algsNumber = algos.size();
162 if ( algsNumber != 0 ) {
163 info() << "Found " << algsNumber << " algorithms" << endmsg;
164 } else {
165 error() << "No algorithms found" << endmsg;
166 return StatusCode::FAILURE;
167 }
168
169 /* Dependencies
170 1) Look for handles in algo, if none
171 2) Assume none are required
172 */
173
174 DataObjIDColl globalInp, globalOutp;
175
176 // figure out all outputs
177 std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
178 for ( IAlgorithm* ialgoPtr : algos ) {
179 Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
180 if ( !algoPtr ) {
181 fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
182 return StatusCode::FAILURE;
183 }
184
185 DataObjIDColl algoOutputs;
186 for ( auto id : algoPtr->outputDataObjs() ) {
187 globalOutp.insert( id );
188 algoOutputs.insert( id );
189 }
190 algosOutputDependenciesMap[algoPtr->name()] = algoOutputs;
191 }
192
193 std::ostringstream ostdd;
194 ostdd << "Data Dependencies for Algorithms:";
195
196 std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
197 for ( IAlgorithm* ialgoPtr : algos ) {
198 Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
199 if ( nullptr == algoPtr ) {
200 fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
201 << ": this will result in a crash." << endmsg;
202 return StatusCode::FAILURE;
203 }
204
205 DataObjIDColl i1, i2;
206 DHHVisitor avis( i1, i2 );
207 algoPtr->acceptDHVisitor( &avis );
208
209 ostdd << "\n " << algoPtr->name();
210
211 auto write_owners = [&avis, &ostdd]( const DataObjID& id ) {
212 auto owners = avis.owners_names_of( id );
213 if ( !owners.empty() ) { GaudiUtils::operator<<( ostdd << ' ', owners ); }
214 };
215
216 DataObjIDColl algoDependencies;
217 if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
218 for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
219 DataObjID id = *idp;
220 ostdd << "\n o INPUT " << id;
221 write_owners( id );
222 algoDependencies.insert( id );
223 globalInp.insert( id );
224 }
225 for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
226 ostdd << "\n o OUTPUT " << *id;
227 write_owners( *id );
228 if ( id->key().find( ":" ) != std::string::npos ) {
229 error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
230 << endmsg;
231 m_showDataDeps = true;
232 }
233 }
234 } else {
235 ostdd << "\n none";
236 }
237 algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
238 }
239
240 if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
241
242 // If requested, dump a graph of the data dependencies in a .dot, .md or .graphml file
243 if ( not m_dataDepsGraphFile.empty() ) {
244 if ( dumpDataDepsGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
245 return StatusCode::FAILURE;
246 }
247 }
248
249 // Check if we have unmet global input dependencies, and, optionally, heal them
250 // WARNING: this step must be done BEFORE the Precedence Service is initialized
251 DataObjIDColl unmetDepInp, unusedOutp;
252 if ( m_checkDeps || m_checkOutput ) {
253 std::set<std::string> requiredInputKeys;
254 for ( auto o : globalInp ) {
255 // track aliases
256 // (assuming there should be no items with different class and same key corresponding to different objects)
257 requiredInputKeys.insert( o.key() );
258 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
259 }
260 if ( m_checkOutput ) {
261 for ( auto o : globalOutp ) {
262 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
263 // check ignores
264 bool ignored{};
265 for ( const std::string& algoName : m_checkOutputIgnoreList ) {
266 auto it = algosOutputDependenciesMap.find( algoName );
267 if ( it != algosOutputDependenciesMap.end() ) {
268 if ( it->second.find( o ) != it->second.end() ) {
269 ignored = true;
270 break;
271 }
272 }
273 }
274 if ( !ignored ) { unusedOutp.insert( o ); }
275 }
276 }
277 }
278 }
279
280 if ( m_checkDeps ) {
281 if ( unmetDepInp.size() > 0 ) {
282
283 auto printUnmet = [&]( auto msg ) {
284 for ( const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
285 msg << " o " << *o << " required by Algorithm: " << endmsg;
286
287 for ( const auto& p : algosInputDependenciesMap )
288 if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
289 }
290 };
291
292 if ( !m_useDataLoader.empty() ) {
293
294 // Find the DataLoader Alg
295 IAlgorithm* dataLoaderAlg( nullptr );
296 for ( IAlgorithm* algo : algos )
297 if ( m_useDataLoader == algo->name() ) {
298 dataLoaderAlg = algo;
299 break;
300 }
301
302 if ( dataLoaderAlg == nullptr ) {
303 fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
304 << "\" found, and unmet INPUT dependencies "
305 << "detected:" << endmsg;
306 printUnmet( fatal() );
307 return StatusCode::FAILURE;
308 }
309
310 info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
311 << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
312 printUnmet( info() );
313
314 // Set the property Load of DataLoader Alg
315 Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
316 if ( !dataAlg ) {
317 fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
318 << endmsg;
319 return StatusCode::FAILURE;
320 }
321
322 for ( auto& id : unmetDepInp ) {
323 ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
324 << dataLoaderAlg->name() << endmsg;
326 }
327
328 } else {
329 fatal() << "Auto DataLoading not requested, "
330 << "and the following unmet INPUT dependencies were found:" << endmsg;
331 printUnmet( fatal() );
332 return StatusCode::FAILURE;
333 }
334
335 } else {
336 info() << "No unmet INPUT data dependencies were found" << endmsg;
337 }
338 }
339
340 if ( m_checkOutput ) {
341 if ( unusedOutp.size() > 0 ) {
342
343 auto printUnusedOutp = [&]( auto msg ) {
344 for ( const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
345 msg << " o " << *o << " produced by Algorithm: " << endmsg;
346
347 for ( const auto& p : algosOutputDependenciesMap )
348 if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
349 }
350 };
351
352 fatal() << "The following unused OUTPUT items were found:" << endmsg;
353 printUnusedOutp( fatal() );
354 return StatusCode::FAILURE;
355 } else {
356 info() << "No unused OUTPUT items were found" << endmsg;
357 }
358 }
359
360 // Get the precedence service
361 m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
362 if ( !m_precSvc.isValid() ) {
363 fatal() << "Error retrieving PrecedenceSvc" << endmsg;
364 return StatusCode::FAILURE;
365 }
366 const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
367 if ( !precSvc ) {
368 fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
369 return StatusCode::FAILURE;
370 }
371
372 // Fill the containers to convert algo names to index
373 m_algname_vect.resize( algsNumber );
374 for ( IAlgorithm* algo : algos ) {
375 const std::string& name = algo->name();
376 auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
377 m_algname_index_map[name] = index;
378 m_algname_vect.at( index ) = name;
379 }
380
381 // Shortcut for the message service
382 SmartIF<IMessageSvc> messageSvc( serviceLocator() );
383 if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
384
386 for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
387 m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
388 m_eventSlots.back().complete = true;
389 }
390
392
393 // Clearly inform about the level of concurrency
394 info() << "Concurrency level information:" << endmsg;
395 info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
396 info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
397 info() << " o Fiber thread pool size: " << m_numOffloadThreads << endmsg;
398
399 // Inform about task scheduling prescriptions
400 info() << "Task scheduling settings:" << endmsg;
401 info() << " o Avalanche generation mode: "
402 << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
403 info() << " o Preemptive scheduling of CPU-blocking tasks: "
405 ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
406 : "disabled" )
407 << endmsg;
408 info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
409
410 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
411
412 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
413
414 // Simulate execution flow
415 if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
416
417 return sc;
418}
419//---------------------------------------------------------------------------
420
425
427 if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
428
429 sc = deactivate();
430 if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
431
432 debug() << "Deleting FiberManager" << endmsg;
433 m_fiberManager.reset();
434
435 info() << "Joining Scheduler thread" << endmsg;
436 m_thread.join();
437
438 // Final error check after thread pool termination
439 if ( m_isActive == FAILURE ) {
440 error() << "problems in scheduler thread" << endmsg;
441 return StatusCode::FAILURE;
442 }
443
444 return sc;
445}
446//---------------------------------------------------------------------------
447
459
460 ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
461
462 if ( m_threadPoolSvc->initPool( m_threadPoolSize, m_maxParallelismExtra ).isFailure() ) {
463 error() << "problems initializing ThreadPoolSvc" << endmsg;
465 return;
466 }
467
468 // Wait for actions pushed into the queue by finishing tasks.
469 action thisAction;
471
473
474 // Continue to wait if the scheduler is running or there is something to do
475 ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
476 while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
477 m_actionsQueue.pop( thisAction );
478 sc = thisAction();
479 ON_VERBOSE {
480 if ( sc.isFailure() )
481 verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
482 else
483 verbose() << "Action succeeded." << endmsg;
484 }
485 else sc.ignore();
486
487 // If all queued actions have been processed, update the slot states
488 if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
489 sc = iterate();
490 ON_VERBOSE {
491 if ( sc.isFailure() )
492 verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
493 else
494 verbose() << "Iteration succeeded." << endmsg;
495 }
496 else sc.ignore();
497 }
498 }
499
500 ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
501 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
502 error() << "Problems terminating thread pool" << endmsg;
504 }
505}
506
507//---------------------------------------------------------------------------
508
516
517 if ( m_isActive == ACTIVE ) {
518
519 // Set the number of slots available to an error code
520 m_freeSlots.store( 0 );
521
522 // Empty queue
523 action thisAction;
524 while ( m_actionsQueue.try_pop( thisAction ) ) {};
525
526 // This would be the last action
527 m_actionsQueue.push( [this]() -> StatusCode {
528 ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
530 return StatusCode::SUCCESS;
531 } );
532 }
533
534 return StatusCode::SUCCESS;
535}
536
537//---------------------------------------------------------------------------
538
539// EventSlot management
547
548 if ( !eventContext ) {
549 fatal() << "Event context is nullptr" << endmsg;
550 return StatusCode::FAILURE;
551 }
552
553 if ( m_freeSlots.load() == 0 ) {
554 ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
555 return StatusCode::FAILURE;
556 }
557
558 // no problem as push new event is only called from one thread (event loop manager)
559 --m_freeSlots;
560
561 auto action = [this, eventContext]() -> StatusCode {
562 // Event processing slot forced to be the same as the wb slot
563 const unsigned int thisSlotNum = eventContext->slot();
564 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
565 if ( !thisSlot.complete ) {
566 fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
567 return StatusCode::FAILURE;
568 }
569
570 ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
571 thisSlot.reset( eventContext );
572
573 // Result status code:
575
576 // promote to CR and DR the initial set of algorithms
577 Cause cs = { Cause::source::Root, "RootDecisionHub" };
578 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
579 error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
580 result = StatusCode::FAILURE;
581 }
582
583 if ( this->iterate().isFailure() ) {
584 error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
585 result = StatusCode::FAILURE;
586 }
587
588 return result;
589 }; // end of lambda
590
591 // Kick off scheduling
592 ON_VERBOSE {
593 verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
594 verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
595 }
596
597 m_actionsQueue.push( action );
598
599 return StatusCode::SUCCESS;
600}
601
602//---------------------------------------------------------------------------
603
604StatusCode AvalancheSchedulerSvc::pushNewEvents( std::vector<EventContext*>& eventContexts ) {
605 StatusCode sc;
606 for ( auto context : eventContexts ) {
607 sc = pushNewEvent( context );
608 if ( sc != StatusCode::SUCCESS ) return sc;
609 }
610 return sc;
611}
612
613//---------------------------------------------------------------------------
614
615unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
616
617//---------------------------------------------------------------------------
618
620
621//---------------------------------------------------------------------------
626
627 // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
628 if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
629 // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
630 // << " active: " << m_isActive << endmsg;
631 return StatusCode::FAILURE;
632 } else {
633 // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
634 // << " active: " << m_isActive << endmsg;
635 m_finishedEvents.pop( eventContext );
636 ++m_freeSlots;
637 ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
638 return StatusCode::SUCCESS;
639 }
640}
641
642//---------------------------------------------------------------------------
647
648 if ( m_finishedEvents.try_pop( eventContext ) ) {
649 ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
650 << endmsg;
651 ++m_freeSlots;
652 return StatusCode::SUCCESS;
653 }
654 return StatusCode::FAILURE;
655}
656
657//--------------------------------------------------------------------------
658
667
668 StatusCode global_sc( StatusCode::SUCCESS );
669
670 // Retry algorithms
671 const size_t retries = m_retryQueue.size();
672 for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
673 TaskSpec retryTS = std::move( m_retryQueue.front() );
674 m_retryQueue.pop();
675 global_sc = schedule( std::move( retryTS ) );
676 }
677
678 // Loop over all slots
679 OccupancySnapshot nextSnap;
680 auto now = std::chrono::system_clock::now();
681 for ( EventSlot& thisSlot : m_eventSlots ) {
682
683 // Ignore slots without a valid context (relevant when populating scheduler for first time)
684 if ( !thisSlot.eventContext ) continue;
685
686 int iSlot = thisSlot.eventContext->slot();
687
688 // Cache the states of the algorithms to improve readability and performance
689 AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
690
691 StatusCode partial_sc = StatusCode::FAILURE;
692
693 // Make an occupancy snapshot
694 if ( m_snapshotInterval != std::chrono::duration<int64_t, std::milli>::min() &&
696
697 // Initialise snapshot
698 if ( nextSnap.states.empty() ) {
699 nextSnap.time = now;
700 nextSnap.states.resize( m_eventSlots.size() );
701 }
702
703 // Store alg states
704 std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
705 slotStateTotals.resize( AState::MAXVALUE );
706 for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
707 slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
708 }
709
710 // Add subslot alg states
711 for ( auto& subslot : thisSlot.allSubSlots ) {
712 for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
713 slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
714 }
715 }
716 }
717
718 // Perform DR->SCHEDULED
719 const auto& drAlgs = thisAlgsStates.algsInState( AState::DATAREADY );
720 for ( uint algIndex : drAlgs ) {
721 const std::string& algName{ index2algname( algIndex ) };
722 unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
723 bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
724
725 partial_sc =
726 schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
727
728 ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
729 << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
730 << " on processing slot " << iSlot << endmsg;
731 }
732
733 // Check for algorithms ready in sub-slots
734 for ( auto& subslot : thisSlot.allSubSlots ) {
735 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
736 for ( uint algIndex : drAlgsSubSlot ) {
737 const std::string& algName{ index2algname( algIndex ) };
738 unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
739 bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
740 partial_sc =
741 schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
742 }
743 }
744
746 std::stringstream s;
747 s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
748 << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
749 << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
750 auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
751 : std::to_string( std::thread::hardware_concurrency() );
752 std::ofstream myfile;
753 myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
754 myfile << s.str();
755 myfile.close();
756 }
757
758 // Not complete because this would mean that the slot is already free!
759 if ( m_precSvc->CFRulesResolved( thisSlot ) &&
760 !thisSlot.algsStates.containsAny(
761 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
762 !subSlotAlgsInStates( thisSlot,
763 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
764 !thisSlot.complete ) {
765
766 thisSlot.complete = true;
767 // if the event did not fail, add it to the finished events
768 // otherwise it is taken care of in the error handling
769 if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
770 ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
771 << thisSlot.eventContext->slot() << ")." << endmsg;
772 m_finishedEvents.push( thisSlot.eventContext.release() );
773 }
774
775 // now let's return the fully evaluated result of the control flow
776 ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
777
778 thisSlot.eventContext.reset( nullptr );
779
780 } else if ( isStalled( thisSlot ) ) {
781 m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
782 eventFailed( thisSlot.eventContext.get() ); // can't release yet
783 }
784 partial_sc.ignore();
785 } // end loop on slots
786
787 // Process snapshot
788 if ( !nextSnap.states.empty() ) {
789 m_lastSnapshot = nextSnap.time;
790 m_snapshotCallback( std::move( nextSnap ) );
791 }
792
793 ON_VERBOSE verbose() << "Iteration done." << endmsg;
794 m_needsUpdate.store( false );
795 return global_sc;
796}
797
798//---------------------------------------------------------------------------
799// Update algorithm state and, optionally, revise states of other downstream algorithms
800StatusCode AvalancheSchedulerSvc::revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate ) {
801 StatusCode sc;
802 auto slotIndex = contextPtr->slot();
803 EventSlot& slot = m_eventSlots[slotIndex];
804 Cause cs = { Cause::source::Task, index2algname( iAlgo ) };
805
806 if ( contextPtr->usesSubSlot() ) {
807 // Sub-slot
808 auto subSlotIndex = contextPtr->subSlot();
809 EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
810
811 sc = subSlot.algsStates.set( iAlgo, state );
812
813 if ( sc.isSuccess() ) {
814 ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
815 << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
816 // Revise states of algorithms downstream the precedence graph
817 if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
818 }
819 } else {
820 // Event level (standard behaviour)
821 sc = slot.algsStates.set( iAlgo, state );
822
823 if ( sc.isSuccess() ) {
824 ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
825 << ", event:" << contextPtr->evt() << "]" << endmsg;
826 // Revise states of algorithms downstream the precedence graph
827 if ( iterate ) sc = m_precSvc->iterate( slot, cs );
828 }
829 }
830 return sc;
831}
832
833//---------------------------------------------------------------------------
834
842
843 if ( !slot.algsStates.containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
844 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
845
846 error() << "*** Stall detected, event context: " << slot.eventContext.get() << endmsg;
847
848 return true;
849 }
850 return false;
851}
852
853//---------------------------------------------------------------------------
854
860 const uint slotIdx = eventContext->slot();
861
862 error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
863
864 dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
865
866 // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
867 m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
868
869 // Push into the finished events queue the failed context
870 m_eventSlots[slotIdx].complete = true;
871 m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
872}
873
874//---------------------------------------------------------------------------
875
881
882 // To have just one big message
883 std::ostringstream outputMS;
884
885 outputMS << "Dumping scheduler state\n"
886 << "=========================================================================================\n"
887 << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
888 << "=========================================================================================\n\n";
889
890 //===========================================================================
891
892 outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
893 << "------------------\n\n";
894
895 // Figure if TimelineSvc is available (used below to detect threads IDs)
896 auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
897 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
898 outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
899 } else {
900
901 // Figure optimal printout layout
902 size_t indt( 0 );
903 for ( auto& slot : m_eventSlots ) {
904
905 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
906 for ( uint algIndex : schedAlgs ) {
907 if ( index2algname( algIndex ).length() > indt ) indt = index2algname( algIndex ).length();
908 }
909 }
910
911 // Figure the last running schedule across all slots
912 for ( auto& slot : m_eventSlots ) {
913
914 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
915 for ( uint algIndex : schedAlgs ) {
916
917 const std::string& algoName{ index2algname( algIndex ) };
918
919 outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
920 << slot.eventContext->slot();
921
922 // Try to get POSIX threads IDs the currently running tasks are scheduled to
923 if ( timelineSvc.isValid() ) {
924 TimelineEvent te{};
925 te.algorithm = algoName;
926 te.slot = slot.eventContext->slot();
927 te.event = slot.eventContext->evt();
928
929 if ( timelineSvc->getTimelineEvent( te ) )
930 outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
931 else
932 outputMS << " thread.id: [unknown]"; // this means a task has just
933 // been signed off as SCHEDULED,
934 // but has not been assigned to a thread yet
935 // (i.e., not running yet)
936 }
937 outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
938 }
939 }
940 }
941
942 //===========================================================================
943
944 outputMS << "\n---------------------------- Task/CF/FSM Mapping "
945 << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
946
947 int slotCount = -1;
948 bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( { AState::ERROR } ) ||
949 subSlotAlgsInStates( m_eventSlots[iSlot], { AState::ERROR } )
950 : false;
951
952 for ( auto& slot : m_eventSlots ) {
953 ++slotCount;
954 if ( slot.complete ) continue;
955
956 outputMS << "[ slot: "
957 << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
958 << ", event: "
959 << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" );
960
961 if ( slot.eventContext->eventID().isValid() ) { outputMS << ", eventID: " << slot.eventContext->eventID(); }
962 outputMS << " ]:\n\n";
963
964 if ( 0 > iSlot || iSlot == slotCount ) {
965
966 // If an alg has thrown an error then it's not a failure of the CF/DF graph
967 if ( wasAlgError ) {
968 outputMS << "ERROR alg(s):";
969 int errorCount = 0;
970 const auto& errorAlgs = slot.algsStates.algsInState( AState::ERROR );
971 for ( uint algIndex : errorAlgs ) {
972 outputMS << " " << index2algname( algIndex );
973 ++errorCount;
974 }
975 if ( errorCount == 0 ) outputMS << " in subslot(s)";
976 outputMS << "\n\n";
977 } else {
978 // Snapshot of the Control Flow and FSM states
979 outputMS << m_precSvc->printState( slot ) << "\n";
980 }
981
982 // Mention sub slots (this is expensive if the number of sub-slots is high)
983 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
984 outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
985 auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
986 for ( auto& ss : slot.allSubSlots ) {
987 outputMS << "[ slot: " << slotID << ", sub-slot: "
988 << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
989 << ", entry: " << ss.entryPoint << ", event: "
990 << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
991 << " ]:\n\n";
992 if ( wasAlgError ) {
993 outputMS << "ERROR alg(s):";
994 const auto& errorAlgs = ss.algsStates.algsInState( AState::ERROR );
995 for ( uint algIndex : errorAlgs ) { outputMS << " " << index2algname( algIndex ); }
996 outputMS << "\n\n";
997 } else {
998 // Snapshot of the Control Flow and FSM states in sub slot
999 outputMS << m_precSvc->printState( ss ) << "\n";
1000 }
1001 }
1002 }
1003 }
1004 }
1005
1006 //===========================================================================
1007
1008 if ( 0 <= iSlot && !wasAlgError ) {
1009 outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1010 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1011 }
1012
1013 outputMS << "\n=========================================================================================\n"
1014 << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1015 << "=========================================================================================\n\n";
1016
1017 info() << outputMS.str() << endmsg;
1018}
1019
1020//---------------------------------------------------------------------------
1021
1023
1024 // Check if a free Algorithm instance is available
1025 StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
1026
1027 // If an instance is available, proceed to scheduling
1028 StatusCode sc;
1029 if ( getAlgSC.isSuccess() ) {
1030
1031 // Decide how to schedule the task and schedule it
1032 if ( -100 != m_threadPoolSize ) {
1033
1034 // Cache values before moving the TaskSpec further
1035 unsigned int algIndex{ ts.algIndex };
1036 std::string_view algName( ts.algName );
1037 unsigned int algRank{ ts.algRank };
1038 bool asynchronous{ ts.asynchronous };
1039 int slotIndex{ ts.slotIndex };
1040 EventContext* contextPtr{ ts.contextPtr };
1041
1042 if ( asynchronous ) {
1043 // Add to asynchronous scheduled queue
1044 m_scheduledAsynchronousQueue.push( std::move( ts ) );
1045
1046 // Schedule task
1047 m_fiberManager->schedule( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1048 }
1049
1050 if ( !asynchronous ) {
1051 // Add the algorithm to the scheduled queue
1052 m_scheduledQueue.push( std::move( ts ) );
1053
1054 // Prepare a TBB task that will execute the Algorithm according to the above queued specs
1055 m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1057 }
1058 sc = revise( algIndex, contextPtr, AState::SCHEDULED );
1059
1060 ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
1061 << ", rank:" << algRank << ", asynchronous:" << ( asynchronous ? "yes" : "no" )
1062 << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1064 ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1065 : "" )
1066 << endmsg;
1067
1068 } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
1069 // Beojan: I don't think this bit works. ts hasn't been pushed into any queue so AlgTask won't retrieve it
1071 sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
1072 AlgTask( this, serviceLocator(), m_algExecStateSvc, ts.asynchronous )();
1074 }
1075 } else { // if no Algorithm instance available, retry later
1076
1077 sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1078 // Add the algorithm to the retry queue
1079 m_retryQueue.push( std::move( ts ) );
1080 }
1081
1083
1084 return sc;
1085}
1086
1087//---------------------------------------------------------------------------
1088
1093
1094 Gaudi::Hive::setCurrentContext( ts.contextPtr );
1095
1097
1098 const AlgExecStateRef algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
1099 AState state = algstate.execStatus().isSuccess()
1100 ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1101 : AState::ERROR;
1102
1103 // Update algorithm state and revise the downstream states
1104 auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1105
1106 ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1107 << ", rank:" << ts.algRank << ", asynchronous:" << ( ts.asynchronous ? "yes" : "no" )
1108 << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1110 ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1111 : "" )
1112 << endmsg;
1113
1114 // Prompt a call to updateStates
1115 m_needsUpdate.store( true );
1116 return sc;
1117}
1118
1119//---------------------------------------------------------------------------
1120
1121// Method to inform the scheduler about event views
1122
1123StatusCode AvalancheSchedulerSvc::scheduleEventView( const EventContext* sourceContext, const std::string& nodeName,
1124 std::unique_ptr<EventContext> viewContext ) {
1125 // Prevent view nesting
1126 if ( sourceContext->usesSubSlot() ) {
1127 fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1128 return StatusCode::FAILURE;
1129 }
1130
1131 ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1132
1133 // It's not possible to create an std::functional from a move-capturing lambda
1134 // So, we have to release the unique pointer
1135 auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1136 &nodeName]() -> StatusCode {
1137 // Attach the sub-slot to the top-level slot
1138 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1139
1140 if ( viewContextPtr ) {
1141 // Re-create the unique pointer
1142 auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1143 topSlot.addSubSlot( std::move( viewContext ), nodeName );
1144 return StatusCode::SUCCESS;
1145 } else {
1146 // Disable the view node if there are no views
1147 topSlot.disableSubSlots( nodeName );
1148 return StatusCode::SUCCESS;
1149 }
1150 };
1151
1152 m_actionsQueue.push( std::move( action ) );
1153
1154 return StatusCode::SUCCESS;
1155}
1156
1157//---------------------------------------------------------------------------
1158
1159// Sample occupancy at fixed interval (ms)
1160// Negative value to deactivate, 0 to snapshot every change
1161// Each sample, apply the callback function to the result
1162
1163void AvalancheSchedulerSvc::recordOccupancy( int samplePeriod, std::function<void( OccupancySnapshot )> callback ) {
1164
1165 auto action = [this, samplePeriod, callback = std::move( callback )]() -> StatusCode {
1166 if ( samplePeriod < 0 ) {
1167 this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min();
1168 } else {
1169 this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>( samplePeriod );
1170 m_snapshotCallback = std::move( callback );
1171 }
1172 return StatusCode::SUCCESS;
1173 };
1174
1175 m_actionsQueue.push( std::move( action ) );
1176}
1177
/// Dump the data dependencies between algorithms and data objects as a graph.
///
/// One node is added per algorithm whose name matches m_dataDepsGraphAlgoPattern and one
/// node per data object whose fullKey() matches m_dataDepsGraphObjectPattern. For every
/// selected algorithm, addEdge( object, algorithm ) is called for each selected input and
/// addEdge( algorithm, object ) for each selected output.
///
/// @param inDeps  input data objects, keyed by algorithm name
/// @param outDeps output data objects, keyed by algorithm name; must hold an entry for every
///                key of @p inDeps (std::map::at throws std::out_of_range otherwise)
/// @return always StatusCode::SUCCESS
1178StatusCode AvalancheSchedulerSvc::dumpDataDepsGraphFile( const std::map<std::string, DataObjIDColl>& inDeps,
1179 const std::map<std::string, DataObjIDColl>& outDeps ) const {
1180 // Both maps should have the same algorithm entries
 // (only the sizes are compared, and only in builds where assert is active)
1181 assert( inDeps.size() == outDeps.size() );
1182
 // NOTE(review): the declaration of `g` is not visible in this excerpt - presumably the
 // graph-dumper object bound to the configured output file; confirm against the full source.
1184 info() << "Dumping data dependencies graph to file: " << g.fileName() << endmsg;
1185
1186 // define algs and objects
 // Hashes of the data objects that already have a node, so each object is added only once
1187 std::set<std::size_t> definedObjects;
1188
1189 // Regex for selection of algs and objects
 // (applied with std::regex_search, so a match anywhere in the name selects it)
1190 std::regex algNameRegex( m_dataDepsGraphAlgoPattern.value() );
1191 std::regex objNameRegex( m_dataDepsGraphObjectPattern.value() );
1192
1193 // inDeps and outDeps should have the same entries
 // algoIndex only advances for algorithms passing the filter, so node ids are consecutive
1194 std::size_t algoIndex = 0ul;
1195 for ( const auto& [algName, ideps] : inDeps ) {
1196 if ( not std::regex_search( algName, algNameRegex ) ) continue;
1197 std::string algIndex = "Alg_" + std::to_string( algoIndex );
1198 g.addNode( algIndex, algName );
1199
1200 // inputs
1201 for ( const auto& dep : ideps ) {
 // Objects are filtered on fullKey(), but the node is labelled with key()
1202 if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1203
 // `inserted` is true only the first time this object's hash is seen
1204 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1205 std::string objIndex = "obj_" + std::to_string( dep.hash() );
1206 if ( inserted ) g.addNode( objIndex, dep.key() );
1207
 // Input: object first, algorithm second
1208 g.addEdge( objIndex, algIndex );
1209 } // loop on ideps
1210
 // outputs: same treatment as the inputs, with the edge arguments swapped
1211 const auto& odeps = outDeps.at( algName );
1212 for ( const auto& dep : odeps ) {
1213 if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1214
1215 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1216 std::string objIndex = "obj_" + std::to_string( dep.hash() );
1217 if ( inserted ) g.addNode( objIndex, dep.key() );
1218
1219 g.addEdge( algIndex, objIndex );
1220 } // loop on odeps
1221
1222 ++algoIndex;
1223 } // loop on inDeps
1224
1225 return StatusCode::SUCCESS;
1226}
#define ON_VERBOSE
std::unordered_set< DataObjID, DataObjID_Hasher > DataObjIDColl
Definition DataObjID.h:121
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
#define ON_DEBUG
#define DECLARE_COMPONENT(type)
Provide serialization function (output only) for some common STL classes (vectors,...
wrapper on an Algorithm state.
const StatusCode & execStatus() const
bool filterPassed() const
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
const boost::container::flat_set< int > algsInState(State state) const
size_t sizeOfSubset(State state) const
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
StatusCode set(unsigned int iAlgo, State newState)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_useDataLoader
void dumpState() override
Dump scheduler state for all slots.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_optimizationMode
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Gaudi::Property< bool > m_dumpIntraEventDynamics
std::chrono::system_clock::time_point m_lastSnapshot
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< int > m_threadPoolSize
StatusCode finalize() override
Finalise.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::function< void(OccupancySnapshot)> m_snapshotCallback
std::queue< TaskSpec > m_retryQueue
Gaudi::Property< bool > m_verboseSubSlots
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
AlgsExecutionStates::State AState
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
StatusCode deactivate()
Deactivate scheduler.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
unsigned int m_blockingAlgosInFlight
Number of blocking algorithms presently in flight.
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
Gaudi::Property< bool > m_showDataFlow
StatusCode schedule(TaskSpec &&)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Gaudi::Property< bool > m_checkDeps
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Gaudi::Property< std::string > m_dataDepsGraphFile
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Gaudi::Property< bool > m_showControlFlow
Gaudi::Property< bool > m_simulateExecution
Gaudi::Property< std::string > m_whiteboardSvcName
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
std::atomic< bool > m_needsUpdate
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Gaudi::Property< int > m_maxParallelismExtra
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
std::function< StatusCode()> action
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
Gaudi::Property< int > m_numOffloadThreads
Gaudi::Property< bool > m_checkOutput
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
StatusCode initialize() override
Initialise.
Gaudi::Property< bool > m_enableCondSvc
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms) Negative value to deactivate, 0 to snapshot every change Each...
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
StatusCode dumpDataDepsGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
unsigned int freeSlots() override
Get free slots number.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
std::unique_ptr< FiberManager > m_fiberManager
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Gaudi::Property< bool > m_showDataDeps
std::thread m_thread
The thread in which the activate function runs.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & msg() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
std::vector< std::string > owners_names_of(const DataObjID &id, bool with_main=false) const
const DataObjIDColl & outputDataObjs() const override
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & inputDataObjs() const override
std::string fullKey() const
combination of the key and the ClassName, mostly for debugging
This class represents an entry point to all the event specific data.
ContextEvt_t evt() const
ContextID_t subSlot() const
ContextID_t slot() const
bool usesSubSlot() const
Base class from which all concrete algorithm classes should be derived.
Definition Algorithm.h:87
void acceptDHVisitor(IDataHandleVisitor *) const override
const std::string & name() const override
The identifying name of the algorithm object.
utilities to dump graphs in different formats
Definition GraphDumper.h:30
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition IAlgorithm.h:36
virtual const std::string & type() const =0
The type of the algorithm.
virtual SmartIF< IService > & service(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to a service.
A service to resolve the task execution precedence.
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition Service.cpp:336
StatusCode finalize() override
Definition Service.cpp:223
const std::string & name() const override
Retrieve name of the service.
Definition Service.cpp:333
StatusCode initialize() override
Definition Service.cpp:118
Small smart pointer class with automatic reference counting for IInterface.
Definition SmartIF.h:28
bool isValid() const
Allow for check if smart pointer is valid.
Definition SmartIF.h:69
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
bool isFailure() const
Definition StatusCode.h:129
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition StatusCode.h:139
bool isSuccess() const
Definition StatusCode.h:314
constexpr static const auto SUCCESS
Definition StatusCode.h:99
constexpr static const auto FAILURE
Definition StatusCode.h:100
A service which initializes a TBB thread pool.
unsigned int getAlgoIndex() const
Get algorithm index.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::ostream & operator<<(std::ostream &s, const std::pair< T1, T2 > &p)
Serialize an std::pair in a python like format. E.g. "(1, 2)".
@ VERBOSE
Definition IMessageSvc.h:22
Struct to hold entries in the alg queues.
Class representing an event slot.
Definition EventSlot.h:23
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition EventSlot.h:82
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition EventSlot.h:99
bool complete
Flags completion of the event.
Definition EventSlot.h:88
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Definition EventSlot.h:60
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition EventSlot.h:48
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition EventSlot.h:84
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Definition EventSlot.h:77
std::string algorithm