33 #include <string_view>
35 #include <unordered_set>
38 #include <boost/algorithm/string.hpp>
39 #include <boost/thread.hpp>
40 #include <boost/tokenizer.hpp>
45 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
46 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
49 struct DataObjIDSorter {
55 std::vector<const DataObjID*> sortedDataObjIDColl(
const DataObjIDColl& coll ) {
56 std::vector<const DataObjID*>
v;
57 v.reserve( coll.size() );
58 for (
const DataObjID&
id : coll )
v.push_back( &
id );
59 std::sort(
v.begin(),
v.end(), DataObjIDSorter() );
63 bool subSlotAlgsInStates(
const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
65 [testStates](
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
81 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
86 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
91 fatal() <<
"Cannot cast ThreadPoolSvc" <<
endmsg;
96 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
101 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
102 std::binary_semaphore fiber_manager_initalized{ 0 };
103 m_thread = std::thread( [
this, &fiber_manager_initalized]() {
106 fiber_manager_initalized.release();
110 fiber_manager_initalized.acquire();
114 fatal() <<
"Terminating initialization" <<
endmsg;
117 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
126 warning() <<
"No CondSvc found, or not enabled. "
127 <<
"Will not manage CondAlgorithms" <<
endmsg;
135 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
141 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
148 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
160 const unsigned int algsNumber = algos.size();
161 if ( algsNumber != 0 ) {
162 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
164 error() <<
"No algorithms found" <<
endmsg;
176 std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
180 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
186 globalOutp.insert(
id );
187 algoOutputs.insert(
id );
189 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
192 std::ostringstream ostdd;
193 ostdd <<
"Data Dependencies for Algorithms:";
195 std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
198 if (
nullptr == algoPtr ) {
199 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
200 <<
": this will result in a crash." <<
endmsg;
208 ostdd <<
"\n " << algoPtr->
name();
210 auto write_owners = [&avis, &ostdd](
const DataObjID&
id ) {
219 ostdd <<
"\n o INPUT " <<
id;
221 algoDependencies.insert(
id );
222 globalInp.insert(
id );
225 ostdd <<
"\n o OUTPUT " << *
id;
227 if (
id->key().find(
":" ) != std::string::npos ) {
228 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
236 algosInputDependenciesMap[algoPtr->
name()] = algoDependencies;
243 if (
dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
252 std::set<std::string> requiredInputKeys;
253 for (
auto o : globalInp ) {
256 requiredInputKeys.insert( o.key() );
257 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
260 for (
auto o : globalOutp ) {
261 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
265 auto it = algosOutputDependenciesMap.find( algoName );
266 if ( it != algosOutputDependenciesMap.end() ) {
267 if ( it->second.find( o ) != it->second.end() ) {
273 if ( !ignored ) { unusedOutp.insert( o ); }
280 if ( unmetDepInp.size() > 0 ) {
282 auto printUnmet = [&](
auto msg ) {
283 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
284 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
286 for (
const auto& p : algosInputDependenciesMap )
287 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
297 dataLoaderAlg = algo;
301 if ( dataLoaderAlg ==
nullptr ) {
303 <<
"\" found, and unmet INPUT dependencies "
305 printUnmet( fatal() );
309 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/"
310 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
311 printUnmet( info() );
316 fatal() <<
"Unable to dcast DataLoader \"" <<
m_useDataLoader.
value() <<
"\" IAlg to Gaudi::Algorithm"
321 for (
auto&
id : unmetDepInp ) {
322 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/"
323 << dataLoaderAlg->name() <<
endmsg;
328 fatal() <<
"Auto DataLoading not requested, "
329 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
330 printUnmet( fatal() );
335 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
340 if ( unusedOutp.size() > 0 ) {
342 auto printUnusedOutp = [&](
auto msg ) {
343 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
344 msg <<
" o " << *o <<
" produced by Algorithm: " <<
endmsg;
346 for (
const auto& p : algosOutputDependenciesMap )
347 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
351 fatal() <<
"The following unused OUTPUT items were found:" <<
endmsg;
352 printUnusedOutp( fatal() );
355 info() <<
"No unused OUTPUT items were found" <<
endmsg;
362 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
367 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
374 const std::string&
name = algo->name();
382 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
393 info() <<
"Concurrency level information:" <<
endmsg;
399 info() <<
"Task scheduling settings:" <<
endmsg;
400 info() <<
" o Avalanche generation mode: "
402 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
407 info() <<
" o Scheduling of condition tasks: " << (
m_enableCondSvc ?
"enabled" :
"disabled" ) <<
endmsg;
426 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
429 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
431 debug() <<
"Deleting FiberManager" <<
endmsg;
434 info() <<
"Joining Scheduler thread" <<
endmsg;
439 error() <<
"problems in scheduler thread" <<
endmsg;
459 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
462 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
480 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
491 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
499 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
501 error() <<
"Problems terminating thread pool" <<
endmsg;
547 if ( !eventContext ) {
548 fatal() <<
"Event context is nullptr" <<
endmsg;
553 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
562 const unsigned int thisSlotNum = eventContext->
slot();
565 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
569 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
570 thisSlot.
reset( eventContext );
577 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
578 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
582 if ( this->
iterate().isFailure() ) {
583 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
592 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
605 for (
auto context : eventContexts ) {
636 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
648 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
671 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
674 global_sc =
schedule( std::move( retryTS ) );
678 OccupancySnapshot nextSnap;
679 auto now = std::chrono::system_clock::now();
683 if ( !thisSlot.eventContext )
continue;
685 int iSlot = thisSlot.eventContext->slot();
697 if ( nextSnap.states.empty() ) {
703 std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
704 slotStateTotals.resize( AState::MAXVALUE );
706 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
AState(
state ) );
710 for (
auto& subslot : thisSlot.allSubSlots ) {
712 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
AState(
state ) );
718 const auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
719 for ( uint algIndex : drAlgs ) {
722 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
725 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
728 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
729 <<
" on processing slot " << iSlot <<
endmsg;
733 for (
auto& subslot : thisSlot.allSubSlots ) {
734 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
735 for ( uint algIndex : drAlgsSubSlot ) {
738 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
740 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
746 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
748 <<
", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() <<
"\n";
750 : std::to_string( std::thread::hardware_concurrency() );
751 std::ofstream myfile;
758 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
759 !thisSlot.algsStates.containsAny(
760 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
761 !subSlotAlgsInStates( thisSlot,
762 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
763 !thisSlot.complete ) {
765 thisSlot.complete =
true;
769 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
770 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
777 thisSlot.eventContext.reset(
nullptr );
787 if ( !nextSnap.states.empty() ) {
801 auto slotIndex = contextPtr->
slot();
807 auto subSlotIndex = contextPtr->
subSlot();
814 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
824 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
842 if ( !slot.
algsStates.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
843 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
845 error() <<
"*** Stall detected, event context: " << slot.
eventContext.get() <<
endmsg;
859 const uint slotIdx = eventContext->
slot();
861 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
882 std::ostringstream outputMS;
884 outputMS <<
"Dumping scheduler state\n"
885 <<
"=========================================================================================\n"
886 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
887 <<
"=========================================================================================\n\n";
891 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
892 <<
"------------------\n\n";
896 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
897 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
904 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
905 for ( uint algIndex : schedAlgs ) {
913 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
914 for ( uint algIndex : schedAlgs ) {
918 outputMS <<
" task: " << std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
919 << slot.eventContext->slot();
922 if ( timelineSvc.isValid() ) {
925 te.slot = slot.eventContext->slot();
926 te.event = slot.eventContext->evt();
928 if ( timelineSvc->getTimelineEvent( te ) )
931 outputMS <<
" thread.id: [unknown]";
936 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
943 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
944 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
948 subSlotAlgsInStates( m_eventSlots[iSlot], {
AState::ERROR } )
951 for (
auto& slot : m_eventSlots ) {
953 if ( slot.complete )
continue;
955 outputMS <<
"[ slot: "
956 << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
958 << ( slot.eventContext->
valid() ?
std::to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" );
960 if ( slot.eventContext->eventID().isValid() ) { outputMS <<
", eventID: " << slot.eventContext->eventID(); }
961 outputMS <<
" ]:\n\n";
963 if ( 0 > iSlot || iSlot == slotCount ) {
967 outputMS <<
"ERROR alg(s):";
969 const auto& errorAlgs = slot.algsStates.algsInState(
AState::ERROR );
970 for ( uint algIndex : errorAlgs ) {
971 outputMS <<
" " << index2algname( algIndex );
974 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
978 outputMS << m_precSvc->printState( slot ) <<
"\n";
982 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
983 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
984 auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
985 for (
auto& ss : slot.allSubSlots ) {
986 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
987 << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
988 <<
", entry: " << ss.entryPoint <<
", event: "
989 << ( ss.eventContext->
valid() ?
std::to_string( ss.eventContext->
evt() ) :
"[ctx invalid]" )
992 outputMS <<
"ERROR alg(s):";
993 const auto& errorAlgs = ss.algsStates.algsInState(
AState::ERROR );
994 for ( uint algIndex : errorAlgs ) { outputMS <<
" " << index2algname( algIndex ); }
998 outputMS << m_precSvc->printState( ss ) <<
"\n";
1007 if ( 0 <= iSlot && !wasAlgError ) {
1008 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1009 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1012 outputMS <<
"\n=========================================================================================\n"
1013 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1014 <<
"=========================================================================================\n\n";
1016 info() << outputMS.str() <<
endmsg;
1034 unsigned int algIndex{
ts.algIndex };
1035 std::string_view algName(
ts.algName );
1036 unsigned int algRank{
ts.algRank };
1037 bool asynchronous{
ts.asynchronous };
1038 int slotIndex{
ts.slotIndex };
1041 if ( asynchronous ) {
1049 if ( !asynchronous ) {
1057 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1059 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1060 <<
", rank:" << algRank <<
", asynchronous:" << ( asynchronous ?
"yes" :
"no" )
1070 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
1076 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
1099 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1105 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
1106 <<
", rank:" <<
ts.algRank <<
", asynchronous:" << (
ts.asynchronous ?
"yes" :
"no" )
1123 std::unique_ptr<EventContext> viewContext ) {
1126 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1134 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.release(),
1137 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1139 if ( viewContextPtr ) {
1141 auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1142 topSlot.
addSubSlot( std::move( viewContext ), nodeName );
1165 if ( samplePeriod < 0 ) {
1168 this->
m_snapshotInterval = std::chrono::duration<int64_t, std::milli>( samplePeriod );
1178 const std::map<std::string, DataObjIDColl>& outDeps )
const {
1180 assert( inDeps.size() == outDeps.size() );
1183 enum class FileType : short {
UNKNOWN, DOT, MD };
1184 std::regex fileExtensionRegexDot(
".dot$" );
1185 std::regex fileExtensionRegexMd(
".md$" );
1190 fileExtension = FileType::DOT;
1192 fileExtension = FileType::MD;
1194 fileExtension = FileType::DOT;
1195 fileName = fileName +
".dot";
1197 info() <<
"Dumping data dependencies graph to file: " << fileName <<
endmsg;
1199 std::string startGraph =
"";
1200 std::string stopGraph =
"";
1202 std::function<std::string(
const std::string&,
const std::string& )> defineAlg;
1203 std::function<std::string(
const DataObjID& )> defineObj;
1204 std::function<std::string(
const DataObjID&,
const std::string& )> defineInput;
1205 std::function<std::string(
const std::string&,
const DataObjID& )> defineOutput;
1207 if ( fileExtension == FileType::DOT ) {
1209 startGraph =
"digraph datadeps {\nrankdir=\"LR\";\n\n";
1210 stopGraph =
"\n}\n";
1212 defineAlg = [](
const std::string&
alg,
const std::string& idx ) -> std::string {
1213 return "Alg_" + idx +
" [label=\"" +
alg +
"\";shape=box];\n";
1216 defineObj = [](
const DataObjID& obj ) -> std::string {
1217 return "obj_" + std::to_string( obj.hash() ) +
" [label=\"" + obj.key() +
"\"];\n";
1220 defineInput = [](
const DataObjID& obj,
const std::string&
alg ) -> std::string {
1221 return "obj_" + std::to_string( obj.
hash() ) +
" -> " +
"Alg_" +
alg +
";\n";
1224 defineOutput = [](
const std::string&
alg,
const DataObjID& obj ) -> std::string {
1225 return "Alg_" +
alg +
" -> " +
"obj_" + std::to_string( obj.
hash() ) +
";\n";
1229 startGraph =
"```mermaid\ngraph LR;\n\n";
1230 stopGraph =
"\n```\n";
1232 defineAlg = [](
const std::string&
alg,
const std::string& idx ) -> std::string {
1233 return "Alg_" + idx +
"{{" +
alg +
"}}\n";
1236 defineObj = [](
const DataObjID& obj ) -> std::string {
1237 return "obj_" + std::to_string( obj.hash() ) +
">" + obj.key() +
"]\n";
1240 defineInput = [](
const DataObjID& obj,
const std::string&
alg ) -> std::string {
1241 return "obj_" + std::to_string( obj.
hash() ) +
" --> " +
"Alg_" +
alg +
"\n";
1244 defineOutput = [](
const std::string&
alg,
const DataObjID& obj ) -> std::string {
1245 return "Alg_" +
alg +
" --> " +
"obj_" + std::to_string( obj.
hash() ) +
"\n";
1250 dataDepthGraphFile << startGraph;
1253 std::set<std::size_t> definedObjects;
1260 std::size_t algoIndex = 0ul;
1261 for (
const auto& [
name, ideps] : inDeps ) {
1262 if ( not std::regex_search(
name, algNameRegex ) )
continue;
1263 dataDepthGraphFile << defineAlg(
name, std::to_string( algoIndex ) );
1266 for (
const auto& dep : ideps ) {
1267 if ( not std::regex_search( dep.fullKey(), objNameRegex ) )
continue;
1269 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1270 if ( inserted ) dataDepthGraphFile << defineObj( dep );
1272 dataDepthGraphFile << defineInput( dep, std::to_string( algoIndex ) );
1275 const auto& odeps = outDeps.at(
name );
1276 for (
const auto& dep : odeps ) {
1277 if ( not std::regex_search( dep.fullKey(), objNameRegex ) )
continue;
1279 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1280 if ( inserted ) dataDepthGraphFile << defineObj( dep );
1282 dataDepthGraphFile << defineOutput( std::to_string( algoIndex ), dep );
1289 dataDepthGraphFile << stopGraph;
1290 dataDepthGraphFile.close();