30 #include <unordered_set> 33 #include "boost/algorithm/string.hpp" 34 #include "boost/thread.hpp" 35 #include "boost/tokenizer.hpp" 37 #include "tbb/tbb_stddef.h" 38 #if TBB_INTERFACE_VERSION_MAJOR < 12 39 # include "tbb/task_scheduler_init.h" 40 #endif // TBB_INTERFACE_VERSION_MAJOR < 12 45 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 46 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 49 struct DataObjIDSorter {
86 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
91 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
108 warning() <<
"No CondSvc found, or not enabled. " 109 <<
"Will not manage CondAlgorithms" <<
endmsg;
117 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
123 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
130 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
138 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
149 const unsigned int algsNumber = algos.
size();
150 if ( algsNumber != 0 ) {
151 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
168 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
175 ostdd <<
"Data Dependencies for Algorithms:";
180 if (
nullptr == algoPtr ) {
181 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
182 <<
": this will result in a crash." <<
endmsg;
186 ostdd <<
"\n " << algoPtr->
name();
192 ostdd <<
"\n o INPUT " << id;
193 if (
id.key().find(
":" ) != std::string::npos ) {
194 ostdd <<
" contains alternatives which require resolution...\n";
195 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
199 if ( itok != tokens.end() ) {
200 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
201 id.updateKey( *itok );
203 error() <<
"failed to find alternate in global output list" 204 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
208 algoDependencies.
insert(
id );
212 ostdd <<
"\n o OUTPUT " << *id;
213 if ( id->key().find(
":" ) != std::string::npos ) {
214 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 222 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
231 for (
auto o : globalInp )
232 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
234 if ( unmetDep.
size() > 0 ) {
237 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
238 ost <<
"\n o " << *o <<
" required by Algorithm: ";
240 for (
const auto& p : algosDependenciesMap )
241 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
250 dataLoaderAlg = algo;
254 if ( dataLoaderAlg ==
nullptr ) {
256 <<
"\" found, and unmet INPUT dependencies " 262 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 263 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
266 Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
268 fatal() <<
"Unable to dcast DataLoader \"" <<
m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm" 273 for (
auto&
id : unmetDep ) {
274 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" 275 << dataLoaderAlg->name() <<
endmsg;
280 fatal() <<
"Auto DataLoading not requested, " 281 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
286 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
293 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
298 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
313 if ( !messageSvc.
isValid() )
error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
324 info() <<
"Concurrency level information:" <<
endmsg;
350 info() <<
"Joining Scheduler thread" <<
endmsg;
355 error() <<
"problems in scheduler thread" <<
endmsg;
378 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
396 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
407 verbose() <<
"updateStates did not succeed (which is not bad per se)." <<
endmsg;
417 error() <<
"Problems terminating thread pool" <<
endmsg;
463 if ( !eventContext ) {
478 const unsigned int thisSlotNum = eventContext->
slot();
481 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
486 thisSlot.
reset( eventContext );
494 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
499 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
508 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
521 for (
auto context : eventContexts ) {
560 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 587 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
599 if ( !thisSlot.eventContext )
continue;
601 int iSlot = thisSlot.eventContext->slot();
609 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
612 bool IOBound =
false;
616 partial_sc =
enqueue( algIndex, iSlot, thisSlot.eventContext.get() );
621 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " <<
index2algname( algIndex )
622 <<
" on processing slot " << iSlot <<
endmsg;
626 for (
auto& subslot : thisSlot.allSubSlots ) {
627 auto& subslotStates = subslot.algsStates;
628 for (
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
630 partial_sc =
enqueue( algIndex, iSlot, subslot.eventContext.get() );
640 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", " 644 #if TBB_INTERFACE_VERSION_MAJOR < 12 645 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
648 #endif // TBB_INTERFACE_VERSION_MAJOR < 12 650 myfile.
open(
"IntraEventFSMOccupancy_" +
threads +
"T.csv", std::ios::app );
657 !thisSlot.algsStates.containsAny(
658 {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
659 !subSlotAlgsInStates( thisSlot,
660 {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
661 !thisSlot.complete ) {
663 thisSlot.complete =
true;
667 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 668 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
675 thisSlot.eventContext.reset(
nullptr );
698 size_t const subSlotIndex = contextPtr->
subSlot();
699 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo,
state );
720 !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
736 const uint slotIdx = eventContext->
slot();
738 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
761 outputMS <<
"Dumping scheduler state\n" 762 <<
"=========================================================================================\n" 763 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 764 <<
"=========================================================================================\n\n";
768 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 769 <<
"------------------\n\n";
773 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
774 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
780 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
785 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
790 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/" 791 << slot.eventContext->slot();
794 if ( timelineSvc.isValid() ) {
797 te.slot = slot.eventContext->slot();
798 te.event = slot.eventContext->evt();
800 if ( timelineSvc->getTimelineEvent( te ) )
803 outputMS <<
" thread.id: [unknown]";
815 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping " 816 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
821 if ( slot.complete )
continue;
823 outputMS <<
"[ slot: " 824 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
826 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" )
829 if ( 0 > iSlot || iSlot == slotCount ) {
836 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
837 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
838 for (
auto& ss : slot.allSubSlots ) {
839 outputMS <<
"[ slot: " << slotID <<
", sub-slot: " 840 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
841 <<
", entry: " << ss.entryPoint <<
", event: " 842 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->evt() ) :
"[ctx invalid]" )
853 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
857 outputMS <<
"\n=========================================================================================\n" 858 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 859 <<
"=========================================================================================\n\n";
870 unsigned int rank = 0;
879 if ( getAlgSC.isSuccess() ) {
889 tbb::task* algoTask =
892 tbb::task::enqueue( *algoTask );
902 state = AState::SCHEDULED;
906 m_retryQueue.push( {iAlgo, si, eventContext, rank,
nullptr} );
908 state = AState::RESOURCELESS;
933 if ( sc.isSuccess() ) {
936 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
937 this->
m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
964 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
971 <<
"[Asynchronous] Promoting " << algName <<
" to SCHEDULED on slot " << si <<
endmsg;
975 <<
" on slot " << si <<
endmsg;
993 ON_DEBUG debug() <<
"Trying to handle execution result of " << algName <<
" on slot " << si <<
endmsg;
997 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1004 <<
"Promoting " << algName <<
" on slot " << si <<
" to " <<
state <<
endmsg;
1006 ON_DEBUG debug() <<
"Algorithm " << algName <<
" executed in slot " << si <<
". Algorithms scheduled are " 1025 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1026 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1032 ON_DEBUG debug() <<
"[Asynchronous] Trying to handle execution result of " << algo->name() <<
" on slot " << si
1037 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1044 <<
"[Asynchronous] Promoting " << algo->name() <<
" on slot " << si <<
" to " <<
state <<
endmsg;
1046 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1062 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1070 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1075 if ( viewContextPtr ) {
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
Gaudi::Property< bool > m_showDataFlow
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
Wrapper around I/O-bound Gaudi-algorithms.
EventContext * contextPtr
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
StatusCode initialize() override
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Class representing an event slot.
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
Gaudi::Property< std::string > m_whiteboardSvcName
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
StatusCode finalize() override
virtual const std::string & type() const =0
The type of the algorithm.
Gaudi::Property< bool > m_dumpIntraEventDynamics
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Gaudi::Property< bool > m_showDataDeps
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
std::atomic< bool > m_needsUpdate
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
A service to resolve the task execution precedence.
bool isValid() const
Allow for check if smart pointer is valid.
constexpr static const auto SUCCESS
bool filterPassed() const
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Gaudi::Property< std::string > m_optimizationMode
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
Struct to hold entries in the alg queues.
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
T hardware_concurrency(T... args)
TYPE * get() const
Get interface pointer.
This class represents an entry point to all the event specific data.
std::string fullKey() const
friend class AlgoExecutionTask
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
tbb::task * execute() override
Gaudi::Property< bool > m_checkDeps
Gaudi::Property< bool > m_useIOBoundAlgScheduler
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
#define DECLARE_COMPONENT(type)
const unsigned int & getAlgoIndex() const
Get algorithm index.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
const std::string & name() const override
Retrieve name of the service.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Gaudi::Property< bool > m_verboseSubSlots
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
const StatusCode & execStatus() const
size_t sizeOfSubset(State state) const
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
StatusCode finalize() override
Finalise.
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
Gaudi::Property< int > m_threadPoolSize
virtual void dumpDataFlow() const =0
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
ContextID_t subSlot() const
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & outputDataObjs() const override
const StatusCode & ignore() const
Ignore/check StatusCode.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< bool > m_simulateExecution
StatusCode setAlgState(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
StatusCode updateStates()
Loop over the algorithms in the slots and promote them to successive states.
Iterator begin(State kind)
Gaudi::Property< bool > m_enableCondSvc
Base class from which all concrete algorithm classes should be derived.
size_t m_maxAlgosInFlight
virtual uint getPriority(const std::string &) const =0
Get task priority.
StatusCode enqueue(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
constexpr static const auto FAILURE
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
virtual StatusCode push(IAlgTask &task)=0
tbb::concurrent_priority_queue< AlgQueueEntry, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
const DataObjIDColl & inputDataObjs() const override
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
virtual void dumpControlFlow() const =0
Dump precedence rules.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
std::queue< AlgQueueEntry > m_retryQueue
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual const std::string printState(EventSlot &) const =0
size_t m_maxEventsInFlight
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
const std::string & name() const override
The identifying name of the algorithm object.
AlgsExecutionStates algsStates
Vector of algorithms states.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
T emplace_back(T... args)
std::thread m_thread
The thread in which the activate function runs.