20 #include <unordered_set> 23 #include "boost/algorithm/string.hpp" 24 #include "boost/thread.hpp" 25 #include "boost/tokenizer.hpp" 27 #include "tbb/task_scheduler_init.h" 37 struct DataObjIDSorter {
66 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be initialized" <<
endmsg;
69 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
70 if ( !m_threadPoolSvc.isValid() ) {
71 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
76 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
79 while ( m_isActive != ACTIVE ) {
81 fatal() <<
"Terminating initialization" <<
endmsg;
84 info() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
89 if ( m_enableCondSvc ) {
91 m_condSvc = serviceLocator()->service(
"CondSvc" );
92 if ( !m_condSvc.isValid() ) {
93 warning() <<
"No CondSvc found, or not enabled. " 94 <<
"Will not manage CondAlgorithms" <<
endmsg;
95 m_enableCondSvc =
false;
100 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
101 if ( !m_algResourcePool.isValid() ) {
102 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
107 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
108 if ( !m_precSvc.isValid() ) {
109 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
114 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
118 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
119 if ( !m_algExecStateSvc.isValid() ) {
120 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
125 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
126 if ( !m_whiteboard.isValid() ) {
127 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
132 if ( m_useIOBoundAlgScheduler ) {
133 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
134 if ( !m_IOBoundAlgScheduler.isValid() )
135 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
139 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
142 m_freeSlots = m_maxEventsInFlight;
149 const unsigned int algsNumber = algos.
size();
150 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
163 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
166 auto r = globalOutp.
insert(
id );
168 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths " 169 "though, or control flow may guarantee only one runs...!" 176 ostdd <<
"Data Dependencies for Algorithms:";
181 if (
nullptr == algoPtr ) {
182 fatal() <<
"Could not convert IAlgorithm into Algorithm for " << ialgoPtr->
name()
183 <<
": this will result in a crash." <<
endmsg;
187 ostdd <<
"\n " << algoPtr->
name();
193 ostdd <<
"\n o INPUT " << id;
194 if (
id.key().find(
":" ) != std::string::npos ) {
195 ostdd <<
" contains alternatives which require resolution...\n";
196 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
200 if ( itok != tokens.end() ) {
201 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
202 id.updateKey( *itok );
204 error() <<
"failed to find alternate in global output list" 205 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
206 m_showDataDeps =
true;
209 algoDependencies.
insert(
id );
213 ostdd <<
"\n o OUTPUT " << *id;
214 if ( id->key().find(
":" ) != std::string::npos ) {
215 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 217 m_showDataDeps =
true;
226 if ( m_showDataDeps ) {
231 m_algname_vect.resize( algsNumber );
236 m_algname_index_map[
name] = index;
237 m_algname_vect.at( index ) =
name;
238 if ( algo->name() == m_useDataLoader ) {
239 dataLoaderAlg = algo;
246 for (
auto o : globalInp ) {
247 if ( globalOutp.
find( o ) == globalOutp.
end() ) {
252 if ( unmetDep.
size() > 0 ) {
255 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
256 ost <<
"\n o " << *o <<
" required by Algorithm: ";
257 for (
size_t i = 0; i < m_algosDependencies.
size(); ++i ) {
258 if ( m_algosDependencies[i].find( *o ) != m_algosDependencies[i].
end() ) {
259 ost <<
"\n * " << m_algname_vect[i];
264 if ( m_useDataLoader !=
"" ) {
266 if ( dataLoaderAlg ==
nullptr ) {
267 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
268 <<
"\" found, and unmet INPUT dependencies " 274 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 275 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
280 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Algorithm" <<
endmsg;
284 for (
auto&
id : unmetDep ) {
285 debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" << dataLoaderAlg->name()
291 fatal() <<
"Auto DataLoading not requested, " 292 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
297 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
303 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
305 m_eventSlots.assign( m_maxEventsInFlight,
EventSlot( m_algosDependencies, algsNumber,
307 std::for_each( m_eventSlots.begin(), m_eventSlots.end(), [](
EventSlot& slot ) { slot.complete =
true; } );
309 if ( m_threadPoolSize > 1 ) {
310 m_maxAlgosInFlight = (size_t)m_threadPoolSize;
314 info() <<
"Concurrency level information:" <<
endmsg;
315 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
316 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
318 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
320 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
323 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
336 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be finalized" <<
endmsg;
339 if ( !sc.
isSuccess() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
341 info() <<
"Joining Scheduler thread" <<
endmsg;
346 error() <<
"problems in scheduler thread" <<
endmsg;
366 if ( msgLevel(
MSG::DEBUG ) ) debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
368 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
369 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
381 info() <<
"Start checking the actionsQueue" <<
endmsg;
382 while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
383 m_actionsQueue.pop( thisAction );
386 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
391 info() <<
"Terminating thread-pool resources" <<
endmsg;
392 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
393 error() <<
"Problems terminating thread pool" <<
endmsg;
409 if ( m_isActive == ACTIVE ) {
414 m_isActive = INACTIVE;
433 unsigned int index = m_algname_index_map[algoname];
452 if ( !eventContext ) {
453 fatal() <<
"Event context is nullptr" <<
endmsg;
457 if ( m_freeSlots.load() == 0 ) {
458 if ( msgLevel(
MSG::DEBUG ) ) debug() <<
"A free processing slot could not be found." <<
endmsg;
467 const unsigned int thisSlotNum = eventContext->
slot();
468 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
470 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
474 debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
475 thisSlot.
reset( eventContext );
479 m_precSvc->iterate( thisSlot, cs );
481 return this->updateStates( thisSlotNum );
486 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
487 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
489 m_actionsQueue.push(
action );
498 for (
auto context : eventContexts ) {
499 sc = pushNewEvent( context );
515 unsigned int slotNum = 0;
516 for (
auto& thisSlot : m_eventSlots ) {
517 if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
518 updateStates( slotNum );
532 if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
539 m_finishedEvents.pop( eventContext );
542 debug() <<
"Popped slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" <<
endmsg;
553 if ( m_finishedEvents.try_pop( eventContext ) ) {
555 debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 573 m_freeSlots.store( 0 );
575 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " << eventContext->
slot() <<
" failed! ***" <<
endmsg;
578 m_algExecStateSvc->dump( ost, *eventContext );
580 info() <<
"Dumping Alg Exec State for slot " << eventContext->
slot() <<
":\n" << ost.
str() <<
endmsg;
582 dumpSchedulerState( -1 );
586 while ( m_actionsQueue.try_pop( thisAction ) ) {
592 while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
593 m_finishedEvents.push( thisEvtContext );
595 m_finishedEvents.push( eventContext );
617 m_updateNeeded =
true;
627 const int eventsSlotsSize( m_eventSlots.size() );
628 eventSlotsPtrs.
reserve( eventsSlotsSize );
629 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
630 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
635 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
638 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
642 auto& thisSlot = m_eventSlots[iSlot];
646 if ( !algo_name.
empty() ) {
648 m_precSvc->iterate( thisSlot, cs );
654 if ( !m_optimizationMode.empty() ) {
655 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
656 return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
660 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
661 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it )
690 while ( !buffer.
empty() ) {
691 bool IOBound =
false;
692 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.
top() ) );
695 partial_sc = promoteToScheduled( buffer.
top(), iSlot );
697 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot );
700 if ( partial_sc.isFailure() )
701 verbose() <<
"Could not apply transition from " 703 << index2algname( buffer.
top() ) <<
" on processing slot " << iSlot <<
endmsg;
709 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
710 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
713 bool IOBound =
false;
714 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
717 partial_sc = promoteToScheduled( algIndex, iSlot );
719 partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
723 verbose() <<
"Could not apply transition from " 725 << index2algname( algIndex ) <<
" on processing slot " << iSlot <<
endmsg;
729 if ( m_dumpIntraEventDynamics ) {
731 s << algo_name <<
", " << thisAlgsStates.
sizeOfSubset( State::CONTROLREADY ) <<
", " 735 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
743 if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
748 thisSlot.complete =
true;
752 m_finishedEvents.push( thisSlot.eventContext );
754 debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " << thisSlot.eventContext->slot()
759 if ( msgLevel(
MSG::DEBUG ) ) debug() << m_precSvc->printState( thisSlot ) <<
endmsg;
761 thisSlot.eventContext =
nullptr;
763 StatusCode eventStalledSC = isStalled( iSlot );
766 eventFailed( thisSlot.eventContext ).ignore();
787 EventSlot& thisSlot = m_eventSlots[iSlot];
789 if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
792 info() <<
"About to declare a stall" <<
endmsg;
793 fatal() <<
"*** Stall detected! ***\n" <<
endmsg;
794 dumpSchedulerState( iSlot );
815 outputMessageStream <<
"============================== Execution Task State =============================" 817 dumpState( outputMessageStream );
819 outputMessageStream << std::endl
820 <<
"============================== Scheduler State =================================" 824 for (
auto& thisSlot : m_eventSlots ) {
826 if ( thisSlot.complete )
continue;
829 if ( msgLevel(
MSG::DEBUG ) ) m_precSvc->dumpPrecedenceRules( thisSlot );
831 outputMessageStream <<
"----------- slot: " << thisSlot.eventContext->slot()
832 <<
" event: " << thisSlot.eventContext->evt() <<
" -----------" <<
std::endl;
834 if ( 0 > iSlot or iSlot == slotCount ) {
835 outputMessageStream <<
"Algorithms states:" <<
std::endl;
837 const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
838 for (
unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
839 outputMessageStream <<
" o " << index2algname( algoIdx ) <<
" [" 841 DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
842 const int depsSize = deps.
size();
843 if ( depsSize == 0 ) outputMessageStream <<
" none";
846 for (
auto d : deps ) {
847 outputMessageStream << d <<
" ";
848 if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
854 if ( !missing.
empty() ) {
855 outputMessageStream <<
". The following are missing: ";
856 for (
auto d : missing ) {
857 outputMessageStream << d <<
" ";
865 outputMessageStream <<
"\nWhiteboard contents: " <<
std::endl;
866 for (
auto& product : wbSlotContent ) outputMessageStream <<
" o " << product <<
std::endl;
869 outputMessageStream <<
"\nControl Flow:" <<
std::endl;
870 outputMessageStream << m_precSvc->printState( thisSlot ) <<
std::endl;
874 outputMessageStream <<
"=================================== END ======================================" <<
std::endl;
876 info() <<
"Dumping Scheduler State " << std::endl << outputMessageStream.
str() <<
endmsg;
886 const std::string& algName( index2algname( iAlgo ) );
888 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
890 if ( sc.isSuccess() ) {
891 EventContext* eventContext( m_eventSlots[si].eventContext );
892 if ( !eventContext ) {
893 fatal() <<
"Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" <<
endmsg;
899 eventContext->
slot(), ialgoPtr, eventContext );
901 if ( -100 != m_threadPoolSize ) {
905 tbb::task* triggerAlgoStateUpdate =
909 triggerAlgoStateUpdate->set_ref_count( 1 );
911 tbb::task* algoTask =
new ( triggerAlgoStateUpdate->allocate_child() )
912 AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
914 tbb::task::enqueue( *algoTask );
917 AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
919 promote2ExecutedClosure();
923 debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " << si
924 <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
928 if ( msgLevel(
MSG::VERBOSE ) ) dumpSchedulerState( -1 );
930 if ( updateSc.isSuccess() )
932 verbose() <<
"Promoting " << index2algname( iAlgo ) <<
" to SCHEDULED on slot " << si <<
endmsg;
936 debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si <<
endmsg;
950 const std::string& algName( index2algname( iAlgo ) );
952 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
954 if ( sc.isSuccess() ) {
955 EventContext* eventContext( m_eventSlots[si].eventContext );
956 if ( !eventContext ) {
957 fatal() <<
"[Asynchronous] Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" 962 ++m_IOBoundAlgosInFlight;
966 IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
967 m_IOBoundAlgScheduler->push( *theTask );
970 debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
971 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
975 if ( updateSc.isSuccess() )
977 verbose() <<
"[Asynchronous] Promoting " << index2algname( iAlgo ) <<
" to SCHEDULED on slot " << si <<
endmsg;
981 debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " 996 if ( !castedAlgo ) fatal() <<
"The casting did not succeed!" <<
endmsg;
1000 if ( m_algExecStateSvc->eventStatus( *eventContext ) !=
EventStatus::Success ) eventFailed( eventContext ).ignore();
1003 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1006 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1007 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1016 debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1017 << m_algosInFlight <<
endmsg;
1021 m_actionsQueue.push( updateAction );
1022 m_updateNeeded =
false;
1025 debug() <<
"Trying to handle execution result of " << index2algname( iAlgo ) <<
" on slot " << si <<
endmsg;
1028 state = State::EVTACCEPTED;
1030 state = State::EVTREJECTED;
1037 verbose() <<
"Promoting " << index2algname( iAlgo ) <<
" on slot " << si <<
" to " 1052 if ( !castedAlgo ) fatal() <<
"[Asynchronous] The casting did not succeed!" <<
endmsg;
1056 if ( m_algExecStateSvc->eventStatus( *eventContext ) !=
EventStatus::Success ) eventFailed( eventContext ).ignore();
1058 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1061 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1062 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1066 m_IOBoundAlgosInFlight--;
1071 debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1072 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1076 m_actionsQueue.push( updateAction );
1077 m_updateNeeded =
false;
1080 debug() <<
"[Asynchronous] Trying to handle execution result of " << index2algname( iAlgo ) <<
" on slot " << si
1084 state = State::EVTACCEPTED;
1086 state = State::EVTREJECTED;
1093 verbose() <<
"[Asynchronous] Promoting " << index2algname( iAlgo ) <<
" on slot " << si <<
" to " 1115 m_sState.erase( itr );
1120 error() <<
"could not find Alg " << a->
name() <<
" in Scheduler!" <<
endmsg;
1130 for (
auto it : m_sState ) {
1142 ost <<
"dumping Executing Threads: [" << m_sState.size() <<
"]" <<
std::endl;
bool algsPresent(State state) const
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode finalize() override
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
Algorithm promotion.
AlgsExecutionStates algsStates
Vector of algorithms states.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
EventContext * eventContext
Cache for the eventContext.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
A service to resolve the task execution precedence.
Header file for class GaudiAlgorithm.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
void activate()
Activate scheduler.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
size_t sizeOfSubset(State state) const
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
This class represents an entry point to all the event specific data.
bool isFailure() const
Test for a status code of FAILURE.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
void addAlg(Algorithm *, EventContext *, pthread_t)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
static std::list< SchedulerState > m_sState
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode finalize() override
Finalise.
static std::mutex m_ssMut
#define DECLARE_SERVICE_FACTORY(x)
bool complete
Flags completion of the event.
The IAlgorithm is the interface implemented by the Algorithm base class.
GAUDI_API void setCurrentContext(const EventContext *ctx)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
void dumpState() override
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
State
Execution states of the algorithms.
std::string fullKey() const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
static std::map< State, std::string > stateNames
StatusCode m_drain()
Drain the actions present in the queue.
StatusCode updateState(unsigned int iAlgo, State newState)