28 #include <string_view>
30 #include <unordered_set>
33 #include "boost/algorithm/string.hpp"
34 #include "boost/thread.hpp"
35 #include "boost/tokenizer.hpp"
37 #include "tbb/tbb_stddef.h"
// Logging guards. Each macro expands to a bare, unbraced `if` on msgLevel(...), so the
// single statement written directly after it (in this file: a `debug() << ... << endmsg;`
// style stream expression) is only evaluated when that condition is true.
// NOTE(review): because the expansion is an unbraced `if`, an `else` written after the
// guarded statement would bind to the macro's `if`; keep each use to one statement.
// NOTE(review): presumably msgLevel() reports whether the given MSG level is enabled for
// this component -- confirm against the messaging base class.
42 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
43 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
46 struct DataObjIDSorter {
54 v.reserve( coll.
size() );
55 for (
const DataObjID&
id : coll )
v.push_back( &
id );
62 [testStates](
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
78 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
83 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
88 fatal() <<
"Cannot cast ThreadPoolSvc" <<
endmsg;
93 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
98 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
103 fatal() <<
"Terminating initialization" <<
endmsg;
106 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
115 warning() <<
"No CondSvc found, or not enabled. "
116 <<
"Will not manage CondAlgorithms" <<
endmsg;
124 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
130 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
137 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
149 const unsigned int algsNumber = algos.
size();
150 if ( algsNumber != 0 ) {
151 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
153 error() <<
"No algorithms found" <<
endmsg;
169 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
178 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
182 ostdd <<
"Data Dependencies for Algorithms:";
187 if (
nullptr == algoPtr ) {
188 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
189 <<
": this will result in a crash." <<
endmsg;
193 ostdd <<
"\n " << algoPtr->
name();
199 ostdd <<
"\n o INPUT " << id;
200 if (
id.
key().find(
":" ) != std::string::npos ) {
201 ostdd <<
" contains alternatives which require resolution...\n";
202 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":" } };
204 return globalOutp.find( DataObjID{ t } ) != globalOutp.
end();
206 if ( itok != tokens.end() ) {
207 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
208 id.updateKey( *itok );
210 error() <<
"failed to find alternate in global output list"
211 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
215 algoDependencies.
insert(
id );
219 ostdd <<
"\n o OUTPUT " << *id;
220 if ( id->key().find(
":" ) != std::string::npos ) {
221 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
229 algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
232 if ( m_showDataDeps ) { info() << ostdd.str() <<
endmsg; }
237 if ( m_checkDeps || m_checkOutput ) {
239 for (
auto o : globalInp ) {
242 requiredInputKeys.
insert( o.key() );
243 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.
insert( o );
245 if ( m_checkOutput ) {
246 for (
auto o : globalOutp ) {
247 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.
find( o.key() ) == requiredInputKeys.
end() ) {
250 for (
const std::string& algoName : m_checkOutputIgnoreList ) {
251 auto it = algosOutputDependenciesMap.find( algoName );
252 if ( it != algosOutputDependenciesMap.end() ) {
253 if ( it->second.find( o ) != it->second.end() ) {
259 if ( !ignored ) { unusedOutp.
insert( o ); }
266 if ( unmetDepInp.
size() > 0 ) {
268 auto printUnmet = [&](
auto msg ) {
269 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
270 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
272 for (
const auto& p : algosInputDependenciesMap )
273 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
277 if ( !m_useDataLoader.empty() ) {
282 if ( algo->name() == m_useDataLoader ) {
283 dataLoaderAlg = algo;
287 if ( dataLoaderAlg ==
nullptr ) {
288 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
289 <<
"\" found, and unmet INPUT dependencies "
291 printUnmet( fatal() );
295 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() <<
"/"
296 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
297 printUnmet( info() );
302 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm"
307 for (
auto&
id : unmetDepInp ) {
308 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->type() <<
"/"
309 << dataLoaderAlg->name() <<
endmsg;
314 fatal() <<
"Auto DataLoading not requested, "
315 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
316 printUnmet( fatal() );
321 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
325 if ( m_checkOutput ) {
326 if ( unusedOutp.
size() > 0 ) {
328 auto printUnusedOutp = [&](
auto msg ) {
329 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
330 msg <<
" o " << *o <<
" produced by Algorithm: " <<
endmsg;
332 for (
const auto& p : algosOutputDependenciesMap )
333 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
337 fatal() <<
"The following unused OUTPUT items were found:" <<
endmsg;
338 printUnusedOutp( fatal() );
341 info() <<
"No unused OUTPUT items were found" <<
endmsg;
346 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
347 if ( !m_precSvc.isValid() ) {
348 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
353 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
358 m_algname_vect.resize( algsNumber );
362 m_algname_index_map[
name] = index;
363 m_algname_vect.at( index ) =
name;
368 if ( !messageSvc.isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
370 m_eventSlots.reserve( m_maxEventsInFlight );
371 for (
size_t i = 0; i < m_maxEventsInFlight; ++i ) {
373 m_eventSlots.back().complete =
true;
376 if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
379 info() <<
"Concurrency level information:" <<
endmsg;
380 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
381 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
384 info() <<
"Task scheduling settings:" <<
endmsg;
385 info() <<
" o Avalanche generation mode: "
386 << ( m_optimizationMode.empty() ?
"disabled" : m_optimizationMode.toString() ) <<
endmsg;
387 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
388 << ( m_enablePreemptiveBlockingTasks
389 ? (
"enabled (max. " +
std::to_string( m_maxBlockingAlgosInFlight ) +
" concurrent tasks)" )
392 info() <<
" o Scheduling of condition tasks: " << ( m_enableCondSvc ?
"enabled" :
"disabled" ) <<
endmsg;
394 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
396 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
399 if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
411 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
414 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
416 info() <<
"Joining Scheduler thread" <<
endmsg;
421 error() <<
"problems in scheduler thread" <<
endmsg;
441 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
444 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
462 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
473 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
481 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
483 error() <<
"Problems terminating thread pool" <<
endmsg;
529 if ( !eventContext ) {
530 fatal() <<
"Event context is nullptr" <<
endmsg;
535 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
544 const unsigned int thisSlotNum = eventContext->
slot();
547 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
551 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
552 thisSlot.
reset( eventContext );
559 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
560 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
564 if ( this->
iterate().isFailure() ) {
565 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
574 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
587 for (
auto context : eventContexts ) {
614 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
626 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
649 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
656 OccupancySnapshot nextSnap;
661 if ( !thisSlot.eventContext )
continue;
663 int iSlot = thisSlot.eventContext->slot();
675 if ( nextSnap.states.empty() ) {
682 slotStateTotals.
resize( AState::MAXVALUE );
684 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
AState(
state ) );
688 for (
auto& subslot : thisSlot.allSubSlots ) {
690 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
AState(
state ) );
696 auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
697 for ( uint algIndex : drAlgs ) {
703 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
706 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
707 <<
" on processing slot " << iSlot <<
endmsg;
711 for (
auto& subslot : thisSlot.allSubSlots ) {
712 auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
713 for ( uint algIndex : drAlgsSubSlot ) {
718 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
724 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
736 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
737 !thisSlot.algsStates.containsAny(
738 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
739 !subSlotAlgsInStates( thisSlot,
740 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
741 !thisSlot.complete ) {
743 thisSlot.complete =
true;
747 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
748 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
755 thisSlot.eventContext.reset(
nullptr );
765 if ( !nextSnap.states.empty() ) {
779 auto slotIndex = contextPtr->
slot();
785 auto subSlotIndex = contextPtr->
subSlot();
792 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
802 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
820 if ( !slot.
algsStates.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
821 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
837 const uint slotIdx = eventContext->
slot();
839 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
862 outputMS <<
"Dumping scheduler state\n"
863 <<
"=========================================================================================\n"
864 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
865 <<
"=========================================================================================\n\n";
869 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
870 <<
"------------------\n\n";
874 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
875 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
882 auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
883 for ( uint algIndex : schedAlgs ) {
891 auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
892 for ( uint algIndex : schedAlgs ) {
896 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
897 << slot.eventContext->slot();
900 if ( timelineSvc.isValid() ) {
903 te.slot = slot.eventContext->slot();
904 te.event = slot.eventContext->evt();
906 if ( timelineSvc->getTimelineEvent( te ) )
909 outputMS <<
" thread.id: [unknown]";
914 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
921 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
922 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
926 subSlotAlgsInStates( m_eventSlots[iSlot], {
AState::ERROR } )
929 for (
auto& slot : m_eventSlots ) {
931 if ( slot.complete )
continue;
933 outputMS <<
"[ slot: "
934 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
936 << ( slot.eventContext->valid() ?
std::
to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" )
939 if ( 0 > iSlot || iSlot == slotCount ) {
943 outputMS <<
"ERROR alg(s):";
945 auto& errorAlgs = slot.algsStates.algsInState(
AState::ERROR );
946 for ( uint algIndex : errorAlgs ) {
947 outputMS <<
" " << index2algname( algIndex );
950 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
954 outputMS << m_precSvc->printState( slot ) <<
"\n";
958 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
959 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
960 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
961 for (
auto& ss : slot.allSubSlots ) {
962 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
963 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
964 <<
", entry: " << ss.entryPoint <<
", event: "
965 << ( ss.eventContext->valid() ?
std::
to_string( ss.eventContext->
evt() ) :
"[ctx invalid]" )
968 outputMS <<
"ERROR alg(s):";
969 auto& errorAlgs = ss.algsStates.algsInState(
AState::ERROR );
970 for ( uint algIndex : errorAlgs ) { outputMS <<
" " << index2algname( algIndex ); }
974 outputMS << m_precSvc->printState( ss ) <<
"\n";
983 if ( 0 <= iSlot && !wasAlgError ) {
984 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
985 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
988 outputMS <<
"\n=========================================================================================\n"
989 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
990 <<
"=========================================================================================\n\n";
992 info() << outputMS.str() <<
endmsg;
1015 unsigned int algIndex{
ts.algIndex };
1016 std::string_view algName(
ts.algName );
1017 unsigned int algRank{
ts.algRank };
1018 bool blocking{
ts.blocking };
1019 int slotIndex{
ts.slotIndex };
1040 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1042 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1043 <<
", rank:" << algRank <<
", blocking:" << ( blocking ?
"yes" :
"no" )
1052 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
1058 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
1084 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1090 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
1091 <<
", rank:" <<
ts.algRank <<
", blocking:" << (
ts.blocking ?
"yes" :
"no" )
1111 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1119 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1122 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1124 if ( viewContextPtr ) {
1150 if ( samplePeriod < 0 ) {