28 #include <string_view> 30 #include <unordered_set> 33 #include "boost/algorithm/string.hpp" 34 #include "boost/thread.hpp" 35 #include "boost/tokenizer.hpp" 37 #include "tbb/tbb_stddef.h" 38 #if TBB_INTERFACE_VERSION_MAJOR < 12 39 # include "tbb/task_scheduler_init.h" 40 #endif // TBB_INTERFACE_VERSION_MAJOR < 12 46 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 47 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 50 struct DataObjIDSorter {
87 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
92 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
109 warning() <<
"No CondSvc found, or not enabled. " 110 <<
"Will not manage CondAlgorithms" <<
endmsg;
118 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
124 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
131 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
143 const unsigned int algsNumber = algos.
size();
144 if ( algsNumber != 0 ) {
145 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
162 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
169 ostdd <<
"Data Dependencies for Algorithms:";
174 if (
nullptr == algoPtr ) {
175 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
176 <<
": this will result in a crash." <<
endmsg;
180 ostdd <<
"\n " << algoPtr->
name();
186 ostdd <<
"\n o INPUT " << id;
187 if (
id.key().find(
":" ) != std::string::npos ) {
188 ostdd <<
" contains alternatives which require resolution...\n";
189 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
193 if ( itok != tokens.end() ) {
194 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
195 id.updateKey( *itok );
197 error() <<
"failed to find alternate in global output list" 198 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
202 algoDependencies.
insert(
id );
206 ostdd <<
"\n o OUTPUT " << *id;
207 if ( id->key().find(
":" ) != std::string::npos ) {
208 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 216 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
225 for (
auto o : globalInp )
226 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
228 if ( unmetDep.
size() > 0 ) {
231 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232 ost <<
"\n o " << *o <<
" required by Algorithm: ";
234 for (
const auto& p : algosDependenciesMap )
235 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
244 dataLoaderAlg = algo;
248 if ( dataLoaderAlg ==
nullptr ) {
250 <<
"\" found, and unmet INPUT dependencies " 256 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 257 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
260 Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
262 fatal() <<
"Unable to dcast DataLoader \"" <<
m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm" 267 for (
auto&
id : unmetDep ) {
268 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" 269 << dataLoaderAlg->name() <<
endmsg;
274 fatal() <<
"Auto DataLoading not requested, " 275 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
280 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
287 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
292 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
307 if ( !messageSvc.
isValid() )
error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
318 info() <<
"Concurrency level information:" <<
endmsg;
323 info() <<
"Task scheduling settings:" <<
endmsg;
324 info() <<
" o Avalanche generation mode: " 326 info() <<
" o Preemptive scheduling of CPU-blocking tasks: " 355 info() <<
"Joining Scheduler thread" <<
endmsg;
360 error() <<
"problems in scheduler thread" <<
endmsg;
383 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
401 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
412 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
422 error() <<
"Problems terminating thread pool" <<
endmsg;
468 if ( !eventContext ) {
483 const unsigned int thisSlotNum = eventContext->
slot();
486 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
491 thisSlot.
reset( eventContext );
499 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
503 if ( this->
iterate().isFailure() ) {
504 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
513 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
526 for (
auto context : eventContexts ) {
565 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 588 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
598 if ( !thisSlot.eventContext )
continue;
600 int iSlot = thisSlot.eventContext->slot();
608 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
615 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
618 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
619 <<
" on processing slot " << iSlot <<
endmsg;
623 for (
auto& subslot : thisSlot.allSubSlots ) {
624 auto& subslotStates = subslot.algsStates;
625 for (
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
631 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
637 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", " 641 #if TBB_INTERFACE_VERSION_MAJOR < 12 642 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
645 #endif // TBB_INTERFACE_VERSION_MAJOR < 12 647 myfile.
open(
"IntraEventFSMOccupancy_" +
threads +
"T.csv", std::ios::app );
654 !thisSlot.algsStates.containsAny(
655 {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
656 !subSlotAlgsInStates( thisSlot,
657 {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
658 !thisSlot.complete ) {
660 thisSlot.complete =
true;
664 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 665 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
672 thisSlot.eventContext.reset(
nullptr );
690 auto slotIndex = contextPtr->
slot();
696 auto subSlotIndex = contextPtr->
subSlot();
703 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
713 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
732 !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
748 const uint slotIdx = eventContext->
slot();
750 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
773 outputMS <<
"Dumping scheduler state\n" 774 <<
"=========================================================================================\n" 775 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 776 <<
"=========================================================================================\n\n";
780 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 781 <<
"------------------\n\n";
785 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
786 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
792 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
797 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
802 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/" 803 << slot.eventContext->slot();
806 if ( timelineSvc.isValid() ) {
809 te.slot = slot.eventContext->slot();
810 te.event = slot.eventContext->evt();
812 if ( timelineSvc->getTimelineEvent( te ) )
815 outputMS <<
" thread.id: [unknown]";
827 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping " 828 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
833 if ( slot.complete )
continue;
835 outputMS <<
"[ slot: " 836 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
838 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" )
841 if ( 0 > iSlot || iSlot == slotCount ) {
848 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
849 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
850 for (
auto& ss : slot.allSubSlots ) {
851 outputMS <<
"[ slot: " << slotID <<
", sub-slot: " 852 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
853 <<
", entry: " << ss.entryPoint <<
", event: " 854 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->evt() ) :
"[ctx invalid]" )
865 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
869 outputMS <<
"\n=========================================================================================\n" 870 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 871 <<
"=========================================================================================\n\n";
890 if (
LIKELY( getAlgSC.isSuccess() ) ) {
896 unsigned int algIndex{
ts.algIndex};
897 std::string_view algName(
ts.algName );
898 unsigned int algRank{
ts.algRank};
899 bool blocking{
ts.blocking};
900 int slotIndex{
ts.slotIndex};
903 if (
LIKELY( !blocking ) ) {
912 tbb::task::enqueue( *algoTask );
927 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
929 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
930 <<
", rank:" << algRank <<
", blocking:" << ( blocking ?
"yes" :
"no" )
940 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
946 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
972 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
978 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
979 <<
", rank:" <<
ts.algRank <<
", blocking:" << (
ts.blocking ?
"yes" :
"no" )
999 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1007 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1012 if ( viewContextPtr ) {
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
Gaudi::Property< bool > m_showDataFlow
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Struct to hold entries in the alg queues.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
StatusCode initialize() override
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Class representing an event slot.
Gaudi::Property< std::string > m_whiteboardSvcName
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
StatusCode finalize() override
virtual const std::string & type() const =0
The type of the algorithm.
Gaudi::Property< bool > m_dumpIntraEventDynamics
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Gaudi::Property< bool > m_showDataDeps
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
std::queue< TaskSpec > m_retryQueue
std::atomic< bool > m_needsUpdate
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
A service to resolve the task execution precedence.
bool isValid() const
Allow for check if smart pointer is valid.
constexpr static const auto SUCCESS
bool filterPassed() const
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Gaudi::Property< std::string > m_optimizationMode
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
T hardware_concurrency(T... args)
TYPE * get() const
Get interface pointer.
This class represents an entry point to all the event specific data.
std::string fullKey() const
virtual StatusCode acquireAlgorithm(std::string_view name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Gaudi::Property< bool > m_checkDeps
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
#define DECLARE_COMPONENT(type)
const unsigned int & getAlgoIndex() const
Get algorithm index.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
const std::string & name() const override
Retrieve name of the service.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
StatusCode schedule(TaskSpec &&)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Gaudi::Property< bool > m_verboseSubSlots
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single event.
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
const StatusCode & execStatus() const
size_t sizeOfSubset(State state) const
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
StatusCode finalize() override
Finalise.
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
Gaudi::Property< int > m_threadPoolSize
virtual void dumpDataFlow() const =0
SmartIF< IThreadPoolSvc > m_threadPoolSvc
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
ContextID_t subSlot() const
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & outputDataObjs() const override
const StatusCode & ignore() const
Ignore/check StatusCode.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Gaudi::Property< bool > m_simulateExecution
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
Iterator begin(State kind)
unsigned int m_blockingAlgosInFlight
Number of blocking algorithms presently in flight.
Gaudi::Property< bool > m_enableCondSvc
Base class from which all concrete algorithm classes should be derived.
size_t m_maxAlgosInFlight
virtual uint getPriority(const std::string &) const =0
Get task priority.
constexpr static const auto FAILURE
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
StatusCode deactivate()
Deactivate scheduler.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
const DataObjIDColl & inputDataObjs() const override
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
virtual void dumpControlFlow() const =0
Dump the control flow.
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual const std::string printState(EventSlot &) const =0
size_t m_maxEventsInFlight
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
const std::string & name() const override
The identifying name of the algorithm object.
AlgsExecutionStates algsStates
Vector of algorithms states.
T emplace_back(T... args)
std::thread m_thread
The thread in which the activate function runs.