The Gaudi Framework  master (82fdf313)
Loading...
Searching...
No Matches
AvalancheSchedulerSvc.h
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
11#pragma once
12
13// Local includes
14#include "AlgsExecutionStates.h"
15#include "EventSlot.h"
16#include "FiberManager.h"
17#include "PrecedenceSvc.h"
18
19// Framework include files
27#include <GaudiKernel/Service.h>
28
29// C++ include files
30#include <functional>
31#include <memory>
32#include <queue>
33#include <string>
34#include <string_view>
35#include <thread>
36#include <unordered_map>
37#include <vector>
38
39// External libs
40#include <tbb/concurrent_priority_queue.h>
41#include <tbb/concurrent_queue.h>
42#include <tbb/task_arena.h>
43
44class IAlgorithm;
45
46//---------------------------------------------------------------------------
47
// AvalancheSchedulerSvc: an event scheduler service implementing IScheduler on top of
// Service. Events are handed in via pushNewEvent(s), finished events are handed back via
// popFinishedEvent / tryPopFinishedEvent, and scheduled algorithm tasks are kept in
// per-kind (synchronous / asynchronous) priority queues that are drained through next().
// NOTE(review): this text is a Doxygen source-listing scrape; the number fused to the
// start of each line is the original header's line number, and several original lines
// (doc comments and some declaration openers) are not present in this listing.
113class AvalancheSchedulerSvc : public extends<Service, IScheduler> {
114
115 friend class AlgTask;
116
117public:
// Inherit the Service/extends constructors.
119 using extends::extends;
120
// Initialise.
122 StatusCode initialize() override;
123
// Finalise.
125 StatusCode finalize() override;
126
// Make an event available to the scheduler.
128 StatusCode pushNewEvent( EventContext* eventContext ) override;
129
130 // Make multiple events available to the scheduler
131 StatusCode pushNewEvents( std::vector<EventContext*>& eventContexts ) override;
132
// Retrieve a finished event; blocks until an event is available.
134 StatusCode popFinishedEvent( EventContext*& eventContext ) override;
135
// Try to fetch a finished event from the scheduler without blocking.
137 StatusCode tryPopFinishedEvent( EventContext*& eventContext ) override;
138
// Get the number of free event slots.
140 unsigned int freeSlots() override;
141
// Dump scheduler state for all slots.
143 void dumpState() override;
144
// Inform the scheduler about an event view to be run under control-flow node nodeName;
// ownership of viewContext is transferred to the scheduler (passed as unique_ptr).
146 virtual StatusCode scheduleEventView( const EventContext* sourceContext, const std::string& nodeName,
147 std::unique_ptr<EventContext> viewContext ) override;
148
// Sample occupancy at a fixed interval (ms): a negative value deactivates sampling,
// 0 takes a snapshot on every change; each snapshot is passed to callback.
152 virtual void recordOccupancy( int samplePeriod, std::function<void( OccupancySnapshot )> callback ) override;
153
154private:
// Write the data-dependency graph built from the per-algorithm input/output maps
// (see the DataDepsGraph* properties below for file name and selection patterns).
155 StatusCode dumpGraphFile( const std::map<std::string, DataObjIDColl>& inDeps,
156 const std::map<std::string, DataObjIDColl>& outDeps ) const;
157
158private:
// Closure type stored in m_actionsQueue.
160 using action = std::function<StatusCode()>;
161
162 enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };
163
164 // Occupancy snapshot data
// Default interval is duration::min(), i.e. negative; under recordOccupancy's documented
// convention (negative deactivates) sampling is presumably off until recordOccupancy is
// called — confirm against the implementation.
165 std::chrono::duration<int64_t, std::milli> m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min();
166 std::chrono::system_clock::time_point m_lastSnapshot = std::chrono::system_clock::now();
167 std::function<void( OccupancySnapshot )> m_snapshotCallback;
168
// NOTE(review): the adjacent literals "...requests to use" "all available..." concatenate
// without a separator, so the property description reads "...requests to useall available
// hardware threads..."; a trailing space after "use" is missing.
170 this, "ThreadPoolSize", -1,
171 "Size of the global thread pool initialised by TBB; a value of -1 requests to use"
172 "all available hardware threads; -100 requests to bypass TBB executing "
173 "all algorithms in the scheduler's thread." };
// NOTE(review): "...set in TBB" "The TBB max..." concatenates to "...set in TBBThe TBB max
// parallelism..."; a separator (e.g. ". ") is missing between the two literals.
175 this, "maxParallelismExtra", 0,
176 "Allows to add some extra threads to the maximum parallelism set in TBB"
177 "The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" };
178 Gaudi::Property<std::string> m_whiteboardSvcName{ this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" };
180 this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms" };
182 this, "SimulateExecution", false,
183 "Flag to perform single-pass simulation of execution flow before the actual execution" };
185 "The following modes are currently available: PCE, COD, DRE, E" };
186 Gaudi::Property<bool> m_dumpIntraEventDynamics{ this, "DumpIntraEventDynamics", false,
187 "Dump intra-event concurrency dynamics to csv file" };
189 this, "PreemptiveBlockingTasks", false,
190 "Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." };
192 this, "NumOffloadThreads", 2,
193 "Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged "
194 "and use Boost Fiber functionality to suspend while waiting for offloaded work." };
195 Gaudi::Property<bool> m_checkDeps{ this, "CheckDependencies", false,
196 "Runtime check of Algorithm Input Data Dependencies" };
197 Gaudi::Property<bool> m_checkOutput{ this, "CheckOutputUsage", false,
198 "Runtime check of Algorithm Output Data usage" };
200 this,
201 "CheckOutputUsageIgnoreList",
202 {},
203 "Ignore outputs of the Algorithms of this name when doing the check",
204 "OrderedSet<std::string>" };
205
207 "Attribute unmet input dependencies to this DataLoader Algorithm" };
208
209 Gaudi::Property<bool> m_enableCondSvc{ this, "EnableConditions", false, "Enable ConditionsSvc" };
210
211 Gaudi::Property<bool> m_showDataDeps{ this, "ShowDataDependencies", true,
212 "Show the INPUT and OUTPUT data dependencies of Algorithms" };
213
214 Gaudi::Property<bool> m_showDataFlow{ this, "ShowDataFlow", false,
215 "Show the configuration of DataFlow between Algorithms" };
216
217 Gaudi::Property<bool> m_showControlFlow{ this, "ShowControlFlow", false,
218 "Show the configuration of all Algorithms and Sequences" };
219
220 Gaudi::Property<bool> m_verboseSubSlots{ this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" };
221
223 this, "DataDepsGraphFile", "",
224 "Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected "
225 "Algorithms" };
226
228 this, "DataDepsGraphAlgPattern", ".*",
229 "Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data "
230 "deps graph" };
231
233 this, "DataDepsGraphObjectPattern", ".*",
234 "Regex pattern for selecting desired input or output by their full key" };
235
236 // Utils and shortcuts ----------------------------------------------------
237
// Activate the scheduler; this is the function run in m_thread.
239 void activate();
240
243
// Tracks whether the scheduler is INACTIVE, ACTIVE or in FAILURE; held in a std::atomic,
// presumably because it is read outside the scheduler thread.
245 std::atomic<ActivationState> m_isActive{ INACTIVE };
246
// The thread in which activate() runs.
248 std::thread m_thread;
249
// Convert an algorithm name to its integer index.
// NOTE(review): unordered_map::operator[] inserts a value-initialized (0) entry when the
// name is not present, so an unregistered name silently maps to index 0 and grows the
// map (a mutation, unsafe if called concurrently); find()/at() would surface the error.
251 inline unsigned int algname2index( const std::string& algoname ) { return m_algname_index_map[algoname]; }
252
// Name -> index bookkeeping backing algname2index().
254 std::unordered_map<std::string, unsigned int> m_algname_index_map;
255
// Convert an integer index to the algorithm name; vector::operator[] does no bounds check.
257 inline const std::string& index2algname( unsigned int index ) { return m_algname_vect[index]; }
258
// Index -> name bookkeeping backing index2algname().
260 std::vector<std::string> m_algname_vect;
261
264
267
// Vector of event slots.
269 std::vector<EventSlot> m_eventSlots;
270
// Number of free event slots; atomic to account for asynchronous updates by the
// scheduler with respect to the rest of the code.
272 std::atomic_int m_freeSlots{ 0 };
273
// Queue of finished events (tbb::concurrent_bounded_queue also offers a blocking pop).
275 tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
276
279
282
// Number of algorithms presently in flight.
// NOTE(review): plain unsigned int, not atomic — presumably only modified from the
// scheduler thread; confirm.
284 unsigned int m_algosInFlight = 0;
285
// Number of CPU-blocking algorithms presently in flight (bounded by the
// MaxBlockingAlgosInFlight property). Same single-thread assumption as above.
287 unsigned int m_blockingAlgosInFlight = 0;
288
289 // States management ------------------------------------------------------
290
293
294 // Update algorithm state and, optionally, revise states of other downstream algorithms
295 StatusCode revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate = false );
296
298 struct TaskSpec;
// Sign off a completed task; the call is triggered only from within AlgTask.
300 StatusCode signoff( const TaskSpec& );
301
// Check if scheduling in a particular slot is in a stall.
303 bool isStalled( const EventSlot& ) const;
// Method to execute if an event failed.
305 void eventFailed( EventContext* eventContext );
306
// Dump the state of the scheduler for slot iSlot.
308 void dumpSchedulerState( int iSlot );
309
310 // Algos Management -------------------------------------------------------
311
314
315 // Actions management -----------------------------------------------------
316
// Queue where closures (action = std::function<StatusCode()>) are stored and picked for execution.
318 tbb::concurrent_bounded_queue<action> m_actionsQueue;
319
// Entry held in the algorithm queues: which algorithm to run, for which slot/context.
// NOTE(review): algName is a std::string_view initialised from the const std::string&
// constructor argument, so that string must outlive every copy of the TaskSpec; passing
// a temporary would leave a dangling view (presumably callers pass an m_algname_vect
// entry — confirm).
321 struct TaskSpec {
324 TaskSpec( IAlgorithm* algPtr, unsigned int algIndex, const std::string& algName, unsigned int algRank,
325 bool asynchronous, int slotIndex, EventContext* eventContext )
326 : algPtr( algPtr )
327 , algIndex( algIndex )
328 , algName( algName )
329 , algRank( algRank )
332 , contextPtr( eventContext ) {}
333
// Copy constructor kept so a lambda capturing a TaskSpec is storable as a std::function.
334 TaskSpec( const TaskSpec& ) = default;
// Copy assignment is deleted; move construction and move assignment are defaulted.
336 TaskSpec& operator=( const TaskSpec& ) = delete;
338 TaskSpec( TaskSpec&& ) = default;
340 TaskSpec& operator=( TaskSpec&& ) = default;
341
342 IAlgorithm* algPtr{ nullptr };
343 unsigned int algIndex{ 0 };
344 std::string_view algName;
345 unsigned int algRank{ 0 };
346 bool asynchronous{ false };
347 int slotIndex{ 0 };
349 };
350
// Queue ordering: compares by algRank only. tbb::concurrent_priority_queue pops the element
// that compares greatest, so the TaskSpec with the highest algRank is dequeued first.
353 bool operator()( const TaskSpec& i, const TaskSpec& j ) const { return ( i.algRank < j.algRank ); }
354 };
355
// Queues for scheduled algorithms: synchronous and asynchronous tasks, drained via next().
// NOTE(review): m_retryQueue is a plain std::queue rather than a concurrent container —
// presumably accessed from a single thread only; confirm.
357 tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> m_scheduledQueue;
358 tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> m_scheduledAsynchronousQueue;
359 std::queue<TaskSpec> m_retryQueue;
360
361 // Prompt the scheduler to call updateStates
362 std::atomic<bool> m_needsUpdate{ true };
363
364 // ------------------------------------------------------------------------
365
366 // Service for thread pool initialization
// NOTE(review): raw tbb::task_arena pointer; its ownership and release are not visible
// in this header — confirm it is cleaned up (e.g. in finalize()).
368 tbb::task_arena* m_arena{ nullptr };
369 std::unique_ptr<FiberManager> m_fiberManager{ nullptr };
370
373
374public:
375 // get next schedule-able TaskSpec
// Pops the highest-ranked TaskSpec from the asynchronous queue (asynchronous == true) or the
// synchronous queue into ts; returns false if the selected queue is empty.
376 bool next( TaskSpec& ts, bool asynchronous ) {
377 if ( asynchronous ) { return m_scheduledAsynchronousQueue.try_pop( ts ); }
378 return m_scheduledQueue.try_pop( ts );
379 }
380};
State
Execution states of the algorithms. Must have contiguous integer values 0, 1, ..., N.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_useDataLoader
void dumpState() override
Dump scheduler state for all slots.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_optimizationMode
bool next(TaskSpec &ts, bool asynchronous)
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Gaudi::Property< bool > m_dumpIntraEventDynamics
std::chrono::system_clock::time_point m_lastSnapshot
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< int > m_threadPoolSize
StatusCode finalize() override
Finalise.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::function< void(OccupancySnapshot)> m_snapshotCallback
std::queue< TaskSpec > m_retryQueue
Gaudi::Property< bool > m_verboseSubSlots
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
AlgsExecutionStates::State AState
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
StatusCode deactivate()
Deactivate scheduler.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
unsigned int m_blockingAlgosInFlight
Number of CPU-blocking algorithms presently in flight.
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
Gaudi::Property< bool > m_showDataFlow
StatusCode schedule(TaskSpec &&)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Gaudi::Property< bool > m_checkDeps
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Gaudi::Property< std::string > m_dataDepsGraphFile
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Gaudi::Property< bool > m_showControlFlow
Gaudi::Property< bool > m_simulateExecution
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
Gaudi::Property< std::string > m_whiteboardSvcName
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
std::atomic< bool > m_needsUpdate
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Gaudi::Property< int > m_maxParallelismExtra
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
std::function< StatusCode()> action
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
Gaudi::Property< int > m_numOffloadThreads
Gaudi::Property< bool > m_checkOutput
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
StatusCode initialize() override
Initialise.
Gaudi::Property< bool > m_enableCondSvc
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at a fixed interval (ms). A negative value deactivates sampling; 0 takes a snapshot on every change. Each...
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest of the code.
unsigned int freeSlots() override
Get free slots number.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
std::unique_ptr< FiberManager > m_fiberManager
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Gaudi::Property< bool > m_showDataDeps
std::thread m_thread
The thread in which the activate function runs.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class represents an entry point to all the event specific data.
Implementation of property with value of concrete type.
Definition PropertyFwd.h:27
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition IAlgorithm.h:36
Small smart pointer class with automatic reference counting for IInterface.
Definition SmartIF.h:28
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
Base class used to extend a class implementing other interfaces.
Definition extends.h:19
Comparison operator to sort the queues.
bool operator()(const TaskSpec &i, const TaskSpec &j) const
Struct to hold entries in the alg queues.
TaskSpec & operator=(const TaskSpec &)=delete
Copy assignment operator (deleted).
TaskSpec(IAlgorithm *algPtr, unsigned int algIndex, const std::string &algName, unsigned int algRank, bool asynchronous, int slotIndex, EventContext *eventContext)
TaskSpec(const TaskSpec &)=default
Copy constructor (to keep a lambda capturing a TaskSpec storable as a std::function value)
TaskSpec & operator=(TaskSpec &&)=default
Move assignment.
TaskSpec(TaskSpec &&)=default
Move constructor.
Class representing an event slot.
Definition EventSlot.h:23