Go to the documentation of this file.
32 #include <string_view>
34 #include <unordered_set>
37 #include <boost/algorithm/string.hpp>
38 #include <boost/thread.hpp>
39 #include <boost/tokenizer.hpp>
44 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
45 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
48 struct DataObjIDSorter {
56 v.reserve( coll.
size() );
57 for (
const DataObjID&
id : coll )
v.push_back( &
id );
64 [testStates](
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
80 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
85 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
90 fatal() <<
"Cannot cast ThreadPoolSvc" <<
endmsg;
95 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
103 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
108 fatal() <<
"Terminating initialization" <<
endmsg;
111 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
120 warning() <<
"No CondSvc found, or not enabled. "
121 <<
"Will not manage CondAlgorithms" <<
endmsg;
129 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
135 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
142 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
154 const unsigned int algsNumber = algos.
size();
155 if ( algsNumber != 0 ) {
156 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
158 error() <<
"No algorithms found" <<
endmsg;
174 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
183 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
187 ostdd <<
"Data Dependencies for Algorithms:";
192 if (
nullptr == algoPtr ) {
193 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
194 <<
": this will result in a crash." <<
endmsg;
202 ostdd <<
"\n " << algoPtr->
name();
204 auto write_owners = [&avis, &ostdd](
const DataObjID id ) {
213 ostdd <<
"\n o INPUT " <<
id;
215 algoDependencies.
insert(
id );
219 ostdd <<
"\n o OUTPUT " << *
id;
221 if (
id->key().find(
":" ) != std::string::npos ) {
222 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
230 algosInputDependenciesMap[algoPtr->
name()] = algoDependencies;
237 if (
dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
247 for (
auto o : globalInp ) {
250 requiredInputKeys.
insert( o.key() );
251 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDepInp.
insert( o );
254 for (
auto o : globalOutp ) {
255 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.
find( o.key() ) == requiredInputKeys.
end() ) {
259 auto it = algosOutputDependenciesMap.
find( algoName );
260 if ( it != algosOutputDependenciesMap.
end() ) {
261 if ( it->second.find( o ) != it->second.end() ) {
267 if ( !ignored ) { unusedOutp.
insert( o ); }
274 if ( unmetDepInp.
size() > 0 ) {
276 auto printUnmet = [&](
auto msg ) {
277 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
278 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
280 for (
const auto& p : algosInputDependenciesMap )
281 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
291 dataLoaderAlg = algo;
295 if ( dataLoaderAlg ==
nullptr ) {
297 <<
"\" found, and unmet INPUT dependencies "
299 printUnmet( fatal() );
303 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/"
304 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
305 printUnmet( info() );
310 fatal() <<
"Unable to dcast DataLoader \"" <<
m_useDataLoader.
value() <<
"\" IAlg to Gaudi::Algorithm"
315 for (
auto&
id : unmetDepInp ) {
316 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/"
317 << dataLoaderAlg->name() <<
endmsg;
322 fatal() <<
"Auto DataLoading not requested, "
323 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
324 printUnmet( fatal() );
329 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
334 if ( unusedOutp.
size() > 0 ) {
336 auto printUnusedOutp = [&](
auto msg ) {
337 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
338 msg <<
" o " << *o <<
" produced by Algorithm: " <<
endmsg;
340 for (
const auto& p : algosOutputDependenciesMap )
341 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
345 fatal() <<
"The following unused OUTPUT items were found:" <<
endmsg;
346 printUnusedOutp( fatal() );
349 info() <<
"No unused OUTPUT items were found" <<
endmsg;
356 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
361 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
376 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
387 info() <<
"Concurrency level information:" <<
endmsg;
392 info() <<
"Task scheduling settings:" <<
endmsg;
393 info() <<
" o Avalanche generation mode: "
395 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
400 info() <<
" o Scheduling of condition tasks: " << (
m_enableCondSvc ?
"enabled" :
"disabled" ) <<
endmsg;
419 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
422 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
424 debug() <<
"Deleting FiberManager" <<
endmsg;
427 info() <<
"Joining Scheduler thread" <<
endmsg;
432 error() <<
"problems in scheduler thread" <<
endmsg;
452 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
455 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
473 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
484 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
492 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
494 error() <<
"Problems terminating thread pool" <<
endmsg;
540 if ( !eventContext ) {
541 fatal() <<
"Event context is nullptr" <<
endmsg;
546 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
555 const unsigned int thisSlotNum = eventContext->
slot();
558 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
562 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
563 thisSlot.
reset( eventContext );
570 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
571 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
575 if ( this->
iterate().isFailure() ) {
576 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
585 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
598 for (
auto context : eventContexts ) {
629 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
641 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
664 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
671 OccupancySnapshot nextSnap;
676 if ( !thisSlot.eventContext )
continue;
678 int iSlot = thisSlot.eventContext->slot();
690 if ( nextSnap.states.empty() ) {
697 slotStateTotals.
resize( AState::MAXVALUE );
699 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
AState(
state ) );
703 for (
auto& subslot : thisSlot.allSubSlots ) {
705 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
AState(
state ) );
711 const auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
712 for ( uint algIndex : drAlgs ) {
715 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
718 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
721 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
722 <<
" on processing slot " << iSlot <<
endmsg;
726 for (
auto& subslot : thisSlot.allSubSlots ) {
727 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
728 for ( uint algIndex : drAlgsSubSlot ) {
731 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
733 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
739 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
751 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
752 !thisSlot.algsStates.containsAny(
753 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
754 !subSlotAlgsInStates( thisSlot,
755 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
756 !thisSlot.complete ) {
758 thisSlot.complete =
true;
762 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
763 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
770 thisSlot.eventContext.reset(
nullptr );
780 if ( !nextSnap.states.empty() ) {
794 auto slotIndex = contextPtr->
slot();
800 auto subSlotIndex = contextPtr->
subSlot();
807 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
817 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
835 if ( !slot.
algsStates.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
836 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
852 const uint slotIdx = eventContext->
slot();
854 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
877 outputMS <<
"Dumping scheduler state\n"
878 <<
"=========================================================================================\n"
879 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
880 <<
"=========================================================================================\n\n";
884 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
885 <<
"------------------\n\n";
889 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
890 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
897 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
898 for ( uint algIndex : schedAlgs ) {
906 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
907 for ( uint algIndex : schedAlgs ) {
911 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
912 << slot.eventContext->slot();
915 if ( timelineSvc.isValid() ) {
918 te.slot = slot.eventContext->slot();
919 te.event = slot.eventContext->evt();
921 if ( timelineSvc->getTimelineEvent( te ) )
924 outputMS <<
" thread.id: [unknown]";
929 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
936 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
937 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
941 subSlotAlgsInStates( m_eventSlots[iSlot], {
AState::ERROR } )
944 for (
auto& slot : m_eventSlots ) {
946 if ( slot.complete )
continue;
948 outputMS <<
"[ slot: "
949 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
951 << ( slot.eventContext->
valid() ?
std::
to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" );
953 if ( slot.eventContext->eventID().isValid() ) { outputMS <<
", eventID: " << slot.eventContext->eventID(); }
954 outputMS <<
" ]:\n\n";
956 if ( 0 > iSlot || iSlot == slotCount ) {
960 outputMS <<
"ERROR alg(s):";
962 const auto& errorAlgs = slot.algsStates.algsInState(
AState::ERROR );
963 for ( uint algIndex : errorAlgs ) {
964 outputMS <<
" " << index2algname( algIndex );
967 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
971 outputMS << m_precSvc->printState( slot ) <<
"\n";
975 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
976 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
977 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
978 for (
auto& ss : slot.allSubSlots ) {
979 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
980 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
981 <<
", entry: " << ss.entryPoint <<
", event: "
985 outputMS <<
"ERROR alg(s):";
986 const auto& errorAlgs = ss.algsStates.algsInState(
AState::ERROR );
987 for ( uint algIndex : errorAlgs ) { outputMS <<
" " << index2algname( algIndex ); }
991 outputMS << m_precSvc->printState( ss ) <<
"\n";
1000 if ( 0 <= iSlot && !wasAlgError ) {
1001 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1002 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1005 outputMS <<
"\n=========================================================================================\n"
1006 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1007 <<
"=========================================================================================\n\n";
1009 info() << outputMS.str() <<
endmsg;
1027 unsigned int algIndex{
ts.algIndex };
1028 std::string_view algName(
ts.algName );
1029 unsigned int algRank{
ts.algRank };
1030 bool asynchronous{
ts.asynchronous };
1031 int slotIndex{
ts.slotIndex };
1034 if ( asynchronous ) {
1042 if ( !asynchronous ) {
1050 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1052 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1053 <<
", rank:" << algRank <<
", asynchronous:" << ( asynchronous ?
"yes" :
"no" )
1063 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
1069 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
1092 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1098 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
1099 <<
", rank:" <<
ts.algRank <<
", asynchronous:" << (
ts.asynchronous ?
"yes" :
"no" )
1119 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1127 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1130 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1132 if ( viewContextPtr ) {
1158 if ( samplePeriod < 0 ) {
1173 assert( inDeps.
size() == outDeps.
size() );
1176 enum class FileType : short {
UNKNOWN, DOT, MD };
1183 fileExtension = FileType::DOT;
1185 fileExtension = FileType::MD;
1187 fileExtension = FileType::DOT;
1188 fileName = fileName +
".dot";
1190 info() <<
"Dumping data dependencies graph to file: " << fileName <<
endmsg;
1200 if ( fileExtension == FileType::DOT ) {
1202 startGraph =
"digraph datadeps {\nrankdir=\"LR\";\n\n";
1203 stopGraph =
"\n}\n";
1206 return "Alg_" + idx +
" [label=\"" +
alg +
"\";shape=box];\n";
1210 return "obj_" +
std::to_string( obj.hash() ) +
" [label=\"" + obj.key() +
"\"];\n";
1222 startGraph =
"```mermaid\ngraph LR;\n\n";
1223 stopGraph =
"\n```\n";
1226 return "Alg_" + idx +
"{{" +
alg +
"}}\n";
1230 return "obj_" +
std::to_string( obj.hash() ) +
">" + obj.key() +
"]\n";
1243 dataDepthGraphFile << startGraph;
1254 for (
const auto& [
name, ideps] : inDeps ) {
1259 for (
const auto& dep : ideps ) {
1262 if ( definedObjects.
find( dep.hash() ) == definedObjects.
end() ) {
1263 definedObjects.
insert( dep.hash() );
1264 dataDepthGraphFile << defineObj( dep );
1266 dataDepthGraphFile << defineInput( dep,
std::to_string( algoIndex ) );
1269 const auto& odeps = outDeps.
at(
name );
1270 for (
const auto& dep : odeps ) {
1273 if ( definedObjects.
find( dep.hash() ) == definedObjects.
end() ) {
1274 definedObjects.
insert( dep.hash() );
1275 dataDepthGraphFile << defineObj( dep );
1277 dataDepthGraphFile << defineOutput(
std::to_string( algoIndex ), dep );
1284 dataDepthGraphFile << stopGraph;
1285 dataDepthGraphFile.
close();
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
GAUDI_API void setCurrentContext(const EventContext *ctx)
A service to resolve the task execution precedence.
const std::string name() const
property name
StatusCode initialize() override
Gaudi::Property< std::string > m_useDataLoader
Struct to hold entries in the alg queues.
StatusCode finalize() override
Finalise.
void acceptDHVisitor(IDataHandleVisitor *) const override
const std::string & name() const override
The identifying name of the algorithm object.
Gaudi::Property< std::string > m_optimizationMode
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Class representing an event slot.
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
std::chrono::system_clock::time_point m_lastSnapshot
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
virtual const std::string & type() const =0
The type of the algorithm.
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::unique_ptr< FiberManager > m_fiberManager
StatusCode schedule(TaskSpec &&)
Gaudi::Property< bool > m_showControlFlow
std::atomic< bool > m_needsUpdate
Gaudi::Property< bool > m_enableCondSvc
StatusCode deactivate()
Deactivate scheduler.
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
StatusCode finalize() override
std::vector< EventSlot > m_eventSlots
Vector of event slots.
unsigned int getAlgoIndex() const
Get algorithm index.
Gaudi::Property< int > m_numOffloadThreads
tbb::task_arena * m_arena
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
bool complete
Flags completion of the event.
std::string fullKey() const
combination of the key and the ClassName, mostly for debugging
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< int > m_threadPoolSize
std::vector< std::string > owners_names_of(const DataObjID &id, bool with_main=false) const
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
size_t m_maxEventsInFlight
bool isValid() const
Allow for check if smart pointer is valid.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
std::ostream & operator<<(std::ostream &s, const std::pair< T1, T2 > &p)
Serialize an std::pair in a python like format. E.g. "(1, 2)".
const std::string & name() const override
Retrieve name of the service
T hardware_concurrency(T... args)
Gaudi::Property< int > m_maxParallelismExtra
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
void schedule(F &&func)
Schedule work to run on the asynchronous pool.
Base class from which all concrete algorithm classes should be derived.
Gaudi::Property< std::string > m_whiteboardSvcName
Gaudi::Property< bool > m_checkOutput
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
const ValueType & value() const
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
const StatusCode & execStatus() const
Gaudi::Property< bool > m_simulateExecution
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at a fixed interval (ms). A negative value deactivates sampling; 0 takes a snapshot on every change. Each...
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
AlgsExecutionStates::State AState
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
unsigned int freeSlots() override
Get free slots number.
Gaudi::Property< bool > m_showDataDeps
size_t m_maxAlgosInFlight
T regex_search(T... args)
void dumpState() override
Dump scheduler state for all slots.
StatusCode initialize() override
Initialise.
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
T emplace_back(T... args)
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using the graph index.
Gaudi::Property< bool > m_dumpIntraEventDynamics
StatusCode set(unsigned int iAlgo, State newState)
std::queue< TaskSpec > m_retryQueue
constexpr static const auto SUCCESS
ContextID_t subSlot() const
TYPE * get() const
Get interface pointer.
const DataObjIDColl & outputDataObjs() const override
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
bool valid(Iterator begin, Iterator end)
check the validity of the trees or nodes
#define DECLARE_COMPONENT(type)
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_dataDepsGraphFile
State
Execution states of the algorithms. Must have contiguous integer values 0, 1...
std::string toString() const override
value -> string
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
bool filterPassed() const
void activate()
Activate scheduler.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information needed for the name2index conversion.
Gaudi::Property< bool > m_checkDeps
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
const DataObjIDColl & inputDataObjs() const override
std::thread m_thread
The thread in which the activate function runs.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
Gaudi::Property< bool > m_showDataFlow
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
constexpr static const auto FAILURE
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
size_t sizeOfSubset(State state) const
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
unsigned int m_blockingAlgosInFlight
Number of blocking algorithms presently in flight.
std::function< void(OccupancySnapshot)> m_snapshotCallback
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
const boost::container::flat_set< int > algsInState(State state) const
AlgsExecutionStates algsStates
Vector of algorithm states.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information needed for the index2name conversion.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
size_t index(const Gaudi::ParticleProperty *property, const Gaudi::Interfaces::IParticlePropertySvc *service)
helper utility for mapping of Gaudi::ParticleProperty object into non-negative integral sequential id...
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
A service which initializes a TBB thread pool.