The Gaudi Framework  master (181af51f)
Loading...
Searching...
No Matches
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
12#include "AlgTask.h"
13#include "FiberManager.h"
14#include "GraphDumper.h"
15#include "ThreadPoolSvc.h"
16
17// Framework includes
18#include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
25
26// C++
27#include <algorithm>
28#include <fstream>
29#include <map>
30#include <queue>
31#include <regex>
32#include <semaphore>
33#include <sstream>
34#include <string_view>
35#include <thread>
36#include <unordered_set>
37
38// External libs
39#include <boost/algorithm/string.hpp>
40#include <boost/thread.hpp>
41#include <boost/tokenizer.hpp>
42
43// Instantiation of a static factory class used by clients to create instances of this service
45
46#define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
47#define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
48
49namespace {
50 struct DataObjIDSorter {
51 bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
52 };
53
54 // Sort a DataObjIDColl in a well-defined, reproducible manner.
55 // Used for making debugging dumps.
56 std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
57 std::vector<const DataObjID*> v;
58 v.reserve( coll.size() );
59 for ( const DataObjID& id : coll ) v.push_back( &id );
60 std::sort( v.begin(), v.end(), DataObjIDSorter() );
61 return v;
62 }
63
64 bool subSlotAlgsInStates( const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
65 return std::any_of( slot.allSubSlots.begin(), slot.allSubSlots.end(),
66 [testStates]( const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
67 }
68} // namespace
69
70//---------------------------------------------------------------------------
71
77
79
80 // Initialise mother class (read properties, ...)
82 if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
83
84 // Get hold of the TBBSvc. This should initialize the thread pool
85 m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
86 if ( !m_threadPoolSvc.isValid() ) {
87 fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
89 }
90 auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
91 if ( !castTPS ) {
92 fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
94 }
95 m_arena = castTPS->getArena();
96 if ( !m_arena ) {
97 fatal() << "Cannot find valid TBB task_arena" << endmsg;
99 }
100
101 // Activate the scheduler in another thread.
102 info() << "Activating scheduler in a separate thread" << endmsg;
103 std::binary_semaphore fiber_manager_initalized{ 0 };
104 m_thread = std::thread( [this, &fiber_manager_initalized]() {
105 // Initialize FiberManager
106 this->m_fiberManager = std::make_unique<FiberManager>( this->m_numOffloadThreads.value() );
107 fiber_manager_initalized.release();
108 this->activate();
109 } );
110 // Wait for initialization to complete
111 fiber_manager_initalized.acquire();
112
113 while ( m_isActive != ACTIVE ) {
114 if ( m_isActive == FAILURE ) {
115 fatal() << "Terminating initialization" << endmsg;
116 return StatusCode::FAILURE;
117 } else {
118 ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
119 sleep( 1 );
120 }
121 }
122
123 if ( m_enableCondSvc ) {
124 // Get hold of the CondSvc
125 m_condSvc = serviceLocator()->service( "CondSvc" );
126 if ( !m_condSvc.isValid() ) {
127 warning() << "No CondSvc found, or not enabled. "
128 << "Will not manage CondAlgorithms" << endmsg;
129 m_enableCondSvc = false;
130 }
131 }
132
133 // Get the algo resource pool
134 m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
135 if ( !m_algResourcePool.isValid() ) {
136 fatal() << "Error retrieving AlgoResourcePool" << endmsg;
137 return StatusCode::FAILURE;
138 }
139
140 m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
141 if ( !m_algExecStateSvc.isValid() ) {
142 fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
143 return StatusCode::FAILURE;
144 }
145
146 // Get Whiteboard
148 if ( !m_whiteboard.isValid() ) {
149 fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
150 return StatusCode::FAILURE;
151 }
152
153 // Set the MaxEventsInFlight parameters from the number of WB stores
154 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
155
156 // Set the number of free slots
158
159 // Get the list of algorithms
160 const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
161 const unsigned int algsNumber = algos.size();
162 if ( algsNumber != 0 ) {
163 info() << "Found " << algsNumber << " algorithms" << endmsg;
164 } else {
165 error() << "No algorithms found" << endmsg;
166 return StatusCode::FAILURE;
167 }
168
169 /* Dependencies
170 1) Look for handles in algo, if none
171 2) Assume none are required
172 */
173
174 DataObjIDColl globalInp, globalOutp;
175
176 // figure out all outputs
177 std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
178 for ( IAlgorithm* ialgoPtr : algos ) {
179 Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
180 if ( !algoPtr ) {
181 fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
182 return StatusCode::FAILURE;
183 }
184
185 DataObjIDColl algoOutputs;
186 for ( auto id : algoPtr->outputDataObjs() ) {
187 globalOutp.insert( id );
188 algoOutputs.insert( id );
189 }
190 algosOutputDependenciesMap[algoPtr->name()] = algoOutputs;
191 }
192
193 std::ostringstream ostdd;
194 ostdd << "Data Dependencies for Algorithms:";
195
196 std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
197 for ( IAlgorithm* ialgoPtr : algos ) {
198 Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
199 if ( nullptr == algoPtr ) {
200 fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
201 << ": this will result in a crash." << endmsg;
202 return StatusCode::FAILURE;
203 }
204
205 DataObjIDColl i1, i2;
206 DHHVisitor avis( i1, i2 );
207 algoPtr->acceptDHVisitor( &avis );
208
209 ostdd << "\n " << algoPtr->name();
210
211 auto write_owners = [&avis, &ostdd]( const DataObjID& id ) {
212 auto owners = avis.owners_names_of( id );
213 if ( !owners.empty() ) { GaudiUtils::operator<<( ostdd << ' ', owners ); }
214 };
215
216 DataObjIDColl algoDependencies;
217 if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
218 for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
219 DataObjID id = *idp;
220 ostdd << "\n o INPUT " << id;
221 write_owners( id );
222 algoDependencies.insert( id );
223 globalInp.insert( id );
224 }
225 for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
226 ostdd << "\n o OUTPUT " << *id;
227 write_owners( *id );
228 if ( id->key().find( ":" ) != std::string::npos ) {
229 error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
230 << endmsg;
231 m_showDataDeps = true;
232 }
233 }
234 } else {
235 ostdd << "\n none";
236 }
237 algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
238 }
239
240 if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
241
242 // If requested, dump a graph of the data dependencies in a .dot or .md file
243 if ( not m_dataDepsGraphFile.empty() ) {
244 if ( dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
245 return StatusCode::FAILURE;
246 }
247 }
248
249 // Check if we have unmet global input dependencies, and, optionally, heal them
250 // WARNING: this step must be done BEFORE the Precedence Service is initialized
251 DataObjIDColl unmetDepInp, unusedOutp;
252 if ( m_checkDeps || m_checkOutput ) {
253 std::set<std::string> requiredInputKeys;
254 for ( auto o : globalInp ) {
255 // track aliases
256 // (assuming there should be no items with different class and same key corresponding to different objects)
257 requiredInputKeys.insert( o.key() );
258 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
259 }
260 if ( m_checkOutput ) {
261 for ( auto o : globalOutp ) {
262 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
263 // check ignores
264 bool ignored{};
265 for ( const std::string& algoName : m_checkOutputIgnoreList ) {
266 auto it = algosOutputDependenciesMap.find( algoName );
267 if ( it != algosOutputDependenciesMap.end() ) {
268 if ( it->second.find( o ) != it->second.end() ) {
269 ignored = true;
270 break;
271 }
272 }
273 }
274 if ( !ignored ) { unusedOutp.insert( o ); }
275 }
276 }
277 }
278 }
279
280 if ( m_checkDeps ) {
281 if ( unmetDepInp.size() > 0 ) {
282
283 auto printUnmet = [&]( auto msg ) {
284 for ( const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
285 msg << " o " << *o << " required by Algorithm: " << endmsg;
286
287 for ( const auto& p : algosInputDependenciesMap )
288 if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
289 }
290 };
291
292 if ( !m_useDataLoader.empty() ) {
293
294 // Find the DataLoader Alg
295 IAlgorithm* dataLoaderAlg( nullptr );
296 for ( IAlgorithm* algo : algos )
297 if ( m_useDataLoader == algo->name() ) {
298 dataLoaderAlg = algo;
299 break;
300 }
301
302 if ( dataLoaderAlg == nullptr ) {
303 fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
304 << "\" found, and unmet INPUT dependencies "
305 << "detected:" << endmsg;
306 printUnmet( fatal() );
307 return StatusCode::FAILURE;
308 }
309
310 info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
311 << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
312 printUnmet( info() );
313
314 // Set the property Load of DataLoader Alg
315 Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
316 if ( !dataAlg ) {
317 fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
318 << endmsg;
319 return StatusCode::FAILURE;
320 }
321
322 for ( auto& id : unmetDepInp ) {
323 ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
324 << dataLoaderAlg->name() << endmsg;
326 }
327
328 } else {
329 fatal() << "Auto DataLoading not requested, "
330 << "and the following unmet INPUT dependencies were found:" << endmsg;
331 printUnmet( fatal() );
332 return StatusCode::FAILURE;
333 }
334
335 } else {
336 info() << "No unmet INPUT data dependencies were found" << endmsg;
337 }
338 }
339
340 if ( m_checkOutput ) {
341 if ( unusedOutp.size() > 0 ) {
342
343 auto printUnusedOutp = [&]( auto msg ) {
344 for ( const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
345 msg << " o " << *o << " produced by Algorithm: " << endmsg;
346
347 for ( const auto& p : algosOutputDependenciesMap )
348 if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
349 }
350 };
351
352 fatal() << "The following unused OUTPUT items were found:" << endmsg;
353 printUnusedOutp( fatal() );
354 return StatusCode::FAILURE;
355 } else {
356 info() << "No unused OUTPUT items were found" << endmsg;
357 }
358 }
359
360 // Get the precedence service
361 m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
362 if ( !m_precSvc.isValid() ) {
363 fatal() << "Error retrieving PrecedenceSvc" << endmsg;
364 return StatusCode::FAILURE;
365 }
366 const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
367 if ( !precSvc ) {
368 fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
369 return StatusCode::FAILURE;
370 }
371
372 // Fill the containers to convert algo names to index
373 m_algname_vect.resize( algsNumber );
374 for ( IAlgorithm* algo : algos ) {
375 const std::string& name = algo->name();
376 auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
377 m_algname_index_map[name] = index;
378 m_algname_vect.at( index ) = name;
379 }
380
381 // Shortcut for the message service
382 SmartIF<IMessageSvc> messageSvc( serviceLocator() );
383 if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
384
386 for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
387 m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
388 m_eventSlots.back().complete = true;
389 }
390
392
393 // Clearly inform about the level of concurrency
394 info() << "Concurrency level information:" << endmsg;
395 info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
396 info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
397 info() << " o Fiber thread pool size: " << m_numOffloadThreads << endmsg;
398
399 // Inform about task scheduling prescriptions
400 info() << "Task scheduling settings:" << endmsg;
401 info() << " o Avalanche generation mode: "
402 << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
403 info() << " o Preemptive scheduling of CPU-blocking tasks: "
405 ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
406 : "disabled" )
407 << endmsg;
408 info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
409
410 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
411
412 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
413
414 // Simulate execution flow
415 if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
416
417 return sc;
418}
419//---------------------------------------------------------------------------
420
425
427 if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
428
429 sc = deactivate();
430 if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
431
432 debug() << "Deleting FiberManager" << endmsg;
433 m_fiberManager.reset();
434
435 info() << "Joining Scheduler thread" << endmsg;
436 m_thread.join();
437
438 // Final error check after thread pool termination
439 if ( m_isActive == FAILURE ) {
440 error() << "problems in scheduler thread" << endmsg;
441 return StatusCode::FAILURE;
442 }
443
444 return sc;
445}
446//---------------------------------------------------------------------------
447
459
460 ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
461
462 if ( m_threadPoolSvc->initPool( m_threadPoolSize, m_maxParallelismExtra ).isFailure() ) {
463 error() << "problems initializing ThreadPoolSvc" << endmsg;
465 return;
466 }
467
468 // Wait for actions pushed into the queue by finishing tasks.
469 action thisAction;
471
473
474 // Continue to wait if the scheduler is running or there is something to do
475 ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
476 while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
477 m_actionsQueue.pop( thisAction );
478 sc = thisAction();
479 ON_VERBOSE {
480 if ( sc.isFailure() )
481 verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
482 else
483 verbose() << "Action succeeded." << endmsg;
484 }
485 else sc.ignore();
486
487 // If all queued actions have been processed, update the slot states
488 if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
489 sc = iterate();
490 ON_VERBOSE {
491 if ( sc.isFailure() )
492 verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
493 else
494 verbose() << "Iteration succeeded." << endmsg;
495 }
496 else sc.ignore();
497 }
498 }
499
500 ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
501 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
502 error() << "Problems terminating thread pool" << endmsg;
504 }
505}
506
507//---------------------------------------------------------------------------
508
516
517 if ( m_isActive == ACTIVE ) {
518
519 // Set the number of slots available to an error code
520 m_freeSlots.store( 0 );
521
522 // Empty queue
523 action thisAction;
524 while ( m_actionsQueue.try_pop( thisAction ) ) {};
525
526 // This would be the last action
527 m_actionsQueue.push( [this]() -> StatusCode {
528 ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
530 return StatusCode::SUCCESS;
531 } );
532 }
533
534 return StatusCode::SUCCESS;
535}
536
537//---------------------------------------------------------------------------
538
539// EventSlot management
547
548 if ( !eventContext ) {
549 fatal() << "Event context is nullptr" << endmsg;
550 return StatusCode::FAILURE;
551 }
552
553 if ( m_freeSlots.load() == 0 ) {
554 ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
555 return StatusCode::FAILURE;
556 }
557
558 // no problem as push new event is only called from one thread (event loop manager)
559 --m_freeSlots;
560
561 auto action = [this, eventContext]() -> StatusCode {
562 // Event processing slot forced to be the same as the wb slot
563 const unsigned int thisSlotNum = eventContext->slot();
564 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
565 if ( !thisSlot.complete ) {
566 fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
567 return StatusCode::FAILURE;
568 }
569
570 ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
571 thisSlot.reset( eventContext );
572
573 // Result status code:
575
576 // promote to CR and DR the initial set of algorithms
577 Cause cs = { Cause::source::Root, "RootDecisionHub" };
578 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
579 error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
580 result = StatusCode::FAILURE;
581 }
582
583 if ( this->iterate().isFailure() ) {
584 error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
585 result = StatusCode::FAILURE;
586 }
587
588 return result;
589 }; // end of lambda
590
591 // Kick off scheduling
592 ON_VERBOSE {
593 verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
594 verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
595 }
596
597 m_actionsQueue.push( action );
598
599 return StatusCode::SUCCESS;
600}
601
602//---------------------------------------------------------------------------
603
604StatusCode AvalancheSchedulerSvc::pushNewEvents( std::vector<EventContext*>& eventContexts ) {
605 StatusCode sc;
606 for ( auto context : eventContexts ) {
607 sc = pushNewEvent( context );
608 if ( sc != StatusCode::SUCCESS ) return sc;
609 }
610 return sc;
611}
612
613//---------------------------------------------------------------------------
614
615unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
616
617//---------------------------------------------------------------------------
618
620
621//---------------------------------------------------------------------------
626
627 // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
628 if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
629 // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
630 // << " active: " << m_isActive << endmsg;
631 return StatusCode::FAILURE;
632 } else {
633 // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
634 // << " active: " << m_isActive << endmsg;
635 m_finishedEvents.pop( eventContext );
636 ++m_freeSlots;
637 ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
638 return StatusCode::SUCCESS;
639 }
640}
641
642//---------------------------------------------------------------------------
647
648 if ( m_finishedEvents.try_pop( eventContext ) ) {
649 ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
650 << endmsg;
651 ++m_freeSlots;
652 return StatusCode::SUCCESS;
653 }
654 return StatusCode::FAILURE;
655}
656
657//--------------------------------------------------------------------------
658
667
668 StatusCode global_sc( StatusCode::SUCCESS );
669
670 // Retry algorithms
671 const size_t retries = m_retryQueue.size();
672 for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
673 TaskSpec retryTS = std::move( m_retryQueue.front() );
674 m_retryQueue.pop();
675 global_sc = schedule( std::move( retryTS ) );
676 }
677
678 // Loop over all slots
679 OccupancySnapshot nextSnap;
680 auto now = std::chrono::system_clock::now();
681 for ( EventSlot& thisSlot : m_eventSlots ) {
682
683 // Ignore slots without a valid context (relevant when populating scheduler for first time)
684 if ( !thisSlot.eventContext ) continue;
685
686 int iSlot = thisSlot.eventContext->slot();
687
688 // Cache the states of the algorithms to improve readability and performance
689 AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
690
691 StatusCode partial_sc = StatusCode::FAILURE;
692
693 // Make an occupancy snapshot
694 if ( m_snapshotInterval != std::chrono::duration<int64_t, std::milli>::min() &&
696
697 // Initialise snapshot
698 if ( nextSnap.states.empty() ) {
699 nextSnap.time = now;
700 nextSnap.states.resize( m_eventSlots.size() );
701 }
702
703 // Store alg states
704 std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
705 slotStateTotals.resize( AState::MAXVALUE );
706 for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
707 slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
708 }
709
710 // Add subslot alg states
711 for ( auto& subslot : thisSlot.allSubSlots ) {
712 for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
713 slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
714 }
715 }
716 }
717
718 // Perform DR->SCHEDULED
719 const auto& drAlgs = thisAlgsStates.algsInState( AState::DATAREADY );
720 for ( uint algIndex : drAlgs ) {
721 const std::string& algName{ index2algname( algIndex ) };
722 unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
723 bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
724
725 partial_sc =
726 schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
727
728 ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
729 << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
730 << " on processing slot " << iSlot << endmsg;
731 }
732
733 // Check for algorithms ready in sub-slots
734 for ( auto& subslot : thisSlot.allSubSlots ) {
735 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
736 for ( uint algIndex : drAlgsSubSlot ) {
737 const std::string& algName{ index2algname( algIndex ) };
738 unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
739 bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
740 partial_sc =
741 schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
742 }
743 }
744
746 std::stringstream s;
747 s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
748 << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
749 << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
750 auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
751 : std::to_string( std::thread::hardware_concurrency() );
752 std::ofstream myfile;
753 myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
754 myfile << s.str();
755 myfile.close();
756 }
757
758 // Not complete because this would mean that the slot is already free!
759 if ( m_precSvc->CFRulesResolved( thisSlot ) &&
760 !thisSlot.algsStates.containsAny(
761 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
762 !subSlotAlgsInStates( thisSlot,
763 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
764 !thisSlot.complete ) {
765
766 thisSlot.complete = true;
767 // if the event did not fail, add it to the finished events
768 // otherwise it is taken care of in the error handling
769 if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
770 ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
771 << thisSlot.eventContext->slot() << ")." << endmsg;
772 m_finishedEvents.push( thisSlot.eventContext.release() );
773 }
774
775 // now let's return the fully evaluated result of the control flow
776 ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
777
778 thisSlot.eventContext.reset( nullptr );
779
780 } else if ( isStalled( thisSlot ) ) {
781 m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
782 eventFailed( thisSlot.eventContext.get() ); // can't release yet
783 }
784 partial_sc.ignore();
785 } // end loop on slots
786
787 // Process snapshot
788 if ( !nextSnap.states.empty() ) {
789 m_lastSnapshot = nextSnap.time;
790 m_snapshotCallback( std::move( nextSnap ) );
791 }
792
793 ON_VERBOSE verbose() << "Iteration done." << endmsg;
794 m_needsUpdate.store( false );
795 return global_sc;
796}
797
798//---------------------------------------------------------------------------
799// Update algorithm state and, optionally, revise states of other downstream algorithms
800StatusCode AvalancheSchedulerSvc::revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate ) {
801 StatusCode sc;
802 auto slotIndex = contextPtr->slot();
803 EventSlot& slot = m_eventSlots[slotIndex];
804 Cause cs = { Cause::source::Task, index2algname( iAlgo ) };
805
806 if ( contextPtr->usesSubSlot() ) {
807 // Sub-slot
808 auto subSlotIndex = contextPtr->subSlot();
809 EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
810
811 sc = subSlot.algsStates.set( iAlgo, state );
812
813 if ( sc.isSuccess() ) {
814 ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
815 << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
816 // Revise states of algorithms downstream the precedence graph
817 if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
818 }
819 } else {
820 // Event level (standard behaviour)
821 sc = slot.algsStates.set( iAlgo, state );
822
823 if ( sc.isSuccess() ) {
824 ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
825 << ", event:" << contextPtr->evt() << "]" << endmsg;
826 // Revise states of algorithms downstream the precedence graph
827 if ( iterate ) sc = m_precSvc->iterate( slot, cs );
828 }
829 }
830 return sc;
831}
832
833//---------------------------------------------------------------------------
834
842
843 if ( !slot.algsStates.containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
844 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
845
846 error() << "*** Stall detected, event context: " << slot.eventContext.get() << endmsg;
847
848 return true;
849 }
850 return false;
851}
852
853//---------------------------------------------------------------------------
854
860 const uint slotIdx = eventContext->slot();
861
862 error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
863
864 dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
865
866 // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
867 m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
868
869 // Push into the finished events queue the failed context
870 m_eventSlots[slotIdx].complete = true;
871 m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
872}
873
874//---------------------------------------------------------------------------
875
881
882 // To have just one big message
883 std::ostringstream outputMS;
884
885 outputMS << "Dumping scheduler state\n"
886 << "=========================================================================================\n"
887 << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
888 << "=========================================================================================\n\n";
889
890 //===========================================================================
891
892 outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
893 << "------------------\n\n";
894
895 // Figure if TimelineSvc is available (used below to detect threads IDs)
896 auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
897 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
898 outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
899 } else {
900
901 // Figure optimal printout layout
902 size_t indt( 0 );
903 for ( auto& slot : m_eventSlots ) {
904
905 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
906 for ( uint algIndex : schedAlgs ) {
907 if ( index2algname( algIndex ).length() > indt ) indt = index2algname( algIndex ).length();
908 }
909 }
910
911 // Figure the last running schedule across all slots
912 for ( auto& slot : m_eventSlots ) {
913
914 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
915 for ( uint algIndex : schedAlgs ) {
916
917 const std::string& algoName{ index2algname( algIndex ) };
918
919 outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
920 << slot.eventContext->slot();
921
922 // Try to get POSIX threads IDs the currently running tasks are scheduled to
923 if ( timelineSvc.isValid() ) {
924 TimelineEvent te{};
925 te.algorithm = algoName;
926 te.slot = slot.eventContext->slot();
927 te.event = slot.eventContext->evt();
928
929 if ( timelineSvc->getTimelineEvent( te ) )
930 outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
931 else
932 outputMS << " thread.id: [unknown]"; // this means a task has just
933 // been signed off as SCHEDULED,
934 // but has not been assigned to a thread yet
935 // (i.e., not running yet)
936 }
937 outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
938 }
939 }
940 }
941
942 //===========================================================================
943
944 outputMS << "\n---------------------------- Task/CF/FSM Mapping "
945 << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
946
947 int slotCount = -1;
948 bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( { AState::ERROR } ) ||
949 subSlotAlgsInStates( m_eventSlots[iSlot], { AState::ERROR } )
950 : false;
951
952 for ( auto& slot : m_eventSlots ) {
953 ++slotCount;
954 if ( slot.complete ) continue;
955
956 outputMS << "[ slot: "
957 << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
958 << ", event: "
959 << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" );
960
961 if ( slot.eventContext->eventID().isValid() ) { outputMS << ", eventID: " << slot.eventContext->eventID(); }
962 outputMS << " ]:\n\n";
963
964 if ( 0 > iSlot || iSlot == slotCount ) {
965
966 // If an alg has thrown an error then it's not a failure of the CF/DF graph
967 if ( wasAlgError ) {
968 outputMS << "ERROR alg(s):";
969 int errorCount = 0;
970 const auto& errorAlgs = slot.algsStates.algsInState( AState::ERROR );
971 for ( uint algIndex : errorAlgs ) {
972 outputMS << " " << index2algname( algIndex );
973 ++errorCount;
974 }
975 if ( errorCount == 0 ) outputMS << " in subslot(s)";
976 outputMS << "\n\n";
977 } else {
978 // Snapshot of the Control Flow and FSM states
979 outputMS << m_precSvc->printState( slot ) << "\n";
980 }
981
982 // Mention sub slots (this is expensive if the number of sub-slots is high)
983 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
984 outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
985 auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
986 for ( auto& ss : slot.allSubSlots ) {
987 outputMS << "[ slot: " << slotID << ", sub-slot: "
988 << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
989 << ", entry: " << ss.entryPoint << ", event: "
990 << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
991 << " ]:\n\n";
992 if ( wasAlgError ) {
993 outputMS << "ERROR alg(s):";
994 const auto& errorAlgs = ss.algsStates.algsInState( AState::ERROR );
995 for ( uint algIndex : errorAlgs ) { outputMS << " " << index2algname( algIndex ); }
996 outputMS << "\n\n";
997 } else {
998 // Snapshot of the Control Flow and FSM states in sub slot
999 outputMS << m_precSvc->printState( ss ) << "\n";
1000 }
1001 }
1002 }
1003 }
1004 }
1005
1006 //===========================================================================
1007
1008 if ( 0 <= iSlot && !wasAlgError ) {
1009 outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1010 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1011 }
1012
1013 outputMS << "\n=========================================================================================\n"
1014 << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1015 << "=========================================================================================\n\n";
1016
1017 info() << outputMS.str() << endmsg;
1018}
1019
1020//---------------------------------------------------------------------------
1021
1023
// Body of the scheduling routine for a single TaskSpec `ts`.
// NOTE(review): the signature line is not present in this listing; the body below
// reads and moves from `ts` and returns a StatusCode.
//
// Flow, as visible below:
//  - try to acquire an Algorithm instance from m_algResourcePool;
//  - on success, and unless m_threadPoolSize == -100, push the TaskSpec to the
//    asynchronous or the regular scheduled queue, hand an AlgTask to the fiber manager
//    or the TBB arena respectively, and revise the algorithm state to SCHEDULED;
//  - on success with m_threadPoolSize == -100, revise to SCHEDULED and invoke the AlgTask inline;
//  - on failure, revise the algorithm state to RESOURCELESS and park the TaskSpec in m_retryQueue.
// The returned StatusCode is the one produced by the revise() call of the branch taken.
1024 // Check if a free Algorithm instance is available
1025 StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
1026
1027 // If an instance is available, proceed to scheduling
1028 StatusCode sc;
1029 if ( getAlgSC.isSuccess() ) {
1030
1031 // Decide how to schedule the task and schedule it
1032 if ( -100 != m_threadPoolSize ) {
1033
1034 // Cache values before moving the TaskSpec further
// NOTE(review): algName is only a view; it is still read by the debug message after
// `ts` has been moved into a queue. This assumes the characters it refers to are not
// owned by the TaskSpec itself - confirm against the declaration of TaskSpec::algName.
1035 unsigned int algIndex{ ts.algIndex };
1036 std::string_view algName( ts.algName );
1037 unsigned int algRank{ ts.algRank };
1038 bool asynchronous{ ts.asynchronous };
1039 int slotIndex{ ts.slotIndex };
1040 EventContext* contextPtr{ ts.contextPtr };
1041
1042 if ( asynchronous ) {
1043 // Add to asynchronous scheduled queue
1044 m_scheduledAsynchronousQueue.push( std::move( ts ) );
1045
1046 // Schedule task
1047 m_fiberManager->schedule( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1048 }
1049
1050 if ( !asynchronous ) {
1051 // Add the algorithm to the scheduled queue
1052 m_scheduledQueue.push( std::move( ts ) );
1053
1054 // Prepare a TBB task that will execute the Algorithm according to the above queued specs
1055 m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1057 }
// From here on only the cached copies are used: `ts` has been moved from in either branch above.
1058 sc = revise( algIndex, contextPtr, AState::SCHEDULED );
1059
1060 ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
1061 << ", rank:" << algRank << ", asynchronous:" << ( asynchronous ? "yes" : "no" )
1062 << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1064 ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1065 : "" )
1066 << endmsg;
1067
1068 } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
1069 // Beojan: I don't think this bit works. ts hasn't been pushed into any queue so AlgTask won't retrieve it
1071 sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
1072 AlgTask( this, serviceLocator(), m_algExecStateSvc, ts.asynchronous )();
1074 }
1075 } else { // if no Algorithm instance available, retry later
1076
// revise() is called before the push because it still needs the fields of `ts`, which is moved from below.
1077 sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1078 // Add the algorithm to the retry queue
1079 m_retryQueue.push( std::move( ts ) );
1080 }
1081
1083
1084 return sc;
1085}
1086
1087//---------------------------------------------------------------------------
1088
1093
// Body of the sign-off routine for an executed TaskSpec `ts`.
// NOTE(review): the signature line is not present in this listing.
//
// Steps, as visible below:
//  - make ts.contextPtr the current context;
//  - read the execution state of the algorithm for that context and map it to
//    EVTACCEPTED / EVTREJECTED (by filter decision) or ERROR (execution status not successful);
//  - revise the algorithm state with that value, passing `true` as fourth argument;
//  - raise m_needsUpdate.
// The returned StatusCode is the one produced by revise().
1094 Gaudi::Hive::setCurrentContext( ts.contextPtr );
1095
1097
1098 const AlgExecStateRef algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
// A successful execution is classified by its filter decision; anything else is an ERROR.
1099 AState state = algstate.execStatus().isSuccess()
1100 ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1101 : AState::ERROR;
1102
1103 // Update algorithm state and revise the downstream states
1104 auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1105
1106 ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1107 << ", rank:" << ts.algRank << ", asynchronous:" << ( ts.asynchronous ? "yes" : "no" )
1108 << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1110 ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1111 : "" )
1112 << endmsg;
1113
1114 // Prompt a call to updateStates
1115 m_needsUpdate.store( true );
1116 return sc;
1117}
1118
1119//---------------------------------------------------------------------------
1120
1121// Method to inform the scheduler about event views
1122
1123StatusCode AvalancheSchedulerSvc::scheduleEventView( const EventContext* sourceContext, const std::string& nodeName,
1124 std::unique_ptr<EventContext> viewContext ) {
1125 // Prevent view nesting
1126 if ( sourceContext->usesSubSlot() ) {
1127 fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1128 return StatusCode::FAILURE;
1129 }
1130
1131 ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1132
1133 // It's not possible to create an std::functional from a move-capturing lambda
1134 // So, we have to release the unique pointer
1135 auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1136 &nodeName]() -> StatusCode {
1137 // Attach the sub-slot to the top-level slot
1138 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1139
1140 if ( viewContextPtr ) {
1141 // Re-create the unique pointer
1142 auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1143 topSlot.addSubSlot( std::move( viewContext ), nodeName );
1144 return StatusCode::SUCCESS;
1145 } else {
1146 // Disable the view node if there are no views
1147 topSlot.disableSubSlots( nodeName );
1148 return StatusCode::SUCCESS;
1149 }
1150 };
1151
1152 m_actionsQueue.push( std::move( action ) );
1153
1154 return StatusCode::SUCCESS;
1155}
1156
1157//---------------------------------------------------------------------------
1158
1159// Sample occupancy at fixed interval (ms)
1160// Negative value to deactivate, 0 to snapshot every change
1161// Each sample, apply the callback function to the result
1162
1163void AvalancheSchedulerSvc::recordOccupancy( int samplePeriod, std::function<void( OccupancySnapshot )> callback ) {
1164
1165 auto action = [this, samplePeriod, callback = std::move( callback )]() -> StatusCode {
1166 if ( samplePeriod < 0 ) {
1167 this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min();
1168 } else {
1169 this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>( samplePeriod );
1170 m_snapshotCallback = std::move( callback );
1171 }
1172 return StatusCode::SUCCESS;
1173 };
1174
1175 m_actionsQueue.push( std::move( action ) );
1176}
1177
// Fill the graph object `g` with the data dependencies of the algorithms.
//
// For every algorithm of inDeps whose name matches m_dataDepsGraphAlgoPattern one node is
// added; for each of its input/output data objects whose fullKey() matches
// m_dataDepsGraphObjectPattern an object node is added (once per hash) together with an
// edge object -> algorithm (inputs) or algorithm -> object (outputs).
//
// @param inDeps   input data objects, keyed by algorithm name
// @param outDeps  output data objects, keyed by algorithm name; it is looked up with
//                 std::map::at, which throws std::out_of_range for a name missing here
// @return always StatusCode::SUCCESS
//
// NOTE(review): the statement declaring `g` is not present in this listing.
1178StatusCode AvalancheSchedulerSvc::dumpGraphFile( const std::map<std::string, DataObjIDColl>& inDeps,
1179 const std::map<std::string, DataObjIDColl>& outDeps ) const {
1180 // Both maps should have the same algorithm entries
// (only the sizes are compared, and only when assertions are enabled)
1181 assert( inDeps.size() == outDeps.size() );
1182
1184 info() << "Dumping data dependencies graph to file: " << g.fileName() << endmsg;
1185
1186 // define algs and objects
// hashes of the data objects that already have a node in the graph
1187 std::set<std::size_t> definedObjects;
1188
1189 // Regex for selection of algs and objects
1190 std::regex algNameRegex( m_dataDepsGraphAlgoPattern.value() );
1191 std::regex objNameRegex( m_dataDepsGraphObjectPattern.value() );
1192
1193 // inDeps and outDeps should have the same entries
// algoIndex numbers only the algorithms that pass the name filter: the `continue`
// below skips the increment at the end of the loop body.
1194 std::size_t algoIndex = 0ul;
1195 for ( const auto& [algName, ideps] : inDeps ) {
1196 if ( not std::regex_search( algName, algNameRegex ) ) continue;
1197 std::string algIndex = "Alg_" + std::to_string( algoIndex );
1198 g.addNode( algName, algIndex );
1199
1200 // inputs
1201 for ( const auto& dep : ideps ) {
1202 if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1203
// `inserted` is true only the first time this hash is seen, so each object gets one node
1204 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1205 std::string objIndex = "obj_" + std::to_string( dep.hash() );
1206 if ( inserted ) g.addNode( dep.key(), objIndex );
1207
1208 g.addEdge( dep.key(), objIndex, algName, algIndex );
1209 } // loop on ideps
1210
// outputs
1211 const auto& odeps = outDeps.at( algName );
1212 for ( const auto& dep : odeps ) {
1213 if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1214
1215 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1216 std::string objIndex = "obj_" + std::to_string( dep.hash() );
1217 if ( inserted ) g.addNode( dep.key(), objIndex );
1218
1219 g.addEdge( algName, algIndex, dep.key(), objIndex );
1220 } // loop on odeps
1221
1222 ++algoIndex;
1223 } // loop on inDeps
1224
1225 return StatusCode::SUCCESS;
1226}
#define ON_VERBOSE
std::unordered_set< DataObjID, DataObjID_Hasher > DataObjIDColl
Definition DataObjID.h:121
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
#define ON_DEBUG
#define DECLARE_COMPONENT(type)
Provide serialization function (output only) for some common STL classes (vectors,...
wrapper on an Algorithm state.
const StatusCode & execStatus() const
bool filterPassed() const
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
const boost::container::flat_set< int > algsInState(State state) const
size_t sizeOfSubset(State state) const
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
StatusCode set(unsigned int iAlgo, State newState)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_useDataLoader
void dumpState() override
Dump scheduler state for all slots.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_optimizationMode
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Gaudi::Property< bool > m_dumpIntraEventDynamics
std::chrono::system_clock::time_point m_lastSnapshot
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< int > m_threadPoolSize
StatusCode finalize() override
Finalise.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::function< void(OccupancySnapshot)> m_snapshotCallback
std::queue< TaskSpec > m_retryQueue
Gaudi::Property< bool > m_verboseSubSlots
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
AlgsExecutionStates::State AState
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
StatusCode deactivate()
Deactivate scheduler.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
unsigned int m_blockingAlgosInFlight
Number of blocking algorithms presently in flight.
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
Gaudi::Property< bool > m_showDataFlow
StatusCode schedule(TaskSpec &&)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Gaudi::Property< bool > m_checkDeps
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Gaudi::Property< std::string > m_dataDepsGraphFile
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Gaudi::Property< bool > m_showControlFlow
Gaudi::Property< bool > m_simulateExecution
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
Gaudi::Property< std::string > m_whiteboardSvcName
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
std::atomic< bool > m_needsUpdate
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Gaudi::Property< int > m_maxParallelismExtra
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
std::function< StatusCode()> action
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
Gaudi::Property< int > m_numOffloadThreads
Gaudi::Property< bool > m_checkOutput
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
StatusCode initialize() override
Initialise.
Gaudi::Property< bool > m_enableCondSvc
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms). Negative value to deactivate, 0 to snapshot every change. Each sample, apply the callback function to the result.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
unsigned int freeSlots() override
Get free slots number.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
std::unique_ptr< FiberManager > m_fiberManager
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Gaudi::Property< bool > m_showDataDeps
std::thread m_thread
The thread in which the activate function runs.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & msg() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
std::vector< std::string > owners_names_of(const DataObjID &id, bool with_main=false) const
const DataObjIDColl & outputDataObjs() const override
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & inputDataObjs() const override
std::string fullKey() const
combination of the key and the ClassName, mostly for debugging
This class represents an entry point to all the event specific data.
ContextEvt_t evt() const
ContextID_t subSlot() const
ContextID_t slot() const
bool usesSubSlot() const
Base class from which all concrete algorithm classes should be derived.
Definition Algorithm.h:87
void acceptDHVisitor(IDataHandleVisitor *) const override
const std::string & name() const override
The identifying name of the algorithm object.
utilities to dump graphs in different formats
Definition GraphDumper.h:30
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition IAlgorithm.h:36
virtual const std::string & type() const =0
The type of the algorithm.
virtual SmartIF< IService > & service(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to a service.
A service to resolve the task execution precedence.
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition Service.cpp:336
StatusCode finalize() override
Definition Service.cpp:223
const std::string & name() const override
Retrieve name of the service.
Definition Service.cpp:333
StatusCode initialize() override
Definition Service.cpp:118
Small smart pointer class with automatic reference counting for IInterface.
Definition SmartIF.h:28
bool isValid() const
Allow for check if smart pointer is valid.
Definition SmartIF.h:69
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
bool isFailure() const
Definition StatusCode.h:129
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition StatusCode.h:139
bool isSuccess() const
Definition StatusCode.h:314
constexpr static const auto SUCCESS
Definition StatusCode.h:99
constexpr static const auto FAILURE
Definition StatusCode.h:100
A service which initializes a TBB thread pool.
unsigned int getAlgoIndex() const
Get algorithm index.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::ostream & operator<<(std::ostream &s, const std::pair< T1, T2 > &p)
Serialize an std::pair in a python like format. E.g. "(1, 2)".
@ VERBOSE
Definition IMessageSvc.h:22
Struct to hold entries in the alg queues.
Class representing an event slot.
Definition EventSlot.h:23
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition EventSlot.h:82
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition EventSlot.h:99
bool complete
Flags completion of the event.
Definition EventSlot.h:88
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Definition EventSlot.h:60
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition EventSlot.h:48
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition EventSlot.h:84
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Definition EventSlot.h:77
std::string algorithm