#include </builds/gaudi/Gaudi/GaudiHive/src/AvalancheSchedulerSvc.h>
The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).
Task precedence management
The scheduler is driven by graph-based task precedence management. When compared to approach used in the ForwardSchedulerSvc, the following advantages can be emphasized:
(1) Faster decision making (thus lower concurrency disclosure downtime); (2) Capacity for proactive task scheduling decision making.
Point (2) allowed to implement a number of generic, non-intrusive intra-event throughput maximization scheduling strategies.
Scheduling principles
o Task scheduling prerequisites
A task is scheduled ASA all following conditions are met:
- if a control flow (CF) graph traversal reaches the task;
- when all data flow (DF) dependencies of the task are satisfied;
- when the DF-ready task pool parsing mechanism (*) considers it, and:
- a free (or re-entrant) algorithm instance to run within the task is available;
- there is a free computational resource to run the task.
o (*) Avalanche induction strategies
The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of precedence rules graph asymmetries:
(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.
o Other mechanisms of throughput maximization
The scheduler is able to maximize the overall throughput of data processing by preemptive scheduling CPU-blocking tasks. The mechanism can be applied to the following types of tasks:
- I/O-bound tasks;
- tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices..joke);
- synchronization-bound tasks.
Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on ideas and code of the latter.
- Author
- Illya Shapoval
- Version
- 1.0
Definition at line 114 of file AvalancheSchedulerSvc.h.
◆ action
◆ AState
◆ ActivationState
◆ activate()
void AvalancheSchedulerSvc::activate |
( |
| ) |
private |
Activate scheduler.
Activate the scheduler.
From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)
Definition at line 457 of file AvalancheSchedulerSvc.cpp.
459 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
462 error() <<
"problems initializing ThreadPoolSvc" <<
479 if ( sc.isFailure() )
480 verbose() <<
"Action did not succeed (which is not bad per se)." <<
490 if ( sc.isFailure() )
491 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
499 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
501 error() <<
"Problems terminating thread pool" <<
◆ algname2index()
unsigned int AvalancheSchedulerSvc::algname2index |
( |
const std::string & |
algoname | ) |
inlineprivate |
◆ deactivate()
Deactivate scheduler.
Deactivates the scheduler.
Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.
Definition at line 514 of file AvalancheSchedulerSvc.cpp.
◆ dumpGraphFile()
Definition at line 1177 of file AvalancheSchedulerSvc.cpp.
1180 assert( inDeps.
size() == outDeps.
size() );
1183 enum class FileType : short {
1190 fileExtension = FileType::DOT;
1192 fileExtension = FileType::MD;
1194 fileExtension = FileType::DOT;
1195 fileName = fileName +
1197 info() <<
"Dumping data dependencies graph to file: " << fileName <<
1207 if ( fileExtension == FileType::DOT ) {
1209 startGraph =
"digraph datadeps {\nrankdir=\"LR\";\n\n";
1210 stopGraph =
1213 return "Alg_" + idx +
" [label=\"" +
alg +
1217 return "obj_" +
std::to_string( obj.hash() ) +
" [label=\"" + obj.key() +
1229 startGraph =
"```mermaid\ngraph LR;\n\n";
1230 stopGraph =
1233 return "Alg_" + idx +
"{{" +
alg +
1237 return "obj_" +
std::to_string( obj.hash() ) +
">" + obj.key() +
1250 dataDepthGraphFile << startGraph;
1261 for (
const auto& [
name, ideps] : inDeps ) {
1266 for (
const auto& dep : ideps ) {
1269 const auto [itr, inserted] = definedObjects.
insert( dep.hash() );
1270 if ( inserted ) dataDepthGraphFile << defineObj( dep );
1272 dataDepthGraphFile << defineInput( dep,
std::to_string( algoIndex ) );
1275 const auto& odeps = outDeps.
name );
1276 for (
const auto& dep : odeps ) {
1279 const auto [itr, inserted] = definedObjects.
insert( dep.hash() );
1280 if ( inserted ) dataDepthGraphFile << defineObj( dep );
1282 dataDepthGraphFile << defineOutput(
std::to_string( algoIndex ), dep );
1289 dataDepthGraphFile << stopGraph;
1290 dataDepthGraphFile.close();
◆ dumpSchedulerState()
void AvalancheSchedulerSvc::dumpSchedulerState |
( |
int |
iSlot | ) |
private |
Dump the state of the scheduler.
Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.
Definition at line 879 of file AvalancheSchedulerSvc.cpp.
884 outputMS <<
"Dumping scheduler state\n"
885 <<
886 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
887 <<
891 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
892 <<
896 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
897 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
904 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
905 for ( uint algIndex : schedAlgs ) {
913 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
914 for ( uint algIndex : schedAlgs ) {
918 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
919 << slot.eventContext->slot();
922 if ( timelineSvc.isValid() ) {
925 te.slot = slot.eventContext->slot();
926 te.event = slot.eventContext->evt();
928 if ( timelineSvc->getTimelineEvent( te ) )
931 outputMS <<
" thread.id: [unknown]";
936 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
943 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
944 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
953 if ( slot.complete )
955 outputMS <<
"[ slot: "
956 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
958 << ( slot.eventContext->
valid() ?
to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" );
960 if ( slot.eventContext->eventID().isValid() ) { outputMS <<
", eventID: " << slot.eventContext->eventID(); }
961 outputMS <<
" ]:\n\n";
963 if ( 0 > iSlot || iSlot == slotCount ) {
967 outputMS <<
"ERROR alg(s):";
969 const auto& errorAlgs = slot.algsStates.algsInState(
AState::ERROR );
970 for ( uint algIndex : errorAlgs ) {
974 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
978 outputMS <<
m_precSvc->printState( slot ) <<
983 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
984 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
985 for (
auto& ss : slot.allSubSlots ) {
986 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
987 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
988 <<
", entry: " << ss.entryPoint <<
", event: "
992 outputMS <<
"ERROR alg(s):";
993 const auto& errorAlgs = ss.algsStates.algsInState(
AState::ERROR );
994 for ( uint algIndex : errorAlgs ) { outputMS <<
" " <<
index2algname( algIndex ); }
998 outputMS <<
m_precSvc->printState( ss ) <<
1007 if ( 0 <= iSlot && !wasAlgError ) {
1008 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1012 outputMS <<
1013 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1014 <<
◆ dumpState()
void AvalancheSchedulerSvc::dumpState |
( |
| ) |
override |
◆ eventFailed()
void AvalancheSchedulerSvc::eventFailed |
( |
EventContext * |
eventContext | ) |
private |
Method to execute if an event failed.
It can be possible that an event fails.
In this case this method is called. It dumps the state of the scheduler and marks the event as finished.
Definition at line 858 of file AvalancheSchedulerSvc.cpp.
859 const uint slotIdx = eventContext->
861 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
◆ finalize()
Here the scheduler is deactivated and the thread joined.
Definition at line 423 of file AvalancheSchedulerSvc.cpp.
426 if ( sc.isFailure() ) warning() <<
"Base class could not be finalized" <<
429 if ( sc.isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
431 debug() <<
"Deleting FiberManager" <<
434 info() <<
"Joining Scheduler thread" <<
439 error() <<
"problems in scheduler thread" <<
◆ freeSlots()
unsigned int AvalancheSchedulerSvc::freeSlots |
( |
| ) |
override |
◆ index2algname()
const std::string& AvalancheSchedulerSvc::index2algname |
( |
unsigned int |
index | ) |
inlineprivate |
◆ initialize()
Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.
In addition the algorithms list is acquired from the algResourcePool.
Definition at line 77 of file AvalancheSchedulerSvc.cpp.
81 if ( sc.isFailure() ) warning() <<
"Base class could not be initialized" <<
86 fatal() <<
"Error retrieving ThreadPoolSvc" <<
91 fatal() <<
"Cannot cast ThreadPoolSvc" <<
96 fatal() <<
"Cannot find valid TBB task_arena" <<
101 info() <<
"Activating scheduler in a separate thread" <<
102 std::binary_semaphore fiber_manager_initalized{ 0 };
106 fiber_manager_initalized.release();
110 fiber_manager_initalized.acquire();
114 fatal() <<
"Terminating initialization" <<
117 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
126 warning() <<
"No CondSvc found, or not enabled. "
127 <<
"Will not manage CondAlgorithms" <<
135 fatal() <<
"Error retrieving AlgoResourcePool" <<
141 fatal() <<
"Error retrieving AlgExecStateSvc" <<
148 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
160 const unsigned int algsNumber = algos.
161 if ( algsNumber != 0 ) {
162 info() <<
"Found " << algsNumber <<
" algorithms" <<
164 error() <<
"No algorithms found" <<
180 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
189 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
193 ostdd <<
"Data Dependencies for Algorithms:";
198 if (
nullptr == algoPtr ) {
199 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
200 <<
": this will result in a crash." <<
208 ostdd <<
"\n " << algoPtr->
210 auto write_owners = [&avis, &ostdd](
const DataObjID&
id ) {
211 auto owners = avis.owners_names_of(
id );
219 ostdd <<
"\n o INPUT " <<
221 algoDependencies.
id );
225 ostdd <<
"\n o OUTPUT " << *
227 if (
":" ) != std::string::npos ) {
228 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
236 algosInputDependenciesMap[algoPtr->
name()] = algoDependencies;
243 if (
dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
253 for (
auto o : globalInp ) {
256 requiredInputKeys.
insert( o.key() );
257 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDepInp.
insert( o );
260 for (
auto o : globalOutp ) {
261 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.
find( o.key() ) == requiredInputKeys.
end() ) {
265 auto it = algosOutputDependenciesMap.
find( algoName );
266 if ( it != algosOutputDependenciesMap.
end() ) {
267 if ( it->second.find( o ) != it->second.end() ) {
273 if ( !ignored ) { unusedOutp.
insert( o ); }
280 if ( unmetDepInp.
size() > 0 ) {
282 auto printUnmet = [&](
auto msg ) {
283 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
284 msg <<
" o " << *o <<
" required by Algorithm: " <<
286 for (
const auto& p : algosInputDependenciesMap )
287 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
297 dataLoaderAlg = algo;
301 if ( dataLoaderAlg ==
nullptr ) {
303 <<
"\" found, and unmet INPUT dependencies "
305 printUnmet( fatal() );
309 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() <<
310 << dataLoaderAlg->name() <<
"\" Algorithm" <<
311 printUnmet( info() );
316 fatal() <<
"Unable to dcast DataLoader \"" <<
value() <<
"\" IAlg to Gaudi::Algorithm"
321 for (
id : unmetDepInp ) {
322 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->type() <<
323 << dataLoaderAlg->name() <<
328 fatal() <<
"Auto DataLoading not requested, "
329 <<
"and the following unmet INPUT dependencies were found:" <<
330 printUnmet( fatal() );
335 info() <<
"No unmet INPUT data dependencies were found" <<
340 if ( unusedOutp.
size() > 0 ) {
342 auto printUnusedOutp = [&](
auto msg ) {
343 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
344 msg <<
" o " << *o <<
" produced by Algorithm: " <<
346 for (
const auto& p : algosOutputDependenciesMap )
347 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
351 fatal() <<
"The following unused OUTPUT items were found:" <<
352 printUnusedOutp( fatal() );
355 info() <<
"No unused OUTPUT items were found" <<
362 fatal() <<
"Error retrieving PrecedenceSvc" <<
367 fatal() <<
"Unable to dcast PrecedenceSvc" <<
382 if ( !messageSvc.isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
393 info() <<
"Concurrency level information:" <<
399 info() <<
"Task scheduling settings:" <<
400 info() <<
" o Avalanche generation mode: "
402 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
407 info() <<
" o Scheduling of condition tasks: " << (
m_enableCondSvc ?
"enabled" :
"disabled" ) <<
◆ isStalled()
bool AvalancheSchedulerSvc::isStalled |
( |
const EventSlot & |
slot | ) |
const |
private |
Check if scheduling in a particular slot is in a stall.
Check if we are in present of a stall condition for a particular slot.
This is the case when a slot has no actions queued in the actionsQueue, has no scheduled algorithms and has no algorithms with all of its dependencies satisfied.
Definition at line 840 of file AvalancheSchedulerSvc.cpp.
842 if ( !slot.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
843 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
◆ iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Loop on all slots to schedule DATAREADY algorithms, sign off ready ones or detect execution stalls.
To check if an event is finished the method verifies that the root control flow decision of the task precedence graph is resolved and there are no algorithms moving in-between INITIAL and EVTACCEPTED FSM states.
Definition at line 665 of file AvalancheSchedulerSvc.cpp.
671 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
678 OccupancySnapshot nextSnap;
683 if ( !thisSlot.eventContext )
685 int iSlot = thisSlot.eventContext->slot();
697 if ( nextSnap.states.empty() ) {
704 slotStateTotals.
resize( AState::MAXVALUE );
706 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
state ) );
710 for (
auto& subslot : thisSlot.allSubSlots ) {
712 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
state ) );
718 const auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
719 for ( uint algIndex : drAlgs ) {
722 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
725 schedule( TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
728 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
729 <<
" on processing slot " << iSlot <<
733 for (
auto& subslot : thisSlot.allSubSlots ) {
734 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
735 for ( uint algIndex : drAlgsSubSlot ) {
738 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
740 schedule( TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
746 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
758 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
759 !thisSlot.algsStates.containsAny(
761 !subSlotAlgsInStates( thisSlot,
763 !thisSlot.complete ) {
765 thisSlot.complete =
769 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
770 << thisSlot.eventContext->slot() <<
")." <<
777 thisSlot.eventContext.reset(
nullptr );
787 if ( !nextSnap.states.empty() ) {
◆ next()
bool AvalancheSchedulerSvc::next |
( |
TaskSpec & |
ts, |
bool |
asynchronous |
) |
| |
inline |
◆ popFinishedEvent()
Blocks until an event is available.
Get a finished event or block until one becomes available.
Definition at line 624 of file AvalancheSchedulerSvc.cpp.
636 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
◆ pushNewEvent()
Make an event available to the scheduler.
Add event to the scheduler.
There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.
Definition at line 545 of file AvalancheSchedulerSvc.cpp.
547 if ( !eventContext ) {
548 fatal() <<
"Event context is nullptr" <<
553 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
562 const unsigned int thisSlotNum = eventContext->
565 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
569 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
570 thisSlot.
reset( eventContext );
577 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
578 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
582 if ( this->
iterate().isFailure() ) {
583 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
592 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
◆ pushNewEvents()
◆ recordOccupancy()
void AvalancheSchedulerSvc::recordOccupancy |
( |
int |
samplePeriod, |
std::function< void(OccupancySnapshot)> |
callback |
) |
| |
overridevirtual |
Sample occupancy at fixed interval (ms) Negative value to deactivate, 0 to snapshot every change Each sample, apply the callback function to the result.
Definition at line 1162 of file AvalancheSchedulerSvc.cpp.
1165 if ( samplePeriod < 0 ) {
◆ revise()
Definition at line 799 of file AvalancheSchedulerSvc.cpp.
801 auto slotIndex = contextPtr->
807 auto subSlotIndex = contextPtr->
814 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
824 <<
", event:" << contextPtr->
evt() <<
"]" <<
◆ schedule()
Definition at line 1021 of file AvalancheSchedulerSvc.cpp.
1028 if ( getAlgSC.isSuccess() ) {
1034 unsigned int algIndex{
ts.algIndex };
1035 std::string_view algName(
ts.algName );
1036 unsigned int algRank{
ts.algRank };
1037 bool asynchronous{
ts.asynchronous };
1038 int slotIndex{
ts.slotIndex };
1041 if ( asynchronous ) {
1049 if ( !asynchronous ) {
1057 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1059 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1060 <<
", rank:" << algRank <<
", asynchronous:" << ( asynchronous ?
"yes" :
"no" )
1070 sc =
ts.contextPtr, AState::SCHEDULED );
1076 sc =
ts.contextPtr, AState::RESOURCELESS );
◆ scheduleEventView()
Method to inform the scheduler about event views.
Definition at line 1122 of file AvalancheSchedulerSvc.cpp.
1126 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
1134 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
1139 if ( viewContextPtr ) {
◆ signoff()
The call to this method is triggered only from within the AlgTask.
Definition at line 1091 of file AvalancheSchedulerSvc.cpp.
1099 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1105 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
1106 <<
", rank:" <<
ts.algRank <<
", asynchronous:" << (
ts.asynchronous ?
"yes" :
"no" )
◆ tryPopFinishedEvent()
Try to fetch an event from the scheduler.
Try to get a finished event, if not available just return a failure.
Definition at line 645 of file AvalancheSchedulerSvc.cpp.
648 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
◆ AlgTask
◆ m_actionsQueue
tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue |
private |
◆ m_algExecStateSvc
◆ m_algname_index_map
◆ m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Definition at line 261 of file AvalancheSchedulerSvc.h.
◆ m_algosInFlight
unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0 |
private |
◆ m_algResourcePool
◆ m_arena
tbb::task_arena* AvalancheSchedulerSvc::m_arena { nullptr } |
private |
◆ m_blockingAlgosInFlight
unsigned int AvalancheSchedulerSvc::m_blockingAlgosInFlight = 0 |
private |
◆ m_checkDeps
Initial value:{ this, "CheckDependencies", false,
"Runtime check of Algorithm Input Data Dependencies" }
Definition at line 196 of file AvalancheSchedulerSvc.h.
◆ m_checkOutput
Initial value:{ this, "CheckOutputUsage", false,
"Runtime check of Algorithm Output Data usage" }
Definition at line 198 of file AvalancheSchedulerSvc.h.
◆ m_checkOutputIgnoreList
Initial value:{
"Ignore outputs of the Algorithms of this name when doing the check",
"OrderedSet<std::string>" }
Definition at line 200 of file AvalancheSchedulerSvc.h.
◆ m_condSvc
◆ m_dataDepsGraphAlgoPattern
Initial value:{
this, "DataDepsGraphAlgPattern", ".*",
"Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data "
"deps graph" }
Definition at line 228 of file AvalancheSchedulerSvc.h.
◆ m_dataDepsGraphFile
Initial value:{
this, "DataDepsGraphFile", "",
"Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected "
"Algorithms" }
Definition at line 223 of file AvalancheSchedulerSvc.h.
◆ m_dataDepsGraphObjectPattern
Initial value:{
this, "DataDepsGraphObjectPattern", ".*",
"Regex pattern for selecting desired input or output by their full key" }
Definition at line 233 of file AvalancheSchedulerSvc.h.
◆ m_dumpIntraEventDynamics
Initial value:{ this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file" }
Definition at line 187 of file AvalancheSchedulerSvc.h.
◆ m_enableCondSvc
Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" } |
private |
◆ m_enablePreemptiveBlockingTasks
Gaudi::Property<bool> AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks |
private |
Initial value:{
this, "PreemptiveBlockingTasks", false,
"Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." }
Definition at line 189 of file AvalancheSchedulerSvc.h.
◆ m_eventSlots
◆ m_fiberManager
◆ m_finishedEvents
tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents |
private |
◆ m_freeSlots
std::atomic_int AvalancheSchedulerSvc::m_freeSlots { 0 } |
private |
◆ m_isActive
◆ m_lastSnapshot
◆ m_maxAlgosInFlight
size_t AvalancheSchedulerSvc::m_maxAlgosInFlight { 1 } |
private |
◆ m_maxBlockingAlgosInFlight
Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight |
private |
Initial value:{
this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms" }
Definition at line 180 of file AvalancheSchedulerSvc.h.
◆ m_maxEventsInFlight
size_t AvalancheSchedulerSvc::m_maxEventsInFlight { 0 } |
private |
◆ m_maxParallelismExtra
Initial value:{
this, "maxParallelismExtra", 0,
"Allows to add some extra threads to the maximum parallelism set in TBB"
"The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" }
Definition at line 175 of file AvalancheSchedulerSvc.h.
◆ m_needsUpdate
std::atomic<bool> AvalancheSchedulerSvc::m_needsUpdate { true } |
private |
◆ m_numOffloadThreads
Initial value:{
this, "NumOffloadThreads", 2,
"Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged "
"and use Boost Fiber functionality to suspend while waiting for offloaded work." }
Definition at line 192 of file AvalancheSchedulerSvc.h.
◆ m_optimizationMode
Initial value:{ this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E" }
Definition at line 185 of file AvalancheSchedulerSvc.h.
◆ m_precSvc
◆ m_retryQueue
◆ m_scheduledAsynchronousQueue
tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledAsynchronousQueue |
private |
◆ m_scheduledQueue
◆ m_showControlFlow
Initial value:{ this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences" }
Definition at line 218 of file AvalancheSchedulerSvc.h.
◆ m_showDataDeps
Initial value:{ this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms" }
Definition at line 212 of file AvalancheSchedulerSvc.h.
◆ m_showDataFlow
Initial value:{ this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms" }
Definition at line 215 of file AvalancheSchedulerSvc.h.
◆ m_simulateExecution
Initial value:{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution" }
Definition at line 182 of file AvalancheSchedulerSvc.h.
◆ m_snapshotCallback
std::function<void( OccupancySnapshot )> AvalancheSchedulerSvc::m_snapshotCallback |
private |
◆ m_snapshotInterval
◆ m_thread
◆ m_threadPoolSize
Initial value:{
this, "ThreadPoolSize", -1,
"Size of the global thread pool initialised by TBB; a value of -1 requests to use"
"all available hardware threads; -100 requests to bypass TBB executing "
"all algorithms in the scheduler's thread." }
Definition at line 170 of file AvalancheSchedulerSvc.h.
◆ m_threadPoolSvc
◆ m_useDataLoader
Initial value:{ this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm" }
Definition at line 207 of file AvalancheSchedulerSvc.h.
◆ m_verboseSubSlots
Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" } |
private |
◆ m_whiteboard
◆ m_whiteboardSvcName
