20 #include <unordered_set> 23 #include "boost/algorithm/string.hpp" 24 #include "boost/thread.hpp" 25 #include "boost/tokenizer.hpp" 27 #include "tbb/task_scheduler_init.h" 37 struct DataObjIDSorter {
66 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be initialized" <<
endmsg;
69 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
70 if ( !m_threadPoolSvc.isValid() ) {
71 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
76 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
79 while ( m_isActive != ACTIVE ) {
81 fatal() <<
"Terminating initialization" <<
endmsg;
84 info() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
89 if ( m_enableCondSvc ) {
91 m_condSvc = serviceLocator()->service(
"CondSvc" );
92 if ( !m_condSvc.isValid() ) {
93 warning() <<
"No CondSvc found, or not enabled. " 94 <<
"Will not manage CondAlgorithms" <<
endmsg;
95 m_enableCondSvc =
false;
100 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
101 if ( !m_algResourcePool.isValid() ) {
102 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
107 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
108 if ( !m_precSvc.isValid() ) {
109 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
114 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
118 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
119 if ( !m_algExecStateSvc.isValid() ) {
120 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
125 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
126 if ( !m_whiteboard.isValid() ) {
127 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
132 if ( m_useIOBoundAlgScheduler ) {
133 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
134 if ( !m_IOBoundAlgScheduler.isValid() )
135 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
139 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
142 m_freeSlots = m_maxEventsInFlight;
149 const unsigned int algsNumber = algos.
size();
150 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
163 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
166 auto r = globalOutp.
insert(
id );
168 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths " 169 "though, or control flow may guarantee only one runs...!" 176 ostdd <<
"Data Dependencies for Algorithms:";
181 if (
nullptr == algoPtr ) {
182 fatal() <<
"Could not convert IAlgorithm into Algorithm for " << ialgoPtr->
name()
183 <<
": this will result in a crash." <<
endmsg;
187 ostdd <<
"\n " << algoPtr->
name();
193 ostdd <<
"\n o INPUT " << id;
194 if (
id.key().find(
":" ) != std::string::npos ) {
195 ostdd <<
" contains alternatives which require resolution...\n";
196 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
200 if ( itok != tokens.end() ) {
201 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
202 id.updateKey( *itok );
204 error() <<
"failed to find alternate in global output list" 205 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
206 m_showDataDeps =
true;
209 algoDependencies.
insert(
id );
213 ostdd <<
"\n o OUTPUT " << *id;
214 if ( id->key().find(
":" ) != std::string::npos ) {
215 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 217 m_showDataDeps =
true;
226 if ( m_showDataDeps ) {
231 m_algname_vect.resize( algsNumber );
236 m_algname_index_map[
name] = index;
237 m_algname_vect.at( index ) =
name;
238 if ( algo->name() == m_useDataLoader ) {
239 dataLoaderAlg = algo;
246 for (
auto o : globalInp ) {
247 if ( globalOutp.
find( o ) == globalOutp.
end() ) {
252 if ( unmetDep.
size() > 0 ) {
255 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
256 ost <<
"\n o " << *o <<
" required by Algorithm: ";
257 for (
size_t i = 0; i < m_algosDependencies.
size(); ++i ) {
258 if ( m_algosDependencies[i].find( *o ) != m_algosDependencies[i].
end() ) {
259 ost <<
"\n * " << m_algname_vect[i];
264 if ( m_useDataLoader !=
"" ) {
266 if ( dataLoaderAlg ==
nullptr ) {
267 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
268 <<
"\" found, and unmet INPUT dependencies " 274 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 275 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
280 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Algorithm" <<
endmsg;
284 for (
auto&
id : unmetDep ) {
285 debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" << dataLoaderAlg->name()
291 fatal() <<
"Auto DataLoading not requested, " 292 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
297 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
303 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
305 m_eventSlots.assign( m_maxEventsInFlight,
EventSlot( m_algosDependencies, algsNumber,
307 std::for_each( m_eventSlots.begin(), m_eventSlots.end(), [](
EventSlot& slot ) { slot.complete =
true; } );
309 if ( m_threadPoolSize > 1 ) {
310 m_maxAlgosInFlight = (size_t)m_threadPoolSize;
314 info() <<
"Concurrency level information:" <<
endmsg;
315 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
316 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
318 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
320 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
323 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
336 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be finalized" <<
endmsg;
339 if ( !sc.
isSuccess() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
341 info() <<
"Joining Scheduler thread" <<
endmsg;
346 error() <<
"problems in scheduler thread" <<
endmsg;
366 if ( msgLevel(
MSG::DEBUG ) ) debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
368 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
369 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
381 info() <<
"Start checking the actionsQueue" <<
endmsg;
382 while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
383 m_actionsQueue.pop( thisAction );
386 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
391 info() <<
"Terminating thread-pool resources" <<
endmsg;
392 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
393 error() <<
"Problems terminating thread pool" <<
endmsg;
409 if ( m_isActive == ACTIVE ) {
414 m_isActive = INACTIVE;
433 unsigned int index = m_algname_index_map[algoname];
452 if ( !eventContext ) {
453 fatal() <<
"Event context is nullptr" <<
endmsg;
457 if ( m_freeSlots.load() == 0 ) {
458 if ( msgLevel(
MSG::DEBUG ) ) debug() <<
"A free processing slot could not be found." <<
endmsg;
467 const unsigned int thisSlotNum = eventContext->
slot();
468 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
470 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
474 debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
475 thisSlot.
reset( eventContext );
479 m_precSvc->iterate( thisSlot, cs );
481 return this->updateStates( thisSlotNum );
486 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
487 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
489 m_actionsQueue.push(
action );
498 for (
auto context : eventContexts ) {
499 sc = pushNewEvent( context );
515 unsigned int slotNum = 0;
516 for (
auto& thisSlot : m_eventSlots ) {
517 if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
518 updateStates( slotNum );
532 if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
539 m_finishedEvents.pop( eventContext );
542 debug() <<
"Popped slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" <<
endmsg;
553 if ( m_finishedEvents.try_pop( eventContext ) ) {
555 debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 573 m_freeSlots.store( 0 );
575 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " << eventContext->
slot() <<
" failed! ***" <<
endmsg;
578 m_algExecStateSvc->dump( ost, *eventContext );
580 info() <<
"Dumping Alg Exec State for slot " << eventContext->
slot() <<
":\n" << ost.
str() <<
endmsg;
582 dumpSchedulerState( -1 );
586 while ( m_actionsQueue.try_pop( thisAction ) ) {
592 while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
593 m_finishedEvents.push( thisEvtContext );
595 m_finishedEvents.push( eventContext );
617 m_updateNeeded =
true;
627 const int eventsSlotsSize( m_eventSlots.size() );
628 eventSlotsPtrs.
reserve( eventsSlotsSize );
629 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
630 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
635 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
638 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
642 auto& thisSlot = m_eventSlots[iSlot];
646 if ( !algo_name.
empty() ) {
648 m_precSvc->iterate( thisSlot, cs );
654 if ( !m_optimizationMode.empty() ) {
655 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
656 return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
660 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
661 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it )
690 while ( !buffer.
empty() ) {
691 bool IOBound =
false;
692 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.
top() ) );
695 partial_sc = promoteToScheduled( buffer.
top(), iSlot );
697 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot );
700 if ( partial_sc.isFailure() )
701 verbose() <<
"Could not apply transition from " 703 << index2algname( buffer.
top() ) <<
" on processing slot " << iSlot <<
endmsg;
709 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
710 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
713 bool IOBound =
false;
714 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
717 partial_sc = promoteToScheduled( algIndex, iSlot );
719 partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
723 verbose() <<
"Could not apply transition from " 725 << index2algname( algIndex ) <<
" on processing slot " << iSlot <<
endmsg;
729 if ( m_dumpIntraEventDynamics ) {
731 s << algo_name <<
", " << thisAlgsStates.
sizeOfSubset( State::CONTROLREADY ) <<
", " 735 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
743 if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
748 thisSlot.complete =
true;
752 m_finishedEvents.push( thisSlot.eventContext );
754 debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " << thisSlot.eventContext->slot()
759 if ( msgLevel(
MSG::DEBUG ) ) debug() << m_precSvc->printState( thisSlot ) <<
endmsg;
761 thisSlot.eventContext =
nullptr;
763 StatusCode eventStalledSC = isStalled( iSlot );
766 eventFailed( thisSlot.eventContext ).ignore();
787 EventSlot& thisSlot = m_eventSlots[iSlot];
789 if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
792 info() <<
"About to declare a stall" <<
endmsg;
793 fatal() <<
"*** Stall detected! ***\n" <<
endmsg;
794 dumpSchedulerState( iSlot );
815 outputMessageStream <<
"============================== Execution Task State =============================" 817 dumpState( outputMessageStream );
819 outputMessageStream << std::endl
820 <<
"============================== Scheduler State =================================" 824 for (
auto& thisSlot : m_eventSlots ) {
826 if ( thisSlot.complete )
continue;
829 if ( msgLevel(
MSG::DEBUG ) ) m_precSvc->dumpPrecedenceRules( thisSlot );
831 outputMessageStream <<
"----------- slot: " << thisSlot.eventContext->slot()
832 <<
" event: " << thisSlot.eventContext->evt() <<
" -----------" <<
std::endl;
834 if ( 0 > iSlot or iSlot == slotCount ) {
835 outputMessageStream <<
"Algorithms states:" <<
std::endl;
837 const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
838 for (
unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
839 outputMessageStream <<
" o " << index2algname( algoIdx ) <<
" [" 841 DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
842 const int depsSize = deps.
size();
843 if ( depsSize == 0 ) outputMessageStream <<
" none";
846 for (
auto d : deps ) {
847 outputMessageStream << d <<
" ";
848 if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
858 outputMessageStream <<
"\nWhiteboard contents: " <<
std::endl;
859 for (
auto& product : wbSlotContent ) outputMessageStream <<
" o " << product <<
std::endl;
862 outputMessageStream <<
"\nControl Flow:" <<
std::endl;
863 outputMessageStream << m_precSvc->printState( thisSlot ) <<
std::endl;
867 outputMessageStream <<
"=================================== END ======================================" <<
std::endl;
869 info() <<
"Dumping Scheduler State " << std::endl << outputMessageStream.
str() <<
endmsg;
879 const std::string& algName( index2algname( iAlgo ) );
881 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
883 if ( sc.isSuccess() ) {
884 EventContext* eventContext( m_eventSlots[si].eventContext );
885 if ( !eventContext ) {
886 fatal() <<
"Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" <<
endmsg;
892 eventContext->
slot(), ialgoPtr, eventContext );
894 if ( -100 != m_threadPoolSize ) {
898 tbb::task* triggerAlgoStateUpdate =
902 triggerAlgoStateUpdate->set_ref_count( 1 );
904 tbb::task* algoTask =
new ( triggerAlgoStateUpdate->allocate_child() )
905 AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
907 tbb::task::enqueue( *algoTask );
910 AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
912 promote2ExecutedClosure();
916 debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " << si
917 <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
921 if ( msgLevel(
MSG::VERBOSE ) ) dumpSchedulerState( -1 );
923 if ( updateSc.isSuccess() )
925 verbose() <<
"Promoting " << index2algname( iAlgo ) <<
" to SCHEDULED on slot " << si <<
endmsg;
929 debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si <<
endmsg;
943 const std::string& algName( index2algname( iAlgo ) );
945 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
947 if ( sc.isSuccess() ) {
948 EventContext* eventContext( m_eventSlots[si].eventContext );
949 if ( !eventContext ) {
950 fatal() <<
"[Asynchronous] Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" 955 ++m_IOBoundAlgosInFlight;
959 IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
960 m_IOBoundAlgScheduler->push( *theTask );
963 debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
964 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
968 if ( updateSc.isSuccess() )
970 verbose() <<
"[Asynchronous] Promoting " << index2algname( iAlgo ) <<
" to SCHEDULED on slot " << si <<
endmsg;
974 debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " 989 if ( !castedAlgo ) fatal() <<
"The casting did not succeed!" <<
endmsg;
993 if ( m_algExecStateSvc->eventStatus( *eventContext ) !=
EventStatus::Success ) eventFailed( eventContext ).ignore();
996 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
999 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1000 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1009 debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1010 << m_algosInFlight <<
endmsg;
1014 m_actionsQueue.push( updateAction );
1015 m_updateNeeded =
false;
1018 debug() <<
"Trying to handle execution result of " << index2algname( iAlgo ) <<
" on slot " << si <<
endmsg;
1021 state = State::EVTACCEPTED;
1023 state = State::EVTREJECTED;
1030 verbose() <<
"Promoting " << index2algname( iAlgo ) <<
" on slot " << si <<
" to " 1045 if ( !castedAlgo ) fatal() <<
"[Asynchronous] The casting did not succeed!" <<
endmsg;
1049 if ( m_algExecStateSvc->eventStatus( *eventContext ) !=
EventStatus::Success ) eventFailed( eventContext ).ignore();
1051 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1054 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1055 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1059 m_IOBoundAlgosInFlight--;
1064 debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1065 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1069 m_actionsQueue.push( updateAction );
1070 m_updateNeeded =
false;
1073 debug() <<
"[Asynchronous] Trying to handle execution result of " << index2algname( iAlgo ) <<
" on slot " << si
1077 state = State::EVTACCEPTED;
1079 state = State::EVTREJECTED;
1086 verbose() <<
"[Asynchronous] Promoting " << index2algname( iAlgo ) <<
" on slot " << si <<
" to " 1108 m_sState.erase( itr );
1113 error() <<
"could not find Alg " << a->
name() <<
" in Scheduler!" <<
endmsg;
1123 for (
auto it : m_sState ) {
1135 ost <<
"dumping Executing Threads: [" << m_sState.size() <<
"]" <<
std::endl;
bool algsPresent(State state) const
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode finalize() override
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
Algorithm promotion.
AlgsExecutionStates algsStates
Vector of algorithms states.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
EventContext * eventContext
Cache for the eventContext.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
A service to resolve the task execution precedence.
Header file for class GaudiAlgorithm.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
void activate()
Activate scheduler.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
size_t sizeOfSubset(State state) const
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
This class represents an entry point to all the event specific data.
bool isFailure() const
Test for a status code of FAILURE.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
void addAlg(Algorithm *, EventContext *, pthread_t)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
static std::list< SchedulerState > m_sState
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is availble.
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode finalize() override
Finalise.
static std::mutex m_ssMut
#define DECLARE_SERVICE_FACTORY(x)
bool complete
Flags completion of the event.
The IAlgorithm is the interface implemented by the Algorithm base class.
GAUDI_API void setCurrentContext(const EventContext *ctx)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
void dumpState() override
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
State
Execution states of the algorithms.
std::string fullKey() const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
static std::map< State, std::string > stateNames
StatusCode m_drain()
Drain the actions present in the queue.
StatusCode updateState(unsigned int iAlgo, State newState)