19 #include <unordered_set> 22 #include "boost/algorithm/string.hpp" 23 #include "boost/thread.hpp" 24 #include "boost/tokenizer.hpp" 26 #include "tbb/task_scheduler_init.h" 31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 36 struct DataObjIDSorter {
65 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
68 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
69 if ( !m_threadPoolSvc.isValid() ) {
70 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
75 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
76 m_thread =
std::thread( [
this]() { this->activate(); } );
78 while ( m_isActive != ACTIVE ) {
80 fatal() <<
"Terminating initialization" <<
endmsg;
83 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
88 if ( m_enableCondSvc ) {
90 m_condSvc = serviceLocator()->service(
"CondSvc" );
91 if ( !m_condSvc.isValid() ) {
92 warning() <<
"No CondSvc found, or not enabled. " 93 <<
"Will not manage CondAlgorithms" <<
endmsg;
94 m_enableCondSvc =
false;
99 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
100 if ( !m_algResourcePool.isValid() ) {
101 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
105 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
106 if ( !m_algExecStateSvc.isValid() ) {
107 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
112 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
113 if ( !m_whiteboard.isValid() ) {
114 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
119 if ( m_useIOBoundAlgScheduler ) {
120 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
121 if ( !m_IOBoundAlgScheduler.isValid() )
122 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
126 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
129 m_freeSlots = m_maxEventsInFlight;
136 const unsigned int algsNumber = algos.
size();
137 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
150 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
153 auto r = globalOutp.
insert(
id );
155 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths " 156 "though, or control flow may guarantee only one runs...!" 163 ostdd <<
"Data Dependencies for Algorithms:";
168 if (
nullptr == algoPtr ) {
169 fatal() <<
"Could not convert IAlgorithm into Algorithm for " << ialgoPtr->
name()
170 <<
": this will result in a crash." <<
endmsg;
174 ostdd <<
"\n " << algoPtr->
name();
180 ostdd <<
"\n o INPUT " << id;
181 if (
id.key().find(
":" ) != std::string::npos ) {
182 ostdd <<
" contains alternatives which require resolution...\n";
183 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
187 if ( itok != tokens.end() ) {
188 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
189 id.updateKey( *itok );
191 error() <<
"failed to find alternate in global output list" 192 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
193 m_showDataDeps =
true;
196 algoDependencies.
insert(
id );
200 ostdd <<
"\n o OUTPUT " << *id;
201 if ( id->key().find(
":" ) != std::string::npos ) {
202 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 204 m_showDataDeps =
true;
210 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
213 if ( m_showDataDeps ) {
221 for (
auto o : globalInp )
222 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
224 if ( unmetDep.
size() > 0 ) {
227 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
228 ost <<
"\n o " << *o <<
" required by Algorithm: ";
230 for (
const auto& p : algosDependenciesMap )
231 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
234 if ( !m_useDataLoader.empty() ) {
239 if ( algo->name() == m_useDataLoader ) {
240 dataLoaderAlg = algo;
244 if ( dataLoaderAlg ==
nullptr ) {
245 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
246 <<
"\" found, and unmet INPUT dependencies " 252 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 253 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
258 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Algorithm" <<
endmsg;
262 for (
auto&
id : unmetDep ) {
263 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" 264 << dataLoaderAlg->name() <<
endmsg;
269 fatal() <<
"Auto DataLoading not requested, " 270 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
275 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
280 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
281 if ( !m_precSvc.isValid() ) {
282 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
287 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
292 m_algname_vect.resize( algsNumber );
296 m_algname_index_map[
name] = index;
297 m_algname_vect.at( index ) =
name;
302 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
304 m_eventSlots.assign( m_maxEventsInFlight,
306 std::for_each( m_eventSlots.begin(), m_eventSlots.end(), [](
EventSlot& slot ) { slot.complete =
true; } );
308 if ( m_threadPoolSize > 1 ) {
309 m_maxAlgosInFlight = (size_t)m_threadPoolSize;
313 info() <<
"Concurrency level information:" <<
endmsg;
314 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
315 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
317 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
319 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
322 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
335 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
338 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
340 info() <<
"Joining Scheduler thread" <<
endmsg;
345 error() <<
"problems in scheduler thread" <<
endmsg;
365 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
367 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
368 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
381 while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
382 m_actionsQueue.pop( thisAction );
387 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
393 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
394 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
395 error() <<
"Problems terminating thread pool" <<
endmsg;
411 if ( m_isActive == ACTIVE ) {
413 m_actionsQueue.push( [
this]() {
return this->m_drain(); } );
416 m_isActive = INACTIVE;
435 unsigned int index = m_algname_index_map[algoname];
454 if ( !eventContext ) {
455 fatal() <<
"Event context is nullptr" <<
endmsg;
459 if ( m_freeSlots.load() == 0 ) {
460 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
469 const unsigned int thisSlotNum = eventContext->
slot();
470 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
472 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
476 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
477 thisSlot.
reset( eventContext );
484 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
485 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
489 if ( this->updateStates( thisSlotNum ).isFailure() ) {
490 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
500 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
501 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
503 m_actionsQueue.push(
action );
512 for (
auto context : eventContexts ) {
513 sc = pushNewEvent( context );
529 unsigned int slotNum = 0;
530 for (
auto& thisSlot : m_eventSlots ) {
531 if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
532 updateStates( slotNum );
546 if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
553 m_finishedEvents.pop( eventContext );
555 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" <<
endmsg;
566 if ( m_finishedEvents.try_pop( eventContext ) ) {
567 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 585 m_freeSlots.store( 0 );
587 const uint slotIdx = eventContext->
slot();
589 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed! ***" <<
endmsg;
591 dumpSchedulerState( msgLevel(
MSG::VERBOSE ) ? -1 : slotIdx );
594 m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
598 while ( m_actionsQueue.try_pop( thisAction ) ) {
604 while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
605 m_finishedEvents.push( thisEvtContext );
607 m_finishedEvents.push( eventContext );
637 const int eventsSlotsSize( m_eventSlots.size() );
638 eventSlotsPtrs.
reserve( eventsSlotsSize );
639 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
640 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
645 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
648 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
652 auto& thisSlot = m_eventSlots[iSlot];
656 if ( algo_index >= 0 ) {
660 if ( !inputContext || iSlot != (
int)inputContext->
slot() || inputContext == thisSlot.eventContext ) {
661 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
662 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << iSlot <<
endmsg;
667 unsigned int const subSlotIndex = thisSlot.contextToSlot.at( inputContext );
668 if ( m_precSvc->iterate( thisSlot.allSubSlots[subSlotIndex], cs ).isFailure() ) {
669 error() <<
"Failed to call IPrecedenceSvc::iterate for sub-slot of " << iSlot <<
endmsg;
678 if ( !m_optimizationMode.empty() ) {
679 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
680 return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
684 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
685 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it )
687 while ( !buffer.
empty() ) {
688 bool IOBound =
false;
689 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.
top() ) );
692 partial_sc = promoteToScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext );
694 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext );
697 <<
"Could not apply transition from " 699 << index2algname( buffer.
top() ) <<
" on processing slot " << iSlot <<
endmsg;
705 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
706 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
709 bool IOBound =
false;
710 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
713 partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
715 partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
718 <<
"Could not apply transition from " 720 << index2algname( algIndex ) <<
" on processing slot " << iSlot <<
endmsg;
725 if ( thisSlot.subSlotAlgsReady.size() ) {
728 failedAlgs.
reserve( thisSlot.subSlotAlgsReady.size() );
731 for (
auto contextAlgPair = thisSlot.subSlotAlgsReady.begin(); contextAlgPair != thisSlot.subSlotAlgsReady.end();
733 if ( m_algosInFlight < m_maxAlgosInFlight ) {
734 partial_sc = promoteToScheduled( contextAlgPair->second, iSlot, contextAlgPair->first );
740 failedAlgs.
insert( failedAlgs.
end(), contextAlgPair, thisSlot.subSlotAlgsReady.end() );
746 thisSlot.subSlotAlgsReady = failedAlgs;
749 if ( m_dumpIntraEventDynamics ) {
751 s << index2algname( algo_index ) <<
", " << thisAlgsStates.
sizeOfSubset( State::CONTROLREADY ) <<
", " 755 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
757 myfile.
open(
"IntraEventConcurrencyDynamics_" +
threads +
"T.csv", std::ios::app );
763 if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
764 thisSlot.subSlotAlgsReady.empty() &&
769 thisSlot.complete =
true;
773 m_finishedEvents.push( thisSlot.eventContext );
774 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 775 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
781 thisSlot.eventContext =
nullptr;
783 StatusCode eventStalledSC = isStalled( iSlot );
786 eventFailed( thisSlot.eventContext ).ignore();
807 EventSlot& thisSlot = m_eventSlots[iSlot];
809 if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
813 info() <<
"About to declare a stall" <<
endmsg;
814 fatal() <<
"*** Stall detected! ***\n" <<
endmsg;
835 outputMS <<
"Dumping scheduler state\n" 836 <<
"=========================================================================================\n" 837 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 838 <<
"=========================================================================================\n\n";
842 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 843 <<
"------------------\n\n";
846 auto timelineSvc = serviceLocator()->service<
ITimelineSvc>(
"TimelineSvc", false );
847 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
848 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
853 for (
auto& slot : m_eventSlots )
854 for (
auto it = slot.algsStates.begin( AlgsExecutionStates::State::SCHEDULED );
855 it != slot.algsStates.end( AlgsExecutionStates::State::SCHEDULED ); ++it )
856 if ( index2algname( (uint)*it ).length() > indt ) indt = index2algname( (uint)*it ).length();
859 for (
auto& slot : m_eventSlots ) {
860 for (
auto it = slot.algsStates.begin( AlgsExecutionStates::State::SCHEDULED );
861 it != slot.algsStates.end( AlgsExecutionStates::State::SCHEDULED ); ++it ) {
863 const std::string algoName{index2algname( (uint)*it )};
865 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/" 866 << slot.eventContext->slot();
869 if ( timelineSvc.isValid() ) {
872 te.slot = slot.eventContext->slot();
873 te.event = slot.eventContext->evt();
875 if ( timelineSvc->getTimelineEvent( te ) )
878 outputMS <<
" thread.id: [unknown]";
883 outputMS <<
" state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
890 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping " 891 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
894 for (
auto& slot : m_eventSlots ) {
896 if ( slot.complete )
continue;
898 outputMS <<
"[ slot: " 899 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
901 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" )
904 if ( 0 > iSlot or iSlot == slotCount ) {
907 outputMS << m_precSvc->printState( slot ) <<
"\n";
910 if ( slot.allSubSlots.size() ) {
911 outputMS <<
"\nNumber of sub-slots:" << slot.allSubSlots.size() <<
"\n";
912 outputMS <<
"Sub-slot algorithms ready:" << slot.subSlotAlgsReady.size() <<
"\n";
920 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
921 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
924 outputMS <<
"\n=========================================================================================\n" 925 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 926 <<
"=========================================================================================\n\n";
938 const std::string& algName( index2algname( iAlgo ) );
940 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
942 if ( sc.isSuccess() ) {
945 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
946 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
953 if ( -100 != m_threadPoolSize ) {
955 tbb::task* algoTask =
new ( tbb::task::allocate_root() )
956 AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
958 tbb::task::enqueue( *algoTask );
961 AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
965 ON_DEBUG debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " 966 << si <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
976 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
986 ON_DEBUG debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si
1001 const std::string& algName( index2algname( iAlgo ) );
1003 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
1005 if ( sc.isSuccess() ) {
1007 ++m_IOBoundAlgosInFlight;
1011 IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
1012 m_IOBoundAlgScheduler->push( *theTask );
1014 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
1015 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1025 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
1030 <<
" to SCHEDULED on slot " << si <<
endmsg;
1033 ON_DEBUG debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1034 <<
" on slot " << si <<
endmsg;
1050 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1053 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1054 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1062 ON_DEBUG debug() <<
"Trying to handle execution result of " << algo->name() <<
" on slot " << si <<
endmsg;
1072 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
1073 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1079 ON_DEBUG debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1080 << m_algosInFlight <<
endmsg;
1083 m_actionsQueue.push( [
this, iAlgo, eventContext]() {
return this->updateStates( -1, iAlgo, eventContext ); } );
1098 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1101 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1102 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1106 m_IOBoundAlgosInFlight--;
1110 ON_DEBUG debug() <<
"[Asynchronous] Trying to handle execution result of " << algo->name() <<
" on slot " << si
1121 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
1122 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.updateState( iAlgo, state );
1128 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1129 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1132 m_actionsQueue.push( [
this, iAlgo, eventContext]() {
return this->updateStates( -1, iAlgo, eventContext ); } );
1143 int const topSlotIndex = sourceContext->
slot();
1144 EventSlot& topSlot = m_eventSlots[topSlotIndex];
1153 if ( viewContext ) {
bool algsPresent(State state) const
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
constexpr static const auto FAILURE
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode finalize() override
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
AlgsExecutionStates algsStates
Vector of algorithms states.
const DataObjIDColl & outputDataObjs() const override
EventContext * eventContext
Cache for the eventContext.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
A service to resolve the task execution precedence.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
void activate()
Activate scheduler.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
std::string entryPoint
Name of the node this slot is attached to ("" for top level)
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
#define DECLARE_COMPONENT(type)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
virtual StatusCode scheduleEventView(EventContext const *sourceContext, std::string const &nodeName, EventContext *viewContext) override
Method to inform the scheduler about event views.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single event.
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
std::vector< std::pair< EventContext *, int > > subSlotAlgsReady
Quick lookup for data-ready algorithms in sub-slots (top level only)
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode finalize() override
Finalise.
bool complete
Flags completion of the event.
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::map< std::string, std::vector< unsigned int > > subSlotsByNode
Listing of sub-slots by the node (name) they are attached to.
constexpr static const auto SUCCESS
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
Iterator begin(State kind)
const StatusCode & ignore() const
Ignore/check StatusCode.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
std::string fullKey() const
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
static std::map< State, std::string > stateNames
StatusCode m_drain()
Drain the actions present in the queue.
StatusCode updateState(unsigned int iAlgo, State newState)