20 #include <unordered_set> 23 #include "boost/algorithm/string.hpp" 24 #include "boost/thread.hpp" 25 #include "boost/tokenizer.hpp" 27 #include "tbb/task_scheduler_init.h" 37 struct DataObjIDSorter {
66 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be initialized" <<
endmsg;
69 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
70 if ( !m_threadPoolSvc.isValid() ) {
71 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
76 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
77 m_thread =
std::thread( [
this]() { this->activate(); } );
79 while ( m_isActive != ACTIVE ) {
81 fatal() <<
"Terminating initialization" <<
endmsg;
84 info() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
89 if ( m_enableCondSvc ) {
91 m_condSvc = serviceLocator()->service(
"CondSvc" );
92 if ( !m_condSvc.isValid() ) {
93 warning() <<
"No CondSvc found, or not enabled. " 94 <<
"Will not manage CondAlgorithms" <<
endmsg;
95 m_enableCondSvc =
false;
100 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
101 if ( !m_algResourcePool.isValid() ) {
102 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
106 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
107 if ( !m_algExecStateSvc.isValid() ) {
108 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
113 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
114 if ( !m_whiteboard.isValid() ) {
115 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
120 if ( m_useIOBoundAlgScheduler ) {
121 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
122 if ( !m_IOBoundAlgScheduler.isValid() )
123 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
127 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
130 m_freeSlots = m_maxEventsInFlight;
137 const unsigned int algsNumber = algos.
size();
138 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
151 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
154 auto r = globalOutp.
insert(
id );
156 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths " 157 "though, or control flow may guarantee only one runs...!" 164 ostdd <<
"Data Dependencies for Algorithms:";
169 if (
nullptr == algoPtr ) {
170 fatal() <<
"Could not convert IAlgorithm into Algorithm for " << ialgoPtr->
name()
171 <<
": this will result in a crash." <<
endmsg;
175 ostdd <<
"\n " << algoPtr->
name();
181 ostdd <<
"\n o INPUT " << id;
182 if (
id.key().find(
":" ) != std::string::npos ) {
183 ostdd <<
" contains alternatives which require resolution...\n";
184 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
188 if ( itok != tokens.end() ) {
189 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
190 id.updateKey( *itok );
192 error() <<
"failed to find alternate in global output list" 193 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
194 m_showDataDeps =
true;
197 algoDependencies.
insert(
id );
201 ostdd <<
"\n o OUTPUT " << *id;
202 if ( id->key().find(
":" ) != std::string::npos ) {
203 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 205 m_showDataDeps =
true;
211 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
214 if ( m_showDataDeps ) {
222 for (
auto o : globalInp )
223 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
225 if ( unmetDep.
size() > 0 ) {
228 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
229 ost <<
"\n o " << *o <<
" required by Algorithm: ";
231 for (
const auto& p : algosDependenciesMap )
232 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
235 if ( !m_useDataLoader.empty() ) {
240 if ( algo->name() == m_useDataLoader ) {
241 dataLoaderAlg = algo;
245 if ( dataLoaderAlg ==
nullptr ) {
246 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
247 <<
"\" found, and unmet INPUT dependencies " 253 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 254 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
259 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Algorithm" <<
endmsg;
263 for (
auto&
id : unmetDep ) {
264 debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" << dataLoaderAlg->name()
270 fatal() <<
"Auto DataLoading not requested, " 271 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
276 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
281 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
282 if ( !m_precSvc.isValid() ) {
283 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
288 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
293 m_algname_vect.resize( algsNumber );
297 m_algname_index_map[
name] = index;
298 m_algname_vect.at( index ) =
name;
303 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
305 m_eventSlots.assign( m_maxEventsInFlight,
307 std::for_each( m_eventSlots.begin(), m_eventSlots.end(), [](
EventSlot& slot ) { slot.complete =
true; } );
309 if ( m_threadPoolSize > 1 ) {
310 m_maxAlgosInFlight = (size_t)m_threadPoolSize;
314 info() <<
"Concurrency level information:" <<
endmsg;
315 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
316 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
318 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
320 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
323 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
336 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be finalized" <<
endmsg;
339 if ( !sc.
isSuccess() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
341 info() <<
"Joining Scheduler thread" <<
endmsg;
346 error() <<
"problems in scheduler thread" <<
endmsg;
366 if ( msgLevel(
MSG::DEBUG ) ) debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
368 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
369 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
381 info() <<
"Start checking the actionsQueue" <<
endmsg;
382 while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
383 m_actionsQueue.pop( thisAction );
386 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
391 info() <<
"Terminating thread-pool resources" <<
endmsg;
392 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
393 error() <<
"Problems terminating thread pool" <<
endmsg;
409 if ( m_isActive == ACTIVE ) {
411 m_actionsQueue.push( [
this]() {
return this->m_drain(); } );
414 m_isActive = INACTIVE;
433 unsigned int index = m_algname_index_map[algoname];
452 if ( !eventContext ) {
453 fatal() <<
"Event context is nullptr" <<
endmsg;
457 if ( m_freeSlots.load() == 0 ) {
458 if ( msgLevel(
MSG::DEBUG ) ) debug() <<
"A free processing slot could not be found." <<
endmsg;
467 const unsigned int thisSlotNum = eventContext->
slot();
468 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
470 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
474 debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
475 thisSlot.
reset( eventContext );
482 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
483 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
487 if ( this->updateStates( thisSlotNum ).isFailure() ) {
488 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
497 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
498 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
500 m_actionsQueue.push(
action );
509 for (
auto context : eventContexts ) {
510 sc = pushNewEvent( context );
526 unsigned int slotNum = 0;
527 for (
auto& thisSlot : m_eventSlots ) {
528 if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
529 updateStates( slotNum );
543 if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
550 m_finishedEvents.pop( eventContext );
553 debug() <<
"Popped slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" <<
endmsg;
564 if ( m_finishedEvents.try_pop( eventContext ) ) {
566 debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 584 m_freeSlots.store( 0 );
586 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " << eventContext->
slot() <<
" failed! ***" <<
endmsg;
589 m_algExecStateSvc->dump( ost, *eventContext );
591 info() <<
"Dumping Alg Exec State for slot " << eventContext->
slot() <<
":\n" << ost.
str() <<
endmsg;
593 dumpSchedulerState( -1 );
595 m_precSvc->dumpPrecedenceRules( m_eventSlots[eventContext->
slot()] );
599 while ( m_actionsQueue.try_pop( thisAction ) ) {
605 while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
606 m_finishedEvents.push( thisEvtContext );
608 m_finishedEvents.push( eventContext );
638 const int eventsSlotsSize( m_eventSlots.size() );
639 eventSlotsPtrs.
reserve( eventsSlotsSize );
640 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
641 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
646 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
649 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
653 auto& thisSlot = m_eventSlots[iSlot];
657 if ( algo_index >= 0 ) {
661 if ( !inputContext || iSlot != (
int)inputContext->
slot() || inputContext == thisSlot.eventContext ) {
662 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
663 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << iSlot <<
endmsg;
668 unsigned int const subSlotIndex = thisSlot.contextToSlot.at( inputContext );
669 if ( m_precSvc->iterate( thisSlot.allSubSlots[subSlotIndex], cs ).isFailure() ) {
670 error() <<
"Failed to call IPrecedenceSvc::iterate for sub-slot of " << iSlot <<
endmsg;
679 if ( !m_optimizationMode.empty() ) {
680 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
681 return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
685 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
686 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it )
688 while ( !buffer.
empty() ) {
689 bool IOBound =
false;
690 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.
top() ) );
693 partial_sc = promoteToScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext );
695 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext );
698 if ( partial_sc.isFailure() )
699 verbose() <<
"Could not apply transition from " 701 << index2algname( buffer.
top() ) <<
" on processing slot " << iSlot <<
endmsg;
707 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
708 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
711 bool IOBound =
false;
712 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
715 partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
717 partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
721 verbose() <<
"Could not apply transition from " 723 << index2algname( algIndex ) <<
" on processing slot " << iSlot <<
endmsg;
728 if ( thisSlot.subSlotAlgsReady.size() ) {
731 failedAlgs.
reserve( thisSlot.subSlotAlgsReady.size() );
734 for (
auto contextAlgPair = thisSlot.subSlotAlgsReady.begin(); contextAlgPair != thisSlot.subSlotAlgsReady.end();
736 if ( m_algosInFlight < m_maxAlgosInFlight ) {
737 partial_sc = promoteToScheduled( contextAlgPair->second, iSlot, contextAlgPair->first );
743 failedAlgs.
insert( failedAlgs.
end(), contextAlgPair, thisSlot.subSlotAlgsReady.end() );
749 thisSlot.subSlotAlgsReady = failedAlgs;
752 if ( m_dumpIntraEventDynamics ) {
754 s << index2algname( algo_index ) <<
", " << thisAlgsStates.
sizeOfSubset( State::CONTROLREADY ) <<
", " 758 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
760 myfile.
open(
"IntraEventConcurrencyDynamics_" +
threads +
"T.csv", std::ios::app );
766 if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
767 thisSlot.subSlotAlgsReady.empty() &&
772 thisSlot.complete =
true;
776 m_finishedEvents.push( thisSlot.eventContext );
778 debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " << thisSlot.eventContext->slot()
783 if ( msgLevel(
MSG::DEBUG ) ) debug() << m_precSvc->printState( thisSlot ) <<
endmsg;
785 thisSlot.eventContext =
nullptr;
787 StatusCode eventStalledSC = isStalled( iSlot );
790 eventFailed( thisSlot.eventContext ).ignore();
811 EventSlot& thisSlot = m_eventSlots[iSlot];
813 if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
817 info() <<
"About to declare a stall" <<
endmsg;
818 fatal() <<
"*** Stall detected! ***\n" <<
endmsg;
819 dumpSchedulerState( iSlot );
840 outputMessageStream <<
"============================== Execution Task State =============================" 842 dumpState( outputMessageStream );
844 outputMessageStream << std::endl
845 <<
"============================== Scheduler State =================================" 849 for (
auto& thisSlot : m_eventSlots ) {
851 if ( thisSlot.complete )
continue;
853 outputMessageStream <<
"----------- slot: " << thisSlot.eventContext->slot()
854 <<
" event: " << thisSlot.eventContext->evt() <<
" -----------" <<
std::endl;
856 if ( 0 > iSlot or iSlot == slotCount ) {
859 outputMessageStream <<
"\nControl Flow and FSM states:" <<
std::endl;
860 outputMessageStream << m_precSvc->printState( thisSlot ) <<
std::endl;
863 if ( thisSlot.allSubSlots.size() ) {
864 outputMessageStream <<
"\nSub-slots:" << thisSlot.allSubSlots.size() <<
std::endl;
865 outputMessageStream <<
"Sub-slot algorithms ready:" << thisSlot.subSlotAlgsReady.size() <<
std::endl;
870 outputMessageStream <<
"=================================== END ======================================" <<
std::endl;
872 info() <<
"Dumping Scheduler State " << std::endl << outputMessageStream.
str() <<
endmsg;
882 const std::string& algName( index2algname( iAlgo ) );
884 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
886 if ( sc.isSuccess() ) {
889 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
890 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
897 if ( -100 != m_threadPoolSize ) {
899 tbb::task* algoTask =
new ( tbb::task::allocate_root() )
900 AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
902 tbb::task::enqueue( *algoTask );
905 AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
910 debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " << si
911 <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
921 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
925 if ( msgLevel(
MSG::VERBOSE ) ) dumpSchedulerState( -1 );
932 debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si <<
endmsg;
946 const std::string& algName( index2algname( iAlgo ) );
948 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
950 if ( sc.isSuccess() ) {
952 ++m_IOBoundAlgosInFlight;
956 IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
957 m_IOBoundAlgScheduler->push( *theTask );
960 debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
961 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
971 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
977 verbose() <<
"[Asynchronous] Promoting " << algName <<
" to SCHEDULED on slot " << si <<
endmsg;
981 debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " 998 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1001 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1002 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1011 debug() <<
"Trying to handle execution result of " << algo->name() <<
" on slot " << si <<
endmsg;
1014 state = State::EVTACCEPTED;
1016 state = State::EVTREJECTED;
1025 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
1026 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1035 debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1036 << m_algosInFlight <<
endmsg;
1039 m_actionsQueue.push( [
this, iAlgo, eventContext]() {
return this->updateStates( -1, iAlgo, eventContext ); } );
1054 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1057 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1058 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1062 m_IOBoundAlgosInFlight--;
1067 debug() <<
"[Asynchronous] Trying to handle execution result of " << algo->name() <<
" on slot " << si <<
endmsg;
1070 state = State::EVTACCEPTED;
1072 state = State::EVTREJECTED;
1081 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
1082 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1087 verbose() <<
"[Asynchronous] Promoting " << algo->name() <<
" on slot " << si <<
" to " 1091 debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1092 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1095 m_actionsQueue.push( [
this, iAlgo, eventContext]() {
return this->updateStates( -1, iAlgo, eventContext ); } );
1116 m_sState.erase( itr );
1121 error() <<
"could not find Alg " << a->
name() <<
" in Scheduler!" <<
endmsg;
1131 for (
auto it : m_sState ) {
1143 ost <<
"dumping Executing Threads: [" << m_sState.size() <<
"]" <<
std::endl;
1155 int const topSlotIndex = sourceContext->
slot();
1156 EventSlot& topSlot = m_eventSlots[topSlotIndex];
1165 if ( viewContext ) {
bool algsPresent(State state) const
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode finalize() override
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
AlgsExecutionStates algsStates
Vector of algorithms states.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
EventContext * eventContext
Cache for the eventContext.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
A service to resolve the task execution precedence.
Header file for class GaudiAlgorithm.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
void activate()
Activate scheduler.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
std::string entryPoint
Name of the node this slot is attached to ("" for top level)
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
bool isFailure() const
Test for a status code of FAILURE.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
void addAlg(Algorithm *, EventContext *, pthread_t)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
virtual StatusCode scheduleEventView(EventContext const *sourceContext, std::string const &nodeName, EventContext *viewContext) override
Method to inform the scheduler about event views.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
static std::list< SchedulerState > m_sState
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
std::vector< std::pair< EventContext *, int > > subSlotAlgsReady
Quick lookup for data-ready algorithms in sub-slots (top level only)
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode finalize() override
Finalise.
static std::mutex m_ssMut
#define DECLARE_SERVICE_FACTORY(x)
bool complete
Flags completion of the event.
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::map< std::string, std::vector< unsigned int > > subSlotsByNode
Listing of sub-slots by the node (name) they are attached to.
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
void dumpState() override
std::string fullKey() const
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
static std::map< State, std::string > stateNames
StatusCode m_drain()
Drain the actions present in the queue.
StatusCode updateState(unsigned int iAlgo, State newState)