The Gaudi Framework  v33r2 (a6f0ec87)
AvalancheSchedulerSvc.h
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #ifndef GAUDIHIVE_AVALANCHESCHEDULERSVC_H
12 #define GAUDIHIVE_AVALANCHESCHEDULERSVC_H
13 
14 // Local includes
15 #include "AlgsExecutionStates.h"
16 #include "EventSlot.h"
17 #include "PrecedenceSvc.h"
18 
19 // Framework include files
23 #include "GaudiKernel/ICondSvc.h"
25 #include "GaudiKernel/IRunable.h"
26 #include "GaudiKernel/IScheduler.h"
28 #include "GaudiKernel/Service.h"
29 
30 // C++ include files
31 #include <functional>
32 #include <queue>
33 #include <string>
34 #include <string_view>
35 #include <thread>
36 #include <unordered_map>
37 #include <vector>
38 
39 // External libs
40 #include "tbb/concurrent_priority_queue.h"
41 #include "tbb/concurrent_queue.h"
42 #include "tbb/task.h"
43 
44 class IAlgorithm;
45 
46 //---------------------------------------------------------------------------
47 
113 class AvalancheSchedulerSvc : public extends<Service, IScheduler> {
114 
115  template <class T>
116  friend class AlgTask;
117 
118 public:
120  using extends::extends;
121 
123  ~AvalancheSchedulerSvc() override = default;
124 
126  StatusCode initialize() override;
127 
129  StatusCode finalize() override;
130 
132  StatusCode pushNewEvent( EventContext* eventContext ) override;
133 
134  // Make multiple events available to the scheduler
135  StatusCode pushNewEvents( std::vector<EventContext*>& eventContexts ) override;
136 
138  StatusCode popFinishedEvent( EventContext*& eventContext ) override;
139 
141  StatusCode tryPopFinishedEvent( EventContext*& eventContext ) override;
142 
144  unsigned int freeSlots() override;
145 
147  virtual StatusCode scheduleEventView( const EventContext* sourceContext, const std::string& nodeName,
148  std::unique_ptr<EventContext> viewContext ) override;
149 
150 private:
153 
154  enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };
155 
157  this, "ThreadPoolSize", -1,
158  "Size of the global thread pool initialised by TBB; a value of -1 requests to use"
159  "all available hardware threads; -100 requests to bypass TBB executing "
160  "all algorithms in the scheduler's thread."};
161  Gaudi::Property<std::string> m_whiteboardSvcName{this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"};
163  this, "MaxBlockingAlgosInFlight", 0, "Maximum allowed number of simultaneously running CPU-blocking algorithms"};
165  this, "SimulateExecution", false,
166  "Flag to perform single-pass simulation of execution flow before the actual execution"};
168  "The following modes are currently available: PCE, COD, DRE, E"};
169  Gaudi::Property<bool> m_dumpIntraEventDynamics{this, "DumpIntraEventDynamics", false,
170  "Dump intra-event concurrency dynamics to csv file"};
172  this, "PreemptiveBlockingTasks", false,
173  "Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly."};
174 
175  Gaudi::Property<bool> m_checkDeps{this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"};
176 
178  "Attribute unmet input dependencies to this DataLoader Algorithm"};
179 
180  Gaudi::Property<bool> m_enableCondSvc{this, "EnableConditions", false, "Enable ConditionsSvc"};
181 
182  Gaudi::Property<bool> m_showDataDeps{this, "ShowDataDependencies", true,
183  "Show the INPUT and OUTPUT data dependencies of Algorithms"};
184 
185  Gaudi::Property<bool> m_showDataFlow{this, "ShowDataFlow", false,
186  "Show the configuration of DataFlow between Algorithms"};
187 
188  Gaudi::Property<bool> m_showControlFlow{this, "ShowControlFlow", false,
189  "Show the configuration of all Algorithms and Sequences"};
190 
191  Gaudi::Property<bool> m_verboseSubSlots{this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"};
192 
193  // Utils and shortcuts ----------------------------------------------------
194 
196  void activate();
197 
200 
203 
206 
208  inline unsigned int algname2index( const std::string& algoname ) { return m_algname_index_map[algoname]; };
209 
212 
214  inline const std::string& index2algname( unsigned int index ) { return m_algname_vect[index]; };
215 
218 
221 
224 
227 
229  std::atomic_int m_freeSlots;
230 
232  tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
233 
236 
239 
241  unsigned int m_algosInFlight = 0;
242 
244  unsigned int m_blockingAlgosInFlight = 0;
245 
246  // States management ------------------------------------------------------
247 
250 
251  // Update algorithm state and, optionally, revise states of other downstream algorithms
252  StatusCode revise( unsigned int iAlgo, EventContext* contextPtr, AState state, bool iterate = false );
253 
255  struct TaskSpec;
257  StatusCode signoff( const TaskSpec& );
258 
260  bool isStalled( const EventSlot& ) const;
262  void eventFailed( EventContext* eventContext );
263 
265  void dumpSchedulerState( int iSlot );
266 
267  // Algos Management -------------------------------------------------------
268 
271 
272  // Actions management -----------------------------------------------------
273 
275  tbb::concurrent_bounded_queue<action> m_actionsQueue;
276 
278  struct TaskSpec {
280  TaskSpec(){};
281  TaskSpec( IAlgorithm* algPtr, unsigned int algIndex, const std::string& algName, unsigned int algRank,
282  bool blocking, int slotIndex, EventContext* eventContext )
283  : algPtr( algPtr )
284  , algIndex( algIndex )
285  , algName( algName )
286  , algRank( algRank )
287  , blocking( blocking )
288  , slotIndex( slotIndex )
289  , contextPtr( eventContext ){};
291  TaskSpec( const TaskSpec& ) = default;
293  TaskSpec& operator=( const TaskSpec& ) = delete;
295  TaskSpec( TaskSpec&& ) = default;
297  TaskSpec& operator=( TaskSpec&& ) = default;
298 
299  IAlgorithm* algPtr{nullptr};
300  unsigned int algIndex{0};
301  std::string_view algName;
302  unsigned int algRank{0};
303  bool blocking{false};
304  int slotIndex{0};
306  };
307 
309  struct AlgQueueSort {
310  bool operator()( const TaskSpec& i, const TaskSpec& j ) const { return ( i.algRank < j.algRank ); }
311  };
312 
314  tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> m_scheduledQueue;
316 
317  // Prompt the scheduler to call updateStates
319 
320  // ------------------------------------------------------------------------
321 
322  // Service for thread pool initialization
326 };
327 
328 #endif // GAUDIHIVE_AVALANCHESCHEDULERSVC_H
Gaudi::Property< bool > m_showDataFlow
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Struct to hold entries in the alg queues.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Class representing an event slot.
Definition: EventSlot.h:24
Gaudi::Property< std::string > m_whiteboardSvcName
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
TaskSpec(IAlgorithm *algPtr, unsigned int algIndex, const std::string &algName, unsigned int algRank, bool blocking, int slotIndex, EventContext *eventContext)
Implementation of property with value of concrete type.
Definition: Property.h:370
Gaudi::Property< bool > m_dumpIntraEventDynamics
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Gaudi::Property< bool > m_showDataDeps
std::queue< TaskSpec > m_retryQueue
std::atomic< bool > m_needsUpdate
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
Gaudi::Property< std::string > m_optimizationMode
This class represents an entry point to all the event specific data.
Definition: EventContext.h:34
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Comparison operator to sort the queues.
Gaudi::Property< bool > m_checkDeps
STL class.
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
unsigned int m_algosInFlight
Number of algorithms presently in flight.
StatusCode schedule(TaskSpec &&)
Gaudi::Property< bool > m_verboseSubSlots
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:61
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
StatusCode finalize() override
Finalise.
~AvalancheSchedulerSvc() override=default
Destructor.
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:38
State
Execution states of the algorithms.
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
STL class.
STL class.
Gaudi::Property< bool > m_simulateExecution
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
Base class used to extend a class implementing other interfaces.
Definition: extends.h:20
unsigned int m_blockingAlgosInFlight
Number of algorithms presently in flight.
Gaudi::Property< bool > m_enableCondSvc
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
TaskSpec & operator=(const TaskSpec &)=delete
Assignment operator.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode deactivate()
Deactivate scheduler.
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
bool operator()(const TaskSpec &i, const TaskSpec &j) const
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::thread m_thread
The thread in which the activate function runs.