29 #include <string_view>
31 #include <unordered_set>
34 #include "boost/algorithm/string.hpp"
35 #include "boost/thread.hpp"
36 #include "boost/tokenizer.hpp"
41 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
42 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
45 struct DataObjIDSorter {
53 v.reserve( coll.
size() );
54 for (
const DataObjID&
id : coll )
v.push_back( &
id );
61 [testStates](
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
77 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
82 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
87 fatal() <<
"Cannot cast ThreadPoolSvc" <<
endmsg;
92 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
97 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
102 fatal() <<
"Terminating initialization" <<
endmsg;
105 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
114 warning() <<
"No CondSvc found, or not enabled. "
115 <<
"Will not manage CondAlgorithms" <<
endmsg;
123 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
129 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
136 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
148 const unsigned int algsNumber = algos.
size();
149 if ( algsNumber != 0 ) {
150 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
152 error() <<
"No algorithms found" <<
endmsg;
168 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
177 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
181 ostdd <<
"Data Dependencies for Algorithms:";
186 if (
nullptr == algoPtr ) {
187 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
188 <<
": this will result in a crash." <<
endmsg;
196 ostdd <<
"\n " << algoPtr->
name();
198 auto write_owners = [&avis, &ostdd](
const DataObjID id ) {
207 ostdd <<
"\n o INPUT " << id;
209 if (
id.
key().find(
":" ) != std::string::npos ) {
210 ostdd <<
" contains alternatives which require resolution...\n";
211 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":" } };
213 return globalOutp.find( DataObjID{ t } ) != globalOutp.
end();
215 if ( itok != tokens.end() ) {
216 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
217 id.updateKey( *itok );
219 error() <<
"failed to find alternate in global output list"
220 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
224 algoDependencies.
insert(
id );
228 ostdd <<
"\n o OUTPUT " << *id;
230 if ( id->key().find(
":" ) != std::string::npos ) {
231 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
239 algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
242 if ( m_showDataDeps ) { info() << ostdd.str() <<
endmsg; }
247 if ( m_checkDeps || m_checkOutput ) {
249 for (
auto o : globalInp ) {
252 requiredInputKeys.
insert( o.key() );
253 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.
insert( o );
255 if ( m_checkOutput ) {
256 for (
auto o : globalOutp ) {
257 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.
find( o.key() ) == requiredInputKeys.
end() ) {
260 for (
const std::string& algoName : m_checkOutputIgnoreList ) {
261 auto it = algosOutputDependenciesMap.find( algoName );
262 if ( it != algosOutputDependenciesMap.end() ) {
263 if ( it->second.find( o ) != it->second.end() ) {
269 if ( !ignored ) { unusedOutp.
insert( o ); }
276 if ( unmetDepInp.
size() > 0 ) {
278 auto printUnmet = [&](
auto msg ) {
279 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
280 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
282 for (
const auto& p : algosInputDependenciesMap )
283 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
287 if ( !m_useDataLoader.empty() ) {
292 if ( m_useDataLoader == algo->name() ) {
293 dataLoaderAlg = algo;
297 if ( dataLoaderAlg ==
nullptr ) {
298 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
299 <<
"\" found, and unmet INPUT dependencies "
301 printUnmet( fatal() );
305 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() <<
"/"
306 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
307 printUnmet( info() );
312 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm"
317 for (
auto&
id : unmetDepInp ) {
318 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->type() <<
"/"
319 << dataLoaderAlg->name() <<
endmsg;
324 fatal() <<
"Auto DataLoading not requested, "
325 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
326 printUnmet( fatal() );
331 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
335 if ( m_checkOutput ) {
336 if ( unusedOutp.
size() > 0 ) {
338 auto printUnusedOutp = [&](
auto msg ) {
339 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
340 msg <<
" o " << *o <<
" produced by Algorithm: " <<
endmsg;
342 for (
const auto& p : algosOutputDependenciesMap )
343 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
347 fatal() <<
"The following unused OUTPUT items were found:" <<
endmsg;
348 printUnusedOutp( fatal() );
351 info() <<
"No unused OUTPUT items were found" <<
endmsg;
356 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
357 if ( !m_precSvc.isValid() ) {
358 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
363 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
368 m_algname_vect.resize( algsNumber );
372 m_algname_index_map[
name] = index;
373 m_algname_vect.at( index ) =
name;
378 if ( !messageSvc.isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
380 m_eventSlots.reserve( m_maxEventsInFlight );
381 for (
size_t i = 0; i < m_maxEventsInFlight; ++i ) {
383 m_eventSlots.back().complete =
true;
386 if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
389 info() <<
"Concurrency level information:" <<
endmsg;
390 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
391 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
394 info() <<
"Task scheduling settings:" <<
endmsg;
395 info() <<
" o Avalanche generation mode: "
396 << ( m_optimizationMode.empty() ?
"disabled" : m_optimizationMode.toString() ) <<
endmsg;
397 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
398 << ( m_enablePreemptiveBlockingTasks
399 ? (
"enabled (max. " +
std::to_string( m_maxBlockingAlgosInFlight ) +
" concurrent tasks)" )
402 info() <<
" o Scheduling of condition tasks: " << ( m_enableCondSvc ?
"enabled" :
"disabled" ) <<
endmsg;
404 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
406 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
409 if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
421 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
424 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
426 info() <<
"Joining Scheduler thread" <<
endmsg;
431 error() <<
"problems in scheduler thread" <<
endmsg;
451 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
454 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
472 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
483 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
491 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
493 error() <<
"Problems terminating thread pool" <<
endmsg;
539 if ( !eventContext ) {
540 fatal() <<
"Event context is nullptr" <<
endmsg;
545 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
554 const unsigned int thisSlotNum = eventContext->
slot();
557 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
561 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
562 thisSlot.
reset( eventContext );
569 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
570 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
574 if ( this->
iterate().isFailure() ) {
575 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
584 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
597 for (
auto context : eventContexts ) {
624 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
636 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
659 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
666 OccupancySnapshot nextSnap;
671 if ( !thisSlot.eventContext )
continue;
673 int iSlot = thisSlot.eventContext->slot();
685 if ( nextSnap.states.empty() ) {
692 slotStateTotals.
resize( AState::MAXVALUE );
694 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
AState(
state ) );
698 for (
auto& subslot : thisSlot.allSubSlots ) {
700 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
AState(
state ) );
706 auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
707 for ( uint algIndex : drAlgs ) {
713 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
716 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
717 <<
" on processing slot " << iSlot <<
endmsg;
721 for (
auto& subslot : thisSlot.allSubSlots ) {
722 auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
723 for ( uint algIndex : drAlgsSubSlot ) {
728 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
734 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
746 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
747 !thisSlot.algsStates.containsAny(
748 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
749 !subSlotAlgsInStates( thisSlot,
750 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
751 !thisSlot.complete ) {
753 thisSlot.complete =
true;
757 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
758 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
765 thisSlot.eventContext.reset(
nullptr );
775 if ( !nextSnap.states.empty() ) {
789 auto slotIndex = contextPtr->
slot();
795 auto subSlotIndex = contextPtr->
subSlot();
802 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
812 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
830 if ( !slot.
algsStates.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
831 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
847 const uint slotIdx = eventContext->
slot();
849 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
872 outputMS <<
"Dumping scheduler state\n"
873 <<
"=========================================================================================\n"
874 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
875 <<
"=========================================================================================\n\n";
879 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
880 <<
"------------------\n\n";
884 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
885 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
892 auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
893 for ( uint algIndex : schedAlgs ) {
901 auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
902 for ( uint algIndex : schedAlgs ) {
906 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
907 << slot.eventContext->slot();
910 if ( timelineSvc.isValid() ) {
913 te.slot = slot.eventContext->slot();
914 te.event = slot.eventContext->evt();
916 if ( timelineSvc->getTimelineEvent( te ) )
919 outputMS <<
" thread.id: [unknown]";
924 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
931 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
932 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
936 subSlotAlgsInStates( m_eventSlots[iSlot], {
AState::ERROR } )
939 for (
auto& slot : m_eventSlots ) {
941 if ( slot.complete )
continue;
943 outputMS <<
"[ slot: "
944 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
946 << ( slot.eventContext->valid() ?
std::
to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" )
949 if ( 0 > iSlot || iSlot == slotCount ) {
953 outputMS <<
"ERROR alg(s):";
955 auto& errorAlgs = slot.algsStates.algsInState(
AState::ERROR );
956 for ( uint algIndex : errorAlgs ) {
957 outputMS <<
" " << index2algname( algIndex );
960 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
964 outputMS << m_precSvc->printState( slot ) <<
"\n";
968 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
969 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
970 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
971 for (
auto& ss : slot.allSubSlots ) {
972 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
973 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
974 <<
", entry: " << ss.entryPoint <<
", event: "
975 << ( ss.eventContext->valid() ?
std::
to_string( ss.eventContext->
evt() ) :
"[ctx invalid]" )
978 outputMS <<
"ERROR alg(s):";
979 auto& errorAlgs = ss.algsStates.algsInState(
AState::ERROR );
980 for ( uint algIndex : errorAlgs ) { outputMS <<
" " << index2algname( algIndex ); }
984 outputMS << m_precSvc->printState( ss ) <<
"\n";
993 if ( 0 <= iSlot && !wasAlgError ) {
994 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
995 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
998 outputMS <<
"\n=========================================================================================\n"
999 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1000 <<
"=========================================================================================\n\n";
1002 info() << outputMS.str() <<
endmsg;
1025 unsigned int algIndex{
ts.algIndex };
1026 std::string_view algName(
ts.algName );
1027 unsigned int algRank{
ts.algRank };
1028 bool blocking{
ts.blocking };
1029 int slotIndex{
ts.slotIndex };
1050 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1052 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1053 <<
", rank:" << algRank <<
", blocking:" << ( blocking ?
"yes" :
"no" )
1062 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
1068 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
1094 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1100 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
1101 <<
", rank:" <<
ts.algRank <<
", blocking:" << (
ts.blocking ?
"yes" :
"no" )
1121 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1129 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1132 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1134 if ( viewContextPtr ) {
1160 if ( samplePeriod < 0 ) {