19 #include <unordered_set> 22 #include "boost/algorithm/string.hpp" 23 #include "boost/thread.hpp" 24 #include "boost/tokenizer.hpp" 26 #include "tbb/task_scheduler_init.h" 31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 35 struct DataObjIDSorter {
67 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
70 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
71 if ( !m_threadPoolSvc.isValid() ) {
72 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
77 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
78 m_thread =
std::thread( [
this]() { this->activate(); } );
80 while ( m_isActive != ACTIVE ) {
82 fatal() <<
"Terminating initialization" <<
endmsg;
85 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
90 if ( m_enableCondSvc ) {
92 m_condSvc = serviceLocator()->service(
"CondSvc" );
93 if ( !m_condSvc.isValid() ) {
94 warning() <<
"No CondSvc found, or not enabled. " 95 <<
"Will not manage CondAlgorithms" <<
endmsg;
96 m_enableCondSvc =
false;
101 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
102 if ( !m_algResourcePool.isValid() ) {
103 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
107 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc" );
108 if ( !m_algExecStateSvc.isValid() ) {
109 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
114 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
115 if ( !m_whiteboard.isValid() ) {
116 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
121 if ( m_useIOBoundAlgScheduler ) {
122 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
123 if ( !m_IOBoundAlgScheduler.isValid() )
124 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
128 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
131 m_freeSlots = m_maxEventsInFlight;
135 const unsigned int algsNumber = algos.
size();
136 if ( algsNumber != 0 ) {
137 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
139 error() <<
"No algorithms found" <<
endmsg;
154 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
158 auto r = globalOutp.
insert(
id );
160 warning() <<
"multiple algorithms declare " <<
id 161 <<
" as output! could be a single instance in multiple paths " 162 "though, or control flow may guarantee only one runs...!" 169 ostdd <<
"Data Dependencies for Algorithms:";
174 if (
nullptr == algoPtr ) {
175 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
176 <<
": this will result in a crash." <<
endmsg;
180 ostdd <<
"\n " << algoPtr->
name();
186 ostdd <<
"\n o INPUT " << id;
187 if (
id.key().find(
":" ) != std::string::npos ) {
188 ostdd <<
" contains alternatives which require resolution...\n";
189 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
193 if ( itok != tokens.end() ) {
194 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
195 id.updateKey( *itok );
197 error() <<
"failed to find alternate in global output list" 198 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
199 m_showDataDeps =
true;
202 algoDependencies.
insert(
id );
206 ostdd <<
"\n o OUTPUT " << *id;
207 if ( id->key().find(
":" ) != std::string::npos ) {
208 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 210 m_showDataDeps =
true;
216 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
219 if ( m_showDataDeps ) { info() << ostdd.
str() <<
endmsg; }
225 for (
auto o : globalInp )
226 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
228 if ( unmetDep.
size() > 0 ) {
231 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232 ost <<
"\n o " << *o <<
" required by Algorithm: ";
234 for (
const auto& p : algosDependenciesMap )
235 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
238 if ( !m_useDataLoader.empty() ) {
243 if ( algo->name() == m_useDataLoader ) {
244 dataLoaderAlg = algo;
248 if ( dataLoaderAlg ==
nullptr ) {
249 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
250 <<
"\" found, and unmet INPUT dependencies " 256 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 257 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
262 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm" 267 for (
auto&
id : unmetDep ) {
268 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" 269 << dataLoaderAlg->name() <<
endmsg;
274 fatal() <<
"Auto DataLoading not requested, " 275 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
280 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
285 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
286 if ( !m_precSvc.isValid() ) {
287 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
292 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
297 m_algname_vect.resize( algsNumber );
301 m_algname_index_map[
name] = index;
302 m_algname_vect.at( index ) =
name;
307 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
309 m_eventSlots.reserve( m_maxEventsInFlight );
310 for (
size_t i = 0; i < m_maxEventsInFlight; ++i ) {
312 m_eventSlots.back().complete =
true;
314 m_actionsCounts.assign( m_maxEventsInFlight, 0 );
316 if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
319 info() <<
"Concurrency level information:" <<
endmsg;
320 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
321 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
323 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
325 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
328 if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
340 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
343 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
345 info() <<
"Joining Scheduler thread" <<
endmsg;
350 error() <<
"problems in scheduler thread" <<
endmsg;
370 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
372 if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
373 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
386 while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
387 m_actionsQueue.pop( thisAction );
391 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
398 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
399 if ( m_threadPoolSvc->terminatePool().isFailure() ) {
400 error() <<
"Problems terminating thread pool" <<
endmsg;
415 if ( m_isActive == ACTIVE ) {
418 m_freeSlots.store( 0 );
422 while ( m_actionsQueue.try_pop( thisAction ) ) {};
427 m_isActive = INACTIVE;
446 if ( !eventContext ) {
447 fatal() <<
"Event context is nullptr" <<
endmsg;
451 if ( m_freeSlots.load() == 0 ) {
452 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
461 const unsigned int thisSlotNum = eventContext->
slot();
462 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
464 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
468 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
469 thisSlot.
reset( eventContext );
476 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
477 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
481 if ( this->updateStates( thisSlotNum ).isFailure() ) {
482 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
491 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
492 verbose() <<
"Free slots available " << m_freeSlots.load() <<
endmsg;
495 m_actionsQueue.push(
action );
504 for (
auto context : eventContexts ) {
505 sc = pushNewEvent( context );
522 if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
529 m_finishedEvents.pop( eventContext );
531 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
542 if ( m_finishedEvents.try_pop( eventContext ) ) {
543 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 564 const int source_slot ) {
570 const size_t retries = m_retryQueue.size();
571 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
573 queuePop = m_retryQueue.front();
585 const int eventsSlotsSize( m_eventSlots.size() );
586 eventSlotsPtrs.
reserve( eventsSlotsSize );
587 for (
auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
588 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
593 eventSlotsPtrs.
push_back( &m_eventSlots[si] );
596 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
600 auto& thisSlot = m_eventSlots[iSlot];
604 if ( algo_index >= 0 ) {
608 if ( sub_slot == -1 || iSlot != source_slot ) {
609 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
610 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << iSlot <<
endmsg;
614 if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
615 error() <<
"Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot <<
" of " << iSlot <<
endmsg;
624 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
627 bool IOBound =
false;
628 if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
631 partial_sc = enqueue( algIndex, iSlot, thisSlotPtr->eventContext.get() );
633 partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
636 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << index2algname( algIndex )
637 <<
" on processing slot " << iSlot <<
endmsg;
641 for (
auto& subslot : thisSlot.allSubSlots ) {
642 auto& subslotStates = subslot.algsStates;
643 for (
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
645 partial_sc = enqueue( algIndex, iSlot, subslot.eventContext.get() );
653 if ( m_dumpIntraEventDynamics ) {
655 s << ( algo_index != -1 ? index2algname( algo_index ) :
"START" ) <<
", " 656 << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", " 660 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
662 myfile.
open(
"IntraEventFSMOccupancy_" +
threads +
"T.csv", std::ios::app );
668 if ( m_precSvc->CFRulesResolved( thisSlot ) &&
669 !thisSlot.algsStates.containsAny(
670 {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
671 !subSlotAlgsInStates( thisSlot,
672 {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
673 !thisSlot.complete ) {
675 thisSlot.complete =
true;
679 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 680 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
681 m_finishedEvents.push( thisSlot.eventContext.release() );
687 thisSlot.eventContext.reset(
nullptr );
689 }
else if ( isStalled( thisSlot ) ) {
691 eventFailed( thisSlot.eventContext.get() );
709 size_t const subSlotIndex = contextPtr->
subSlot();
710 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
730 !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
746 const uint slotIdx = eventContext->
slot();
748 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
750 dumpSchedulerState( msgLevel(
MSG::VERBOSE ) ? -1 : slotIdx );
753 m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
756 m_eventSlots[slotIdx].complete =
true;
757 m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
771 outputMS <<
"Dumping scheduler state\n" 772 <<
"=========================================================================================\n" 773 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 774 <<
"=========================================================================================\n\n";
778 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 779 <<
"------------------\n\n";
782 auto timelineSvc = serviceLocator()->service<
ITimelineSvc>(
"TimelineSvc",
false );
783 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
784 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
789 for (
auto& slot : m_eventSlots )
790 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
791 if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
794 for (
auto& slot : m_eventSlots ) {
795 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
800 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/" 801 << slot.eventContext->slot();
804 if ( timelineSvc.isValid() ) {
807 te.slot = slot.eventContext->slot();
808 te.event = slot.eventContext->evt();
810 if ( timelineSvc->getTimelineEvent( te ) )
813 outputMS <<
" thread.id: [unknown]";
818 outputMS <<
" state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
825 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping " 826 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
829 for (
auto& slot : m_eventSlots ) {
831 if ( slot.complete )
continue;
833 outputMS <<
"[ slot: " 834 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
836 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" )
839 if ( 0 > iSlot || iSlot == slotCount ) {
842 outputMS << m_precSvc->printState( slot ) <<
"\n";
845 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
846 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
847 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
848 for (
auto& ss : slot.allSubSlots ) {
849 outputMS <<
"[ slot: " << slotID <<
" sub-slot entry: " << ss.entryPoint <<
" event: " 850 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->evt() ) :
"[ctx invalid]" )
852 outputMS << m_precSvc->printState( ss ) <<
"\n";
861 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
862 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
865 outputMS <<
"\n=========================================================================================\n" 866 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 867 <<
"=========================================================================================\n\n";
877 const std::string& algName( index2algname( iAlgo ) );
878 unsigned int rank = 0;
879 if ( !m_optimizationMode.empty() ) { rank = m_precSvc->getPriority( algName ); }
883 StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( algName, iAlgoPtr ) );
887 if ( getAlgSC.isSuccess() ) {
890 m_scheduledQueue.push( {iAlgo, si, eventContext, rank, iAlgoPtr} );
894 if ( -100 != m_threadPoolSize ) {
897 tbb::task* algoTask =
898 new ( tbb::task::allocate_root() )
AlgoExecutionTask(
this, serviceLocator(), m_algExecStateSvc );
900 tbb::task::enqueue( *algoTask );
907 ON_DEBUG debug() <<
"Algorithm " << index2algname( iAlgo ) <<
" was submitted on event " << eventContext->
evt()
908 <<
" in slot " << si <<
". Algorithms scheduled are " << m_algosInFlight <<
endmsg;
910 state = AState::SCHEDULED;
914 m_retryQueue.push( {iAlgo, si, eventContext, rank,
nullptr} );
916 state = AState::RESOURCELESS;
920 StatusCode updateSc = setAlgState( iAlgo, eventContext, state );
925 ON_VERBOSE verbose() <<
"Promoting " << index2algname( iAlgo ) <<
" to " << state <<
" on slot " << si <<
endmsg;
937 const std::string& algName( index2algname( iAlgo ) );
939 StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
941 if ( sc.isSuccess() ) {
943 ++m_IOBoundAlgosInFlight;
944 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
945 this->m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
954 IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
955 m_IOBoundAlgScheduler->push( *theTask );
957 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
958 <<
" in slot " << si <<
". algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
961 StatusCode updateSc = setAlgState( iAlgo, eventContext, AState::SCHEDULED );
964 <<
"[Asynchronous] Promoting " << algName <<
" to SCHEDULED on slot " << si <<
endmsg;
967 ON_DEBUG debug() <<
"[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
968 <<
" on slot " << si <<
endmsg;
980 const std::string& algName( index2algname( iAlgo ) );
986 ON_DEBUG debug() <<
"Trying to handle execution result of " << algName <<
" on slot " << si <<
endmsg;
988 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algName, *eventContext );
990 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
994 StatusCode sc = setAlgState( iAlgo, eventContext, state );
997 <<
"Promoting " << algName <<
" on slot " << si <<
" to " << state <<
endmsg;
999 ON_DEBUG debug() <<
"Algorithm " << algName <<
" executed in slot " << si <<
". Algorithms scheduled are " 1000 << m_algosInFlight <<
endmsg;
1003 int subSlotIndex = -1;
1005 ++m_actionsCounts[si];
1007 m_actionsQueue.push( [
this, si, iAlgo, subSlotIndex]() {
1008 --this->m_actionsCounts[si];
1009 return this->updateStates( -1, iAlgo, subSlotIndex, si );
1023 StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1026 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1027 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1031 --m_IOBoundAlgosInFlight;
1033 ON_DEBUG debug() <<
"[Asynchronous] Trying to handle execution result of " << algo->name() <<
" on slot " << si
1036 const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1038 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1042 sc = setAlgState( iAlgo, eventContext, state );
1045 <<
"[Asynchronous] Promoting " << algo->name() <<
" on slot " << si <<
" to " << state <<
endmsg;
1047 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1048 <<
". Algorithms scheduled are " << m_IOBoundAlgosInFlight <<
endmsg;
1051 ++m_actionsCounts[si];
1052 int subSlotIndex = -1;
1055 m_actionsQueue.push( [
this, si, iAlgo, subSlotIndex]() {
1056 --this->m_actionsCounts[si];
1057 return this->updateStates( -1, iAlgo, subSlotIndex, si );
1071 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1079 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1082 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1084 if ( viewContextPtr ) {
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
Class representing an event slot.
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B...
StatusCode finalize() override
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one) ...
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const DataObjIDColl & outputDataObjs() const override
A service to resolve the task execution precedence.
constexpr static const auto SUCCESS
void activate()
Activate scheduler.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
EventContext * contextPtr
Struct to hold entries in the alg queues.
size_t sizeOfSubset(State state) const
This class represents an entry point to all the event specific data.
StatusCode setAlgState(unsigned int iAlgo, EventContext *contextPtr, AState state)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
#define DECLARE_COMPONENT(type)
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
StatusCode set(unsigned int iAlgo, State newState)
StatusCode finalize() override
Finalise.
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
bool isValid() const
Allow for check if smart pointer is valid.
Iterator begin(State kind)
const StatusCode & ignore() const
Ignore/check StatusCode.
Base class from which all concrete algorithm classes should be derived.
StatusCode enqueue(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
constexpr static const auto FAILURE
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
bool complete
Flags completion of the event.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
bool filterPassed() const
class MergingTransformer< Out(const vector_of_const_< In > false
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
std::string fullKey() const
ContextID_t subSlot() const
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
const StatusCode & execStatus() const
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgsExecutionStates algsStates
Vector of algorithms states.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.