22 #include <unordered_set> 25 #include "boost/thread.hpp" 26 #include "boost/tokenizer.hpp" 27 #include "boost/algorithm/string.hpp" 29 #include "tbb/task_scheduler_init.h" 39 struct DataObjIDSorter
74 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be initialized" <<
endmsg;
77 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
78 if ( !m_threadPoolSvc.isValid() ) {
79 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
84 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
87 while ( m_isActive != ACTIVE ) {
89 fatal() <<
"Terminating initialization" <<
endmsg;
92 info() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
98 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
99 if ( !m_algResourcePool.isValid() ) {
100 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
104 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc");
105 if (!m_algExecStateSvc.isValid()) {
106 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
111 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
112 if ( !m_whiteboard.isValid() ) {
113 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
118 if ( m_useIOBoundAlgScheduler ) {
119 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
120 if ( !m_IOBoundAlgScheduler.isValid() )
121 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
125 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
128 m_freeSlots = m_maxEventsInFlight;
135 const unsigned int algsNumber = algos.
size();
136 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
149 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
152 auto r = globalOutp.
insert(
id);
154 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" <<
endmsg;
160 ostdd <<
"Data Dependencies for Algorithms:";
165 if (
nullptr == algoPtr ) {
166 fatal() <<
"Could not convert IAlgorithm into Algorithm for " 168 <<
": this will result in a crash." <<
endmsg;
172 ostdd <<
"\n " << algoPtr->
name();
178 ostdd <<
"\n o INPUT " << id;
179 if (
id.key().find(
":")!=std::string::npos) {
180 ostdd <<
" contains alternatives which require resolution...\n";
181 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(),boost::char_separator<char>{
":"}};
186 if (itok!=tokens.end()) {
187 ostdd <<
"found matching output for " << *itok
188 <<
" -- updating scheduler info\n";
191 error() <<
"failed to find alternate in global output list" 192 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name()
194 m_showDataDeps =
true;
197 algoDependencies.
insert(
id );
201 ostdd <<
"\n o OUTPUT " << *id;
202 if (id->key().find(
":")!=std::string::npos) {
203 error() <<
" in Alg " << algoPtr->
name()
204 <<
" alternatives are NOT allowed for outputs! id: " 206 m_showDataDeps =
true;
215 if ( m_showDataDeps ) {
220 m_algname_vect.reserve( algsNumber );
221 unsigned int index = 0;
225 m_algname_index_map[
name] = index;
226 m_algname_vect.emplace_back( name );
227 if (algo->name() == m_useDataLoader) {
228 dataLoaderAlg = algo;
236 for (
auto o : globalInp ) {
237 if ( globalOutp.
find( o ) == globalOutp.
end() ) {
242 if ( unmetDep.
size() > 0 ) {
245 for (
const DataObjID* o : sortedDataObjIDColl (unmetDep) ) {
246 ost <<
"\n o " << *o <<
" required by Algorithm: ";
247 for (
size_t i = 0; i < m_algosDependencies.
size(); ++i ) {
248 if ( m_algosDependencies[i].find( *o ) != m_algosDependencies[i].
end() ) {
249 ost <<
"\n * " << m_algname_vect[i];
254 if ( m_useDataLoader !=
"" ) {
256 if (dataLoaderAlg ==
nullptr) {
257 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
258 <<
"\" found, and unmet INPUT dependencies " 263 info() <<
"Will attribute the following unmet INPUT dependencies to \"" 264 << dataLoaderAlg->
type() <<
"/" << dataLoaderAlg->name()
271 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value()
272 <<
"\" IAlg to Algorithm" <<
endmsg;
276 for (
auto&
id : unmetDep) {
277 debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " 278 << dataLoaderAlg->
type() <<
"/" << dataLoaderAlg->name()
284 fatal() <<
"Auto DataLoading not requested, " 285 <<
"and the following unmet INPUT dependencies were found:" 291 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
298 fatal() <<
"Unable to dcast algResourcePool" <<
endmsg;
301 sc = m_efManager.initialize( algPool->
getPRGraph(), m_algname_index_map, m_eventSlots, m_optimizationMode );
302 unsigned int controlFlowNodeNumber = m_efManager.getPrecedenceRulesGraph()->getControlFlowNodeCounter();
306 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
308 m_eventSlots.assign( m_maxEventsInFlight,
309 EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
310 std::for_each( m_eventSlots.begin(), m_eventSlots.end(), [](
EventSlot& slot ) { slot.complete =
true; } );
312 if (m_threadPoolSize > 1) {
313 m_maxAlgosInFlight = (size_t) m_threadPoolSize;
317 info() <<
"Concurrency level information:" <<
endmsg;
318 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
319 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
323 if (m_showControlFlow) {
325 <<
"========== Algorithm and Sequence Configuration ==========" 327 info() << m_efg->dumpControlFlow() <<
endmsg;
330 if (m_showDataFlow) {
332 <<
"======================= Data Flow ========================" 334 info() << m_efg->dumpDataFlow() <<
endmsg;
338 if ( m_simulateExecution ) {
340 m_efManager.simulateExecutionFlow( vis );
353 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be finalized" <<
endmsg;
356 if ( !sc.
isSuccess() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
358 info() <<
"Joining Scheduler thread" <<
endmsg;
363 error() <<
"problems in scheduler thread" <<
endmsg;
385 debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
387 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
388 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
400 info() <<
"Start checking the actionsQueue" <<
endmsg;
401 while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
402 m_actionsQueue.pop( thisAction );
405 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
410 info() <<
"Terminating thread-pool resources" <<
endmsg;
411 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
412 error() <<
"Problems terminating thread pool" <<
endmsg;
427 if ( m_isActive == ACTIVE ) {
432 m_isActive = INACTIVE;
446 return m_algname_vect[index];
452 unsigned int index = m_algname_index_map[algoname];
470 if ( !eventContext ) {
471 fatal() <<
"Event context is nullptr" <<
endmsg;
475 if ( m_freeSlots.load() == 0 ) {
476 if ( msgLevel(
MSG::DEBUG ) ) debug() <<
"A free processing slot could not be found." <<
endmsg;
485 const unsigned int thisSlotNum = eventContext->
slot();
486 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
488 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
492 debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " 494 thisSlot.
reset( eventContext );
498 m_efManager.touchReadyAlgorithms( vis );
500 return this->updateStates( thisSlotNum );
505 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
506 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
508 m_actionsQueue.push(
action );
516 for (
auto context : eventContexts ) {
517 sc = pushNewEvent( context );
525 return std::max( m_freeSlots.load(), 0 );
534 unsigned int slotNum = 0;
535 for (
auto& thisSlot : m_eventSlots ) {
536 if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
537 updateStates( slotNum );
550 if ( m_freeSlots.load() == (int) m_maxEventsInFlight or
551 m_isActive == INACTIVE ) {
558 m_finishedEvents.pop( eventContext );
561 debug() <<
"Popped slot " << eventContext->
slot() <<
"(event " 572 if ( m_finishedEvents.try_pop( eventContext ) ) {
574 debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 591 m_freeSlots.store( 0 );
593 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " 594 << eventContext->
slot() <<
" failed! ***" <<
endmsg;
597 m_algExecStateSvc->dump(ost, *eventContext);
599 info() <<
"Dumping Alg Exec State for slot " << eventContext->
slot()
602 dumpSchedulerState(-1);
606 while ( m_actionsQueue.try_pop( thisAction ) ) {
612 while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
613 m_finishedEvents.push( thisEvtContext );
615 m_finishedEvents.push( eventContext );
636 m_updateNeeded =
true;
646 const int eventsSlotsSize( m_eventSlots.size() );
647 eventSlotsPtrs.
reserve( eventsSlotsSize );
648 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
649 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
654 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
657 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
661 auto& thisSlot = m_eventSlots[iSlot];
665 if ( !algo_name.
empty() )
666 m_efManager.updateDecision( algo_name, iSlot, thisAlgsStates, thisSlot.controlFlowState );
672 if ( !m_optimizationMode.empty() ) {
673 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
674 return ( m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode( index2algname( i ) )->getRank() <
675 m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode( index2algname( j ) )->getRank() );
679 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
680 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it )
708 while ( !buffer.
empty() ) {
709 bool IOBound =
false;
710 if ( m_useIOBoundAlgScheduler )
711 IOBound = m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode( index2algname( buffer.
top() ) )->isIOBound();
714 partial_sc = promoteToScheduled( buffer.
top(), iSlot );
716 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot );
719 if (partial_sc.isFailure())
720 verbose() <<
"Could not apply transition from " 722 <<
" for algorithm " << index2algname(buffer.
top()) <<
" on processing slot " << iSlot <<
endmsg;
728 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
729 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
732 bool IOBound =
false;
733 if ( m_useIOBoundAlgScheduler )
734 IOBound = m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode( index2algname( algIndex ) )->isIOBound();
737 partial_sc = promoteToScheduled( algIndex, iSlot );
739 partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
743 verbose() <<
"Could not apply transition from " 745 <<
" for algorithm " << index2algname(algIndex) <<
" on processing slot " << iSlot <<
endmsg;
749 if (m_dumpIntraEventDynamics) {
752 s << algo_name <<
", " << thisAlgsStates.
sizeOfSubset(State::CONTROLREADY) <<
", " 753 << thisAlgsStates.
sizeOfSubset(State::DATAREADY) <<
", " 754 << thisAlgsStates.
sizeOfSubset(State::SCHEDULED) <<
", " 758 :
std::to_string(tbb::task_scheduler_init::default_num_threads());
766 if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
771 thisSlot.complete =
true;
775 m_finishedEvents.push(thisSlot.eventContext);
777 debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 778 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
783 m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
787 thisSlot.eventContext =
nullptr;
792 eventFailed(thisSlot.eventContext).ignore();
812 EventSlot& thisSlot = m_eventSlots[iSlot];
814 if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
817 info() <<
"About to declare a stall" <<
endmsg;
818 fatal() <<
"*** Stall detected! ***\n" <<
endmsg;
819 dumpSchedulerState( iSlot );
839 outputMessageStream <<
"============================== Execution Task State =============================" 841 dumpState( outputMessageStream );
843 outputMessageStream << std::endl
844 <<
"============================== Scheduler State =================================" 848 for (
auto thisSlot : m_eventSlots ) {
850 if ( thisSlot.complete )
continue;
852 outputMessageStream <<
"----------- slot: " << thisSlot.eventContext->slot()
853 <<
" event: " << thisSlot.eventContext->evt() <<
" -----------" <<
std::endl;
855 if ( 0 > iSlot or iSlot == slotCount ) {
856 outputMessageStream <<
"Algorithms states:" <<
std::endl;
858 const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
859 for (
unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
860 outputMessageStream <<
" o " << index2algname( algoIdx ) <<
" [" 862 DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
863 const int depsSize = deps.
size();
864 if ( depsSize == 0 ) outputMessageStream <<
" none";
867 for (
auto d : deps ) {
868 outputMessageStream << d <<
" ";
869 if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
875 if ( !missing.
empty() ) {
876 outputMessageStream <<
". The following are missing: ";
877 for (
auto d : missing ) {
878 outputMessageStream << d <<
" ";
886 outputMessageStream <<
"\nWhiteboard contents: " <<
std::endl;
887 for (
auto& product : wbSlotContent ) outputMessageStream <<
" o " << product <<
std::endl;
890 outputMessageStream <<
"\nControl Flow:" <<
std::endl;
892 m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
894 outputMessageStream << cFlowStateStringStream.
str() <<
std::endl;
898 outputMessageStream <<
"=================================== END ======================================" <<
std::endl;
900 info() <<
"Dumping Scheduler State " << std::endl << outputMessageStream.
str() <<
endmsg;
909 const std::string& algName( index2algname( iAlgo ) );
911 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
913 if ( sc.isSuccess() ) {
914 EventContext* eventContext( m_eventSlots[si].eventContext );
915 if ( !eventContext ) {
916 fatal() <<
"Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" <<
endmsg;
924 eventContext->
slot(),
928 if (-100 != m_threadPoolSize) {
932 tbb::task* triggerAlgoStateUpdate =
new(tbb::task::allocate_root())
936 triggerAlgoStateUpdate->set_ref_count(1);
938 tbb::task* algoTask =
new(triggerAlgoStateUpdate->allocate_child())
941 tbb::task::enqueue( *algoTask);
944 AlgoExecutionTask theTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
946 promote2ExecutedClosure();
950 debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " << si
951 <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
955 if ( msgLevel(
MSG::VERBOSE ) ) dumpSchedulerState( -1 );
957 if (updateSc.isSuccess())
959 verbose() <<
"Promoting " << index2algname(iAlgo) <<
" to SCHEDULED on slot " 964 debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si <<
endmsg;
977 const std::string& algName( index2algname( iAlgo ) );
979 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
981 if ( sc.isSuccess() ) {
982 EventContext* eventContext( m_eventSlots[si].eventContext );
983 if ( !eventContext ) {
984 fatal() <<
"[Asynchronous] Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" 989 ++m_IOBoundAlgosInFlight;
992 IOBoundAlgTask(ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc);
993 m_IOBoundAlgScheduler->push(*theTask);
996 debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " 997 << eventContext->
evt() <<
" in slot " << si
998 <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1002 if (updateSc.isSuccess())
1004 verbose() <<
"[Asynchronous] Promoting " << index2algname(iAlgo)
1005 <<
" to SCHEDULED on slot " << si <<
endmsg;
1009 debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " 1023 if ( !castedAlgo ) fatal() <<
"The casting did not succeed!" <<
endmsg;
1028 eventFailed(eventContext).ignore();
1031 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1034 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1035 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1044 debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1045 << m_algosInFlight <<
endmsg;
1049 m_actionsQueue.push( updateAction );
1050 m_updateNeeded =
false;
1053 debug() <<
"Trying to handle execution result of " << index2algname( iAlgo ) <<
" on slot " << si <<
endmsg;
1056 state = State::EVTACCEPTED;
1058 state = State::EVTREJECTED;
1065 verbose() <<
"Promoting " << index2algname(iAlgo) <<
" on slot " << si <<
" to " 1079 if ( !castedAlgo ) fatal() <<
"[Asynchronous] The casting did not succeed!" <<
endmsg;
1084 eventFailed(eventContext).ignore();
1086 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1089 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1090 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1094 m_IOBoundAlgosInFlight--;
1099 debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1100 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1104 m_actionsQueue.push( updateAction );
1105 m_updateNeeded =
false;
1108 debug() <<
"[Asynchronous] Trying to handle execution result of " 1109 << index2algname(iAlgo) <<
" on slot " << si <<
endmsg;
1112 state = State::EVTACCEPTED;
1114 state = State::EVTREJECTED;
1121 verbose() <<
"[Asynchronous] Promoting " << index2algname(iAlgo) <<
" on slot " 1143 m_sState.erase( itr );
1148 error() <<
"could not find Alg " << a->
name() <<
" in Scheduler!" <<
endmsg;
1158 for (
auto it : m_sState ) {
1170 ost <<
"dumping Executing Threads: [" << m_sState.size() <<
"]" <<
std::endl;
bool algsPresent(State state) const
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
StatusCode initialize() override
const std::string & name() const override
The identifying name of the algorithm object.
virtual concurrency::PrecedenceRulesGraph * getPRGraph() const
StatusCode finalize() override
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
Algorithm promotion.
AlgsExecutionStates algsStates
Vector of algorithms states.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
EventContext * eventContext
Cache for the eventContext.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
Header file for class GaudiAlgorithm.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
T duration_cast(T...args)
void activate()
Activate scheduler.
size_t sizeOfSubset(State state) const
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
bool isFailure() const
Test for a status code of FAILURE.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
void addAlg(Algorithm *, EventContext *, pthread_t)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
static std::list< SchedulerState > m_sState
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode finalize() override
Finalise.
static std::mutex m_ssMut
#define DECLARE_SERVICE_FACTORY(x)
bool complete
Flags completion of the event.
The IAlgorithm is the interface implemented by the Algorithm base class.
GAUDI_API void setCurrentContext(const EventContext *ctx)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
void dumpState() override
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
State
Execution states of the algorithms.
std::string fullKey() const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
static std::map< State, std::string > stateNames
StatusCode m_drain()
Drain the actions present in the queue.
StatusCode updateState(unsigned int iAlgo, State newState)