29 #include <string_view>
31 #include <unordered_set>
34 #include "boost/algorithm/string.hpp"
35 #include "boost/thread.hpp"
36 #include "boost/tokenizer.hpp"
38 #include "tbb/tbb_stddef.h"
// Logging guards. Each macro expands to a bare `if` on the current message
// level, so the single statement written directly after it (typically a
// `debug() << ... << endmsg;` stream expression) only runs when that level is
// enabled.
// NOTE: because the expansion is an un-braced `if`, an `else` placed after the
// guarded statement would bind to this hidden `if`; guard exactly one
// statement, or wrap the guarded code in braces.
43 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
// Same guard as ON_DEBUG, for the MSG::VERBOSE level.
44 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
47 struct DataObjIDSorter {
55 v.reserve( coll.
size() );
56 for (
const DataObjID&
id : coll )
v.push_back( &
id );
63 [testStates](
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
79 if ( sc.
isFailure() ) warning() <<
"Base class could not be initialized" <<
endmsg;
84 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
89 fatal() <<
"Cannot cast ThreadPoolSvc" <<
endmsg;
94 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
99 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
104 fatal() <<
"Terminating initialization" <<
endmsg;
107 ON_DEBUG debug() <<
"Waiting for AvalancheSchedulerSvc to activate" <<
endmsg;
116 warning() <<
"No CondSvc found, or not enabled. "
117 <<
"Will not manage CondAlgorithms" <<
endmsg;
125 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
131 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
138 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
150 const unsigned int algsNumber = algos.
size();
151 if ( algsNumber != 0 ) {
152 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
154 error() <<
"No algorithms found" <<
endmsg;
170 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
179 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
183 ostdd <<
"Data Dependencies for Algorithms:";
188 if (
nullptr == algoPtr ) {
189 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
190 <<
": this will result in a crash." <<
endmsg;
198 ostdd <<
"\n " << algoPtr->
name();
200 auto write_owners = [&avis, &ostdd](
const DataObjID id ) {
209 ostdd <<
"\n o INPUT " << id;
211 if (
id.
key().find(
":" ) != std::string::npos ) {
212 ostdd <<
" contains alternatives which require resolution...\n";
213 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":" } };
215 return globalOutp.find( DataObjID{ t } ) != globalOutp.
end();
217 if ( itok != tokens.end() ) {
218 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
219 id.updateKey( *itok );
221 error() <<
"failed to find alternate in global output list"
222 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
226 algoDependencies.
insert(
id );
230 ostdd <<
"\n o OUTPUT " << *id;
232 if ( id->key().find(
":" ) != std::string::npos ) {
233 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
241 algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
244 if ( m_showDataDeps ) { info() << ostdd.str() <<
endmsg; }
249 if ( m_checkDeps || m_checkOutput ) {
251 for (
auto o : globalInp ) {
254 requiredInputKeys.
insert( o.key() );
255 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.
insert( o );
257 if ( m_checkOutput ) {
258 for (
auto o : globalOutp ) {
259 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.
find( o.key() ) == requiredInputKeys.
end() ) {
262 for (
const std::string& algoName : m_checkOutputIgnoreList ) {
263 auto it = algosOutputDependenciesMap.find( algoName );
264 if ( it != algosOutputDependenciesMap.end() ) {
265 if ( it->second.find( o ) != it->second.end() ) {
271 if ( !ignored ) { unusedOutp.
insert( o ); }
278 if ( unmetDepInp.
size() > 0 ) {
280 auto printUnmet = [&](
auto msg ) {
281 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
282 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
284 for (
const auto& p : algosInputDependenciesMap )
285 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
289 if ( !m_useDataLoader.empty() ) {
294 if ( m_useDataLoader == algo->name() ) {
295 dataLoaderAlg = algo;
299 if ( dataLoaderAlg ==
nullptr ) {
300 fatal() <<
"No DataLoader Algorithm \"" << m_useDataLoader.value()
301 <<
"\" found, and unmet INPUT dependencies "
303 printUnmet( fatal() );
307 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() <<
"/"
308 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
309 printUnmet( info() );
314 fatal() <<
"Unable to dcast DataLoader \"" << m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm"
319 for (
auto&
id : unmetDepInp ) {
320 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->type() <<
"/"
321 << dataLoaderAlg->name() <<
endmsg;
326 fatal() <<
"Auto DataLoading not requested, "
327 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
328 printUnmet( fatal() );
333 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
337 if ( m_checkOutput ) {
338 if ( unusedOutp.
size() > 0 ) {
340 auto printUnusedOutp = [&](
auto msg ) {
341 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
342 msg <<
" o " << *o <<
" produced by Algorithm: " <<
endmsg;
344 for (
const auto& p : algosOutputDependenciesMap )
345 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
349 fatal() <<
"The following unused OUTPUT items were found:" <<
endmsg;
350 printUnusedOutp( fatal() );
353 info() <<
"No unused OUTPUT items were found" <<
endmsg;
358 m_precSvc = serviceLocator()->service(
"PrecedenceSvc" );
359 if ( !m_precSvc.isValid() ) {
360 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
365 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
370 m_algname_vect.resize( algsNumber );
374 m_algname_index_map[
name] = index;
375 m_algname_vect.at( index ) =
name;
380 if ( !messageSvc.isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
382 m_eventSlots.reserve( m_maxEventsInFlight );
383 for (
size_t i = 0; i < m_maxEventsInFlight; ++i ) {
385 m_eventSlots.back().complete =
true;
388 if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
391 info() <<
"Concurrency level information:" <<
endmsg;
392 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
393 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
396 info() <<
"Task scheduling settings:" <<
endmsg;
397 info() <<
" o Avalanche generation mode: "
398 << ( m_optimizationMode.empty() ?
"disabled" : m_optimizationMode.toString() ) <<
endmsg;
399 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
400 << ( m_enablePreemptiveBlockingTasks
401 ? (
"enabled (max. " +
std::to_string( m_maxBlockingAlgosInFlight ) +
" concurrent tasks)" )
404 info() <<
" o Scheduling of condition tasks: " << ( m_enableCondSvc ?
"enabled" :
"disabled" ) <<
endmsg;
406 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
408 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
411 if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
423 if ( sc.
isFailure() ) warning() <<
"Base class could not be finalized" <<
endmsg;
426 if ( sc.
isFailure() ) warning() <<
"Scheduler could not be deactivated" <<
endmsg;
428 info() <<
"Joining Scheduler thread" <<
endmsg;
433 error() <<
"problems in scheduler thread" <<
endmsg;
453 ON_DEBUG debug() <<
"AvalancheSchedulerSvc::activate()" <<
endmsg;
456 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
474 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
485 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
493 ON_DEBUG debug() <<
"Terminating thread-pool resources" <<
endmsg;
495 error() <<
"Problems terminating thread pool" <<
endmsg;
541 if ( !eventContext ) {
542 fatal() <<
"Event context is nullptr" <<
endmsg;
547 ON_DEBUG debug() <<
"A free processing slot could not be found." <<
endmsg;
556 const unsigned int thisSlotNum = eventContext->
slot();
559 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
563 ON_DEBUG debug() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
564 thisSlot.
reset( eventContext );
571 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
572 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
576 if ( this->
iterate().isFailure() ) {
577 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
586 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
599 for (
auto context : eventContexts ) {
626 ON_DEBUG debug() <<
"Popped slot " << eventContext->
slot() <<
" (event " << eventContext->
evt() <<
")" <<
endmsg;
638 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
661 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
668 OccupancySnapshot nextSnap;
673 if ( !thisSlot.eventContext )
continue;
675 int iSlot = thisSlot.eventContext->slot();
687 if ( nextSnap.states.empty() ) {
694 slotStateTotals.
resize( AState::MAXVALUE );
696 slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset(
AState(
state ) );
700 for (
auto& subslot : thisSlot.allSubSlots ) {
702 slotStateTotals[
state] += subslot.algsStates.sizeOfSubset(
AState(
state ) );
708 auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
709 for ( uint algIndex : drAlgs ) {
715 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
718 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
719 <<
" on processing slot " << iSlot <<
endmsg;
723 for (
auto& subslot : thisSlot.allSubSlots ) {
724 auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
725 for ( uint algIndex : drAlgsSubSlot ) {
730 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
736 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
748 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
749 !thisSlot.algsStates.containsAny(
750 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
751 !subSlotAlgsInStates( thisSlot,
752 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
753 !thisSlot.complete ) {
755 thisSlot.complete =
true;
759 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
760 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
767 thisSlot.eventContext.reset(
nullptr );
777 if ( !nextSnap.states.empty() ) {
791 auto slotIndex = contextPtr->
slot();
797 auto subSlotIndex = contextPtr->
subSlot();
804 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
814 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
832 if ( !slot.
algsStates.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
833 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
849 const uint slotIdx = eventContext->
slot();
851 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
874 outputMS <<
"Dumping scheduler state\n"
875 <<
"=========================================================================================\n"
876 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
877 <<
"=========================================================================================\n\n";
881 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
882 <<
"------------------\n\n";
886 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
887 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
894 auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
895 for ( uint algIndex : schedAlgs ) {
903 auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
904 for ( uint algIndex : schedAlgs ) {
908 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
909 << slot.eventContext->slot();
912 if ( timelineSvc.isValid() ) {
915 te.slot = slot.eventContext->slot();
916 te.event = slot.eventContext->evt();
918 if ( timelineSvc->getTimelineEvent( te ) )
921 outputMS <<
" thread.id: [unknown]";
926 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
933 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
934 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
938 subSlotAlgsInStates( m_eventSlots[iSlot], {
AState::ERROR } )
941 for (
auto& slot : m_eventSlots ) {
943 if ( slot.complete )
continue;
945 outputMS <<
"[ slot: "
946 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
948 << ( slot.eventContext->valid() ?
std::
to_string( slot.eventContext->
evt() ) :
"[ctx invalid]" )
951 if ( 0 > iSlot || iSlot == slotCount ) {
955 outputMS <<
"ERROR alg(s):";
957 auto& errorAlgs = slot.algsStates.algsInState(
AState::ERROR );
958 for ( uint algIndex : errorAlgs ) {
959 outputMS <<
" " << index2algname( algIndex );
962 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
966 outputMS << m_precSvc->printState( slot ) <<
"\n";
970 if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
971 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
972 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
973 for (
auto& ss : slot.allSubSlots ) {
974 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
975 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
976 <<
", entry: " << ss.entryPoint <<
", event: "
977 << ( ss.eventContext->valid() ?
std::
to_string( ss.eventContext->
evt() ) :
"[ctx invalid]" )
980 outputMS <<
"ERROR alg(s):";
981 auto& errorAlgs = ss.algsStates.algsInState(
AState::ERROR );
982 for ( uint algIndex : errorAlgs ) { outputMS <<
" " << index2algname( algIndex ); }
986 outputMS << m_precSvc->printState( ss ) <<
"\n";
995 if ( 0 <= iSlot && !wasAlgError ) {
996 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
997 m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1000 outputMS <<
"\n=========================================================================================\n"
1001 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1002 <<
"=========================================================================================\n\n";
1004 info() << outputMS.str() <<
endmsg;
1027 unsigned int algIndex{
ts.algIndex };
1028 std::string_view algName(
ts.algName );
1029 unsigned int algRank{
ts.algRank };
1030 bool blocking{
ts.blocking };
1031 int slotIndex{
ts.slotIndex };
1052 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1054 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1055 <<
", rank:" << algRank <<
", blocking:" << ( blocking ?
"yes" :
"no" )
1064 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::SCHEDULED );
1070 sc =
revise(
ts.algIndex,
ts.contextPtr, AState::RESOURCELESS );
1096 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1102 ON_DEBUG debug() <<
"Executed " <<
ts.algName <<
" [slot:" <<
ts.slotIndex <<
", event:" <<
ts.contextPtr->evt()
1103 <<
", rank:" <<
ts.algRank <<
", blocking:" << (
ts.blocking ?
"yes" :
"no" )
1123 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1131 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1134 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1136 if ( viewContextPtr ) {
1162 if ( samplePeriod < 0 ) {