Go to the documentation of this file.
   11 #ifndef GAUDIHIVE_AVALANCHESCHEDULERSVC_H 
   12 #define GAUDIHIVE_AVALANCHESCHEDULERSVC_H 
   35 #include <string_view> 
   37 #include <unordered_map> 
   41 #include <tbb/concurrent_priority_queue.h> 
   42 #include <tbb/concurrent_queue.h> 
   43 #include <tbb/task_arena.h> 
  120   using extends::extends;
 
  171       this, 
"ThreadPoolSize", -1,
 
  172       "Size of the global thread pool initialised by TBB; a value of -1 requests to use " 
  173       "all available hardware threads; -100 requests to bypass TBB executing " 
  174       "all algorithms in the scheduler's thread." };
 
  176       this, 
"maxParallelismExtra", 0,
 
  177       "Allows to add some extra threads to the maximum parallelism set in TBB. " 
  178       "The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" };
 
  181       this, 
"MaxBlockingAlgosInFlight", 0, 
"Maximum allowed number of simultaneously running CPU-blocking algorithms" };
 
  183       this, 
"SimulateExecution", 
false,
 
  184       "Flag to perform single-pass simulation of execution flow before the actual execution" };
 
  186                                                    "The following modes are currently available: PCE, COD, DRE,  E" };
 
  188                                                   "Dump intra-event concurrency dynamics to csv file" };
 
  190       this, 
"PreemptiveBlockingTasks", 
false,
 
  191       "Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." };
 
  193       this, 
"NumOffloadThreads", 2,
 
  194       "Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged " 
  195       "and use Boost Fiber functionality to suspend while waiting for offloaded work." };
 
  197                                      "Runtime check of Algorithm Input Data Dependencies" };
 
  199                                        "Runtime check of Algorithm Output Data usage" };
 
  202       "CheckOutputUsageIgnoreList",
 
  204       "Ignore outputs of the Algorithms of this name when doing the check",
 
  205       "OrderedSet<std::string>" };
 
  208                                                 "Attribute unmet input dependencies to this DataLoader Algorithm" };
 
  213                                         "Show the INPUT and OUTPUT data dependencies of Algorithms" };
 
  216                                         "Show the configuration of DataFlow between Algorithms" };
 
  219                                            "Show the configuration of all Algorithms and Sequences" };
 
  224       this, 
"DataDepsGraphFile", 
"",
 
  225       "Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected " 
  229       this, 
"DataDepsGraphAlgPattern", 
".*",
 
  230       "Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data " 
  234       this, 
"DataDepsGraphObjectPattern", 
".*",
 
  235       "Regex pattern for selecting desired input or output by their full key" };
 
  383 #endif // GAUDIHIVE_AVALANCHESCHEDULERSVC_H 
  
 
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Gaudi::Property< std::string > m_useDataLoader
Struct to hold entries in the alg queues.
StatusCode finalize() override
Finalise.
TaskSpec & operator=(const TaskSpec &)=delete
Assignment operator.
Gaudi::Property< std::string > m_optimizationMode
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Class representing an event slot.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
std::chrono::system_clock::time_point m_lastSnapshot
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::unique_ptr< FiberManager > m_fiberManager
StatusCode schedule(TaskSpec &&)
Gaudi::Property< bool > m_showControlFlow
std::atomic< bool > m_needsUpdate
Gaudi::Property< bool > m_enableCondSvc
StatusCode deactivate()
Deactivate scheduler.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
Gaudi::Property< int > m_numOffloadThreads
tbb::task_arena * m_arena
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< int > m_threadPoolSize
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
size_t m_maxEventsInFlight
TaskSpec()
Default constructor.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
TaskSpec & operator=(TaskSpec &&)=default
Move assignment.
Gaudi::Property< int > m_maxParallelismExtra
bool operator()(const TaskSpec &i, const TaskSpec &j) const
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
Gaudi::Property< std::string > m_whiteboardSvcName
Gaudi::Property< bool > m_checkOutput
TaskSpec(const TaskSpec &)=default
Copy constructor (to keep a lambda capturing a TaskSpec storable as a std::function value)
Gaudi::Property< bool > m_simulateExecution
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms). Negative value to deactivate, 0 to snapshot every change. Each...
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Base class used to extend a class implementing other interfaces.
unsigned int freeSlots() override
Get free slots number.
Gaudi::Property< bool > m_showDataDeps
size_t m_maxAlgosInFlight
void dumpState() override
Dump scheduler state for all slots.
StatusCode initialize() override
Initialise.
TaskSpec(IAlgorithm *algPtr, unsigned int algIndex, const std::string &algName, unsigned int algRank, bool asynchronous, int slotIndex, EventContext *eventContext)
bool next(TaskSpec &ts, bool asynchronous)
TaskSpec(TaskSpec &&)=default
Move constructor.
EventContext * contextPtr
Gaudi::Property< bool > m_dumpIntraEventDynamics
std::queue< TaskSpec > m_retryQueue
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_dataDepsGraphFile
State
Execution states of the algorithms. Must have contiguous integer values 0, 1...
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
void activate()
Activate scheduler.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Gaudi::Property< bool > m_checkDeps
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
Comparison operator to sort the queues.
std::thread m_thread
The thread in which the activate function runs.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
Gaudi::Property< bool > m_showDataFlow
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
unsigned int m_blockingAlgosInFlight
Number of CPU-blocking algorithms presently in flight.
std::function< void(OccupancySnapshot)> m_snapshotCallback
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Gaudi::Property< bool > m_verboseSubSlots
size_t index(const Gaudi::ParticleProperty *property, const Gaudi::Interfaces::IParticlePropertySvc *service)
helper utility for mapping of Gaudi::ParticleProperty object into non-negative integral sequential id...