28 #include <string_view>
30 #include <unordered_set>
33 #include "boost/algorithm/string.hpp"
34 #include "boost/thread.hpp"
35 #include "boost/tokenizer.hpp"
37 #include "tbb/tbb_stddef.h"
// ON_DEBUG / ON_VERBOSE: statement prefixes used as `ON_DEBUG debug() << ... << endmsg;`.
// They run the following statement only when msgLevel( MSG::DEBUG ) (resp.
// msgLevel( MSG::VERBOSE )) is true, so the whole message-building expression is
// skipped otherwise.
// NOTE(review): each expands to a bare `if` with no braces; an `else` written
// directly after a use site would silently bind to this hidden `if`
// (dangling-else hazard). Never follow a macro use with `else`.
42 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
43 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
46 struct DataObjIDSorter {
54 v.reserve( coll.
size() );
55 for (
const DataObjID&
id : coll )
v.push_back( &
id );
62 [testStates](
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
78 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
83 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
88 fatal() <<
"Cannot cast ThreadPoolSvc" <<
endmsg;
93 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
98 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
103 fatal() <<
"Terminating initialization" <<
endmsg;
106 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
115 warning() <<
"No CondSvc found, or not enabled. "
116 <<
"Will not manage CondAlgorithms" <<
endmsg;
124 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
130 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
137 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
149 const unsigned int algsNumber = algos.
size();
150 if ( algsNumber != 0 ) {
151 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
153 error() <<
"No algorithms found" <<
endmsg;
168 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
175 ostdd <<
"Data Dependencies for Algorithms:";
180 if (
nullptr == algoPtr ) {
181 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
182 <<
": this will result in a crash." <<
endmsg;
186 ostdd <<
"\n " << algoPtr->
name();
192 ostdd <<
"\n o INPUT " << id;
193 if (
id.
key().find(
":" ) != std::string::npos ) {
194 ostdd <<
" contains alternatives which require resolution...\n";
195 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
197 return globalOutp.find( DataObjID{t} ) != globalOutp.
end();
199 if ( itok != tokens.end() ) {
200 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
201 id.updateKey( *itok );
203 error() <<
"failed to find alternate in global output list"
204 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
208 algoDependencies.
insert(
id );
212 ostdd <<
"\n o OUTPUT " << *id;
213 if ( id->key().find(
":" ) != std::string::npos ) {
214 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
222 algosDependenciesMap[algoPtr->name()] = algoDependencies;
225 if ( m_showDataDeps ) { info() << ostdd.str() <<
endmsg; }
231 for (
auto o : globalInp )
232 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.
insert( o );
234 if ( unmetDep.
size() > 0 ) {
236 auto printUnmet = [&](
auto msg ) {
237 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
238 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
240 for (
const auto& p : algosDependenciesMap )
241 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
245 if ( !m_useDataLoader.empty() ) {
250 if ( algo->name() == m_useDataLoader ) {
251 dataLoaderAlg = algo;
255 if ( dataLoaderAlg ==
nullptr ) {
256 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
257 <<
"\" found, and unmet INPUT dependencies "
259 printUnmet( fatal() );
263 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() <<
"/"
264 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
265 printUnmet( info() );
270 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm"
275 for (
auto&
id : unmetDep ) {
276 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->type() <<
"/"
277 << dataLoaderAlg->name() <<
endmsg;
282 fatal() <<
"Auto DataLoading not requested, "
283 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
284 printUnmet( fatal() );
289 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
294 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
295 if ( !m_precSvc.isValid() ) {
296 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
301 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
306 m_algname_vect.resize( algsNumber );
310 m_algname_index_map[
name] = index;
311 m_algname_vect.at( index ) =
name;
316 if ( !messageSvc.isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
318 m_eventSlots.reserve( m_maxEventsInFlight );
319 for (
size_t i = 0; i < m_maxEventsInFlight; ++i ) {
321 m_eventSlots.back().complete =
true;
324 if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
327 info() <<
"Concurrency level information:" <<
endmsg;
328 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
329 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
332 info() <<
"Task scheduling settings:" <<
endmsg;
333 info() <<
" o Avalanche generation mode: "
334 << ( m_optimizationMode.empty() ?
"disabled" : m_optimizationMode.toString() ) <<
endmsg;
335 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
336 << ( m_enablePreemptiveBlockingTasks
337 ? (
"enabled (max. " +
std::to_string( m_maxBlockingAlgosInFlight ) +
" concurrent tasks)" )
340 info() <<
" o Scheduling of condition tasks: " << ( m_enableCondSvc ?
"enabled" :
"disabled" ) <<
endmsg;
342 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
344 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
347 if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
359 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
362 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
364 info() <<
"Joining Scheduler thread" <<
endmsg;
369 error() <<
"problems in scheduler thread" <<
endmsg;
389 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
392 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
410 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
421 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
429 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
431 error() <<
"Problems terminating thread pool" <<
endmsg;
477 if ( !eventContext ) {
478 fatal() <<
"Event context is nullptr" <<
endmsg;
483 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
492 const unsigned int thisSlotNum = eventContext->
slot();
495 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
499 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
500 thisSlot.
reset( eventContext );
507 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
508 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
512 if ( this->
iterate().isFailure() ) {
513 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
522 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
535 for (
auto context : eventContexts ) {
562 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
574 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
597 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
604 OccupancySnapshot nextSnap;
609 if ( !thisSlot.eventContext )
continue;
611 int iSlot = thisSlot.eventContext->slot();
623 if ( nextSnap.states.empty() ) {
630 slotStateTotals.
resize( AState::MAXVALUE );
632 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
AState(
state ) );
636 for (
auto& subslot : thisSlot.allSubSlots ) {
638 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
AState(
state ) );
644 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
651 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
654 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
655 <<
" on processing slot " << iSlot <<
endmsg;
659 for (
auto& subslot : thisSlot.allSubSlots ) {
660 auto& subslotStates = subslot.algsStates;
661 for (
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
667 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
673 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
685 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
686 !thisSlot.algsStates.containsAny(
687 {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
688 !subSlotAlgsInStates( thisSlot,
689 {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
690 !thisSlot.complete ) {
692 thisSlot.complete =
true;
696 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
697 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
704 thisSlot.eventContext.reset(
nullptr );
714 if ( !nextSnap.states.empty() ) {
728 auto slotIndex = contextPtr->
slot();
734 auto subSlotIndex = contextPtr->
subSlot();
741 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
751 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
770 !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
786 const uint slotIdx = eventContext->
slot();
788 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
811 outputMS <<
"Dumping scheduler state\n"
812 <<
"=========================================================================================\n"
813 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
814 <<
"=========================================================================================\n\n";
818 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
819 <<
"------------------\n\n";
823 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
824 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
830 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
835 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
840 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
841 << slot.eventContext->slot();
844 if ( timelineSvc.isValid() ) {
847 te.slot = slot.eventContext->slot();
848 te.event = slot.eventContext->evt();
850 if ( timelineSvc->getTimelineEvent( te ) )
853 outputMS <<
" thread.id: [unknown]";
858 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
865 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
866 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
873 for (
auto& slot : m_eventSlots ) {
875 if ( slot.complete )
continue;
877 outputMS <<
"[ slot: "
878 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
880 << ( slot.eventContext->valid() ?
std::
to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" )
883 if ( 0 > iSlot || iSlot == slotCount ) {
887 outputMS <<
"ERROR alg(s):";
890 outputMS <<
" " << index2algname( *it );
893 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
897 outputMS << m_precSvc->printState( slot ) <<
"\n";
901 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
902 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
903 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
904 for (
auto& ss : slot.allSubSlots ) {
905 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
906 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
907 <<
", entry: " << ss.entryPoint <<
", event: "
908 << ( ss.eventContext->valid() ?
std::
to_string( ss.eventContext->
evt() ) :
"[ctx invalid]" )
911 outputMS <<
"ERROR alg(s):";
913 outputMS <<
" " << index2algname( *it );
917 outputMS << m_precSvc->printState( ss ) <<
"\n";
926 if ( 0 <= iSlot && !wasAlgError ) {
927 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
928 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
931 outputMS <<
"\n=========================================================================================\n"
932 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
933 <<
"=========================================================================================\n\n";
935 info() << outputMS.str() <<
endmsg;
958 unsigned int algIndex{
ts.algIndex};
959 std::string_view algName(
ts.algName );
960 unsigned int algRank{
ts.algRank};
961 bool blocking{
ts.blocking};
962 int slotIndex{
ts.slotIndex};
965 if (
LIKELY( !blocking ) ) {
983 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
985 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
986 <<
", rank:" << algRank <<
", blocking:" << ( blocking ?
"yes" :
"no" )
995 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
1001 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
1027 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1033 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
1034 <<
", rank:" <<
ts.algRank <<
", blocking:" << (
ts.blocking ?
"yes" :
"no" )
1054 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1062 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1065 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1067 if ( viewContextPtr ) {
1093 if ( samplePeriod < 0 ) {