The Gaudi Framework  master (ba5b4fb7)
Loading...
Searching...
No Matches
AvalancheSchedulerSvc.h
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2026 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
11#pragma once
12
13// Local includes
14#include "AlgsExecutionStates.h"
15#include "EventSlot.h"
16#include "FiberManager.h"
17#include "PrecedenceSvc.h"
18
19// Framework include files
27#include <GaudiKernel/Service.h>
28
29// C++ include files
30#include <functional>
31#include <memory>
32#include <queue>
33#include <string>
34#include <string_view>
35#include <thread>
36#include <unordered_map>
37#include <vector>
38
39// External libs
40#include <tbb/concurrent_priority_queue.h>
41#include <tbb/concurrent_queue.h>
42#include <tbb/task_arena.h>
43
44class IAlgorithm;
45
46//---------------------------------------------------------------------------
47
// NOTE(review): this is a rendered listing of the header. The number fused to the start of each
// line is the original file's line number, and the gaps (e.g. 117 -> 119, 168 -> 170) show that
// some original lines -- presumably Doxygen comments and a few declaration heads -- are not
// included here.
//
// AvalancheSchedulerSvc: a Gaudi Service implementing the IScheduler interface
// (via extends<Service, IScheduler>). Clients hand events in with pushNewEvent(s) and
// collect them with popFinishedEvent (blocking) or tryPopFinishedEvent.
113class AvalancheSchedulerSvc : public extends<Service, IScheduler> {
114
// AlgTask needs the private scheduling internals (signoff() is only called from within AlgTask).
115 friend class AlgTask;
116
117public:
// Inherit the constructors of extends<Service, IScheduler>.
119 using extends::extends;
120
122 StatusCode initialize() override;
123
125 StatusCode finalize() override;
126
// Make a single event available to the scheduler.
128 StatusCode pushNewEvent( EventContext* eventContext ) override;
129
130 // Make multiple events available to the scheduler
131 StatusCode pushNewEvents( std::vector<EventContext*>& eventContexts ) override;
132
// Retrieve a finished event; blocks until one is available.
134 StatusCode popFinishedEvent( EventContext*& eventContext ) override;
135
// Try to fetch a finished event from the scheduler (the "try" counterpart of popFinishedEvent).
137 StatusCode tryPopFinishedEvent( EventContext*& eventContext ) override;
138
// Number of free event slots.
140 unsigned int freeSlots() override;
141
// Dump the scheduler state for all slots.
143 void dumpState() override;
144
// Inform the scheduler about an event view attached to nodeName of the event identified by
// sourceContext. Ownership of viewContext is transferred to the scheduler (unique_ptr by value).
146 virtual StatusCode scheduleEventView( const EventContext* sourceContext, const std::string& nodeName,
147 std::unique_ptr<EventContext> viewContext ) override;
148
// Sample occupancy every samplePeriod milliseconds and hand each snapshot to callback.
// A negative period deactivates sampling; 0 takes a snapshot on every change.
152 virtual void recordOccupancy( int samplePeriod, std::function<void( OccupancySnapshot )> callback ) override;
153
154private:
// Write the data-dependency graph built from the per-algorithm input/output dependency maps.
// Presumably writes to the file named by the DataDepsGraphFile property below -- confirm in the .cpp.
155 StatusCode dumpDataDepsGraphFile( const std::map<std::string, DataObjIDColl>& inDeps,
156 const std::map<std::string, DataObjIDColl>& outDeps ) const;
157
158private:
// Closure type stored in m_actionsQueue and picked up for execution.
160 using action = std::function<StatusCode()>;
161
// Lifecycle state of the scheduler, tracked by m_isActive.
162 enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };
163
164 // Occupancy snapshot data
// m_snapshotInterval defaults to the most negative representable duration. Since a negative
// period means "deactivated", sampling is off by default (presumably until recordOccupancy()
// configures it).
165 std::chrono::duration<int64_t, std::milli> m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min();
166 std::chrono::system_clock::time_point m_lastSnapshot = std::chrono::system_clock::now();
167 std::function<void( OccupancySnapshot )> m_snapshotCallback;
168
170 this, "ThreadPoolSize", -1,
171 "Size of the global thread pool initialised by TBB; a value of -1 requests to use"
172 "all available hardware threads; -100 requests to bypass TBB executing "
173 "all algorithms in the scheduler's thread." };
// NOTE(review): the description literals above are concatenated by the compiler with no separating
// space, yielding "...a value of -1 requests to useall available hardware threads...".
175 this, "maxParallelismExtra", 0,
176 "Allows to add some extra threads to the maximum parallelism set in TBB"
177 "The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" };
// NOTE(review): likewise, these two literals join as "...parallelism set in TBBThe TBB max parallelism...";
// a separator such as ". " is missing.
178 Gaudi::Property<std::string> m_whiteboardSvcName{ this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" };
180 this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms" };
182 this, "SimulateExecution", false,
183 "Flag to perform single-pass simulation of execution flow before the actual execution" };
185 "The following modes are currently available: PCE, COD, DRE, E" };
186 Gaudi::Property<bool> m_dumpIntraEventDynamics{ this, "DumpIntraEventDynamics", false,
187 "Dump intra-event concurrency dynamics to csv file" };
189 this, "PreemptiveBlockingTasks", false,
190 "Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." };
192 this, "NumOffloadThreads", 2,
193 "Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged "
194 "and use Boost Fiber functionality to suspend while waiting for offloaded work." };
195 Gaudi::Property<bool> m_checkDeps{ this, "CheckDependencies", false,
196 "Runtime check of Algorithm Input Data Dependencies" };
197 Gaudi::Property<bool> m_checkOutput{ this, "CheckOutputUsage", false,
198 "Runtime check of Algorithm Output Data usage" };
200 this,
201 "CheckOutputUsageIgnoreList",
202 {},
203 "Ignore outputs of the Algorithms of this name when doing the check",
204 "OrderedSet<std::string>" };
205
207 "Attribute unmet input dependencies to this DataLoader Algorithm" };
208
209 Gaudi::Property<bool> m_enableCondSvc{ this, "EnableConditions", false, "Enable ConditionsSvc" };
210
211 Gaudi::Property<bool> m_showDataDeps{ this, "ShowDataDependencies", true,
212 "Show the INPUT and OUTPUT data dependencies of Algorithms" };
213
214 Gaudi::Property<bool> m_showDataFlow{ this, "ShowDataFlow", false,
215 "Show the configuration of DataFlow between Algorithms" };
216
217 Gaudi::Property<bool> m_showControlFlow{ this, "ShowControlFlow", false,
218 "Show the configuration of all Algorithms and Sequences" };
219
220 Gaudi::Property<bool> m_verboseSubSlots{ this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" };
221
223 this, "DataDepsGraphFile", "",
224 "Name of the output file (.dot, .md or .graphml extensions allowed) containing the data dependency graph "
225 "for some selected Algorithms" };
226
228 this, "DataDepsGraphAlgPattern", ".*",
229 "Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data "
230 "deps graph" };
231
233 this, "DataDepsGraphObjectPattern", ".*",
234 "Regex pattern for selecting desired input or output by their full key" };
235
236 // Utils and shortcuts ----------------------------------------------------
237
// Scheduler main loop; it runs in m_thread.
239 void activate();
240
243
// Whether the scheduler is INACTIVE, ACTIVE or in FAILURE; atomic so other threads can query/change it.
245 std::atomic<ActivationState> m_isActive{ INACTIVE };
246
// The thread in which activate() runs.
248 std::thread m_thread;
249
// Convert an algorithm name to its integer index.
// NOTE(review): unordered_map::operator[] default-inserts an entry for an unknown name and returns 0,
// silently aliasing the algorithm with index 0. It also mutates the map, which is unsafe if this is
// ever called concurrently. Consider find()/at() if unknown names are possible.
251 inline unsigned int algname2index( const std::string& algoname ) { return m_algname_index_map[algoname]; }
252
// Name -> index lookup table used by algname2index().
254 std::unordered_map<std::string, unsigned int> m_algname_index_map;
255
// Convert an integer index to the algorithm name. No bounds check: index must be < m_algname_vect.size().
257 inline const std::string& index2algname( unsigned int index ) { return m_algname_vect[index]; }
258
// Index -> name lookup table used by index2algname().
260 std::vector<std::string> m_algname_vect;
261
264
267
// One EventSlot per concurrently processed event.
269 std::vector<EventSlot> m_eventSlots;
270
// Free-slot count; atomic because it is updated by the scheduler and read by other threads.
272 std::atomic_int m_freeSlots{ 0 };
273
// Finished events waiting to be collected by popFinishedEvent / tryPopFinishedEvent.
275 tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
276
279
282
// Number of algorithms presently in flight.
// NOTE(review): this and the next counter are plain (non-atomic) ints; presumably they are only
// modified on the scheduler thread -- confirm.
284 unsigned int m_algosInFlight = 0;
285
// Number of CPU-blocking algorithms presently in flight (limited by MaxBlockingAlgosInFlight).
287 unsigned int m_blockingAlgosInFlight = 0;
288
289 // States management ------------------------------------------------------
290
293
294 // Update algorithm state and, optionally, revise states of other downstream algorithms
295 StatusCode revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate = false );
296
// Forward declaration; the full definition follows further down in this class.
298 struct TaskSpec;
// Sign off a completed task; only triggered from within AlgTask.
300 StatusCode signoff( const TaskSpec& );
301
// Check whether scheduling in a particular slot is stalled.
303 bool isStalled( const EventSlot& ) const;
// Handle an event that failed.
305 void eventFailed( EventContext* eventContext );
306
// Dump the state of the scheduler for slot iSlot.
308 void dumpSchedulerState( int iSlot );
309
310 // Algos Management -------------------------------------------------------
311
314
315 // Actions management -----------------------------------------------------
316
// Queue where closures are stored and picked up for execution.
318 tbb::concurrent_bounded_queue<action> m_actionsQueue;
319
// Entry held in the algorithm queues: which algorithm to run, for which slot/event, at what rank.
// NOTE(review): algName is a std::string_view bound to the constructor's const std::string& argument,
// so the referenced string must outlive the TaskSpec (passing a temporary would leave it dangling).
321 struct TaskSpec {
324 TaskSpec( IAlgorithm* algPtr, unsigned int algIndex, const std::string& algName, unsigned int algRank,
325 bool asynchronous, int slotIndex, EventContext* eventContext )
326 : algPtr( algPtr )
327 , algIndex( algIndex )
328 , algName( algName )
329 , algRank( algRank )
332 , contextPtr( eventContext ) {}
333
// Copyable so that a lambda capturing a TaskSpec can be stored in a std::function (which requires
// copy-constructible callables). Copy assignment is deliberately deleted; moves are allowed.
334 TaskSpec( const TaskSpec& ) = default;
336 TaskSpec& operator=( const TaskSpec& ) = delete;
338 TaskSpec( TaskSpec&& ) = default;
340 TaskSpec& operator=( TaskSpec&& ) = default;
341
342 IAlgorithm* algPtr{ nullptr };
343 unsigned int algIndex{ 0 };
344 std::string_view algName;
345 unsigned int algRank{ 0 };
346 bool asynchronous{ false };
347 int slotIndex{ 0 };
349 };
350
353 bool operator()( const TaskSpec& i, const TaskSpec& j ) const { return ( i.algRank < j.algRank ); }
354 };
355
// Queues of scheduled algorithms, ordered by AlgQueueSort. tbb::concurrent_priority_queue (like
// std::priority_queue) pops the highest-priority element w.r.t. the comparator, so with
// "i.algRank < j.algRank" the TaskSpec with the largest algRank is dequeued first.
357 tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> m_scheduledQueue;
358 tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> m_scheduledAsynchronousQueue;
// NOTE(review): std::queue is not thread-safe; presumably only used from the scheduler thread -- confirm.
359 std::queue<TaskSpec> m_retryQueue;
360
361 // Prompt the scheduler to call updateStates
362 std::atomic<bool> m_needsUpdate{ true };
363
364 // ------------------------------------------------------------------------
365
366 // Service for thread pool initialization
// NOTE(review): non-owning raw pointer as declared; who creates/destroys the arena is not visible here.
368 tbb::task_arena* m_arena{ nullptr };
369 std::unique_ptr<FiberManager> m_fiberManager{ nullptr };
370
373
374public:
375 // get next schedule-able TaskSpec
// Pops from the asynchronous queue when asynchronous is true, otherwise from the regular queue.
// Returns true and fills ts if an entry was available, false if that queue was empty.
376 bool next( TaskSpec& ts, bool asynchronous ) {
377 if ( asynchronous ) { return m_scheduledAsynchronousQueue.try_pop( ts ); }
378 return m_scheduledQueue.try_pop( ts );
379 }
380};
State
Execution states of the algorithms. Must have contiguous integer values 0, 1, ..., N.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_useDataLoader
void dumpState() override
Dump scheduler state for all slots.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_optimizationMode
bool next(TaskSpec &ts, bool asynchronous)
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Gaudi::Property< bool > m_dumpIntraEventDynamics
std::chrono::system_clock::time_point m_lastSnapshot
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary for the index2algname conversion.
Gaudi::Property< int > m_threadPoolSize
StatusCode finalize() override
Finalise.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::function< void(OccupancySnapshot)> m_snapshotCallback
std::queue< TaskSpec > m_retryQueue
Gaudi::Property< bool > m_verboseSubSlots
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
AlgsExecutionStates::State AState
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
StatusCode deactivate()
Deactivate scheduler.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
unsigned int m_blockingAlgosInFlight
Number of CPU-blocking algorithms presently in flight.
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
Gaudi::Property< bool > m_showDataFlow
StatusCode schedule(TaskSpec &&)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Gaudi::Property< bool > m_checkDeps
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Gaudi::Property< std::string > m_dataDepsGraphFile
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Gaudi::Property< bool > m_showControlFlow
Gaudi::Property< bool > m_simulateExecution
Gaudi::Property< std::string > m_whiteboardSvcName
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary for the algname2index conversion.
std::atomic< bool > m_needsUpdate
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Gaudi::Property< int > m_maxParallelismExtra
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
std::function< StatusCode()> action
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
Gaudi::Property< int > m_numOffloadThreads
Gaudi::Property< bool > m_checkOutput
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
StatusCode initialize() override
Initialise.
Gaudi::Property< bool > m_enableCondSvc
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at a fixed interval (ms). A negative value deactivates sampling; 0 takes a snapshot on every change. Each...
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
StatusCode dumpDataDepsGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
unsigned int freeSlots() override
Get free slots number.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
std::unique_ptr< FiberManager > m_fiberManager
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Gaudi::Property< bool > m_showDataDeps
std::thread m_thread
The thread in which the activate function runs.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class represents an entry point to all the event specific data.
Implementation of property with value of concrete type.
Definition PropertyFwd.h:27
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition IAlgorithm.h:36
Small smart pointer class with automatic reference counting for IInterface.
Definition SmartIF.h:28
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
Base class used to extend a class implementing other interfaces.
Definition extends.h:19
Comparison operator to sort the queues.
bool operator()(const TaskSpec &i, const TaskSpec &j) const
Struct to hold entries in the alg queues.
TaskSpec & operator=(const TaskSpec &)=delete
Copy assignment operator (deleted).
TaskSpec(IAlgorithm *algPtr, unsigned int algIndex, const std::string &algName, unsigned int algRank, bool asynchronous, int slotIndex, EventContext *eventContext)
TaskSpec(const TaskSpec &)=default
Copy constructor (needed so that a lambda capturing a TaskSpec can be stored as a std::function value, which requires copy-constructible callables).
TaskSpec & operator=(TaskSpec &&)=default
Move assignment.
TaskSpec(TaskSpec &&)=default
Move constructor.
Class representing an event slot.
Definition EventSlot.h:23