19 #include <unordered_set> 22 #include "boost/algorithm/string.hpp" 23 #include "boost/thread.hpp" 24 #include "boost/tokenizer.hpp" 26 #include "tbb/task_scheduler_init.h" 31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 36 struct DataObjIDSorter {
70 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
73 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
74 if ( !m_threadPoolSvc.isValid() ) {
75 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
80 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
81 m_thread =
std::thread( [
this]() { this->activate(); } );
83 while ( m_isActive != ACTIVE ) {
85 fatal() <<
"Terminating initialization" <<
endmsg;
88 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
93 if ( m_enableCondSvc ) {
95 m_condSvc = serviceLocator()->service(
"CondSvc" );
96 if ( !m_condSvc.isValid() ) {
97 warning() <<
"No CondSvc found, or not enabled. " 98 <<
"Will not manage CondAlgorithms" <<
endmsg;
99 m_enableCondSvc =
false;
104 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
105 if ( !m_algResourcePool.isValid() ) {
106 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
110 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
111 if ( !m_algExecStateSvc.isValid() ) {
112 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
117 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
118 if ( !m_whiteboard.isValid() ) {
119 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
124 if ( m_useIOBoundAlgScheduler ) {
125 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
126 if ( !m_IOBoundAlgScheduler.isValid() )
127 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
131 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
134 m_freeSlots = m_maxEventsInFlight;
141 const unsigned int algsNumber = algos.
size();
142 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
155 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
158 auto r = globalOutp.
insert(
id );
160 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths " 161 "though, or control flow may guarantee only one runs...!" 168 ostdd <<
"Data Dependencies for Algorithms:";
173 if (
nullptr == algoPtr ) {
174 fatal() <<
"Could not convert IAlgorithm into Algorithm for " << ialgoPtr->
name()
175 <<
": this will result in a crash." <<
endmsg;
179 ostdd <<
"\n " << algoPtr->
name();
185 ostdd <<
"\n o INPUT " << id;
186 if (
id.key().find(
":" ) != std::string::npos ) {
187 ostdd <<
" contains alternatives which require resolution...\n";
188 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
192 if ( itok != tokens.end() ) {
193 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
194 id.updateKey( *itok );
196 error() <<
"failed to find alternate in global output list" 197 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
198 m_showDataDeps =
true;
201 algoDependencies.
insert(
id );
205 ostdd <<
"\n o OUTPUT " << *id;
206 if ( id->key().find(
":" ) != std::string::npos ) {
207 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 209 m_showDataDeps =
true;
215 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
218 if ( m_showDataDeps ) {
226 for (
auto o : globalInp )
227 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
229 if ( unmetDep.
size() > 0 ) {
232 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
233 ost <<
"\n o " << *o <<
" required by Algorithm: ";
235 for (
const auto& p : algosDependenciesMap )
236 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
239 if ( !m_useDataLoader.empty() ) {
244 if ( algo->name() == m_useDataLoader ) {
245 dataLoaderAlg = algo;
249 if ( dataLoaderAlg ==
nullptr ) {
250 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
251 <<
"\" found, and unmet INPUT dependencies " 257 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 258 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
263 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Algorithm" <<
endmsg;
267 for (
auto&
id : unmetDep ) {
268 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" 269 << dataLoaderAlg->name() <<
endmsg;
274 fatal() <<
"Auto DataLoading not requested, " 275 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
280 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
285 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
286 if ( !m_precSvc.isValid() ) {
287 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
292 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
297 m_algname_vect.resize( algsNumber );
301 m_algname_index_map[
name] = index;
302 m_algname_vect.at( index ) =
name;
307 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
309 m_eventSlots.assign( m_maxEventsInFlight,
311 std::for_each( m_eventSlots.begin(), m_eventSlots.end(), [](
EventSlot& slot ) { slot.complete =
true; } );
312 m_actionsCounts.assign( m_maxEventsInFlight, 0 );
314 if ( m_threadPoolSize > 1 ) {
315 m_maxAlgosInFlight = (size_t)m_threadPoolSize;
319 info() <<
"Concurrency level information:" <<
endmsg;
320 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
321 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
323 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
325 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
328 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
341 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
344 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
346 info() <<
"Joining Scheduler thread" <<
endmsg;
351 error() <<
"problems in scheduler thread" <<
endmsg;
372 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
374 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
375 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
388 while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
389 m_actionsQueue.pop( thisAction );
394 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
400 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
401 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
402 error() <<
"Problems terminating thread pool" <<
endmsg;
418 if ( m_isActive == ACTIVE ) {
421 m_freeSlots.store( 0 );
425 while ( m_actionsQueue.try_pop( thisAction ) ) {
431 m_isActive = INACTIVE;
449 unsigned int index = m_algname_index_map[algoname];
465 if ( !eventContext ) {
466 fatal() <<
"Event context is nullptr" <<
endmsg;
470 if ( m_freeSlots.load() == 0 ) {
471 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
480 const unsigned int thisSlotNum = eventContext->
slot();
481 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
483 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
487 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
488 thisSlot.
reset( eventContext );
495 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
496 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
500 if ( this->updateStates( thisSlotNum ).isFailure() ) {
501 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
511 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
512 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
515 m_actionsQueue.push(
action );
525 for (
auto context : eventContexts ) {
526 sc = pushNewEvent( context );
543 if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
550 m_finishedEvents.pop( eventContext );
552 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
563 if ( m_finishedEvents.try_pop( eventContext ) ) {
564 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 595 const int eventsSlotsSize( m_eventSlots.size() );
596 eventSlotsPtrs.
reserve( eventsSlotsSize );
597 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
598 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
603 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
606 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
610 auto& thisSlot = m_eventSlots[iSlot];
614 if ( algo_index >= 0 ) {
618 if ( !inputContext || iSlot != (
int)inputContext->
slot() || inputContext == thisSlot.eventContext ) {
619 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
620 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << iSlot <<
endmsg;
625 unsigned int const subSlotIndex = thisSlot.contextToSlot.at( inputContext );
626 if ( m_precSvc->iterate( thisSlot.allSubSlots[subSlotIndex], cs ).isFailure() ) {
627 error() <<
"Failed to call IPrecedenceSvc::iterate for sub-slot of " << iSlot <<
endmsg;
636 if ( !m_optimizationMode.empty() ) {
637 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
638 return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
642 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it )
644 while ( !buffer.
empty() ) {
645 bool IOBound =
false;
646 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.
top() ) );
649 partial_sc = promoteToScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext );
651 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext );
653 ON_VERBOSE if ( partial_sc.isFailure() )
verbose() <<
"Could not apply transition from " << AState::DATAREADY
654 <<
" for algorithm " << index2algname( buffer.
top() )
655 <<
" on processing slot " << iSlot <<
endmsg;
661 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
664 bool IOBound =
false;
665 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
668 partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
670 partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext );
673 <<
" for algorithm " << index2algname( algIndex )
674 <<
" on processing slot " << iSlot <<
endmsg;
679 for (
auto& subslot : thisSlot.allSubSlots ) {
680 auto& subslotStates = subslot.algsStates;
681 for (
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
683 partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext );
685 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << index2algname( algIndex )
686 <<
" on processing subslot " << subslot.eventContext->slot() <<
endmsg;
690 if ( m_dumpIntraEventDynamics ) {
692 s << index2algname( algo_index ) <<
", " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", " 696 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
698 myfile.
open(
"IntraEventConcurrencyDynamics_" +
threads +
"T.csv", std::ios::app );
704 if ( m_precSvc->CFRulesResolved( thisSlot ) &&
705 !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
706 !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
707 !thisSlot.complete ) {
709 thisSlot.complete =
true;
713 m_finishedEvents.push( thisSlot.eventContext );
714 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 715 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
721 thisSlot.eventContext =
nullptr;
723 }
else if ( isStalled( thisSlot ) ) {
725 eventFailed( thisSlot.eventContext );
747 !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
765 const uint slotIdx = eventContext->
slot();
767 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
769 dumpSchedulerState( msgLevel(
MSG::VERBOSE ) ? -1 : slotIdx );
772 m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
775 m_eventSlots[slotIdx].complete =
true;
776 m_finishedEvents.push( eventContext );
791 outputMS <<
"Dumping scheduler state\n" 792 <<
"=========================================================================================\n" 793 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 794 <<
"=========================================================================================\n\n";
798 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 799 <<
"------------------\n\n";
802 auto timelineSvc = serviceLocator()->service<
ITimelineSvc>(
"TimelineSvc", false );
803 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
804 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
809 for (
auto& slot : m_eventSlots )
810 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
811 if ( index2algname( (uint)*it ).length() > indt ) indt = index2algname( (uint)*it ).length();
814 for (
auto& slot : m_eventSlots ) {
815 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
818 const std::string algoName{index2algname( (uint)*it )};
820 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/" 821 << slot.eventContext->slot();
824 if ( timelineSvc.isValid() ) {
827 te.slot = slot.eventContext->slot();
828 te.event = slot.eventContext->evt();
830 if ( timelineSvc->getTimelineEvent( te ) )
833 outputMS <<
" thread.id: [unknown]";
838 outputMS <<
" state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
845 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping " 846 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
849 for (
auto& slot : m_eventSlots ) {
851 if ( slot.complete )
continue;
853 outputMS <<
"[ slot: " 854 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
856 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" )
859 if ( 0 > iSlot || iSlot == slotCount ) {
862 outputMS << m_precSvc->printState( slot ) <<
"\n";
865 if ( slot.allSubSlots.size() ) {
866 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n";
867 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
868 for (
auto& ss : slot.allSubSlots ) {
869 outputMS <<
"[ slot: " << slotID <<
" sub-slot entry: " << ss.entryPoint <<
" event: " 870 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->evt() ) :
"[ctx invalid]" )
872 outputMS << m_precSvc->printState( ss ) <<
"\n";
881 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
882 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
885 outputMS <<
"\n=========================================================================================\n" 886 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 887 <<
"=========================================================================================\n\n";
899 const std::string& algName( index2algname( iAlgo ) );
901 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
903 if ( sc.isSuccess() ) {
906 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
907 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
914 if ( -100 != m_threadPoolSize ) {
916 tbb::task* algoTask =
new ( tbb::task::allocate_root() )
917 AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
919 tbb::task::enqueue( *algoTask );
922 AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
926 ON_DEBUG debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " 927 << si <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
934 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
937 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
938 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
947 ON_DEBUG debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si
962 const std::string& algName( index2algname( iAlgo ) );
964 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
966 if ( sc.isSuccess() ) {
968 ++m_IOBoundAlgosInFlight;
969 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
970 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
979 IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
980 m_IOBoundAlgScheduler->push( *theTask );
982 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
983 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
990 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
993 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
994 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
998 <<
" to SCHEDULED on slot " << si <<
endmsg;
1001 ON_DEBUG debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1002 <<
" on slot " << si <<
endmsg;
1016 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1019 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1020 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1028 ON_DEBUG debug() <<
"Trying to handle execution result of " << algo->name() <<
" on slot " << si <<
endmsg;
1030 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1032 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1041 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
1042 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1048 ON_DEBUG debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1049 << m_algosInFlight <<
endmsg;
1052 ++m_actionsCounts[si];
1053 m_actionsQueue.push( [
this, si, iAlgo, eventContext]() {
1054 --this->m_actionsCounts[si];
1055 return this->updateStates( -1, iAlgo, eventContext );
1070 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1073 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1074 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1078 --m_IOBoundAlgosInFlight;
1082 ON_DEBUG debug() <<
"[Asynchronous] Trying to handle execution result of " << algo->name() <<
" on slot " << si
1085 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1087 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1096 unsigned int const subSlotIndex = thisSlot.
contextToSlot.
at( eventContext );
1097 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1101 <<
" to " << state <<
endmsg;
1103 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1104 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1107 ++m_actionsCounts[si];
1108 m_actionsQueue.push( [
this, si, iAlgo, eventContext]() {
1109 --this->m_actionsCounts[si];
1110 return this->updateStates( -1, iAlgo, eventContext );
1124 int const topSlotIndex = sourceContext->
slot();
1125 EventSlot& topSlot = m_eventSlots[topSlotIndex];
1134 if ( viewContext ) {
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
constexpr static const auto FAILURE
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode finalize() override
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
AlgsExecutionStates algsStates
Vector of algorithms states.
const DataObjIDColl & outputDataObjs() const override
EventContext * eventContext
Cache for the eventContext.
A service to resolve the task execution precedence.
void activate()
Activate scheduler.
std::string entryPoint
Name of the node this slot is attached to ("" for top level)
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
#define DECLARE_COMPONENT(type)
virtual StatusCode scheduleEventView(EventContext const *sourceContext, std::string const &nodeName, EventContext *viewContext) override
Method to inform the scheduler about event views.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode updateStates(int si=-1, int algo_index=-1, EventContext *=nullptr)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode set(unsigned int iAlgo, State newState)
StatusCode finalize() override
Finalise.
bool complete
Flags completion of the event.
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::map< std::string, std::vector< unsigned int > > subSlotsByNode
Listing of sub-slots by the node (name) they are attached to.
constexpr static const auto SUCCESS
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
bool filterPassed() const
std::string fullKey() const
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
std::map< EventContext *, unsigned int > contextToSlot
Quick lookup for sub-slots by event context (top level only)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
const StatusCode & execStatus() const
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.