34 #include <string_view>
36 #include <unordered_set>
39 #include <boost/algorithm/string.hpp>
40 #include <boost/thread.hpp>
41 #include <boost/tokenizer.hpp>
46 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
47 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
50 struct DataObjIDSorter {
56 std::vector<const DataObjID*> sortedDataObjIDColl(
const DataObjIDColl& coll ) {
57 std::vector<const DataObjID*>
v;
58 v.reserve( coll.size() );
59 for (
const DataObjID&
id : coll )
v.push_back( &
id );
60 std::sort(
v.begin(),
v.end(), DataObjIDSorter() );
64 bool subSlotAlgsInStates(
const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
66 [testStates](
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
82 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
87 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
92 fatal() <<
"Cannot cast ThreadPoolSvc" <<
endmsg;
97 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
102 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
103 std::binary_semaphore fiber_manager_initalized{ 0 };
104 m_thread = std::thread( [
this, &fiber_manager_initalized]() {
107 fiber_manager_initalized.release();
111 fiber_manager_initalized.acquire();
115 fatal() <<
"Terminating initialization" <<
endmsg;
118 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
127 warning() <<
"No CondSvc found, or not enabled. "
128 <<
"Will not manage CondAlgorithms" <<
endmsg;
136 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
142 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
149 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
161 const unsigned int algsNumber = algos.size();
162 if ( algsNumber != 0 ) {
163 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
165 error() <<
"No algorithms found" <<
endmsg;
177 std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
181 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
187 globalOutp.insert(
id );
188 algoOutputs.insert(
id );
190 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
193 std::ostringstream ostdd;
194 ostdd <<
"Data Dependencies for Algorithms:";
196 std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
199 if (
nullptr == algoPtr ) {
200 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
201 <<
": this will result in a crash." <<
endmsg;
209 ostdd <<
"\n " << algoPtr->
name();
211 auto write_owners = [&avis, &ostdd](
const DataObjID&
id ) {
220 ostdd <<
"\n o INPUT " <<
id;
222 algoDependencies.insert(
id );
223 globalInp.insert(
id );
226 ostdd <<
"\n o OUTPUT " << *
id;
228 if (
id->key().find(
":" ) != std::string::npos ) {
229 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
237 algosInputDependenciesMap[algoPtr->
name()] = algoDependencies;
244 if (
dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
253 std::set<std::string> requiredInputKeys;
254 for (
auto o : globalInp ) {
257 requiredInputKeys.insert( o.key() );
258 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
261 for (
auto o : globalOutp ) {
262 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
266 auto it = algosOutputDependenciesMap.find( algoName );
267 if ( it != algosOutputDependenciesMap.end() ) {
268 if ( it->second.find( o ) != it->second.end() ) {
274 if ( !ignored ) { unusedOutp.insert( o ); }
281 if ( unmetDepInp.size() > 0 ) {
283 auto printUnmet = [&](
auto msg ) {
284 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
285 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
287 for (
const auto& p : algosInputDependenciesMap )
288 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
298 dataLoaderAlg = algo;
302 if ( dataLoaderAlg ==
nullptr ) {
304 <<
"\" found, and unmet INPUT dependencies "
306 printUnmet( fatal() );
310 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/"
311 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
312 printUnmet( info() );
317 fatal() <<
"Unable to dcast DataLoader \"" <<
m_useDataLoader.
value() <<
"\" IAlg to Gaudi::Algorithm"
322 for (
auto&
id : unmetDepInp ) {
323 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/"
324 << dataLoaderAlg->name() <<
endmsg;
329 fatal() <<
"Auto DataLoading not requested, "
330 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
331 printUnmet( fatal() );
336 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
341 if ( unusedOutp.size() > 0 ) {
343 auto printUnusedOutp = [&](
auto msg ) {
344 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
345 msg <<
" o " << *o <<
" produced by Algorithm: " <<
endmsg;
347 for (
const auto& p : algosOutputDependenciesMap )
348 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
352 fatal() <<
"The following unused OUTPUT items were found:" <<
endmsg;
353 printUnusedOutp( fatal() );
356 info() <<
"No unused OUTPUT items were found" <<
endmsg;
363 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
368 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
375 const std::string&
name = algo->name();
383 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
394 info() <<
"Concurrency level information:" <<
endmsg;
400 info() <<
"Task scheduling settings:" <<
endmsg;
401 info() <<
" o Avalanche generation mode: "
403 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
408 info() <<
" o Scheduling of condition tasks: " << (
m_enableCondSvc ?
"enabled" :
"disabled" ) <<
endmsg;
427 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
430 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
432 debug() <<
"Deleting FiberManager" <<
endmsg;
435 info() <<
"Joining Scheduler thread" <<
endmsg;
440 error() <<
"problems in scheduler thread" <<
endmsg;
460 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
463 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
481 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
492 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
500 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
502 error() <<
"Problems terminating thread pool" <<
endmsg;
548 if ( !eventContext ) {
549 fatal() <<
"Event context is nullptr" <<
endmsg;
554 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
563 const unsigned int thisSlotNum = eventContext->
slot();
566 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
570 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
571 thisSlot.
reset( eventContext );
578 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
579 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
583 if ( this->
iterate().isFailure() ) {
584 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
593 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
606 for (
auto context : eventContexts ) {
637 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
649 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
672 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
675 global_sc =
schedule( std::move( retryTS ) );
679 OccupancySnapshot nextSnap;
680 auto now = std::chrono::system_clock::now();
684 if ( !thisSlot.eventContext )
continue;
686 int iSlot = thisSlot.eventContext->slot();
698 if ( nextSnap.states.empty() ) {
704 std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
705 slotStateTotals.resize( AState::MAXVALUE );
707 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
AState(
state ) );
711 for (
auto& subslot : thisSlot.allSubSlots ) {
713 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
AState(
state ) );
719 const auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
720 for ( uint algIndex : drAlgs ) {
723 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
726 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
729 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
730 <<
" on processing slot " << iSlot <<
endmsg;
734 for (
auto& subslot : thisSlot.allSubSlots ) {
735 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
736 for ( uint algIndex : drAlgsSubSlot ) {
739 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
741 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
747 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
749 <<
", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() <<
"\n";
751 : std::to_string( std::thread::hardware_concurrency() );
752 std::ofstream myfile;
759 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
760 !thisSlot.algsStates.containsAny(
761 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
762 !subSlotAlgsInStates( thisSlot,
763 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
764 !thisSlot.complete ) {
766 thisSlot.complete =
true;
770 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
771 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
778 thisSlot.eventContext.reset(
nullptr );
788 if ( !nextSnap.states.empty() ) {
802 auto slotIndex = contextPtr->
slot();
808 auto subSlotIndex = contextPtr->
subSlot();
815 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
825 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
843 if ( !slot.
algsStates.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
844 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
846 error() <<
"*** Stall detected, event context: " << slot.
eventContext.get() <<
endmsg;
860 const uint slotIdx = eventContext->
slot();
862 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
883 std::ostringstream outputMS;
885 outputMS <<
"Dumping scheduler state\n"
886 <<
"=========================================================================================\n"
887 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
888 <<
"=========================================================================================\n\n";
892 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
893 <<
"------------------\n\n";
897 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
898 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
905 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
906 for ( uint algIndex : schedAlgs ) {
914 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
915 for ( uint algIndex : schedAlgs ) {
919 outputMS <<
" task: " << std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
920 << slot.eventContext->slot();
923 if ( timelineSvc.isValid() ) {
926 te.slot = slot.eventContext->slot();
927 te.event = slot.eventContext->evt();
929 if ( timelineSvc->getTimelineEvent( te ) )
932 outputMS <<
" thread.id: [unknown]";
937 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
944 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
945 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
949 subSlotAlgsInStates( m_eventSlots[iSlot], {
AState::ERROR } )
952 for (
auto& slot : m_eventSlots ) {
954 if ( slot.complete )
continue;
956 outputMS <<
"[ slot: "
957 << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
959 << ( slot.eventContext->
valid() ?
std::to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" );
961 if ( slot.eventContext->eventID().isValid() ) { outputMS <<
", eventID: " << slot.eventContext->eventID(); }
962 outputMS <<
" ]:\n\n";
964 if ( 0 > iSlot || iSlot == slotCount ) {
968 outputMS <<
"ERROR alg(s):";
970 const auto& errorAlgs = slot.algsStates.algsInState(
AState::ERROR );
971 for ( uint algIndex : errorAlgs ) {
972 outputMS <<
" " << index2algname( algIndex );
975 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
979 outputMS << m_precSvc->printState( slot ) <<
"\n";
983 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
984 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
985 auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
986 for (
auto& ss : slot.allSubSlots ) {
987 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
988 << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
989 <<
", entry: " << ss.entryPoint <<
", event: "
990 << ( ss.eventContext->
valid() ?
std::to_string( ss.eventContext->
evt() ) :
"[ctx invalid]" )
993 outputMS <<
"ERROR alg(s):";
994 const auto& errorAlgs = ss.algsStates.algsInState(
AState::ERROR );
995 for ( uint algIndex : errorAlgs ) { outputMS <<
" " << index2algname( algIndex ); }
999 outputMS << m_precSvc->printState( ss ) <<
"\n";
1008 if ( 0 <= iSlot && !wasAlgError ) {
1009 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1010 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1013 outputMS <<
"\n=========================================================================================\n"
1014 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1015 <<
"=========================================================================================\n\n";
1017 info() << outputMS.str() <<
endmsg;
1035 unsigned int algIndex{
ts.algIndex };
1036 std::string_view algName(
ts.algName );
1037 unsigned int algRank{
ts.algRank };
1038 bool asynchronous{
ts.asynchronous };
1039 int slotIndex{
ts.slotIndex };
1042 if ( asynchronous ) {
1050 if ( !asynchronous ) {
1058 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1060 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1061 <<
", rank:" << algRank <<
", asynchronous:" << ( asynchronous ?
"yes" :
"no" )
1071 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
1077 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
1100 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1106 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
1107 <<
", rank:" <<
ts.algRank <<
", asynchronous:" << (
ts.asynchronous ?
"yes" :
"no" )
1124 std::unique_ptr<EventContext> viewContext ) {
1127 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1135 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.release(),
1138 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1140 if ( viewContextPtr ) {
1142 auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1143 topSlot.
addSubSlot( std::move( viewContext ), nodeName );
1166 if ( samplePeriod < 0 ) {
1169 this->
m_snapshotInterval = std::chrono::duration<int64_t, std::milli>( samplePeriod );
1179 const std::map<std::string, DataObjIDColl>& outDeps )
const {
1181 assert( inDeps.size() == outDeps.size() );
1184 info() <<
"Dumping data dependencies graph to file: " <<
g.fileName() <<
endmsg;
1187 std::set<std::size_t> definedObjects;
1194 std::size_t algoIndex = 0ul;
1195 for (
const auto& [algName, ideps] : inDeps ) {
1196 if ( not std::regex_search( algName, algNameRegex ) )
continue;
1197 std::string algIndex =
"Alg_" + std::to_string( algoIndex );
1198 g.addNode( algName, algIndex );
1201 for (
const auto& dep : ideps ) {
1202 if ( not std::regex_search( dep.fullKey(), objNameRegex ) )
continue;
1204 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1205 std::string objIndex =
"obj_" + std::to_string( dep.hash() );
1206 if ( inserted )
g.addNode( dep.key(), objIndex );
1208 g.addEdge( dep.key(), objIndex, algName, algIndex );
1211 const auto& odeps = outDeps.at( algName );
1212 for (
const auto& dep : odeps ) {
1213 if ( not std::regex_search( dep.fullKey(), objNameRegex ) )
continue;
1215 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1216 std::string objIndex =
"obj_" + std::to_string( dep.hash() );
1217 if ( inserted )
g.addNode( dep.key(), objIndex );
1219 g.addEdge( algName, algIndex, dep.key(), objIndex );