Go to the documentation of this file.
11 #ifndef GAUDIHIVE_AVALANCHESCHEDULERSVC_H
12 #define GAUDIHIVE_AVALANCHESCHEDULERSVC_H
35 #include <string_view>
37 #include <unordered_map>
41 #include <tbb/concurrent_priority_queue.h>
42 #include <tbb/concurrent_queue.h>
43 #include <tbb/task_arena.h>
120 using extends::extends;
171 this,
"ThreadPoolSize", -1,
172 "Size of the global thread pool initialised by TBB; a value of -1 requests to use"
173 "all available hardware threads; -100 requests to bypass TBB executing "
174 "all algorithms in the scheduler's thread." };
176 this,
"maxParallelismExtra", 0,
177 "Allows to add some extra threads to the maximum parallelism set in TBB"
178 "The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" };
181 this,
"MaxBlockingAlgosInFlight", 0,
"Maximum allowed number of simultaneously running CPU-blocking algorithms" };
183 this,
"SimulateExecution",
false,
184 "Flag to perform single-pass simulation of execution flow before the actual execution" };
186 "The following modes are currently available: PCE, COD, DRE, E" };
188 "Dump intra-event concurrency dynamics to csv file" };
190 this,
"PreemptiveBlockingTasks",
false,
191 "Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." };
193 this,
"NumOffloadThreads", 2,
194 "Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged "
195 "and use Boost Fiber functionality to suspend while waiting for offloaded work." };
197 "Runtime check of Algorithm Input Data Dependencies" };
199 "Runtime check of Algorithm Output Data usage" };
202 "CheckOutputUsageIgnoreList",
204 "Ignore outputs of the Algorithms of this name when doing the check",
205 "OrderedSet<std::string>" };
208 "Attribute unmet input dependencies to this DataLoader Algorithm" };
213 "Show the INPUT and OUTPUT data dependencies of Algorithms" };
216 "Show the configuration of DataFlow between Algorithms" };
219 "Show the configuration of all Algorithms and Sequences" };
224 this,
"DataDepsGraphFile",
"",
225 "Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected "
229 this,
"DataDepsGraphAlgPattern",
".*",
230 "Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data "
234 this,
"DataDepsGraphObjectPattern",
".*",
235 "Regex pattern for selecting desired input or output by their full key" };
383 #endif // GAUDIHIVE_AVALANCHESCHEDULERSVC_H
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Gaudi::Property< std::string > m_useDataLoader
Struct to hold entries in the alg queues.
StatusCode finalize() override
Finalise.
TaskSpec & operator=(const TaskSpec &)=delete
Assignment operator.
Gaudi::Property< std::string > m_optimizationMode
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Class representing an event slot.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
std::chrono::system_clock::time_point m_lastSnapshot
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::unique_ptr< FiberManager > m_fiberManager
StatusCode schedule(TaskSpec &&)
Gaudi::Property< bool > m_showControlFlow
std::atomic< bool > m_needsUpdate
Gaudi::Property< bool > m_enableCondSvc
StatusCode deactivate()
Deactivate scheduler.
std::vector< EventSlot > m_eventSlots
Vector of event slots.
Gaudi::Property< int > m_numOffloadThreads
tbb::task_arena * m_arena
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< int > m_threadPoolSize
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
size_t m_maxEventsInFlight
TaskSpec()
Default constructor.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
TaskSpec & operator=(TaskSpec &&)=default
Move assignment.
Gaudi::Property< int > m_maxParallelismExtra
bool operator()(const TaskSpec &i, const TaskSpec &j) const
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
Gaudi::Property< std::string > m_whiteboardSvcName
Gaudi::Property< bool > m_checkOutput
TaskSpec(const TaskSpec &)=default
Copy constructor (to keep a lambda capturing a TaskSpec storable as a std::function value)
Gaudi::Property< bool > m_simulateExecution
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms). Negative value to deactivate, 0 to snapshot every change. Each...
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Base class used to extend a class implementing other interfaces.
unsigned int freeSlots() override
Get free slots number.
Gaudi::Property< bool > m_showDataDeps
size_t m_maxAlgosInFlight
void dumpState() override
Dump scheduler state for all slots.
StatusCode initialize() override
Initialise.
TaskSpec(IAlgorithm *algPtr, unsigned int algIndex, const std::string &algName, unsigned int algRank, bool asynchronous, int slotIndex, EventContext *eventContext)
bool next(TaskSpec &ts, bool asynchronous)
TaskSpec(TaskSpec &&)=default
Move constructor.
EventContext * contextPtr
Gaudi::Property< bool > m_dumpIntraEventDynamics
std::queue< TaskSpec > m_retryQueue
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_dataDepsGraphFile
State
Execution states of the algorithms. Must have contiguous integer values 0, 1...
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
void activate()
Activate scheduler.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Gaudi::Property< bool > m_checkDeps
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
Comparison operator to sort the queues.
std::thread m_thread
The thread in which the activate function runs.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
Gaudi::Property< bool > m_showDataFlow
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
unsigned int m_blockingAlgosInFlight
Number of blocking algorithms presently in flight.
std::function< void(OccupancySnapshot)> m_snapshotCallback
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Gaudi::Property< bool > m_verboseSubSlots
size_t index(const Gaudi::ParticleProperty *property, const Gaudi::Interfaces::IParticlePropertySvc *service)
helper utility for mapping of Gaudi::ParticleProperty object into non-negative integral sequential id...