#include </builds/gaudi/Gaudi/GaudiHive/src/AvalancheSchedulerSvc.h>
|
StatusCode | initialize () override |
| Initialise. More...
|
|
StatusCode | finalize () override |
| Finalise. More...
|
|
StatusCode | pushNewEvent (EventContext *eventContext) override |
| Make an event available to the scheduler. More...
|
|
StatusCode | pushNewEvents (std::vector< EventContext * > &eventContexts) override |
|
StatusCode | popFinishedEvent (EventContext *&eventContext) override |
| Blocks until an event is available. More...
|
|
StatusCode | tryPopFinishedEvent (EventContext *&eventContext) override |
| Try to fetch an event from the scheduler. More...
|
|
unsigned int | freeSlots () override |
| Get free slots number. More...
|
|
void | dumpState () override |
| Dump scheduler state for all slots. More...
|
|
virtual StatusCode | scheduleEventView (const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override |
| Method to inform the scheduler about event views. More...
|
|
virtual void | recordOccupancy (int samplePeriod, std::function< void(OccupancySnapshot)> callback) override |
| Sample occupancy at fixed interval (ms) Negative value to deactivate, 0 to snapshot every change Each sample, apply the callback function to the result. More...
|
|
bool | next (TaskSpec &ts, bool asynchronous) |
|
void const * | i_cast (const InterfaceID &tid) const override |
| Implementation of IInterface::i_cast. More...
|
|
StatusCode | queryInterface (const InterfaceID &ti, void **pp) override |
| Implementation of IInterface::queryInterface. More...
|
|
std::vector< std::string > | getInterfaceNames () const override |
| Implementation of IInterface::getInterfaceNames. More...
|
|
const std::string & | name () const override |
| Retrieve name of the service
More...
|
|
StatusCode | configure () override |
|
StatusCode | initialize () override |
|
StatusCode | start () override |
|
StatusCode | stop () override |
|
StatusCode | finalize () override |
|
StatusCode | terminate () override |
|
Gaudi::StateMachine::State | FSMState () const override |
|
Gaudi::StateMachine::State | targetFSMState () const override |
|
StatusCode | reinitialize () override |
|
StatusCode | restart () override |
|
StatusCode | sysInitialize () override |
| Initialize Service
More...
|
|
StatusCode | sysStart () override |
| Initialize Service
More...
|
|
StatusCode | sysStop () override |
| Initialize Service
More...
|
|
StatusCode | sysFinalize () override |
| Finalize Service
More...
|
|
StatusCode | sysReinitialize () override |
| Re-initialize the Service. More...
|
|
StatusCode | sysRestart () override |
| Re-initialize the Service. More...
|
|
| Service (std::string name, ISvcLocator *svcloc) |
| Standard Constructor
More...
|
|
SmartIF< ISvcLocator > & | serviceLocator () const override |
| Retrieve pointer to service locator
More...
|
|
template<typename IFace = IService> |
SmartIF< IFace > | service (const std::string &name, bool createIf=true) const |
|
template<class T > |
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, ToolHandle< T > &hndl, const std::string &doc="none") |
|
template<class T > |
StatusCode | declareTool (ToolHandle< T > &handle, bool createIf=true) |
|
template<class T > |
StatusCode | declareTool (ToolHandle< T > &handle, const std::string &toolTypeAndName, bool createIf=true) |
| Declare used tool. More...
|
|
template<class T > |
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, ToolHandleArray< T > &hndlArr, const std::string &doc="none") |
|
template<class T > |
void | addToolsArray (ToolHandleArray< T > &hndlArr) |
|
const std::vector< IAlgTool * > & | tools () const |
|
SmartIF< IAuditorSvc > & | auditorSvc () const |
| The standard auditor service.May not be invoked before sysInitialize() has been invoked. More...
|
|
| PropertyHolder ()=default |
|
Gaudi::Details::PropertyBase & | declareProperty (Gaudi::Details::PropertyBase &prop) |
| Declare a property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none") |
| Declare a PropertyBase instance setting name and documentation. More...
|
|
| requires (!Gaudi::Details::is_gaudi_property_v< TYPE >) Gaudi |
| Helper to wrap a regular data member and use it as a regular property. More...
|
|
Gaudi::Details::PropertyBase * | declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="") |
| Declare a remote property. More...
|
|
StatusCode | setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p) override |
| set the property from another property with a different name More...
|
|
StatusCode | setProperty (const std::string &s) override |
| set the property from the formatted string More...
|
|
StatusCode | setProperty (const Gaudi::Details::PropertyBase &p) |
| Set the property from a property. More...
|
|
virtual StatusCode | setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p)=0 |
| Set the property from a property with a different name. More...
|
|
virtual StatusCode | setProperty (const std::string &s)=0 |
| Set the property by string. More...
|
|
StatusCode | setProperty (const std::string &name, const char *v) |
| Special case for string literals. More...
|
|
StatusCode | setProperty (const std::string &name, const std::string &v) |
| Special case for std::string. More...
|
|
StatusCode | setPropertyRepr (const std::string &n, const std::string &r) override |
| set the property from name and value string representation More...
|
|
StatusCode | getProperty (Gaudi::Details::PropertyBase *p) const override |
| get the property More...
|
|
const Gaudi::Details::PropertyBase & | getProperty (std::string_view name) const override |
| get the property by name More...
|
|
StatusCode | getProperty (std::string_view n, std::string &v) const override |
| convert the property to the string More...
|
|
const std::vector< Gaudi::Details::PropertyBase * > & | getProperties () const override |
| get all properties More...
|
|
bool | hasProperty (std::string_view name) const override |
| Return true if we have a property with the given name. More...
|
|
Gaudi::Details::PropertyBase * | property (std::string_view name) const |
| \fixme property and bindPropertiesTo should be protected More...
|
|
void | bindPropertiesTo (Gaudi::Interfaces::IOptionsSvc &optsSvc) |
|
| PropertyHolder (const PropertyHolder &)=delete |
|
PropertyHolder & | operator= (const PropertyHolder &)=delete |
|
MSG::Level | msgLevel () const |
| get the cached level (originally extracted from the embedded MsgStream) More...
|
|
bool | msgLevel (MSG::Level lvl) const |
| get the output level from the embedded MsgStream More...
|
|
|
std::chrono::duration< int64_t, std::milli > | m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min() |
|
std::chrono::system_clock::time_point | m_lastSnapshot = std::chrono::system_clock::now() |
|
std::function< void(OccupancySnapshot)> | m_snapshotCallback |
|
Gaudi::Property< int > | m_threadPoolSize |
|
Gaudi::Property< int > | m_maxParallelismExtra |
|
Gaudi::Property< std::string > | m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" } |
|
Gaudi::Property< unsigned int > | m_maxBlockingAlgosInFlight |
|
Gaudi::Property< bool > | m_simulateExecution |
|
Gaudi::Property< std::string > | m_optimizationMode |
|
Gaudi::Property< bool > | m_dumpIntraEventDynamics |
|
Gaudi::Property< bool > | m_enablePreemptiveBlockingTasks |
|
Gaudi::Property< int > | m_numOffloadThreads |
|
Gaudi::Property< bool > | m_checkDeps |
|
Gaudi::Property< bool > | m_checkOutput |
|
Gaudi::Property< std::vector< std::string > > | m_checkOutputIgnoreList |
|
Gaudi::Property< std::string > | m_useDataLoader |
|
Gaudi::Property< bool > | m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" } |
|
Gaudi::Property< bool > | m_showDataDeps |
|
Gaudi::Property< bool > | m_showDataFlow |
|
Gaudi::Property< bool > | m_showControlFlow |
|
Gaudi::Property< bool > | m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" } |
|
Gaudi::Property< std::string > | m_dataDepsGraphFile |
|
Gaudi::Property< std::string > | m_dataDepsGraphAlgoPattern |
|
Gaudi::Property< std::string > | m_dataDepsGraphObjectPattern |
|
std::atomic< ActivationState > | m_isActive { INACTIVE } |
| Flag to track if the scheduler is active or not. More...
|
|
std::thread | m_thread |
| The thread in which the activate function runs. More...
|
|
std::unordered_map< std::string, unsigned int > | m_algname_index_map |
| Map to bookkeep the information necessary to the name2index conversion. More...
|
|
std::vector< std::string > | m_algname_vect |
| Vector to bookkeep the information necessary to the index2name conversion. More...
|
|
SmartIF< IPrecedenceSvc > | m_precSvc |
| A shortcut to the Precedence Service. More...
|
|
SmartIF< IHiveWhiteBoard > | m_whiteboard |
| A shortcut to the whiteboard. More...
|
|
std::vector< EventSlot > | m_eventSlots |
| Vector of events slots. More...
|
|
std::atomic_int | m_freeSlots { 0 } |
| Atomic to account for asyncronous updates by the scheduler wrt the rest. More...
|
|
tbb::concurrent_bounded_queue< EventContext * > | m_finishedEvents |
| Queue of finished events. More...
|
|
SmartIF< IAlgExecStateSvc > | m_algExecStateSvc |
| Algorithm execution state manager. More...
|
|
SmartIF< ICondSvc > | m_condSvc |
| A shortcut to service for Conditions handling. More...
|
|
unsigned int | m_algosInFlight = 0 |
| Number of algorithms presently in flight. More...
|
|
unsigned int | m_blockingAlgosInFlight = 0 |
| Number of algorithms presently in flight. More...
|
|
SmartIF< IAlgResourcePool > | m_algResourcePool |
| Cache for the algorithm resource pool. More...
|
|
tbb::concurrent_bounded_queue< action > | m_actionsQueue |
| Queue where closures are stored and picked for execution. More...
|
|
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > | m_scheduledQueue |
| Queues for scheduled algorithms. More...
|
|
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > | m_scheduledAsynchronousQueue |
|
std::queue< TaskSpec > | m_retryQueue |
|
std::atomic< bool > | m_needsUpdate { true } |
|
SmartIF< IThreadPoolSvc > | m_threadPoolSvc |
|
tbb::task_arena * | m_arena { nullptr } |
|
std::unique_ptr< FiberManager > | m_fiberManager { nullptr } |
|
size_t | m_maxEventsInFlight { 0 } |
|
size_t | m_maxAlgosInFlight { 1 } |
|
|
using | base_class = extends |
| Typedef to this class. More...
|
|
using | extend_interfaces_base = extend_interfaces< Interfaces... > |
| Typedef to the base of this class. More...
|
|
using | Factory = Gaudi::PluginService::Factory< IService *(const std::string &, ISvcLocator *)> |
|
using | PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > > |
| Typedef used to refer to this class from derived classes, as in. More...
|
|
using | base_class = CommonMessaging |
|
using | ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type |
| take union of the ext_iids of all Interfaces... More...
|
|
std::vector< IAlgTool * > & | tools () |
|
| ~Service () override |
|
int | outputLevel () const |
| get the Service's output level More...
|
|
MSG::Level | setUpMessaging () const |
| Set up local caches. More...
|
|
MSG::Level | resetMessaging () |
| Reinitialize internal states. More...
|
|
void | updateMsgStreamOutputLevel (int level) |
| Update the output level of the cached MsgStream. More...
|
|
Gaudi::StateMachine::State | m_state = Gaudi::StateMachine::OFFLINE |
| Service state
More...
|
|
Gaudi::StateMachine::State | m_targetState = Gaudi::StateMachine::OFFLINE |
| Service state
More...
|
|
Gaudi::Property< int > | m_outputLevel { this, "OutputLevel", MSG::NIL, "output level" } |
| flag indicating whether ToolHandle tools have been added to m_tools More...
|
|
Gaudi::Property< bool > | m_auditorInitialize { this, "AuditInitialize", false, "trigger auditor on initialize()" } |
|
Gaudi::Property< bool > | m_auditorStart { this, "AuditStart", false, "trigger auditor on start()" } |
|
Gaudi::Property< bool > | m_auditorStop { this, "AuditStop", false, "trigger auditor on stop()" } |
|
Gaudi::Property< bool > | m_auditorFinalize { this, "AuditFinalize", false, "trigger auditor on finalize()" } |
|
Gaudi::Property< bool > | m_auditorReinitialize { this, "AuditReinitialize", false, "trigger auditor on reinitialize()" } |
|
Gaudi::Property< bool > | m_auditorRestart { this, "AuditRestart", false, "trigger auditor on restart()" } |
|
Gaudi::Property< bool > | m_autoRetrieveTools |
|
Gaudi::Property< bool > | m_checkToolDeps |
|
SmartIF< IAuditorSvc > | m_pAuditorSvc |
| Auditor Service
More...
|
|
Introduction
The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).
Task precedence management
The scheduler is driven by graph-based task precedence management. When compared to approach used in the ForwardSchedulerSvc, the following advantages can be emphasized:
(1) Faster decision making (thus lower concurrency disclosure downtime); (2) Capacity for proactive task scheduling decision making.
Point (2) allowed to implement a number of generic, non-intrusive intra-event throughput maximization scheduling strategies.
Scheduling principles
o Task scheduling prerequisites
A task is scheduled ASA all following conditions are met:
- if a control flow (CF) graph traversal reaches the task;
- when all data flow (DF) dependencies of the task are satisfied;
- when the DF-ready task pool parsing mechanism (*) considers it, and:
- a free (or re-entrant) algorithm instance to run within the task is available;
- there is a free computational resource to run the task.
o (*) Avalanche induction strategies
The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of precedence rules graph asymmetries:
(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.
o Other mechanisms of throughput maximization
The scheduler is able to maximize the overall throughput of data processing by preemptive scheduling CPU-blocking tasks. The mechanism can be applied to the following types of tasks:
- I/O-bound tasks;
- tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices..joke);
- synchronization-bound tasks.
Credits
Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on ideas and code of the latter.
- Author
- Illya Shapoval
- Version
- 1.0
Definition at line 113 of file AvalancheSchedulerSvc.h.
◆ action
◆ AState
◆ ActivationState
◆ activate()
void AvalancheSchedulerSvc::activate |
( |
| ) |
|
|
private |
Activate scheduler.
Activate the scheduler.
From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)
Definition at line 458 of file AvalancheSchedulerSvc.cpp.
460 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
463 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
480 if ( sc.isFailure() )
481 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
491 if ( sc.isFailure() )
492 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
500 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
502 error() <<
"Problems terminating thread pool" <<
endmsg;
◆ algname2index()
unsigned int AvalancheSchedulerSvc::algname2index |
( |
const std::string & |
algoname | ) |
|
|
inlineprivate |
◆ deactivate()
Deactivate scheduler.
Deactivates the scheduler.
Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.
Definition at line 515 of file AvalancheSchedulerSvc.cpp.
◆ dumpGraphFile()
Definition at line 1178 of file AvalancheSchedulerSvc.cpp.
1181 assert( inDeps.size() == outDeps.size() );
1184 info() <<
"Dumping data dependencies graph to file: " <<
g.fileName() <<
endmsg;
1187 std::set<std::size_t> definedObjects;
1194 std::size_t algoIndex = 0ul;
1195 for (
const auto& [algName, ideps] : inDeps ) {
1196 if ( not std::regex_search( algName, algNameRegex ) )
continue;
1197 std::string algIndex =
"Alg_" + std::to_string( algoIndex );
1198 g.addNode( algName, algIndex );
1201 for (
const auto& dep : ideps ) {
1202 if ( not std::regex_search( dep.fullKey(), objNameRegex ) )
continue;
1204 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1205 std::string objIndex =
"obj_" + std::to_string( dep.hash() );
1206 if ( inserted )
g.addNode( dep.key(), objIndex );
1208 g.addEdge( dep.key(), objIndex, algName, algIndex );
1211 const auto& odeps = outDeps.at( algName );
1212 for (
const auto& dep : odeps ) {
1213 if ( not std::regex_search( dep.fullKey(), objNameRegex ) )
continue;
1215 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1216 std::string objIndex =
"obj_" + std::to_string( dep.hash() );
1217 if ( inserted )
g.addNode( dep.key(), objIndex );
1219 g.addEdge( algName, algIndex, dep.key(), objIndex );
◆ dumpSchedulerState()
void AvalancheSchedulerSvc::dumpSchedulerState |
( |
int |
iSlot | ) |
|
|
private |
Dump the state of the scheduler.
Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.
Definition at line 880 of file AvalancheSchedulerSvc.cpp.
883 std::ostringstream outputMS;
885 outputMS <<
"Dumping scheduler state\n"
886 <<
"=========================================================================================\n"
887 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
888 <<
"=========================================================================================\n\n";
892 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
893 <<
"------------------\n\n";
897 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
898 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
905 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
906 for ( uint algIndex : schedAlgs ) {
914 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
915 for ( uint algIndex : schedAlgs ) {
919 outputMS <<
" task: " << std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
920 << slot.eventContext->slot();
923 if ( timelineSvc.isValid() ) {
926 te.slot = slot.eventContext->slot();
927 te.event = slot.eventContext->evt();
929 if ( timelineSvc->getTimelineEvent( te ) )
932 outputMS <<
" thread.id: [unknown]";
937 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
944 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
945 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
954 if ( slot.complete )
continue;
956 outputMS <<
"[ slot: "
957 << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
959 << ( slot.eventContext->
valid() ?
std::to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" );
961 if ( slot.eventContext->eventID().isValid() ) { outputMS <<
", eventID: " << slot.eventContext->eventID(); }
962 outputMS <<
" ]:\n\n";
964 if ( 0 > iSlot || iSlot == slotCount ) {
968 outputMS <<
"ERROR alg(s):";
970 const auto& errorAlgs = slot.algsStates.algsInState(
AState::ERROR );
971 for ( uint algIndex : errorAlgs ) {
975 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
979 outputMS <<
m_precSvc->printState( slot ) <<
"\n";
984 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
985 auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
986 for (
auto& ss : slot.allSubSlots ) {
987 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
988 << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
989 <<
", entry: " << ss.entryPoint <<
", event: "
990 << ( ss.eventContext->
valid() ?
std::to_string( ss.eventContext->
evt() ) :
"[ctx invalid]" )
993 outputMS <<
"ERROR alg(s):";
994 const auto& errorAlgs = ss.algsStates.algsInState(
AState::ERROR );
995 for ( uint algIndex : errorAlgs ) { outputMS <<
" " <<
index2algname( algIndex ); }
999 outputMS <<
m_precSvc->printState( ss ) <<
"\n";
1008 if ( 0 <= iSlot && !wasAlgError ) {
1009 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1013 outputMS <<
"\n=========================================================================================\n"
1014 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1015 <<
"=========================================================================================\n\n";
1017 info() << outputMS.str() <<
endmsg;
◆ dumpState()
void AvalancheSchedulerSvc::dumpState |
( |
| ) |
|
|
override |
◆ eventFailed()
void AvalancheSchedulerSvc::eventFailed |
( |
EventContext * |
eventContext | ) |
|
|
private |
Method to execute if an event failed.
It can be possible that an event fails.
In this case this method is called. It dumps the state of the scheduler and marks the event as finished.
Definition at line 859 of file AvalancheSchedulerSvc.cpp.
860 const uint slotIdx = eventContext->
slot();
862 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
◆ finalize()
Finalise.
Here the scheduler is deactivated and the thread joined.
Definition at line 424 of file AvalancheSchedulerSvc.cpp.
427 if ( sc.isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
430 if ( sc.isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
432 debug() <<
"Deleting FiberManager" <<
endmsg;
435 info() <<
"Joining Scheduler thread" <<
endmsg;
440 error() <<
"problems in scheduler thread" <<
endmsg;
◆ freeSlots()
unsigned int AvalancheSchedulerSvc::freeSlots |
( |
| ) |
|
|
override |
◆ index2algname()
const std::string& AvalancheSchedulerSvc::index2algname |
( |
unsigned int |
index | ) |
|
|
inlineprivate |
◆ initialize()
Initialise.
Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.
In addition the algorithms list is acquired from the algResourcePool.
Definition at line 78 of file AvalancheSchedulerSvc.cpp.
82 if ( sc.isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
87 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
92 fatal() <<
"Cannot cast ThreadPoolSvc" <<
endmsg;
97 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
102 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
103 std::binary_semaphore fiber_manager_initalized{ 0 };
104 m_thread = std::thread( [
this, &fiber_manager_initalized]() {
107 fiber_manager_initalized.release();
111 fiber_manager_initalized.acquire();
115 fatal() <<
"Terminating initialization" <<
endmsg;
118 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
127 warning() <<
"No CondSvc found, or not enabled. "
128 <<
"Will not manage CondAlgorithms" <<
endmsg;
136 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
142 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
149 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
161 const unsigned int algsNumber = algos.size();
162 if ( algsNumber != 0 ) {
163 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
165 error() <<
"No algorithms found" <<
endmsg;
177 std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
181 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
187 globalOutp.insert(
id );
188 algoOutputs.insert(
id );
190 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
193 std::ostringstream ostdd;
194 ostdd <<
"Data Dependencies for Algorithms:";
196 std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
199 if (
nullptr == algoPtr ) {
200 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
201 <<
": this will result in a crash." <<
endmsg;
209 ostdd <<
"\n " << algoPtr->
name();
211 auto write_owners = [&avis, &ostdd](
const DataObjID&
id ) {
212 auto owners = avis.owners_names_of(
id );
220 ostdd <<
"\n o INPUT " <<
id;
222 algoDependencies.insert(
id );
223 globalInp.insert(
id );
226 ostdd <<
"\n o OUTPUT " << *
id;
228 if (
id->key().find(
":" ) != std::string::npos ) {
229 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
237 algosInputDependenciesMap[algoPtr->
name()] = algoDependencies;
244 if (
dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
253 std::set<std::string> requiredInputKeys;
254 for (
auto o : globalInp ) {
257 requiredInputKeys.insert( o.key() );
258 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
261 for (
auto o : globalOutp ) {
262 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
266 auto it = algosOutputDependenciesMap.find( algoName );
267 if ( it != algosOutputDependenciesMap.end() ) {
268 if ( it->second.find( o ) != it->second.end() ) {
274 if ( !ignored ) { unusedOutp.insert( o ); }
281 if ( unmetDepInp.size() > 0 ) {
283 auto printUnmet = [&](
auto msg ) {
284 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
285 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
287 for (
const auto& p : algosInputDependenciesMap )
288 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
298 dataLoaderAlg = algo;
302 if ( dataLoaderAlg ==
nullptr ) {
304 <<
"\" found, and unmet INPUT dependencies "
306 printUnmet( fatal() );
310 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() <<
"/"
311 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
312 printUnmet( info() );
317 fatal() <<
"Unable to dcast DataLoader \"" <<
m_useDataLoader.
value() <<
"\" IAlg to Gaudi::Algorithm"
322 for (
auto&
id : unmetDepInp ) {
323 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->type() <<
"/"
324 << dataLoaderAlg->name() <<
endmsg;
329 fatal() <<
"Auto DataLoading not requested, "
330 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
331 printUnmet( fatal() );
336 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
341 if ( unusedOutp.size() > 0 ) {
343 auto printUnusedOutp = [&](
auto msg ) {
344 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
345 msg <<
" o " << *o <<
" produced by Algorithm: " <<
endmsg;
347 for (
const auto& p : algosOutputDependenciesMap )
348 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
352 fatal() <<
"The following unused OUTPUT items were found:" <<
endmsg;
353 printUnusedOutp( fatal() );
356 info() <<
"No unused OUTPUT items were found" <<
endmsg;
363 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
368 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
375 const std::string&
name = algo->name();
383 if ( !messageSvc.isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
394 info() <<
"Concurrency level information:" <<
endmsg;
400 info() <<
"Task scheduling settings:" <<
endmsg;
401 info() <<
" o Avalanche generation mode: "
403 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
408 info() <<
" o Scheduling of condition tasks: " << (
m_enableCondSvc ?
"enabled" :
"disabled" ) <<
endmsg;
◆ isStalled()
bool AvalancheSchedulerSvc::isStalled |
( |
const EventSlot & |
slot | ) |
const |
|
private |
Check if scheduling in a particular slot is in a stall.
Check if we are in present of a stall condition for a particular slot.
This is the case when a slot has no actions queued in the actionsQueue, has no scheduled algorithms and has no algorithms with all of its dependencies satisfied.
Definition at line 841 of file AvalancheSchedulerSvc.cpp.
843 if ( !slot.
algsStates.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
844 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
846 error() <<
"*** Stall detected, event context: " << slot.
eventContext.get() <<
endmsg;
◆ iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Loop on all slots to schedule DATAREADY algorithms, sign off ready ones or detect execution stalls.
To check if an event is finished the method verifies that the root control flow decision of the task precedence graph is resolved and there are no algorithms moving in-between INITIAL and EVTACCEPTED FSM states.
Definition at line 666 of file AvalancheSchedulerSvc.cpp.
672 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
675 global_sc =
schedule( std::move( retryTS ) );
679 OccupancySnapshot nextSnap;
680 auto now = std::chrono::system_clock::now();
684 if ( !thisSlot.eventContext )
continue;
686 int iSlot = thisSlot.eventContext->slot();
698 if ( nextSnap.states.empty() ) {
704 std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
705 slotStateTotals.resize( AState::MAXVALUE );
707 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
AState(
state ) );
711 for (
auto& subslot : thisSlot.allSubSlots ) {
713 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
AState(
state ) );
719 const auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
720 for ( uint algIndex : drAlgs ) {
723 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
726 schedule( TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
729 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
730 <<
" on processing slot " << iSlot <<
endmsg;
734 for (
auto& subslot : thisSlot.allSubSlots ) {
735 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
736 for ( uint algIndex : drAlgsSubSlot ) {
739 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
741 schedule( TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
747 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
749 <<
", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() <<
"\n";
751 : std::to_string( std::thread::hardware_concurrency() );
752 std::ofstream myfile;
759 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
760 !thisSlot.algsStates.containsAny(
761 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
762 !subSlotAlgsInStates( thisSlot,
763 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
764 !thisSlot.complete ) {
766 thisSlot.complete =
true;
770 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
771 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
778 thisSlot.eventContext.reset(
nullptr );
788 if ( !nextSnap.states.empty() ) {
◆ next()
bool AvalancheSchedulerSvc::next |
( |
TaskSpec & |
ts, |
|
|
bool |
asynchronous |
|
) |
| |
|
inline |
◆ popFinishedEvent()
Blocks until an event is available.
Get a finished event or block until one becomes available.
Definition at line 625 of file AvalancheSchedulerSvc.cpp.
637 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
◆ pushNewEvent()
Make an event available to the scheduler.
Add event to the scheduler.
There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.
Definition at line 546 of file AvalancheSchedulerSvc.cpp.
548 if ( !eventContext ) {
549 fatal() <<
"Event context is nullptr" <<
endmsg;
554 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
563 const unsigned int thisSlotNum = eventContext->
slot();
566 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
570 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
571 thisSlot.
reset( eventContext );
578 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
579 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
583 if ( this->
iterate().isFailure() ) {
584 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
593 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
◆ pushNewEvents()
◆ recordOccupancy()
void AvalancheSchedulerSvc::recordOccupancy |
( |
int |
samplePeriod, |
|
|
std::function< void(OccupancySnapshot)> |
callback |
|
) |
| |
|
overridevirtual |
Sample occupancy at fixed interval (ms) Negative value to deactivate, 0 to snapshot every change Each sample, apply the callback function to the result.
Definition at line 1163 of file AvalancheSchedulerSvc.cpp.
1166 if ( samplePeriod < 0 ) {
1169 this->
m_snapshotInterval = std::chrono::duration<int64_t, std::milli>( samplePeriod );
◆ revise()
Definition at line 800 of file AvalancheSchedulerSvc.cpp.
802 auto slotIndex = contextPtr->
slot();
808 auto subSlotIndex = contextPtr->
subSlot();
815 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
825 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
◆ schedule()
Definition at line 1022 of file AvalancheSchedulerSvc.cpp.
1029 if ( getAlgSC.isSuccess() ) {
1035 unsigned int algIndex{
ts.algIndex };
1036 std::string_view algName(
ts.algName );
1037 unsigned int algRank{
ts.algRank };
1038 bool asynchronous{
ts.asynchronous };
1039 int slotIndex{
ts.slotIndex };
1042 if ( asynchronous ) {
1050 if ( !asynchronous ) {
1058 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1060 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1061 <<
", rank:" << algRank <<
", asynchronous:" << ( asynchronous ?
"yes" :
"no" )
1071 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
1077 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
◆ scheduleEventView()
Method to inform the scheduler about event views.
Definition at line 1123 of file AvalancheSchedulerSvc.cpp.
1127 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1135 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.release(),
1140 if ( viewContextPtr ) {
1142 auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1143 topSlot.
addSubSlot( std::move( viewContext ), nodeName );
◆ signoff()
The call to this method is triggered only from within the AlgTask.
Definition at line 1092 of file AvalancheSchedulerSvc.cpp.
1100 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1106 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
1107 <<
", rank:" <<
ts.algRank <<
", asynchronous:" << (
ts.asynchronous ?
"yes" :
"no" )
◆ tryPopFinishedEvent()
Try to fetch an event from the scheduler.
Try to get a finished event, if not available just return a failure.
Definition at line 646 of file AvalancheSchedulerSvc.cpp.
649 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
◆ AlgTask
◆ m_actionsQueue
tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue |
|
private |
◆ m_algExecStateSvc
◆ m_algname_index_map
std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map |
|
private |
◆ m_algname_vect
std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect |
|
private |
Vector to bookkeep the information necessary to the index2name conversion.
Definition at line 260 of file AvalancheSchedulerSvc.h.
◆ m_algosInFlight
unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0 |
|
private |
◆ m_algResourcePool
◆ m_arena
tbb::task_arena* AvalancheSchedulerSvc::m_arena { nullptr } |
|
private |
◆ m_blockingAlgosInFlight
unsigned int AvalancheSchedulerSvc::m_blockingAlgosInFlight = 0 |
|
private |
◆ m_checkDeps
Initial value:{ this, "CheckDependencies", false,
"Runtime check of Algorithm Input Data Dependencies" }
Definition at line 195 of file AvalancheSchedulerSvc.h.
◆ m_checkOutput
Initial value:{ this, "CheckOutputUsage", false,
"Runtime check of Algorithm Output Data usage" }
Definition at line 197 of file AvalancheSchedulerSvc.h.
◆ m_checkOutputIgnoreList
Gaudi::Property<std::vector<std::string> > AvalancheSchedulerSvc::m_checkOutputIgnoreList |
|
private |
Initial value:{
this,
"CheckOutputUsageIgnoreList",
{},
"Ignore outputs of the Algorithms of this name when doing the check",
"OrderedSet<std::string>" }
Definition at line 199 of file AvalancheSchedulerSvc.h.
◆ m_condSvc
◆ m_dataDepsGraphAlgoPattern
Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphAlgoPattern |
|
private |
Initial value:{
this, "DataDepsGraphAlgPattern", ".*",
"Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data "
"deps graph" }
Definition at line 227 of file AvalancheSchedulerSvc.h.
◆ m_dataDepsGraphFile
Initial value:{
this, "DataDepsGraphFile", "",
"Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected "
"Algorithms" }
Definition at line 222 of file AvalancheSchedulerSvc.h.
◆ m_dataDepsGraphObjectPattern
Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphObjectPattern |
|
private |
Initial value:{
this, "DataDepsGraphObjectPattern", ".*",
"Regex pattern for selecting desired input or output by their full key" }
Definition at line 232 of file AvalancheSchedulerSvc.h.
◆ m_dumpIntraEventDynamics
Initial value:{ this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file" }
Definition at line 186 of file AvalancheSchedulerSvc.h.
◆ m_enableCondSvc
Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" } |
|
private |
◆ m_enablePreemptiveBlockingTasks
Gaudi::Property<bool> AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks |
|
private |
Initial value:{
this, "PreemptiveBlockingTasks", false,
"Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." }
Definition at line 188 of file AvalancheSchedulerSvc.h.
◆ m_eventSlots
std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots |
|
private |
◆ m_fiberManager
std::unique_ptr<FiberManager> AvalancheSchedulerSvc::m_fiberManager { nullptr } |
|
private |
◆ m_finishedEvents
tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents |
|
private |
◆ m_freeSlots
std::atomic_int AvalancheSchedulerSvc::m_freeSlots { 0 } |
|
private |
◆ m_isActive
◆ m_lastSnapshot
std::chrono::system_clock::time_point AvalancheSchedulerSvc::m_lastSnapshot = std::chrono::system_clock::now() |
|
private |
◆ m_maxAlgosInFlight
size_t AvalancheSchedulerSvc::m_maxAlgosInFlight { 1 } |
|
private |
◆ m_maxBlockingAlgosInFlight
Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight |
|
private |
Initial value:{
this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms" }
Definition at line 179 of file AvalancheSchedulerSvc.h.
◆ m_maxEventsInFlight
size_t AvalancheSchedulerSvc::m_maxEventsInFlight { 0 } |
|
private |
◆ m_maxParallelismExtra
Initial value:{
this, "maxParallelismExtra", 0,
"Allows to add some extra threads to the maximum parallelism set in TBB"
"The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" }
Definition at line 174 of file AvalancheSchedulerSvc.h.
◆ m_needsUpdate
std::atomic<bool> AvalancheSchedulerSvc::m_needsUpdate { true } |
|
private |
◆ m_numOffloadThreads
Initial value:{
this, "NumOffloadThreads", 2,
"Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged "
"and use Boost Fiber functionality to suspend while waiting for offloaded work." }
Definition at line 191 of file AvalancheSchedulerSvc.h.
◆ m_optimizationMode
Initial value:{ this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E" }
Definition at line 184 of file AvalancheSchedulerSvc.h.
◆ m_precSvc
◆ m_retryQueue
◆ m_scheduledAsynchronousQueue
tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledAsynchronousQueue |
|
private |
◆ m_scheduledQueue
◆ m_showControlFlow
Initial value:{ this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences" }
Definition at line 217 of file AvalancheSchedulerSvc.h.
◆ m_showDataDeps
Initial value:{ this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms" }
Definition at line 211 of file AvalancheSchedulerSvc.h.
◆ m_showDataFlow
Initial value:{ this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms" }
Definition at line 214 of file AvalancheSchedulerSvc.h.
◆ m_simulateExecution
Initial value:{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution" }
Definition at line 181 of file AvalancheSchedulerSvc.h.
◆ m_snapshotCallback
std::function<void( OccupancySnapshot )> AvalancheSchedulerSvc::m_snapshotCallback |
|
private |
◆ m_snapshotInterval
std::chrono::duration<int64_t, std::milli> AvalancheSchedulerSvc::m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min() |
|
private |
◆ m_thread
std::thread AvalancheSchedulerSvc::m_thread |
|
private |
◆ m_threadPoolSize
Initial value:{
this, "ThreadPoolSize", -1,
"Size of the global thread pool initialised by TBB; a value of -1 requests to use"
"all available hardware threads; -100 requests to bypass TBB executing "
"all algorithms in the scheduler's thread." }
Definition at line 169 of file AvalancheSchedulerSvc.h.
◆ m_threadPoolSvc
◆ m_useDataLoader
Initial value:{ this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm" }
Definition at line 206 of file AvalancheSchedulerSvc.h.
◆ m_verboseSubSlots
Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" } |
|
private |
◆ m_whiteboard
◆ m_whiteboardSvcName
Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" } |
|
private |
The documentation for this class was generated from the following files:
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
GAUDI_API void setCurrentContext(const EventContext *ctx)
A service to resolve the task execution precedence.
MsgStream & hex(MsgStream &log)
StatusCode initialize() override
wrapper on an Algorithm state.
Gaudi::Property< std::string > m_useDataLoader
void acceptDHVisitor(IDataHandleVisitor *) const override
const std::string & name() const override
The identifying name of the algorithm object.
bool filterPassed() const
Gaudi::Property< std::string > m_optimizationMode
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Class representing an event slot.
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
std::chrono::system_clock::time_point m_lastSnapshot
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
utilities to dump graphs in different formats
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::unique_ptr< FiberManager > m_fiberManager
StatusCode schedule(TaskSpec &&)
Gaudi::Property< bool > m_showControlFlow
std::atomic< bool > m_needsUpdate
Gaudi::Property< bool > m_enableCondSvc
StatusCode deactivate()
Deactivate scheduler.
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
StatusCode finalize() override
std::vector< EventSlot > m_eventSlots
Vector of events slots.
unsigned int getAlgoIndex() const
Get algorithm index.
Gaudi::Property< int > m_numOffloadThreads
tbb::task_arena * m_arena
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
bool complete
Flags completion of the event.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< int > m_threadPoolSize
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
size_t m_maxEventsInFlight
bool isValid() const
Allow for check if smart pointer is valid.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
std::ostream & operator<<(std::ostream &s, const std::pair< T1, T2 > &p)
Serialize an std::pair in a python like format. E.g. "(1, 2)".
const std::string & name() const override
Retrieve name of the service
Gaudi::Property< int > m_maxParallelismExtra
MsgStream & dec(MsgStream &log)
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
Base class from which all concrete algorithm classes should be derived.
Gaudi::Property< std::string > m_whiteboardSvcName
Gaudi::Property< bool > m_checkOutput
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
const ValueType & value() const
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
Gaudi::Property< bool > m_simulateExecution
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
AlgsExecutionStates::State AState
unsigned int m_algosInFlight
Number of algorithms presently in flight.
std::unordered_set< DataObjID, DataObjID_Hasher > DataObjIDColl
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Gaudi::Property< bool > m_showDataDeps
size_t m_maxAlgosInFlight
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
Gaudi::Property< bool > m_dumpIntraEventDynamics
StatusCode set(unsigned int iAlgo, State newState)
std::queue< TaskSpec > m_retryQueue
constexpr static const auto SUCCESS
ContextID_t subSlot() const
TYPE * get() const
Get interface pointer.
const DataObjIDColl & outputDataObjs() const override
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
bool valid(Iterator begin, Iterator end)
check the validness of the trees or nodes
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_dataDepsGraphFile
std::string toString() const override
value -> string
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
void activate()
Activate scheduler.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Gaudi::Property< bool > m_checkDeps
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
const DataObjIDColl & inputDataObjs() const override
std::thread m_thread
The thread in which the activate function runs.
Gaudi::Property< bool > m_showDataFlow
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
constexpr static const auto FAILURE
size_t sizeOfSubset(State state) const
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
unsigned int m_blockingAlgosInFlight
Number of algorithms presently in flight.
std::function< void(OccupancySnapshot)> m_snapshotCallback
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
std::function< StatusCode()> action
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
const boost::container::flat_set< int > algsInState(State state) const
AlgsExecutionStates algsStates
Vector of algorithms states.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Gaudi::Property< bool > m_verboseSubSlots
size_t index(const Gaudi::ParticleProperty *property, const Gaudi::Interfaces::IParticlePropertySvc *service)
helper utility for mapping of Gaudi::ParticleProperty object into non-negative integral sequential id...
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
A service which initializes a TBB thread pool.
const StatusCode & execStatus() const