19 #include <unordered_set> 22 #include "boost/algorithm/string.hpp" 23 #include "boost/thread.hpp" 24 #include "boost/tokenizer.hpp" 26 #include "tbb/task_scheduler_init.h" 31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 35 struct DataObjIDSorter {
66 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
69 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
70 if ( !m_threadPoolSvc.isValid() ) {
71 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
76 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
77 m_thread =
std::thread( [
this]() { this->activate(); } );
79 while ( m_isActive != ACTIVE ) {
81 fatal() <<
"Terminating initialization" <<
endmsg;
84 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
89 if ( m_enableCondSvc ) {
91 m_condSvc = serviceLocator()->service(
"CondSvc" );
92 if ( !m_condSvc.isValid() ) {
93 warning() <<
"No CondSvc found, or not enabled. " 94 <<
"Will not manage CondAlgorithms" <<
endmsg;
95 m_enableCondSvc =
false;
100 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
101 if ( !m_algResourcePool.isValid() ) {
102 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
106 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
107 if ( !m_algExecStateSvc.isValid() ) {
108 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
113 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
114 if ( !m_whiteboard.isValid() ) {
115 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
120 if ( m_useIOBoundAlgScheduler ) {
121 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
122 if ( !m_IOBoundAlgScheduler.isValid() )
123 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
127 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
130 m_freeSlots = m_maxEventsInFlight;
134 const unsigned int algsNumber = algos.
size();
135 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
148 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
152 auto r = globalOutp.
insert(
id );
154 warning() <<
"multiple algorithms declare " <<
id 155 <<
" as output! could be a single instance in multiple paths " 156 "though, or control flow may guarantee only one runs...!" 163 ostdd <<
"Data Dependencies for Algorithms:";
168 if (
nullptr == algoPtr ) {
169 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
170 <<
": this will result in a crash." <<
endmsg;
174 ostdd <<
"\n " << algoPtr->
name();
180 ostdd <<
"\n o INPUT " << id;
181 if (
id.key().find(
":" ) != std::string::npos ) {
182 ostdd <<
" contains alternatives which require resolution...\n";
183 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
187 if ( itok != tokens.end() ) {
188 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
189 id.updateKey( *itok );
191 error() <<
"failed to find alternate in global output list" 192 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
193 m_showDataDeps =
true;
196 algoDependencies.
insert(
id );
200 ostdd <<
"\n o OUTPUT " << *id;
201 if ( id->key().find(
":" ) != std::string::npos ) {
202 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 204 m_showDataDeps =
true;
210 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
213 if ( m_showDataDeps ) { info() << ostdd.
str() <<
endmsg; }
219 for (
auto o : globalInp )
220 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
222 if ( unmetDep.
size() > 0 ) {
225 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
226 ost <<
"\n o " << *o <<
" required by Algorithm: ";
228 for (
const auto& p : algosDependenciesMap )
229 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
232 if ( !m_useDataLoader.empty() ) {
237 if ( algo->name() == m_useDataLoader ) {
238 dataLoaderAlg = algo;
242 if ( dataLoaderAlg ==
nullptr ) {
243 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
244 <<
"\" found, and unmet INPUT dependencies " 250 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 251 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
256 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm" 261 for (
auto&
id : unmetDep ) {
262 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" 263 << dataLoaderAlg->name() <<
endmsg;
268 fatal() <<
"Auto DataLoading not requested, " 269 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
274 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
279 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
280 if ( !m_precSvc.isValid() ) {
281 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
286 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
291 m_algname_vect.resize( algsNumber );
295 m_algname_index_map[
name] = index;
296 m_algname_vect.at( index ) =
name;
301 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
303 m_eventSlots.reserve( m_maxEventsInFlight );
304 for (
size_t i = 0; i < m_maxEventsInFlight; ++i ) {
306 m_eventSlots.back().complete =
true;
308 m_actionsCounts.assign( m_maxEventsInFlight, 0 );
310 if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
313 info() <<
"Concurrency level information:" <<
endmsg;
314 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
315 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
317 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
319 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
322 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
334 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
337 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
339 info() <<
"Joining Scheduler thread" <<
endmsg;
344 error() <<
"problems in scheduler thread" <<
endmsg;
364 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
366 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
367 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
380 while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
381 m_actionsQueue.pop( thisAction );
385 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
391 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
392 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
393 error() <<
"Problems terminating thread pool" <<
endmsg;
408 if ( m_isActive == ACTIVE ) {
411 m_freeSlots.store( 0 );
415 while ( m_actionsQueue.try_pop( thisAction ) ) {};
420 m_isActive = INACTIVE;
437 unsigned int index = m_algname_index_map[algoname];
452 if ( !eventContext ) {
453 fatal() <<
"Event context is nullptr" <<
endmsg;
457 if ( m_freeSlots.load() == 0 ) {
458 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
467 const unsigned int thisSlotNum = eventContext->
slot();
468 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
470 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
474 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
475 thisSlot.
reset( eventContext );
482 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
483 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
487 if ( this->updateStates( thisSlotNum ).isFailure() ) {
488 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
497 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
498 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
501 m_actionsQueue.push(
action );
510 for (
auto context : eventContexts ) {
511 sc = pushNewEvent( context );
527 if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
534 m_finishedEvents.pop( eventContext );
536 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
546 if ( m_finishedEvents.try_pop( eventContext ) ) {
547 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 568 const int source_slot ) {
578 const int eventsSlotsSize( m_eventSlots.size() );
579 eventSlotsPtrs.
reserve( eventsSlotsSize );
580 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
581 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
586 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
589 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
593 auto& thisSlot = m_eventSlots[iSlot];
597 if ( algo_index >= 0 ) {
601 if ( sub_slot == -1 || iSlot != source_slot ) {
602 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
603 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << iSlot <<
endmsg;
607 if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
608 error() <<
"Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot <<
" of " << iSlot <<
endmsg;
617 if ( !m_optimizationMode.empty() ) {
618 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
619 return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
623 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it )
625 while ( !buffer.
empty() ) {
626 bool IOBound =
false;
627 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.
top() ) );
630 partial_sc = promoteToScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext.get() );
632 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext.get() );
635 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " 636 << index2algname( buffer.
top() ) <<
" on processing slot " << iSlot <<
endmsg;
642 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
645 bool IOBound =
false;
646 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
649 partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
651 partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
654 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << index2algname( algIndex )
655 <<
" on processing slot " << iSlot <<
endmsg;
660 for (
auto& subslot : thisSlot.allSubSlots ) {
661 auto& subslotStates = subslot.algsStates;
662 for (
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
664 partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext.get() );
672 if ( m_dumpIntraEventDynamics ) {
674 s << ( algo_index != -1 ? index2algname( algo_index ) :
"START" ) <<
", " 675 << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", " 679 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
681 myfile.
open(
"IntraEventFSMOccupancy_" +
threads +
"T.csv", std::ios::app );
687 if ( m_precSvc->CFRulesResolved( thisSlot ) &&
688 !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
689 !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
690 !thisSlot.complete ) {
692 thisSlot.complete =
true;
696 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 697 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
698 m_finishedEvents.push( thisSlot.eventContext.release() );
704 thisSlot.eventContext.reset(
nullptr );
706 }
else if ( isStalled( thisSlot ) ) {
708 eventFailed( thisSlot.eventContext.get() );
729 !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
745 const uint slotIdx = eventContext->
slot();
747 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
749 dumpSchedulerState( msgLevel(
MSG::VERBOSE ) ? -1 : slotIdx );
752 m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
755 m_eventSlots[slotIdx].complete =
true;
756 m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
770 outputMS <<
"Dumping scheduler state\n" 771 <<
"=========================================================================================\n" 772 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 773 <<
"=========================================================================================\n\n";
777 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 778 <<
"------------------\n\n";
781 auto timelineSvc = serviceLocator()->service<
ITimelineSvc>(
"TimelineSvc", false );
782 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
783 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
788 for (
auto& slot : m_eventSlots )
789 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
790 if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
793 for (
auto& slot : m_eventSlots ) {
794 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
799 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/" 800 << slot.eventContext->slot();
803 if ( timelineSvc.isValid() ) {
806 te.slot = slot.eventContext->slot();
807 te.event = slot.eventContext->evt();
809 if ( timelineSvc->getTimelineEvent( te ) )
812 outputMS <<
" thread.id: [unknown]";
817 outputMS <<
" state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
824 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping " 825 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
828 for (
auto& slot : m_eventSlots ) {
830 if ( slot.complete )
continue;
832 outputMS <<
"[ slot: " 833 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
835 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" )
838 if ( 0 > iSlot || iSlot == slotCount ) {
841 outputMS << m_precSvc->printState( slot ) <<
"\n";
844 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
845 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
846 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
847 for (
auto& ss : slot.allSubSlots ) {
848 outputMS <<
"[ slot: " << slotID <<
" sub-slot entry: " << ss.entryPoint <<
" event: " 849 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->evt() ) :
"[ctx invalid]" )
851 outputMS << m_precSvc->printState( ss ) <<
"\n";
860 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
861 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
864 outputMS <<
"\n=========================================================================================\n" 865 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 866 <<
"=========================================================================================\n\n";
877 const std::string& algName( index2algname( iAlgo ) );
879 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
881 if ( sc.isSuccess() ) {
884 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
885 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
892 if ( -100 != m_threadPoolSize ) {
894 tbb::task* algoTask =
new ( tbb::task::allocate_root() )
895 AlgoExecutionTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
897 tbb::task::enqueue( *algoTask );
900 AlgoExecutionTask theTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc,
901 promote2ExecutedClosure );
905 ON_DEBUG debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " 906 << si <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
913 size_t const subSlotIndex = eventContext->
subSlot();
914 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
917 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
926 ON_DEBUG debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si
940 const std::string& algName( index2algname( iAlgo ) );
942 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
944 if ( sc.isSuccess() ) {
946 ++m_IOBoundAlgosInFlight;
947 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
948 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
957 IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
958 m_IOBoundAlgScheduler->push( *theTask );
960 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
961 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
968 size_t const subSlotIndex = eventContext->
subSlot();
969 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
972 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
976 <<
"[Asynchronous] Promoting " << algName <<
" to SCHEDULED on slot " << si <<
endmsg;
979 ON_DEBUG debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
980 <<
" on slot " << si <<
endmsg;
993 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
996 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 997 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1005 ON_DEBUG debug() <<
"Trying to handle execution result of " << algo->name() <<
" on slot " << si <<
endmsg;
1007 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1009 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1013 int subSlotIndex = -1;
1016 subSlotIndex = eventContext->
subSlot();
1017 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1024 <<
"Promoting " << algo->name() <<
" on slot " << si <<
" to " << state <<
endmsg;
1026 ON_DEBUG debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1027 << m_algosInFlight <<
endmsg;
1030 ++m_actionsCounts[si];
1031 m_actionsQueue.push( [
this, si, iAlgo, subSlotIndex]() {
1032 --this->m_actionsCounts[si];
1033 return this->updateStates( -1, iAlgo, subSlotIndex, si );
1047 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1050 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1051 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1055 --m_IOBoundAlgosInFlight;
1059 ON_DEBUG debug() <<
"[Asynchronous] Trying to handle execution result of " << algo->name() <<
" on slot " << si
1062 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1064 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1068 int subSlotIndex = -1;
1071 subSlotIndex = eventContext->
subSlot();
1072 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1079 <<
"[Asynchronous] Promoting " << algo->name() <<
" on slot " << si <<
" to " << state <<
endmsg;
1081 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1082 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1085 ++m_actionsCounts[si];
1086 m_actionsQueue.push( [
this, si, iAlgo, subSlotIndex]() {
1087 --this->m_actionsCounts[si];
1088 return this->updateStates( -1, iAlgo, subSlotIndex, si );
1102 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1110 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1113 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1115 if ( viewContextPtr ) {
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
Class representing an event slot.
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B...
StatusCode finalize() override
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one) ...
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const DataObjIDColl & outputDataObjs() const override
A service to resolve the task execution precedence.
constexpr static const auto SUCCESS
void activate()
Activate scheduler.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
#define DECLARE_COMPONENT(type)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode set(unsigned int iAlgo, State newState)
StatusCode finalize() override
Finalise.
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Base class from which all concrete algorithm classes should be derived.
constexpr static const auto FAILURE
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
bool complete
Flags completion of the event.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
bool filterPassed() const
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
std::string fullKey() const
ContextID_t subSlot() const
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
const StatusCode & execStatus() const
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgsExecutionStates algsStates
Vector of algorithms states.