The Gaudi Framework  v36r7 (7f57a304)
AvalancheSchedulerSvc.h
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #ifndef GAUDIHIVE_AVALANCHESCHEDULERSVC_H
12 #define GAUDIHIVE_AVALANCHESCHEDULERSVC_H
13 
14 // Local includes
15 #include "AlgsExecutionStates.h"
16 #include "EventSlot.h"
17 #include "PrecedenceSvc.h"
18 
19 // Framework include files
22 #include "GaudiKernel/ICondSvc.h"
24 #include "GaudiKernel/IRunable.h"
25 #include "GaudiKernel/IScheduler.h"
27 #include "GaudiKernel/Service.h"
28 
29 // C++ include files
30 #include <functional>
31 #include <queue>
32 #include <string>
33 #include <string_view>
34 #include <thread>
35 #include <unordered_map>
36 #include <vector>
37 
38 // External libs
39 #include "tbb/concurrent_priority_queue.h"
40 #include "tbb/concurrent_queue.h"
41 #include "tbb/task_arena.h"
42 
43 class IAlgorithm;
44 
45 //---------------------------------------------------------------------------
46 
// AvalancheSchedulerSvc: a Service that implements the IScheduler interface (via extends<Service, IScheduler>).
// NOTE(review): this is a Doxygen source listing. Every line carries its header line number as a prefix, and
// several header lines (most `///` doc comments plus a few declarations) are missing. The notes below record
// what the Doxygen cross-reference entries say about the gaps; confirm against the real header.
112 class AvalancheSchedulerSvc : public extends<Service, IScheduler> {
113 
// AlgTask needs access to private members; signoff() is documented as being triggered only from within AlgTask.
114  friend class AlgTask;
115 
116 public:
// Inherit the Service constructors.
118  using extends::extends;
119 
// Destructor (empty body).
122  ~AvalancheSchedulerSvc() noexcept override {}
123 
// Initialise.
125  StatusCode initialize() override;
126 
// Finalise.
128  StatusCode finalize() override;
129 
// Make an event available to the scheduler.
131  StatusCode pushNewEvent( EventContext* eventContext ) override;
132 
133  // Make multiple events available to the scheduler
134  StatusCode pushNewEvents( std::vector<EventContext*>& eventContexts ) override;
135 
// Blocks until an event is available.
137  StatusCode popFinishedEvent( EventContext*& eventContext ) override;
138 
// Try to fetch an event from the scheduler.
140  StatusCode tryPopFinishedEvent( EventContext*& eventContext ) override;
141 
// Get free slots number.
143  unsigned int freeSlots() override;
144 
// Inform the scheduler about an event view. The view context is handed over by value as a
// std::unique_ptr, i.e. ownership is transferred to the callee.
146  virtual StatusCode scheduleEventView( const EventContext* sourceContext, const std::string& nodeName,
147  std::unique_ptr<EventContext> viewContext ) override;
148 
// Sample occupancy at a fixed interval (ms): a negative value deactivates sampling, 0 snapshots every change.
// NOTE(review): the callback has the same type as m_snapshotCallback below and is presumably stored there; confirm in the .cpp.
152  virtual void recordOccupancy( int samplePeriod, std::function<void( OccupancySnapshot )> callback ) override;
153 
154 private:
// NOTE(review): header lines 155-156 are missing from this listing. revise() takes an `AState` and
// m_actionsQueue stores `action` values, but neither name is declared in the visible lines. They are
// presumably aliases declared here (AState likely AlgsExecutionStates::State); confirm against the header.
157 
// Activation state of the scheduler. m_isActive (std::atomic<ActivationState>, header line 210) is not shown here.
158  enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };
159 
160  // Occupancy snapshot data
// NOTE(review): header line 161, m_snapshotInterval (std::chrono::duration<int64_t, std::milli>), is missing here.
// Time of the last snapshot; initialised to the construction time.
162  std::chrono::system_clock::time_point m_lastSnapshot = std::chrono::system_clock::now();
// Consumer of occupancy snapshots.
163  std::function<void( OccupancySnapshot )> m_snapshotCallback;
164 
166  this, "ThreadPoolSize", -1,
167  "Size of the global thread pool initialised by TBB; a value of -1 requests to use"
168  "all available hardware threads; -100 requests to bypass TBB executing "
169  "all algorithms in the scheduler's thread." };
170  Gaudi::Property<std::string> m_whiteboardSvcName{ this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name" };
172  this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms" };
174  this, "SimulateExecution", false,
175  "Flag to perform single-pass simulation of execution flow before the actual execution" };
// NOTE(review): the start of this declaration is missing from the listing: header line 176, i.e.
// `Gaudi::Property<std::string> m_optimizationMode{` together with the property name and default value.
// Only the description string and the closing brace survive; restore from the real header.
177  "The following modes are currently available: PCE, COD, DRE, E" };
178  Gaudi::Property<bool> m_dumpIntraEventDynamics{ this, "DumpIntraEventDynamics", false,
179  "Dump intra-event concurrency dynamics to csv file" };
181  this, "PreemptiveBlockingTasks", false,
182  "Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." };
183  Gaudi::Property<bool> m_checkDeps{ this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies" };
184 
185  Gaudi::Property<std::string> m_useDataLoader{ this, "DataLoaderAlg", "",
186  "Attribute unmet input dependencies to this DataLoader Algorithm" };
187 
188  Gaudi::Property<bool> m_enableCondSvc{ this, "EnableConditions", false, "Enable ConditionsSvc" };
189 
190  Gaudi::Property<bool> m_showDataDeps{ this, "ShowDataDependencies", true,
191  "Show the INPUT and OUTPUT data dependencies of Algorithms" };
192 
193  Gaudi::Property<bool> m_showDataFlow{ this, "ShowDataFlow", false,
194  "Show the configuration of DataFlow between Algorithms" };
195 
196  Gaudi::Property<bool> m_showControlFlow{ this, "ShowControlFlow", false,
197  "Show the configuration of all Algorithms and Sequences" };
198 
199  Gaudi::Property<bool> m_verboseSubSlots{ this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots" };
200 
201  // Utils and shortcuts ----------------------------------------------------
202 
// Activate scheduler.
204  void activate();
205 
// NOTE(review): the following are missing from this listing, between header lines 205 and 214:
//   - StatusCode deactivate();   "Deactivate scheduler"
//   - std::atomic<ActivationState> m_isActive;   (line 210) "Flag to track if the scheduler is active or not"
//   - std::thread m_thread;   (line 213) "The thread in which the activate function runs"
208 
211 
214 
216  inline unsigned int algname2index( const std::string& algoname ) { return m_algname_index_map[algoname]; };
217 
220 
222  inline const std::string& index2algname( unsigned int index ) { return m_algname_vect[index]; };
// NOTE(review): header lines 223-235 are missing from this listing. Per the cross-reference index they declare:
//   - m_algname_vect (std::vector<std::string>), used for index -> name conversion
//   - SmartIF<IPrecedenceSvc> m_precSvc   (line 228), a shortcut to the Precedence Service
//   - SmartIF<IHiveWhiteBoard> m_whiteboard   (line 231), a shortcut to the whiteboard
//   - std::vector<EventSlot> m_eventSlots   (line 234), the vector of event slots
// m_algname_index_map (std::unordered_map<std::string, unsigned int>) is not visible in this listing either.
223 
226 
229 
232 
235 
// Free-slot counter; atomic to account for asynchronous updates by the scheduler with respect to the rest.
237  std::atomic_int m_freeSlots{ 0 };
238 
// Queue of finished events.
240  tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
241 
// NOTE(review): missing here are SmartIF<IAlgExecStateSvc> m_algExecStateSvc (line 243, algorithm execution
// state manager) and SmartIF<ICondSvc> m_condSvc (line 246, service for conditions handling).
244 
247 
// Number of algorithms presently in flight.
249  unsigned int m_algosInFlight = 0;
250 
// Number of CPU-blocking algorithms presently in flight.
252  unsigned int m_blockingAlgosInFlight = 0;
253 
254  // States management ------------------------------------------------------
255 
// NOTE(review): `StatusCode iterate();` is missing here (between header lines 255 and 258). It loops on all
// slots to schedule DATAREADY algorithms and sign off ready events.
258 
259  // Update algorithm state and, optionally, revise states of other downstream algorithms
260  StatusCode revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate = false );
261 
// Forward declaration; the full definition appears further down in this class.
263  struct TaskSpec;
// NOTE(review): header line 264 is missing. `StatusCode schedule( TaskSpec&& )` is defined in the .cpp, but its
// declaration is not visible anywhere in this listing.
// Sign off a finished task; the call is triggered only from within AlgTask.
265  StatusCode signoff( const TaskSpec& );
266 
// Check if scheduling in a particular slot is in a stall.
268  bool isStalled( const EventSlot& ) const;
// Method to execute if an event failed.
270  void eventFailed( EventContext* eventContext );
271 
// Dump the state of the scheduler.
273  void dumpSchedulerState( int iSlot );
274 
275  // Algos Management -------------------------------------------------------
276 
// NOTE(review): SmartIF<IAlgResourcePool> m_algResourcePool (line 278, cache for the algorithm resource pool) is missing here.
279 
280  // Actions management -----------------------------------------------------
281 
// Queue where closures are stored and picked for execution. The `action` type is not declared in the visible lines.
283  tbb::concurrent_bounded_queue<action> m_actionsQueue;
284 
286  struct TaskSpec {
288  TaskSpec(){};
289  TaskSpec( IAlgorithm* algPtr, unsigned int algIndex, const std::string& algName, unsigned int algRank,
290  bool blocking, int slotIndex, EventContext* eventContext )
291  : algPtr( algPtr )
292  , algIndex( algIndex )
293  , algName( algName )
294  , algRank( algRank )
295  , blocking( blocking )
296  , slotIndex( slotIndex )
297  , contextPtr( eventContext ){};
299  TaskSpec( const TaskSpec& ) = default;
301  TaskSpec& operator=( const TaskSpec& ) = delete;
303  TaskSpec( TaskSpec&& ) = default;
305  TaskSpec& operator=( TaskSpec&& ) = default;
306 
307  IAlgorithm* algPtr{ nullptr };
308  unsigned int algIndex{ 0 };
309  std::string_view algName;
310  unsigned int algRank{ 0 };
311  bool blocking{ false };
312  int slotIndex{ 0 };
314  };
315 
317  struct AlgQueueSort {
318  bool operator()( const TaskSpec& i, const TaskSpec& j ) const { return ( i.algRank < j.algRank ); }
319  };
320 
// Queues for scheduled algorithms: one for regular tasks and one for CPU-blocking tasks
// (next() pops from the blocking queue when called with blocking == true).
322  tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> m_scheduledQueue;
323  tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> m_scheduledBlockingQueue;
// NOTE(review): std::queue<TaskSpec> m_retryQueue (header line 324) is missing from this listing.
325 
326  // Prompt the scheduler to call updateStates
// NOTE(review): the member this comment describes, std::atomic<bool> m_needsUpdate (line 327), is missing here.
328 
329  // ------------------------------------------------------------------------
330 
331  // Service for thread pool initialization
// NOTE(review): SmartIF<IThreadPoolSvc> m_threadPoolSvc (line 332) is missing here.
// NOTE(review): m_arena is a raw pointer; who owns and deletes the arena is not visible in this header.
333  tbb::task_arena* m_arena{ nullptr };
334  size_t m_maxEventsInFlight{ 0 };
335  size_t m_maxAlgosInFlight{ 1 };
336 
337 public:
338  // get next schedule-able TaskSpec
339  bool next( TaskSpec& ts, bool blocking = false ) {
340  return blocking ? m_scheduledBlockingQueue.try_pop( ts ) : m_scheduledQueue.try_pop( ts );
341  };
342 };
343 
344 #endif // GAUDIHIVE_AVALANCHESCHEDULERSVC_H
AvalancheSchedulerSvc::m_whiteboard
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
Definition: AvalancheSchedulerSvc.h:231
AvalancheSchedulerSvc::TaskSpec::algPtr
IAlgorithm * algPtr
Definition: AvalancheSchedulerSvc.h:307
IAlgResourcePool.h
AvalancheSchedulerSvc::m_useDataLoader
Gaudi::Property< std::string > m_useDataLoader
Definition: AvalancheSchedulerSvc.h:185
std::string
STL class.
AvalancheSchedulerSvc::TaskSpec
Struct to hold entries in the alg queues.
Definition: AvalancheSchedulerSvc.h:286
AvalancheSchedulerSvc::finalize
StatusCode finalize() override
Finalise.
Definition: AvalancheSchedulerSvc.cpp:356
AvalancheSchedulerSvc::TaskSpec::operator=
TaskSpec & operator=(const TaskSpec &)=delete
Assignment operator.
AvalancheSchedulerSvc::m_optimizationMode
Gaudi::Property< std::string > m_optimizationMode
Definition: AvalancheSchedulerSvc.h:176
AvalancheSchedulerSvc::TaskSpec::algRank
unsigned int algRank
Definition: AvalancheSchedulerSvc.h:310
AvalancheSchedulerSvc::ACTIVE
@ ACTIVE
Definition: AvalancheSchedulerSvc.h:158
EventSlot.h
std::vector
STL class.
AvalancheSchedulerSvc::iterate
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Definition: AvalancheSchedulerSvc.cpp:591
EventSlot
Class representing an event slot.
Definition: EventSlot.h:24
std::chrono::duration< int64_t, std::milli >
AvalancheSchedulerSvc::m_lastSnapshot
std::chrono::system_clock::time_point m_lastSnapshot
Definition: AvalancheSchedulerSvc.h:162
std::function
AvalancheSchedulerSvc::m_scheduledQueue
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
Definition: AvalancheSchedulerSvc.h:322
AvalancheSchedulerSvc::schedule
StatusCode schedule(TaskSpec &&)
Definition: AvalancheSchedulerSvc.cpp:945
AvalancheSchedulerSvc::ActivationState
ActivationState
Definition: AvalancheSchedulerSvc.h:158
AvalancheSchedulerSvc::~AvalancheSchedulerSvc
~AvalancheSchedulerSvc() noexcept override
Destructor.
Definition: AvalancheSchedulerSvc.h:122
AvalancheSchedulerSvc::m_showControlFlow
Gaudi::Property< bool > m_showControlFlow
Definition: AvalancheSchedulerSvc.h:196
compareRootHistos.ts
tuple ts
Definition: compareRootHistos.py:492
AvalancheSchedulerSvc::m_needsUpdate
std::atomic< bool > m_needsUpdate
Definition: AvalancheSchedulerSvc.h:327
std::queue
STL class.
AlgsExecutionStates.h
AvalancheSchedulerSvc::m_enableCondSvc
Gaudi::Property< bool > m_enableCondSvc
Definition: AvalancheSchedulerSvc.h:188
AvalancheSchedulerSvc::deactivate
StatusCode deactivate()
Deactivate scheduler.
Definition: AvalancheSchedulerSvc.cpp:444
AvalancheSchedulerSvc::m_eventSlots
std::vector< EventSlot > m_eventSlots
Vector of events slots.
Definition: AvalancheSchedulerSvc.h:234
AvalancheSchedulerSvc::m_arena
tbb::task_arena * m_arena
Definition: AvalancheSchedulerSvc.h:333
AvalancheSchedulerSvc::m_algExecStateSvc
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Definition: AvalancheSchedulerSvc.h:243
AvalancheSchedulerSvc::TaskSpec::algName
std::string_view algName
Definition: AvalancheSchedulerSvc.h:309
AvalancheSchedulerSvc::FAILURE
@ FAILURE
Definition: AvalancheSchedulerSvc.h:158
AvalancheSchedulerSvc::m_condSvc
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
Definition: AvalancheSchedulerSvc.h:246
AvalancheSchedulerSvc::eventFailed
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Definition: AvalancheSchedulerSvc.cpp:784
AvalancheSchedulerSvc::m_threadPoolSize
Gaudi::Property< int > m_threadPoolSize
Definition: AvalancheSchedulerSvc.h:165
IScheduler.h
AvalancheSchedulerSvc::m_maxEventsInFlight
size_t m_maxEventsInFlight
Definition: AvalancheSchedulerSvc.h:334
AvalancheSchedulerSvc::TaskSpec::TaskSpec
TaskSpec()
Default constructor.
Definition: AvalancheSchedulerSvc.h:288
AvalancheSchedulerSvc::m_maxBlockingAlgosInFlight
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
Definition: AvalancheSchedulerSvc.h:171
StatusCode
Definition: StatusCode.h:65
std::thread
STL class.
AvalancheSchedulerSvc::TaskSpec::operator=
TaskSpec & operator=(TaskSpec &&)=default
Move assignment.
PrecedenceSvc.h
AvalancheSchedulerSvc::TaskSpec::blocking
bool blocking
Definition: AvalancheSchedulerSvc.h:311
IAlgorithm
Definition: IAlgorithm.h:38
AvalancheSchedulerSvc::AlgQueueSort::operator()
bool operator()(const TaskSpec &i, const TaskSpec &j) const
Definition: AvalancheSchedulerSvc.h:318
AvalancheSchedulerSvc::m_enablePreemptiveBlockingTasks
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
Definition: AvalancheSchedulerSvc.h:180
AvalancheSchedulerSvc::m_whiteboardSvcName
Gaudi::Property< std::string > m_whiteboardSvcName
Definition: AvalancheSchedulerSvc.h:170
AvalancheSchedulerSvc
Definition: AvalancheSchedulerSvc.h:112
AvalancheSchedulerSvc::TaskSpec::TaskSpec
TaskSpec(const TaskSpec &)=default
Copy constructor (to keep a lambda capturing a TaskSpec storable as a std::function value)
AvalancheSchedulerSvc::m_simulateExecution
Gaudi::Property< bool > m_simulateExecution
Definition: AvalancheSchedulerSvc.h:173
AvalancheSchedulerSvc::m_scheduledBlockingQueue
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledBlockingQueue
Definition: AvalancheSchedulerSvc.h:323
AvalancheSchedulerSvc::recordOccupancy
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at a fixed interval (ms). A negative value deactivates sampling; 0 takes a snapshot on every change. Each...
Definition: AvalancheSchedulerSvc.cpp:1095
AvalancheSchedulerSvc::index2algname
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Definition: AvalancheSchedulerSvc.h:222
AvalancheSchedulerSvc::INACTIVE
@ INACTIVE
Definition: AvalancheSchedulerSvc.h:158
SmartIF< IPrecedenceSvc >
IHiveWhiteBoard.h
AvalancheSchedulerSvc::m_algosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
Definition: AvalancheSchedulerSvc.h:249
std::atomic< ActivationState >
AvalancheSchedulerSvc::tryPopFinishedEvent
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Definition: AvalancheSchedulerSvc.cpp:571
AvalancheSchedulerSvc::scheduleEventView
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Definition: AvalancheSchedulerSvc.cpp:1055
AvalancheSchedulerSvc::m_algResourcePool
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Definition: AvalancheSchedulerSvc.h:278
extends
Base class used to extend a class implementing other interfaces.
Definition: extends.h:20
AvalancheSchedulerSvc::freeSlots
unsigned int freeSlots() override
Get free slots number.
Definition: AvalancheSchedulerSvc.cpp:544
AvalancheSchedulerSvc::m_showDataDeps
Gaudi::Property< bool > m_showDataDeps
Definition: AvalancheSchedulerSvc.h:190
AvalancheSchedulerSvc::m_maxAlgosInFlight
size_t m_maxAlgosInFlight
Definition: AvalancheSchedulerSvc.h:335
AvalancheSchedulerSvc::initialize
StatusCode initialize() override
Initialise.
Definition: AvalancheSchedulerSvc.cpp:74
Service.h
std::chrono::duration::min
T min(T... args)
AvalancheSchedulerSvc::TaskSpec::TaskSpec
TaskSpec(TaskSpec &&)=default
Move constructor.
AvalancheSchedulerSvc::TaskSpec::contextPtr
EventContext * contextPtr
Definition: AvalancheSchedulerSvc.h:313
AvalancheSchedulerSvc::m_dumpIntraEventDynamics
Gaudi::Property< bool > m_dumpIntraEventDynamics
Definition: AvalancheSchedulerSvc.h:178
AvalancheSchedulerSvc::m_retryQueue
std::queue< TaskSpec > m_retryQueue
Definition: AvalancheSchedulerSvc.h:324
IRunable.h
compareRootHistos.state
def state
Definition: compareRootHistos.py:500
AvalancheSchedulerSvc::m_snapshotInterval
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Definition: AvalancheSchedulerSvc.h:161
AvalancheSchedulerSvc::m_threadPoolSvc
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Definition: AvalancheSchedulerSvc.h:332
EventContext
Definition: EventContext.h:34
AlgsExecutionStates::State
State
Execution states of the algorithms. Must have contiguous integer values 0, 1...
Definition: AlgsExecutionStates.h:42
AvalancheSchedulerSvc::revise
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
Definition: AvalancheSchedulerSvc.cpp:725
AvalancheSchedulerSvc::activate
void activate()
Activate scheduler.
Definition: AvalancheSchedulerSvc.cpp:387
AvalancheSchedulerSvc::TaskSpec::TaskSpec
TaskSpec(IAlgorithm *algPtr, unsigned int algIndex, const std::string &algName, unsigned int algRank, bool blocking, int slotIndex, EventContext *eventContext)
Definition: AvalancheSchedulerSvc.h:289
AvalancheSchedulerSvc::m_actionsQueue
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
Definition: AvalancheSchedulerSvc.h:283
AvalancheSchedulerSvc::m_algname_index_map
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Definition: AvalancheSchedulerSvc.h:216
AvalancheSchedulerSvc::m_checkDeps
Gaudi::Property< bool > m_checkDeps
Definition: AvalancheSchedulerSvc.h:183
AvalancheSchedulerSvc::isStalled
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
Definition: AvalancheSchedulerSvc.cpp:766
AvalancheSchedulerSvc::algname2index
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
Definition: AvalancheSchedulerSvc.h:216
AlgTask
Definition: AlgTask.h:36
AvalancheSchedulerSvc::AlgQueueSort
Comparison operator to sort the queues.
Definition: AvalancheSchedulerSvc.h:317
AvalancheSchedulerSvc::m_thread
std::thread m_thread
The thread in which the activate function runs.
Definition: AvalancheSchedulerSvc.h:213
AvalancheSchedulerSvc::pushNewEvents
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
Definition: AvalancheSchedulerSvc.cpp:533
AvalancheSchedulerSvc::m_showDataFlow
Gaudi::Property< bool > m_showDataFlow
Definition: AvalancheSchedulerSvc.h:193
AvalancheSchedulerSvc::signoff
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
Definition: AvalancheSchedulerSvc.cpp:1021
AvalancheSchedulerSvc::m_freeSlots
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
Definition: AvalancheSchedulerSvc.h:237
AvalancheSchedulerSvc::m_blockingAlgosInFlight
unsigned int m_blockingAlgosInFlight
Number of CPU-blocking algorithms presently in flight.
Definition: AvalancheSchedulerSvc.h:252
IAlgExecStateSvc.h
AvalancheSchedulerSvc::TaskSpec::algIndex
unsigned int algIndex
Definition: AvalancheSchedulerSvc.h:308
AvalancheSchedulerSvc::m_snapshotCallback
std::function< void(OccupancySnapshot)> m_snapshotCallback
Definition: AvalancheSchedulerSvc.h:163
AvalancheSchedulerSvc::pushNewEvent
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Definition: AvalancheSchedulerSvc.cpp:475
AvalancheSchedulerSvc::popFinishedEvent
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Definition: AvalancheSchedulerSvc.cpp:550
AvalancheSchedulerSvc::next
bool next(TaskSpec &ts, bool blocking=false)
Definition: AvalancheSchedulerSvc.h:339
std::unique_ptr< EventContext >
std::unordered_map< std::string, unsigned int >
AvalancheSchedulerSvc::m_precSvc
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Definition: AvalancheSchedulerSvc.h:228
AvalancheSchedulerSvc::m_isActive
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
Definition: AvalancheSchedulerSvc.h:210
AvalancheSchedulerSvc::m_finishedEvents
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
Definition: AvalancheSchedulerSvc.h:240
AvalancheSchedulerSvc::m_algname_vect
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Definition: AvalancheSchedulerSvc.h:222
ICondSvc.h
AvalancheSchedulerSvc::dumpSchedulerState
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Definition: AvalancheSchedulerSvc.cpp:805
Gaudi::Property< int >
AvalancheSchedulerSvc::m_verboseSubSlots
Gaudi::Property< bool > m_verboseSubSlots
Definition: AvalancheSchedulerSvc.h:199
AvalancheSchedulerSvc::TaskSpec::slotIndex
int slotIndex
Definition: AvalancheSchedulerSvc.h:312
IThreadPoolSvc.h
gaudirun.callback
callback
Definition: gaudirun.py:202
std::chrono::system_clock::now
T now(T... args)