18 #include <unordered_set> 21 #include "boost/algorithm/string.hpp" 22 #include "boost/thread.hpp" 23 #include "boost/tokenizer.hpp" 25 #include "tbb/task_scheduler_init.h" 30 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 31 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 35 struct DataObjIDSorter {
69 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
72 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
73 if ( !m_threadPoolSvc.isValid() ) {
74 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
79 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
80 m_thread =
std::thread( [
this]() { this->activate(); } );
82 while ( m_isActive != ACTIVE ) {
84 fatal() <<
"Terminating initialization" <<
endmsg;
87 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
92 if ( m_enableCondSvc ) {
94 m_condSvc = serviceLocator()->service(
"CondSvc" );
95 if ( !m_condSvc.isValid() ) {
96 warning() <<
"No CondSvc found, or not enabled. " 97 <<
"Will not manage CondAlgorithms" <<
endmsg;
98 m_enableCondSvc =
false;
103 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
104 if ( !m_algResourcePool.isValid() ) {
105 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
109 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
110 if ( !m_algExecStateSvc.isValid() ) {
111 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
116 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
117 if ( !m_whiteboard.isValid() ) {
118 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
123 if ( m_useIOBoundAlgScheduler ) {
124 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
125 if ( !m_IOBoundAlgScheduler.isValid() )
126 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
130 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
133 m_freeSlots = m_maxEventsInFlight;
137 const unsigned int algsNumber = algos.
size();
138 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
153 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
156 auto r = globalOutp.
insert(
id );
158 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths " 159 "though, or control flow may guarantee only one runs...!" 166 ostdd <<
"Data Dependencies for Algorithms:";
171 if (
nullptr == algoPtr ) {
172 fatal() <<
"Could not convert IAlgorithm into Algorithm for " << ialgoPtr->
name()
173 <<
": this will result in a crash." <<
endmsg;
177 ostdd <<
"\n " << algoPtr->
name();
184 ostdd <<
"\n o INPUT " << id;
185 if (
id.key().find(
":" ) != std::string::npos ) {
186 ostdd <<
" contains alternatives which require resolution...\n";
187 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
191 if ( itok != tokens.end() ) {
192 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
193 id.updateKey( *itok );
195 error() <<
"failed to find alternate in global output list" 196 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
197 m_showDataDeps =
true;
200 algoDependencies.
insert(
id );
204 ostdd <<
"\n o OUTPUT " << *id;
205 if ( id->key().find(
":" ) != std::string::npos ) {
206 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 208 m_showDataDeps =
true;
214 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
217 if ( m_showDataDeps ) {
225 for (
auto o : globalInp )
226 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
228 if ( unmetDep.
size() > 0 ) {
231 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232 ost <<
"\n o " << *o <<
" required by Algorithm: ";
234 for (
const auto& p : algosDependenciesMap )
235 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
238 if ( !m_useDataLoader.empty() ) {
243 if ( algo->name() == m_useDataLoader ) {
244 dataLoaderAlg = algo;
248 if ( dataLoaderAlg ==
nullptr ) {
249 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
250 <<
"\" found, and unmet INPUT dependencies " 256 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 257 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
262 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Algorithm" <<
endmsg;
266 for (
auto&
id : unmetDep ) {
267 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" 268 << dataLoaderAlg->name() <<
endmsg;
273 fatal() <<
"Auto DataLoading not requested, " 274 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
279 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
284 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
285 if ( !m_precSvc.isValid() ) {
286 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
291 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
296 m_algname_vect.resize( algsNumber );
300 m_algname_index_map[
name] = index;
301 m_algname_vect.at( index ) =
name;
306 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
308 m_eventSlots.reserve( m_maxEventsInFlight );
309 for (
size_t i = 0; i < m_maxEventsInFlight; ++i ) {
311 m_eventSlots.back().complete =
true;
313 m_actionsCounts.assign( m_maxEventsInFlight, 0 );
315 if ( m_threadPoolSize > 1 ) {
316 m_maxAlgosInFlight = (size_t)m_threadPoolSize;
320 info() <<
"Concurrency level information:" <<
endmsg;
321 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
322 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
324 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
326 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
329 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
342 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
345 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
347 info() <<
"Joining Scheduler thread" <<
endmsg;
352 error() <<
"problems in scheduler thread" <<
endmsg;
373 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
375 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
376 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
389 while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
390 m_actionsQueue.pop( thisAction );
395 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
401 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
402 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
403 error() <<
"Problems terminating thread pool" <<
endmsg;
419 if ( m_isActive == ACTIVE ) {
422 m_freeSlots.store( 0 );
426 while ( m_actionsQueue.try_pop( thisAction ) ) {
432 m_isActive = INACTIVE;
450 unsigned int index = m_algname_index_map[algoname];
466 if ( !eventContext ) {
467 fatal() <<
"Event context is nullptr" <<
endmsg;
471 if ( m_freeSlots.load() == 0 ) {
472 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
481 const unsigned int thisSlotNum = eventContext->
slot();
482 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
484 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
488 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
489 thisSlot.
reset( eventContext );
496 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
497 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
501 if ( this->updateStates( thisSlotNum ).isFailure() ) {
502 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
512 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
513 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
516 m_actionsQueue.push(
action );
526 for (
auto context : eventContexts ) {
527 sc = pushNewEvent( context );
544 if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
551 m_finishedEvents.pop( eventContext );
553 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
564 if ( m_finishedEvents.try_pop( eventContext ) ) {
565 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 586 const int source_slot )
597 const int eventsSlotsSize( m_eventSlots.size() );
598 eventSlotsPtrs.
reserve( eventsSlotsSize );
599 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
600 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
605 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
608 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
612 auto& thisSlot = m_eventSlots[iSlot];
616 if ( algo_index >= 0 ) {
620 if ( sub_slot == -1 || iSlot != source_slot ) {
621 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
622 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << iSlot <<
endmsg;
626 if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
627 error() <<
"Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot <<
" of " << iSlot <<
endmsg;
636 if ( !m_optimizationMode.empty() ) {
637 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
638 return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
642 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it )
644 while ( !buffer.
empty() ) {
645 bool IOBound =
false;
646 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.
top() ) );
649 partial_sc = promoteToScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext.get() );
651 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext.get() );
653 ON_VERBOSE if ( partial_sc.isFailure() )
verbose() <<
"Could not apply transition from " << AState::DATAREADY
654 <<
" for algorithm " << index2algname( buffer.
top() )
655 <<
" on processing slot " << iSlot <<
endmsg;
661 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
664 bool IOBound =
false;
665 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
668 partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
670 partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
672 ON_VERBOSE if ( partial_sc.isFailure() )
verbose() <<
"Could not apply transition from " << AState::DATAREADY
673 <<
" for algorithm " << index2algname( algIndex )
674 <<
" on processing slot " << iSlot <<
endmsg;
679 for (
auto& subslot : thisSlot.allSubSlots ) {
680 auto& subslotStates = subslot.algsStates;
681 for (
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
683 partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext.get() );
691 if ( m_dumpIntraEventDynamics ) {
693 s << index2algname( algo_index ) <<
", " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", " 697 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
699 myfile.
open(
"IntraEventConcurrencyDynamics_" +
threads +
"T.csv", std::ios::app );
705 if ( m_precSvc->CFRulesResolved( thisSlot ) &&
706 !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
707 !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
708 !thisSlot.complete ) {
710 thisSlot.complete =
true;
714 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 715 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
716 m_finishedEvents.push( thisSlot.eventContext.release() );
722 thisSlot.eventContext.reset(
nullptr );
724 }
else if ( isStalled( thisSlot ) ) {
726 eventFailed( thisSlot.eventContext.get() );
748 !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
765 const uint slotIdx = eventContext->
slot();
767 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
769 dumpSchedulerState( msgLevel(
MSG::VERBOSE ) ? -1 : slotIdx );
772 m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
775 m_eventSlots[slotIdx].complete =
true;
776 m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
791 outputMS <<
"Dumping scheduler state\n" 792 <<
"=========================================================================================\n" 793 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 794 <<
"=========================================================================================\n\n";
798 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 799 <<
"------------------\n\n";
802 auto timelineSvc = serviceLocator()->service<
ITimelineSvc>(
"TimelineSvc", false );
803 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
804 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
809 for (
auto& slot : m_eventSlots )
810 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
811 if ( index2algname( (uint)*it ).length() > indt ) indt = index2algname( (uint)*it ).length();
814 for (
auto& slot : m_eventSlots ) {
815 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
818 const std::string algoName{index2algname( (uint)*it )};
820 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/" 821 << slot.eventContext->slot();
824 if ( timelineSvc.isValid() ) {
827 te.slot = slot.eventContext->slot();
828 te.event = slot.eventContext->evt();
830 if ( timelineSvc->getTimelineEvent( te ) )
833 outputMS <<
" thread.id: [unknown]";
838 outputMS <<
" state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
845 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping " 846 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
849 for (
auto& slot : m_eventSlots ) {
851 if ( slot.complete )
continue;
853 outputMS <<
"[ slot: " 854 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
856 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" )
859 if ( 0 > iSlot || iSlot == slotCount ) {
862 outputMS << m_precSvc->printState( slot ) <<
"\n";
881 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
882 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
885 outputMS <<
"\n=========================================================================================\n" 886 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 887 <<
"=========================================================================================\n\n";
899 const std::string& algName( index2algname( iAlgo ) );
901 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
903 if ( sc.isSuccess() ) {
906 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
907 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
914 if ( -100 != m_threadPoolSize ) {
916 tbb::task* algoTask =
new ( tbb::task::allocate_root() )
917 AlgoExecutionTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
919 tbb::task::enqueue( *algoTask );
922 AlgoExecutionTask theTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc,
923 promote2ExecutedClosure );
927 ON_DEBUG debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " 928 << si <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
935 size_t const subSlotIndex = eventContext->
subSlot();
936 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
939 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
948 ON_DEBUG debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si
963 const std::string& algName( index2algname( iAlgo ) );
965 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
967 if ( sc.isSuccess() ) {
969 ++m_IOBoundAlgosInFlight;
970 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
971 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
980 IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
981 m_IOBoundAlgScheduler->push( *theTask );
983 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
984 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
991 size_t const subSlotIndex = eventContext->
subSlot();
992 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
995 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
999 <<
" to SCHEDULED on slot " << si <<
endmsg;
1002 ON_DEBUG debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1003 <<
" on slot " << si <<
endmsg;
1017 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1020 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1021 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1029 ON_DEBUG debug() <<
"Trying to handle execution result of " << algo->name() <<
" on slot " << si <<
endmsg;
1031 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1033 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1037 int subSlotIndex = -1;
1040 subSlotIndex = eventContext->
subSlot();
1041 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1050 ON_DEBUG debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1051 << m_algosInFlight <<
endmsg;
1054 ++m_actionsCounts[si];
1055 m_actionsQueue.push( [
this, si, iAlgo, subSlotIndex]() {
1056 --this->m_actionsCounts[si];
1057 return this->updateStates( -1, iAlgo, subSlotIndex, si );
1072 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1075 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1076 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1080 --m_IOBoundAlgosInFlight;
1084 ON_DEBUG debug() <<
"[Asynchronous] Trying to handle execution result of " << algo->name() <<
" on slot " << si
1087 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1089 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1093 int subSlotIndex = -1;
1096 subSlotIndex = eventContext->
subSlot();
1097 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1104 <<
" to " << state <<
endmsg;
1106 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1107 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1110 ++m_actionsCounts[si];
1111 m_actionsQueue.push( [
this, si, iAlgo, subSlotIndex]() {
1112 --this->m_actionsCounts[si];
1113 return this->updateStates( -1, iAlgo, subSlotIndex, si );
1128 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1137 [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(), &nodeName ]()->
StatusCode 1141 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1143 if ( viewContextPtr ) {
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
constexpr static const auto FAILURE
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
Class representing an event slot.
const std::string & name() const override
The identifying name of the algorithm object.
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B...
StatusCode finalize() override
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one) ...
IDataHandleMetadata::AccessMode AccessMode
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
void addDataDependency(const DataObjID &key, AccessMode access) final override
Add a data dependency, even after initialization.
A service to resolve the task execution precedence.
void activate()
Activate scheduler.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
#define DECLARE_COMPONENT(type)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
StatusCode set(unsigned int iAlgo, State newState)
StatusCode finalize() override
Finalise.
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
constexpr static const auto SUCCESS
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
Base class from which all concrete algorithm classes should be derived.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
bool complete
Flags completion of the event.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
const DataObjIDColl & dataDependencies(AccessMode access) const final override
Tell which whiteboard keys the algorithm will be reading or writing.
StatusCode deactivate()
Deactivate scheduler.
bool filterPassed() const
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
std::string fullKey() const
ContextID_t subSlot() const
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
const StatusCode & execStatus() const
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgsExecutionStates algsStates
Vector of algorithms states.