20 #include <unordered_set> 23 #include "boost/algorithm/string.hpp" 24 #include "boost/thread.hpp" 25 #include "boost/tokenizer.hpp" 27 #include "tbb/task_scheduler_init.h" 37 struct DataObjIDSorter {
66 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be initialized" <<
endmsg;
69 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
70 if ( !m_threadPoolSvc.isValid() ) {
71 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
76 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
79 while ( m_isActive != ACTIVE ) {
81 fatal() <<
"Terminating initialization" <<
endmsg;
84 info() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
89 if ( m_enableCondSvc ) {
91 m_condSvc = serviceLocator()->service(
"CondSvc" );
92 if ( !m_condSvc.isValid() ) {
93 warning() <<
"No CondSvc found, or not enabled. " 94 <<
"Will not manage CondAlgorithms" <<
endmsg;
95 m_enableCondSvc =
false;
100 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
101 if ( !m_algResourcePool.isValid() ) {
102 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
107 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
108 if ( !m_precSvc.isValid() ) {
109 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
114 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
118 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
119 if ( !m_algExecStateSvc.isValid() ) {
120 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
125 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
126 if ( !m_whiteboard.isValid() ) {
127 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
132 if ( m_useIOBoundAlgScheduler ) {
133 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
134 if ( !m_IOBoundAlgScheduler.isValid() )
135 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
139 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
142 m_freeSlots = m_maxEventsInFlight;
149 const unsigned int algsNumber = algos.
size();
150 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
163 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
166 auto r = globalOutp.
insert(
id );
168 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths " 169 "though, or control flow may guarantee only one runs...!" 176 ostdd <<
"Data Dependencies for Algorithms:";
181 if (
nullptr == algoPtr ) {
182 fatal() <<
"Could not convert IAlgorithm into Algorithm for " << ialgoPtr->
name()
183 <<
": this will result in a crash." <<
endmsg;
187 ostdd <<
"\n " << algoPtr->
name();
193 ostdd <<
"\n o INPUT " << id;
194 if (
id.key().find(
":" ) != std::string::npos ) {
195 ostdd <<
" contains alternatives which require resolution...\n";
196 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
200 if ( itok != tokens.end() ) {
201 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
202 id.updateKey( *itok );
204 error() <<
"failed to find alternate in global output list" 205 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
206 m_showDataDeps =
true;
209 algoDependencies.
insert(
id );
213 ostdd <<
"\n o OUTPUT " << *id;
214 if ( id->key().find(
":" ) != std::string::npos ) {
215 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 217 m_showDataDeps =
true;
226 if ( m_showDataDeps ) {
231 m_algname_vect.resize( algsNumber );
236 m_algname_index_map[
name] = index;
237 m_algname_vect.at( index ) =
name;
238 if ( algo->name() == m_useDataLoader ) {
239 dataLoaderAlg = algo;
246 for (
auto o : globalInp ) {
247 if ( globalOutp.
find( o ) == globalOutp.
end() ) {
252 if ( unmetDep.
size() > 0 ) {
255 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
256 ost <<
"\n o " << *o <<
" required by Algorithm: ";
257 for (
size_t i = 0; i < m_algosDependencies.
size(); ++i ) {
258 if ( m_algosDependencies[i].find( *o ) != m_algosDependencies[i].
end() ) {
259 ost <<
"\n * " << m_algname_vect[i];
264 if ( m_useDataLoader !=
"" ) {
266 if ( dataLoaderAlg ==
nullptr ) {
267 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
268 <<
"\" found, and unmet INPUT dependencies " 274 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 275 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
280 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Algorithm" <<
endmsg;
284 for (
auto&
id : unmetDep ) {
285 debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" << dataLoaderAlg->name()
291 fatal() <<
"Auto DataLoading not requested, " 292 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
297 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
303 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
305 m_eventSlots.assign( m_maxEventsInFlight,
EventSlot( m_algosDependencies, algsNumber,
307 std::for_each( m_eventSlots.begin(), m_eventSlots.end(), [](
EventSlot& slot ) { slot.complete =
true; } );
309 if ( m_threadPoolSize > 1 ) {
310 m_maxAlgosInFlight = (size_t)m_threadPoolSize;
314 info() <<
"Concurrency level information:" <<
endmsg;
315 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
316 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
318 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
320 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
323 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
336 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be finalized" <<
endmsg;
339 if ( !sc.
isSuccess() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
341 info() <<
"Joining Scheduler thread" <<
endmsg;
346 error() <<
"problems in scheduler thread" <<
endmsg;
366 if ( msgLevel(
MSG::DEBUG ) ) debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
368 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
369 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
381 info() <<
"Start checking the actionsQueue" <<
endmsg;
382 while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
383 m_actionsQueue.pop( thisAction );
386 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
391 info() <<
"Terminating thread-pool resources" <<
endmsg;
392 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
393 error() <<
"Problems terminating thread pool" <<
endmsg;
409 if ( m_isActive == ACTIVE ) {
414 m_isActive = INACTIVE;
433 unsigned int index = m_algname_index_map[algoname];
452 if ( !eventContext ) {
453 fatal() <<
"Event context is nullptr" <<
endmsg;
457 if ( m_freeSlots.load() == 0 ) {
458 if ( msgLevel(
MSG::DEBUG ) ) debug() <<
"A free processing slot could not be found." <<
endmsg;
467 const unsigned int thisSlotNum = eventContext->
slot();
468 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
470 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
474 debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
475 thisSlot.
reset( eventContext );
482 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
483 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
487 if ( this->updateStates( thisSlotNum ).isFailure() ) {
488 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
497 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
498 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
500 m_actionsQueue.push(
action );
509 for (
auto context : eventContexts ) {
510 sc = pushNewEvent( context );
526 unsigned int slotNum = 0;
527 for (
auto& thisSlot : m_eventSlots ) {
528 if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
529 updateStates( slotNum );
543 if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
550 m_finishedEvents.pop( eventContext );
553 debug() <<
"Popped slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" <<
endmsg;
564 if ( m_finishedEvents.try_pop( eventContext ) ) {
566 debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 584 m_freeSlots.store( 0 );
586 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " << eventContext->
slot() <<
" failed! ***" <<
endmsg;
589 m_algExecStateSvc->dump( ost, *eventContext );
591 info() <<
"Dumping Alg Exec State for slot " << eventContext->
slot() <<
":\n" << ost.
str() <<
endmsg;
593 dumpSchedulerState( -1 );
597 while ( m_actionsQueue.try_pop( thisAction ) ) {
603 while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
604 m_finishedEvents.push( thisEvtContext );
606 m_finishedEvents.push( eventContext );
628 m_updateNeeded =
true;
638 const int eventsSlotsSize( m_eventSlots.size() );
639 eventSlotsPtrs.
reserve( eventsSlotsSize );
640 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
641 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
646 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
649 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
653 auto& thisSlot = m_eventSlots[iSlot];
657 if ( !algo_name.
empty() ) {
659 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
660 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << iSlot <<
endmsg;
668 if ( !m_optimizationMode.empty() ) {
669 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
670 return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
674 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
675 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it )
704 while ( !buffer.
empty() ) {
705 bool IOBound =
false;
706 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.
top() ) );
709 partial_sc = promoteToScheduled( buffer.
top(), iSlot );
711 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot );
714 if ( partial_sc.isFailure() )
715 verbose() <<
"Could not apply transition from " 717 << index2algname( buffer.
top() ) <<
" on processing slot " << iSlot <<
endmsg;
723 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
724 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
727 bool IOBound =
false;
728 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
731 partial_sc = promoteToScheduled( algIndex, iSlot );
733 partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
737 verbose() <<
"Could not apply transition from " 739 << index2algname( algIndex ) <<
" on processing slot " << iSlot <<
endmsg;
743 if ( m_dumpIntraEventDynamics ) {
745 s << algo_name <<
", " << thisAlgsStates.
sizeOfSubset( State::CONTROLREADY ) <<
", " 749 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
757 if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
762 thisSlot.complete =
true;
766 m_finishedEvents.push( thisSlot.eventContext );
768 debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " << thisSlot.eventContext->slot()
773 if ( msgLevel(
MSG::DEBUG ) ) debug() << m_precSvc->printState( thisSlot ) <<
endmsg;
775 thisSlot.eventContext =
nullptr;
777 StatusCode eventStalledSC = isStalled( iSlot );
780 eventFailed( thisSlot.eventContext ).ignore();
801 EventSlot& thisSlot = m_eventSlots[iSlot];
803 if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
806 info() <<
"About to declare a stall" <<
endmsg;
807 fatal() <<
"*** Stall detected! ***\n" <<
endmsg;
808 dumpSchedulerState( iSlot );
829 outputMessageStream <<
"============================== Execution Task State =============================" 831 dumpState( outputMessageStream );
833 outputMessageStream << std::endl
834 <<
"============================== Scheduler State =================================" 838 for (
auto& thisSlot : m_eventSlots ) {
840 if ( thisSlot.complete )
continue;
843 if ( msgLevel(
MSG::DEBUG ) ) m_precSvc->dumpPrecedenceRules( thisSlot );
845 outputMessageStream <<
"----------- slot: " << thisSlot.eventContext->slot()
846 <<
" event: " << thisSlot.eventContext->evt() <<
" -----------" <<
std::endl;
848 if ( 0 > iSlot or iSlot == slotCount ) {
849 outputMessageStream <<
"Algorithms states:" <<
std::endl;
851 const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
852 for (
unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
853 outputMessageStream <<
" o " << index2algname( algoIdx ) <<
" [" 855 DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
856 const int depsSize = deps.
size();
857 if ( depsSize == 0 ) outputMessageStream <<
" none";
860 for (
auto d : deps ) {
861 outputMessageStream << d <<
" ";
862 if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
872 outputMessageStream <<
"\nWhiteboard contents: " <<
std::endl;
873 for (
auto& product : wbSlotContent ) outputMessageStream <<
" o " << product <<
std::endl;
876 outputMessageStream <<
"\nControl Flow:" <<
std::endl;
877 outputMessageStream << m_precSvc->printState( thisSlot ) <<
std::endl;
881 outputMessageStream <<
"=================================== END ======================================" <<
std::endl;
883 info() <<
"Dumping Scheduler State " << std::endl << outputMessageStream.
str() <<
endmsg;
893 const std::string& algName( index2algname( iAlgo ) );
895 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
897 if ( sc.isSuccess() ) {
898 EventContext* eventContext( m_eventSlots[si].eventContext );
899 if ( !eventContext ) {
900 fatal() <<
"Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" <<
endmsg;
906 eventContext->
slot(), ialgoPtr, eventContext );
908 if ( -100 != m_threadPoolSize ) {
912 tbb::task* triggerAlgoStateUpdate =
916 triggerAlgoStateUpdate->set_ref_count( 1 );
918 tbb::task* algoTask =
new ( triggerAlgoStateUpdate->allocate_child() )
919 AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
921 tbb::task::enqueue( *algoTask );
924 AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
926 promote2ExecutedClosure();
930 debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " << si
931 <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
935 if ( msgLevel(
MSG::VERBOSE ) ) dumpSchedulerState( -1 );
937 if ( updateSc.isSuccess() )
939 verbose() <<
"Promoting " << index2algname( iAlgo ) <<
" to SCHEDULED on slot " << si <<
endmsg;
943 debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si <<
endmsg;
957 const std::string& algName( index2algname( iAlgo ) );
959 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
961 if ( sc.isSuccess() ) {
962 EventContext* eventContext( m_eventSlots[si].eventContext );
963 if ( !eventContext ) {
964 fatal() <<
"[Asynchronous] Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" 969 ++m_IOBoundAlgosInFlight;
973 IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
974 m_IOBoundAlgScheduler->push( *theTask );
977 debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
978 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
982 if ( updateSc.isSuccess() )
984 verbose() <<
"[Asynchronous] Promoting " << index2algname( iAlgo ) <<
" to SCHEDULED on slot " << si <<
endmsg;
988 debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " 1003 if ( !castedAlgo ) fatal() <<
"The casting did not succeed!" <<
endmsg;
1007 if ( m_algExecStateSvc->eventStatus( *eventContext ) !=
EventStatus::Success ) eventFailed( eventContext ).ignore();
1010 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1013 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1014 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1023 debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1024 << m_algosInFlight <<
endmsg;
1028 m_actionsQueue.push( updateAction );
1029 m_updateNeeded =
false;
1032 debug() <<
"Trying to handle execution result of " << index2algname( iAlgo ) <<
" on slot " << si <<
endmsg;
1035 state = State::EVTACCEPTED;
1037 state = State::EVTREJECTED;
1044 verbose() <<
"Promoting " << index2algname( iAlgo ) <<
" on slot " << si <<
" to " 1059 if ( !castedAlgo ) fatal() <<
"[Asynchronous] The casting did not succeed!" <<
endmsg;
1063 if ( m_algExecStateSvc->eventStatus( *eventContext ) !=
EventStatus::Success ) eventFailed( eventContext ).ignore();
1065 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1068 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1069 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1073 m_IOBoundAlgosInFlight--;
1078 debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1079 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1083 m_actionsQueue.push( updateAction );
1084 m_updateNeeded =
false;
1087 debug() <<
"[Asynchronous] Trying to handle execution result of " << index2algname( iAlgo ) <<
" on slot " << si
1091 state = State::EVTACCEPTED;
1093 state = State::EVTREJECTED;
1100 verbose() <<
"[Asynchronous] Promoting " << index2algname( iAlgo ) <<
" on slot " << si <<
" to " 1122 m_sState.erase( itr );
1127 error() <<
"could not find Alg " << a->
name() <<
" in Scheduler!" <<
endmsg;
1137 for (
auto it : m_sState ) {
1149 ost <<
"dumping Executing Threads: [" << m_sState.size() <<
"]" <<
std::endl;
bool algsPresent(State state) const
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode finalize() override
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
Algorithm promotion.
AlgsExecutionStates algsStates
Vector of algorithms states.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
EventContext * eventContext
Cache for the eventContext.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
A service to resolve the task execution precedence.
Header file for class GaudiAlgorithm.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
void activate()
Activate scheduler.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
size_t sizeOfSubset(State state) const
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
This class represents an entry point to all the event specific data.
bool isFailure() const
Test for a status code of FAILURE.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
void addAlg(Algorithm *, EventContext *, pthread_t)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
static std::list< SchedulerState > m_sState
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single event.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode finalize() override
Finalise.
static std::mutex m_ssMut
#define DECLARE_SERVICE_FACTORY(x)
bool complete
Flags completion of the event.
The IAlgorithm is the interface implemented by the Algorithm base class.
GAUDI_API void setCurrentContext(const EventContext *ctx)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
void dumpState() override
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on the algorithms in the slots and promote them to successive states (-1 means all slots...
State
Execution states of the algorithms.
std::string fullKey() const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
static std::map< State, std::string > stateNames
StatusCode m_drain()
Drain the actions present in the queue.
StatusCode updateState(unsigned int iAlgo, State newState)