33 #include <string_view>
35 #include <unordered_set>
38 #include <boost/algorithm/string.hpp>
39 #include <boost/thread.hpp>
40 #include <boost/tokenizer.hpp>
45 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
46 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
49 struct DataObjIDSorter {
57 v.reserve( coll.
size() );
58 for (
const DataObjID&
id : coll )
v.push_back( &
id );
65 [testStates](
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
81 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
86 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
91 fatal() <<
"Cannot cast ThreadPoolSvc" <<
endmsg;
96 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
101 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
102 std::binary_semaphore fiber_manager_initalized{ 0 };
106 fiber_manager_initalized.release();
110 fiber_manager_initalized.acquire();
114 fatal() <<
"Terminating initialization" <<
endmsg;
117 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
126 warning() <<
"No CondSvc found, or not enabled. "
127 <<
"Will not manage CondAlgorithms" <<
endmsg;
135 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
141 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
148 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
160 const unsigned int algsNumber = algos.
size();
161 if ( algsNumber != 0 ) {
162 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
164 error() <<
"No algorithms found" <<
endmsg;
180 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
189 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
193 ostdd <<
"Data Dependencies for Algorithms:";
198 if (
nullptr == algoPtr ) {
199 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
200 <<
": this will result in a crash." <<
endmsg;
208 ostdd <<
"\n " << algoPtr->
name();
210 auto write_owners = [&avis, &ostdd](
const DataObjID&
id ) {
219 ostdd <<
"\n o INPUT " <<
id;
221 algoDependencies.
insert(
id );
225 ostdd <<
"\n o OUTPUT " << *
id;
227 if (
id->key().find(
":" ) != std::string::npos ) {
228 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
236 algosInputDependenciesMap[algoPtr->
name()] = algoDependencies;
243 if (
dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
253 for (
auto o : globalInp ) {
256 requiredInputKeys.
insert( o.key() );
257 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDepInp.
insert( o );
260 for (
auto o : globalOutp ) {
261 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.
find( o.key() ) == requiredInputKeys.
end() ) {
265 auto it = algosOutputDependenciesMap.
find( algoName );
266 if ( it != algosOutputDependenciesMap.
end() ) {
267 if ( it->second.find( o ) != it->second.end() ) {
273 if ( !ignored ) { unusedOutp.
insert( o ); }
280 if ( unmetDepInp.
size() > 0 ) {
282 auto printUnmet = [&](
auto msg ) {
283 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
284 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
286 for (
const auto& p : algosInputDependenciesMap )
287 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
297 dataLoaderAlg = algo;
301 if ( dataLoaderAlg ==
nullptr ) {
303 <<
"\" found, and unmet INPUT dependencies "
305 printUnmet( fatal() );
309 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/"
310 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
311 printUnmet( info() );
316 fatal() <<
"Unable to dcast DataLoader \"" <<
m_useDataLoader.
value() <<
"\" IAlg to Gaudi::Algorithm"
321 for (
auto&
id : unmetDepInp ) {
322 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/"
323 << dataLoaderAlg->name() <<
endmsg;
328 fatal() <<
"Auto DataLoading not requested, "
329 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
330 printUnmet( fatal() );
335 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
340 if ( unusedOutp.
size() > 0 ) {
342 auto printUnusedOutp = [&](
auto msg ) {
343 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
344 msg <<
" o " << *o <<
" produced by Algorithm: " <<
endmsg;
346 for (
const auto& p : algosOutputDependenciesMap )
347 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
351 fatal() <<
"The following unused OUTPUT items were found:" <<
endmsg;
352 printUnusedOutp( fatal() );
355 info() <<
"No unused OUTPUT items were found" <<
endmsg;
362 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
367 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
382 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
393 info() <<
"Concurrency level information:" <<
endmsg;
399 info() <<
"Task scheduling settings:" <<
endmsg;
400 info() <<
" o Avalanche generation mode: "
402 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
407 info() <<
" o Scheduling of condition tasks: " << (
m_enableCondSvc ?
"enabled" :
"disabled" ) <<
endmsg;
426 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
429 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
431 debug() <<
"Deleting FiberManager" <<
endmsg;
434 info() <<
"Joining Scheduler thread" <<
endmsg;
439 error() <<
"problems in scheduler thread" <<
endmsg;
459 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
462 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
480 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
491 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
499 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
501 error() <<
"Problems terminating thread pool" <<
endmsg;
547 if ( !eventContext ) {
548 fatal() <<
"Event context is nullptr" <<
endmsg;
553 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
562 const unsigned int thisSlotNum = eventContext->
slot();
565 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
569 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
570 thisSlot.
reset( eventContext );
577 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
578 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
582 if ( this->
iterate().isFailure() ) {
583 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
592 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
605 for (
auto context : eventContexts ) {
636 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
648 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
671 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
678 OccupancySnapshot nextSnap;
683 if ( !thisSlot.eventContext )
continue;
685 int iSlot = thisSlot.eventContext->slot();
697 if ( nextSnap.states.empty() ) {
704 slotStateTotals.
resize( AState::MAXVALUE );
706 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
AState(
state ) );
710 for (
auto& subslot : thisSlot.allSubSlots ) {
712 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
AState(
state ) );
718 const auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
719 for ( uint algIndex : drAlgs ) {
722 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
725 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
728 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
729 <<
" on processing slot " << iSlot <<
endmsg;
733 for (
auto& subslot : thisSlot.allSubSlots ) {
734 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
735 for ( uint algIndex : drAlgsSubSlot ) {
738 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
740 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
746 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
758 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
759 !thisSlot.algsStates.containsAny(
760 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
761 !subSlotAlgsInStates( thisSlot,
762 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
763 !thisSlot.complete ) {
765 thisSlot.complete =
true;
769 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
770 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
777 thisSlot.eventContext.reset(
nullptr );
787 if ( !nextSnap.states.empty() ) {
801 auto slotIndex = contextPtr->
slot();
807 auto subSlotIndex = contextPtr->
subSlot();
814 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
824 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
842 if ( !slot.
algsStates.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
843 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
859 const uint slotIdx = eventContext->
slot();
861 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
884 outputMS <<
"Dumping scheduler state\n"
885 <<
"=========================================================================================\n"
886 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
887 <<
"=========================================================================================\n\n";
891 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
892 <<
"------------------\n\n";
896 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
897 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
904 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
905 for ( uint algIndex : schedAlgs ) {
913 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
914 for ( uint algIndex : schedAlgs ) {
918 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
919 << slot.eventContext->slot();
922 if ( timelineSvc.isValid() ) {
925 te.slot = slot.eventContext->slot();
926 te.event = slot.eventContext->evt();
928 if ( timelineSvc->getTimelineEvent( te ) )
931 outputMS <<
" thread.id: [unknown]";
936 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
943 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
944 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
948 subSlotAlgsInStates( m_eventSlots[iSlot], {
AState::ERROR } )
951 for (
auto& slot : m_eventSlots ) {
953 if ( slot.complete )
continue;
955 outputMS <<
"[ slot: "
956 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
958 << ( slot.eventContext->
valid() ?
std::
to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" );
960 if ( slot.eventContext->eventID().isValid() ) { outputMS <<
", eventID: " << slot.eventContext->eventID(); }
961 outputMS <<
" ]:\n\n";
963 if ( 0 > iSlot || iSlot == slotCount ) {
967 outputMS <<
"ERROR alg(s):";
969 const auto& errorAlgs = slot.algsStates.algsInState(
AState::ERROR );
970 for ( uint algIndex : errorAlgs ) {
971 outputMS <<
" " << index2algname( algIndex );
974 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
978 outputMS << m_precSvc->printState( slot ) <<
"\n";
982 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
983 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
984 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
985 for (
auto& ss : slot.allSubSlots ) {
986 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
987 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
988 <<
", entry: " << ss.entryPoint <<
", event: "
992 outputMS <<
"ERROR alg(s):";
993 const auto& errorAlgs = ss.algsStates.algsInState(
AState::ERROR );
994 for ( uint algIndex : errorAlgs ) { outputMS <<
" " << index2algname( algIndex ); }
998 outputMS << m_precSvc->printState( ss ) <<
"\n";
1007 if ( 0 <= iSlot && !wasAlgError ) {
1008 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1009 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1012 outputMS <<
"\n=========================================================================================\n"
1013 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1014 <<
"=========================================================================================\n\n";
1016 info() << outputMS.str() <<
endmsg;
1034 unsigned int algIndex{
ts.algIndex };
1035 std::string_view algName(
ts.algName );
1036 unsigned int algRank{
ts.algRank };
1037 bool asynchronous{
ts.asynchronous };
1038 int slotIndex{
ts.slotIndex };
1041 if ( asynchronous ) {
1049 if ( !asynchronous ) {
1057 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1059 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1060 <<
", rank:" << algRank <<
", asynchronous:" << ( asynchronous ?
"yes" :
"no" )
1070 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
1076 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
1099 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1105 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
1106 <<
", rank:" <<
ts.algRank <<
", asynchronous:" << (
ts.asynchronous ?
"yes" :
"no" )
1126 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1134 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1137 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1139 if ( viewContextPtr ) {
1165 if ( samplePeriod < 0 ) {
1180 assert( inDeps.
size() == outDeps.
size() );
1183 enum class FileType : short {
UNKNOWN, DOT, MD };
1190 fileExtension = FileType::DOT;
1192 fileExtension = FileType::MD;
1194 fileExtension = FileType::DOT;
1195 fileName = fileName +
".dot";
1197 info() <<
"Dumping data dependencies graph to file: " << fileName <<
endmsg;
1207 if ( fileExtension == FileType::DOT ) {
1209 startGraph =
"digraph datadeps {\nrankdir=\"LR\";\n\n";
1210 stopGraph =
"\n}\n";
1213 return "Alg_" + idx +
" [label=\"" +
alg +
"\";shape=box];\n";
1217 return "obj_" +
std::to_string( obj.hash() ) +
" [label=\"" + obj.key() +
"\"];\n";
1229 startGraph =
"```mermaid\ngraph LR;\n\n";
1230 stopGraph =
"\n```\n";
1233 return "Alg_" + idx +
"{{" +
alg +
"}}\n";
1237 return "obj_" +
std::to_string( obj.hash() ) +
">" + obj.key() +
"]\n";
1250 dataDepthGraphFile << startGraph;
1261 for (
const auto& [
name, ideps] : inDeps ) {
1266 for (
const auto& dep : ideps ) {
1269 const auto [itr, inserted] = definedObjects.
insert( dep.hash() );
1270 if ( inserted ) dataDepthGraphFile << defineObj( dep );
1272 dataDepthGraphFile << defineInput( dep,
std::to_string( algoIndex ) );
1275 const auto& odeps = outDeps.
at(
name );
1276 for (
const auto& dep : odeps ) {
1279 const auto [itr, inserted] = definedObjects.
insert( dep.hash() );
1280 if ( inserted ) dataDepthGraphFile << defineObj( dep );
1282 dataDepthGraphFile << defineOutput(
std::to_string( algoIndex ), dep );
1289 dataDepthGraphFile << stopGraph;
1290 dataDepthGraphFile.
close();