19 #include <unordered_set> 22 #include "boost/algorithm/string.hpp" 23 #include "boost/thread.hpp" 24 #include "boost/tokenizer.hpp" 26 #include "tbb/task_scheduler_init.h" 31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 36 struct DataObjIDSorter {
70 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
73 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
74 if ( !m_threadPoolSvc.isValid() ) {
75 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
80 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
81 m_thread =
std::thread( [
this]() { this->activate(); } );
83 while ( m_isActive != ACTIVE ) {
85 fatal() <<
"Terminating initialization" <<
endmsg;
88 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
93 if ( m_enableCondSvc ) {
95 m_condSvc = serviceLocator()->service(
"CondSvc" );
96 if ( !m_condSvc.isValid() ) {
97 warning() <<
"No CondSvc found, or not enabled. " 98 <<
"Will not manage CondAlgorithms" <<
endmsg;
99 m_enableCondSvc =
false;
104 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
105 if ( !m_algResourcePool.isValid() ) {
106 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
110 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
111 if ( !m_algExecStateSvc.isValid() ) {
112 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
117 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
118 if ( !m_whiteboard.isValid() ) {
119 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
124 if ( m_useIOBoundAlgScheduler ) {
125 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
126 if ( !m_IOBoundAlgScheduler.isValid() )
127 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
131 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
134 m_freeSlots = m_maxEventsInFlight;
138 const unsigned int algsNumber = algos.
size();
139 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
152 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
155 auto r = globalOutp.
insert(
id );
157 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths " 158 "though, or control flow may guarantee only one runs...!" 165 ostdd <<
"Data Dependencies for Algorithms:";
170 if (
nullptr == algoPtr ) {
171 fatal() <<
"Could not convert IAlgorithm into Algorithm for " << ialgoPtr->
name()
172 <<
": this will result in a crash." <<
endmsg;
176 ostdd <<
"\n " << algoPtr->
name();
182 ostdd <<
"\n o INPUT " << id;
183 if (
id.key().find(
":" ) != std::string::npos ) {
184 ostdd <<
" contains alternatives which require resolution...\n";
185 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
189 if ( itok != tokens.end() ) {
190 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
191 id.updateKey( *itok );
193 error() <<
"failed to find alternate in global output list" 194 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
195 m_showDataDeps =
true;
198 algoDependencies.
insert(
id );
202 ostdd <<
"\n o OUTPUT " << *id;
203 if ( id->key().find(
":" ) != std::string::npos ) {
204 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 206 m_showDataDeps =
true;
212 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
215 if ( m_showDataDeps ) {
223 for (
auto o : globalInp )
224 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
226 if ( unmetDep.
size() > 0 ) {
229 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
230 ost <<
"\n o " << *o <<
" required by Algorithm: ";
232 for (
const auto& p : algosDependenciesMap )
233 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
236 if ( !m_useDataLoader.empty() ) {
241 if ( algo->name() == m_useDataLoader ) {
242 dataLoaderAlg = algo;
246 if ( dataLoaderAlg ==
nullptr ) {
247 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
248 <<
"\" found, and unmet INPUT dependencies " 254 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 255 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
260 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Algorithm" <<
endmsg;
264 for (
auto&
id : unmetDep ) {
265 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" 266 << dataLoaderAlg->name() <<
endmsg;
271 fatal() <<
"Auto DataLoading not requested, " 272 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
277 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
282 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
283 if ( !m_precSvc.isValid() ) {
284 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
289 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
294 m_algname_vect.resize( algsNumber );
298 m_algname_index_map[
name] = index;
299 m_algname_vect.at( index ) =
name;
304 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
306 m_eventSlots.reserve( m_maxEventsInFlight );
307 for (
size_t i = 0; i < m_maxEventsInFlight; ++i ) {
309 m_eventSlots.back().complete =
true;
311 m_actionsCounts.assign( m_maxEventsInFlight, 0 );
313 if ( m_threadPoolSize > 1 ) {
314 m_maxAlgosInFlight = (size_t)m_threadPoolSize;
318 info() <<
"Concurrency level information:" <<
endmsg;
319 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
320 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
322 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
324 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
327 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
340 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
343 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
345 info() <<
"Joining Scheduler thread" <<
endmsg;
350 error() <<
"problems in scheduler thread" <<
endmsg;
371 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
373 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
374 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
387 while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
388 m_actionsQueue.pop( thisAction );
393 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
399 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
400 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
401 error() <<
"Problems terminating thread pool" <<
endmsg;
417 if ( m_isActive == ACTIVE ) {
420 m_freeSlots.store( 0 );
424 while ( m_actionsQueue.try_pop( thisAction ) ) {
430 m_isActive = INACTIVE;
448 unsigned int index = m_algname_index_map[algoname];
464 if ( !eventContext ) {
465 fatal() <<
"Event context is nullptr" <<
endmsg;
469 if ( m_freeSlots.load() == 0 ) {
470 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
479 const unsigned int thisSlotNum = eventContext->
slot();
480 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
482 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
486 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
487 thisSlot.
reset( eventContext );
494 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
495 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
499 if ( this->updateStates( thisSlotNum ).isFailure() ) {
500 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
510 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
511 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
514 m_actionsQueue.push(
action );
524 for (
auto context : eventContexts ) {
525 sc = pushNewEvent( context );
542 if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
549 m_finishedEvents.pop( eventContext );
551 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
562 if ( m_finishedEvents.try_pop( eventContext ) ) {
563 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 584 const int source_slot )
595 const int eventsSlotsSize( m_eventSlots.size() );
596 eventSlotsPtrs.
reserve( eventsSlotsSize );
597 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
598 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
603 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
606 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
610 auto& thisSlot = m_eventSlots[iSlot];
614 if ( algo_index >= 0 ) {
618 if ( sub_slot == -1 || iSlot != source_slot ) {
619 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
620 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << iSlot <<
endmsg;
624 if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
625 error() <<
"Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot <<
" of " << iSlot <<
endmsg;
634 if ( !m_optimizationMode.empty() ) {
635 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
636 return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
640 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it )
642 while ( !buffer.
empty() ) {
643 bool IOBound =
false;
644 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.
top() ) );
647 partial_sc = promoteToScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext.get() );
649 partial_sc = promoteToAsyncScheduled( buffer.
top(), iSlot, thisSlotPtr->eventContext.get() );
651 ON_VERBOSE if ( partial_sc.isFailure() )
verbose() <<
"Could not apply transition from " << AState::DATAREADY
652 <<
" for algorithm " << index2algname( buffer.
top() )
653 <<
" on processing slot " << iSlot <<
endmsg;
659 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
662 bool IOBound =
false;
663 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
666 partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
668 partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
670 ON_VERBOSE if ( partial_sc.isFailure() )
verbose() <<
"Could not apply transition from " << AState::DATAREADY
671 <<
" for algorithm " << index2algname( algIndex )
672 <<
" on processing slot " << iSlot <<
endmsg;
677 for (
auto& subslot : thisSlot.allSubSlots ) {
678 auto& subslotStates = subslot.algsStates;
679 for (
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
681 partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext.get() );
689 if ( m_dumpIntraEventDynamics ) {
691 s << index2algname( algo_index ) <<
", " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", " 695 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
697 myfile.
open(
"IntraEventConcurrencyDynamics_" +
threads +
"T.csv", std::ios::app );
703 if ( m_precSvc->CFRulesResolved( thisSlot ) &&
704 !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
705 !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
706 !thisSlot.complete ) {
708 thisSlot.complete =
true;
712 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 713 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
714 m_finishedEvents.push( thisSlot.eventContext.release() );
720 thisSlot.eventContext.reset(
nullptr );
722 }
else if ( isStalled( thisSlot ) ) {
724 eventFailed( thisSlot.eventContext.get() );
746 !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
763 const uint slotIdx = eventContext->
slot();
765 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
767 dumpSchedulerState( msgLevel(
MSG::VERBOSE ) ? -1 : slotIdx );
770 m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
773 m_eventSlots[slotIdx].complete =
true;
774 m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
789 outputMS <<
"Dumping scheduler state\n" 790 <<
"=========================================================================================\n" 791 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 792 <<
"=========================================================================================\n\n";
796 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 797 <<
"------------------\n\n";
800 auto timelineSvc = serviceLocator()->service<
ITimelineSvc>(
"TimelineSvc", false );
801 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
802 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
807 for (
auto& slot : m_eventSlots )
808 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
809 if ( index2algname( (uint)*it ).length() > indt ) indt = index2algname( (uint)*it ).length();
812 for (
auto& slot : m_eventSlots ) {
813 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
816 const std::string algoName{index2algname( (uint)*it )};
818 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/" 819 << slot.eventContext->slot();
822 if ( timelineSvc.isValid() ) {
825 te.slot = slot.eventContext->slot();
826 te.event = slot.eventContext->evt();
828 if ( timelineSvc->getTimelineEvent( te ) )
831 outputMS <<
" thread.id: [unknown]";
836 outputMS <<
" state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
843 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping " 844 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
847 for (
auto& slot : m_eventSlots ) {
849 if ( slot.complete )
continue;
851 outputMS <<
"[ slot: " 852 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
854 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" )
857 if ( 0 > iSlot || iSlot == slotCount ) {
860 outputMS << m_precSvc->printState( slot ) <<
"\n";
879 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
880 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
883 outputMS <<
"\n=========================================================================================\n" 884 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 885 <<
"=========================================================================================\n\n";
897 const std::string& algName( index2algname( iAlgo ) );
899 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
901 if ( sc.isSuccess() ) {
904 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
905 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
912 if ( -100 != m_threadPoolSize ) {
914 tbb::task* algoTask =
new ( tbb::task::allocate_root() )
915 AlgoExecutionTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
917 tbb::task::enqueue( *algoTask );
920 AlgoExecutionTask theTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc,
921 promote2ExecutedClosure );
925 ON_DEBUG debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " 926 << si <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
933 size_t const subSlotIndex = eventContext->
subSlot();
934 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
937 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
946 ON_DEBUG debug() <<
"Could not acquire instance for algorithm " << index2algname( iAlgo ) <<
" on slot " << si
961 const std::string& algName( index2algname( iAlgo ) );
963 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
965 if ( sc.isSuccess() ) {
967 ++m_IOBoundAlgosInFlight;
968 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
969 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
978 IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
979 m_IOBoundAlgScheduler->push( *theTask );
981 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
982 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
989 size_t const subSlotIndex = eventContext->
subSlot();
990 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
993 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
997 <<
" to SCHEDULED on slot " << si <<
endmsg;
1000 ON_DEBUG debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
1001 <<
" on slot " << si <<
endmsg;
1015 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1018 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1019 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1027 ON_DEBUG debug() <<
"Trying to handle execution result of " << algo->name() <<
" on slot " << si <<
endmsg;
1029 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1031 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1035 int subSlotIndex = -1;
1038 subSlotIndex = eventContext->
subSlot();
1039 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1048 ON_DEBUG debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1049 << m_algosInFlight <<
endmsg;
1052 ++m_actionsCounts[si];
1053 m_actionsQueue.push( [
this, si, iAlgo, subSlotIndex]() {
1054 --this->m_actionsCounts[si];
1055 return this->updateStates( -1, iAlgo, subSlotIndex, si );
1070 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1073 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1074 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1078 --m_IOBoundAlgosInFlight;
1082 ON_DEBUG debug() <<
"[Asynchronous] Trying to handle execution result of " << algo->name() <<
" on slot " << si
1085 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1087 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1091 int subSlotIndex = -1;
1094 subSlotIndex = eventContext->
subSlot();
1095 sc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1102 <<
" to " << state <<
endmsg;
1104 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1105 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1108 ++m_actionsCounts[si];
1109 m_actionsQueue.push( [
this, si, iAlgo, subSlotIndex]() {
1110 --this->m_actionsCounts[si];
1111 return this->updateStates( -1, iAlgo, subSlotIndex, si );
1126 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1135 [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(), &nodeName ]()->
StatusCode 1139 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1141 if ( viewContextPtr ) {
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
constexpr static const auto FAILURE
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
Class representing an event slot.
const std::string & name() const override
The identifying name of the algorithm object.
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B...
StatusCode finalize() override
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one) ...
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const DataObjIDColl & outputDataObjs() const override
A service to resolve the task execution precedence.
void activate()
Activate scheduler.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
#define DECLARE_COMPONENT(type)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
bool containsAny(std::initializer_list< State > l) const
Check if the collection contains at least one state of any of the listed types.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode set(unsigned int iAlgo, State newState)
StatusCode finalize() override
Finalise.
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
constexpr static const auto SUCCESS
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
bool complete
Flags completion of the event.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
bool filterPassed() const
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
std::string fullKey() const
ContextID_t subSlot() const
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
const StatusCode & execStatus() const
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgsExecutionStates algsStates
Vector of algorithms states.