The Gaudi Framework  v33r2 (a6f0ec87)
AvalancheSchedulerSvc Class Reference

#include <src/AvalancheSchedulerSvc.h>


Classes

struct  AlgQueueSort
 Comparison operator to sort the queues.
 
struct  TaskSpec
 Struct to hold entries in the alg queues.
 

Public Member Functions

 ~AvalancheSchedulerSvc () override=default
 Destructor.
 
StatusCode initialize () override
 Initialise.
 
StatusCode finalize () override
 Finalise.
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler.
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available.
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler.
 
unsigned int freeSlots () override
 Get free slots number.
 
virtual StatusCode scheduleEventView (const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
 Method to inform the scheduler about event views.
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast.
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface.
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames.
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service.
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service.
 
StatusCode sysStart () override
 Start Service.
 
StatusCode sysStop () override
 Stop Service.
 
StatusCode sysFinalize () override
 Finalize Service.
 
StatusCode sysReinitialize () override
 Re-initialize the Service.
 
StatusCode sysRestart () override
 Restart the Service.
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor.
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator.
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job.
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist.
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist.
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName, bool createIf=true)
 Declare used tool.
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked.
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property.
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property.
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation.
 
Gaudi::Details::PropertyBase * declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property.
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property from another property
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property from the value
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property
 
const Gaudi::Details::PropertyBase & getProperty (const std::string &name) const override
 get the property by name
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name.
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 get the cached level (originally extracted from the embedded MsgStream)
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor.
 
const SmartIF< IMessageSvc > & msgSvc () const
 The standard message service.
 
MsgStream & msgStream () const
 Return an uninitialized MsgStream.
 
MsgStream & msgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts.
 
MsgStream & always () const
 shortcut for the method msgStream(MSG::ALWAYS)
 
MsgStream & fatal () const
 shortcut for the method msgStream(MSG::FATAL)
 
MsgStream & err () const
 shortcut for the method msgStream(MSG::ERROR)
 
MsgStream & error () const
 shortcut for the method msgStream(MSG::ERROR)
 
MsgStream & warning () const
 shortcut for the method msgStream(MSG::WARNING)
 
MsgStream & info () const
 shortcut for the method msgStream(MSG::INFO)
 
MsgStream & debug () const
 shortcut for the method msgStream(MSG::DEBUG)
 
MsgStream & verbose () const
 shortcut for the method msgStream(MSG::VERBOSE)
 
MsgStream & msg () const
 shortcut for the method msgStream(MSG::INFO)
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
using AState = AlgsExecutionStates::State
 
using action = std::function< StatusCode()>
 

Private Member Functions

void activate ()
 Activate scheduler.
 
StatusCode deactivate ()
 Deactivate scheduler.
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer.
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name.
 
StatusCode iterate ()
 Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
 
StatusCode revise (unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
 
StatusCode schedule (TaskSpec &&)
 
StatusCode signoff (const TaskSpec &)
 The call to this method is triggered only from within the AlgTask.
 
bool isStalled (const EventSlot &) const
 Check if scheduling in a particular slot is in a stall.
 
void eventFailed (EventContext *eventContext)
 Method to execute if an event failed.
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler.
 

Private Attributes

Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::string > m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::string > m_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
 
Gaudi::Property< bool > m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
 
Gaudi::Property< std::string > m_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
Gaudi::Property< bool > m_verboseSubSlots {this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"}
 
std::atomic< ActivationState > m_isActive {INACTIVE}
 Flag to track if the scheduler is active or not.
 
std::thread m_thread
 The thread in which the activate function runs.
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion.
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion.
 
SmartIF< IPrecedenceSvc > m_precSvc
 A shortcut to the Precedence Service.
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard.
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots.
 
std::atomic_int m_freeSlots
 Atomic to account for asynchronous updates by the scheduler wrt the rest.
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events.
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager.
 
SmartIF< ICondSvc > m_condSvc
 A shortcut to service for Conditions handling.
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight.
 
unsigned int m_blockingAlgosInFlight = 0
 Number of blocking algorithms presently in flight.
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool.
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
 Queue for scheduled algorithms.
 
std::queue< TaskSpec > m_retryQueue
 
std::atomic< bool > m_needsUpdate {true}
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
size_t m_maxEventsInFlight {0}
 
size_t m_maxAlgosInFlight {1}
 

Friends

template<class T >
class AlgTask
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class.
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class.
 
- Public Types inherited from Service
using Factory = Gaudi::PluginService::Factory< IService *(const std::string &, ISvcLocator *)>
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in.
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor.
 
int outputLevel () const
 get the Service's output level
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBase * property (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches.
 
MSG::Level resetMessaging ()
 Reinitialize internal states.
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream.
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, " unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service.
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like waves of concurrency disclosure under arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. Compared to the approach used in the ForwardSchedulerSvc, it offers the following advantages:

(1) faster decision making (and thus shorter concurrency disclosure downtime);
(2) the capacity to make task scheduling decisions proactively.

Point (2) made it possible to implement a number of generic, non-intrusive scheduling strategies that maximize intra-event throughput.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled as soon as all of the following conditions are met:

  • a control flow (CF) graph traversal reaches the task;
  • all data flow (DF) dependencies of the task are satisfied;
  • the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • a free computational resource to run the task is available.
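The prerequisites above amount to a single conjunction: a task becomes schedulable only when every condition holds at the same moment. The following sketch is purely illustrative. TaskStatus and canSchedule are hypothetical names that do not exist in Gaudi. Judging from the members listed above, the scheduler itself spreads these conditions across the precedence service, the algorithm resource pool and the thread pool rather than keeping them in one struct.

```cpp
// Hypothetical snapshot of the prerequisites for one task; these names do not
// exist in Gaudi and only mirror the bullet list above.
struct TaskStatus {
  bool reachedByCFTraversal;   // a control flow graph traversal reached the task
  bool dataDepsSatisfied;      // all data flow dependencies are satisfied
  bool consideredByPoolParser; // the DF-ready pool parsing mechanism considers it
  bool algInstanceAvailable;   // a free (or re-entrant) algorithm instance exists
  bool computeResourceFree;    // a free computational resource exists
};

// A task may be scheduled only when every prerequisite holds at once.
inline bool canSchedule( const TaskStatus& t ) {
  return t.reachedByCFTraversal && t.dataDepsSatisfied && t.consideredByPoolParser && t.algInstanceAvailable &&
         t.computeResourceFree;
}
```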

o (*) Avalanche induction strategies

The scheduler maximizes intra-event throughput by applying several search strategies within the DF-ready task pool. Tasks are prioritized according to the following types of asymmetry in the precedence rules graph:

(A) local task-to-data asymmetry;
(B) local task-to-task asymmetry;
(C) global task-to-task asymmetry.
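Prioritizing the DF-ready pool can be pictured as a priority queue keyed by a rank that one of these asymmetry rules would compute. The class itself keeps scheduled tasks in a tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort>. The sketch below substitutes std::priority_queue and a hypothetical numeric rank. ReadyTask, ByRank and scheduleOrder are illustrative names, and the sketch does not reproduce how the actual strategies compute priorities.

```cpp
#include <queue>
#include <string>
#include <vector>

// Hypothetical entry of the DF-ready pool: an algorithm name plus a rank that
// some avalanche induction strategy would compute (not Gaudi's TaskSpec).
struct ReadyTask {
  std::string algName;
  double rank; // assumed: a higher rank means the task should run earlier
};

// Orders the heap so that top() is the task with the highest rank.
struct ByRank {
  bool operator()( const ReadyTask& a, const ReadyTask& b ) const { return a.rank < b.rank; }
};

using ReadyPool = std::priority_queue<ReadyTask, std::vector<ReadyTask>, ByRank>;

// Drains a copy of the pool and returns the order in which tasks would be scheduled.
std::vector<std::string> scheduleOrder( ReadyPool pool ) {
  std::vector<std::string> order;
  while ( !pool.empty() ) {
    order.push_back( pool.top().algName );
    pool.pop();
  }
  return order;
}
```

Because the pool is taken by value, inspecting the order does not consume the caller's queue.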

o Other mechanisms of throughput maximization

The scheduler can also maximize the overall throughput of data processing by preemptively scheduling CPU-blocking tasks. The mechanism applies to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices);
  • synchronization-bound tasks.
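One way to picture preemptive scheduling of CPU-blocking tasks is a dispatcher that sends blocking tasks to a separate lane. That lane is capped at a maximum number of tasks in flight, so blocking tasks never occupy threads meant for CPU-bound work. The class does declare the m_enablePreemptiveBlockingTasks and m_maxBlockingAlgosInFlight properties and an m_blockingAlgosInFlight counter. How they are wired together is not shown in this section, so the Dispatcher below is a hypothetical sketch of the idea, not the scheduler's implementation.

```cpp
#include <string>
#include <vector>

// Hypothetical task descriptor: only whether the task blocks the CPU matters here.
struct Task {
  std::string name;
  bool blocking; // I/O-, offload- or synchronization-bound
};

// Routes tasks to the regular compute pool, or to a separate lane for
// blocking tasks that admits at most maxBlockingInFlight of them at a time.
class Dispatcher {
public:
  explicit Dispatcher( unsigned maxBlockingInFlight ) : m_maxBlocking( maxBlockingInFlight ) {}

  // Returns true if the task was dispatched, false if it must be retried later.
  bool dispatch( const Task& t ) {
    if ( !t.blocking ) {
      m_compute.push_back( t.name );
      return true;
    }
    if ( m_blockingInFlight >= m_maxBlocking ) return false; // cap reached
    ++m_blockingInFlight;
    m_blockingLane.push_back( t.name );
    return true;
  }

  // Called when a blocking task finishes, freeing a place in its lane.
  void blockingTaskDone() {
    if ( m_blockingInFlight > 0 ) --m_blockingInFlight;
  }

  const std::vector<std::string>& compute() const { return m_compute; }
  const std::vector<std::string>& blockingLane() const { return m_blockingLane; }

private:
  unsigned                 m_maxBlocking;
  unsigned                 m_blockingInFlight = 0;
  std::vector<std::string> m_compute;
  std::vector<std::string> m_blockingLane;
};
```

A refused blocking task would have to be queued for a later retry, which is the role a structure like m_retryQueue could plausibly play (an assumption; its use is not documented here).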

Credits

Historically, the AvalancheSchedulerSvc branched off from the ForwardSchedulerSvc and in many ways built its success on the ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 113 of file AvalancheSchedulerSvc.h.

Member Typedef Documentation

◆ action

Definition at line 152 of file AvalancheSchedulerSvc.h.

◆ AState

Member Enumeration Documentation

◆ ActivationState

Constructor & Destructor Documentation

◆ ~AvalancheSchedulerSvc()

AvalancheSchedulerSvc::~AvalancheSchedulerSvc ( )
override default

Destructor.

Member Function Documentation

◆ activate()

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on, the queue of actions is checked. Checking stops only when the m_isActive flag is no longer ACTIVE and the queue is empty. This guarantees that all queued actions are executed and that no stall is created. The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html). It is therefore initialised here, since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas).

Definition at line 378 of file AvalancheSchedulerSvc.cpp.

378  {
379
380    ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
381
382    if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
383      error() << "problems initializing ThreadPoolSvc" << endmsg;
384      m_isActive = FAILURE;
385      return;
386    }
387
388    // Wait for actions pushed into the queue by finishing tasks.
389    action thisAction;
390    StatusCode sc( StatusCode::SUCCESS );
391
392    m_isActive = ACTIVE;
393
394    // Continue to wait if the scheduler is running or there is something to do
395    ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
396    while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
397      m_actionsQueue.pop( thisAction );
398      sc = thisAction();
399      ON_VERBOSE {
400        if ( sc.isFailure() )
401          verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
402        else
403          verbose() << "Action succeeded." << endmsg;
404      }
405      else sc.ignore();
406
407      // If all queued actions have been processed, update the slot states
408      if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
409        sc = iterate();
410        ON_VERBOSE {
411          if ( sc.isFailure() )
412            verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
413          else
414            verbose() << "Iteration succeeded." << endmsg;
415        }
416        else sc.ignore();
417      }
418    }
419
420    ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
421    if ( m_threadPoolSvc->terminatePool().isFailure() ) {
422      error() << "Problems terminating thread pool" << endmsg;
423      m_isActive = FAILURE;
424    }
425  }

◆ algname2index()

unsigned int AvalancheSchedulerSvc::algname2index ( const std::string & algoname )
inline private

Convert a name to an integer.

Definition at line 208 of file AvalancheSchedulerSvc.h.

208 { return m_algname_index_map[algoname]; };

◆ deactivate()

StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

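If the scheduler is active, m_freeSlots is set to 0 and the queue of actions is emptied of any pending actions. A single final action is then pushed that flips the status flag m_isActive to INACTIVE. This is the last action executed by the scheduler thread, after which the loop in activate() exits as soon as the queue is empty.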

Definition at line 435 of file AvalancheSchedulerSvc.cpp.

435  {
436
437    if ( m_isActive == ACTIVE ) {
438
439      // Set the number of slots available to an error code
440      m_freeSlots.store( 0 );
441
442      // Empty queue
443      action thisAction;
444      while ( m_actionsQueue.try_pop( thisAction ) ) {};
445
446      // This would be the last action
447      m_actionsQueue.push( [this]() -> StatusCode {
448        ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
449        m_isActive = INACTIVE;
450        return StatusCode::SUCCESS;
451      } );
452    }
453
454    return StatusCode::SUCCESS;
455  }
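activate() and deactivate() together implement a single-consumer action loop. One thread pops closures and runs them, refreshes the slot states whenever the queue drains, and stops only after a final action has cleared the active flag and nothing else is queued. The sketch below reproduces that pattern with the standard library under stated assumptions:

- ActionQueue is a hypothetical mutex/condition-variable queue standing in for tbb::concurrent_bounded_queue<action>.
- Actions return bool instead of StatusCode.
- onIdle plays the role of iterate(), without the m_needsUpdate check.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for tbb::concurrent_bounded_queue<action>:
// pop() blocks until an action is available.
class ActionQueue {
public:
  using Action = std::function<bool()>; // bool instead of Gaudi's StatusCode

  void push( Action a ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_queue.push( std::move( a ) );
    }
    m_cv.notify_one();
  }

  Action pop() {
    std::unique_lock<std::mutex> lock( m_mutex );
    m_cv.wait( lock, [this] { return !m_queue.empty(); } );
    Action a = std::move( m_queue.front() );
    m_queue.pop();
    return a;
  }

  bool empty() {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_queue.empty();
  }

  // Drops all pending actions, like the try_pop loop in deactivate().
  void clear() {
    std::lock_guard<std::mutex> lock( m_mutex );
    while ( !m_queue.empty() ) m_queue.pop();
  }

private:
  std::mutex              m_mutex;
  std::condition_variable m_cv;
  std::queue<Action>      m_queue;
};

// Mirrors the loop in activate(): run while active OR while work remains,
// and call onIdle (standing in for iterate()) whenever the queue drains.
// Returns the number of actions executed.
int runLoop( ActionQueue& q, const std::atomic<bool>& active, const std::function<void()>& onIdle ) {
  int executed = 0;
  while ( active.load() || !q.empty() ) {
    ActionQueue::Action action = q.pop();
    action(); // the listed loop only logs whether the action succeeded
    ++executed;
    if ( q.empty() ) onIdle();
  }
  return executed;
}
```

In the scheduler the loop runs in its own std::thread, so producers can push actions concurrently; pop() simply blocks while the queue is empty and the flag is still set.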

◆ dumpSchedulerState()

void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

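Used for debugging purposes: the state of the scheduler is collected into a single message and printed at INFO level so that it can be inspected. A negative iSlot prints the control flow and FSM snapshot of every incomplete slot. A non-negative iSlot prints that snapshot only for the given slot and additionally dumps the slot's algorithm execution states.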

Definition at line 768 of file AvalancheSchedulerSvc.cpp.

768  {
769
770    // To have just one big message
771    std::ostringstream outputMS;
772
773    outputMS << "Dumping scheduler state\n"
774             << "=========================================================================================\n"
775             << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
776             << "=========================================================================================\n\n";
777
778    //===========================================================================
779
780    outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
781             << "------------------\n\n";
782
783    // Figure if TimelineSvc is available (used below to detect threads IDs)
784    auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
785    if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
786      outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
787    } else {
788
789      // Figure optimal printout layout
790      size_t indt( 0 );
791      for ( auto& slot : m_eventSlots )
792        for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
793          if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
794
795      // Figure the last running schedule across all slots
796      for ( auto& slot : m_eventSlots ) {
797        for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
798              ++it ) {
799
800          const std::string& algoName{index2algname( *it )};
801
802          outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
803                   << slot.eventContext->slot();
804
805          // Try to get POSIX threads IDs the currently running tasks are scheduled to
806          if ( timelineSvc.isValid() ) {
807            TimelineEvent te{};
808            te.algorithm = algoName;
809            te.slot = slot.eventContext->slot();
810            te.event = slot.eventContext->evt();
811
812            if ( timelineSvc->getTimelineEvent( te ) )
813              outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
814            else
815              outputMS << " thread.id: [unknown]"; // this means a task has just
816                                                  // been signed off as SCHEDULED,
817                                                  // but has not been assigned to a thread yet
818                                                  // (i.e., not running yet)
819          }
820          outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
821        }
822      }
823    }
824
825    //===========================================================================
826
827    outputMS << "\n---------------------------- Task/CF/FSM Mapping "
828             << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
829
830    int slotCount = -1;
831    for ( auto& slot : m_eventSlots ) {
832      ++slotCount;
833      if ( slot.complete ) continue;
834
835      outputMS << "[ slot: "
836               << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
837               << " event: "
838               << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
839               << " ]:\n\n";
840
841      if ( 0 > iSlot || iSlot == slotCount ) {
842
843        // Snapshot of the Control Flow and FSM states
844        outputMS << m_precSvc->printState( slot ) << "\n";
845
846        // Mention sub slots (this is expensive if the number of sub-slots is high)
847        if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
848          outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
849          auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
850          for ( auto& ss : slot.allSubSlots ) {
851            outputMS << "[ slot: " << slotID << ", sub-slot: "
852                     << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
853                     << ", entry: " << ss.entryPoint << ", event: "
854                     << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
855                     << " ]:\n\n";
856            outputMS << m_precSvc->printState( ss ) << "\n";
857          }
858        }
859      }
860    }
861
862    //===========================================================================
863
864    if ( 0 <= iSlot ) {
865      outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
866      m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
867    }
868
869    outputMS << "\n=========================================================================================\n"
870             << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
871             << "=========================================================================================\n\n";
872
873    info() << outputMS.str() << endmsg;
874  }
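dumpSchedulerState() first scans all scheduled task names to find the longest one. It then pads every name to that width with std::setw and accumulates everything in one std::ostringstream, so the report is emitted as a single message. The sketch below shows only that layout technique. ScheduledTask and formatSchedule are hypothetical names, and the real dump also prints thread IDs and execution states.

```cpp
#include <algorithm>
#include <cstddef>
#include <iomanip>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical record of one scheduled task, standing in for the per-slot state.
struct ScheduledTask {
  std::string algName;
  long        event;
  unsigned    slot;
};

// Builds one multi-line report, right-aligning names to the longest one so
// the columns line up (the "optimal printout layout" step of the dump).
std::string formatSchedule( const std::vector<ScheduledTask>& tasks ) {
  std::size_t width = 0;
  for ( const auto& t : tasks ) width = std::max( width, t.algName.length() );

  std::ostringstream out; // accumulate everything, emit once
  for ( const auto& t : tasks )
    out << " task: " << std::setw( static_cast<int>( width ) ) << t.algName << " evt/slot: " << t.event << "/"
        << t.slot << "\n";
  return out.str();
}
```

std::setw affects only the next insertion, so it has to be applied again for every row, as both the sketch and the listing do.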

◆ eventFailed()

void AvalancheSchedulerSvc::eventFailed ( EventContext * eventContext )
private

Method to execute if an event failed.

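An event can fail; when it does, this method is called.

It logs an error and dumps the state of the scheduler (all slots if the output level is VERBOSE, otherwise only the failed slot). It then marks the slot as complete and pushes the event context into the queue of finished events.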

Definition at line 747 of file AvalancheSchedulerSvc.cpp.

747  {
748    const uint slotIdx = eventContext->slot();
749
750    error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
751
752    dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
753
754    // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
755    m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
756
757    // Push into the finished events queue the failed context
758    m_eventSlots[slotIdx].complete = true;
759    m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
760  }
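The last two statements of eventFailed() hand the failed event's context from its slot to the finished-events queue. The listing calls release(), which suggests the slot owns its context through a std::unique_ptr. release() gives up ownership and returns the raw pointer that is pushed. The sketch below isolates that step. Context, Slot and finishFailedEvent are hypothetical, and a std::queue replaces tbb::concurrent_bounded_queue<EventContext*>. In the sketch, whoever pops the pointer is responsible for deleting it.

```cpp
#include <memory>
#include <queue>
#include <vector>

// Hypothetical stand-ins for EventContext and EventSlot.
struct Context {
  long     evt;
  unsigned slot;
};

struct Slot {
  std::unique_ptr<Context> ctx; // the slot owns its context
  bool                     complete = false;
};

// Mirrors the tail of eventFailed(): mark the slot complete and move the
// context into the finished queue. release() gives up ownership without
// deleting, so the consumer of the raw pointer must delete it.
void finishFailedEvent( std::vector<Slot>& slots, unsigned slotIdx, std::queue<Context*>& finished ) {
  slots[slotIdx].complete = true;
  finished.push( slots[slotIdx].ctx.release() );
}
```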

◆ finalize()

StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and its thread is joined. If the scheduler thread ended in the FAILURE state, StatusCode::FAILURE is returned.

Definition at line 347 of file AvalancheSchedulerSvc.cpp.

347  {
348
349    StatusCode sc( Service::finalize() );
350    if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
351
352    sc = deactivate();
353    if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
354
355    info() << "Joining Scheduler thread" << endmsg;
356    m_thread.join();
357
358    // Final error check after thread pool termination
359    if ( m_isActive == FAILURE ) {
360      error() << "problems in scheduler thread" << endmsg;
361      return StatusCode::FAILURE;
362    }
363
364    return sc;
365  }
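initialize() and finalize() use a simple handshake with the scheduler thread. The caller starts the thread and polls the m_isActive atomic until it reads ACTIVE or FAILURE. At the end, it deactivates the scheduler, joins the thread and inspects the flag once more. A minimal standard-library sketch of that handshake follows. Worker, State and simulateInitFailure are hypothetical, and the 1 ms polling interval is an arbitrary choice; the listed initialize() sleeps one second per iteration.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical mirror of ActivationState.
enum class State { Inactive, Active, Failure };

// Hypothetical service-like object that runs a worker thread and reports its
// state through an atomic, following the initialize()/finalize() handshake.
class Worker {
public:
  ~Worker() { stop(); } // never destroy a joinable std::thread

  // Like initialize(): start the thread, then poll until it is Active or failed.
  bool start( bool simulateInitFailure ) {
    m_thread = std::thread( [this, simulateInitFailure] { run( simulateInitFailure ); } );
    while ( m_state.load() == State::Inactive ) std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
    return m_state.load() == State::Active;
  }

  // Like finalize(): request shutdown, join, then check for a failure.
  bool stop() {
    m_stopRequested = true;
    if ( m_thread.joinable() ) m_thread.join();
    return m_state.load() != State::Failure;
  }

private:
  void run( bool fail ) {
    if ( fail ) { // like a failed initPool() in activate()
      m_state = State::Failure;
      return;
    }
    m_state = State::Active;
    while ( !m_stopRequested.load() ) std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
    m_state = State::Inactive; // like the final action pushed by deactivate()
  }

  std::atomic<State> m_state{State::Inactive};
  std::atomic<bool>  m_stopRequested{false};
  std::thread        m_thread;
};
```

The listed initialize() returns FAILURE without joining the thread itself. The sketch's destructor joins unconditionally so that a joinable std::thread is never destroyed.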

◆ freeSlots()

unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 535 of file AvalancheSchedulerSvc.cpp.

535 { return std::max( m_freeSlots.load(), 0 ); }

◆ index2algname()

const std::string& AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inline private

Convert an integer to a name.

Definition at line 214 of file AvalancheSchedulerSvc.h.

214 { return m_algname_vect[index]; };

◆ initialize()

StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

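Besides some bookkeeping operations, this method activates the scheduler by executing the activate() function in a new thread.

In addition, it acquires the list of algorithms from the AlgResourcePool. It also retrieves the services the scheduler depends on: ThreadPoolSvc, the whiteboard, AlgExecStateSvc, PrecedenceSvc and, optionally, CondSvc.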

Definition at line 78 of file AvalancheSchedulerSvc.cpp.

78  {
79 
80  // Initialise mother class (read properties, ...)
81  StatusCode sc( Service::initialize() );
82  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
83 
84  // Get hold of the TBBSvc. This should initialize the thread pool
85  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
86  if ( !m_threadPoolSvc.isValid() ) {
87  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
88  return StatusCode::FAILURE;
89  }
90 
91  // Activate the scheduler in another thread.
92  info() << "Activating scheduler in a separate thread" << endmsg;
93  m_thread = std::thread( [this]() { this->activate(); } );
94 
95  while ( m_isActive != ACTIVE ) {
96  if ( m_isActive == FAILURE ) {
97  fatal() << "Terminating initialization" << endmsg;
98  return StatusCode::FAILURE;
99  } else {
100  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
101  sleep( 1 );
102  }
103  }
104 
105  if ( m_enableCondSvc ) {
106  // Get hold of the CondSvc
107  m_condSvc = serviceLocator()->service( "CondSvc" );
108  if ( !m_condSvc.isValid() ) {
109  warning() << "No CondSvc found, or not enabled. "
110  << "Will not manage CondAlgorithms" << endmsg;
111  m_enableCondSvc = false;
112  }
113  }
114 
115  // Get the algo resource pool
116  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
117  if ( !m_algResourcePool.isValid() ) {
118  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
119  return StatusCode::FAILURE;
120  }
121 
122  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
123  if ( !m_algExecStateSvc.isValid() ) {
124  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
125  return StatusCode::FAILURE;
126  }
127 
128  // Get Whiteboard
129  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
130  if ( !m_whiteboard.isValid() ) {
131  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
132  return StatusCode::FAILURE;
133  }
134 
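135  // Set the MaxEventsInFlight parameters from the number of WB stores
136  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
137 
138  // Set the number of free slots
139  m_freeSlots = m_maxEventsInFlight;
140 
141  // Get the list of algorithms
142  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
143  const unsigned int algsNumber = algos.size();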
144  if ( algsNumber != 0 ) {
145  info() << "Found " << algsNumber << " algorithms" << endmsg;
146  } else {
147  error() << "No algorithms found" << endmsg;
148  return StatusCode::FAILURE;
149  }
150 
151  /* Dependencies
152  1) Look for handles in algo, if none
153  2) Assume none are required
154  */
155 
156  DataObjIDColl globalInp, globalOutp;
157 
158  // figure out all outputs
159  for ( IAlgorithm* ialgoPtr : algos ) {
160  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
161  if ( !algoPtr ) {
162  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
163  return StatusCode::FAILURE;
164  }
165  for ( auto id : algoPtr->outputDataObjs() ) globalOutp.insert( id );
166  }
167 
168  std::ostringstream ostdd;
169  ostdd << "Data Dependencies for Algorithms:";
170 
171  std::map<std::string, DataObjIDColl> algosDependenciesMap;
172  for ( IAlgorithm* ialgoPtr : algos ) {
173  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
174  if ( nullptr == algoPtr ) {
175  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
176  << ": this will result in a crash." << endmsg;
177  return StatusCode::FAILURE;
178  }
179 
180  ostdd << "\n " << algoPtr->name();
181 
182  DataObjIDColl algoDependencies;
183  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
184  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
185  DataObjID id = *idp;
186  ostdd << "\n o INPUT " << id;
187  if ( id.key().find( ":" ) != std::string::npos ) {
188  ostdd << " contains alternatives which require resolution...\n";
189  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
190  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
191  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
192  } );
193  if ( itok != tokens.end() ) {
194  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
195  id.updateKey( *itok );
196  } else {
197  error() << "failed to find alternate in global output list"
198  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
199  m_showDataDeps = true;
200  }
201  }
202  algoDependencies.insert( id );
203  globalInp.insert( id );
204  }
205  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
206  ostdd << "\n o OUTPUT " << *id;
207  if ( id->key().find( ":" ) != std::string::npos ) {
208  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
209  << endmsg;
210  m_showDataDeps = true;
211  }
212  }
213  } else {
214  ostdd << "\n none";
215  }
216  algosDependenciesMap[algoPtr->name()] = algoDependencies;
217  }
218 
219  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
220 
221  // Check if we have unmet global input dependencies, and, optionally, heal them
222  // WARNING: this step must be done BEFORE the Precedence Service is initialized
223  if ( m_checkDeps ) {
224  DataObjIDColl unmetDep;
225  for ( auto o : globalInp )
226  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
227 
228  if ( unmetDep.size() > 0 ) {
229 
230  std::ostringstream ost;
231  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232  ost << "\n o " << *o << " required by Algorithm: ";
233 
234  for ( const auto& p : algosDependenciesMap )
235  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
236  }
237 
238  if ( !m_useDataLoader.empty() ) {
239 
240  // Find the DataLoader Alg
241  IAlgorithm* dataLoaderAlg( nullptr );
242  for ( IAlgorithm* algo : algos )
243  if ( algo->name() == m_useDataLoader ) {
244  dataLoaderAlg = algo;
245  break;
246  }
247 
248  if ( dataLoaderAlg == nullptr ) {
249  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
250  << "\" found, and unmet INPUT dependencies "
251  << "detected:\n"
252  << ost.str() << endmsg;
253  return StatusCode::FAILURE;
254  }
255 
256  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
257  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
258 
259  // Set the property Load of DataLoader Alg
260  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
261  if ( !dataAlg ) {
262  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
263  << endmsg;
264  return StatusCode::FAILURE;
265  }
266 
267  for ( auto& id : unmetDep ) {
268  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
269  << dataLoaderAlg->name() << endmsg;
270  dataAlg->addDependency( id, Gaudi::DataHandle::Writer );
271  }
272 
273  } else {
274  fatal() << "Auto DataLoading not requested, "
275  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
276  return StatusCode::FAILURE;
277  }
278 
279  } else {
280  info() << "No unmet INPUT data dependencies were found" << endmsg;
281  }
282  }
283 
284  // Get the precedence service
285  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
286  if ( !m_precSvc.isValid() ) {
287  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
288  return StatusCode::FAILURE;
289  }
290  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
291  if ( !precSvc ) {
292  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
293  return StatusCode::FAILURE;
294  }
295 
296  // Fill the containers to convert algo names to index
297  m_algname_vect.resize( algsNumber );
298  for ( IAlgorithm* algo : algos ) {
299  const std::string& name = algo->name();
300  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
301  m_algname_index_map[name] = index;
302  m_algname_vect.at( index ) = name;
303  }
304 
305  // Shortcut for the message service
306  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
307  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
308 
309  m_eventSlots.reserve( m_maxEventsInFlight );
310  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
311  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
312  m_eventSlots.back().complete = true;
313  }
314 
315  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
316 
317  // Clearly inform about the level of concurrency
318  info() << "Concurrency level information:" << endmsg;
319  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
320  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
321 
322  // Inform about task scheduling prescriptions
323  info() << "Task scheduling settings:" << endmsg;
324  info() << " o Avalanche generation mode: "
325  << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
326  info() << " o Preemptive scheduling of CPU-blocking tasks: "
327  << ( m_enablePreemptiveBlockingTasks
328  ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
329  : "disabled" )
330  << endmsg;
331  info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
332 
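333  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
334 
335  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
336 
337  // Simulate execution flow
338  if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
339 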
340  return sc;
341 }
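Two of the data-dependency steps in initialize() can be shown in isolation:

1) Resolving an input key that lists ':'-separated alternatives to the first alternative that some algorithm outputs.
2) Collecting the inputs that no algorithm produces.

The sketch below makes three substitutions to stay self-contained:

- std::string and std::set replace DataObjID and DataObjIDColl.
- std::getline replaces boost::tokenizer.
- Like the default boost::char_separator, it skips empty tokens.

```cpp
#include <cassert>
#include <optional>
#include <set>
#include <sstream>
#include <string>

// Plain-string stand-in for DataObjIDColl (assumption for brevity).
using KeySet = std::set<std::string>;

// Return the first ':'-separated alternative that appears in `outputs`, if any.
std::optional<std::string> resolveAlternative( const std::string& key, const KeySet& outputs ) {
  std::istringstream in( key );
  std::string        token;
  while ( std::getline( in, token, ':' ) ) {
    if ( !token.empty() && outputs.count( token ) ) return token;
  }
  return std::nullopt;
}

// Collect the inputs that are not produced by any algorithm.
KeySet unmetDependencies( const KeySet& inputs, const KeySet& outputs ) {
  KeySet unmet;
  for ( const auto& in : inputs )
    if ( !outputs.count( in ) ) unmet.insert( in );
  return unmet;
}
```

In the scheduler, each unmet dependency is then handled in one of two ways. If m_useDataLoader names a DataLoader algorithm, the dependency becomes an extra output of that algorithm. Otherwise, the scheduler reports it as a fatal error.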

◆ isStalled()

bool AvalancheSchedulerSvc::isStalled ( const EventSlot slot) const
private

Check if scheduling in a particular slot is in a stall.

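Check whether a particular slot is in a stall condition.

This is the case when neither the slot nor any of its sub-slots has an algorithm in one of these states:

- DATAREADY: all of its dependencies are satisfied.
- SCHEDULED: it is queued for execution or running.
- RESOURCELESS: it is waiting for a free algorithm instance.

In other words, no algorithm in the slot can make further progress.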

Definition at line 729 of file AvalancheSchedulerSvc.cpp.

729  {
730 
731  if ( !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
732  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) ) {
733 
734  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
735 
736  return true;
737  }
738  return false;
739 }
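The stall test reduces to a set-membership check over the slot and its sub-slots. Below is a minimal sketch that uses plain vectors of states in place of AlgsExecutionStates and EventSlot. The `Slot` struct and the `AState` enum are hypothetical stand-ins.

```cpp
#include <cassert>
#include <vector>

// Hypothetical subset of the algorithm FSM states.
enum class AState { INITIAL, CONTROLREADY, DATAREADY, SCHEDULED, RESOURCELESS, EVTACCEPTED };

// Stand-in for EventSlot: per-algorithm states plus sub-slots (event views).
struct Slot {
  std::vector<AState> algStates;
  std::vector<Slot>   subSlots;
};

bool containsAny( const std::vector<AState>& states, const std::vector<AState>& wanted ) {
  for ( AState s : states )
    for ( AState w : wanted )
      if ( s == w ) return true;
  return false;
}

// Stalled: no algorithm in the slot or its sub-slots can still make progress.
bool isStalled( const Slot& slot ) {
  const std::vector<AState> progressing{AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS};
  if ( containsAny( slot.algStates, progressing ) ) return false;
  for ( const Slot& sub : slot.subSlots )
    if ( containsAny( sub.algStates, progressing ) ) return false;
  return true;
}
```

iterate() calls isStalled() only after the completion test has failed. A slot that still holds CONTROLREADY algorithms, but none that are DATAREADY, SCHEDULED or RESOURCELESS, is therefore reported as stalled.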

◆ iterate()

StatusCode AvalancheSchedulerSvc::iterate ( )
private

Loop on all slots to schedule DATAREADY algorithms and sign off ready events.

Loop on all slots to schedule DATAREADY algorithms, sign off ready ones or detect execution stalls.

To check if an event is finished the method verifies that the root control flow decision of the task precedence graph is resolved and there are no algorithms moving in-between INITIAL and EVTACCEPTED FSM states.

Definition at line 582 of file AvalancheSchedulerSvc.cpp.

582  {
583 
584  StatusCode global_sc( StatusCode::SUCCESS );
585 
586  // Retry algorithms
587  const size_t retries = m_retryQueue.size();
588  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
589  TaskSpec retryTS = std::move( m_retryQueue.front() );
590  m_retryQueue.pop();
591  global_sc = schedule( std::move( retryTS ) );
592  }
593 
594  // Loop over all slots
595  for ( EventSlot& thisSlot : m_eventSlots ) {
596 
597  // Ignore slots without a valid context (relevant when populating scheduler for first time)
598  if ( !thisSlot.eventContext ) continue;
599 
600  int iSlot = thisSlot.eventContext->slot();
601 
602  // Cache the states of the algorithms to improve readability and performance
603  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
604 
605  StatusCode partial_sc( StatusCode::FAILURE, true );
606 
607  // Perform DR->SCHEDULED
608  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
609  uint algIndex{*it};
610  const std::string& algName{index2algname( algIndex )};
611  unsigned int rank{m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName )};
612  bool blocking{m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false};
613 
614  partial_sc =
615  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, thisSlot.eventContext.get() ) );
616 
617  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
618  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
619  << " on processing slot " << iSlot << endmsg;
620  }
621 
622  // Check for algorithms ready in sub-slots
623  for ( auto& subslot : thisSlot.allSubSlots ) {
624  auto& subslotStates = subslot.algsStates;
625  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
626  uint algIndex{*it};
627  const std::string& algName{index2algname( algIndex )};
628  unsigned int rank{m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName )};
629  bool blocking{m_enablePreemptiveBlockingTasks ? m_precSvc->isBlocking( algName ) : false};
630  partial_sc =
631  schedule( TaskSpec( nullptr, algIndex, algName, rank, blocking, iSlot, subslot.eventContext.get() ) );
632  }
633  }
634 
635  if ( m_dumpIntraEventDynamics ) {
636  std::stringstream s;
637  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
638  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
639  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
640  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
641 #if TBB_INTERFACE_VERSION_MAJOR < 12
642  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
643 #else
644  : std::to_string( std::thread::hardware_concurrency() );
645 #endif // TBB_INTERFACE_VERSION_MAJOR < 12
646  std::ofstream myfile;
647  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
648  myfile << s.str();
649  myfile.close();
650  }
651 
652  // Not complete because this would mean that the slot is already free!
653  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
654  !thisSlot.algsStates.containsAny(
655  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
656  !subSlotAlgsInStates( thisSlot,
657  {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS} ) &&
658  !thisSlot.complete ) {
659 
660  thisSlot.complete = true;
661  // if the event did not fail, add it to the finished events
662  // otherwise it is taken care of in the error handling
663  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
664  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
665  << thisSlot.eventContext->slot() << ")." << endmsg;
666  m_finishedEvents.push( thisSlot.eventContext.release() );
667  }
668 
669  // now let's return the fully evaluated result of the control flow
670  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
671 
672  thisSlot.eventContext.reset( nullptr );
673 
674  } else if ( isStalled( thisSlot ) ) {
675  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
676  eventFailed( thisSlot.eventContext.get() ); // can't release yet
677  }
678  partial_sc.ignore();
679  } // end loop on slots
680 
681  ON_VERBOSE verbose() << "Iteration done." << endmsg;
682  m_needsUpdate.store( false );
683  return global_sc;
684 }
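iterate() begins by retrying the tasks left in m_retryQueue. It snapshots the queue size first, so only tasks that were waiting when the pass started are retried. schedule() can push a task back onto m_retryQueue, and anything re-queued during the pass waits for the next call instead of looping indefinitely.

Below is a sketch of that bounded drain. `Task` and the `trySchedule` callback are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical task descriptor.
struct Task {
  int id;
};

// One retry pass; returns how many tasks are still waiting afterwards.
std::size_t retryPass( std::queue<Task>& retryQueue, const std::function<bool( const Task& )>& trySchedule ) {
  const std::size_t retries = retryQueue.size(); // snapshot, as in iterate()
  for ( std::size_t i = 0; i < retries; ++i ) {
    Task t = std::move( retryQueue.front() );
    retryQueue.pop();
    if ( !trySchedule( t ) ) retryQueue.push( std::move( t ) ); // wait for the next pass
  }
  return retryQueue.size();
}
```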

◆ popFinishedEvent()

StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

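Get a finished event or block until one becomes available. If no event is in flight (all slots are free) or the scheduler is inactive, StatusCode::FAILURE is returned immediately instead of blocking.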

Definition at line 541 of file AvalancheSchedulerSvc.cpp.

541  {
542 
543  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
544  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
545  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
546  // << " active: " << m_isActive << endmsg;
547  return StatusCode::FAILURE;
548  } else {
549  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
550  // << " active: " << m_isActive << endmsg;
551  m_finishedEvents.pop( eventContext );
552  ++m_freeSlots;
553  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
554  return StatusCode::SUCCESS;
555  }
556 }
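popFinishedEvent() fails fast when every slot is free, because with nothing in flight nothing can finish. Otherwise it blocks on m_finishedEvents and then returns the slot to the pool.

The sketch below reproduces those semantics with a mutex and a condition variable standing in for tbb::concurrent_bounded_queue. The class name is hypothetical, and the INACTIVE check is omitted.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>

// Sketch of a finished-event queue with slot accounting (hypothetical class).
class FinishedEvents {
public:
  explicit FinishedEvents( int maxInFlight ) : m_max( maxInFlight ), m_free( maxInFlight ) {}

  // Reserve a slot for a new event (cf. pushNewEvent()).
  void startEvent() {
    std::lock_guard<std::mutex> lock( m_mutex );
    --m_free;
  }

  // Called by the scheduler thread when an event completes.
  void push( int evt ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_done.push( evt );
    }
    m_cv.notify_one();
  }

  // Fails immediately when nothing is in flight; otherwise blocks until an event is available.
  bool pop( int& evt ) {
    std::unique_lock<std::mutex> lock( m_mutex );
    if ( m_free == m_max ) return false;
    m_cv.wait( lock, [this] { return !m_done.empty(); } );
    evt = m_done.front();
    m_done.pop();
    ++m_free; // the slot becomes reusable
    return true;
  }

private:
  const int               m_max;
  int                     m_free;
  std::mutex              m_mutex;
  std::condition_variable m_cv;
  std::queue<int>         m_done;
};
```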

◆ pushNewEvent()

StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

Two cases are possible:
1) No slot is free: StatusCode::FAILURE is returned.
2) At least one slot is free: an action that resets the slot and kicks off its update is queued.

Definition at line 466 of file AvalancheSchedulerSvc.cpp.

466  {
467 
468  if ( !eventContext ) {
469  fatal() << "Event context is nullptr" << endmsg;
470  return StatusCode::FAILURE;
471  }
472 
473  if ( m_freeSlots.load() == 0 ) {
474  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
475  return StatusCode::FAILURE;
476  }
477 
478  // no problem as push new event is only called from one thread (event loop manager)
479  --m_freeSlots;
480 
481  auto action = [this, eventContext]() -> StatusCode {
482  // Event processing slot forced to be the same as the wb slot
483  const unsigned int thisSlotNum = eventContext->slot();
484  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
485  if ( !thisSlot.complete ) {
486  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
487  return StatusCode::FAILURE;
488  }
489 
490  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
491  thisSlot.reset( eventContext );
492 
493  // Result status code:
494  StatusCode result = StatusCode::SUCCESS;
495 
496  // promote to CR and DR the initial set of algorithms
497  Cause cs = {Cause::source::Root, "RootDecisionHub"};
498  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
499  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
500  result = StatusCode::FAILURE;
501  }
502 
503  if ( this->iterate().isFailure() ) {
504  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
505  result = StatusCode::FAILURE;
506  }
507 
508  return result;
509  }; // end of lambda
510 
511  // Kick off scheduling
512  ON_VERBOSE {
513  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
514  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
515  }
516 
517  m_actionsQueue.push( action );
518 
519  return StatusCode::SUCCESS;
520 }
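pushNewEvent() does little on the caller's thread. It validates the context, reserves a slot by decrementing m_freeSlots and queues a closure. The slot reset and the first scheduling pass run later, when the scheduler thread executes that closure.

Below is a reduced sketch of the pattern. It assumes:

- a std::queue in place of the thread-safe m_actionsQueue,
- bool in place of StatusCode,
- a caller-chosen slot index.

`MiniScheduler` is hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

using Action = std::function<bool()>; // stand-in for std::function<StatusCode()>

struct MiniScheduler {
  std::vector<bool>  slotComplete; // true = slot free for reuse
  std::vector<int>   slotEvent;
  int                freeSlots;
  std::queue<Action> actions;

  explicit MiniScheduler( int nSlots ) : slotComplete( nSlots, true ), slotEvent( nSlots, -1 ), freeSlots( nSlots ) {}

  // Caller side: reserve a slot and defer the real work.
  bool pushNewEvent( int evt, int slot ) {
    if ( freeSlots == 0 ) return false;
    --freeSlots;
    actions.push( [this, evt, slot]() {
      if ( !slotComplete[slot] ) return false; // slot unexpectedly still busy
      slotComplete[slot] = false;               // "reset" the slot for the new event
      slotEvent[slot]    = evt;
      return true;
    } );
    return true;
  }

  // Scheduler side: drain the queued actions, as the activate() loop would.
  bool runPending() {
    bool ok = true;
    while ( !actions.empty() ) {
      ok = actions.front()() && ok;
      actions.pop();
    }
    return ok;
  }
};
```

Deferring the slot update to a single consumer thread presumably lets slot state be modified without locks. This fits EventSlot::reset() being documented as thread-unsafe.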

◆ pushNewEvents()

StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 524 of file AvalancheSchedulerSvc.cpp.

524  {
525  StatusCode sc;
526  for ( auto context : eventContexts ) {
527  sc = pushNewEvent( context );
528  if ( sc != StatusCode::SUCCESS ) return sc;
529  }
530  return sc;
531 }
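pushNewEvents() forwards each context to pushNewEvent() and returns at the first status that is not StatusCode::SUCCESS. Contexts accepted before the failure stay queued; nothing is rolled back.

Below is a minimal sketch of that early-exit loop. A hypothetical `pushOne` callback stands in for pushNewEvent(), and the function returns the number of accepted events.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Push events in order and stop at the first rejection; returns how many were accepted.
std::size_t pushAll( const std::vector<int>& events, const std::function<bool( int )>& pushOne ) {
  std::size_t accepted = 0;
  for ( int evt : events ) {
    if ( !pushOne( evt ) ) break; // earlier events remain pushed
    ++accepted;
  }
  return accepted;
}
```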

◆ revise()

StatusCode AvalancheSchedulerSvc::revise ( unsigned int  iAlgo,
EventContext contextPtr,
AState  state,
bool  iterate = false 
)
private

Definition at line 688 of file AvalancheSchedulerSvc.cpp.

688  {
689  StatusCode sc;
690  auto slotIndex = contextPtr->slot();
691  EventSlot& slot = m_eventSlots[slotIndex];
692  Cause cs = {Cause::source::Task, index2algname( iAlgo )};
693 
694  if ( UNLIKELY( contextPtr->usesSubSlot() ) ) {
695  // Sub-slot
696  auto subSlotIndex = contextPtr->subSlot();
697  EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
698 
699  sc = subSlot.algsStates.set( iAlgo, state );
700 
701  if ( LIKELY( sc.isSuccess() ) ) {
702  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
703  << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
704  // Revise states of algorithms downstream the precedence graph
705  if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
706  }
707  } else {
708  // Event level (standard behaviour)
709  sc = slot.algsStates.set( iAlgo, state );
710 
711  if ( LIKELY( sc.isSuccess() ) ) {
712  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
713  << ", event:" << contextPtr->evt() << "]" << endmsg;
714  // Revise states of algorithms downstream the precedence graph
715  if ( iterate ) sc = m_precSvc->iterate( slot, cs );
716  }
717  }
718  return sc;
719 }
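revise() works in three steps:

1) It selects the target: the event slot itself or, when the context uses a sub-slot, the matching entry in allSubSlots.
2) It sets the algorithm state on that target.
3) Only if requested, it asks the precedence service to propagate the change from that same target.

Below is a sketch of that control flow with hypothetical stand-ins for EventSlot, EventContext and IPrecedenceSvc::iterate(). Unlike the real set(), whose StatusCode the listing above checks, it performs only a bounds check.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical subset of algorithm states.
enum class AState { CONTROLREADY, DATAREADY, SCHEDULED, EVTACCEPTED };

// Stand-in for EventSlot with its sub-slots.
struct Slot {
  std::vector<AState> states;
  std::vector<Slot>   subSlots;
};

// Stand-in for EventContext: a slot index and an optional sub-slot index.
struct Context {
  std::size_t                slot;
  std::optional<std::size_t> subSlot; // set only for event views
};

template <typename Propagate>
bool revise( std::vector<Slot>& slots, const Context& ctx, std::size_t iAlgo, AState state, bool iterate,
             Propagate&& propagate ) {
  Slot& base   = slots.at( ctx.slot );
  Slot& target = ctx.subSlot ? base.subSlots.at( *ctx.subSlot ) : base;
  if ( iAlgo >= target.states.size() ) return false; // bounds check only
  target.states[iAlgo] = state;
  // Downstream revision runs on the same (sub-)slot whose state changed.
  return iterate ? propagate( target ) : true;
}
```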

◆ schedule()

StatusCode AvalancheSchedulerSvc::schedule ( TaskSpec &&  ts)
private

Definition at line 878 of file AvalancheSchedulerSvc.cpp.

878  {
879 
881  m_retryQueue.push( std::move( ts ) );
882  return StatusCode::SUCCESS;
883  }
884 
885  // Check if a free Algorithm instance is available
886  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
887 
888  // If an instance is available, proceed to scheduling
889  StatusCode sc;
890  if ( LIKELY( getAlgSC.isSuccess() ) ) {
891 
892  // Decide how to schedule the task and schedule it
893  if ( LIKELY( -100 != m_threadPoolSize ) ) {
894 
895  // Cache values before moving the TaskSpec further
896  unsigned int algIndex{ts.algIndex};
897  std::string_view algName( ts.algName );
898  unsigned int algRank{ts.algRank};
899  bool blocking{ts.blocking};
900  int slotIndex{ts.slotIndex};
901  EventContext* contextPtr{ts.contextPtr};
902 
903  if ( LIKELY( !blocking ) ) {
904  // Add the algorithm to the scheduled queue
905  m_scheduledQueue.push( std::move( ts ) );
906 
907  // Prepare a TBB task that will execute the Algorithm according to the above queued specs
908  auto algoTask =
909  new ( tbb::task::allocate_root() ) AlgTask<tbb::task>( this, serviceLocator(), m_algExecStateSvc );
910 
911  // schedule the task
912  tbb::task::enqueue( *algoTask );
913  ++m_algosInFlight;
914 
915  } else { // schedule blocking algorithm in independent thread
916 
917  // Prepare Gaudi task that will execute the Algorithm according to the above queued specs
918  auto algoTask = AlgTask<ITask>( std::move( ts ), this, serviceLocator(), m_algExecStateSvc );
919 
920  // Schedule the blocking task in an independent thread
922  std::thread _t( std::move( algoTask ) );
923  _t.detach();
924 
925  } // end scheduling blocking Algorithm
926 
927  sc = revise( algIndex, contextPtr, AState::SCHEDULED );
928 
929  ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
930  << ", rank:" << algRank << ", blocking:" << ( blocking ? "yes" : "no" )
931  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
933  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
934  : "" )
935  << endmsg;
936 
937  } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
939  ++m_algosInFlight;
940  sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
941  theTask();
942  --m_algosInFlight;
943  }
944  } else { // if no Algorithm instance available, retry later
945 
946  sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
947  // Add the algorithm to the retry queue
948  m_retryQueue.push( std::move( ts ) );
949  }
950 
952 
953  return sc;
954 }
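
The control flow of schedule() amounts to a small decision procedure. Blocking work may be deferred to the retry queue. Otherwise the scheduler tries to acquire an Algorithm instance and then runs the task as a TBB task, in a detached thread, or inline in the scheduler's thread. The guard that opens the function is not shown in the listing above. The sketch below assumes that this guard defers a blocking algorithm when PreemptiveBlockingTasks is enabled and MaxBlockingAlgosInFlight blocking algorithms are already running. Route, SchedulerConfig and chooseRoute are hypothetical names, and the sketch only classifies the route without performing any scheduling.

```cpp
#include <cassert>

// Hypothetical labels for the routes taken by AvalancheSchedulerSvc::schedule().
enum class Route { RetryCapReached, RetryResourceless, TbbTask, DetachedThread, Inline };

// Hypothetical mirror of the relevant properties.
struct SchedulerConfig {
  int threadPoolSize = -1;                   // "ThreadPoolSize"; -100 bypasses TBB
  bool preemptiveBlocking = false;           // "PreemptiveBlockingTasks"
  unsigned int maxBlockingAlgosInFlight = 0; // "MaxBlockingAlgosInFlight"
};

Route chooseRoute(const SchedulerConfig& cfg, bool blocking, unsigned int blockingInFlight,
                  bool instanceAcquired) {
  // Assumed cap check: defer a blocking algorithm if too many are already running.
  if (cfg.preemptiveBlocking && blocking && blockingInFlight >= cfg.maxBlockingAlgosInFlight)
    return Route::RetryCapReached;
  // No free Algorithm instance: state becomes RESOURCELESS, retried later.
  if (!instanceAcquired) return Route::RetryResourceless;
  // Pool size -100: execute directly in the scheduler's control thread.
  if (cfg.threadPoolSize == -100) return Route::Inline;
  // Otherwise blocking algorithms get their own thread, the rest go to TBB.
  return blocking ? Route::DetachedThread : Route::TbbTask;
}
```

The order of the checks follows the listing: instance acquisition comes before the pool-size test, and the pool-size test comes before the blocking test.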

◆ scheduleEventView()

StatusCode AvalancheSchedulerSvc::scheduleEventView ( const EventContext *  sourceContext,
const std::string &  nodeName,
std::unique_ptr< EventContext >  viewContext 
)
override virtual

Method to inform the scheduler about event views.

Definition at line 995 of file AvalancheSchedulerSvc.cpp.

996  {
997  // Prevent view nesting
998  if ( sourceContext->usesSubSlot() ) {
999  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1000  return StatusCode::FAILURE;
1001  }
1002 
1003  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1004 
1005  // It's not possible to create a std::function from a move-capturing lambda
1006  // So, we have to release the unique pointer
1007  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1008  &nodeName]() -> StatusCode {
1009  // Attach the sub-slot to the top-level slot
1010  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1011 
1012  if ( viewContextPtr ) {
1013  // Re-create the unique pointer
1014  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1015  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1016  return StatusCode::SUCCESS;
1017  } else {
1018  // Disable the view node if there are no views
1019  topSlot.disableSubSlots( nodeName );
1020  return StatusCode::SUCCESS;
1021  }
1022  };
1023 
1024  m_actionsQueue.push( std::move( action ) );
1025 
1026  return StatusCode::SUCCESS;
1027 }
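
The comment in the listing explains the workaround. std::function requires a copy-constructible callable, so a lambda that move-captures a std::unique_ptr cannot be stored in it. The pointer is therefore released into a raw pointer before capture and re-wrapped when the closure runs. The sketch below reproduces that technique with hypothetical View and Slot types, and uses a plain std::queue in place of tbb::concurrent_bounded_queue. Unlike the listing, it copies the node name into the closure. The listing captures nodeName by reference, which relies on the caller keeping that string alive until the action runs.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct View { int id; }; // hypothetical stand-in for EventContext

struct Slot {            // hypothetical stand-in for EventSlot
  std::vector<std::pair<std::string, std::unique_ptr<View>>> subSlots;
  std::vector<std::string> disabledNodes;
};

using Action = std::function<bool()>;

// Release ownership so the closure stays copyable, and restore it when the closure runs.
// The closure must run exactly once: never running it leaks, running it twice double-owns.
void queueView(std::queue<Action>& actions, Slot& slot, const std::string& nodeName,
               std::unique_ptr<View> view) {
  actions.push([&slot, nodeName, raw = view.release()]() {
    if (raw) {
      slot.subSlots.emplace_back(nodeName, std::unique_ptr<View>(raw)); // ownership restored
    } else {
      slot.disabledNodes.push_back(nodeName); // no views: disable the view node
    }
    return true;
  });
}
```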

◆ signoff()

StatusCode AvalancheSchedulerSvc::signoff ( const TaskSpec &  ts)
private

The call to this method is triggered only from within the AlgTask.

Definition at line 961 of file AvalancheSchedulerSvc.cpp.

961  {
962 
963  Gaudi::Hive::setCurrentContext( ts.contextPtr );
964 
965  if ( LIKELY( !ts.blocking ) )
966  --m_algosInFlight;
967  else
969 
970  const AlgExecState& algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
971  AState state = algstate.execStatus().isSuccess()
972  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
973  : AState::ERROR;
974 
975  // Update algorithm state and revise the downstream states
976  auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
977 
978  ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
979  << ", rank:" << ts.algRank << ", blocking:" << ( ts.blocking ? "yes" : "no" )
980  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
982  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
983  : "" )
984  << endmsg;
985 
986  // Prompt a call to updateStates
987  m_needsUpdate.store( true );
988  return sc;
989 }
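
signoff() turns the recorded execution state into the final algorithm state with a nested conditional. The sketch below expresses the same mapping as a standalone function. It uses a hypothetical AState subset, and plain booleans stand in for AlgExecState::execStatus().isSuccess() and filterPassed().

```cpp
#include <cassert>

// Hypothetical subset of AlgsExecutionStates::State used by signoff().
enum class AState { SCHEDULED, EVTACCEPTED, EVTREJECTED, ERROR };

// Same decision as signoff(): an unsuccessful execution is an ERROR regardless of the
// filter; a successful one is EVTACCEPTED or EVTREJECTED according to the filter flag.
constexpr AState finalState(bool execSucceeded, bool filterPassed) {
  return execSucceeded ? (filterPassed ? AState::EVTACCEPTED : AState::EVTREJECTED)
                       : AState::ERROR;
}
```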

◆ tryPopFinishedEvent()

StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, just return a failure.

Definition at line 562 of file AvalancheSchedulerSvc.cpp.

562  {
563 
564  if ( m_finishedEvents.try_pop( eventContext ) ) {
565  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
566  << endmsg;
567  ++m_freeSlots;
568  return StatusCode::SUCCESS;
569  }
570  return StatusCode::FAILURE;
571 }
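
tryPopFinishedEvent() is the non-blocking counterpart of popFinishedEvent(). It fails immediately when no event has finished. Otherwise it hands back the finished context and returns one slot to the free pool. The sketch below is a minimal model of that behaviour. It uses a mutex-protected std::queue in place of tbb::concurrent_bounded_queue and an event number in place of EventContext*. The FinishedQueue class and its methods are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <optional>
#include <queue>

class FinishedQueue {
public:
  explicit FinishedQueue(int freeSlots) : m_freeSlots(freeSlots) {}

  // The scheduler reports a completed event; its slot stays occupied until popped.
  void markFinished(long evt) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_finished.push(evt);
  }

  // Non-blocking: on success, return the event and release its slot.
  std::optional<long> tryPop() {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_finished.empty()) return std::nullopt;
    long evt = m_finished.front();
    m_finished.pop();
    ++m_freeSlots;
    return evt;
  }

  // Readable from any thread without taking the queue lock, hence the atomic.
  int freeSlots() const { return m_freeSlots.load(); }

private:
  std::mutex m_mutex;
  std::queue<long> m_finished;
  std::atomic_int m_freeSlots;
};
```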

Friends And Related Function Documentation

◆ AlgTask

template<class T >
friend class AlgTask
friend

Definition at line 116 of file AvalancheSchedulerSvc.h.

Member Data Documentation

◆ m_actionsQueue

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 275 of file AvalancheSchedulerSvc.h.
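
m_actionsQueue decouples the producers of work from its execution. Any thread may push a closure, as scheduleEventView() does above, and a single consumer pops and runs the closures in order. That consumer is presumably the scheduler's own thread (m_thread, which runs the activate function). The sketch below models such a blocking queue with std::mutex and std::condition_variable in place of tbb::concurrent_bounded_queue. It simplifies std::function<StatusCode()> to std::function<bool()>. The "false stops the consumer" convention is a hypothetical choice of this sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

using Action = std::function<bool()>;

class ActionQueue {
public:
  void push(Action a) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_queue.push(std::move(a));
    }
    m_cv.notify_one();
  }

  // Blocks until an action is available, like a blocking pop on a bounded queue.
  Action pop() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait(lock, [this] { return !m_queue.empty(); });
    Action a = std::move(m_queue.front());
    m_queue.pop();
    return a;
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::queue<Action> m_queue;
};

// Consumer loop: run actions in order until one returns false; report how many ran.
int drain(ActionQueue& q) {
  int executed = 0;
  while (true) {
    Action a = q.pop();
    ++executed;
    if (!a()) break;
  }
  return executed;
}

// A dedicated consumer thread, loosely analogous to the scheduler's control thread.
std::thread startConsumer(ActionQueue& q, int& executed) {
  return std::thread([&q, &executed] { executed = drain(q); });
}
```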

◆ m_algExecStateSvc

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 235 of file AvalancheSchedulerSvc.h.

◆ m_algname_index_map

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary for the name2index conversion.

Definition at line 208 of file AvalancheSchedulerSvc.h.

◆ m_algname_vect

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary for the index2name conversion.

Definition at line 214 of file AvalancheSchedulerSvc.h.
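
m_algname_index_map and m_algname_vect together give a two-way mapping between algorithm names and dense integer indices. The map serves name-to-index lookups, and the vector serves index-to-name lookups, as in index2algname(). The sketch below pairs the two containers in one place. The AlgIndex class and its add, name2index and index2name methods are hypothetical.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

class AlgIndex {
public:
  // Registers a name (idempotent) and returns its dense index.
  unsigned int add(const std::string& name) {
    auto [it, inserted] =
        m_nameToIndex.try_emplace(name, static_cast<unsigned int>(m_names.size()));
    if (inserted) m_names.push_back(name);
    return it->second;
  }

  unsigned int name2index(const std::string& name) const { return m_nameToIndex.at(name); }
  const std::string& index2name(unsigned int index) const { return m_names.at(index); }

private:
  std::unordered_map<std::string, unsigned int> m_nameToIndex; // role of m_algname_index_map
  std::vector<std::string> m_names;                            // role of m_algname_vect
};
```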

◆ m_algosInFlight

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 241 of file AvalancheSchedulerSvc.h.

◆ m_algResourcePool

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 270 of file AvalancheSchedulerSvc.h.

◆ m_blockingAlgosInFlight

unsigned int AvalancheSchedulerSvc::m_blockingAlgosInFlight = 0
private

Number of blocking algorithms presently in flight.

Definition at line 244 of file AvalancheSchedulerSvc.h.

◆ m_checkDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
private

Definition at line 175 of file AvalancheSchedulerSvc.h.

◆ m_condSvc

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to the service for Conditions handling.

Definition at line 238 of file AvalancheSchedulerSvc.h.

◆ m_dumpIntraEventDynamics

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file"}

Definition at line 169 of file AvalancheSchedulerSvc.h.

◆ m_enableCondSvc

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
private

Definition at line 180 of file AvalancheSchedulerSvc.h.

◆ m_enablePreemptiveBlockingTasks

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
private
Initial value:
{
this, "PreemptiveBlockingTasks", false,
"Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly."}

Definition at line 171 of file AvalancheSchedulerSvc.h.

◆ m_eventSlots

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of event slots.

Definition at line 226 of file AvalancheSchedulerSvc.h.

◆ m_finishedEvents

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 232 of file AvalancheSchedulerSvc.h.

◆ m_freeSlots

std::atomic_int AvalancheSchedulerSvc::m_freeSlots
private

Atomic to account for asynchronous updates by the scheduler with respect to the rest.

Definition at line 229 of file AvalancheSchedulerSvc.h.

◆ m_isActive

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 202 of file AvalancheSchedulerSvc.h.

◆ m_maxAlgosInFlight

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight {1}
private

Definition at line 325 of file AvalancheSchedulerSvc.h.

◆ m_maxBlockingAlgosInFlight

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
private
Initial value:
{
this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms"}

Definition at line 162 of file AvalancheSchedulerSvc.h.

◆ m_maxEventsInFlight

size_t AvalancheSchedulerSvc::m_maxEventsInFlight {0}
private

Definition at line 324 of file AvalancheSchedulerSvc.h.

◆ m_needsUpdate

std::atomic<bool> AvalancheSchedulerSvc::m_needsUpdate {true}
private

Definition at line 318 of file AvalancheSchedulerSvc.h.
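
signoff() stores true into m_needsUpdate to prompt a call to updateStates. The consuming side is not shown in this section. The sketch below shows a common pattern for such a flag, as an assumption rather than the scheduler's actual code. Finishing tasks raise the flag, and the scheduler loop clears it with exchange(false), so several sign-offs between two checks coalesce into a single update. UpdateFlag is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>

class UpdateFlag {
public:
  // Called by finishing tasks (cf. m_needsUpdate.store( true ) in signoff()).
  void request() { m_needsUpdate.store(true); }

  // Called by the scheduler loop: true at most once per batch of requests.
  bool consume() { return m_needsUpdate.exchange(false); }

private:
  std::atomic<bool> m_needsUpdate{true}; // starts raised, like the member's initializer
};
```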

◆ m_optimizationMode

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E"}

Definition at line 167 of file AvalancheSchedulerSvc.h.

◆ m_precSvc

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 220 of file AvalancheSchedulerSvc.h.

◆ m_retryQueue

std::queue<TaskSpec> AvalancheSchedulerSvc::m_retryQueue
private

Definition at line 315 of file AvalancheSchedulerSvc.h.

◆ m_scheduledQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledQueue
private

Queue for scheduled algorithms.

Definition at line 314 of file AvalancheSchedulerSvc.h.
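
In schedule(), the non-blocking path pushes the TaskSpec into m_scheduledQueue and then enqueues a TBB task that is not bound to that particular spec. The task is therefore expected to take whichever entry AlgQueueSort places on top. The comparator body is not shown here. The sketch below assumes that a higher algRank means a higher priority. It uses std::priority_queue, which is not thread-safe, where the real member is a tbb::concurrent_priority_queue (popped with try_pop). Spec and RankSort are hypothetical.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

struct Spec {             // hypothetical subset of TaskSpec
  std::string algName;
  unsigned int algRank;
};

// Assumption: a lower rank compares "less", so the highest-ranked spec is on top.
struct RankSort {
  bool operator()(const Spec& a, const Spec& b) const { return a.algRank < b.algRank; }
};

using ScheduledQueue = std::priority_queue<Spec, std::vector<Spec>, RankSort>;
```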

◆ m_showControlFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences"}

Definition at line 188 of file AvalancheSchedulerSvc.h.

◆ m_showDataDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms"}

Definition at line 182 of file AvalancheSchedulerSvc.h.

◆ m_showDataFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms"}

Definition at line 185 of file AvalancheSchedulerSvc.h.

◆ m_simulateExecution

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution"}

Definition at line 164 of file AvalancheSchedulerSvc.h.

◆ m_thread

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 205 of file AvalancheSchedulerSvc.h.

◆ m_threadPoolSize

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the global thread pool initialised by TBB; a value of -1 requests to use"
"all available hardware threads; -100 requests to bypass TBB executing "
"all algorithms in the scheduler's thread."}

Definition at line 156 of file AvalancheSchedulerSvc.h.
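
The ThreadPoolSize doc string defines two special values. A value of -1 uses all available hardware threads, and -100 bypasses TBB and runs every algorithm in the scheduler's thread (the inline branch of schedule()). The sketch below interprets the property value under those rules. Treating positive values as an explicit thread count and rejecting other non-positive values are hypothetical policies of this sketch. ExecMode, PoolChoice and interpretThreadPoolSize are hypothetical names.

```cpp
#include <cassert>
#include <stdexcept>
#include <thread>

enum class ExecMode { TbbPool, InlineInSchedulerThread };

struct PoolChoice {
  ExecMode mode;
  unsigned int threads; // 0 when TBB is bypassed
};

PoolChoice interpretThreadPoolSize(int value) {
  if (value == -100) return {ExecMode::InlineInSchedulerThread, 0};
  if (value == -1) {
    const unsigned int hw = std::thread::hardware_concurrency(); // 0 if it cannot be determined
    return {ExecMode::TbbPool, hw > 0 ? hw : 1u};
  }
  if (value > 0) return {ExecMode::TbbPool, static_cast<unsigned int>(value)};
  throw std::invalid_argument("unsupported ThreadPoolSize value"); // hypothetical policy
}
```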

◆ m_threadPoolSvc

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 323 of file AvalancheSchedulerSvc.h.

◆ m_useDataLoader

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 177 of file AvalancheSchedulerSvc.h.

◆ m_verboseSubSlots

Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots {this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"}
private

Definition at line 191 of file AvalancheSchedulerSvc.h.

◆ m_whiteboard

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 223 of file AvalancheSchedulerSvc.h.

◆ m_whiteboardSvcName

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 161 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: