The Gaudi Framework  master (ff829712)
AvalancheSchedulerSvc Class Reference

#include </builds/gaudi/Gaudi/GaudiHive/src/AvalancheSchedulerSvc.h>


Classes

struct  AlgQueueSort
 Comparison operator to sort the queues.
 
struct  TaskSpec
 Struct to hold entries in the alg queues.
 

Public Member Functions

StatusCode initialize () override
 Initialise.
 
StatusCode finalize () override
 Finalise.
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler.
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available.
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler.
 
unsigned int freeSlots () override
 Get free slots number.
 
void dumpState () override
 Dump scheduler state for all slots.
 
virtual StatusCode scheduleEventView (const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
 Method to inform the scheduler about event views.
 
virtual void recordOccupancy (int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
 Sample occupancy at a fixed interval (ms): a negative value deactivates sampling and 0 takes a snapshot on every change. For each sample, the callback function is applied to the result.
 
bool next (TaskSpec &ts, bool asynchronous)
 
- Public Member Functions inherited from extends< Service, IScheduler >
void const * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast.
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface.
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames.
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve name of the service.
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service.
 
StatusCode sysStart () override
 Start Service.
 
StatusCode sysStop () override
 Stop Service.
 
StatusCode sysFinalize () override
 Finalize Service.
 
StatusCode sysReinitialize () override
 Re-initialize the Service.
 
StatusCode sysRestart () override
 Restart the Service.
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor.
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator.
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T>
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandle< T > &hndl, const std::string &doc="none")
 
template<class T>
StatusCode declareTool (ToolHandle< T > &handle, bool createIf=true)
 
template<class T>
StatusCode declareTool (ToolHandle< T > &handle, const std::string &toolTypeAndName, bool createIf=true)
 Declare used tool.
 
template<class T>
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandleArray< T > &hndlArr, const std::string &doc="none")
 
template<class T>
void addToolsArray (ToolHandleArray< T > &hndlArr)
 
const std::vector< IAlgTool * > & tools () const
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked.
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
StatusCode setProperty (const Gaudi::Details::PropertyBase &p)
 Set the property from a property.
 
StatusCode setProperty (const std::string &name, const char *v)
 Special case for string literals.
 
StatusCode setProperty (const std::string &name, const std::string &v)
 Special case for std::string.
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 set the property from the value
 
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property.
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property.
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation.
 
Gaudi::Details::PropertyBase * declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property.
 
StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p) override
 set the property from another property with a different name
 
StatusCode setProperty (const std::string &s) override
 set the property from the formatted string
 
StatusCode setPropertyRepr (const std::string &n, const std::string &r) override
 set the property from name and value string representation
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 get the property
 
const Gaudi::Details::PropertyBase & getProperty (std::string_view name) const override
 get the property by name
 
StatusCode getProperty (std::string_view n, std::string &v) const override
 convert the property to the string
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 get all properties
 
bool hasProperty (std::string_view name) const override
 Return true if we have a property with the given name.
 
Gaudi::Details::PropertyBase * property (std::string_view name) const
 FIXME: property and bindPropertiesTo should be protected
 
void bindPropertiesTo (Gaudi::Interfaces::IOptionsSvc &optsSvc)
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 get the cached level (originally extracted from the embedded MsgStream)
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor.
 
const SmartIF< IMessageSvc > & msgSvc () const
 The standard message service.
 
MsgStream & msgStream () const
 Return an uninitialized MsgStream.
 
MsgStream & msgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts.
 
MsgStream & always () const
 shortcut for the method msgStream(MSG::ALWAYS)
 
MsgStream & fatal () const
 shortcut for the method msgStream(MSG::FATAL)
 
MsgStream & err () const
 shortcut for the method msgStream(MSG::ERROR)
 
MsgStream & error () const
 shortcut for the method msgStream(MSG::ERROR)
 
MsgStream & warning () const
 shortcut for the method msgStream(MSG::WARNING)
 
MsgStream & info () const
 shortcut for the method msgStream(MSG::INFO)
 
MsgStream & debug () const
 shortcut for the method msgStream(MSG::DEBUG)
 
MsgStream & verbose () const
 shortcut for the method msgStream(MSG::VERBOSE)
 
MsgStream & msg () const
 shortcut for the method msgStream(MSG::INFO)
 

Private Types

enum  ActivationState { INACTIVE = 0 , ACTIVE = 1 , FAILURE = 2 }
 
using AState = AlgsExecutionStates::State
 
using action = std::function<StatusCode()>
 

Private Member Functions

StatusCode dumpGraphFile (const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
 
void activate ()
 Activate scheduler.
 
StatusCode deactivate ()
 Deactivate scheduler.
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer.
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name.
 
StatusCode iterate ()
 Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
 
StatusCode revise (unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
 
StatusCode schedule (TaskSpec &&)
 
StatusCode signoff (const TaskSpec &)
 The call to this method is triggered only from within the AlgTask.
 
bool isStalled (const EventSlot &) const
 Check if scheduling in a particular slot is in a stall.
 
void eventFailed (EventContext *eventContext)
 Method to execute if an event failed.
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler.
 

Private Attributes

std::chrono::duration< int64_t, std::milli > m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
 
std::chrono::system_clock::time_point m_lastSnapshot = std::chrono::system_clock::now()
 
std::function< void(OccupancySnapshot)> m_snapshotCallback
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< int > m_maxParallelismExtra
 
Gaudi::Property< std::string > m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" }
 
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::string > m_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
 
Gaudi::Property< int > m_numOffloadThreads
 
Gaudi::Property< bool > m_checkDeps
 
Gaudi::Property< bool > m_checkOutput
 
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
 
Gaudi::Property< std::string > m_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" }
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
Gaudi::Property< bool > m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" }
 
Gaudi::Property< std::string > m_dataDepsGraphFile
 
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
 
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
 
std::atomic< ActivationState > m_isActive { INACTIVE }
 Flag to track if the scheduler is active or not.
 
std::thread m_thread
 The thread in which the activate function runs.
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion.
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion.
 
SmartIF< IPrecedenceSvc > m_precSvc
 A shortcut to the Precedence Service.
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard.
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots.
 
std::atomic_int m_freeSlots { 0 }
 Atomic to account for asynchronous updates by the scheduler wrt the rest.
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events.
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager.
 
SmartIF< ICondSvc > m_condSvc
 A shortcut to the service for Conditions handling.
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight.
 
unsigned int m_blockingAlgosInFlight = 0
 Number of blocking algorithms presently in flight.
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool.
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
 Queues for scheduled algorithms.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
 
std::queue< TaskSpec > m_retryQueue
 
std::atomic< bool > m_needsUpdate { true }
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
tbb::task_arena * m_arena { nullptr }
 
std::unique_ptr< FiberManager > m_fiberManager { nullptr }
 
size_t m_maxEventsInFlight { 0 }
 
size_t m_maxAlgosInFlight { 1 }
 

Friends

class AlgTask
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class
 Typedef to this class.
 
using extend_interfaces_base
 Typedef to the base of this class.
 
- Public Types inherited from Service
using Factory = Gaudi::PluginService::Factory<IService*( const std::string&, ISvcLocator* )>
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl
 Typedef used to refer to this class from derived classes, as in.
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids
 take union of the ext_iids of all Interfaces...
 
- Protected Member Functions inherited from Service
std::vector< IAlgTool * > & tools ()
 
 ~Service () override
 
int outputLevel () const
 get the Service's output level
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches.
 
MSG::Level resetMessaging ()
 Reinitialize internal states.
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream.
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state.
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Target service state.
 
Gaudi::Property< int > m_outputLevel { this, "OutputLevel", MSG::NIL, "output level" }
 Output level of the service.
 
Gaudi::Property< bool > m_auditorInitialize { this, "AuditInitialize", false, "trigger auditor on initialize()" }
 
Gaudi::Property< bool > m_auditorStart { this, "AuditStart", false, "trigger auditor on start()" }
 
Gaudi::Property< bool > m_auditorStop { this, "AuditStop", false, "trigger auditor on stop()" }
 
Gaudi::Property< bool > m_auditorFinalize { this, "AuditFinalize", false, "trigger auditor on finalize()" }
 
Gaudi::Property< bool > m_auditorReinitialize { this, "AuditReinitialize", false, "trigger auditor on reinitialize()" }
 
Gaudi::Property< bool > m_auditorRestart { this, "AuditRestart", false, "trigger auditor on restart()" }
 
Gaudi::Property< bool > m_autoRetrieveTools
 
Gaudi::Property< bool > m_checkToolDeps
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service.
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. Compared to the approach used in the ForwardSchedulerSvc, it offers the following advantages:

(1) faster decision making (thus lower concurrency disclosure downtime); (2) capacity for proactive task scheduling decision making.

Point (2) made it possible to implement a number of generic, non-intrusive scheduling strategies that maximize intra-event throughput.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled as soon as all of the following conditions are met:

  • a control flow (CF) graph traversal reaches the task;
  • all data flow (DF) dependencies of the task are satisfied;
  • the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.
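The prerequisites above form a plain conjunction: a task is eligible only when every condition holds at the same time. A minimal sketch follows; the TaskStatus struct and canSchedule() function are hypothetical and do not exist in Gaudi, where these facts come from the precedence service, the algorithm resource pool and the thread pool.

```cpp
#include <cassert>

// Hypothetical snapshot of what the scheduler knows about one task
// (an algorithm in an event slot). Field names are illustrative only.
struct TaskStatus {
  bool reachedByControlFlow;   // a CF graph traversal has reached the task
  bool dataDependenciesMet;    // all DF inputs of the task are available
  bool consideredByPoolParser; // the DF-ready pool parsing picked it up
  bool algInstanceAvailable;   // a free (or re-entrant) algorithm instance exists
  bool computeResourceFree;    // a free computational resource exists
};

// A task may be scheduled only when every prerequisite holds simultaneously.
constexpr bool canSchedule( const TaskStatus& t ) {
  return t.reachedByControlFlow && t.dataDependenciesMet && t.consideredByPoolParser &&
         t.algInstanceAvailable && t.computeResourceFree;
}
```

Missing any single ingredient, for example a free algorithm instance, keeps the task in the pool even if its data are ready.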

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of asymmetries in the precedence rules graph:

(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.
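Prioritizing DF-ready tasks can be pictured as a priority queue ordered by a rank, similar in spirit to the TaskSpec/AlgQueueSort pair used for m_scheduledQueue. In the sketch below, ReadyTask, its rank field and ByRank are hypothetical; how a rank would be computed from asymmetries (A), (B) or (C) is not shown.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Hypothetical entry in the DF-ready pool; `rank` stands in for whatever
// asymmetry metric a strategy computes (higher = more urgent).
struct ReadyTask {
  std::string algName;
  unsigned    rank;
};

// std::priority_queue keeps the *largest* element on top, so the comparator
// must return true when `a` has LOWER priority than `b`.
struct ByRank {
  bool operator()( const ReadyTask& a, const ReadyTask& b ) const { return a.rank < b.rank; }
};

using ReadyPool = std::priority_queue<ReadyTask, std::vector<ReadyTask>, ByRank>;

// Pop every task in priority order and return the algorithm names.
std::vector<std::string> drainInPriorityOrder( ReadyPool pool ) {
  std::vector<std::string> order;
  while ( !pool.empty() ) {
    order.push_back( pool.top().algName );
    pool.pop();
  }
  return order;
}
```

Pushing tasks ranked 1, 5 and 3 yields them in the order 5, 3, 1, regardless of insertion order.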

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by preemptively scheduling CPU-blocking tasks. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds and, jokingly, quantum computing devices);
  • synchronization-bound tasks.
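One way to picture this is to route blocking tasks to a separate lane with a cap on how many may be in flight at once, loosely modelled on the m_maxBlockingAlgosInFlight attribute listed above. This is a hypothetical bookkeeping sketch (Dispatcher and its methods are invented for illustration), not the AvalancheSchedulerSvc implementation, and it does not launch anything itself.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical dispatcher: CPU-bound tasks go to the regular queue, while
// blocking tasks (I/O, offloading, synchronization) use a capped extra lane.
class Dispatcher {
public:
  explicit Dispatcher( unsigned maxBlockingInFlight ) : m_maxBlocking( maxBlockingInFlight ) {}

  // Returns true if the task was accepted for execution now, false if it must wait.
  bool submit( const std::string& name, bool blocking ) {
    if ( !blocking ) {
      m_cpuQueue.push_back( name ); // regular, CPU-bound path
      return true;
    }
    if ( m_blockingInFlight >= m_maxBlocking ) {
      m_retry.push_back( name ); // cap reached: retry later
      return false;
    }
    ++m_blockingInFlight; // occupies one blocking lane
    return true;
  }

  // A blocking task finished: free its lane and promote one waiting task, if any.
  void blockingTaskDone() {
    assert( m_blockingInFlight > 0 );
    --m_blockingInFlight;
    if ( !m_retry.empty() ) {
      m_retry.pop_front();
      ++m_blockingInFlight;
    }
  }

  unsigned    blockingInFlight() const { return m_blockingInFlight; }
  std::size_t waiting() const { return m_retry.size(); }
  std::size_t cpuQueued() const { return m_cpuQueue.size(); }

private:
  unsigned                m_maxBlocking;
  unsigned                m_blockingInFlight = 0;
  std::deque<std::string> m_cpuQueue;
  std::deque<std::string> m_retry;
};
```

The cap keeps blocking tasks from flooding the machine with idle threads, while CPU-bound work is never held back by them.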

Credits

Historically, the AvalancheSchedulerSvc branched off from the ForwardSchedulerSvc and in many ways built its success on the ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 113 of file AvalancheSchedulerSvc.h.

Member Typedef Documentation

◆ action

using AvalancheSchedulerSvc::action = std::function<StatusCode()>
private

Definition at line 160 of file AvalancheSchedulerSvc.h.

◆ AState

using AvalancheSchedulerSvc::AState = AlgsExecutionStates::State
private

Member Enumeration Documentation

◆ ActivationState

enum AvalancheSchedulerSvc::ActivationState
private

Enumerator
INACTIVE
ACTIVE
FAILURE

Definition at line 162 of file AvalancheSchedulerSvc.h.

Member Function Documentation

◆ activate()

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on, the queue of actions is checked. The checking stops only when the m_isActive flag is no longer ACTIVE and the queue is empty. This guarantees that all queued actions are executed and that no stall is created. The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html). The thread pool is therefore initialised here, since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas).

Definition at line 458 of file AvalancheSchedulerSvc.cpp.

{

  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;

  if ( m_threadPoolSvc->initPool( m_threadPoolSize, m_maxParallelismExtra ).isFailure() ) {
    error() << "problems initializing ThreadPoolSvc" << endmsg;
    m_isActive = FAILURE;
    return;
  }

  // Wait for actions pushed into the queue by finishing tasks.
  action thisAction;
  StatusCode sc( StatusCode::SUCCESS );

  m_isActive = ACTIVE;

  // Continue to wait if the scheduler is running or there is something to do
  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
    m_actionsQueue.pop( thisAction );
    sc = thisAction();
    ON_VERBOSE {
      if ( sc.isFailure() )
        verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
      else
        verbose() << "Action succeeded." << endmsg;
    }
    else sc.ignore();

    // If all queued actions have been processed, update the slot states
    if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
      sc = iterate();
      ON_VERBOSE {
        if ( sc.isFailure() )
          verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
        else
          verbose() << "Iteration succeeded." << endmsg;
      }
      else sc.ignore();
    }
  }

  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
    error() << "Problems terminating thread pool" << endmsg;
    m_isActive = FAILURE;
  }
}
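The drain-then-update pattern of activate() can be reproduced with the standard library alone. In this sketch, ActionQueue stands in for tbb::concurrent_bounded_queue, actions return void instead of StatusCode, and the iterations counter stands in for iterate(); all of these names are hypothetical. Because the consumer blocks in pop() while the queue is empty, the loop can only be left through a queued action that clears the active flag, which is presumably why deactivate() pushes such an action instead of flipping the flag directly.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Minimal blocking FIFO standing in for tbb::concurrent_bounded_queue<action>.
class ActionQueue {
public:
  void push( std::function<void()> a ) {
    {
      std::lock_guard<std::mutex> lk( m_mx );
      m_q.push( std::move( a ) );
    }
    m_cv.notify_one();
  }
  // Blocks until an action is available.
  std::function<void()> pop() {
    std::unique_lock<std::mutex> lk( m_mx );
    m_cv.wait( lk, [this] { return !m_q.empty(); } );
    std::function<void()> a = std::move( m_q.front() );
    m_q.pop();
    return a;
  }
  bool empty() const {
    std::lock_guard<std::mutex> lk( m_mx );
    return m_q.empty();
  }

private:
  mutable std::mutex                m_mx;
  std::condition_variable           m_cv;
  std::queue<std::function<void()>> m_q;
};

// Consumer loop modelled on activate(): keep going while active OR work is
// queued; run the update pass only once the queue has been drained.
struct MiniScheduler {
  ActionQueue       actions;
  std::atomic<bool> active{ true };
  std::atomic<bool> needsUpdate{ false };
  int               iterations = 0; // touched only by the consumer thread

  void run() {
    while ( active || !actions.empty() ) {
      actions.pop()(); // blocks while the queue is empty
      if ( needsUpdate.load() && actions.empty() ) {
        needsUpdate = false;
        ++iterations; // a real scheduler would call iterate() here
      }
    }
  }
};
```

For example, running run() on a separate thread and pushing two ordinary actions followed by one that sets active to false makes the thread execute all three and return, with exactly one update pass.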

◆ algname2index()

unsigned int AvalancheSchedulerSvc::algname2index ( const std::string & algoname)
inline private

Convert a name to an integer.

Definition at line 251 of file AvalancheSchedulerSvc.h.

{ return m_algname_index_map[algoname]; }
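Note that std::unordered_map::operator[] inserts a value-initialised entry (index 0) when the key is missing, so with the implementation above an unknown algorithm name silently maps to index 0 and grows the map. A lookup-only alternative is sketched below with a hypothetical AlgIndexRegistry class that keeps both directions of the mapping, in the way m_algname_index_map and m_algname_vect do.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical bidirectional name <-> index registry (C++17).
class AlgIndexRegistry {
public:
  // Register a name once; repeated calls return the existing index.
  unsigned add( const std::string& name ) {
    auto [it, inserted] = m_index.try_emplace( name, static_cast<unsigned>( m_names.size() ) );
    if ( inserted ) m_names.push_back( name );
    return it->second;
  }
  // find() never inserts, so unknown names are reported instead of mapped to 0.
  std::optional<unsigned> index( const std::string& name ) const {
    auto it = m_index.find( name );
    if ( it == m_index.end() ) return std::nullopt;
    return it->second;
  }
  // Reverse lookup; at() throws std::out_of_range for an invalid index.
  const std::string& name( unsigned idx ) const { return m_names.at( idx ); }

private:
  std::unordered_map<std::string, unsigned> m_index;
  std::vector<std::string>                  m_names;
};
```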

◆ deactivate()

StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

The number of free slots is set to zero and any pending actions are removed from the queue. A single final action is then pushed into the queue: it flips the status flag m_isActive to INACTIVE and is the last action executed by the scheduler, so the activate() loop ends once the queue is empty.

Definition at line 515 of file AvalancheSchedulerSvc.cpp.

{

  if ( m_isActive == ACTIVE ) {

    // Set the number of slots available to an error code
    m_freeSlots.store( 0 );

    // Empty queue
    action thisAction;
    while ( m_actionsQueue.try_pop( thisAction ) ) {};

    // This would be the last action
    m_actionsQueue.push( [this]() -> StatusCode {
      ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
      m_isActive = INACTIVE;
      return StatusCode::SUCCESS;
    } );
  }

  return StatusCode::SUCCESS;
}
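The shutdown sequence can be modelled single-threaded as follows. ShutdownModel is hypothetical and uses a std::deque instead of the concurrent queue; it shows that actions queued before deactivate() are discarded and that the consumer exits right after running the final, flag-flipping action.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };

// Single-threaded model of deactivate(): refuse new events, discard pending
// actions, then enqueue one final action that flips the state flag.
struct ShutdownModel {
  std::deque<std::function<void()>> actions;
  std::atomic<ActivationState>      state{ ACTIVE };
  std::atomic<int>                  freeSlots{ 4 };

  void deactivate() {
    if ( state != ACTIVE ) return;
    freeSlots.store( 0 );                              // no new events are accepted
    actions.clear();                                   // drop whatever is still queued
    actions.push_back( [this] { state = INACTIVE; } ); // the last action
  }

  // Consume actions with the same exit condition as the activate() loop.
  void drain() {
    while ( state == ACTIVE || !actions.empty() ) {
      if ( actions.empty() ) break; // a real consumer would block here instead
      std::function<void()> a = std::move( actions.front() );
      actions.pop_front();
      a();
    }
  }
};
```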

◆ dumpGraphFile()

StatusCode AvalancheSchedulerSvc::dumpGraphFile ( const std::map< std::string, DataObjIDColl > & inDeps,
const std::map< std::string, DataObjIDColl > & outDeps ) const
private

Definition at line 1178 of file AvalancheSchedulerSvc.cpp.

{
  // Both maps should have the same algorithm entries
  assert( inDeps.size() == outDeps.size() );

  Gaudi::Hive::Graph g{ m_dataDepsGraphFile.value() };
  info() << "Dumping data dependencies graph to file: " << g.fileName() << endmsg;

  // define algs and objects
  std::set<std::size_t> definedObjects;

  // Regex for selection of algs and objects
  std::regex algNameRegex( m_dataDepsGraphAlgoPattern.value() );
  std::regex objNameRegex( m_dataDepsGraphObjectPattern.value() );

  // inDeps and outDeps should have the same entries
  std::size_t algoIndex = 0ul;
  for ( const auto& [algName, ideps] : inDeps ) {
    if ( not std::regex_search( algName, algNameRegex ) ) continue;
    std::string algIndex = "Alg_" + std::to_string( algoIndex );
    g.addNode( algName, algIndex );

    // inputs
    for ( const auto& dep : ideps ) {
      if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;

      const auto [itr, inserted] = definedObjects.insert( dep.hash() );
      std::string objIndex = "obj_" + std::to_string( dep.hash() );
      if ( inserted ) g.addNode( dep.key(), objIndex );

      g.addEdge( dep.key(), objIndex, algName, algIndex );
    } // loop on ideps

    const auto& odeps = outDeps.at( algName );
    for ( const auto& dep : odeps ) {
      if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;

      const auto [itr, inserted] = definedObjects.insert( dep.hash() );
      std::string objIndex = "obj_" + std::to_string( dep.hash() );
      if ( inserted ) g.addNode( dep.key(), objIndex );

      g.addEdge( algName, algIndex, dep.key(), objIndex );
    } // loop on odeps

    ++algoIndex;
  } // loop on inDeps

  return StatusCode::SUCCESS;
}
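The same regex-filtering logic can be sketched with the standard library alone. Here the output is written in Graphviz DOT syntax rather than through Gaudi::Hive::Graph, and data objects are plain strings instead of DataObjID; dataDepsToDot() and the Deps alias are hypothetical. As in the listing, an object node is declared only the first time it is seen, and input edges point from object to algorithm while output edges point from algorithm to object.

```cpp
#include <cassert>
#include <map>
#include <regex>
#include <set>
#include <sstream>
#include <string>

using Deps = std::map<std::string, std::set<std::string>>; // alg name -> data object keys

std::string dataDepsToDot( const Deps& inDeps, const Deps& outDeps, const std::string& algPattern,
                           const std::string& objPattern ) {
  assert( inDeps.size() == outDeps.size() ); // same algorithms on both sides
  const std::regex      algRe( algPattern ), objRe( objPattern );
  std::set<std::string> definedObjects;
  std::ostringstream    dot;
  dot << "digraph datadeps {\n";

  // Declare an object node only once, however many algorithms use it.
  auto defineObject = [&]( const std::string& obj ) {
    if ( definedObjects.insert( obj ).second ) dot << "  \"" << obj << "\" [shape=box];\n";
  };

  for ( const auto& [alg, ideps] : inDeps ) {
    if ( !std::regex_search( alg, algRe ) ) continue; // algorithm filtered out
    dot << "  \"" << alg << "\";\n";
    for ( const auto& obj : ideps ) { // inputs: object -> algorithm
      if ( !std::regex_search( obj, objRe ) ) continue;
      defineObject( obj );
      dot << "  \"" << obj << "\" -> \"" << alg << "\";\n";
    }
    for ( const auto& obj : outDeps.at( alg ) ) { // outputs: algorithm -> object
      if ( !std::regex_search( obj, objRe ) ) continue;
      defineObject( obj );
      dot << "  \"" << alg << "\" -> \"" << obj << "\";\n";
    }
  }
  dot << "}\n";
  return dot.str();
}
```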

◆ dumpSchedulerState()

void AvalancheSchedulerSvc::dumpSchedulerState ( int iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes: the state of the scheduler is dumped to the output so that it can be inspected.

Definition at line 880 of file AvalancheSchedulerSvc.cpp.

{

  // To have just one big message
  std::ostringstream outputMS;

  outputMS << "Dumping scheduler state\n"
           << "=========================================================================================\n"
           << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
           << "=========================================================================================\n\n";

  //===========================================================================

  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
           << "------------------\n\n";

  // Figure if TimelineSvc is available (used below to detect threads IDs)
  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
    outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
  } else {

    // Figure optimal printout layout
    size_t indt( 0 );
    for ( auto& slot : m_eventSlots ) {

      const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
      for ( uint algIndex : schedAlgs ) {
        if ( index2algname( algIndex ).length() > indt ) indt = index2algname( algIndex ).length();
      }
    }

    // Figure the last running schedule across all slots
    for ( auto& slot : m_eventSlots ) {

      const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
      for ( uint algIndex : schedAlgs ) {

        const std::string& algoName{ index2algname( algIndex ) };

        outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
                 << slot.eventContext->slot();

        // Try to get POSIX threads IDs the currently running tasks are scheduled to
        if ( timelineSvc.isValid() ) {
          TimelineEvent te{};
          te.algorithm = algoName;
          te.slot = slot.eventContext->slot();
          te.event = slot.eventContext->evt();

          if ( timelineSvc->getTimelineEvent( te ) )
            outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
          else
            outputMS << " thread.id: [unknown]"; // this means a task has just
                                                 // been signed off as SCHEDULED,
                                                 // but has not been assigned to a thread yet
                                                 // (i.e., not running yet)
        }
        outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
      }
    }
  }

  //===========================================================================

  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
           << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";

  int slotCount = -1;
  bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( { AState::ERROR } ) ||
                                          subSlotAlgsInStates( m_eventSlots[iSlot], { AState::ERROR } )
                                    : false;

  for ( auto& slot : m_eventSlots ) {
    ++slotCount;
    if ( slot.complete ) continue;

    outputMS << "[ slot: "
             << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
             << ", event: "
             << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" );

    if ( slot.eventContext->eventID().isValid() ) { outputMS << ", eventID: " << slot.eventContext->eventID(); }
    outputMS << " ]:\n\n";

    if ( 0 > iSlot || iSlot == slotCount ) {

      // If an alg has thrown an error then it's not a failure of the CF/DF graph
      if ( wasAlgError ) {
        outputMS << "ERROR alg(s):";
        int errorCount = 0;
        const auto& errorAlgs = slot.algsStates.algsInState( AState::ERROR );
        for ( uint algIndex : errorAlgs ) {
          outputMS << " " << index2algname( algIndex );
          ++errorCount;
        }
        if ( errorCount == 0 ) outputMS << " in subslot(s)";
        outputMS << "\n\n";
      } else {
        // Snapshot of the Control Flow and FSM states
        outputMS << m_precSvc->printState( slot ) << "\n";
      }

      // Mention sub slots (this is expensive if the number of sub-slots is high)
      if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
        outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
        auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
        for ( auto& ss : slot.allSubSlots ) {
          outputMS << "[ slot: " << slotID << ", sub-slot: "
                   << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
                   << ", entry: " << ss.entryPoint << ", event: "
                   << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
                   << " ]:\n\n";
          if ( wasAlgError ) {
            outputMS << "ERROR alg(s):";
            const auto& errorAlgs = ss.algsStates.algsInState( AState::ERROR );
            for ( uint algIndex : errorAlgs ) { outputMS << " " << index2algname( algIndex ); }
            outputMS << "\n\n";
          } else {
            // Snapshot of the Control Flow and FSM states in sub slot
            outputMS << m_precSvc->printState( ss ) << "\n";
          }
        }
      }
    }
  }

  //===========================================================================

  if ( 0 <= iSlot && !wasAlgError ) {
    outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
    m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
  }

  outputMS << "\n=========================================================================================\n"
           << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
           << "=========================================================================================\n\n";

  info() << outputMS.str() << endmsg;
}
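The "Last schedule" table uses a two-pass layout: the widest algorithm name is found first, then every row is printed with std::setw, and everything is collected in one ostringstream so that a single message is emitted. A reduced sketch follows, with the hypothetical ScheduledTask struct standing in for the slot and event-context data.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iomanip>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for one SCHEDULED algorithm in one event slot.
struct ScheduledTask {
  std::string alg;
  long        evt;
  unsigned    slot;
};

// Pass 1 finds the widest algorithm name; pass 2 right-aligns each name with
// std::setw so the columns line up. The whole text is returned as one string.
std::string formatSchedule( const std::vector<ScheduledTask>& tasks ) {
  std::size_t width = 0;
  for ( const auto& t : tasks ) width = std::max( width, t.alg.size() );

  std::ostringstream out;
  for ( const auto& t : tasks ) {
    out << " task: " << std::setw( static_cast<int>( width ) ) << t.alg << " evt/slot: " << t.evt << "/" << t.slot
        << "\n";
  }
  return out.str();
}
```

std::setw affects only the next insertion, which is why it is repeated on every row.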

◆ dumpState()

void AvalancheSchedulerSvc::dumpState ( )
override

Dump scheduler state for all slots.

Definition at line 619 of file AvalancheSchedulerSvc.cpp.

{ dumpSchedulerState( -1 ); }

◆ eventFailed()

void AvalancheSchedulerSvc::eventFailed ( EventContext * eventContext)
private

Method to execute if an event failed.

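An event can fail; in that case this method is called.

It dumps the state of the scheduler (for all slots at VERBOSE output level, otherwise only for the failed slot), dumps the precedence-rules analysis if enabled in the PrecedenceSvc, marks the slot as complete and pushes the failed event context into the queue of finished events.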

Definition at line 859 of file AvalancheSchedulerSvc.cpp.

859 {
860 const uint slotIdx = eventContext->slot();
861
862 error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
863
864 dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
865
866 // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
867 m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
868
869 // Push into the finished events queue the failed context
870 m_eventSlots[slotIdx].complete = true;
871 m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
872}
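The last two lines of eventFailed() hand the slot's event context over to the finished-events queue by releasing the owning pointer. A minimal sketch of that ownership transfer, assuming hypothetical `Context`, `Slot` and `failEvent` stand-ins for `EventContext`, `EventSlot` and eventFailed(), and a plain `std::queue` instead of the thread-safe `tbb::concurrent_bounded_queue`:

```cpp
#include <cstddef>
#include <memory>
#include <queue>
#include <vector>

// Hypothetical stand-in for EventContext.
struct Context {
  std::size_t slot;
  long evt;
};

// Hypothetical stand-in for EventSlot: owns the context of the event in flight.
struct Slot {
  std::unique_ptr<Context> context;
  bool complete = false;
};

// Mark the slot as complete and move ownership of its context into the
// finished-events queue, as the end of eventFailed() does.
void failEvent(std::vector<Slot>& slots, std::queue<Context*>& finished, std::size_t slotIdx) {
  Slot& slot = slots[slotIdx];
  slot.complete = true;
  finished.push(slot.context.release()); // the slot no longer owns the context
}
```

After `release()` the slot's pointer is null, so in this sketch whoever pops the raw pointer from the queue is responsible for deleting it.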

◆ finalize()

StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

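Here the base class is finalized, the scheduler is deactivated, the FiberManager is deleted and the scheduler thread is joined. StatusCode::FAILURE is returned if the scheduler thread ended in a failed state.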

Definition at line 424 of file AvalancheSchedulerSvc.cpp.

424 {
425
426 StatusCode sc( Service::finalize() );
427 if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
428
429 sc = deactivate();
430 if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
431
432 debug() << "Deleting FiberManager" << endmsg;
433 m_fiberManager.reset();
434
435 info() << "Joining Scheduler thread" << endmsg;
436 m_thread.join();
437
438 // Final error check after thread pool termination
439 if ( m_isActive == FAILURE ) {
440 error() << "problems in scheduler thread" << endmsg;
441 return StatusCode::FAILURE;
442 }
443
444 return sc;
445}
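finalize() first deactivates the scheduler and only then joins `m_thread`. The ordering matters for any thread whose loop runs until told to stop. A minimal sketch of the pattern, assuming a hypothetical `Worker` class whose loop polls an atomic flag (the real scheduler thread runs activate() and is stopped through deactivate(), whose body is not shown here):

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical worker: loop() runs on its own thread until stop() is called.
class Worker {
public:
  ~Worker() { stop(); }

  void start() {
    m_active = true;
    m_thread = std::thread([this] { loop(); });
  }

  // Same order as finalize(): signal the loop to exit, then join the thread.
  // In this sketch, joining first would wait forever.
  void stop() {
    m_active = false;
    if (m_thread.joinable()) m_thread.join();
  }

  int iterations() const { return m_iterations.load(); }

private:
  void loop() {
    do {
      ++m_iterations; // one pass of the (omitted) scheduling work
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    } while (m_active);
  }

  std::atomic<bool> m_active{false};
  std::atomic<int> m_iterations{0};
  std::thread m_thread;
};
```

The do-while guarantees at least one pass, and the destructor calls stop() so a still-joinable `std::thread` is never destroyed (which would call `std::terminate`).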

◆ freeSlots()

unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get the number of free slots (never negative).

Definition at line 615 of file AvalancheSchedulerSvc.cpp.

615{ return std::max( m_freeSlots.load(), 0 ); }
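freeSlots() reads a signed atomic counter but returns `unsigned int`, so the `std::max( ..., 0 )` clamp keeps a negative value from wrapping around. A minimal sketch of the same conversion; `clampedFreeSlots` is a hypothetical helper, not part of Gaudi:

```cpp
#include <algorithm>
#include <atomic>

// Report a non-negative count from a signed atomic counter,
// as freeSlots() does with std::max( m_freeSlots.load(), 0 ).
unsigned int clampedFreeSlots(const std::atomic<int>& freeSlots) {
  return static_cast<unsigned int>(std::max(freeSlots.load(), 0));
}
```

Without the clamp, a counter value of -2 would be reported as 4294967294 on a platform with 32-bit `unsigned int`.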

◆ index2algname()

const std::string & AvalancheSchedulerSvc::index2algname ( unsigned int index)
inline private

Convert an integer to a name.

Definition at line 257 of file AvalancheSchedulerSvc.h.

257{ return m_algname_vect[index]; }
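index2algname() is a constant-time lookup because initialize() stores every algorithm name at the index assigned by the precedence graph (`m_algname_vect.at( index ) = name`). A sketch of that bookkeeping, assuming a hypothetical `indexOf` map in place of `getAlgorithmNode( name )->getAlgoIndex()` and dense indices `0..N-1`:

```cpp
#include <map>
#include <string>
#include <vector>

// Build an index -> name table from a name -> index mapping.
// `indexOf` stands in for PrecedenceRulesGraph::getAlgorithmNode(name)->getAlgoIndex().
std::vector<std::string> buildIndexToName(const std::map<std::string, unsigned int>& indexOf) {
  std::vector<std::string> names(indexOf.size()); // like m_algname_vect.resize( algsNumber )
  for (const auto& [name, index] : indexOf) {
    names.at(index) = name; // at() throws std::out_of_range on an index outside the table
  }
  return names;
}
```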

◆ initialize()

StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Besides some bookkeeping, this is where the scheduler is activated: the activate() function is executed in a new thread.

In addition the algorithms list is acquired from the algResourcePool.

Definition at line 78 of file AvalancheSchedulerSvc.cpp.

78 {
79
80 // Initialise mother class (read properties, ...)
81 StatusCode sc( Service::initialize() );
82 if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
83
84 // Get hold of the TBBSvc. This should initialize the thread pool
85 m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
86 if ( !m_threadPoolSvc.isValid() ) {
87 fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
88 return StatusCode::FAILURE;
89 }
90 auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
91 if ( !castTPS ) {
92 fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
93 return StatusCode::FAILURE;
94 }
95 m_arena = castTPS->getArena();
96 if ( !m_arena ) {
97 fatal() << "Cannot find valid TBB task_arena" << endmsg;
98 return StatusCode::FAILURE;
99 }
100
101 // Activate the scheduler in another thread.
102 info() << "Activating scheduler in a separate thread" << endmsg;
103 std::binary_semaphore fiber_manager_initalized{ 0 };
104 m_thread = std::thread( [this, &fiber_manager_initalized]() {
105 // Initialize FiberManager
106 this->m_fiberManager = std::make_unique<FiberManager>( this->m_numOffloadThreads.value() );
107 fiber_manager_initalized.release();
108 this->activate();
109 } );
110 // Wait for initialization to complete
111 fiber_manager_initalized.acquire();
112
113 while ( m_isActive != ACTIVE ) {
114 if ( m_isActive == FAILURE ) {
115 fatal() << "Terminating initialization" << endmsg;
116 return StatusCode::FAILURE;
117 } else {
118 ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
119 sleep( 1 );
120 }
121 }
122
123 if ( m_enableCondSvc ) {
124 // Get hold of the CondSvc
125 m_condSvc = serviceLocator()->service( "CondSvc" );
126 if ( !m_condSvc.isValid() ) {
127 warning() << "No CondSvc found, or not enabled. "
128 << "Will not manage CondAlgorithms" << endmsg;
129 m_enableCondSvc = false;
130 }
131 }
132
133 // Get the algo resource pool
134 m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
135 if ( !m_algResourcePool.isValid() ) {
136 fatal() << "Error retrieving AlgoResourcePool" << endmsg;
137 return StatusCode::FAILURE;
138 }
139
140 m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
141 if ( !m_algExecStateSvc.isValid() ) {
142 fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
143 return StatusCode::FAILURE;
144 }
145
146 // Get Whiteboard
147 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
148 if ( !m_whiteboard.isValid() ) {
149 fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
150 return StatusCode::FAILURE;
151 }
152
153 // Set the MaxEventsInFlight parameters from the number of WB stores
154 m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
155
156 // Set the number of free slots
157 m_freeSlots = m_maxEventsInFlight;
158
159 // Get the list of algorithms
160 const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
161 const unsigned int algsNumber = algos.size();
162 if ( algsNumber != 0 ) {
163 info() << "Found " << algsNumber << " algorithms" << endmsg;
164 } else {
165 error() << "No algorithms found" << endmsg;
166 return StatusCode::FAILURE;
167 }
168
169 /* Dependencies
170 1) Look for handles in algo, if none
171 2) Assume none are required
172 */
173
174 DataObjIDColl globalInp, globalOutp;
175
176 // figure out all outputs
177 std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
178 for ( IAlgorithm* ialgoPtr : algos ) {
179 Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
180 if ( !algoPtr ) {
181 fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
182 return StatusCode::FAILURE;
183 }
184
185 DataObjIDColl algoOutputs;
186 for ( auto id : algoPtr->outputDataObjs() ) {
187 globalOutp.insert( id );
188 algoOutputs.insert( id );
189 }
190 algosOutputDependenciesMap[algoPtr->name()] = algoOutputs;
191 }
192
193 std::ostringstream ostdd;
194 ostdd << "Data Dependencies for Algorithms:";
195
196 std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
197 for ( IAlgorithm* ialgoPtr : algos ) {
198 Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
199 if ( nullptr == algoPtr ) {
200 fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
201 << ": this will result in a crash." << endmsg;
202 return StatusCode::FAILURE;
203 }
204
205 DataObjIDColl i1, i2;
206 DHHVisitor avis( i1, i2 );
207 algoPtr->acceptDHVisitor( &avis );
208
209 ostdd << "\n " << algoPtr->name();
210
211 auto write_owners = [&avis, &ostdd]( const DataObjID& id ) {
212 auto owners = avis.owners_names_of( id );
213 if ( !owners.empty() ) { GaudiUtils::operator<<( ostdd << ' ', owners ); }
214 };
215
216 DataObjIDColl algoDependencies;
217 if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
218 for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
219 DataObjID id = *idp;
220 ostdd << "\n o INPUT " << id;
221 write_owners( id );
222 algoDependencies.insert( id );
223 globalInp.insert( id );
224 }
225 for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
226 ostdd << "\n o OUTPUT " << *id;
227 write_owners( *id );
228 if ( id->key().find( ":" ) != std::string::npos ) {
229 error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
230 << endmsg;
231 m_showDataDeps = true;
232 }
233 }
234 } else {
235 ostdd << "\n none";
236 }
237 algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
238 }
239
240 if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
241
242 // If requested, dump a graph of the data dependencies in a .dot or .md file
243 if ( not m_dataDepsGraphFile.empty() ) {
244 if ( dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
245 return StatusCode::FAILURE;
246 }
247 }
248
249 // Check if we have unmet global input dependencies, and, optionally, heal them
250 // WARNING: this step must be done BEFORE the Precedence Service is initialized
251 DataObjIDColl unmetDepInp, unusedOutp;
252 if ( m_checkDeps || m_checkOutput ) {
253 std::set<std::string> requiredInputKeys;
254 for ( auto o : globalInp ) {
255 // track aliases
256 // (assuming there should be no items with different class and same key corresponding to different objects)
257 requiredInputKeys.insert( o.key() );
258 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
259 }
260 if ( m_checkOutput ) {
261 for ( auto o : globalOutp ) {
262 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
263 // check ignores
264 bool ignored{};
265 for ( const std::string& algoName : m_checkOutputIgnoreList ) {
266 auto it = algosOutputDependenciesMap.find( algoName );
267 if ( it != algosOutputDependenciesMap.end() ) {
268 if ( it->second.find( o ) != it->second.end() ) {
269 ignored = true;
270 break;
271 }
272 }
273 }
274 if ( !ignored ) { unusedOutp.insert( o ); }
275 }
276 }
277 }
278 }
279
280 if ( m_checkDeps ) {
281 if ( unmetDepInp.size() > 0 ) {
282
283 auto printUnmet = [&]( auto msg ) {
284 for ( const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
285 msg << " o " << *o << " required by Algorithm: " << endmsg;
286
287 for ( const auto& p : algosInputDependenciesMap )
288 if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
289 }
290 };
291
292 if ( !m_useDataLoader.empty() ) {
293
294 // Find the DataLoader Alg
295 IAlgorithm* dataLoaderAlg( nullptr );
296 for ( IAlgorithm* algo : algos )
297 if ( m_useDataLoader == algo->name() ) {
298 dataLoaderAlg = algo;
299 break;
300 }
301
302 if ( dataLoaderAlg == nullptr ) {
303 fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
304 << "\" found, and unmet INPUT dependencies "
305 << "detected:" << endmsg;
306 printUnmet( fatal() );
307 return StatusCode::FAILURE;
308 }
309
310 info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
311 << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
312 printUnmet( info() );
313
314 // Set the property Load of DataLoader Alg
315 Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
316 if ( !dataAlg ) {
317 fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
318 << endmsg;
319 return StatusCode::FAILURE;
320 }
321
322 for ( auto& id : unmetDepInp ) {
323 ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
324 << dataLoaderAlg->name() << endmsg;
325 dataAlg->addDependency( id, Gaudi::DataHandle::Writer );
326 }
327
328 } else {
329 fatal() << "Auto DataLoading not requested, "
330 << "and the following unmet INPUT dependencies were found:" << endmsg;
331 printUnmet( fatal() );
332 return StatusCode::FAILURE;
333 }
334
335 } else {
336 info() << "No unmet INPUT data dependencies were found" << endmsg;
337 }
338 }
339
340 if ( m_checkOutput ) {
341 if ( unusedOutp.size() > 0 ) {
342
343 auto printUnusedOutp = [&]( auto msg ) {
344 for ( const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
345 msg << " o " << *o << " produced by Algorithm: " << endmsg;
346
347 for ( const auto& p : algosOutputDependenciesMap )
348 if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
349 }
350 };
351
352 fatal() << "The following unused OUTPUT items were found:" << endmsg;
353 printUnusedOutp( fatal() );
354 return StatusCode::FAILURE;
355 } else {
356 info() << "No unused OUTPUT items were found" << endmsg;
357 }
358 }
359
360 // Get the precedence service
361 m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
362 if ( !m_precSvc.isValid() ) {
363 fatal() << "Error retrieving PrecedenceSvc" << endmsg;
364 return StatusCode::FAILURE;
365 }
366 const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
367 if ( !precSvc ) {
368 fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
369 return StatusCode::FAILURE;
370 }
371
372 // Fill the containers to convert algo names to index
373 m_algname_vect.resize( algsNumber );
374 for ( IAlgorithm* algo : algos ) {
375 const std::string& name = algo->name();
376 auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
378 m_algname_vect.at( index ) = name;
379 }
380
381 // Shortcut for the message service
382 SmartIF<IMessageSvc> messageSvc( serviceLocator() );
383 if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
384
386 for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
387 m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
388 m_eventSlots.back().complete = true;
389 }
390
392
393 // Clearly inform about the level of concurrency
394 info() << "Concurrency level information:" << endmsg;
395 info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
396 info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
397 info() << " o Fiber thread pool size: " << m_numOffloadThreads << endmsg;
398
399 // Inform about task scheduling prescriptions
400 info() << "Task scheduling settings:" << endmsg;
401 info() << " o Avalanche generation mode: "
402 << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
403 info() << " o Preemptive scheduling of CPU-blocking tasks: "
404 << ( m_enablePreemptiveBlockingTasks
405 ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
406 : "disabled" )
407 << endmsg;
408 info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
409
410 if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
411
412 if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
413
414 // Simulate execution flow
415 if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
416
417 return sc;
418}
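The data-dependency checks in initialize() are set differences: an input is unmet if no algorithm produces it, and (with output checking enabled) an output is unused if no algorithm consumes it. A simplified sketch, assuming plain string keys in place of `DataObjID` and ignoring the alias tracking by key and `m_checkOutputIgnoreList`; `Deps`, `unionOf` and `missing` are hypothetical names:

```cpp
#include <map>
#include <set>
#include <string>

using Deps = std::map<std::string, std::set<std::string>>; // algorithm name -> data keys

// Union of all keys across algorithms (like globalInp / globalOutp).
std::set<std::string> unionOf(const Deps& deps) {
  std::set<std::string> all;
  for (const auto& [alg, keys] : deps) all.insert(keys.begin(), keys.end());
  return all;
}

// Keys required by some algorithm in `required` that no algorithm in `provided` supplies.
std::set<std::string> missing(const Deps& required, const Deps& provided) {
  const std::set<std::string> have = unionOf(provided);
  std::set<std::string> result;
  for (const std::string& key : unionOf(required))
    if (have.count(key) == 0) result.insert(key);
  return result;
}
```

With this helper, `missing(inputs, outputs)` gives the unmet inputs (which initialize() either attributes to the DataLoader algorithm or reports as fatal), and `missing(outputs, inputs)` gives the unused outputs.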

◆ isStalled()

bool AvalancheSchedulerSvc::isStalled ( const EventSlot & slot) const
private

Check if scheduling in a particular slot is in a stall.

Check whether a particular slot is in a stall condition.

This is the case when neither the slot nor any of its sub-slots contains an algorithm in the DATAREADY, SCHEDULED or RESOURCELESS state, i.e. no algorithm is scheduled, waiting to be scheduled, or has all of its dependencies satisfied.

Definition at line 841 of file AvalancheSchedulerSvc.cpp.

841 {
842
843 if ( !slot.algsStates.containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
844 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
845
846 error() << "*** Stall detected, event context: " << slot.eventContext.get() << endmsg;
847
848 return true;
849 }
850 return false;
851}
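The stall test reduces to "no algorithm in the slot or its sub-slots is in a state that can still make progress". A sketch of that predicate, assuming a simplified hypothetical `AlgState` enum (state names taken from this page; the real `AlgsExecutionStates::State` may contain further values) and plain vectors in place of `AlgsExecutionStates`:

```cpp
#include <initializer_list>
#include <vector>

// Simplified stand-in for AlgsExecutionStates::State.
enum class AlgState { INITIAL, CONTROLREADY, DATAREADY, SCHEDULED, RESOURCELESS, EVTACCEPTED };

// True if any algorithm in `states` is in one of `wanted` (like containsAny()).
bool containsAny(const std::vector<AlgState>& states, std::initializer_list<AlgState> wanted) {
  for (AlgState s : states)
    for (AlgState w : wanted)
      if (s == w) return true;
  return false;
}

// A slot is stalled when neither it nor any sub-slot has an algorithm in
// DATAREADY, SCHEDULED or RESOURCELESS.
bool isStalled(const std::vector<AlgState>& slot, const std::vector<std::vector<AlgState>>& subSlots) {
  const auto progressing = {AlgState::DATAREADY, AlgState::SCHEDULED, AlgState::RESOURCELESS};
  if (containsAny(slot, progressing)) return false;
  for (const auto& sub : subSlots)
    if (containsAny(sub, progressing)) return false;
  return true;
}
```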

◆ iterate()

StatusCode AvalancheSchedulerSvc::iterate ( )
private

Loop on all slots to schedule DATAREADY algorithms and sign off ready events.

Loop on all slots to schedule DATAREADY algorithms, sign off ready ones or detect execution stalls.

To check whether an event is finished, the method verifies that the root control-flow decision of the task precedence graph is resolved and that no algorithm, in the slot or in any of its sub-slots, is still in one of the intermediate CONTROLREADY, DATAREADY, SCHEDULED or RESOURCELESS states.

Definition at line 666 of file AvalancheSchedulerSvc.cpp.

666 {
667
668 StatusCode global_sc( StatusCode::SUCCESS );
669
670 // Retry algorithms
671 const size_t retries = m_retryQueue.size();
672 for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
673 TaskSpec retryTS = std::move( m_retryQueue.front() );
674 m_retryQueue.pop();
675 global_sc = schedule( std::move( retryTS ) );
676 }
677
678 // Loop over all slots
679 OccupancySnapshot nextSnap;
680 auto now = std::chrono::system_clock::now();
681 for ( EventSlot& thisSlot : m_eventSlots ) {
682
683 // Ignore slots without a valid context (relevant when populating scheduler for first time)
684 if ( !thisSlot.eventContext ) continue;
685
686 int iSlot = thisSlot.eventContext->slot();
687
688 // Cache the states of the algorithms to improve readability and performance
689 AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
690
691 StatusCode partial_sc = StatusCode::FAILURE;
692
693 // Make an occupancy snapshot
694 if ( m_snapshotInterval != std::chrono::duration<int64_t, std::milli>::min() &&
696
697 // Initialise snapshot
698 if ( nextSnap.states.empty() ) {
699 nextSnap.time = now;
700 nextSnap.states.resize( m_eventSlots.size() );
701 }
702
703 // Store alg states
704 std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
705 slotStateTotals.resize( AState::MAXVALUE );
706 for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
707 slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
708 }
709
710 // Add subslot alg states
711 for ( auto& subslot : thisSlot.allSubSlots ) {
712 for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
713 slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
714 }
715 }
716 }
717
718 // Perform DR->SCHEDULED
719 const auto& drAlgs = thisAlgsStates.algsInState( AState::DATAREADY );
720 for ( uint algIndex : drAlgs ) {
721 const std::string& algName{ index2algname( algIndex ) };
722 unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
723 bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
724
725 partial_sc =
726 schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
727
728 ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
729 << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
730 << " on processing slot " << iSlot << endmsg;
731 }
732
733 // Check for algorithms ready in sub-slots
734 for ( auto& subslot : thisSlot.allSubSlots ) {
735 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
736 for ( uint algIndex : drAlgsSubSlot ) {
737 const std::string& algName{ index2algname( algIndex ) };
738 unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
739 bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
740 partial_sc =
741 schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
742 }
743 }
744
745 if ( m_dumpIntraEventDynamics ) {
746 std::stringstream s;
747 s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
748 << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
749 << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
750 auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
751 : std::to_string( std::thread::hardware_concurrency() );
752 std::ofstream myfile;
753 myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
754 myfile << s.str();
755 myfile.close();
756 }
757
758 // Not complete because this would mean that the slot is already free!
759 if ( m_precSvc->CFRulesResolved( thisSlot ) &&
760 !thisSlot.algsStates.containsAny(
761 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
762 !subSlotAlgsInStates( thisSlot,
763 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
764 !thisSlot.complete ) {
765
766 thisSlot.complete = true;
767 // if the event did not fail, add it to the finished events
768 // otherwise it is taken care of in the error handling
769 if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
770 ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
771 << thisSlot.eventContext->slot() << ")." << endmsg;
772 m_finishedEvents.push( thisSlot.eventContext.release() );
773 }
774
775 // now let's return the fully evaluated result of the control flow
776 ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
777
778 thisSlot.eventContext.reset( nullptr );
779
780 } else if ( isStalled( thisSlot ) ) {
781 m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
782 eventFailed( thisSlot.eventContext.get() ); // can't release yet
783 }
784 partial_sc.ignore();
785 } // end loop on slots
786
787 // Process snapshot
788 if ( !nextSnap.states.empty() ) {
789 m_lastSnapshot = nextSnap.time;
790 m_snapshotCallback( std::move( nextSnap ) );
791 }
792
793 ON_VERBOSE verbose() << "Iteration done." << endmsg;
794 m_needsUpdate.store( false );
795 return global_sc;
796}
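The retry loop at the top of iterate() first records `m_retryQueue.size()` and then processes exactly that many entries. A task that fails again and ends up back in the queue therefore waits for the next iteration instead of being retried in a tight loop. A generic sketch of this "drain once" pattern, assuming a hypothetical `attempt` callback that returns false when the item must be retried; here the sketch re-queues the item itself, whereas in the scheduler that would be the job of schedule() (the branch is not shown in this listing):

```cpp
#include <cstddef>
#include <queue>
#include <utility>

// Process only the items present when the pass starts; anything re-queued
// during the pass is left for the next one. Returns the number of items attempted.
template <typename T, typename Attempt>
std::size_t drainOnce(std::queue<T>& retryQueue, Attempt attempt) {
  const std::size_t pending = retryQueue.size(); // snapshot, like `retries` in iterate()
  for (std::size_t i = 0; i < pending; ++i) {
    T item = std::move(retryQueue.front());
    retryQueue.pop();
    if (!attempt(item)) retryQueue.push(std::move(item)); // retry on a later pass
  }
  return pending;
}
```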

◆ next()

bool AvalancheSchedulerSvc::next ( TaskSpec & ts,
bool asynchronous )
inline

Definition at line 376 of file AvalancheSchedulerSvc.h.

376 {
377 if ( asynchronous ) { return m_scheduledAsynchronousQueue.try_pop( ts ); }
378 return m_scheduledQueue.try_pop( ts );
379 }
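next() selects one of two priority queues by the `asynchronous` flag and tries to pop the highest-priority task from it. A single-threaded sketch with `std::priority_queue`, assuming a simplified hypothetical `Task` in place of `TaskSpec` and assuming that a larger `algRank` means higher priority (`AlgQueueSort`'s body is not shown on this page). The real queues are `tbb::concurrent_priority_queue`, whose `try_pop` is safe to call from several threads:

```cpp
#include <queue>
#include <string>
#include <vector>

// Simplified TaskSpec: only the fields needed for ordering and identification.
struct Task {
  std::string algName;
  unsigned int algRank;
};

// Assumed ordering: std::priority_queue is a max-heap, so the larger rank pops first.
struct ByRank {
  bool operator()(const Task& a, const Task& b) const { return a.algRank < b.algRank; }
};

using TaskQueue = std::priority_queue<Task, std::vector<Task>, ByRank>;

// Counterpart of next(): pick the queue by the asynchronous flag, then try to pop.
bool nextTask(TaskQueue& syncQueue, TaskQueue& asyncQueue, Task& out, bool asynchronous) {
  TaskQueue& q = asynchronous ? asyncQueue : syncQueue;
  if (q.empty()) return false;
  out = q.top();
  q.pop();
  return true;
}
```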

◆ popFinishedEvent()

StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *& eventContext)
override

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 625 of file AvalancheSchedulerSvc.cpp.

625 {
626
627 // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
628 if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
629 // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
630 // << " active: " << m_isActive << endmsg;
631 return StatusCode::FAILURE;
632 } else {
633 // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
634 // << " active: " << m_isActive << endmsg;
635 m_finishedEvents.pop( eventContext );
636 ++m_freeSlots;
637 ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
638 return StatusCode::SUCCESS;
639 }
640}
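popFinishedEvent() refuses to block when no event can ever arrive (all slots free, or the scheduler inactive) and otherwise waits on the bounded queue, then returns a slot to the pool. A sketch of that guard, assuming a hypothetical `FinishedQueue` built from a mutex and condition variable in place of `tbb::concurrent_bounded_queue`, integer event ids in place of `EventContext*`, and a plain `int` for the free-slot counter:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Minimal blocking queue of finished event ids.
class FinishedQueue {
public:
  void push(int evt) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push_back(evt);
    }
    m_cv.notify_one();
  }
  // Block until an item is available, then remove and return it.
  int pop() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait(lock, [this] { return !m_items.empty(); });
    int evt = m_items.front();
    m_items.pop_front();
    return evt;
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::deque<int> m_items;
};

// With every slot free nothing is in flight, so a blocking pop would never return.
bool popFinished(FinishedQueue& q, int& freeSlots, int maxInFlight, bool active, int& evt) {
  if (freeSlots == maxInFlight || !active) return false;
  evt = q.pop();
  ++freeSlots; // the popped event's slot becomes available again
  return true;
}
```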

◆ pushNewEvent()

StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext * eventContext)
override

Make an event available to the scheduler.

Add event to the scheduler.

A null eventContext is rejected with StatusCode::FAILURE. Otherwise, two cases are possible:
1) No slot is free: StatusCode::FAILURE is returned.
2) At least one slot is free: an action that resets the slot and kicks off its update is queued.

Definition at line 546 of file AvalancheSchedulerSvc.cpp.

546 {
547
548 if ( !eventContext ) {
549 fatal() << "Event context is nullptr" << endmsg;
550 return StatusCode::FAILURE;
551 }
552
553 if ( m_freeSlots.load() == 0 ) {
554 ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
555 return StatusCode::FAILURE;
556 }
557
558 // no problem as push new event is only called from one thread (event loop manager)
559 --m_freeSlots;
560
561 auto action = [this, eventContext]() -> StatusCode {
562 // Event processing slot forced to be the same as the wb slot
563 const unsigned int thisSlotNum = eventContext->slot();
564 EventSlot& thisSlot = m_eventSlots[thisSlotNum];
565 if ( !thisSlot.complete ) {
566 fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
567 return StatusCode::FAILURE;
568 }
569
570 ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
571 thisSlot.reset( eventContext );
572
573 // Result status code:
574 StatusCode result = StatusCode::SUCCESS;
575
576 // promote to CR and DR the initial set of algorithms
577 Cause cs = { Cause::source::Root, "RootDecisionHub" };
578 if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
579 error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
580 result = StatusCode::FAILURE;
581 }
582
583 if ( this->iterate().isFailure() ) {
584 error() << "Failed to call AvalancheSchedulerSvc::iterate for slot " << thisSlotNum << endmsg;
585 result = StatusCode::FAILURE;
586 }
587
588 return result;
589 }; // end of lambda
590
591 // Kick off scheduling
592 ON_VERBOSE {
593 verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
594 verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
595 }
596
597 m_actionsQueue.push( action );
598
599 return StatusCode::SUCCESS;
600}
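pushNewEvent() only reserves a slot (by decrementing `m_freeSlots`) on the caller's thread; the slot reset and precedence iteration are wrapped in a lambda and pushed onto `m_actionsQueue`, to be executed later by the scheduler. A single-threaded sketch of that deferral, assuming a hypothetical `MiniScheduler` in which a `std::queue` of `std::function`s plays the role of the action queue and integer event ids replace `EventContext`:

```cpp
#include <functional>
#include <queue>
#include <vector>

class MiniScheduler {
public:
  explicit MiniScheduler(int slots) : m_freeSlots(slots), m_slotEvent(slots, -1) {}

  // Reserve a slot now; defer the actual slot update to the scheduler loop.
  bool pushNewEvent(int slot, int evt) {
    if (m_freeSlots == 0) return false; // no free processing slot
    --m_freeSlots;
    m_actions.push([this, slot, evt] { m_slotEvent[slot] = evt; });
    return true;
  }

  // What the scheduler thread would do: run queued actions in FIFO order.
  void runPendingActions() {
    while (!m_actions.empty()) {
      m_actions.front()();
      m_actions.pop();
    }
  }

  int freeSlots() const { return m_freeSlots; }
  int eventInSlot(int slot) const { return m_slotEvent[slot]; }

private:
  int m_freeSlots;
  std::vector<int> m_slotEvent; // -1 means the slot is empty
  std::queue<std::function<void()>> m_actions;
};
```

Keeping all slot mutations on one thread is what lets the real `EventSlot::reset()` be documented as thread-unsafe.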

◆ pushNewEvents()

StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > & eventContexts)
override

Definition at line 604 of file AvalancheSchedulerSvc.cpp.

604 {
605 StatusCode sc;
606 for ( auto context : eventContexts ) {
607 sc = pushNewEvent( context );
608 if ( sc != StatusCode::SUCCESS ) return sc;
609 }
610 return sc;
611}
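pushNewEvents() forwards each context to pushNewEvent() and stops at the first failure. Contexts accepted before that point stay in the scheduler and the remaining ones are not attempted, so the caller has to handle them. A sketch of that loop, assuming integer ids in place of `EventContext*` and a hypothetical `pushOne` callback in place of pushNewEvent():

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Push contexts one by one and stop at the first failure.
// Returns how many contexts were accepted before the loop stopped.
std::size_t pushAll(const std::vector<int>& contexts, const std::function<bool(int)>& pushOne) {
  std::size_t accepted = 0;
  for (int ctx : contexts) {
    if (!pushOne(ctx)) break; // remaining contexts are not attempted
    ++accepted;
  }
  return accepted;
}
```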

◆ recordOccupancy()

void AvalancheSchedulerSvc::recordOccupancy ( int samplePeriod,
std::function< void(OccupancySnapshot)> callback )
override virtual

Sample occupancy at a fixed interval (in ms). A negative value deactivates sampling; 0 takes a snapshot on every change. For each sample, the callback function is applied to the result.

Definition at line 1163 of file AvalancheSchedulerSvc.cpp.

1163 {
1164
1165 auto action = [this, samplePeriod, callback = std::move( callback )]() -> StatusCode {
1166 if ( samplePeriod < 0 ) {
1167 this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min();
1168 } else {
1169 this->m_snapshotInterval = std::chrono::duration<int64_t, std::milli>( samplePeriod );
1170 m_snapshotCallback = std::move( callback );
1171 }
1172 return StatusCode::SUCCESS;
1173 };
1174
1175 m_actionsQueue.push( std::move( action ) );
1176}
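The sample period is converted into a `std::chrono` duration, with `duration::min()` serving as the "disabled" sentinel that iterate() tests before taking a snapshot. A standalone sketch of that mapping; `toSnapshotInterval` and `samplingEnabled` are hypothetical helpers, not Gaudi functions:

```cpp
#include <chrono>
#include <cstdint>

using Interval = std::chrono::duration<std::int64_t, std::milli>;

// Negative period -> sentinel meaning "sampling disabled"; otherwise milliseconds.
Interval toSnapshotInterval(int samplePeriod) {
  if (samplePeriod < 0) return Interval::min();
  return Interval(samplePeriod);
}

bool samplingEnabled(Interval interval) { return interval != Interval::min(); }
```

Note that in recordOccupancy() itself the change is queued on `m_actionsQueue`, so it takes effect on the scheduler's thread, and a negative period leaves the previously stored callback in place without invoking it.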

◆ revise()

StatusCode AvalancheSchedulerSvc::revise ( unsigned int iAlgo,
EventContext * contextPtr,
AState state,
bool iterate = false )
private

Definition at line 800 of file AvalancheSchedulerSvc.cpp.

800 {
801 StatusCode sc;
802 auto slotIndex = contextPtr->slot();
803 EventSlot& slot = m_eventSlots[slotIndex];
804 Cause cs = { Cause::source::Task, index2algname( iAlgo ) };
805
806 if ( contextPtr->usesSubSlot() ) {
807 // Sub-slot
808 auto subSlotIndex = contextPtr->subSlot();
809 EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
810
811 sc = subSlot.algsStates.set( iAlgo, state );
812
813 if ( sc.isSuccess() ) {
814 ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
815 << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
816 // Revise states of algorithms downstream the precedence graph
817 if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
818 }
819 } else {
820 // Event level (standard behaviour)
821 sc = slot.algsStates.set( iAlgo, state );
822
823 if ( sc.isSuccess() ) {
824 ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
825 << ", event:" << contextPtr->evt() << "]" << endmsg;
826 // Revise states of algorithms downstream the precedence graph
827 if ( iterate ) sc = m_precSvc->iterate( slot, cs );
828 }
829 }
830 return sc;
831}
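revise() writes the new state either into the event-level slot or, when the context uses a sub-slot (event views), into that sub-slot of the parent slot. A sketch of the selection, assuming hypothetical `Ctx` and `SlotStates` types with integer states, and omitting the optional precedence iteration:

```cpp
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical context: slot index plus an optional sub-slot index.
struct Ctx {
  std::size_t slot;
  std::optional<std::size_t> subSlot;
};

// Hypothetical per-slot algorithm states, with nested sub-slots.
struct SlotStates {
  std::vector<int> algStates;
  std::vector<SlotStates> subSlots;
};

// Set the state of algorithm `iAlgo` in the slot or sub-slot addressed by `ctx`.
void revise(std::vector<SlotStates>& slots, const Ctx& ctx, std::size_t iAlgo, int state) {
  SlotStates& slot = slots.at(ctx.slot);
  SlotStates& target = ctx.subSlot ? slot.subSlots.at(*ctx.subSlot) : slot;
  target.algStates.at(iAlgo) = state;
}
```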

◆ schedule()

StatusCode AvalancheSchedulerSvc::schedule ( TaskSpec && ts)
private

Definition at line 1022 of file AvalancheSchedulerSvc.cpp.

1022 {
1023
1024 // Check if a free Algorithm instance is available
1025 StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
1026
1027 // If an instance is available, proceed to scheduling
1028 StatusCode sc;
1029 if ( getAlgSC.isSuccess() ) {
1030
1031 // Decide how to schedule the task and schedule it
1032 if ( -100 != m_threadPoolSize ) {
1033
1034 // Cache values before moving the TaskSpec further
1035 unsigned int algIndex{ ts.algIndex };
1036 std::string_view algName( ts.algName );
1037 unsigned int algRank{ ts.algRank };
1038 bool asynchronous{ ts.asynchronous };
1039 int slotIndex{ ts.slotIndex };
1040 EventContext* contextPtr{ ts.contextPtr };
1041
1042 if ( asynchronous ) {
1043 // Add to asynchronous scheduled queue
1044 m_scheduledAsynchronousQueue.push( std::move( ts ) );
1045
1046 // Schedule task
1047 m_fiberManager->schedule( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1048 }
1049
1050 if ( !asynchronous ) {
1051 // Add the algorithm to the scheduled queue
1052 m_scheduledQueue.push( std::move( ts ) );
1053
1054 // Prepare a TBB task that will execute the Algorithm according to the above queued specs
1055 m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1057 }
1058 sc = revise( algIndex, contextPtr, AState::SCHEDULED );
1059
1060 ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
1061 << ", rank:" << algRank << ", asynchronous:" << ( asynchronous ? "yes" : "no" )
1062 << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1063 << ( m_blockingAlgosInFlight > 0
1064 ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1065 : "" )
1066 << endmsg;
1067
1068 } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
1069 // Beojan: I don't think this bit works. ts hasn't been pushed into any queue so AlgTask won't retrieve it
1071 sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
1072 AlgTask( this, serviceLocator(), m_algExecStateSvc, ts.asynchronous )();
1074 }
1075 } else { // if no Algorithm instance available, retry later
1076
1077 sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1078 // Add the algorithm to the retry queue
1079 m_retryQueue.push( std::move( ts ) );
1080 }
1081
1083
1084 return sc;
1085}

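schedule() first tries to acquire a free instance of the algorithm from the resource pool. If that succeeds, the TaskSpec is pushed onto a priority queue and a task is launched to pick it up. The queue is m_scheduledQueue, or m_scheduledAsynchronousQueue for asynchronous algorithms. If no instance is free, the algorithm is marked RESOURCELESS and parked in m_retryQueue. The self-contained sketch below models only that decision, using the hypothetical names Task, ByRank and MiniScheduler. A per-algorithm instance counter stands in for IAlgResourcePool. The sketch also assumes higher-ranked tasks are popped first, since the ordering applied by AlgQueueSort is not shown here.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical, simplified TaskSpec: only what this sketch needs.
struct Task {
  std::string  algName;
  unsigned int algRank;
};

// Assumption: higher rank pops first (AlgQueueSort's actual criterion is not shown here).
struct ByRank {
  bool operator()( const Task& a, const Task& b ) const { return a.algRank < b.algRank; }
};

struct MiniScheduler {
  std::unordered_map<std::string, int>                 freeInstances; // stand-in for IAlgResourcePool
  std::priority_queue<Task, std::vector<Task>, ByRank> scheduled;     // cf. m_scheduledQueue
  std::queue<Task>                                     retry;         // cf. m_retryQueue

  // Same decision as schedule(): queue the task if an instance can be acquired,
  // otherwise park it (state RESOURCELESS) for a later retry.
  bool schedule( Task t ) {
    auto it = freeInstances.find( t.algName );
    if ( it == freeInstances.end() || it->second == 0 ) {
      retry.push( std::move( t ) );
      return false;
    }
    --it->second;                     // acquire an algorithm instance
    scheduled.push( std::move( t ) ); // state SCHEDULED; a worker would pop it next
    return true;
  }
};
```

In this model, a task that lands in the retry queue would be offered to schedule() again once an instance is released.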
◆ scheduleEventView()

StatusCode AvalancheSchedulerSvc::scheduleEventView ( const EventContext * sourceContext,
const std::string & nodeName,
std::unique_ptr< EventContext > viewContext )
override virtual

Method to inform the scheduler about event views.

Definition at line 1123 of file AvalancheSchedulerSvc.cpp.

1124 {
1125 // Prevent view nesting
1126 if ( sourceContext->usesSubSlot() ) {
1127 fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1128 return StatusCode::FAILURE;
1129 }
1130
1131 ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1132
1133 // It's not possible to create a std::function from a move-capturing lambda
1134 // So, we have to release the unique pointer
1135 auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1136 &nodeName]() -> StatusCode {
1137 // Attach the sub-slot to the top-level slot
1138 EventSlot& topSlot = this->m_eventSlots[slotIndex];
1139
1140 if ( viewContextPtr ) {
1141 // Re-create the unique pointer
1142 auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1143 topSlot.addSubSlot( std::move( viewContext ), nodeName );
1144 return StatusCode::SUCCESS;
1145 } else {
1146 // Disable the view node if there are no views
1147 topSlot.disableSubSlots( nodeName );
1148 return StatusCode::SUCCESS;
1149 }
1150 };
1151
1152 m_actionsQueue.push( std::move( action ) );
1153
1154 return StatusCode::SUCCESS;
1155}

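The comment in scheduleEventView() refers to a C++ constraint. std::function requires a copy-constructible callable, so a lambda that move-captures a std::unique_ptr cannot be stored in it. The workaround releases the pointer, captures the raw pointer, and re-adopts ownership inside the action. Below is a minimal sketch of the same pattern, with a hypothetical ViewContext in place of EventContext:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical stand-in for EventContext.
struct ViewContext {
  std::string name;
};

// Builds a deferred action that takes ownership of `view`. The lambda captures a
// raw pointer (copyable) instead of the unique_ptr (move-only), so it fits in std::function.
std::function<std::string()> makeAction( std::unique_ptr<ViewContext> view ) {
  return [raw = view.release()]() -> std::string {
    if ( !raw ) return "disabled";             // no view: cf. the disableSubSlots() branch
    std::unique_ptr<ViewContext> owned( raw ); // re-adopt ownership; deleted at scope exit
    return owned->name;
  };
}
```

The trade-off is that ownership is restored only if the action runs exactly once. An action that is dropped without running leaks the view, and invoking two copies of it would delete the view twice. Since C++23, std::move_only_function can hold a move-capturing lambda directly, which avoids the release/re-adopt step.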
◆ signoff()

StatusCode AvalancheSchedulerSvc::signoff ( const TaskSpec & ts)
private

The call to this method is triggered only from within the AlgTask.

Definition at line 1092 of file AvalancheSchedulerSvc.cpp.

1092 {
1093
1094 Gaudi::Hive::setCurrentContext( ts.contextPtr );
1095
1097
1098 const AlgExecStateRef algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
1099 AState state = algstate.execStatus().isSuccess()
1100 ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1101 : AState::ERROR;
1102
1103 // Update algorithm state and revise the downstream states
1104 auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1105
1106 ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1107 << ", rank:" << ts.algRank << ", asynchronous:" << ( ts.asynchronous ? "yes" : "no" )
1108 << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1109 << ( m_blockingAlgosInFlight > 0
1110 ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1111 : "" )
1112 << endmsg;
1113
1114 // Prompt a call to updateStates
1115 m_needsUpdate.store( true );
1116 return sc;
1117}

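signoff() derives an algorithm's final state from two parts of its execution state: whether execution succeeded and whether its filter passed. A failed execution maps to ERROR regardless of the filter decision. That decision can be written as a small pure function. The names below are hypothetical, and only the three states used by signoff() are modelled.

```cpp
#include <cassert>

// Hypothetical subset of the algorithm states assigned by signoff().
enum class AState { EVTACCEPTED, EVTREJECTED, ERROR };

// Same decision as signoff(): failure wins; otherwise the filter decides.
constexpr AState finalState( bool execSucceeded, bool filterPassed ) {
  return execSucceeded ? ( filterPassed ? AState::EVTACCEPTED : AState::EVTREJECTED ) : AState::ERROR;
}

// A failed execution is an ERROR even if the filter reports "passed".
static_assert( finalState( false, true ) == AState::ERROR );
```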
◆ tryPopFinishedEvent()

StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *& eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, return a failure immediately.

Definition at line 646 of file AvalancheSchedulerSvc.cpp.

646 {
647
648 if ( m_finishedEvents.try_pop( eventContext ) ) {
649 ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
650 << endmsg;
651 ++m_freeSlots;
652 return StatusCode::SUCCESS;
653 }
654 return StatusCode::FAILURE;
655}
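tryPopFinishedEvent() is the non-blocking counterpart of popFinishedEvent(). It either hands back a finished event and increments the free-slot counter, or returns FAILURE immediately. The sketch below follows the same contract. It assumes a mutex-protected std::queue as a stand-in for tbb::concurrent_bounded_queue, whose try_pop() plays the same role. FinishedQueue and tryPopFinished are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for tbb::concurrent_bounded_queue<EventContext*>.
template <typename T>
class FinishedQueue {
public:
  void push( T v ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    m_queue.push( std::move( v ) );
  }

  // Non-blocking: returns false immediately when the queue is empty.
  bool try_pop( T& out ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_queue.empty() ) return false;
    out = std::move( m_queue.front() );
    m_queue.pop();
    return true;
  }

private:
  std::mutex    m_mutex;
  std::queue<T> m_queue;
};

// Same contract as tryPopFinishedEvent(): on success hand back the event and
// return its slot to the pool of free slots; otherwise fail without waiting.
template <typename T>
bool tryPopFinished( FinishedQueue<T>& finished, std::atomic_int& freeSlots, T& out ) {
  if ( !finished.try_pop( out ) ) return false;
  ++freeSlots;
  return true;
}
```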

Friends And Related Symbol Documentation

◆ AlgTask

friend class AlgTask
friend

Definition at line 115 of file AvalancheSchedulerSvc.h.

Member Data Documentation

◆ m_actionsQueue

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 318 of file AvalancheSchedulerSvc.h.
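m_actionsQueue is how other callers hand work to the scheduler. Methods such as scheduleEventView() and recordOccupancy() push a closure instead of modifying scheduler state directly. The closure runs later, when it is picked from the queue, presumably by the scheduler's control thread (m_thread). Below is a minimal sketch of such a closure queue, assuming a single consumer that runs actions in FIFO order. ActionQueue and drain are hypothetical, and std::function<bool()> stands in for a closure returning StatusCode.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical closure queue: any thread may push; one consumer pops and runs.
class ActionQueue {
public:
  using Action = std::function<bool()>; // stand-in for a closure returning StatusCode

  void push( Action a ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_actions.push( std::move( a ) );
    }
    m_cv.notify_one();
  }

  // Blocks until an action is available (cf. concurrent_bounded_queue::pop()).
  Action pop() {
    std::unique_lock<std::mutex> lock( m_mutex );
    m_cv.wait( lock, [this] { return !m_actions.empty(); } );
    Action a = std::move( m_actions.front() );
    m_actions.pop();
    return a;
  }

  bool empty() {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_actions.empty();
  }

private:
  std::mutex              m_mutex;
  std::condition_variable m_cv;
  std::queue<Action>      m_actions;
};

// One pass of a consumer loop: run every pending action in order, counting failures.
int drain( ActionQueue& q ) {
  int failures = 0;
  while ( !q.empty() ) {
    if ( !q.pop()() ) ++failures;
  }
  return failures;
}
```

Funnelling every state change through one consumer means the closures never run concurrently with each other, so the state they touch needs no further locking.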

◆ m_algExecStateSvc

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 278 of file AvalancheSchedulerSvc.h.

◆ m_algname_index_map

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map holding the bookkeeping needed for the name2index conversion.

Definition at line 254 of file AvalancheSchedulerSvc.h.

◆ m_algname_vect

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector holding the bookkeeping needed for the index2name conversion.

Definition at line 260 of file AvalancheSchedulerSvc.h.
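Together, m_algname_index_map and m_algname_vect give constant-time conversion in both directions. A hash map goes from name to index, and a vector indexed by position goes from index to name. The latter is presumably what index2algname() relies on in the log messages of revise(). Below is a sketch of that pairing, using the hypothetical class AlgNameRegistry and assuming indices are assigned densely in insertion order:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical bidirectional name <-> index registry.
class AlgNameRegistry {
public:
  // Returns the existing index, or appends the name and returns its new index.
  unsigned int add( const std::string& name ) {
    auto [it, inserted] = m_index.try_emplace( name, static_cast<unsigned int>( m_names.size() ) );
    if ( inserted ) m_names.push_back( name );
    return it->second;
  }
  unsigned int       name2index( const std::string& name ) const { return m_index.at( name ); } // throws if unknown
  const std::string& index2name( unsigned int i ) const { return m_names.at( i ); }             // throws if out of range

private:
  std::unordered_map<std::string, unsigned int> m_index; // name -> index
  std::vector<std::string>                      m_names; // index -> name
};
```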

◆ m_algosInFlight

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 284 of file AvalancheSchedulerSvc.h.

◆ m_algResourcePool

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 313 of file AvalancheSchedulerSvc.h.

◆ m_arena

tbb::task_arena* AvalancheSchedulerSvc::m_arena { nullptr }
private

Definition at line 368 of file AvalancheSchedulerSvc.h.

◆ m_blockingAlgosInFlight

unsigned int AvalancheSchedulerSvc::m_blockingAlgosInFlight = 0
private

Number of CPU-blocking algorithms presently in flight.

Definition at line 287 of file AvalancheSchedulerSvc.h.

◆ m_checkDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps
private
Initial value:
{ this, "CheckDependencies", false,
"Runtime check of Algorithm Input Data Dependencies" }

Definition at line 195 of file AvalancheSchedulerSvc.h.

◆ m_checkOutput

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkOutput
private
Initial value:
{ this, "CheckOutputUsage", false,
"Runtime check of Algorithm Output Data usage" }

Definition at line 197 of file AvalancheSchedulerSvc.h.

◆ m_checkOutputIgnoreList

Gaudi::Property<std::vector<std::string> > AvalancheSchedulerSvc::m_checkOutputIgnoreList
private
Initial value:
{
this,
"CheckOutputUsageIgnoreList",
{},
"Ignore outputs of the Algorithms of this name when doing the check",
"OrderedSet<std::string>" }

Definition at line 199 of file AvalancheSchedulerSvc.h.

◆ m_condSvc

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to the service for conditions handling.

Definition at line 281 of file AvalancheSchedulerSvc.h.

◆ m_dataDepsGraphAlgoPattern

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphAlgoPattern
private
Initial value:
{
this, "DataDepsGraphAlgPattern", ".*",
"Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data "
"deps graph" }

Definition at line 227 of file AvalancheSchedulerSvc.h.

◆ m_dataDepsGraphFile

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphFile
private
Initial value:
{
this, "DataDepsGraphFile", "",
"Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected "
"Algorithms" }

Definition at line 222 of file AvalancheSchedulerSvc.h.

◆ m_dataDepsGraphObjectPattern

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphObjectPattern
private
Initial value:
{
this, "DataDepsGraphObjectPattern", ".*",
"Regex pattern for selecting desired input or output by their full key" }

Definition at line 232 of file AvalancheSchedulerSvc.h.


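DataDepsGraphAlgPattern and DataDepsGraphObjectPattern both default to ".*", which selects every Algorithm name and every data key. The sketch below shows the kind of filtering these properties describe. It assumes full-string matching with std::regex_match. Whether the scheduler uses full or partial (std::regex_search) matching is not stated here, and selectByPattern is a hypothetical helper.

```cpp
#include <cassert>
#include <regex>
#include <string>
#include <vector>

// Hypothetical filter: keep the names that match the whole pattern (assumed semantics).
std::vector<std::string> selectByPattern( const std::vector<std::string>& names, const std::string& pattern ) {
  const std::regex         re( pattern );
  std::vector<std::string> selected;
  for ( const auto& n : names )
    if ( std::regex_match( n, re ) ) selected.push_back( n );
  return selected;
}
```

Under full matching, a pattern such as "Calo" selects nothing from a name like "CaloReco"; "Calo.*" would be needed.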
◆ m_dumpIntraEventDynamics

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{ this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file" }

Definition at line 186 of file AvalancheSchedulerSvc.h.

◆ m_enableCondSvc

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" }
private

Definition at line 209 of file AvalancheSchedulerSvc.h.

◆ m_enablePreemptiveBlockingTasks

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
private
Initial value:
{
this, "PreemptiveBlockingTasks", false,
"Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." }

Definition at line 188 of file AvalancheSchedulerSvc.h.

◆ m_eventSlots

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of events slots.

Definition at line 269 of file AvalancheSchedulerSvc.h.

◆ m_fiberManager

std::unique_ptr<FiberManager> AvalancheSchedulerSvc::m_fiberManager { nullptr }
private

Definition at line 369 of file AvalancheSchedulerSvc.h.

◆ m_finishedEvents

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 275 of file AvalancheSchedulerSvc.h.

◆ m_freeSlots

std::atomic_int AvalancheSchedulerSvc::m_freeSlots { 0 }
private

Atomic counter of free event slots, to account for asynchronous updates by the scheduler with respect to the rest.

Definition at line 272 of file AvalancheSchedulerSvc.h.

◆ m_isActive

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive { INACTIVE }
private

Flag to track if the scheduler is active or not.

Definition at line 245 of file AvalancheSchedulerSvc.h.

◆ m_lastSnapshot

std::chrono::system_clock::time_point AvalancheSchedulerSvc::m_lastSnapshot = std::chrono::system_clock::now()
private

Definition at line 166 of file AvalancheSchedulerSvc.h.

◆ m_maxAlgosInFlight

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight { 1 }
private

Definition at line 372 of file AvalancheSchedulerSvc.h.

◆ m_maxBlockingAlgosInFlight

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
private
Initial value:
{
this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms" }

Definition at line 179 of file AvalancheSchedulerSvc.h.

◆ m_maxEventsInFlight

size_t AvalancheSchedulerSvc::m_maxEventsInFlight { 0 }
private

Definition at line 371 of file AvalancheSchedulerSvc.h.

◆ m_maxParallelismExtra

Gaudi::Property<int> AvalancheSchedulerSvc::m_maxParallelismExtra
private
Initial value:
{
this, "maxParallelismExtra", 0,
"Allows adding some extra threads to the maximum parallelism set in TBB. "
"The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" }

Definition at line 174 of file AvalancheSchedulerSvc.h.

◆ m_needsUpdate

std::atomic<bool> AvalancheSchedulerSvc::m_needsUpdate { true }
private

Definition at line 362 of file AvalancheSchedulerSvc.h.

◆ m_numOffloadThreads

Gaudi::Property<int> AvalancheSchedulerSvc::m_numOffloadThreads
private
Initial value:
{
this, "NumOffloadThreads", 2,
"Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged "
"and use Boost Fiber functionality to suspend while waiting for offloaded work." }

Definition at line 191 of file AvalancheSchedulerSvc.h.

◆ m_optimizationMode

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{ this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E" }

Definition at line 184 of file AvalancheSchedulerSvc.h.

◆ m_precSvc

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 263 of file AvalancheSchedulerSvc.h.

◆ m_retryQueue

std::queue<TaskSpec> AvalancheSchedulerSvc::m_retryQueue
private

Definition at line 359 of file AvalancheSchedulerSvc.h.

◆ m_scheduledAsynchronousQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledAsynchronousQueue
private

Definition at line 358 of file AvalancheSchedulerSvc.h.

◆ m_scheduledQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledQueue
private

Queues for scheduled algorithms.

Definition at line 357 of file AvalancheSchedulerSvc.h.

◆ m_showControlFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{ this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences" }

Definition at line 217 of file AvalancheSchedulerSvc.h.

◆ m_showDataDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{ this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms" }

Definition at line 211 of file AvalancheSchedulerSvc.h.

◆ m_showDataFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{ this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms" }

Definition at line 214 of file AvalancheSchedulerSvc.h.

◆ m_simulateExecution

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution" }

Definition at line 181 of file AvalancheSchedulerSvc.h.

◆ m_snapshotCallback

std::function<void( OccupancySnapshot )> AvalancheSchedulerSvc::m_snapshotCallback
private

Definition at line 167 of file AvalancheSchedulerSvc.h.

◆ m_snapshotInterval

std::chrono::duration<int64_t, std::milli> AvalancheSchedulerSvc::m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
private

Definition at line 165 of file AvalancheSchedulerSvc.h.

◆ m_thread

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 248 of file AvalancheSchedulerSvc.h.

◆ m_threadPoolSize

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the global thread pool initialised by TBB; a value of -1 requests to use "
"all available hardware threads; -100 requests to bypass TBB, executing "
"all algorithms in the scheduler's thread." }

Definition at line 169 of file AvalancheSchedulerSvc.h.

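The maxParallelismExtra description gives the TBB limit as ThreadPoolSize + maxParallelismExtra + 1. For example, ThreadPoolSize = 4 and maxParallelismExtra = 2 give 4 + 2 + 1 = 7. The helper below applies that formula. How -1 and -100 interact with it is not spelled out here, so the helper assumes two things. First, -1 is resolved to the number of hardware threads before the formula is applied. Second, -100 (bypass TBB) yields no TBB limit at all. tbbMaxParallelism is hypothetical, not a Gaudi function.

```cpp
#include <cassert>
#include <optional>
#include <thread>

// Hypothetical helper applying the documented formula
//   TBB max parallelism = ThreadPoolSize + maxParallelismExtra + 1
std::optional<int> tbbMaxParallelism( int threadPoolSize, int maxParallelismExtra,
                                      unsigned int hwThreads = std::thread::hardware_concurrency() ) {
  if ( threadPoolSize == -100 ) return std::nullopt; // bypass TBB: algorithms run in the scheduler's thread
  if ( threadPoolSize == -1 ) threadPoolSize = static_cast<int>( hwThreads ); // assumption: -1 means all hardware threads
  return threadPoolSize + maxParallelismExtra + 1;
}
```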
◆ m_threadPoolSvc

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 367 of file AvalancheSchedulerSvc.h.

◆ m_useDataLoader

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{ this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm" }

Definition at line 206 of file AvalancheSchedulerSvc.h.

◆ m_verboseSubSlots

Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" }
private

Definition at line 220 of file AvalancheSchedulerSvc.h.

◆ m_whiteboard

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 266 of file AvalancheSchedulerSvc.h.

◆ m_whiteboardSvcName

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" }
private

Definition at line 178 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files: