28 #include <string_view> 
   30 #include <unordered_set> 
   33 #include "boost/algorithm/string.hpp" 
   34 #include "boost/thread.hpp" 
   35 #include "boost/tokenizer.hpp" 
   37 #include "tbb/tbb_stddef.h" 
   42 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 
   43 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 
   46   struct DataObjIDSorter {
 
   54     v.reserve( coll.
size() );
 
   55     for ( 
const DataObjID& 
id : coll ) 
v.push_back( &
id );
 
   62                         [testStates]( 
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
 
   78   if ( sc.
isFailure() ) warning() << 
"Base class could not be initialized" << 
endmsg;
 
   83     fatal() << 
"Error retrieving ThreadPoolSvc" << 
endmsg;
 
   88     fatal() << 
"Cannot cast ThreadPoolSvc" << 
endmsg;
 
   93     fatal() << 
"Cannot find valid TBB task_arena" << 
endmsg;
 
   98   info() << 
"Activating scheduler in a separate thread" << 
endmsg;
 
  103       fatal() << 
"Terminating initialization" << 
endmsg;
 
  106       ON_DEBUG debug() << 
"Waiting for AvalancheSchedulerSvc to activate" << 
endmsg;
 
  115       warning() << 
"No CondSvc found, or not enabled. " 
  116                 << 
"Will not manage CondAlgorithms" << 
endmsg;
 
  124     fatal() << 
"Error retrieving AlgoResourcePool" << 
endmsg;
 
  130     fatal() << 
"Error retrieving AlgExecStateSvc" << 
endmsg;
 
  137     fatal() << 
"Error retrieving EventDataSvc interface IHiveWhiteBoard." << 
endmsg;
 
  149   const unsigned int            algsNumber = algos.
size();
 
  150   if ( algsNumber != 0 ) {
 
  151     info() << 
"Found " << algsNumber << 
" algorithms" << 
endmsg;
 
  153     error() << 
"No algorithms found" << 
endmsg;
 
  168       fatal() << 
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << 
endmsg;
 
  175   ostdd << 
"Data Dependencies for Algorithms:";
 
  180     if ( 
nullptr == algoPtr ) {
 
  181       fatal() << 
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->
name()
 
  182               << 
": this will result in a crash." << 
endmsg;
 
  186     ostdd << 
"\n  " << algoPtr->
name();
 
  192         ostdd << 
"\n    o INPUT  " << id;
 
  193         if ( 
id.
key().find( 
":" ) != std::string::npos ) {
 
  194           ostdd << 
" contains alternatives which require resolution...\n";
 
  195           auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
 
  197             return globalOutp.find( DataObjID{t} ) != globalOutp.
end();
 
  199           if ( itok != tokens.end() ) {
 
  200             ostdd << 
"found matching output for " << *itok << 
" -- updating scheduler info\n";
 
  201             id.updateKey( *itok );
 
  203             error() << 
"failed to find alternate in global output list" 
  204                     << 
" for id: " << 
id << 
" in Alg " << algoPtr->
name() << 
endmsg;
 
  208         algoDependencies.
insert( 
id );
 
  212         ostdd << 
"\n    o OUTPUT " << *id;
 
  213         if ( id->key().find( 
":" ) != std::string::npos ) {
 
  214           error() << 
" in Alg " << algoPtr->
name() << 
" alternatives are NOT allowed for outputs! id: " << *
id 
  222     algosDependenciesMap[algoPtr->name()] = algoDependencies;
 
  225   if ( m_showDataDeps ) { info() << ostdd.str() << 
endmsg; }
 
  231     for ( 
auto o : globalInp )
 
  232       if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.
insert( o );
 
  234     if ( unmetDep.
size() > 0 ) {
 
  236       auto printUnmet = [&]( 
auto msg ) {
 
  237         for ( 
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
 
  238           msg << 
"   o " << *o << 
"    required by Algorithm: " << 
endmsg;
 
  240           for ( 
const auto& p : algosDependenciesMap )
 
  241             if ( p.second.find( *o ) != p.second.end() ) 
msg << 
"       * " << p.first << 
endmsg;
 
  245       if ( !m_useDataLoader.empty() ) {
 
  250           if ( algo->name() == m_useDataLoader ) {
 
  251             dataLoaderAlg = algo;
 
  255         if ( dataLoaderAlg == 
nullptr ) {
 
  256           fatal() << 
"No DataLoader Algorithm \"" << m_useDataLoader.value()
 
  257                   << 
"\" found, and unmet INPUT dependencies " 
  259           printUnmet( fatal() );
 
  263         info() << 
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << 
"/" 
  264                << dataLoaderAlg->name() << 
"\" Algorithm" << 
endmsg;
 
  265         printUnmet( info() );
 
  270           fatal() << 
"Unable to dcast DataLoader \"" << m_useDataLoader.value() << 
"\" IAlg to Gaudi::Algorithm" 
  275         for ( 
auto& 
id : unmetDep ) {
 
  276           ON_DEBUG debug() << 
"adding OUTPUT dep \"" << 
id << 
"\" to " << dataLoaderAlg->type() << 
"/" 
  277                            << dataLoaderAlg->name() << 
endmsg;
 
  282         fatal() << 
"Auto DataLoading not requested, " 
  283                 << 
"and the following unmet INPUT dependencies were found:" << 
endmsg;
 
  284         printUnmet( fatal() );
 
  289       info() << 
"No unmet INPUT data dependencies were found" << 
endmsg;
 
  294   m_precSvc = serviceLocator()->service( 
"PrecedenceSvc" );
 
  295   if ( !m_precSvc.isValid() ) {
 
  296     fatal() << 
"Error retrieving PrecedenceSvc" << 
endmsg;
 
  301     fatal() << 
"Unable to dcast PrecedenceSvc" << 
endmsg;
 
  306   m_algname_vect.resize( algsNumber );
 
  310     m_algname_index_map[
name]  = index;
 
  311     m_algname_vect.at( index ) = 
name;
 
  316   if ( !messageSvc.isValid() ) error() << 
"Error retrieving MessageSvc interface IMessageSvc." << 
endmsg;
 
  318   m_eventSlots.reserve( m_maxEventsInFlight );
 
  319   for ( 
size_t i = 0; i < m_maxEventsInFlight; ++i ) {
 
  321     m_eventSlots.back().complete = 
true;
 
  324   if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
 
  327   info() << 
"Concurrency level information:" << 
endmsg;
 
  328   info() << 
" o Number of events in flight: " << m_maxEventsInFlight << 
endmsg;
 
  329   info() << 
" o TBB thread pool size: " << m_threadPoolSize << 
endmsg;
 
  332   info() << 
"Task scheduling settings:" << 
endmsg;
 
  333   info() << 
" o Avalanche generation mode: " 
  334          << ( m_optimizationMode.empty() ? 
"disabled" : m_optimizationMode.toString() ) << 
endmsg;
 
  335   info() << 
" o Preemptive scheduling of CPU-blocking tasks: " 
  336          << ( m_enablePreemptiveBlockingTasks
 
  337                   ? ( 
"enabled (max. " + 
std::to_string( m_maxBlockingAlgosInFlight ) + 
" concurrent tasks)" )
 
  340   info() << 
" o Scheduling of condition tasks: " << ( m_enableCondSvc ? 
"enabled" : 
"disabled" ) << 
endmsg;
 
  342   if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
 
  344   if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
 
  347   if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
 
  359   if ( sc.
isFailure() ) warning() << 
"Base class could not be finalized" << 
endmsg;
 
  362   if ( sc.
isFailure() ) warning() << 
"Scheduler could not be deactivated" << 
endmsg;
 
  364   info() << 
"Joining Scheduler thread" << 
endmsg;
 
  369     error() << 
"problems in scheduler thread" << 
endmsg;
 
  389   ON_DEBUG debug() << 
"AvalancheSchedulerSvc::activate()" << 
endmsg;
 
  392     error() << 
"problems initializing ThreadPoolSvc" << 
endmsg;
 
  410         verbose() << 
"Action did not succeed (which is not bad per se)." << 
endmsg;
 
  421           verbose() << 
"Iteration did not succeed (which is not bad per se)." << 
endmsg;
 
  429   ON_DEBUG debug() << 
"Terminating thread-pool resources" << 
endmsg;
 
  431     error() << 
"Problems terminating thread pool" << 
endmsg;
 
  477   if ( !eventContext ) {
 
  478     fatal() << 
"Event context is nullptr" << 
endmsg;
 
  483     ON_DEBUG debug() << 
"A free processing slot could not be found." << 
endmsg;
 
  492     const unsigned int thisSlotNum = eventContext->
slot();
 
  495       fatal() << 
"The slot " << thisSlotNum << 
" is supposed to be a finished event but it's not" << 
endmsg;
 
  499     ON_DEBUG debug() << 
"Executing event " << eventContext->
evt() << 
" on slot " << thisSlotNum << 
endmsg;
 
  500     thisSlot.
reset( eventContext );
 
  507     if ( 
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
 
  508       error() << 
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << 
endmsg;
 
  512     if ( this->
iterate().isFailure() ) {
 
  513       error() << 
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << 
endmsg;
 
  522     verbose() << 
"Pushing the action to update the scheduler for slot " << eventContext->
slot() << 
endmsg;
 
  535   for ( 
auto context : eventContexts ) {
 
  562     ON_DEBUG debug() << 
"Popped slot " << eventContext->
slot() << 
" (event " << eventContext->
evt() << 
")" << 
endmsg;
 
  574     ON_DEBUG debug() << 
"Try Pop successful slot " << eventContext->
slot() << 
"(event " << eventContext->
evt() << 
")" 
  597   for ( 
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
 
  604   OccupancySnapshot nextSnap;
 
  609     if ( !thisSlot.eventContext ) 
continue;
 
  611     int iSlot = thisSlot.eventContext->slot();
 
  623       if ( nextSnap.states.empty() ) {
 
  630       slotStateTotals.
resize( AState::MAXVALUE );
 
  632         slotStateTotals[
state] = thisSlot.algsStates.sizeOfSubset( 
AState( 
state ) );
 
  636       for ( 
auto& subslot : thisSlot.allSubSlots ) {
 
  638           slotStateTotals[
state] += subslot.algsStates.sizeOfSubset( 
AState( 
state ) );
 
  644     for ( 
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
 
  651           schedule( 
TaskSpec( 
nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
 
  654           << 
"Could not apply transition from " << AState::DATAREADY << 
" for algorithm " << algName
 
  655           << 
" on processing slot " << iSlot << 
endmsg;
 
  659     for ( 
auto& subslot : thisSlot.allSubSlots ) {
 
  660       auto& subslotStates = subslot.algsStates;
 
  661       for ( 
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
 
  667             schedule( 
TaskSpec( 
nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
 
  673       s << 
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) << 
", " 
  685     if ( 
m_precSvc->CFRulesResolved( thisSlot ) &&
 
  686          !thisSlot.algsStates.containsAny(
 
  687              {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
 
  688          !subSlotAlgsInStates( thisSlot,
 
  689                                {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
 
  690          !thisSlot.complete ) {
 
  692       thisSlot.complete = 
true;
 
  696         ON_DEBUG debug() << 
"Event " << thisSlot.eventContext->evt() << 
" finished (slot " 
  697                          << thisSlot.eventContext->slot() << 
")." << 
endmsg;
 
  704       thisSlot.eventContext.reset( 
nullptr );
 
  714   if ( !nextSnap.states.empty() ) {
 
  728   auto       slotIndex = contextPtr->
slot();
 
  734     auto       subSlotIndex = contextPtr->
subSlot();
 
  741                            << 
", subslot:" << subSlotIndex << 
", event:" << contextPtr->
evt() << 
"]" << 
endmsg;
 
  751                            << 
", event:" << contextPtr->
evt() << 
"]" << 
endmsg;
 
  770        !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
 
  786   const uint slotIdx = eventContext->
slot();
 
  788   error() << 
"Event " << eventContext->
evt() << 
" on slot " << slotIdx << 
" failed" << 
endmsg;
 
  811   outputMS << 
"Dumping scheduler state\n" 
  812            << 
"=========================================================================================\n" 
  813            << 
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 
  814            << 
"=========================================================================================\n\n";
 
  818   outputMS << 
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 
  819            << 
"------------------\n\n";
 
  823   if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
 
  824     outputMS << 
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
 
  830       for ( 
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
 
  835       for ( 
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
 
  840         outputMS << 
"  task: " << 
std::setw( indt ) << algoName << 
" evt/slot: " << slot.eventContext->evt() << 
"/" 
  841                  << slot.eventContext->slot();
 
  844         if ( timelineSvc.isValid() ) {
 
  847           te.slot      = slot.eventContext->slot();
 
  848           te.event     = slot.eventContext->evt();
 
  850           if ( timelineSvc->getTimelineEvent( te ) )
 
  853             outputMS << 
" thread.id: [unknown]"; 
 
  858         outputMS << 
" state: [" << 
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << 
"]\n";
 
  865   outputMS << 
"\n---------------------------- Task/CF/FSM Mapping " 
  866            << ( 0 > iSlot ? 
"[all slots] --" : 
"[target slot] " ) << 
"--------------------------\n\n";
 
  873   for ( 
auto& slot : m_eventSlots ) {
 
  875     if ( slot.complete ) 
continue;
 
  877     outputMS << 
"[ slot: " 
  878              << ( slot.eventContext->valid() ? 
std::to_string( slot.eventContext->slot() ) : 
"[ctx invalid]" )
 
  880              << ( slot.eventContext->valid() ? 
std::
to_string( slot.eventContext->
evt() ) : 
"[ctx invalid]" )
 
  883     if ( 0 > iSlot || iSlot == slotCount ) {
 
  887         outputMS << 
"ERROR alg(s):";
 
  890           outputMS << 
" " << index2algname( *it );
 
  893         if ( errorCount == 0 ) outputMS << 
" in subslot(s)";
 
  897         outputMS << m_precSvc->printState( slot ) << 
"\n";
 
  901       if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
 
  902         outputMS << 
"\nNumber of sub-slots: " << slot.allSubSlots.size() << 
"\n\n";
 
  903         auto slotID = slot.eventContext->valid() ? 
std::to_string( slot.eventContext->slot() ) : 
"[ctx invalid]";
 
  904         for ( 
auto& ss : slot.allSubSlots ) {
 
  905           outputMS << 
"[ slot: " << slotID << 
", sub-slot: " 
  906                    << ( ss.eventContext->valid() ? 
std::to_string( ss.eventContext->subSlot() ) : 
"[ctx invalid]" )
 
  907                    << 
", entry: " << ss.entryPoint << 
", event: " 
  908                    << ( ss.eventContext->valid() ? 
std::
to_string( ss.eventContext->
evt() ) : 
"[ctx invalid]" )
 
  911             outputMS << 
"ERROR alg(s):";
 
  913               outputMS << 
" " << index2algname( *it );
 
  917             outputMS << m_precSvc->printState( ss ) << 
"\n";
 
  926   if ( 0 <= iSlot && !wasAlgError ) {
 
  927     outputMS << 
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
 
  928     m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
 
  931   outputMS << 
"\n=========================================================================================\n" 
  932            << 
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 
  933            << 
"=========================================================================================\n\n";
 
  935   info() << outputMS.str() << 
endmsg;
 
  958       unsigned int     algIndex{
ts.algIndex};
 
  959       std::string_view algName( 
ts.algName );
 
  960       unsigned int     algRank{
ts.algRank};
 
  961       bool             blocking{
ts.blocking};
 
  962       int              slotIndex{
ts.slotIndex};
 
  965       if ( 
LIKELY( !blocking ) ) {
 
  983       sc = 
revise( algIndex, contextPtr, AState::SCHEDULED );
 
  985       ON_DEBUG debug() << 
"Scheduled " << algName << 
" [slot:" << slotIndex << 
", event:" << contextPtr->evt()
 
  986                        << 
", rank:" << algRank << 
", blocking:" << ( blocking ? 
"yes" : 
"no" )
 
  995       sc = 
revise( 
ts.algIndex, 
ts.contextPtr, AState::SCHEDULED );
 
 1001     sc = 
revise( 
ts.algIndex, 
ts.contextPtr, AState::RESOURCELESS );
 
 1027                      ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
 
 1033   ON_DEBUG debug() << 
"Executed " << 
ts.algName << 
" [slot:" << 
ts.slotIndex << 
", event:" << 
ts.contextPtr->evt()
 
 1034                    << 
", rank:" << 
ts.algRank << 
", blocking:" << ( 
ts.blocking ? 
"yes" : 
"no" )
 
 1054     fatal() << 
"Attempted to nest EventViews at node " << nodeName << 
": this is not supported" << 
endmsg;
 
 1062   auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
 
 1065     EventSlot& topSlot = this->m_eventSlots[slotIndex];
 
 1067     if ( viewContextPtr ) {
 
 1093     if ( samplePeriod < 0 ) {