The Gaudi Framework  master (37c0b60a)
AvalancheSchedulerSvc Class Reference

#include </builds/gaudi/Gaudi/GaudiHive/src/AvalancheSchedulerSvc.h>


Classes

struct  AlgQueueSort
 Comparison operator to sort the queues.
 
struct  TaskSpec
 Struct to hold entries in the alg queues.
 

Public Member Functions

StatusCode initialize () override
 Initialise.
 
StatusCode finalize () override
 Finalise.
 
StatusCode pushNewEvent (EventContext *eventContext) override
 Make an event available to the scheduler.
 
StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts) override
 
StatusCode popFinishedEvent (EventContext *&eventContext) override
 Blocks until an event is available.
 
StatusCode tryPopFinishedEvent (EventContext *&eventContext) override
 Try to fetch an event from the scheduler.
 
unsigned int freeSlots () override
 Get the number of free slots.
 
void dumpState () override
 Dump scheduler state for all slots.
 
virtual StatusCode scheduleEventView (const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
 Method to inform the scheduler about event views.
 
virtual void recordOccupancy (int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
 Sample occupancy at a fixed interval (ms). A negative value deactivates sampling; 0 takes a snapshot on every change. Each sample is passed to the callback function.
 
bool next (TaskSpec &ts, bool asynchronous)
 
- Public Member Functions inherited from extends< Service, IScheduler >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast.
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface.
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames.
 
- Public Member Functions inherited from Service
const std::string & name () const override
 Retrieve the name of the service.
 
StatusCode configure () override
 
StatusCode initialize () override
 
StatusCode start () override
 
StatusCode stop () override
 
StatusCode finalize () override
 
StatusCode terminate () override
 
Gaudi::StateMachine::State FSMState () const override
 
Gaudi::StateMachine::State targetFSMState () const override
 
StatusCode reinitialize () override
 
StatusCode restart () override
 
StatusCode sysInitialize () override
 Initialize Service
 
StatusCode sysStart () override
 Start Service
 
StatusCode sysStop () override
 Stop Service
 
StatusCode sysFinalize () override
 Finalize Service
 
StatusCode sysReinitialize () override
 Re-initialize the Service.
 
StatusCode sysRestart () override
 Restart the Service.
 
 Service (std::string name, ISvcLocator *svcloc)
 Standard Constructor
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator
 
template<class T >
StatusCode service (const std::string &name, const T *&psvc, bool createIf=true) const
 Access a service by name, creating it if it doesn't already exist.
 
template<class T >
StatusCode service (const std::string &name, T *&psvc, bool createIf=true) const
 
template<typename IFace = IService>
SmartIF< IFace > service (const std::string &name, bool createIf=true) const
 
template<class T >
StatusCode service (const std::string &svcType, const std::string &svcName, T *&psvc) const
 Access a service by name and type, creating it if it doesn't already exist.
 
template<class T >
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandle< T > &hndl, const std::string &doc="none")
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, bool createIf=true)
 
template<class T >
StatusCode declareTool (ToolHandle< T > &handle, const std::string &toolTypeAndName, bool createIf=true)
 Declare used tool.
 
template<class T >
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, ToolHandleArray< T > &hndlArr, const std::string &doc="none")
 
template<class T >
void addToolsArray (ToolHandleArray< T > &hndlArr)
 
const std::vector< IAlgTool * > & tools () const
 
SmartIF< IAuditorSvc > & auditorSvc () const
 The standard auditor service. May not be invoked before sysInitialize() has been invoked.
 
- Public Member Functions inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 PropertyHolder ()=default
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Details::PropertyBase &prop)
 Declare a property.
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, TYPE &value, const std::string &doc="none")
 Helper to wrap a regular data member and use it as a regular property.
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, Gaudi::Property< TYPE, VERIFIER, HANDLERS > &prop, const std::string &doc="none")
 Declare a PropertyBase instance setting name and documentation.
 
Gaudi::Details::PropertyBase * declareRemoteProperty (const std::string &name, IProperty *rsvc, const std::string &rname="")
 Declare a remote property.
 
StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p) override
 Set the property from another property with a different name.
 
StatusCode setProperty (const std::string &s) override
 Set the property from the formatted string.
 
StatusCode setProperty (const Gaudi::Details::PropertyBase &p)
 Set the property from a property.
 
virtual StatusCode setProperty (const std::string &name, const Gaudi::Details::PropertyBase &p)=0
 Set the property from a property with a different name.
 
virtual StatusCode setProperty (const std::string &s)=0
 Set the property by string.
 
StatusCode setProperty (const std::string &name, const char *v)
 Special case for string literals.
 
StatusCode setProperty (const std::string &name, const std::string &v)
 Special case for std::string.
 
StatusCode setProperty (const std::string &name, const TYPE &value)
 Set the property from the value.
 
StatusCode setPropertyRepr (const std::string &n, const std::string &r) override
 Set the property from name and value string representation.
 
StatusCode getProperty (Gaudi::Details::PropertyBase *p) const override
 Get the property.
 
const Gaudi::Details::PropertyBase & getProperty (std::string_view name) const override
 Get the property by name.
 
StatusCode getProperty (std::string_view n, std::string &v) const override
 Convert the property to a string.
 
const std::vector< Gaudi::Details::PropertyBase * > & getProperties () const override
 Get all properties.
 
bool hasProperty (std::string_view name) const override
 Return true if we have a property with the given name.
 
Gaudi::Details::PropertyBase * property (std::string_view name) const
 FIXME: property and bindPropertiesTo should be protected.
 
void bindPropertiesTo (Gaudi::Interfaces::IOptionsSvc &optsSvc)
 
 PropertyHolder (const PropertyHolder &)=delete
 
PropertyHolder & operator= (const PropertyHolder &)=delete
 
- Public Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level msgLevel () const
 get the cached level (originally extracted from the embedded MsgStream)
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream
 

Private Types

enum  ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 }
 
using AState = AlgsExecutionStates::State
 
using action = std::function< StatusCode()>
 

Private Member Functions

StatusCode dumpGraphFile (const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
 
void activate ()
 Activate scheduler.
 
StatusCode deactivate ()
 Deactivate scheduler.
 
unsigned int algname2index (const std::string &algoname)
 Convert a name to an integer.
 
const std::string & index2algname (unsigned int index)
 Convert an integer to a name.
 
StatusCode iterate ()
 Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
 
StatusCode revise (unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
 
StatusCode schedule (TaskSpec &&)
 
StatusCode signoff (const TaskSpec &)
 The call to this method is triggered only from within the AlgTask.
 
bool isStalled (const EventSlot &) const
 Check if scheduling in a particular slot is in a stall.
 
void eventFailed (EventContext *eventContext)
 Method to execute if an event failed.
 
void dumpSchedulerState (int iSlot)
 Dump the state of the scheduler.
 

Private Attributes

std::chrono::duration< int64_t, std::milli > m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
 
std::chrono::system_clock::time_point m_lastSnapshot = std::chrono::system_clock::now()
 
std::function< void(OccupancySnapshot)> m_snapshotCallback
 
Gaudi::Property< int > m_threadPoolSize
 
Gaudi::Property< int > m_maxParallelismExtra
 
Gaudi::Property< std::string > m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" }
 
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
 
Gaudi::Property< bool > m_simulateExecution
 
Gaudi::Property< std::string > m_optimizationMode
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
 
Gaudi::Property< int > m_numOffloadThreads
 
Gaudi::Property< bool > m_checkDeps
 
Gaudi::Property< bool > m_checkOutput
 
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
 
Gaudi::Property< std::string > m_useDataLoader
 
Gaudi::Property< bool > m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" }
 
Gaudi::Property< bool > m_showDataDeps
 
Gaudi::Property< bool > m_showDataFlow
 
Gaudi::Property< bool > m_showControlFlow
 
Gaudi::Property< bool > m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" }
 
Gaudi::Property< std::string > m_dataDepsGraphFile
 
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
 
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
 
std::atomic< ActivationState > m_isActive { INACTIVE }
 Flag to track if the scheduler is active or not.
 
std::thread m_thread
 The thread in which the activate function runs.
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary for the name2index conversion.
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary for the index2name conversion.
 
SmartIF< IPrecedenceSvc > m_precSvc
 A shortcut to the Precedence Service.
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard.
 
std::vector< EventSlot > m_eventSlots
 Vector of event slots.
 
std::atomic_int m_freeSlots { 0 }
 Atomic to account for asynchronous updates by the scheduler with respect to the rest.
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events.
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
 Algorithm execution state manager.
 
SmartIF< ICondSvc > m_condSvc
 A shortcut to service for Conditions handling.
 
unsigned int m_algosInFlight = 0
 Number of algorithms presently in flight.
 
unsigned int m_blockingAlgosInFlight = 0
 Number of blocking algorithms presently in flight.
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool.
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
 Queue where closures are stored and picked for execution.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
 Queues for scheduled algorithms.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
 
std::queue< TaskSpec > m_retryQueue
 
std::atomic< bool > m_needsUpdate { true }
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
tbb::task_arena * m_arena { nullptr }
 
std::unique_ptr< FiberManager > m_fiberManager { nullptr }
 
size_t m_maxEventsInFlight { 0 }
 
size_t m_maxAlgosInFlight { 1 }
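The recordOccupancy() entry in the member list describes a throttled sampler: a negative period disables sampling, 0 samples on every change, and a positive period samples at most once per interval, handing each sample to a callback; m_snapshotInterval, m_lastSnapshot and m_snapshotCallback hold that state. The following is a minimal sketch of such throttling logic, not the service's own code. OccupancySampler and SnapshotSketch are hypothetical names, the snapshot is reduced to a single counter, and std::chrono::steady_clock is assumed instead of the system_clock used by m_lastSnapshot.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical, simplified stand-in for OccupancySnapshot.
struct SnapshotSketch {
  int algsInFlight = 0;
};

// Hypothetical sampler illustrating the documented recordOccupancy() semantics.
class OccupancySampler {
public:
  using Clock    = std::chrono::steady_clock;
  using Interval = std::chrono::duration<std::int64_t, std::milli>;

  // samplePeriodMs < 0: off; == 0: sample every change; > 0: at most once per period.
  void record( int samplePeriodMs, std::function<void( SnapshotSketch )> callback ) {
    if ( samplePeriodMs < 0 ) {
      m_interval = Interval::min(); // negative sentinel, like m_snapshotInterval's default
      m_callback = nullptr;
      return;
    }
    m_interval = Interval( samplePeriodMs );
    m_callback = std::move( callback );
    m_hasLast  = false; // the next change is always sampled
  }

  // To be called whenever occupancy changes; returns true if a sample was taken.
  bool onChange( const SnapshotSketch& snap, Clock::time_point now ) {
    if ( m_interval < Interval::zero() || !m_callback ) return false;
    if ( m_interval > Interval::zero() && m_hasLast && now - m_last < m_interval ) return false;
    m_last    = now;
    m_hasLast = true;
    m_callback( snap );
    return true;
  }

private:
  Interval                              m_interval = Interval::min();
  Clock::time_point                     m_last{};
  bool                                  m_hasLast = false;
  std::function<void( SnapshotSketch )> m_callback;
};
```

Passing the time point explicitly, instead of reading the clock inside onChange(), keeps the throttling deterministic and easy to test.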
 

Friends

class AlgTask
 

Additional Inherited Members

- Public Types inherited from extends< Service, IScheduler >
using base_class = extends
 Typedef to this class.
 
using extend_interfaces_base = extend_interfaces< Interfaces... >
 Typedef to the base of this class.
 
- Public Types inherited from Service
using Factory = Gaudi::PluginService::Factory< IService *(const std::string &, ISvcLocator *)>
 
- Public Types inherited from PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
using PropertyHolderImpl = PropertyHolder< CommonMessaging< implements< IService, IProperty, IStateful > > >
 Typedef used to refer to this class from derived classes.
 
- Public Types inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
using base_class = CommonMessaging
 
- Public Types inherited from extend_interfaces< Interfaces... >
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids... >::type
 take union of the ext_iids of all Interfaces...
 
- Protected Member Functions inherited from Service
std::vector< IAlgTool * > & tools ()
 
 ~Service () override
 Standard Destructor
 
int outputLevel () const
 get the Service's output level
 
- Protected Member Functions inherited from CommonMessaging< implements< IService, IProperty, IStateful > >
MSG::Level setUpMessaging () const
 Set up local caches.
 
MSG::Level resetMessaging ()
 Reinitialize internal states.
 
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream.
 
- Protected Attributes inherited from Service
Gaudi::StateMachine::State m_state = Gaudi::StateMachine::OFFLINE
 Service state
 
Gaudi::StateMachine::State m_targetState = Gaudi::StateMachine::OFFLINE
 Target service state
 
Gaudi::Property< int > m_outputLevel { this, "OutputLevel", MSG::NIL, "output level" }
 Output level of the service.
 
Gaudi::Property< bool > m_auditInit { this, "AuditServices", false, "[[deprecated]] unused" }
 
Gaudi::Property< bool > m_auditorInitialize { this, "AuditInitialize", false, "trigger auditor on initialize()" }
 
Gaudi::Property< bool > m_auditorStart { this, "AuditStart", false, "trigger auditor on start()" }
 
Gaudi::Property< bool > m_auditorStop { this, "AuditStop", false, "trigger auditor on stop()" }
 
Gaudi::Property< bool > m_auditorFinalize { this, "AuditFinalize", false, "trigger auditor on finalize()" }
 
Gaudi::Property< bool > m_auditorReinitialize { this, "AuditReinitialize", false, "trigger auditor on reinitialize()" }
 
Gaudi::Property< bool > m_auditorRestart { this, "AuditRestart", false, "trigger auditor on restart()" }
 
Gaudi::Property< bool > m_autoRetrieveTools
 
Gaudi::Property< bool > m_checkToolDeps
 
SmartIF< IAuditorSvc > m_pAuditorSvc
 Auditor Service
 

Detailed Description

Introduction

The scheduler is named after its ability to generically maximize the average intra-event task occupancy by inducing avalanche-like concurrency disclosure waves in conditions of arbitrary intra-event task precedence constraints (see section 3.2 of http://cern.ch/go/7Jn7).

Task precedence management

The scheduler is driven by graph-based task precedence management. Compared to the approach used in the ForwardSchedulerSvc, it offers the following advantages:

(1) faster decision making (thus lower concurrency disclosure downtime); (2) the capacity to make proactive task scheduling decisions.

Point (2) made it possible to implement a number of generic, non-intrusive intra-event throughput maximization scheduling strategies.

Scheduling principles

o Task scheduling prerequisites

A task is scheduled as soon as all of the following conditions are met:

  • a control flow (CF) graph traversal reaches the task;
  • all data flow (DF) dependencies of the task are satisfied;
  • the DF-ready task pool parsing mechanism (*) considers it, and:
    • a free (or re-entrant) algorithm instance to run within the task is available;
    • there is a free computational resource to run the task.
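The prerequisites above can be read as a conjunction of independent checks. The sketch below assumes a hypothetical TaskCandidate type in which control-flow reachability and instance availability are plain flags, and data-flow dependencies are object keys checked against the set of objects already produced in the event; the prioritisation performed by the pool-parsing mechanism (*) is not modelled.

```cpp
#include <algorithm>
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Hypothetical model of one algorithm task; not a Gaudi type.
struct TaskCandidate {
  std::string              name;
  bool                     cfReached = false;         // a CF graph traversal reached the task
  std::vector<std::string> inputs;                    // DF dependencies (data object keys)
  bool                     instanceAvailable = false; // free or re-entrant algorithm instance
};

// DF check: true if every input has already been produced in this event.
bool dataReady( const TaskCandidate& t, const std::set<std::string>& produced ) {
  return std::all_of( t.inputs.begin(), t.inputs.end(),
                      [&]( const std::string& key ) { return produced.count( key ) > 0; } );
}

// All prerequisites: CF reached, DF satisfied, an instance and a free computational resource.
bool schedulable( const TaskCandidate& t, const std::set<std::string>& produced, int freeThreads ) {
  assert( freeThreads >= 0 );
  return t.cfReached && dataReady( t, produced ) && t.instanceAvailable && freeThreads > 0;
}
```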

o (*) Avalanche induction strategies

The scheduler is able to maximize the intra-event throughput by applying several search strategies within the pool, prioritizing tasks according to the following types of precedence rules graph asymmetries:

(A) Local task-to-data asymmetry; (B) Local task-to-task asymmetry; (C) Global task-to-task asymmetry.
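One way to picture the effect of these strategies is as a priority ordering of ready tasks. The member list shows scheduled tasks kept in a tbb::concurrent_priority_queue ordered by the AlgQueueSort comparator; the sketch below shows the same idea with std::priority_queue, assuming a hypothetical TaskSpecSketch whose rank has already been computed by some strategy, with FIFO tie-breaking. How the real ranks are derived is not shown.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-in for TaskSpec: a task plus a precomputed priority rank.
struct TaskSpecSketch {
  std::string algName;
  unsigned    rank = 0; // higher rank = scheduled earlier
  std::size_t seq  = 0; // insertion order, used as a FIFO tie-breaker
};

// Hypothetical comparator in the spirit of AlgQueueSort. std::priority_queue keeps
// the "largest" element on top, so comp(a, b) == true means a has lower priority.
struct RankThenFifo {
  bool operator()( const TaskSpecSketch& a, const TaskSpecSketch& b ) const {
    if ( a.rank != b.rank ) return a.rank < b.rank;
    return a.seq > b.seq; // among equal ranks, the older task wins
  }
};

using ReadyPool = std::priority_queue<TaskSpecSketch, std::vector<TaskSpecSketch>, RankThenFifo>;

// Pops every task and returns the algorithm names in scheduling order.
std::vector<std::string> drain( ReadyPool pool ) {
  std::vector<std::string> order;
  while ( !pool.empty() ) {
    order.push_back( pool.top().algName );
    pool.pop();
  }
  return order;
}
```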

o Other mechanisms of throughput maximization

The scheduler is able to maximize the overall throughput of data processing by preemptively scheduling CPU-blocking tasks. The mechanism can be applied to the following types of tasks:

  • I/O-bound tasks;
  • tasks with computation offloading (accelerators, GPGPUs, clouds, and, jokingly, quantum computing devices);
  • synchronization-bound tasks.
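The point of preemptively scheduling CPU-blocking tasks is that such tasks should not hold CPU worker threads while they wait. The private attributes include m_enablePreemptiveBlockingTasks, m_maxBlockingAlgosInFlight and a separate m_scheduledAsynchronousQueue; the hypothetical DispatcherSketch below only illustrates the routing and capping idea with two plain queues and a counter, and is not how those members are actually wired together.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>

// Hypothetical dispatcher: CPU-bound tasks go to the CPU queue; CPU-blocking tasks
// (I/O, offloading, synchronization) go to a separate queue, with at most
// maxBlockingInFlight of them in flight at any time.
class DispatcherSketch {
public:
  explicit DispatcherSketch( unsigned maxBlockingInFlight ) : m_maxBlocking( maxBlockingInFlight ) {}

  // Returns true if the task was dispatched now, false if it has to be retried later.
  bool dispatch( const std::string& alg, bool blocking ) {
    if ( !blocking ) {
      m_cpuQueue.push( alg );
      return true;
    }
    if ( m_blockingInFlight >= m_maxBlocking ) return false;
    ++m_blockingInFlight;
    m_blockingQueue.push( alg );
    return true;
  }

  // Called when a blocking task signs off, freeing one blocking slot.
  void blockingTaskDone() {
    assert( m_blockingInFlight > 0 );
    --m_blockingInFlight;
  }

  unsigned    blockingInFlight() const { return m_blockingInFlight; }
  std::size_t cpuQueued() const { return m_cpuQueue.size(); }

private:
  unsigned                m_maxBlocking;
  unsigned                m_blockingInFlight = 0;
  std::queue<std::string> m_cpuQueue;
  std::queue<std::string> m_blockingQueue;
};
```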

Credits

Historically, the AvalancheSchedulerSvc branched off the ForwardSchedulerSvc and in many ways built its success on ideas and code of the latter.

Author
Illya Shapoval
Version
1.0

Definition at line 114 of file AvalancheSchedulerSvc.h.

Member Typedef Documentation

◆ action

Definition at line 161 of file AvalancheSchedulerSvc.h.

◆ AState

Member Enumeration Documentation

◆ ActivationState

Enumerator
INACTIVE 
ACTIVE 
FAILURE 

Definition at line 163 of file AvalancheSchedulerSvc.h.

163 { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };

Member Function Documentation

◆ activate()

void AvalancheSchedulerSvc::activate ( )
private

Activate scheduler.

Activate the scheduler.

From this moment on, the queue of actions is checked. The checking stops only when the m_isActive flag is no longer ACTIVE and the queue is empty, which guarantees that all actions are executed and that no stall is created. The TBB pool must be initialised in the thread from which the tasks are launched (http://threadingbuildingblocks.org/docs/doxygen/a00342.html); since this method runs in a separate thread and spawns the tasks (through the execution of the lambdas), the thread pool is initialised here.

Definition at line 457 of file AvalancheSchedulerSvc.cpp.

457  {
458 
459  ON_DEBUG debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
460 
461  if ( m_threadPoolSvc->initPool( m_threadPoolSize, m_maxParallelismExtra ).isFailure() ) {
462  error() << "problems initializing ThreadPoolSvc" << endmsg;
463  m_isActive = FAILURE;
464  return;
465  }
466 
467  // Wait for actions pushed into the queue by finishing tasks.
468  action thisAction;
469  StatusCode sc( StatusCode::SUCCESS );
470 
471  m_isActive = ACTIVE;
472 
473  // Continue to wait if the scheduler is running or there is something to do
474  ON_DEBUG debug() << "Start checking the actionsQueue" << endmsg;
475  while ( m_isActive == ACTIVE || m_actionsQueue.size() != 0 ) {
476  m_actionsQueue.pop( thisAction );
477  sc = thisAction();
478  ON_VERBOSE {
479  if ( sc.isFailure() )
480  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
481  else
482  verbose() << "Action succeeded." << endmsg;
483  }
484  else sc.ignore();
485 
486  // If all queued actions have been processed, update the slot states
487  if ( m_needsUpdate.load() && m_actionsQueue.empty() ) {
488  sc = iterate();
489  ON_VERBOSE {
490  if ( sc.isFailure() )
491  verbose() << "Iteration did not succeed (which is not bad per se)." << endmsg;
492  else
493  verbose() << "Iteration succeeded." << endmsg;
494  }
495  else sc.ignore();
496  }
497  }
498 
499  ON_DEBUG debug() << "Terminating thread-pool resources" << endmsg;
500  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
501  error() << "Problems terminating thread pool" << endmsg;
502  m_isActive = FAILURE;
503  }
504 }
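The loop above is a single-consumer action queue: producers push closures, one thread pops and runs them, and termination is requested by enqueuing a final action that clears the activity flag, which is what deactivate() does. A minimal sketch of that pattern follows, assuming a mutex and condition variable in place of tbb::concurrent_bounded_queue and a bool result in place of StatusCode, and omitting the iterate() call made when m_needsUpdate is set; ActionLoopSketch is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-consumer action loop modelled on activate()/deactivate().
class ActionLoopSketch {
public:
  using Action = std::function<bool()>; // bool stands in for StatusCode

  ~ActionLoopSketch() {
    if ( m_thread.joinable() ) stopAndJoin();
  }

  void push( Action a ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_queue.push_back( std::move( a ) );
    }
    m_cv.notify_one();
  }

  // Like activate(): raise the flag, then consume actions on a dedicated thread.
  void start() {
    m_active = true;
    m_thread = std::thread( [this] { run(); } );
  }

  // Like deactivate(): drop pending actions, enqueue a last action that clears
  // the flag, then wait for the consumer thread to finish.
  void stopAndJoin() {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_queue.clear();
    }
    push( [this] {
      m_active = false;
      return true;
    } );
    if ( m_thread.joinable() ) m_thread.join();
  }

  int executed() const { return m_executed.load(); }

private:
  // Keep going while active or while actions are still queued.
  void run() {
    while ( m_active || !empty() ) {
      Action a;
      {
        std::unique_lock<std::mutex> lock( m_mutex );
        m_cv.wait( lock, [this] { return !m_queue.empty(); } ); // blocking pop
        a = std::move( m_queue.front() );
        m_queue.pop_front();
      }
      a(); // result is informational only; activate() merely logs it at VERBOSE level
      ++m_executed;
    }
  }

  bool empty() {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_queue.empty();
  }

  std::mutex              m_mutex;
  std::condition_variable m_cv;
  std::deque<Action>      m_queue;
  std::atomic<bool>       m_active{ false };
  std::atomic<int>        m_executed{ 0 };
  std::thread             m_thread;
};
```

Because the flag is cleared by an action executed on the consumer thread itself, no separate signalling is needed to wake the loop for shutdown.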

◆ algname2index()

unsigned int AvalancheSchedulerSvc::algname2index ( const std::string & algoname)
inline private

Convert a name to an integer.

Definition at line 252 of file AvalancheSchedulerSvc.h.

252 { return m_algname_index_map[algoname]; }
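algname2index() uses std::unordered_map::operator[], which inserts a value-initialised entry (index 0) when the name is unknown, so a misspelled name silently yields index 0 and also grows the map. The hypothetical AlgIndexSketch below keeps the same pair of containers (a name-to-index map and an index-to-name vector) but uses find(), so lookups of unknown names are reported instead.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical bidirectional registry in the spirit of m_algname_index_map and
// m_algname_vect: names receive consecutive indices on registration.
class AlgIndexSketch {
public:
  unsigned registerAlg( const std::string& name ) {
    auto [it, inserted] = m_index.try_emplace( name, static_cast<unsigned>( m_names.size() ) );
    if ( inserted ) m_names.push_back( name );
    return it->second;
  }

  // Unlike operator[], find() never inserts: unknown names give std::nullopt.
  std::optional<unsigned> name2index( const std::string& name ) const {
    auto it = m_index.find( name );
    if ( it == m_index.end() ) return std::nullopt;
    return it->second;
  }

  const std::string& index2name( unsigned index ) const {
    assert( index < m_names.size() );
    return m_names[index];
  }

private:
  std::unordered_map<std::string, unsigned> m_index;
  std::vector<std::string>                  m_names;
};
```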

◆ deactivate()

StatusCode AvalancheSchedulerSvc::deactivate ( )
private

Deactivate scheduler.

Deactivates the scheduler.

The number of free slots is first set to 0. Any actions still pending in the queue are then discarded, and a final action is pushed that flips the status flag m_isActive to INACTIVE. This is the last action executed by the scheduler: once it has run and the queue is empty, the loop in activate() exits.

Definition at line 514 of file AvalancheSchedulerSvc.cpp.

514  {
515 
516  if ( m_isActive == ACTIVE ) {
517 
518  // Set the number of slots available to an error code
519  m_freeSlots.store( 0 );
520 
521  // Empty queue
522  action thisAction;
523  while ( m_actionsQueue.try_pop( thisAction ) ) {};
524 
525  // This would be the last action
526  m_actionsQueue.push( [this]() -> StatusCode {
527  ON_VERBOSE verbose() << "Deactivating scheduler" << endmsg;
528  m_isActive = INACTIVE;
529  return StatusCode::SUCCESS;
530  } );
531  }
532 
533  return StatusCode::SUCCESS;
534 }

◆ dumpGraphFile()

StatusCode AvalancheSchedulerSvc::dumpGraphFile ( const std::map< std::string, DataObjIDColl > &  inDeps,
const std::map< std::string, DataObjIDColl > &  outDeps 
) const
private

Definition at line 1177 of file AvalancheSchedulerSvc.cpp.

1178  {
1179  // Both maps should have the same algorithm entries
1180  assert( inDeps.size() == outDeps.size() );
1181 
1182  // Check file extension
1183  enum class FileType : short { UNKNOWN, DOT, MD };
1184  std::regex fileExtensionRegexDot( "\\.dot$" );
1185  std::regex fileExtensionRegexMd( "\\.md$" );
1186 
1187  std::string fileName = m_dataDepsGraphFile.value();
1188  FileType fileExtension = FileType::UNKNOWN;
1189  if ( std::regex_search( m_dataDepsGraphFile.value(), fileExtensionRegexDot ) ) {
1190  fileExtension = FileType::DOT;
1191  } else if ( std::regex_search( m_dataDepsGraphFile.value(), fileExtensionRegexMd ) ) {
1192  fileExtension = FileType::MD;
1193  } else {
1194  fileExtension = FileType::DOT;
1195  fileName = fileName + ".dot";
1196  }
1197  info() << "Dumping data dependencies graph to file: " << fileName << endmsg;
1198 
1199  std::string startGraph = "";
1200  std::string stopGraph = "";
1201  // define functions
1202  std::function<std::string( const std::string&, const std::string& )> defineAlg;
1203  std::function<std::string( const DataObjID& )> defineObj;
1204  std::function<std::string( const DataObjID&, const std::string& )> defineInput;
1205  std::function<std::string( const std::string&, const DataObjID& )> defineOutput;
1206 
1207  if ( fileExtension == FileType::DOT ) {
1208  // .dot file
1209  startGraph = "digraph datadeps {\nrankdir=\"LR\";\n\n";
1210  stopGraph = "\n}\n";
1211 
1212  defineAlg = []( const std::string& alg, const std::string& idx ) -> std::string {
1213  return "Alg_" + idx + " [label=\"" + alg + "\";shape=box];\n";
1214  };
1215 
1216  defineObj = []( const DataObjID& obj ) -> std::string {
1217  return "obj_" + std::to_string( obj.hash() ) + " [label=\"" + obj.key() + "\"];\n";
1218  };
1219 
1220  defineInput = []( const DataObjID& obj, const std::string& alg ) -> std::string {
1221  return "obj_" + std::to_string( obj.hash() ) + " -> " + "Alg_" + alg + ";\n";
1222  };
1223 
1224  defineOutput = []( const std::string& alg, const DataObjID& obj ) -> std::string {
1225  return "Alg_" + alg + " -> " + "obj_" + std::to_string( obj.hash() ) + ";\n";
1226  };
1227  } else {
1228  // .md file
1229  startGraph = "```mermaid\ngraph LR;\n\n";
1230  stopGraph = "\n```\n";
1231 
1232  defineAlg = []( const std::string& alg, const std::string& idx ) -> std::string {
1233  return "Alg_" + idx + "{{" + alg + "}}\n";
1234  };
1235 
1236  defineObj = []( const DataObjID& obj ) -> std::string {
1237  return "obj_" + std::to_string( obj.hash() ) + ">" + obj.key() + "]\n";
1238  };
1239 
1240  defineInput = []( const DataObjID& obj, const std::string& alg ) -> std::string {
1241  return "obj_" + std::to_string( obj.hash() ) + " --> " + "Alg_" + alg + "\n";
1242  };
1243 
1244  defineOutput = []( const std::string& alg, const DataObjID& obj ) -> std::string {
1245  return "Alg_" + alg + " --> " + "obj_" + std::to_string( obj.hash() ) + "\n";
1246  };
1247  } // fileExtension
1248 
1249  std::ofstream dataDepthGraphFile( fileName, std::ofstream::out );
1250  dataDepthGraphFile << startGraph;
1251 
1252  // define algs and objects
1253  std::set<std::size_t> definedObjects;
1254 
1255  // Regex for selection of algs and objects
1256  std::regex algNameRegex( m_dataDepsGraphAlgoPattern.value() );
1257  std::regex objNameRegex( m_dataDepsGraphObjectPattern.value() );
1258 
1259  // inDeps and outDeps should have the same entries
1260  std::size_t algoIndex = 0ul;
1261  for ( const auto& [name, ideps] : inDeps ) {
1262  if ( not std::regex_search( name, algNameRegex ) ) continue;
1263  dataDepthGraphFile << defineAlg( name, std::to_string( algoIndex ) );
1264 
1265  // inputs
1266  for ( const auto& dep : ideps ) {
1267  if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1268 
1269  const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1270  if ( inserted ) dataDepthGraphFile << defineObj( dep );
1271 
1272  dataDepthGraphFile << defineInput( dep, std::to_string( algoIndex ) );
1273  } // loop on ideps
1274 
1275  const auto& odeps = outDeps.at( name );
1276  for ( const auto& dep : odeps ) {
1277  if ( not std::regex_search( dep.fullKey(), objNameRegex ) ) continue;
1278 
1279  const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1280  if ( inserted ) dataDepthGraphFile << defineObj( dep );
1281 
1282  dataDepthGraphFile << defineOutput( std::to_string( algoIndex ), dep );
1283  } // loop on odeps
1284 
1285  ++algoIndex;
1286  } // loop on inDeps
1287 
1288  // end the file
1289  dataDepthGraphFile << stopGraph;
1290  dataDepthGraphFile.close();
1291 
1292  return StatusCode::SUCCESS;
1293 }
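dumpGraphFile() emits either a Graphviz DOT graph or a Mermaid flowchart. The sketch below reproduces only the DOT branch, using plain strings instead of DataObjID and numbering data objects sequentially rather than by hash; the DepsMap alias and writeDataDepsDot() are hypothetical, the regex filtering on algorithm and object names is left out, and a missing output entry is skipped rather than triggering outDeps.at(). Writing to any std::ostream makes the output easy to inspect.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <ostream>
#include <set>
#include <sstream>
#include <string>

// Hypothetical simplified inputs: algorithm name -> data object keys.
using DepsMap = std::map<std::string, std::set<std::string>>;

// Writes a DOT data-flow graph shaped like the .dot branch of dumpGraphFile():
// boxes for algorithms, default-shaped nodes for data objects, obj -> alg edges
// for inputs and alg -> obj edges for outputs.
void writeDataDepsDot( std::ostream& os, const DepsMap& inDeps, const DepsMap& outDeps ) {
  std::map<std::string, std::size_t> objIds; // ensures each object node is defined once
  auto objNode = [&]( const std::string& key ) {
    auto [it, inserted]  = objIds.try_emplace( key, objIds.size() );
    const std::string id = "obj_" + std::to_string( it->second );
    if ( inserted ) os << id << " [label=\"" << key << "\"];\n";
    return id;
  };

  os << "digraph datadeps {\nrankdir=\"LR\";\n\n";
  std::size_t algIndex = 0;
  for ( const auto& [alg, inputs] : inDeps ) {
    const std::string algId = "Alg_" + std::to_string( algIndex++ );
    os << algId << " [label=\"" << alg << "\";shape=box];\n";
    for ( const auto& key : inputs ) {
      const std::string id = objNode( key );
      os << id << " -> " << algId << ";\n";
    }
    auto out = outDeps.find( alg );
    if ( out == outDeps.end() ) continue;
    for ( const auto& key : out->second ) {
      const std::string id = objNode( key );
      os << algId << " -> " << id << ";\n";
    }
  }
  os << "\n}\n";
}
```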

◆ dumpSchedulerState()

void AvalancheSchedulerSvc::dumpSchedulerState ( int  iSlot)
private

Dump the state of the scheduler.

Used for debugging purposes: the state of the scheduler is written as a single INFO message so that it can be inspected.

Definition at line 879 of file AvalancheSchedulerSvc.cpp.

879  {
880 
881  // To have just one big message
882  std::ostringstream outputMS;
883 
884  outputMS << "Dumping scheduler state\n"
885  << "=========================================================================================\n"
886  << "++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
887  << "=========================================================================================\n\n";
888 
889  //===========================================================================
890 
891  outputMS << "------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
892  << "------------------\n\n";
893 
894  // Figure if TimelineSvc is available (used below to detect threads IDs)
895  auto timelineSvc = serviceLocator()->service<ITimelineSvc>( "TimelineSvc", false );
896  if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
897  outputMS << "WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
898  } else {
899 
900  // Figure optimal printout layout
901  size_t indt( 0 );
902  for ( auto& slot : m_eventSlots ) {
903 
904  const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
905  for ( uint algIndex : schedAlgs ) {
906  if ( index2algname( algIndex ).length() > indt ) indt = index2algname( algIndex ).length();
907  }
908  }
909 
910  // Figure the last running schedule across all slots
911  for ( auto& slot : m_eventSlots ) {
912 
913  const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
914  for ( uint algIndex : schedAlgs ) {
915 
916  const std::string& algoName{ index2algname( algIndex ) };
917 
918  outputMS << " task: " << std::setw( indt ) << algoName << " evt/slot: " << slot.eventContext->evt() << "/"
919  << slot.eventContext->slot();
920 
921  // Try to get POSIX threads IDs the currently running tasks are scheduled to
922  if ( timelineSvc.isValid() ) {
923  TimelineEvent te{};
924  te.algorithm = algoName;
925  te.slot = slot.eventContext->slot();
926  te.event = slot.eventContext->evt();
927 
928  if ( timelineSvc->getTimelineEvent( te ) )
929  outputMS << " thread.id: 0x" << std::hex << te.thread << std::dec;
930  else
931  outputMS << " thread.id: [unknown]"; // this means a task has just
932  // been signed off as SCHEDULED,
933  // but has not been assigned to a thread yet
934  // (i.e., not running yet)
935  }
936  outputMS << " state: [" << m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) << "]\n";
937  }
938  }
939  }
940 
941  //===========================================================================
942 
943  outputMS << "\n---------------------------- Task/CF/FSM Mapping "
944  << ( 0 > iSlot ? "[all slots] --" : "[target slot] " ) << "--------------------------\n\n";
945 
946  int slotCount = -1;
947  bool wasAlgError = ( iSlot >= 0 ) ? m_eventSlots[iSlot].algsStates.containsAny( { AState::ERROR } ) ||
948  subSlotAlgsInStates( m_eventSlots[iSlot], { AState::ERROR } )
949  : false;
950 
951  for ( auto& slot : m_eventSlots ) {
952  ++slotCount;
953  if ( slot.complete ) continue;
954 
955  outputMS << "[ slot: "
956  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]" )
957  << ", event: "
958  << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) : "[ctx invalid]" );
959 
960  if ( slot.eventContext->eventID().isValid() ) { outputMS << ", eventID: " << slot.eventContext->eventID(); }
961  outputMS << " ]:\n\n";
962 
963  if ( 0 > iSlot || iSlot == slotCount ) {
964 
965  // If an alg has thrown an error then it's not a failure of the CF/DF graph
966  if ( wasAlgError ) {
967  outputMS << "ERROR alg(s):";
968  int errorCount = 0;
969  const auto& errorAlgs = slot.algsStates.algsInState( AState::ERROR );
970  for ( uint algIndex : errorAlgs ) {
971  outputMS << " " << index2algname( algIndex );
972  ++errorCount;
973  }
974  if ( errorCount == 0 ) outputMS << " in subslot(s)";
975  outputMS << "\n\n";
976  } else {
977  // Snapshot of the Control Flow and FSM states
978  outputMS << m_precSvc->printState( slot ) << "\n";
979  }
980 
981  // Mention sub slots (this is expensive if the number of sub-slots is high)
982  if ( m_verboseSubSlots && !slot.allSubSlots.empty() ) {
983  outputMS << "\nNumber of sub-slots: " << slot.allSubSlots.size() << "\n\n";
984  auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) : "[ctx invalid]";
985  for ( auto& ss : slot.allSubSlots ) {
986  outputMS << "[ slot: " << slotID << ", sub-slot: "
987  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) : "[ctx invalid]" )
988  << ", entry: " << ss.entryPoint << ", event: "
989  << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) : "[ctx invalid]" )
990  << " ]:\n\n";
991  if ( wasAlgError ) {
992  outputMS << "ERROR alg(s):";
993  const auto& errorAlgs = ss.algsStates.algsInState( AState::ERROR );
994  for ( uint algIndex : errorAlgs ) { outputMS << " " << index2algname( algIndex ); }
995  outputMS << "\n\n";
996  } else {
997  // Snapshot of the Control Flow and FSM states in sub slot
998  outputMS << m_precSvc->printState( ss ) << "\n";
999  }
1000  }
1001  }
1002  }
1003  }
1004 
1005  //===========================================================================
1006 
1007  if ( 0 <= iSlot && !wasAlgError ) {
1008  outputMS << "\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1009  m_algExecStateSvc->dump( outputMS, *( m_eventSlots[iSlot].eventContext ) );
1010  }
1011 
1012  outputMS << "\n=========================================================================================\n"
1013  << "++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1014  << "=========================================================================================\n\n";
1015 
1016  info() << outputMS.str() << endmsg;
1017 }

◆ dumpState()

void AvalancheSchedulerSvc::dumpState ( )
override

Dump scheduler state for all slots.

Definition at line 618 of file AvalancheSchedulerSvc.cpp.

618 { dumpSchedulerState( -1 ); }

◆ eventFailed()

void AvalancheSchedulerSvc::eventFailed ( EventContext *  eventContext)
private

Method to execute if an event failed.

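An event can fail, for example when iterate() detects a stall in its slot.

In this case this method is called. It dumps the state of the scheduler (all slots at VERBOSE level, otherwise only the failed slot), dumps the precedence analysis if the PrecedenceSvc has it enabled, and marks the event as finished by pushing its context into the finished-events queue.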

Definition at line 858 of file AvalancheSchedulerSvc.cpp.

858  {
859  const uint slotIdx = eventContext->slot();
860 
861  error() << "Event " << eventContext->evt() << " on slot " << slotIdx << " failed" << endmsg;
862 
863  dumpSchedulerState( msgLevel( MSG::VERBOSE ) ? -1 : slotIdx );
864 
865  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
866  m_precSvc->dumpPrecedenceRules( m_eventSlots[slotIdx] );
867 
868  // Push into the finished events queue the failed context
869  m_eventSlots[slotIdx].complete = true;
870  m_finishedEvents.push( m_eventSlots[slotIdx].eventContext.release() );
871 }
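The hand-off at the end of eventFailed() moves ownership of the context from the slot's std::unique_ptr into a queue of raw pointers, whose consumer becomes responsible for it. A minimal sketch of that pattern, assuming hypothetical Ctx and Slot types in place of EventContext and EventSlot, and a plain std::queue in place of the service's concurrent finished-events queue:

```cpp
#include <cassert>
#include <memory>
#include <queue>

// Hypothetical stand-ins for EventContext and EventSlot; not the Gaudi types.
struct Ctx {
  int evt  = 0;
  int slot = 0;
};

struct Slot {
  std::unique_ptr<Ctx> ctx;            // the slot owns its context while the event is in flight
  bool                 complete = false;
};

// Mark the slot as done and transfer ownership of the context to the queue.
void failEvent( Slot& slot, std::queue<Ctx*>& finished ) {
  slot.complete = true;
  finished.push( slot.ctx.release() ); // release() gives up ownership without deleting
}
```

After the call the slot no longer owns anything, so whoever pops the pointer must eventually delete it (or re-wrap it in a std::unique_ptr).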

◆ finalize()

StatusCode AvalancheSchedulerSvc::finalize ( )
override

Finalise.

Here the scheduler is deactivated and the thread joined.

Definition at line 423 of file AvalancheSchedulerSvc.cpp.

423  {
424 
426  if ( sc.isFailure() ) warning() << "Base class could not be finalized" << endmsg;
427 
428  sc = deactivate();
429  if ( sc.isFailure() ) warning() << "Scheduler could not be deactivated" << endmsg;
430 
431  debug() << "Deleting FiberManager" << endmsg;
433 
434  info() << "Joining Scheduler thread" << endmsg;
435  m_thread.join();
436 
437  // Final error check after thread pool termination
438  if ( m_isActive == FAILURE ) {
439  error() << "problems in scheduler thread" << endmsg;
440  return StatusCode::FAILURE;
441  }
442 
443  return sc;
444 }
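finalize() deactivates the scheduler before joining m_thread; the order matters because join() blocks until the control loop returns. A minimal model of that shutdown sequence, assuming a hypothetical Worker class whose loop simply polls an atomic state flag (the real activate() loop is not shown here and is not modelled):

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical lifecycle states, loosely modelled on the m_isActive flag.
enum class State { INACTIVE, ACTIVE, FAILURE };

class Worker {
public:
  void start() {
    m_state  = State::ACTIVE;
    m_thread = std::thread( [this] {
      // Hypothetical control loop: idle until the state leaves ACTIVE.
      while ( m_state.load() == State::ACTIVE ) std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
    } );
  }

  void markFailed() { m_state = State::FAILURE; }

  // Same order as finalize(): deactivate, join, then check for an earlier failure.
  bool finalize() {
    State expected = State::ACTIVE;
    m_state.compare_exchange_strong( expected, State::INACTIVE ); // leaves FAILURE untouched
    if ( m_thread.joinable() ) m_thread.join();
    return m_state.load() != State::FAILURE;
  }

private:
  std::atomic<State> m_state{ State::INACTIVE };
  std::thread        m_thread;
};
```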

◆ freeSlots()

unsigned int AvalancheSchedulerSvc::freeSlots ( )
override

Get the number of free slots, clamped at zero.

Definition at line 614 of file AvalancheSchedulerSvc.cpp.

614 { return std::max( m_freeSlots.load(), 0 ); }

◆ index2algname()

const std::string& AvalancheSchedulerSvc::index2algname ( unsigned int  index)
inline private

Convert an algorithm index to the algorithm's name.

Definition at line 258 of file AvalancheSchedulerSvc.h.

258 { return m_algname_vect[index]; }

◆ initialize()

StatusCode AvalancheSchedulerSvc::initialize ( )
override

Initialise.

Besides some bookkeeping, this activates the scheduler by running activate() in a new thread.

In addition, the list of algorithms is acquired from the AlgResourcePool.
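initialize() starts the control thread, waits on a std::binary_semaphore until the FiberManager exists, and then polls m_isActive once per second until it reads ACTIVE or FAILURE. The sketch below folds both waits into a single std::promise<bool>, a swapped-in C++11 technique rather than what the source does; Resource and Scheduler are hypothetical names:

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical stand-in for the FiberManager created on the new thread.
struct Resource {
  int workers;
  explicit Resource( int n ) : workers( n ) {}
};

struct Scheduler {
  std::unique_ptr<Resource> resource;
  std::thread               thread;

  // Start the control thread and block until it reports whether activation succeeded.
  bool initialize( int nWorkers, bool activationSucceeds ) {
    std::promise<bool> activated;
    std::future<bool>  result = activated.get_future();
    // The promise is moved into the thread so it cannot be destroyed while set_value() runs.
    thread = std::thread( [this, nWorkers, activationSucceeds, p = std::move( activated )]() mutable {
      resource = std::make_unique<Resource>( nWorkers ); // created on the control thread
      p.set_value( activationSucceeds );                 // wakes up the waiting caller
      // ... a real scheduler would now run its control loop until deactivated ...
    } );
    return result.get(); // set_value() happens-before get() returns, so `resource` is visible
  }

  ~Scheduler() {
    if ( thread.joinable() ) thread.join();
  }
};
```

Compared with polling, the caller wakes up as soon as the thread reports its outcome instead of on the next one-second tick.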

Definition at line 77 of file AvalancheSchedulerSvc.cpp.

77  {
78 
79  // Initialise mother class (read properties, ...)
81  if ( sc.isFailure() ) warning() << "Base class could not be initialized" << endmsg;
82 
83  // Get hold of the TBBSvc. This should initialize the thread pool
84  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
85  if ( !m_threadPoolSvc.isValid() ) {
86  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
87  return StatusCode::FAILURE;
88  }
89  auto castTPS = dynamic_cast<ThreadPoolSvc*>( m_threadPoolSvc.get() );
90  if ( !castTPS ) {
91  fatal() << "Cannot cast ThreadPoolSvc" << endmsg;
92  return StatusCode::FAILURE;
93  }
94  m_arena = castTPS->getArena();
95  if ( !m_arena ) {
96  fatal() << "Cannot find valid TBB task_arena" << endmsg;
97  return StatusCode::FAILURE;
98  }
99 
100  // Activate the scheduler in another thread.
101  info() << "Activating scheduler in a separate thread" << endmsg;
102  std::binary_semaphore fiber_manager_initalized{ 0 };
103  m_thread = std::thread( [this, &fiber_manager_initalized]() {
104  // Initialize FiberManager
105  this->m_fiberManager = std::make_unique<FiberManager>( this->m_numOffloadThreads.value() );
106  fiber_manager_initalized.release();
107  this->activate();
108  } );
109  // Wait for initialization to complete
110  fiber_manager_initalized.acquire();
111 
112  while ( m_isActive != ACTIVE ) {
113  if ( m_isActive == FAILURE ) {
114  fatal() << "Terminating initialization" << endmsg;
115  return StatusCode::FAILURE;
116  } else {
117  ON_DEBUG debug() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
118  sleep( 1 );
119  }
120  }
121 
122  if ( m_enableCondSvc ) {
123  // Get hold of the CondSvc
124  m_condSvc = serviceLocator()->service( "CondSvc" );
125  if ( !m_condSvc.isValid() ) {
126  warning() << "No CondSvc found, or not enabled. "
127  << "Will not manage CondAlgorithms" << endmsg;
128  m_enableCondSvc = false;
129  }
130  }
131 
132  // Get the algo resource pool
133  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
134  if ( !m_algResourcePool.isValid() ) {
135  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
136  return StatusCode::FAILURE;
137  }
138 
139  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
140  if ( !m_algExecStateSvc.isValid() ) {
141  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
142  return StatusCode::FAILURE;
143  }
144 
145  // Get Whiteboard
147  if ( !m_whiteboard.isValid() ) {
148  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
149  return StatusCode::FAILURE;
150  }
151 
152  // Set the MaxEventsInFlight parameters from the number of WB stores
153  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
154 
155  // Set the number of free slots
157 
158  // Get the list of algorithms
159  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
160  const unsigned int algsNumber = algos.size();
161  if ( algsNumber != 0 ) {
162  info() << "Found " << algsNumber << " algorithms" << endmsg;
163  } else {
164  error() << "No algorithms found" << endmsg;
165  return StatusCode::FAILURE;
166  }
167 
168  /* Dependencies
169  1) Look for handles in algo, if none
170  2) Assume none are required
171  */
172 
173  DataObjIDColl globalInp, globalOutp;
174 
175  // figure out all outputs
176  std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
177  for ( IAlgorithm* ialgoPtr : algos ) {
178  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
179  if ( !algoPtr ) {
180  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." << endmsg;
181  return StatusCode::FAILURE;
182  }
183 
184  DataObjIDColl algoOutputs;
185  for ( auto id : algoPtr->outputDataObjs() ) {
186  globalOutp.insert( id );
187  algoOutputs.insert( id );
188  }
189  algosOutputDependenciesMap[algoPtr->name()] = algoOutputs;
190  }
191 
192  std::ostringstream ostdd;
193  ostdd << "Data Dependencies for Algorithms:";
194 
195  std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
196  for ( IAlgorithm* ialgoPtr : algos ) {
197  Gaudi::Algorithm* algoPtr = dynamic_cast<Gaudi::Algorithm*>( ialgoPtr );
198  if ( nullptr == algoPtr ) {
199  fatal() << "Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
200  << ": this will result in a crash." << endmsg;
201  return StatusCode::FAILURE;
202  }
203 
204  DataObjIDColl i1, i2;
205  DHHVisitor avis( i1, i2 );
206  algoPtr->acceptDHVisitor( &avis );
207 
208  ostdd << "\n " << algoPtr->name();
209 
210  auto write_owners = [&avis, &ostdd]( const DataObjID& id ) {
211  auto owners = avis.owners_names_of( id );
212  if ( !owners.empty() ) { GaudiUtils::operator<<( ostdd << ' ', owners ); }
213  };
214 
215  DataObjIDColl algoDependencies;
216  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
217  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
218  DataObjID id = *idp;
219  ostdd << "\n o INPUT " << id;
220  write_owners( id );
221  algoDependencies.insert( id );
222  globalInp.insert( id );
223  }
224  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
225  ostdd << "\n o OUTPUT " << *id;
226  write_owners( *id );
227  if ( id->key().find( ":" ) != std::string::npos ) {
228  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
229  << endmsg;
230  m_showDataDeps = true;
231  }
232  }
233  } else {
234  ostdd << "\n none";
235  }
236  algosInputDependenciesMap[algoPtr->name()] = algoDependencies;
237  }
238 
239  if ( m_showDataDeps ) { info() << ostdd.str() << endmsg; }
240 
241  // If requested, dump a graph of the data dependencies in a .dot or .md file
242  if ( not m_dataDepsGraphFile.empty() ) {
243  if ( dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
244  return StatusCode::FAILURE;
245  }
246  }
247 
248  // Check if we have unmet global input dependencies, and, optionally, heal them
249  // WARNING: this step must be done BEFORE the Precedence Service is initialized
250  DataObjIDColl unmetDepInp, unusedOutp;
251  if ( m_checkDeps || m_checkOutput ) {
252  std::set<std::string> requiredInputKeys;
253  for ( auto o : globalInp ) {
254  // track aliases
255  // (assuming there should be no items with different class and same key corresponding to different objects)
256  requiredInputKeys.insert( o.key() );
257  if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
258  }
259  if ( m_checkOutput ) {
260  for ( auto o : globalOutp ) {
261  if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
262  // check ignores
263  bool ignored{};
264  for ( const std::string& algoName : m_checkOutputIgnoreList ) {
265  auto it = algosOutputDependenciesMap.find( algoName );
266  if ( it != algosOutputDependenciesMap.end() ) {
267  if ( it->second.find( o ) != it->second.end() ) {
268  ignored = true;
269  break;
270  }
271  }
272  }
273  if ( !ignored ) { unusedOutp.insert( o ); }
274  }
275  }
276  }
277  }
278 
279  if ( m_checkDeps ) {
280  if ( unmetDepInp.size() > 0 ) {
281 
282  auto printUnmet = [&]( auto msg ) {
283  for ( const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
284  msg << " o " << *o << " required by Algorithm: " << endmsg;
285 
286  for ( const auto& p : algosInputDependenciesMap )
287  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
288  }
289  };
290 
291  if ( !m_useDataLoader.empty() ) {
292 
293  // Find the DataLoader Alg
294  IAlgorithm* dataLoaderAlg( nullptr );
295  for ( IAlgorithm* algo : algos )
296  if ( m_useDataLoader == algo->name() ) {
297  dataLoaderAlg = algo;
298  break;
299  }
300 
301  if ( dataLoaderAlg == nullptr ) {
302  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
303  << "\" found, and unmet INPUT dependencies "
304  << "detected:" << endmsg;
305  printUnmet( fatal() );
306  return StatusCode::FAILURE;
307  }
308 
309  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
310  << dataLoaderAlg->name() << "\" Algorithm" << endmsg;
311  printUnmet( info() );
312 
313  // Set the property Load of DataLoader Alg
314  Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
315  if ( !dataAlg ) {
316  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Gaudi::Algorithm"
317  << endmsg;
318  return StatusCode::FAILURE;
319  }
320 
321  for ( auto& id : unmetDepInp ) {
322  ON_DEBUG debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/"
323  << dataLoaderAlg->name() << endmsg;
325  }
326 
327  } else {
328  fatal() << "Auto DataLoading not requested, "
329  << "and the following unmet INPUT dependencies were found:" << endmsg;
330  printUnmet( fatal() );
331  return StatusCode::FAILURE;
332  }
333 
334  } else {
335  info() << "No unmet INPUT data dependencies were found" << endmsg;
336  }
337  }
338 
339  if ( m_checkOutput ) {
340  if ( unusedOutp.size() > 0 ) {
341 
342  auto printUnusedOutp = [&]( auto msg ) {
343  for ( const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
344  msg << " o " << *o << " produced by Algorithm: " << endmsg;
345 
346  for ( const auto& p : algosOutputDependenciesMap )
347  if ( p.second.find( *o ) != p.second.end() ) msg << " * " << p.first << endmsg;
348  }
349  };
350 
351  fatal() << "The following unused OUTPUT items were found:" << endmsg;
352  printUnusedOutp( fatal() );
353  return StatusCode::FAILURE;
354  } else {
355  info() << "No unused OUTPUT items were found" << endmsg;
356  }
357  }
358 
359  // Get the precedence service
360  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
361  if ( !m_precSvc.isValid() ) {
362  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
363  return StatusCode::FAILURE;
364  }
365  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
366  if ( !precSvc ) {
367  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
368  return StatusCode::FAILURE;
369  }
370 
371  // Fill the containers to convert algo names to index
372  m_algname_vect.resize( algsNumber );
373  for ( IAlgorithm* algo : algos ) {
374  const std::string& name = algo->name();
375  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
378  }
379 
380  // Shortcut for the message service
381  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
382  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
383 
385  for ( size_t i = 0; i < m_maxEventsInFlight; ++i ) {
386  m_eventSlots.emplace_back( algsNumber, precSvc->getRules()->getControlFlowNodeCounter(), messageSvc );
387  m_eventSlots.back().complete = true;
388  }
389 
390  if ( m_threadPoolSize > 1 ) { m_maxAlgosInFlight = (size_t)m_threadPoolSize; }
391 
392  // Clearly inform about the level of concurrency
393  info() << "Concurrency level information:" << endmsg;
394  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
395  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
396  info() << " o Fiber thread pool size: " << m_numOffloadThreads << endmsg;
397 
398  // Inform about task scheduling prescriptions
399  info() << "Task scheduling settings:" << endmsg;
400  info() << " o Avalanche generation mode: "
401  << ( m_optimizationMode.empty() ? "disabled" : m_optimizationMode.toString() ) << endmsg;
402  info() << " o Preemptive scheduling of CPU-blocking tasks: "
404  ? ( "enabled (max. " + std::to_string( m_maxBlockingAlgosInFlight ) + " concurrent tasks)" )
405  : "disabled" )
406  << endmsg;
407  info() << " o Scheduling of condition tasks: " << ( m_enableCondSvc ? "enabled" : "disabled" ) << endmsg;
408 
409  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
410 
411  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
412 
413  // Simulate execution flow
414  if ( m_simulateExecution ) sc = m_precSvc->simulate( m_eventSlots[0] );
415 
416  return sc;
417 }
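The dependency check in initialize() is essentially set arithmetic over the collected inputs and outputs: an input that no algorithm produces is unmet, and an output that no algorithm consumes is unused unless its producer is in the ignore list (m_checkOutputIgnoreList). A simplified sketch, assuming data objects are identified by key alone (the real DataObjID also carries a class name, which is why the source tracks required keys separately) and leaving out the DataLoader healing step; checkDeps and DepReport are hypothetical names:

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Simplification: a data object is identified by its key only.
using Keys = std::set<std::string>;

struct DepReport {
  Keys unmetInputs;   // read by some algorithm, produced by none
  Keys unusedOutputs; // produced by some algorithm, read by none, and not ignored
};

DepReport checkDeps( const std::map<std::string, Keys>& inputsByAlg,
                     const std::map<std::string, Keys>& outputsByAlg,
                     const std::vector<std::string>&    ignoreOutputsOf ) {
  Keys globalIn, globalOut;
  for ( const auto& entry : inputsByAlg ) globalIn.insert( entry.second.begin(), entry.second.end() );
  for ( const auto& entry : outputsByAlg ) globalOut.insert( entry.second.begin(), entry.second.end() );

  DepReport r;
  for ( const auto& k : globalIn )
    if ( !globalOut.count( k ) ) r.unmetInputs.insert( k );

  for ( const auto& k : globalOut ) {
    if ( globalIn.count( k ) ) continue;
    bool ignored = false;
    for ( const auto& alg : ignoreOutputsOf ) {
      auto it = outputsByAlg.find( alg );
      if ( it != outputsByAlg.end() && it->second.count( k ) ) {
        ignored = true;
        break;
      }
    }
    if ( !ignored ) r.unusedOutputs.insert( k );
  }
  return r;
}
```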

◆ isStalled()

bool AvalancheSchedulerSvc::isStalled ( const EventSlot &  slot) const
private

Check if scheduling in a particular slot is in a stall.

Check whether a particular slot is in a stall condition.

This is the case when neither the slot nor any of its sub-slots has an algorithm in the DATAREADY, SCHEDULED or RESOURCELESS state, i.e. no algorithm is ready to run, running, or waiting for a free algorithm instance.

Definition at line 840 of file AvalancheSchedulerSvc.cpp.

840  {
841 
842  if ( !slot.algsStates.containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
843  !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
844 
845  error() << "*** Stall detected, event context: " << slot.eventContext.get() << endmsg;
846 
847  return true;
848  }
849  return false;
850 }
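isStalled() is a pure predicate over algorithm states. A sketch with a hypothetical AState enum (only the state names come from the source) and plain vectors in place of AlgsExecutionStates; note that a slot holding only CONTROLREADY and finished algorithms counts as stalled by this predicate:

```cpp
#include <algorithm>
#include <cassert>
#include <initializer_list>
#include <vector>

// Hypothetical enum reusing the scheduler's state names.
enum class AState { INITIAL, CONTROLREADY, DATAREADY, SCHEDULED, RESOURCELESS, EVTACCEPTED, EVTREJECTED, ERROR };

// A slot (or sub-slot) is reduced to the current state of each algorithm.
using States = std::vector<AState>;

bool containsAny( const States& s, std::initializer_list<AState> wanted ) {
  return std::any_of( s.begin(), s.end(),
                      [&]( AState a ) { return std::find( wanted.begin(), wanted.end(), a ) != wanted.end(); } );
}

// Stalled: nothing is runnable, running, or waiting for an algorithm instance,
// neither in the slot itself nor in any of its sub-slots.
bool isStalled( const States& slot, const std::vector<States>& subSlots ) {
  const auto progressing = { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS };
  if ( containsAny( slot, progressing ) ) return false;
  for ( const auto& ss : subSlots )
    if ( containsAny( ss, progressing ) ) return false;
  return true;
}
```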

◆ iterate()

StatusCode AvalancheSchedulerSvc::iterate ( )
private

Loop on all slots to schedule DATAREADY algorithms and sign off ready events.

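First retry the tasks that previously could not obtain an algorithm instance; then loop over all slots to schedule DATAREADY algorithms, sign off finished events, and detect execution stalls.

To check whether an event is finished, the method verifies that the root control flow decision of the task precedence graph is resolved and that no algorithm, in the slot or in any of its sub-slots, is still in an intermediate FSM state (CONTROLREADY, DATAREADY, SCHEDULED or RESOURCELESS).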

Definition at line 665 of file AvalancheSchedulerSvc.cpp.

665  {
666 
667  StatusCode global_sc( StatusCode::SUCCESS );
668 
669  // Retry algorithms
670  const size_t retries = m_retryQueue.size();
671  for ( unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
672  TaskSpec retryTS = std::move( m_retryQueue.front() );
673  m_retryQueue.pop();
674  global_sc = schedule( std::move( retryTS ) );
675  }
676 
677  // Loop over all slots
678  OccupancySnapshot nextSnap;
679  auto now = std::chrono::system_clock::now();
680  for ( EventSlot& thisSlot : m_eventSlots ) {
681 
682  // Ignore slots without a valid context (relevant when populating scheduler for first time)
683  if ( !thisSlot.eventContext ) continue;
684 
685  int iSlot = thisSlot.eventContext->slot();
686 
687  // Cache the states of the algorithms to improve readability and performance
688  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
689 
690  StatusCode partial_sc = StatusCode::FAILURE;
691 
692  // Make an occupancy snapshot
695 
696  // Initialise snapshot
697  if ( nextSnap.states.empty() ) {
698  nextSnap.time = now;
699  nextSnap.states.resize( m_eventSlots.size() );
700  }
701 
702  // Store alg states
703  std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
704  slotStateTotals.resize( AState::MAXVALUE );
705  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
706  slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset( AState( state ) );
707  }
708 
709  // Add subslot alg states
710  for ( auto& subslot : thisSlot.allSubSlots ) {
711  for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
712  slotStateTotals[state] += subslot.algsStates.sizeOfSubset( AState( state ) );
713  }
714  }
715  }
716 
717  // Perform DR->SCHEDULED
718  const auto& drAlgs = thisAlgsStates.algsInState( AState::DATAREADY );
719  for ( uint algIndex : drAlgs ) {
720  const std::string& algName{ index2algname( algIndex ) };
721  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
722  bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
723 
724  partial_sc =
725  schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
726 
727  ON_VERBOSE if ( partial_sc.isFailure() ) verbose()
728  << "Could not apply transition from " << AState::DATAREADY << " for algorithm " << algName
729  << " on processing slot " << iSlot << endmsg;
730  }
731 
732  // Check for algorithms ready in sub-slots
733  for ( auto& subslot : thisSlot.allSubSlots ) {
734  const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
735  for ( uint algIndex : drAlgsSubSlot ) {
736  const std::string& algName{ index2algname( algIndex ) };
737  unsigned int rank{ m_optimizationMode.empty() ? 0 : m_precSvc->getPriority( algName ) };
738  bool asynchronous{ m_precSvc->isAsynchronous( algName ) };
739  partial_sc =
740  schedule( TaskSpec( nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
741  }
742  }
743 
744  if ( m_dumpIntraEventDynamics ) {
746  s << "START, " << thisAlgsStates.sizeOfSubset( AState::CONTROLREADY ) << ", "
747  << thisAlgsStates.sizeOfSubset( AState::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( AState::SCHEDULED )
748  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
751  std::ofstream myfile;
752  myfile.open( "IntraEventFSMOccupancy_" + threads + "T.csv", std::ios::app );
753  myfile << s.str();
754  myfile.close();
755  }
756 
757  // Not complete because this would mean that the slot is already free!
758  if ( m_precSvc->CFRulesResolved( thisSlot ) &&
759  !thisSlot.algsStates.containsAny(
760  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
761  !subSlotAlgsInStates( thisSlot,
762  { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
763  !thisSlot.complete ) {
764 
765  thisSlot.complete = true;
766  // if the event did not fail, add it to the finished events
767  // otherwise it is taken care of in the error handling
768  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
769  ON_DEBUG debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
770  << thisSlot.eventContext->slot() << ")." << endmsg;
771  m_finishedEvents.push( thisSlot.eventContext.release() );
772  }
773 
774  // now let's return the fully evaluated result of the control flow
775  ON_DEBUG debug() << m_precSvc->printState( thisSlot ) << endmsg;
776 
777  thisSlot.eventContext.reset( nullptr );
778 
779  } else if ( isStalled( thisSlot ) ) {
780  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
781  eventFailed( thisSlot.eventContext.get() ); // can't release yet
782  }
783  partial_sc.ignore();
784  } // end loop on slots
785 
786  // Process snapshot
787  if ( !nextSnap.states.empty() ) {
788  m_lastSnapshot = nextSnap.time;
789  m_snapshotCallback( std::move( nextSnap ) );
790  }
791 
792  ON_VERBOSE verbose() << "Iteration done." << endmsg;
793  m_needsUpdate.store( false );
794  return global_sc;
795 }
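The retry loop at the top of iterate() snapshots m_retryQueue.size() before draining the queue. Because schedule() pushes a task back onto the retry queue when no algorithm instance is free, draining until the queue is empty could loop forever; bounding the pass by the initial size gives each waiting task exactly one attempt per iteration. A sketch of the same idea, with a hypothetical Task type and a caller-supplied trySchedule predicate standing in for schedule():

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <utility>

// Hypothetical task record; only the algorithm name matters here.
struct Task {
  std::string alg;
};

// Try each task that was waiting at the start of the pass exactly once.
// Tasks that still cannot run are pushed back and wait for the next pass.
template <typename TrySchedule>
int drainRetryQueue( std::queue<Task>& retry, TrySchedule trySchedule ) {
  const std::size_t pending   = retry.size(); // snapshot, like `retries` in iterate()
  int               scheduled = 0;
  for ( std::size_t i = 0; i < pending; ++i ) {
    Task t = std::move( retry.front() );
    retry.pop();
    if ( trySchedule( t ) )
      ++scheduled;
    else
      retry.push( std::move( t ) ); // in the service, schedule() does the re-queueing
  }
  return scheduled;
}
```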

◆ next()

bool AvalancheSchedulerSvc::next ( TaskSpec &  ts,
bool  asynchronous 
)
inline

Definition at line 377 of file AvalancheSchedulerSvc.h.

377  {
378  if ( asynchronous ) { return m_scheduledAsynchronousQueue.try_pop( ts ); }
379  return m_scheduledQueue.try_pop( ts );
380  }
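next() lets a worker pull its TaskSpec from the queue matching its kind: asynchronous tasks are handed to the fiber manager and the rest to the TBB arena (see schedule()). The sketch uses a small mutex-based queue as a stand-in for the concurrent queues behind m_scheduledQueue and m_scheduledAsynchronousQueue, whose exact type is not shown here; ConcurrentQueue and TaskQueues are hypothetical names, and int stands in for TaskSpec:

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <utility>

// Minimal mutex-based stand-in for a concurrent queue with a non-blocking try_pop().
template <typename T>
class ConcurrentQueue {
public:
  void push( T v ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    m_items.push( std::move( v ) );
  }
  bool try_pop( T& out ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_items.empty() ) return false;
    out = std::move( m_items.front() );
    m_items.pop();
    return true;
  }

private:
  std::mutex    m_mutex;
  std::queue<T> m_items;
};

// Same shape as next(): each kind of worker drains only its own queue.
struct TaskQueues {
  ConcurrentQueue<int> scheduled;    // tasks for the TBB arena
  ConcurrentQueue<int> asynchronous; // tasks for the fiber manager
  bool next( int& task, bool async ) { return async ? asynchronous.try_pop( task ) : scheduled.try_pop( task ); }
};
```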

◆ popFinishedEvent()

StatusCode AvalancheSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
override

Blocks until an event is available.

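Get a finished event or block until one becomes available. If no event is in flight (all slots are free) or the scheduler is inactive, StatusCode::FAILURE is returned immediately instead of blocking.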

Definition at line 624 of file AvalancheSchedulerSvc.cpp.

624  {
625 
626  // ON_DEBUG debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
627  if ( m_freeSlots.load() == (int)m_maxEventsInFlight || m_isActive == INACTIVE ) {
628  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
629  // << " active: " << m_isActive << endmsg;
630  return StatusCode::FAILURE;
631  } else {
632  // ON_DEBUG debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
633  // << " active: " << m_isActive << endmsg;
634  m_finishedEvents.pop( eventContext );
635  ++m_freeSlots;
636  ON_DEBUG debug() << "Popped slot " << eventContext->slot() << " (event " << eventContext->evt() << ")" << endmsg;
637  return StatusCode::SUCCESS;
638  }
639 }

◆ pushNewEvent()

StatusCode AvalancheSchedulerSvc::pushNewEvent ( EventContext *  eventContext)
override

Make an event available to the scheduler.

Add an event to the scheduler.

A null event context is rejected with StatusCode::FAILURE. Otherwise there are two possible cases:
1) No slot is free: StatusCode::FAILURE is returned.
2) At least one slot is free: an action that resets the slot and kicks off its update is queued.

Definition at line 545 of file AvalancheSchedulerSvc.cpp.

545  {
546 
547  if ( !eventContext ) {
548  fatal() << "Event context is nullptr" << endmsg;
549  return StatusCode::FAILURE;
550  }
551 
552  if ( m_freeSlots.load() == 0 ) {
553  ON_DEBUG debug() << "A free processing slot could not be found." << endmsg;
554  return StatusCode::FAILURE;
555  }
556 
557  // no problem as push new event is only called from one thread (event loop manager)
558  --m_freeSlots;
559 
560  auto action = [this, eventContext]() -> StatusCode {
561  // Event processing slot forced to be the same as the wb slot
562  const unsigned int thisSlotNum = eventContext->slot();
563  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
564  if ( !thisSlot.complete ) {
565  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
566  return StatusCode::FAILURE;
567  }
568 
569  ON_DEBUG debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
570  thisSlot.reset( eventContext );
571 
572  // Result status code:
573  StatusCode result = StatusCode::SUCCESS;
574
575  // promote to CR and DR the initial set of algorithms
576  Cause cs = { Cause::source::Root, "RootDecisionHub" };
577  if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
578  error() << "Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum << endmsg;
579  result = StatusCode::FAILURE;
580  }
581 
582  if ( this->iterate().isFailure() ) {
583  error() << "Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum << endmsg;
584  result = StatusCode::FAILURE;
585  }
586 
587  return result;
588  }; // end of lambda
589 
590  // Kick off scheduling
591  ON_VERBOSE {
592  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
593  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
594  }
595 
596  m_actionsQueue.push( action );
597 
598  return StatusCode::SUCCESS;
599 }
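pushNewEvent(), popFinishedEvent(), tryPopFinishedEvent() and freeSlots() share one atomic counter of free slots. The check-then-decrement in pushNewEvent() is not atomic as a whole; as the source comment notes, it is safe only because a single thread (the event loop manager) pushes events. A hypothetical model of that bookkeeping, with SlotCounter as an illustrative name:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical model of the free-slot bookkeeping. Assumes a single producer thread.
class SlotCounter {
public:
  explicit SlotCounter( int nSlots ) : m_free( nSlots ) {}

  // Returns false when every slot is busy; otherwise reserves one and queues the work.
  bool push( std::function<void()> action ) {
    if ( m_free.load() == 0 ) return false;
    --m_free;
    m_actions.push( std::move( action ) ); // executed later by the control thread
    return true;
  }

  void onEventPopped() { ++m_free; }

  // Clamp at zero, like freeSlots().
  unsigned freeSlots() const {
    const int n = m_free.load();
    return n < 0 ? 0u : static_cast<unsigned>( n );
  }

  std::size_t queuedActions() const { return m_actions.size(); }

private:
  std::atomic<int>                  m_free;
  std::queue<std::function<void()>> m_actions;
};
```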

◆ pushNewEvents()

StatusCode AvalancheSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
override

Definition at line 603 of file AvalancheSchedulerSvc.cpp.

603  {
604  StatusCode sc;
605  for ( auto context : eventContexts ) {
606  sc = pushNewEvent( context );
607  if ( sc != StatusCode::SUCCESS ) return sc;
608  }
609  return sc;
610 }

◆ recordOccupancy()

void AvalancheSchedulerSvc::recordOccupancy ( int  samplePeriod,
std::function< void(OccupancySnapshot)>  callback 
)
override virtual

Sample occupancy at a fixed interval (in ms). A negative value deactivates sampling; 0 takes a snapshot on every change. On each sample, the callback function is applied to the result.

Definition at line 1162 of file AvalancheSchedulerSvc.cpp.

1162  {
1163 
1164  auto action = [this, samplePeriod, callback = std::move( callback )]() -> StatusCode {
1165  if ( samplePeriod < 0 ) {
1167  } else {
1170  }
1171  return StatusCode::SUCCESS;
1172  };
1173 
1174  m_actionsQueue.push( std::move( action ) );
1175 }
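recordOccupancy() itself only queues an action that updates the sampling settings; the decision to take a snapshot happens inside iterate(). A hypothetical helper expressing the documented semantics (negative period disables sampling, 0 samples on every iteration, a positive value is a minimum interval in milliseconds); the exact comparison the source uses is not visible in this listing:

```cpp
#include <cassert>
#include <chrono>

using Clock = std::chrono::system_clock;
using Ms    = std::chrono::milliseconds;

// Hypothetical helper: should iterate() take an occupancy snapshot now?
//   samplePeriodMs <  0 : sampling disabled
//   samplePeriodMs == 0 : snapshot on every iteration
//   samplePeriodMs >  0 : snapshot once at least that many ms have passed since the last one
bool shouldSnapshot( int samplePeriodMs, Clock::time_point now, Clock::time_point last ) {
  if ( samplePeriodMs < 0 ) return false;
  return now - last >= Ms( samplePeriodMs );
}
```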

◆ revise()

StatusCode AvalancheSchedulerSvc::revise ( unsigned int  iAlgo,
EventContext *  contextPtr,
AState  state,
bool  iterate = false 
)
private

Definition at line 799 of file AvalancheSchedulerSvc.cpp.

799  {
800  StatusCode sc;
801  auto slotIndex = contextPtr->slot();
802  EventSlot& slot = m_eventSlots[slotIndex];
803  Cause cs = { Cause::source::Task, index2algname( iAlgo ) };
804 
805  if ( contextPtr->usesSubSlot() ) {
806  // Sub-slot
807  auto subSlotIndex = contextPtr->subSlot();
808  EventSlot& subSlot = slot.allSubSlots[subSlotIndex];
809 
810  sc = subSlot.algsStates.set( iAlgo, state );
811 
812  if ( sc.isSuccess() ) {
813  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
814  << ", subslot:" << subSlotIndex << ", event:" << contextPtr->evt() << "]" << endmsg;
815  // Revise states of algorithms downstream the precedence graph
816  if ( iterate ) sc = m_precSvc->iterate( subSlot, cs );
817  }
818  } else {
819  // Event level (standard behaviour)
820  sc = slot.algsStates.set( iAlgo, state );
821 
822  if ( sc.isSuccess() ) {
823  ON_VERBOSE verbose() << "Promoted " << index2algname( iAlgo ) << " to " << state << " [slot:" << slotIndex
824  << ", event:" << contextPtr->evt() << "]" << endmsg;
825  // Revise states of algorithms downstream the precedence graph
826  if ( iterate ) sc = m_precSvc->iterate( slot, cs );
827  }
828  }
829  return sc;
830 }

◆ schedule()

StatusCode AvalancheSchedulerSvc::schedule ( TaskSpec &&  ts)
private

Definition at line 1021 of file AvalancheSchedulerSvc.cpp.

1021  {
1022 
1023  // Check if a free Algorithm instance is available
1024  StatusCode getAlgSC( m_algResourcePool->acquireAlgorithm( ts.algName, ts.algPtr ) );
1025 
1026  // If an instance is available, proceed to scheduling
1027  StatusCode sc;
1028  if ( getAlgSC.isSuccess() ) {
1029 
1030  // Decide how to schedule the task and schedule it
1031  if ( -100 != m_threadPoolSize ) {
1032 
1033  // Cache values before moving the TaskSpec further
1034  unsigned int algIndex{ ts.algIndex };
1035  std::string_view algName( ts.algName );
1036  unsigned int algRank{ ts.algRank };
1037  bool asynchronous{ ts.asynchronous };
1038  int slotIndex{ ts.slotIndex };
1039  EventContext* contextPtr{ ts.contextPtr };
1040 
1041  if ( asynchronous ) {
1042  // Add to asynchronous scheduled queue
1043  m_scheduledAsynchronousQueue.push( std::move( ts ) );
1044
1045  // Schedule task
1046  m_fiberManager->schedule( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1047  }
1048 
1049  if ( !asynchronous ) {
1050  // Add the algorithm to the scheduled queue
1051  m_scheduledQueue.push( std::move( ts ) );
1052 
1053  // Prepare a TBB task that will execute the Algorithm according to the above queued specs
1054  m_arena->enqueue( AlgTask( this, serviceLocator(), m_algExecStateSvc, asynchronous ) );
1055  ++m_algosInFlight;
1056  }
1057  sc = revise( algIndex, contextPtr, AState::SCHEDULED );
1058 
1059  ON_DEBUG debug() << "Scheduled " << algName << " [slot:" << slotIndex << ", event:" << contextPtr->evt()
1060  << ", rank:" << algRank << ", asynchronous:" << ( asynchronous ? "yes" : "no" )
1061  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1063  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1064  : "" )
1065  << endmsg;
1066 
1067  } else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
1068  // Beojan: I don't think this bit works. ts hasn't been pushed into any queue so AlgTask won't retrieve it
1069  ++m_algosInFlight;
1070  sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
1071  AlgTask( this, serviceLocator(), m_algExecStateSvc, ts.asynchronous )();
1072  --m_algosInFlight;
1073  }
1074  } else { // if no Algorithm instance available, retry later
1075 
1076  sc = revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1077  // Add the algorithm to the retry queue
1078  m_retryQueue.push( std::move( ts ) );
1079  }
1080 
1082 
1083  return sc;
1084 }
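schedule() either dispatches a task or parks it: if the AlgResourcePool hands out a free instance, the task goes to the appropriate queue and is marked SCHEDULED; otherwise it is marked RESOURCELESS and pushed onto the retry queue, which iterate() drains on its next pass. A toy model of that decision, with a hypothetical Pool that counts free clones per algorithm name and a MiniScheduler that tracks one state per algorithm (the real service tracks states per slot):

```cpp
#include <cassert>
#include <map>
#include <queue>
#include <string>

enum class AState { DATAREADY, SCHEDULED, RESOURCELESS };

// Hypothetical pool: a fixed number of free clones per algorithm name.
struct Pool {
  std::map<std::string, int> available;
  bool acquire( const std::string& alg ) {
    auto it = available.find( alg );
    if ( it == available.end() || it->second == 0 ) return false;
    --it->second;
    return true;
  }
};

struct MiniScheduler {
  Pool                          pool;
  std::map<std::string, AState> state;
  std::queue<std::string>       ready, retry;

  // Same decision as schedule(): run if an instance is free, otherwise park the task.
  void schedule( const std::string& alg ) {
    if ( pool.acquire( alg ) ) {
      ready.push( alg ); // a worker would pick it up via next()
      state[alg] = AState::SCHEDULED;
    } else {
      state[alg] = AState::RESOURCELESS; // retried at the start of the next iterate()
      retry.push( alg );
    }
  }
};
```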

◆ scheduleEventView()

StatusCode AvalancheSchedulerSvc::scheduleEventView ( const EventContext *  sourceContext,
const std::string &  nodeName,
std::unique_ptr< EventContext >  viewContext
)
override virtual

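Method to inform the scheduler about event views.

It queues an action that attaches viewContext as a new sub-slot of the source slot, with nodeName as its entry point, or disables the view node nodeName if viewContext is null. Nested views (a sourceContext that already uses a sub-slot) are rejected with StatusCode::FAILURE.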

Definition at line 1122 of file AvalancheSchedulerSvc.cpp.

1123  {
1124  // Prevent view nesting
1125  if ( sourceContext->usesSubSlot() ) {
1126  fatal() << "Attempted to nest EventViews at node " << nodeName << ": this is not supported" << endmsg;
1127  return StatusCode::FAILURE;
1128  }
1129 
1130  ON_VERBOSE verbose() << "Queuing a view for [" << viewContext.get() << "]" << endmsg;
1131 
1132  // It's not possible to create an std::functional from a move-capturing lambda
1133  // So, we have to release the unique pointer
1134  auto action = [this, slotIndex = sourceContext->slot(), viewContextPtr = viewContext.release(),
1135  &nodeName]() -> StatusCode {
1136  // Attach the sub-slot to the top-level slot
1137  EventSlot& topSlot = this->m_eventSlots[slotIndex];
1138 
1139  if ( viewContextPtr ) {
1140  // Re-create the unique pointer
1141  auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1142  topSlot.addSubSlot( std::move( viewContext ), nodeName );
1143  return StatusCode::SUCCESS;
1144  } else {
1145  // Disable the view node if there are no views
1146  topSlot.disableSubSlots( nodeName );
1147  return StatusCode::SUCCESS;
1148  }
1149  };
1150 
1151  m_actionsQueue.push( std::move( action ) );
1152 
1153  return StatusCode::SUCCESS;
1154 }
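The release/re-wrap step in scheduleEventView() exists because std::function requires a copy-constructible callable. A lambda that captures a std::unique_ptr by move is move-only, so it cannot be stored in a std::function. The sketch below reproduces the idea with hypothetical stand-ins (ViewContext, TopSlot, ActionQueueDemo). It uses a plain std::queue in place of the TBB queue, and a bool return in place of StatusCode.

One deliberate difference: the sketch captures the node name by value. The closure therefore does not rely on the caller's string staying alive until the action runs. The listing above captures nodeName by reference, so there the caller's string must outlive the queued action.

```cpp
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct ViewContext { int id; };  // hypothetical stand-in for EventContext

struct TopSlot {                  // hypothetical stand-in for EventSlot
  std::vector<std::pair<std::string, std::unique_ptr<ViewContext>>> subSlots;
  std::vector<std::string> disabledNodes;
};

class ActionQueueDemo {
public:
  // Queue a deferred "attach view" action. The unique_ptr is released into
  // a raw pointer so that the lambda stays copyable for std::function.
  void scheduleView( const std::string& nodeName, std::unique_ptr<ViewContext> view ) {
    auto action = [this, raw = view.release(), name = nodeName]() -> bool {
      if ( raw ) {
        // Take ownership back as soon as the action runs.
        m_slot.subSlots.emplace_back( name, std::unique_ptr<ViewContext>( raw ) );
      } else {
        // No view: mark the node as disabled.
        m_slot.disabledNodes.push_back( name );
      }
      return true;
    };
    m_actions.push( std::move( action ) );
  }

  // Drain the queue on the calling thread (the "control thread" here).
  void runAll() {
    while ( !m_actions.empty() ) {
      m_actions.front()();
      m_actions.pop();
    }
  }

  const TopSlot& slot() const { return m_slot; }

private:
  TopSlot                           m_slot;
  std::queue<std::function<bool()>> m_actions;
};
```

This pattern has a cost. If a queued action is discarded without ever running, the released pointer leaks, because nothing owns it between release() and the re-wrap.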

◆ signoff()

StatusCode AvalancheSchedulerSvc::signoff ( const TaskSpec & ts)
private

The call to this method is triggered only from within the AlgTask.

Definition at line 1091 of file AvalancheSchedulerSvc.cpp.

1091  {
1092 
1093  Gaudi::Hive::setCurrentContext( ts.contextPtr );
1094 
1095  --m_algosInFlight;
1096 
1097  const AlgExecState& algstate = m_algExecStateSvc->algExecState( ts.algPtr, *( ts.contextPtr ) );
1098  AState state = algstate.execStatus().isSuccess()
1099  ? ( algstate.filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1100  : AState::ERROR;
1101 
1102  // Update algorithm state and revise the downstream states
1103  auto sc = revise( ts.algIndex, ts.contextPtr, state, true );
1104 
1105  ON_DEBUG debug() << "Executed " << ts.algName << " [slot:" << ts.slotIndex << ", event:" << ts.contextPtr->evt()
1106  << ", rank:" << ts.algRank << ", asynchronous:" << ( ts.asynchronous ? "yes" : "no" )
1107  << "]. Scheduled algorithms: " << m_algosInFlight + m_blockingAlgosInFlight
1108  << ( m_blockingAlgosInFlight > 0
1109  ? " (including " + std::to_string( m_blockingAlgosInFlight ) + " - off TBB runtime)"
1110  : "" )
1111  << endmsg;
1112 
1113  // Prompt a call to updateStates
1114  m_needsUpdate.store( true );
1115  return sc;
1116 }
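signoff() makes two small decisions that can be checked in isolation. First, it maps an algorithm's execution result to its final state: a failed execution always yields ERROR, whatever the filter decision. Second, it formats the in-flight summary for the debug message. The sketch below mirrors both. AlgState and inFlightSummary() are hypothetical names, and the summary assumes the "off TBB runtime" suffix appears only when at least one blocking algorithm is in flight.

```cpp
#include <string>

// Hypothetical mirror of the three AState values used in signoff().
enum class AlgState { EVTACCEPTED, EVTREJECTED, ERROR };

// Execution failure takes precedence over the filter decision.
constexpr AlgState finalState( bool execSucceeded, bool filterPassed ) {
  return execSucceeded ? ( filterPassed ? AlgState::EVTACCEPTED : AlgState::EVTREJECTED ) : AlgState::ERROR;
}

// Builds the "Scheduled algorithms: ..." part of the debug line.
std::string inFlightSummary( unsigned algosInFlight, unsigned blockingInFlight ) {
  return "Scheduled algorithms: " + std::to_string( algosInFlight + blockingInFlight ) +
         ( blockingInFlight > 0 ? " (including " + std::to_string( blockingInFlight ) + " - off TBB runtime)"
                                : "" );
}
```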

◆ tryPopFinishedEvent()

StatusCode AvalancheSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
override

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, return StatusCode::FAILURE immediately instead of blocking.

Definition at line 645 of file AvalancheSchedulerSvc.cpp.

645  {
646 
647  if ( m_finishedEvents.try_pop( eventContext ) ) {
648  ON_DEBUG debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
649  << endmsg;
650  ++m_freeSlots;
651  return StatusCode::SUCCESS;
652  }
653  return StatusCode::FAILURE;
654 }
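tryPopFinishedEvent() is the non-blocking counterpart of popFinishedEvent(). On success it hands the event back and increments the free-slot counter, since the slot can then be reused. The sketch below reproduces that contract with a std::mutex and std::condition_variable in place of tbb::concurrent_bounded_queue. FinishedQueue is a hypothetical name, and the blocking pop() is an assumed analogue of popFinishedEvent() rather than its actual implementation.

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for m_finishedEvents plus m_freeSlots.
template <typename T>
class FinishedQueue {
public:
  explicit FinishedQueue( int freeSlots ) : m_freeSlots( freeSlots ) {}

  // Called when an event completes; its slot stays occupied until popped.
  void push( T item ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_items.push_back( std::move( item ) );
    }
    m_cv.notify_one();
  }

  // Non-blocking: succeed only if a finished item is already available.
  bool tryPop( T& out ) {
    std::lock_guard<std::mutex> lock( m_mutex );
    if ( m_items.empty() ) return false;
    out = std::move( m_items.front() );
    m_items.pop_front();
    ++m_freeSlots;  // the slot becomes reusable once the event is handed back
    return true;
  }

  // Blocking variant: wait until a finished item arrives.
  T pop() {
    std::unique_lock<std::mutex> lock( m_mutex );
    m_cv.wait( lock, [this] { return !m_items.empty(); } );
    T out = std::move( m_items.front() );
    m_items.pop_front();
    ++m_freeSlots;
    return out;
  }

  int freeSlots() const { return m_freeSlots.load(); }

private:
  std::mutex              m_mutex;
  std::condition_variable m_cv;
  std::deque<T>           m_items;
  std::atomic_int         m_freeSlots;
};
```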

Friends And Related Function Documentation

◆ AlgTask

friend class AlgTask
friend

Definition at line 116 of file AvalancheSchedulerSvc.h.

Member Data Documentation

◆ m_actionsQueue

tbb::concurrent_bounded_queue<action> AvalancheSchedulerSvc::m_actionsQueue
private

Queue where closures are stored and picked for execution.

Definition at line 319 of file AvalancheSchedulerSvc.h.
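m_actionsQueue holds closures that other threads enqueue and a single control thread executes. Any scheduler state touched only inside those closures therefore has one writer. The sketch below shows that single-consumer loop using std::thread and a mutex-protected std::queue instead of tbb::concurrent_bounded_queue. ActionLoop is hypothetical, and using an empty std::function as the stop request is this sketch's own choice.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical single-consumer action loop: producers push closures,
// and only the internal thread runs them.
class ActionLoop {
public:
  ActionLoop() : m_thread( [this] { run(); } ) {}

  ~ActionLoop() {
    push( nullptr );  // an empty function acts as the stop request
    m_thread.join();
  }

  void push( std::function<void()> action ) {
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      m_queue.push( std::move( action ) );
    }
    m_cv.notify_one();
  }

private:
  void run() {
    for ( ;; ) {
      std::function<void()> action;
      {
        std::unique_lock<std::mutex> lock( m_mutex );
        m_cv.wait( lock, [this] { return !m_queue.empty(); } );
        action = std::move( m_queue.front() );
        m_queue.pop();
      }
      if ( !action ) return;  // stop request
      action();               // executed outside the lock, on this thread only
    }
  }

  std::mutex                        m_mutex;
  std::condition_variable           m_cv;
  std::queue<std::function<void()>> m_queue;
  std::thread                       m_thread;  // declared last: starts after the members above exist
};
```

Because the queue is FIFO, every action pushed before destruction runs before the stop request. After join() returns, the owning thread can safely read the state those actions modified.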

◆ m_algExecStateSvc

SmartIF<IAlgExecStateSvc> AvalancheSchedulerSvc::m_algExecStateSvc
private

Algorithm execution state manager.

Definition at line 279 of file AvalancheSchedulerSvc.h.

◆ m_algname_index_map

std::unordered_map<std::string, unsigned int> AvalancheSchedulerSvc::m_algname_index_map
private

Map holding the information needed for the name2index conversion.

Definition at line 255 of file AvalancheSchedulerSvc.h.

◆ m_algname_vect

std::vector<std::string> AvalancheSchedulerSvc::m_algname_vect
private

Vector holding the information needed for the index2name conversion.

Definition at line 261 of file AvalancheSchedulerSvc.h.

◆ m_algosInFlight

unsigned int AvalancheSchedulerSvc::m_algosInFlight = 0
private

Number of algorithms presently in flight.

Definition at line 285 of file AvalancheSchedulerSvc.h.

◆ m_algResourcePool

SmartIF<IAlgResourcePool> AvalancheSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 314 of file AvalancheSchedulerSvc.h.

◆ m_arena

tbb::task_arena* AvalancheSchedulerSvc::m_arena { nullptr }
private

Definition at line 369 of file AvalancheSchedulerSvc.h.

◆ m_blockingAlgosInFlight

unsigned int AvalancheSchedulerSvc::m_blockingAlgosInFlight = 0
private

Number of CPU-blocking algorithms presently in flight (running outside the TBB runtime).

Definition at line 288 of file AvalancheSchedulerSvc.h.

◆ m_checkDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkDeps
private
Initial value:
{ this, "CheckDependencies", false,
"Runtime check of Algorithm Input Data Dependencies" }

Definition at line 196 of file AvalancheSchedulerSvc.h.

◆ m_checkOutput

Gaudi::Property<bool> AvalancheSchedulerSvc::m_checkOutput
private
Initial value:
{ this, "CheckOutputUsage", false,
"Runtime check of Algorithm Output Data usage" }

Definition at line 198 of file AvalancheSchedulerSvc.h.

◆ m_checkOutputIgnoreList

Gaudi::Property<std::vector<std::string> > AvalancheSchedulerSvc::m_checkOutputIgnoreList
private
Initial value:
{
this,
"CheckOutputUsageIgnoreList",
{},
"Ignore outputs of the Algorithms of this name when doing the check",
"OrderedSet<std::string>" }

Definition at line 200 of file AvalancheSchedulerSvc.h.

◆ m_condSvc

SmartIF<ICondSvc> AvalancheSchedulerSvc::m_condSvc
private

A shortcut to the service for conditions handling.

Definition at line 282 of file AvalancheSchedulerSvc.h.

◆ m_dataDepsGraphAlgoPattern

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphAlgoPattern
private
Initial value:
{
this, "DataDepsGraphAlgPattern", ".*",
"Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data "
"deps graph" }

Definition at line 228 of file AvalancheSchedulerSvc.h.

◆ m_dataDepsGraphFile

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphFile
private
Initial value:
{
this, "DataDepsGraphFile", "",
"Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected "
"Algorithms" }

Definition at line 223 of file AvalancheSchedulerSvc.h.

◆ m_dataDepsGraphObjectPattern

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_dataDepsGraphObjectPattern
private
Initial value:
{
this, "DataDepsGraphObjectPattern", ".*",
"Regex pattern for selecting desired input or output by their full key" }

Definition at line 233 of file AvalancheSchedulerSvc.h.
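DataDepsGraphAlgPattern and DataDepsGraphObjectPattern select which algorithms and data keys go into the dumped data-dependency graph. The default ".*" selects everything. The sketch below shows how such a regex filter behaves. selectByPattern() is hypothetical, and the choice of std::regex_search (substring match) rather than std::regex_match (whole-string match) is an assumption here.

```cpp
#include <regex>
#include <string>
#include <vector>

// Keep only the names that the pattern finds somewhere inside (search semantics).
std::vector<std::string> selectByPattern( const std::vector<std::string>& names, const std::string& pattern ) {
  const std::regex         re( pattern );
  std::vector<std::string> selected;
  for ( const auto& name : names ) {
    if ( std::regex_search( name, re ) ) selected.push_back( name );
  }
  return selected;
}
```

Under search semantics a pattern such as "Reco" also selects "TrackReco". Anchoring it as "^Reco$" restricts the selection to that exact name.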

◆ m_dumpIntraEventDynamics

Gaudi::Property<bool> AvalancheSchedulerSvc::m_dumpIntraEventDynamics
private
Initial value:
{ this, "DumpIntraEventDynamics", false,
"Dump intra-event concurrency dynamics to csv file" }

Definition at line 187 of file AvalancheSchedulerSvc.h.

◆ m_enableCondSvc

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enableCondSvc { this, "EnableConditions", false, "Enable ConditionsSvc" }
private

Definition at line 210 of file AvalancheSchedulerSvc.h.

◆ m_enablePreemptiveBlockingTasks

Gaudi::Property<bool> AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
private
Initial value:
{
this, "PreemptiveBlockingTasks", false,
"Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." }

Definition at line 189 of file AvalancheSchedulerSvc.h.

◆ m_eventSlots

std::vector<EventSlot> AvalancheSchedulerSvc::m_eventSlots
private

Vector of event slots.

Definition at line 270 of file AvalancheSchedulerSvc.h.

◆ m_fiberManager

std::unique_ptr<FiberManager> AvalancheSchedulerSvc::m_fiberManager { nullptr }
private

Definition at line 370 of file AvalancheSchedulerSvc.h.

◆ m_finishedEvents

tbb::concurrent_bounded_queue<EventContext*> AvalancheSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 276 of file AvalancheSchedulerSvc.h.

◆ m_freeSlots

std::atomic_int AvalancheSchedulerSvc::m_freeSlots { 0 }
private

Free-slot counter, atomic to account for asynchronous updates by the scheduler with respect to the rest.

Definition at line 273 of file AvalancheSchedulerSvc.h.

◆ m_isActive

std::atomic<ActivationState> AvalancheSchedulerSvc::m_isActive { INACTIVE }
private

Flag to track if the scheduler is active or not.

Definition at line 246 of file AvalancheSchedulerSvc.h.

◆ m_lastSnapshot

std::chrono::system_clock::time_point AvalancheSchedulerSvc::m_lastSnapshot = std::chrono::system_clock::now()
private

Definition at line 167 of file AvalancheSchedulerSvc.h.

◆ m_maxAlgosInFlight

size_t AvalancheSchedulerSvc::m_maxAlgosInFlight { 1 }
private

Definition at line 373 of file AvalancheSchedulerSvc.h.

◆ m_maxBlockingAlgosInFlight

Gaudi::Property<unsigned int> AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
private
Initial value:
{
this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms" }

Definition at line 180 of file AvalancheSchedulerSvc.h.

◆ m_maxEventsInFlight

size_t AvalancheSchedulerSvc::m_maxEventsInFlight { 0 }
private

Definition at line 372 of file AvalancheSchedulerSvc.h.

◆ m_maxParallelismExtra

Gaudi::Property<int> AvalancheSchedulerSvc::m_maxParallelismExtra
private
Initial value:
{
this, "maxParallelismExtra", 0,
"Allows to add some extra threads to the maximum parallelism set in TBB. "
"The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" }

Definition at line 175 of file AvalancheSchedulerSvc.h.
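The property description states the TBB parallelism limit as a simple sum. The worked example below evaluates that formula. tbbMaxParallelism() is a hypothetical helper, and it assumes ThreadPoolSize has already been resolved to a positive thread count; how a value of -1 is turned into a count before the formula is applied is not shown here.

```cpp
// Formula quoted above: ThreadPoolSize + maxParallelismExtra + 1.
constexpr int tbbMaxParallelism( int threadPoolSize, int maxParallelismExtra ) {
  return threadPoolSize + maxParallelismExtra + 1;
}

static_assert( tbbMaxParallelism( 8, 0 ) == 9, "8 pool threads, no extra" );
static_assert( tbbMaxParallelism( 8, 2 ) == 11, "8 pool threads plus 2 extra" );
```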

◆ m_needsUpdate

std::atomic<bool> AvalancheSchedulerSvc::m_needsUpdate { true }
private

Definition at line 363 of file AvalancheSchedulerSvc.h.

◆ m_numOffloadThreads

Gaudi::Property<int> AvalancheSchedulerSvc::m_numOffloadThreads
private
Initial value:
{
this, "NumOffloadThreads", 2,
"Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged "
"and use Boost Fiber functionality to suspend while waiting for offloaded work." }

Definition at line 192 of file AvalancheSchedulerSvc.h.

◆ m_optimizationMode

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_optimizationMode
private
Initial value:
{ this, "Optimizer", "",
"The following modes are currently available: PCE, COD, DRE, E" }

Definition at line 185 of file AvalancheSchedulerSvc.h.

◆ m_precSvc

SmartIF<IPrecedenceSvc> AvalancheSchedulerSvc::m_precSvc
private

A shortcut to the Precedence Service.

Definition at line 264 of file AvalancheSchedulerSvc.h.

◆ m_retryQueue

std::queue<TaskSpec> AvalancheSchedulerSvc::m_retryQueue
private

Definition at line 360 of file AvalancheSchedulerSvc.h.

◆ m_scheduledAsynchronousQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledAsynchronousQueue
private

Definition at line 359 of file AvalancheSchedulerSvc.h.

◆ m_scheduledQueue

tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> AvalancheSchedulerSvc::m_scheduledQueue
private

Queues for scheduled algorithms.

Definition at line 358 of file AvalancheSchedulerSvc.h.
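m_scheduledQueue and m_scheduledAsynchronousQueue are priority queues ordered by AlgQueueSort. The sketch below shows the general mechanism: a comparator plugged into a priority queue so that the "largest" task is served first. RankedTask and ByRank are hypothetical, and ordering by a rank field with the highest rank on top is an assumption, not the documented behaviour of AlgQueueSort. std::priority_queue is not thread-safe, unlike tbb::concurrent_priority_queue, so this only illustrates the ordering.

```cpp
#include <queue>
#include <string>
#include <vector>

// Hypothetical subset of TaskSpec.
struct RankedTask {
  std::string algName;
  unsigned    algRank;
};

// Comparator returns true when a has LOWER priority than b,
// so the highest-ranked task ends up at top().
struct ByRank {
  bool operator()( const RankedTask& a, const RankedTask& b ) const { return a.algRank < b.algRank; }
};

using TaskQueue = std::priority_queue<RankedTask, std::vector<RankedTask>, ByRank>;
```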

◆ m_showControlFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showControlFlow
private
Initial value:
{ this, "ShowControlFlow", false,
"Show the configuration of all Algorithms and Sequences" }

Definition at line 218 of file AvalancheSchedulerSvc.h.

◆ m_showDataDeps

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataDeps
private
Initial value:
{ this, "ShowDataDependencies", true,
"Show the INPUT and OUTPUT data dependencies of Algorithms" }

Definition at line 212 of file AvalancheSchedulerSvc.h.

◆ m_showDataFlow

Gaudi::Property<bool> AvalancheSchedulerSvc::m_showDataFlow
private
Initial value:
{ this, "ShowDataFlow", false,
"Show the configuration of DataFlow between Algorithms" }

Definition at line 215 of file AvalancheSchedulerSvc.h.

◆ m_simulateExecution

Gaudi::Property<bool> AvalancheSchedulerSvc::m_simulateExecution
private
Initial value:
{
this, "SimulateExecution", false,
"Flag to perform single-pass simulation of execution flow before the actual execution" }

Definition at line 182 of file AvalancheSchedulerSvc.h.

◆ m_snapshotCallback

std::function<void( OccupancySnapshot )> AvalancheSchedulerSvc::m_snapshotCallback
private

Definition at line 168 of file AvalancheSchedulerSvc.h.

◆ m_snapshotInterval

std::chrono::duration<int64_t, std::milli> AvalancheSchedulerSvc::m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min()
private

Definition at line 166 of file AvalancheSchedulerSvc.h.
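recordOccupancy() documents three regimes for the sampling period: negative deactivates sampling, 0 snapshots every change, and a positive value samples at a fixed interval in milliseconds. The default m_snapshotInterval is duration::min(), which is negative, so sampling starts out disabled. The sketch below encodes that rule as a predicate. shouldSnapshot() is hypothetical, and comparing against the time of the last snapshot (as m_lastSnapshot suggests) is an assumption about how the interval is enforced.

```cpp
#include <chrono>
#include <cstdint>

using Millis = std::chrono::duration<std::int64_t, std::milli>;

// Negative interval: off. Zero: every change. Positive: at most once per interval.
bool shouldSnapshot( Millis interval, std::chrono::system_clock::time_point last,
                     std::chrono::system_clock::time_point now ) {
  if ( interval < Millis::zero() ) return false;
  if ( interval == Millis::zero() ) return true;
  return now - last >= interval;
}
```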

◆ m_thread

std::thread AvalancheSchedulerSvc::m_thread
private

The thread in which the activate function runs.

Definition at line 249 of file AvalancheSchedulerSvc.h.

◆ m_threadPoolSize

Gaudi::Property<int> AvalancheSchedulerSvc::m_threadPoolSize
private
Initial value:
{
this, "ThreadPoolSize", -1,
"Size of the global thread pool initialised by TBB; a value of -1 requests to use "
"all available hardware threads; -100 requests to bypass TBB executing "
"all algorithms in the scheduler's thread." }

Definition at line 170 of file AvalancheSchedulerSvc.h.
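ThreadPoolSize carries two special values: -1 requests all available hardware threads, and -100 bypasses TBB so that algorithms run in the scheduler's own thread (the branch visible at the top of the schedule() listing). The sketch below interprets the setting. ExecMode, PoolConfig and resolvePoolSize() are hypothetical. Falling back to one thread when hardware_concurrency() returns 0, and rejecting other negative values, are this sketch's own choices rather than documented behaviour.

```cpp
#include <stdexcept>
#include <thread>

enum class ExecMode { TbbPool, InSchedulerThread };

struct PoolConfig {
  ExecMode mode;
  unsigned threads;
};

PoolConfig resolvePoolSize( int threadPoolSize ) {
  if ( threadPoolSize == -100 ) return { ExecMode::InSchedulerThread, 0 };  // bypass TBB entirely
  if ( threadPoolSize == -1 ) {
    const unsigned hw = std::thread::hardware_concurrency();  // may legitimately return 0
    return { ExecMode::TbbPool, hw > 0 ? hw : 1u };
  }
  if ( threadPoolSize < 0 ) throw std::invalid_argument( "unsupported ThreadPoolSize" );
  return { ExecMode::TbbPool, static_cast<unsigned>( threadPoolSize ) };
}
```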

◆ m_threadPoolSvc

SmartIF<IThreadPoolSvc> AvalancheSchedulerSvc::m_threadPoolSvc
private

Definition at line 368 of file AvalancheSchedulerSvc.h.

◆ m_useDataLoader

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_useDataLoader
private
Initial value:
{ this, "DataLoaderAlg", "",
"Attribute unmet input dependencies to this DataLoader Algorithm" }

Definition at line 207 of file AvalancheSchedulerSvc.h.

◆ m_verboseSubSlots

Gaudi::Property<bool> AvalancheSchedulerSvc::m_verboseSubSlots { this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" }
private

Definition at line 221 of file AvalancheSchedulerSvc.h.

◆ m_whiteboard

SmartIF<IHiveWhiteBoard> AvalancheSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 267 of file AvalancheSchedulerSvc.h.

◆ m_whiteboardSvcName

Gaudi::Property<std::string> AvalancheSchedulerSvc::m_whiteboardSvcName { this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" }
private

Definition at line 179 of file AvalancheSchedulerSvc.h.


The documentation for this class was generated from the following files:
AvalancheSchedulerSvc.h
AvalancheSchedulerSvc.cpp