Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v31r0 (aeb156f0)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
AvalancheSchedulerSvc Class Reference

Introduction

More...

#include <src/AvalancheSchedulerSvc.h>

Inheritance diagram for AvalancheSchedulerSvc:
Collaboration diagram for AvalancheSchedulerSvc:

Public Member Functions

 ~AvalancheSchedulerSvc () override=default
 Destructor. More...
 
StatusCode initialize () override
 Initialise. More...
 
StatusCode finalize () override
 Finalise. More...
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler. More...
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available. More...
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler. More...
 
unsigned int freeSlots () override
 Get free slots number. More...
 
virtual StatusCode scheduleEventView (const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
 Method to inform the scheduler about event views. More...
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::stringgetInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
- Public Member Functions inherited from Service
const std::stringname () const override
 Retrieve name of the service. More...
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service. More...
 
StatusCode sysStart () override
 Initialize Service. More...
 
StatusCode sysStop () override
 Initialize Service. More...
 
StatusCode sysFinalize () override
 Finalize Service. More...
 
StatusCode sysReinitialize () override
 Re-initialize the Service. More...
 
StatusCode sysRestart () override
 Re-initialize the Service. More...
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
StatusCode setProperties ()
 Method for setting declared properties to the values specified for the job. More...
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist. More...
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, std::string toolTypeAndName, bool createIf=true)
 Declare used tool. More...
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service.May not be invoked before sysInitialize() has been invoked. More...
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBasedeclareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property. More...
 
Gaudi::Details::PropertyBasedeclareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation. More...
 
Gaudi::Details::PropertyBasedeclareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property. More...
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p) override
 set the property form another property More...
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string More...
 
StatusCode setProperty (const std::string &n, const std::string &v) override
 set the property from name and the value More...
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property form the value More...
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property More...
 
const Gaudi::Details::PropertyBasegetProperty (const std::string &name) const override
 get the property by name More...
 
StatusCode getProperty (const std::string &n, std::string &v) const override
 convert the property to the string More...
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties More...
 
bool hasProperty (const std::string &name) const override
 Return true if we have a property with the given name. More...
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolderoperator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 get the cached level (originally extracted from the embedded MsgStream) More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const
 Backward compatibility function for getting the output level. More...
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
const SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO) More...
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
using AState = AlgsExecutionStates::State
 
using action = std::function< StatusCode()>
 

Private Member Functions

void activate ()
 Activate scheduler. More...
 
StatusCode deactivate ()
 Deactivate scheduler. More...
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer. More...
 
const std::stringindex2algname (unsigned int index)
 Convert an integer to a name. More...
 
StatusCode updateStates (int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
 Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skipping an update of the Control Flow state) More...
 
StatusCode promoteToScheduled (unsigned int iAlgo, int si, EventContext *)
 Algorithm promotion. More...
 
StatusCode promoteToAsyncScheduled (unsigned int iAlgo, int si, EventContext *)
 
StatusCode promoteToExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the AlgoExecutionTask. More...
 
StatusCode promoteToAsyncExecuted (unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
 The call to this method is triggered only from within the IOBoundAlgTask. More...
 
StatusCode promoteToFinished (unsigned int iAlgo, int si)
 
bool isStalled (const EventSlot &) const
 Check if scheduling in a particular slot is in a stall. More...
 
void eventFailed (EventContext *eventContext)
 Method to execute if an event failed. More...
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler. More...
 

Private Attributes

Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< std::stringm_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
 
Gaudi::Property< std::stringm_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
 
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::stringm_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_useIOBoundAlgScheduler
 
Gaudi::Property< bool > m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
 
Gaudi::Property< std::stringm_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
Gaudi::Property< bool > m_verboseSubSlots {this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"}
 
std::atomic< ActivationStatem_isActive {INACTIVE}
 Flag to track if the scheduler is active or not. More...
 
std::thread m_thread
 The thread in which the activate function runs. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::stringm_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
SmartIF< IPrecedenceSvcm_precSvc
 A shortcut to the Precedence Service. More...
 
SmartIF< IHiveWhiteBoardm_whiteboard
 A shortcut to the whiteboard. More...
 
SmartIF< IAcceleratorm_IOBoundAlgScheduler
 A shortcut to IO-bound algorithm scheduler. More...
 
std::vector< EventSlotm_eventSlots
 Vector of events slots. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asyncronous updates by the scheduler wrt the rest. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
SmartIF< IAlgExecStateSvcm_algExecStateSvc
 Algorithm execution state manager. More...
 
SmartIF< ICondSvcm_condSvc
 A shortcut to service for Conditions handling. More...
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight. More...
 
unsigned int m_IOBoundAlgosInFlight = 0
 Number of algorithms presently in flight. More...
 
SmartIF< IAlgResourcePoolm_algResourcePool
 Cache for the algorithm resource pool. More...
 
tbb::concurrent_bounded_queue< actionm_actionsQueue
 Queue where closures are stored and picked for execution. More...
 
std::vector< unsigned int > m_actionsCounts
 Bookkeeping of the number of actions in flight per slot. More...
 
SmartIF< IThreadPoolSvcm_threadPoolSvc
 
size_t m_maxEventsInFlight {0}
 
size_t m_maxAlgosInFlight {1}
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class. More...
 
- Public Types inherited from Service
using Factory = Gaudi::PluginService::Factory< IService *(const std::string &, ISvcLocator *)>
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes, as in. More...
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces... More...
 
- Protected Member Functions inherited from Service
 ~Service () override
 Standard Destructor. More...
 
int outputLevel () const
 get the Service's output level More...
 
- Protected Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
Gaudi::Details::PropertyBaseproperty (const std::string &name) const
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches. More...
 
MSG::Level resetMessaging ()
 Reinitialize internal states. More...
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Service state. More...
 
Gaudi::Property< int > m_outputLevel {this, "OutputLevel", MSG::NIL, "output level"}
 
Gaudi::Property< bool > m_auditInit {this, "AuditServices", false, "[[deprecated]] unused"}
 
Gaudi::Property< bool > m_auditorInitialize {this, "AuditInitialize", false, "trigger auditor on initialize()"}
 
Gaudi::Property< bool > m_auditorStart {this, "AuditStart", false, "trigger auditor on start()"}
 
Gaudi::Property< bool > m_auditorStop {this, "AuditStop", false, "trigger auditor on stop()"}
 
Gaudi::Property< bool > m_auditorFinalize {this, "AuditFinalize", false, "trigger auditor on finalize()"}
 
Gaudi::Property< bool > m_auditorReinitialize {this, "AuditReinitialize", false, "trigger auditor on reinitialize()"}
 
Gaudi::Property< bool > m_auditorRestart {this, "AuditRestart", false, "trigger auditor on restart()"}
 
SmartIF< IAuditorSvcm_pAuditorSvc
 Auditor Service. More...
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. When compared to approach used in the ForwardSchedulerSvc, the following advantages can be emphasized:

(1) Faster decision making (thus lower concurrency disclosure downtime); (2) Capacity for proactive task scheduling decision making.

Point (2) allowed to implement a number of generic, non-intrusive intra-event throughput maximization scheduling strategies.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled ASA all following conditions are met:

  • if a control flow (CF) graph traversal reaches the task;
  • when all data flow (DF) dependencies of the task are satisfied;
  • when the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of precedence rules graph asymmetries:

(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by scheduling the CPU-blocking tasks efficiently. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds, quantum computing devices..joke);
  • synchronization-bound tasks.

Credits

Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 100 of file AvalancheSchedulerSvc.h.

Member Typedef Documentation

Definition at line 135 of file AvalancheSchedulerSvc.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

AvalancheSchedulerSvc::~AvalancheSchedulerSvc ( )
overridedefault

Destructor.

Member Function Documentation

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on the queue of actions is checked. The checking will stop when the m_isActive flag is false and the queue is not empty. This will guarantee that all actions are executed and a stall is not created. The TBB pool must be initialised in the thread from where the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html) The scheduler is initialised here since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas)

Definition at line 362 of file AvalancheSchedulerSvc.cpp.

362  {
363 
364  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
365 
367  error() << "problems initializing ThreadPoolSvc" << endmsg;
369  return;
370  }
371 
372  // Wait for actions pushed into the queue by finishing tasks.
373  action thisAction;
375 
376  m_isActive = ACTIVE;
377 
378  // Continue to wait if the scheduler is running or there is something to do
379  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
380  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
381  m_actionsQueue.pop( thisAction );
382  sc = thisAction();
383  ON_VERBOSE {
384  if ( sc != StatusCode::SUCCESS )
385  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
386  else
387  verbose() << "Action succeeded." << endmsg;
388  }
389  }
390 
391  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
393  error() << "Problems terminating thread pool" << endmsg;
395  }
396 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
#define ON_DEBUG
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
bool isFailure() const
Definition: StatusCode.h:130
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
std::function< StatusCode()> action
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE
unsigned int AvalancheSchedulerSvc::algname2index ( const std::string algoname)
inlineprivate

Convert a name to an integer.

Definition at line 436 of file AvalancheSchedulerSvc.cpp.

436  {
437  unsigned int index = m_algname_index_map[algoname];
438  return index;
439 }
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

Two actions are pushed into the queue: 1) Drain the scheduler until all events are finished. 2) Flip the status flag m_isActive to false This second action is the last one to be executed by the scheduler.

Definition at line 406 of file AvalancheSchedulerSvc.cpp.

406  {
407 
408  if ( m_isActive == ACTIVE ) {
409 
410  // Set the number of slots available to an error code
411  m_freeSlots.store( 0 );
412 
413  // Empty queue
414  action thisAction;
415  while ( m_actionsQueue.try_pop( thisAction ) ) {};
416 
417  // This would be the last action
418  m_actionsQueue.push( [this]() -> StatusCode {
419  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
421  return StatusCode::SUCCESS;
422  } );
423  }
424 
425  return StatusCode::SUCCESS;
426 }
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
std::function< StatusCode()> action
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE
void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes, the state of the scheduler is dumped on screen in order to be inspected.

Definition at line 765 of file AvalancheSchedulerSvc.cpp.

765  {
766 
767  // To have just one big message
768  std::ostringstream outputMS;
769 
770  outputMS << "Dumping scheduler state\n"
771  << "=========================================================================================\n"
772  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
773  << "=========================================================================================\n\n";
774 
775  //===========================================================================
776 
777  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
778  << "------------------\n\n";
779 
780  // Figure if TimelineSvc is available (used below to detect threads IDs)
781  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
782  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
783  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
784  } else {
785 
786  // Figure optimal printout layout
787  size_t indt( 0 );
788  for ( auto& slot : m_eventSlots )
789  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
790  if ( index2algname( *it ).length() > indt ) indt = index2algname( *it ).length();
791 
792  // Figure the last running schedule across all slots
793  for ( auto& slot : m_eventSlots ) {
794  for ( auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
795  ++it ) {
796 
797  const std::string algoName{index2algname( *it )};
798 
799  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
800  << slot.eventContext->slot();
801 
802  // Try to get POSIX threads IDs the currently running tasks are scheduled to
803  if ( timelineSvc.isValid() ) {
804  TimelineEvent te{};
805  te.algorithm = algoName;
806  te.slot = slot.eventContext->slot();
807  te.event = slot.eventContext->evt();
808 
809  if ( timelineSvc->getTimelineEvent( te ) )
810  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
811  else
812  outputMS << " thread.id: [unknown]"; // this means a task has just
813  // been signed off as SCHEDULED,
814  // but has not been assigned to a thread yet
815  // (i.e., not running yet)
816  }
817  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
818  }
819  }
820  }
821 
822  //===========================================================================
823 
824  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
825  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
826 
827  int slotCount = -1;
828  for ( auto& slot : m_eventSlots ) {
829  ++slotCount;
830  if ( slot.complete ) continue;
831 
832  outputMS << "[ slot: "
833  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
834  << " event: "
835  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" )
836  << " ]:\n\n";
837 
838  if ( 0 > iSlot || iSlot == slotCount ) {
839 
840  // Snapshot of the Control Flow and FSM states
841  outputMS << m_precSvc->printState( slot ) << "\n";
842 
843  // Mention sub slots (this is expensive if the number of sub-slots is high)
844  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
845  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
846  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
847  for ( auto& ss : slot.allSubSlots ) {
848  outputMS << "[ slot: " << slotID << " sub-slot entry: " << ss.entryPoint << " event: "
849  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
850  << " ]:\n\n";
851  outputMS << m_precSvc->printState( ss ) << "\n";
852  }
853  }
854  }
855  }
856 
857  //===========================================================================
858 
859  if ( 0 <= iSlot ) {
860  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
861  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
862  }
863 
864  outputMS << "\n=========================================================================================\n"
865  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
866  << "=========================================================================================\n\n";
867 
868  info() << outputMS.str() << endmsg;
869 }
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
T to_string(T...args)
std::string algorithm
Definition: ITimelineSvc.h:21
T setw(T...args)
virtual const std::string printState(EventSlot &) const =0
STL class.
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:76
Gaudi::Property< bool > m_verboseSubSlots
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
T length(T...args)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
T hex(T...args)
std::vector< EventSlot > m_eventSlots
Vector of events slots.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
void AvalancheSchedulerSvc::eventFailed ( EventContext eventContext)
private

Method to execute if an event failed.

It can be possible that an event fails.

In this case this method is called. It dumps the state of the scheduler and marks the event as finished.

Definition at line 744 of file AvalancheSchedulerSvc.cpp.

744  {
745  const uint slotIdx = eventContext->slot();
746 
747  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
748 
749  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
750 
751  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
753 
754  // Push into the finished events queue the failed context
755  m_eventSlots[slotIdx].complete = true;
756  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
757 }
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
ContextID_t slot() const
Definition: EventContext.h:48
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
ContextEvt_t evt() const
Definition: EventContext.h:47
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 331 of file AvalancheSchedulerSvc.cpp.

331  {
332 
334  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
335 
336  sc = deactivate();
337  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
338 
339  info() << "Joining Scheduler thread" << endmsg;
340  m_thread.join();
341 
342  // Final error check after thread pool termination
343  if ( m_isActive == FAILURE ) {
344  error() << "problems in scheduler thread" << endmsg;
345  return StatusCode::FAILURE;
346  }
347 
348  return sc;
349 }
StatusCode finalize() override
Definition: Service.cpp:164
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
constexpr static const auto FAILURE
Definition: StatusCode.h:86
StatusCode deactivate()
Deactivate scheduler.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
std::thread m_thread
The thread in which the activate function runs.
unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get free slots number.

Definition at line 519 of file AvalancheSchedulerSvc.cpp.

519 { return std::max( m_freeSlots.load(), 0 ); }
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T max(T...args)
const std::string & AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inlineprivate

Convert an integer to a name.

Definition at line 432 of file AvalancheSchedulerSvc.cpp.

432 { return m_algname_vect[index]; }
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Here, among some "bureaucracy" operations, the scheduler is activated, executing the activate() function in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 62 of file AvalancheSchedulerSvc.cpp.

62  {
63 
64  // Initialise mother class (read properties, ...)
66  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
67 
68  // Get hold of the TBBSvc. This should initialize the thread pool
69  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
70  if ( !m_threadPoolSvc.isValid() ) {
71  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
72  return StatusCode::FAILURE;
73  }
74 
75  // Activate the scheduler in another thread.
76  info() << "Activating scheduler in a separate thread" << endmsg;
77  m_thread = std::thread( [this]() { this->activate(); } );
78 
79  while ( m_isActive != ACTIVE ) {
80  if ( m_isActive == FAILURE ) {
81  fatal() << "Terminating initialization" << endmsg;
82  return StatusCode::FAILURE;
83  } else {
84  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
85  sleep( 1 );
86  }
87  }
88 
89  if ( m_enableCondSvc ) {
90  // Get hold of the CondSvc
91  m_condSvc = serviceLocator()->service( "CondSvc" );
92  if ( !m_condSvc.isValid() ) {
93  warning() << "No CondSvc found, or not enabled. "
94  << "Will not manage CondAlgorithms" << endmsg;
95  m_enableCondSvc = false;
96  }
97  }
98 
99  // Get the algo resource pool
100  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
101  if ( !m_algResourcePool.isValid() ) {
102  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
103  return StatusCode::FAILURE;
104  }
105 
106  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
107  if ( !m_algExecStateSvc.isValid() ) {
108  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
109  return StatusCode::FAILURE;
110  }
111 
112  // Get Whiteboard
114  if ( !m_whiteboard.isValid() ) {
115  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
116  return StatusCode::FAILURE;
117  }
118 
119  // Get dedicated scheduler for I/O-bound algorithms
120  if ( m_useIOBoundAlgScheduler ) {
123  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
124  }
125 
126  // Set the MaxEventsInFlight parameters from the number of WB stores
128 
129  // Set the number of free slots
131 
132  // Get the list of algorithms
134  const unsigned int algsNumber = algos.size();
135  info() << "Found " << algsNumber << " algorithms" << endmsg;
136 
137  /* Dependencies
138  1) Look for handles in algo, if none
139  2) Assume none are required
140  */
141 
142  DataObjIDColl globalInp, globalOutp;
143 
144  // figure out all outputs
145  for ( IAlgorithm* ialgoPtr : algos ) {
146  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
147  if ( !algoPtr ) {
148  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
149  return StatusCode::FAILURE;
150  }
151  for ( auto id : algoPtr->outputDataObjs() ) {
152  auto r = globalOutp.insert( id );
153  if ( !r.second ) {
154  warning() << "multiple algorithms declare " << id
155  << " as output! could be a single instance in multiple paths "
156  "though, or control flow may guarantee only one runs...!"
157  << endmsg;
158  }
159  }
160  }
161 
162  std::ostringstream ostdd;
163  ostdd << "Data Dependencies for Algorithms:";
164 
165  std::map<std::string, DataObjIDColl> algosDependenciesMap;
166  for ( IAlgorithm* ialgoPtr : algos ) {
167  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
168  if ( nullptr == algoPtr ) {
169  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
170  << ": this will result in a crash." << endmsg;
171  return StatusCode::FAILURE;
172  }
173 
174  ostdd << "\n " << algoPtr->name();
175 
176  DataObjIDColl algoDependencies;
177  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
178  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
179  DataObjID id = *idp;
180  ostdd << "\n o INPUT " << id;
181  if ( id.key().find( ":" ) != std::string::npos ) {
182  ostdd << " contains alternatives which require resolution...\n";
183  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
184  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
185  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
186  } );
187  if ( itok != tokens.end() ) {
188  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
189  id.updateKey( *itok );
190  } else {
191  error() << "failed to find alternate in global output list"
192  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
193  m_showDataDeps = true;
194  }
195  }
196  algoDependencies.insert( id );
197  globalInp.insert( id );
198  }
199  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
200  ostdd << "\n o OUTPUT " << *id;
201  if ( id->key().find( ":" ) != std::string::npos ) {
202  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
203  << endmsg;
204  m_showDataDeps = true;
205  }
206  }
207  } else {
208  ostdd << "\n none";
209  }
210  algosDependenciesMap[algoPtr->name()] = algoDependencies;
211  }
212 
213  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
214 
215  // Check if we have unmet global input dependencies, and, optionally, heal them
216  // WARNING: this step must be done BEFORE the Precedence Service is initialized
217  if ( m_checkDeps ) {
218  DataObjIDColl unmetDep;
219  for ( auto o : globalInp )
220  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
221 
222  if ( unmetDep.size() > 0 ) {
223 
224  std::ostringstream ost;
225  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
226  ost << "\n o " << *o << " required by Algorithm: ";
227 
228  for ( const auto& p : algosDependenciesMap )
229  if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
230  }
231 
232  if ( !m_useDataLoader.empty() ) {
233 
234  // Find the DataLoader Alg
235  IAlgorithm* dataLoaderAlg( nullptr );
236  for ( IAlgorithm* algo : algos )
237  if ( algo->name() == m_useDataLoader ) {
238  dataLoaderAlg = algo;
239  break;
240  }
241 
242  if ( dataLoaderAlg == nullptr ) {
243  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
244  << "\" found, and unmet INPUT dependencies "
245  << "detected:\n"
246  << ost.str() << endmsg;
247  return StatusCode::FAILURE;
248  }
249 
250  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
251  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
252 
253  // Set the property Load of DataLoader Alg
254  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
255  if ( !dataAlg ) {
256  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
257  << endmsg;
258  return StatusCode::FAILURE;
259  }
260 
261  for ( auto& id : unmetDep ) {
262  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
263  << dataLoaderAlg->name() << endmsg;
265  }
266 
267  } else {
268  fatal() << "Auto DataLoading not requested, "
269  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
270  return StatusCode::FAILURE;
271  }
272 
273  } else {
274  info() << "No unmet INPUT data dependencies were found" << endmsg;
275  }
276  }
277 
278  // Get the precedence service
279  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
280  if ( !m_precSvc.isValid() ) {
281  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
282  return StatusCode::FAILURE;
283  }
284  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
285  if ( !precSvc ) {
286  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
287  return StatusCode::FAILURE;
288  }
289 
290  // Fill the containers to convert algo names to index
291  m_algname_vect.resize( algsNumber );
292  for ( IAlgorithm* algo : algos ) {
293  const std::string& name = algo->name();
294  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
295  m_algname_index_map[name] = index;
296  m_algname_vect.at( index ) = name;
297  }
298 
299  // Shortcut for the message service
300  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
301  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
302 
304  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
305  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
306  m_eventSlots.back().complete = true;
307  }
308  m_actionsCounts.assign( m_maxEventsInFlight, 0 );
309 
310  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
311 
312  // Clearly inform about the level of concurrency
313  info() << "Concurrency level information:" << endmsg;
314  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
315  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
316 
318 
320 
321  // Simulate execution flow
323 
324  return sc;
325 }
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:63
Gaudi::Property< bool > m_showDataFlow
#define ON_DEBUG
StatusCode initialize() override
Definition: Service.cpp:60
const unsigned int & getAlgoIndex() const
Get algorithm index.
T empty(T...args)
Gaudi::Property< std::string > m_whiteboardSvcName
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:274
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
Gaudi::Property< bool > m_showDataDeps
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:635
const DataObjIDColl & outputDataObjs() const override
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
STL class.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
T resize(T...args)
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
T at(T...args)
virtual void dumpDataFlow() const =0
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:76
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
const DataObjIDColl & inputDataObjs() const override
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
virtual void dumpControlFlow() const =0
Dump precedence rules.
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
T find_if(T...args)
T size(T...args)
T assign(T...args)
Gaudi::Property< bool > m_simulateExecution
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
Gaudi::Property< bool > m_enableCondSvc
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
T back(T...args)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
Definition: EventSlot.h:79
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual size_t getNumberOfStores() const =0
Get the number of &#39;slots&#39;.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
T reserve(T...args)
T emplace_back(T...args)
std::thread m_thread
The thread in which the activate function runs.
bool AvalancheSchedulerSvc::isStalled ( const EventSlot slot) const
private

Check if scheduling in a particular slot is in a stall.

Check if we are in present of a stall condition for a particular slot.

This is the case when a slot has no actions queued in the actionsQueue, has no scheduled algorithms and has no algorithms with all of its dependencies satisfied.

Definition at line 725 of file AvalancheSchedulerSvc.cpp.

725  {
726 
727  if ( m_actionsCounts[slot.eventContext->slot()] == 0 &&
728  !slot.algsStates.containsAny( {AState::DATAREADY, AState::SCHEDULED} ) &&
729  !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
730 
731  error() << "*** Stall detected in slot " << slot.eventContext->slot() << "! ***" << endmsg;
732 
733  return true;
734  }
735  return false;
736 }
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
ContextID_t slot() const
Definition: EventContext.h:48
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:73
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 525 of file AvalancheSchedulerSvc.cpp.

525  {
526  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
527  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
528  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
529  // << " active: " << m_isActive << endmsg;
530  return StatusCode::FAILURE;
531  } else {
532  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
533  // << " active: " << m_isActive << endmsg;
534  m_finishedEvents.pop( eventContext );
535  ++m_freeSlots;
536  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
537  return StatusCode::SUCCESS;
538  }
539 }
#define ON_DEBUG
ContextID_t slot() const
Definition: EventContext.h:48
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
ContextEvt_t evt() const
Definition: EventContext.h:47
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
StatusCode AvalancheSchedulerSvc::promoteToAsyncExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

The call to this method is triggered only from within the IOBoundAlgTask.

Definition at line 1044 of file AvalancheSchedulerSvc.cpp.

1045  {
1046  Gaudi::Hive::setCurrentContext( eventContext );
1047  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1048 
1049  if ( sc.isFailure() ) {
1050  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1051  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1052  return StatusCode::FAILURE;
1053  }
1054 
1056 
1057  EventSlot& thisSlot = m_eventSlots[si];
1058 
1059  ON_DEBUG debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si
1060  << endmsg;
1061 
1062  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1063  AState state = algstate.execStatus().isSuccess()
1064  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1065  : AState::ERROR;
1066 
1067  // Update states in the appropriate slot
1068  int subSlotIndex = -1;
1069  if ( eventContext->usesSubSlot() ) {
1070  // Sub-slot
1071  subSlotIndex = eventContext->subSlot();
1072  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1073  } else {
1074  // Event level (standard behaviour)
1075  sc = thisSlot.algsStates.set( iAlgo, state );
1076  }
1077 
1078  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1079  << "[Asynchronous] Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1080 
1081  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1082  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1083 
1084  // Schedule an update of the status of the algorithms
1085  ++m_actionsCounts[si];
1086  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1087  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1088  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1089  } );
1090 
1091  return sc;
1092 }
#define ON_DEBUG
AlgsExecutionStates::State AState
Class representing an event slot.
Definition: EventSlot.h:14
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
ContextID_t slot() const
Definition: EventContext.h:48
bool isSuccess() const
Definition: StatusCode.h:267
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
bool usesSubSlot() const
Definition: EventContext.h:50
bool isFailure() const
Definition: StatusCode.h:130
ContextEvt_t evt() const
Definition: EventContext.h:47
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
StatusCode set(unsigned int iAlgo, State newState)
GAUDI_API void setCurrentContext(const EventContext *ctx)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool filterPassed() const
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
ContextID_t subSlot() const
Definition: EventContext.h:49
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
const StatusCode & execStatus() const
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
#define ON_VERBOSE
StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled ( unsigned int  iAlgo,
int  si,
EventContext eventContext 
)
private

Definition at line 934 of file AvalancheSchedulerSvc.cpp.

934  {
935 
937 
938  // bool IOBound = m_precSvc->isBlocking(algName);
939 
940  const std::string& algName( index2algname( iAlgo ) );
941  IAlgorithm* ialgoPtr = nullptr;
942  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
943 
944  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
945 
947  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
948  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
949  return this->AvalancheSchedulerSvc::promoteToAsyncExecuted( iAlgo, eventContext->slot(), ialgoPtr,
950  eventContext );
951  } );
952  return StatusCode::SUCCESS;
953  };
954  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
955  // tbb::task)? it seems it works..
956  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
957  IOBoundAlgTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
958  m_IOBoundAlgScheduler->push( *theTask );
959 
960  ON_DEBUG debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
961  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
962 
963  // Update states in the appropriate event slot
964  StatusCode updateSc;
965  EventSlot& thisSlot = m_eventSlots[si];
966  if ( eventContext->usesSubSlot() ) {
967  // Sub-slot
968  size_t const subSlotIndex = eventContext->subSlot();
969  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
970  } else {
971  // Event level (standard behaviour)
972  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
973  }
974 
975  ON_VERBOSE if ( updateSc.isSuccess() ) verbose()
976  << "[Asynchronous] Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
977  return updateSc;
978  } else {
979  ON_DEBUG debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo )
980  << " on slot " << si << endmsg;
981  return sc;
982  }
983 }
#define ON_DEBUG
Wrapper around I/O-bound Gaudi-algorithms.
Class representing an event slot.
Definition: EventSlot.h:14
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
ContextID_t slot() const
Definition: EventContext.h:48
bool isSuccess() const
Definition: StatusCode.h:267
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
bool usesSubSlot() const
Definition: EventContext.h:50
ContextEvt_t evt() const
Definition: EventContext.h:47
STL class.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
StatusCode set(unsigned int iAlgo, State newState)
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
virtual StatusCode push(IAlgTask &task)=0
ContextID_t subSlot() const
Definition: EventContext.h:49
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
#define ON_VERBOSE
StatusCode AvalancheSchedulerSvc::promoteToExecuted ( unsigned int  iAlgo,
int  si,
IAlgorithm algo,
EventContext eventContext 
)
private

The call to this method is triggered only from within the AlgoExecutionTask.

Definition at line 990 of file AvalancheSchedulerSvc.cpp.

991  {
992  Gaudi::Hive::setCurrentContext( eventContext );
993  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
994 
995  if ( sc.isFailure() ) {
996  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
997  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
998  return StatusCode::FAILURE;
999  }
1000 
1001  --m_algosInFlight;
1002 
1003  EventSlot& thisSlot = m_eventSlots[si];
1004 
1005  ON_DEBUG debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
1006 
1007  const AlgExecState& algstate = m_algExecStateSvc->algExecState( algo, *eventContext );
1008  AState state = algstate.execStatus().isSuccess()
1009  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1010  : AState::ERROR;
1011 
1012  // Update states in the appropriate slot
1013  int subSlotIndex = -1;
1014  if ( eventContext->usesSubSlot() ) {
1015  // Sub-slot
1016  subSlotIndex = eventContext->subSlot();
1017  sc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, state );
1018  } else {
1019  // Event level (standard behaviour)
1020  sc = thisSlot.algsStates.set( iAlgo, state );
1021  }
1022 
1023  ON_VERBOSE if ( sc.isSuccess() ) verbose()
1024  << "Promoting " << algo->name() << " on slot " << si << " to " << state << endmsg;
1025 
1026  ON_DEBUG debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1027  << m_algosInFlight << endmsg;
1028 
1029  // Schedule an update of the status of the algorithms
1030  ++m_actionsCounts[si];
1031  m_actionsQueue.push( [this, si, iAlgo, subSlotIndex]() {
1032  --this->m_actionsCounts[si]; // no bound check needed as decrements/increments are balanced in the current setup
1033  return this->updateStates( -1, iAlgo, subSlotIndex, si );
1034  } );
1035 
1036  return sc;
1037 }
#define ON_DEBUG
AlgsExecutionStates::State AState
Class representing an event slot.
Definition: EventSlot.h:14
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
ContextID_t slot() const
Definition: EventContext.h:48
bool isSuccess() const
Definition: StatusCode.h:267
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
bool usesSubSlot() const
Definition: EventContext.h:50
bool isFailure() const
Definition: StatusCode.h:130
ContextEvt_t evt() const
Definition: EventContext.h:47
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
StatusCode set(unsigned int iAlgo, State newState)
GAUDI_API void setCurrentContext(const EventContext *ctx)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool filterPassed() const
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
ContextID_t subSlot() const
Definition: EventContext.h:49
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
const StatusCode & execStatus() const
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
#define ON_VERBOSE
StatusCode AvalancheSchedulerSvc::promoteToFinished ( unsigned int  iAlgo,
int  si 
)
private
StatusCode AvalancheSchedulerSvc::promoteToScheduled ( unsigned int  iAlgo,
int  si,
EventContext eventContext 
)
private

Algorithm promotion.

Definition at line 873 of file AvalancheSchedulerSvc.cpp.

873  {
874 
876 
877  const std::string& algName( index2algname( iAlgo ) );
878  IAlgorithm* ialgoPtr = nullptr;
879  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
880 
881  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
882 
883  ++m_algosInFlight;
884  auto promote2ExecutedClosure = [this, iAlgo, ialgoPtr, eventContext]() {
885  this->m_actionsQueue.push( [this, iAlgo, ialgoPtr, eventContext]() {
886  return this->AvalancheSchedulerSvc::promoteToExecuted( iAlgo, eventContext->slot(), ialgoPtr, eventContext );
887  } );
888  return StatusCode::SUCCESS;
889  };
890 
891  // Avoid to use tbb if the pool size is 1 and run in this thread
892  if ( -100 != m_threadPoolSize ) {
893  // the child task that executes an Algorithm
894  tbb::task* algoTask = new ( tbb::task::allocate_root() )
895  AlgoExecutionTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
896  // schedule the algoTask
897  tbb::task::enqueue( *algoTask );
898 
899  } else {
900  AlgoExecutionTask theTask( ialgoPtr, *eventContext, serviceLocator(), m_algExecStateSvc,
901  promote2ExecutedClosure );
902  theTask.execute();
903  }
904 
905  ON_DEBUG debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot "
906  << si << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
907 
908  // Update states in the appropriate event slot
909  StatusCode updateSc;
910  EventSlot& thisSlot = m_eventSlots[si];
911  if ( eventContext->usesSubSlot() ) {
912  // Sub-slot
913  size_t const subSlotIndex = eventContext->subSlot();
914  updateSc = thisSlot.allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
915  } else {
916  // Event level (standard behaviour)
917  updateSc = thisSlot.algsStates.set( iAlgo, AState::SCHEDULED );
918  }
919 
921 
922  if ( updateSc.isSuccess() )
923  ON_VERBOSE verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
924  return updateSc;
925  } else {
926  ON_DEBUG debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si
927  << endmsg;
928  return sc;
929  }
930 }
#define ON_DEBUG
Class representing an event slot.
Definition: EventSlot.h:14
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
ContextID_t slot() const
Definition: EventContext.h:48
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
bool isSuccess() const
Definition: StatusCode.h:267
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Definition: EventSlot.h:90
bool usesSubSlot() const
Definition: EventContext.h:50
ContextEvt_t evt() const
Definition: EventContext.h:47
STL class.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
StatusCode set(unsigned int iAlgo, State newState)
Gaudi::Property< int > m_threadPoolSize
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
constexpr static const auto FAILURE
Definition: StatusCode.h:86
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
ContextID_t subSlot() const
Definition: EventContext.h:49
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:75
#define ON_VERBOSE
StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

There are two cases possible: 1) No slot is free. A StatusCode::FAILURE is returned. 2) At least one slot is free. An action which resets the slot and kicks off its update is queued.

Definition at line 450 of file AvalancheSchedulerSvc.cpp.

450  {
451 
452  if ( !eventContext ) {
453  fatal() << "Event context is nullptr" << endmsg;
454  return StatusCode::FAILURE;
455  }
456 
457  if ( m_freeSlots.load() == 0 ) {
458  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
459  return StatusCode::FAILURE;
460  }
461 
462  // no problem as push new event is only called from one thread (event loop manager)
463  --m_freeSlots;
464 
465  auto action = [this, eventContext]() -> StatusCode {
466  // Event processing slot forced to be the same as the wb slot
467  const unsigned int thisSlotNum = eventContext->slot();
468  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
469  if ( !thisSlot.complete ) {
470  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
471  return StatusCode::FAILURE;
472  }
473 
474  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
475  thisSlot.reset( eventContext );
476 
477  // Result status code:
479 
480  // promote to CR and DR the initial set of algorithms
481  Cause cs = {Cause::source::Root, "RootDecisionHub"};
482  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
483  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
484  result = StatusCode::FAILURE;
485  }
486 
487  if ( this->updateStates( thisSlotNum ).isFailure() ) {
488  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
489  result = StatusCode::FAILURE;
490  }
491 
492  return result;
493  }; // end of lambda
494 
495  // Kick off the scheduling!
496  ON_VERBOSE {
497  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
498  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
499  }
500 
501  m_actionsQueue.push( action );
502 
503  return StatusCode::SUCCESS;
504 }
#define ON_DEBUG
Class representing an event slot.
Definition: EventSlot.h:14
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
ContextID_t slot() const
Definition: EventContext.h:48
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
bool isFailure() const
Definition: StatusCode.h:130
ContextEvt_t evt() const
Definition: EventContext.h:47
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
std::function< StatusCode()> action
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
Definition: EventSlot.h:79
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
Definition: EventSlot.h:39
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE
StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 508 of file AvalancheSchedulerSvc.cpp.

508  {
509  StatusCode sc;
510  for ( auto context : eventContexts ) {
511  sc = pushNewEvent( context );
512  if ( sc != StatusCode::SUCCESS ) return sc;
513  }
514  return sc;
515 }
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
StatusCode AvalancheSchedulerSvc::scheduleEventView ( const EventContext sourceContext,
const std::string nodeName,
std::unique_ptr< EventContext viewContext 
)
overridevirtual

Method to inform the scheduler about event views.

Definition at line 1098 of file AvalancheSchedulerSvc.cpp.

1099  {
1100  // Prevent view nesting
1101  if ( sourceContext->usesSubSlot() ) {
1102  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1103  return StatusCode::FAILURE;
1104  }
1105 
1106  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1107 
1108  // It's not possible to create an std::functional from a move-capturing lambda
1109  // So, we have to release the unique pointer
1110  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1111  &nodeName]() -> StatusCode {
1112  // Attach the sub-slot to the top-level slot
1113  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1114 
1115  if ( viewContextPtr ) {
1116  // Re-create the unique pointer
1117  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1118  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1119  return StatusCode::SUCCESS;
1120  } else {
1121  // Disable the view node if there are no views
1122  topSlot.disableSubSlots( nodeName );
1123  return StatusCode::SUCCESS;
1124  }
1125  };
1126 
1127  m_actionsQueue.push( std::move( action ) );
1128 
1129  return StatusCode::SUCCESS;
1130 }
Class representing an event slot.
Definition: EventSlot.h:14
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B...
Definition: EventSlot.h:68
ContextID_t slot() const
Definition: EventContext.h:48
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one) ...
Definition: EventSlot.h:51
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
bool usesSubSlot() const
Definition: EventContext.h:50
T release(T...args)
std::function< StatusCode()> action
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
T move(T...args)
T get(T...args)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
#define ON_VERBOSE
StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event, if not available just return a failure.

Definition at line 545 of file AvalancheSchedulerSvc.cpp.

545  {
546  if ( m_finishedEvents.try_pop( eventContext ) ) {
547  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
548  << endmsg;
549  ++m_freeSlots;
550  return StatusCode::SUCCESS;
551  }
552  return StatusCode::FAILURE;
553 }
#define ON_DEBUG
ContextID_t slot() const
Definition: EventContext.h:48
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
ContextEvt_t evt() const
Definition: EventContext.h:47
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
StatusCode AvalancheSchedulerSvc::updateStates ( int  si = -1,
int  algo_index = -1,
int  sub_slot = -1,
int  source_slot = -1 
)
private

Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skipping an update of the Control Flow state)

Update the state of the algorithms.

The oldest events are checked before the newest, in order to reduce the event backlog. To check if the event is finished the algorithm checks if:

  • No algorithms have been signed off by the control flow
  • No algorithms have been signed off by the data flow
  • No algorithms have been scheduled

Definition at line 567 of file AvalancheSchedulerSvc.cpp.

568  {
569 
570  StatusCode global_sc( StatusCode::SUCCESS );
571 
572  // Sort from the oldest to the newest event
573  // Prepare a vector of pointers to the slots to avoid copies
574  std::vector<EventSlot*> eventSlotsPtrs;
575 
576  // Consider all slots if si <0 or just one otherwise
577  if ( si < 0 ) {
578  const int eventsSlotsSize( m_eventSlots.size() );
579  eventSlotsPtrs.reserve( eventsSlotsSize );
580  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); ++slotIt ) {
581  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
582  }
583  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
584  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
585  } else {
586  eventSlotsPtrs.push_back( &m_eventSlots[si] );
587  }
588 
589  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
590  int iSlot = thisSlotPtr->eventContext->slot();
591 
592  // Cache the states of the algos to improve readability and performance
593  auto& thisSlot = m_eventSlots[iSlot];
594  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
595 
596  // Perform the I->CR->DR transitions
597  if ( algo_index >= 0 ) {
598  Cause cs = {Cause::source::Task, index2algname( algo_index )};
599 
600  // Run in whole-event context if there's no sub-slot index, or the sub-slot has a different parent
601  if ( sub_slot == -1 || iSlot != source_slot ) {
602  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
603  error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
604  global_sc = StatusCode::FAILURE;
605  }
606  } else {
607  if ( m_precSvc->iterate( thisSlot.allSubSlots[sub_slot], cs ).isFailure() ) {
608  error() << "Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot << " of " << iSlot << endmsg;
609  global_sc = StatusCode::FAILURE;
610  }
611  }
612  }
613 
614  StatusCode partial_sc( StatusCode::FAILURE, true );
615 
616  // Perform DR->SCHEDULED
617  if ( !m_optimizationMode.empty() ) {
618  auto comp_nodes = [this]( const uint& i, const uint& j ) {
619  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
620  };
622  comp_nodes, std::vector<uint>() );
623  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it )
624  buffer.push( *it );
625  while ( !buffer.empty() ) {
626  bool IOBound = false;
627  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
628 
629  if ( !IOBound )
630  partial_sc = promoteToScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
631  else
632  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot, thisSlotPtr->eventContext.get() );
633 
634  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
635  << "Could not apply transition from " << AState::DATAREADY << " for algorithm "
636  << index2algname( buffer.top() ) << " on processing slot " << iSlot << endmsg;
637 
638  buffer.pop();
639  }
640 
641  } else {
642  for ( auto it = thisAlgsStates.begin( AState::DATAREADY ); it != thisAlgsStates.end( AState::DATAREADY ); ++it ) {
643  uint algIndex = *it;
644 
645  bool IOBound = false;
646  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
647 
648  if ( !IOBound )
649  partial_sc = promoteToScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
650  else
651  partial_sc = promoteToAsyncScheduled( algIndex, iSlot, thisSlotPtr->eventContext.get() );
652 
653  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
654  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
655  << " on processing slot " << iSlot << endmsg;
656  }
657  }
658 
659  // Check for algorithms ready in sub-slots
660  for ( auto& subslot : thisSlot.allSubSlots ) {
661  auto& subslotStates = subslot.algsStates;
662  for ( auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
663  uint algIndex{*it};
664  partial_sc = promoteToScheduled( algIndex, iSlot, subslot.eventContext.get() );
665  // The following verbosity is expensive when the number of sub-slots is high
666  /*ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
667  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << index2algname( algIndex )
668  << " on processing subslot " << subslot.eventContext->slot() << endmsg;*/
669  }
670  }
671 
672  if ( m_dumpIntraEventDynamics ) {
674  s << ( algo_index != -1 ? index2algname( algo_index ) : "START" ) << ", "
675  << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
676  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
677  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
679  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
680  std::ofstream myfile;
681  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
682  myfile << s.str();
683  myfile.close();
684  }
685 
686  // Not complete because this would mean that the slot is already free!
687  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
688  !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
689  !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
690  !thisSlot.complete ) {
691 
692  thisSlot.complete = true;
693  // if the event did not fail, add it to the finished events
694  // otherwise it is taken care of in the error handling
695  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
696  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
697  << thisSlot.eventContext->slot() << ")." << endmsg;
698  m_finishedEvents.push( thisSlot.eventContext.release() );
699  }
700 
701  // now let's return the fully evaluated result of the control flow
702  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
703 
704  thisSlot.eventContext.reset( nullptr );
705 
706  } else if ( isStalled( thisSlot ) ) {
707  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
708  eventFailed( thisSlot.eventContext.get() ); // can't release yet
709  }
710  } // end loop on slots
711 
712  ON_VERBOSE verbose() << "States Updated." << endmsg;
713 
714  return global_sc;
715 }
#define ON_DEBUG
AlgsExecutionStates::State AState
Class representing an event slot.
Definition: EventSlot.h:14
T open(T...args)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
ContextID_t slot() const
Definition: EventContext.h:48
Gaudi::Property< bool > m_dumpIntraEventDynamics
T to_string(T...args)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
STL namespace.
T end(T...args)
Gaudi::Property< std::string > m_optimizationMode
bool isFailure() const
Definition: StatusCode.h:130
virtual const std::string printState(EventSlot &) const =0
ContextEvt_t evt() const
Definition: EventContext.h:47
Gaudi::Property< bool > m_useIOBoundAlgScheduler
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
virtual uint getPriority(const std::string &) const =0
Get task priority.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
T close(T...args)
T str(T...args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
Gaudi::Property< int > m_threadPoolSize
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
T count(T...args)
T size(T...args)
STL class.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
T begin(T...args)
Iterator begin(State kind)
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
string s
Definition: gaudirun.py:312
constexpr static const auto FAILURE
Definition: StatusCode.h:86
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
T sort(T...args)
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
Definition: EventSlot.h:73
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
T reserve(T...args)
#define ON_VERBOSE
Iterator end(State kind)

Member Data Documentation

std::vector<unsigned int> AvalancheSchedulerSvc::m_actionsCounts
private

Bookkeeping of the number of actions in flight per slot.

Definition at line 263 of file AvalancheSchedulerSvc.h.

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 260 of file AvalancheSchedulerSvc.h.

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 219 of file AvalancheSchedulerSvc.h.

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 192 of file AvalancheSchedulerSvc.h.

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 198 of file AvalancheSchedulerSvc.h.

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 225 of file AvalancheSchedulerSvc.h.

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 255 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps {this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"}
private

Definition at line 156 of file AvalancheSchedulerSvc.h.

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to service for Conditions handling.

Definition at line 222 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file"}

Definition at line 151 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc {this, "EnableConditions", false, "Enable ConditionsSvc"}
private

Definition at line 161 of file AvalancheSchedulerSvc.h.

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 210 of file AvalancheSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 216 of file AvalancheSchedulerSvc.h.

std::atomic_int AvalancheSchedulerSvc::m_freeSlots
private

Atomic to account for asyncronous updates by the scheduler wrt the rest.

Definition at line 213 of file AvalancheSchedulerSvc.h.

unsigned int AvalancheSchedulerSvc::m_IOBoundAlgosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 228 of file AvalancheSchedulerSvc.h.

SmartIF<IAccelerator> AvalancheSchedulerSvc::m_IOBoundAlgScheduler
private

A shortcut to IO-bound algorithm scheduler.

Definition at line 207 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_IOBoundAlgSchedulerSvcName {this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"}
private

Definition at line 143 of file AvalancheSchedulerSvc.h.

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive {INACTIVE}
private

Flag to track if the scheduler is active or not.

Definition at line 183 of file AvalancheSchedulerSvc.h.

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight {1}
private

Definition at line 270 of file AvalancheSchedulerSvc.h.

size_t AvalancheSchedulerSvc::m_maxEventsInFlight {0}
private

Definition at line 269 of file AvalancheSchedulerSvc.h.

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxIOBoundAlgosInFlight
private
Initial value:
{this, "MaxIOBoundAlgosInFlight", 0,
"Maximum number of simultaneous I/O-bound algorithms"}

Definition at line 144 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E"}

Definition at line 149 of file AvalancheSchedulerSvc.h.

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 201 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences"}

Definition at line 169 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms"}

Definition at line 163 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms"}

Definition at line 166 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution"}

Definition at line 146 of file AvalancheSchedulerSvc.h.

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 186 of file AvalancheSchedulerSvc.h.

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"}

Definition at line 139 of file AvalancheSchedulerSvc.h.

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 268 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm"}

Definition at line 158 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_useIOBoundAlgScheduler
private
Initial value:
{this, "PreemptiveIOBoundTasks", false,
"Turn on preemptive way of scheduling of I/O-bound algorithms"}

Definition at line 153 of file AvalancheSchedulerSvc.h.

Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots {this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"}
private

Definition at line 172 of file AvalancheSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 204 of file AvalancheSchedulerSvc.h.

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName {this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"}
private

Definition at line 142 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: