The Gaudi Framework  v32r0 (3325bb39)
AvalancheSchedulerSvc.h
Go to the documentation of this file.
1 #ifndef GAUDIHIVE_AVALANCHESCHEDULERSVC_H
2 #define GAUDIHIVE_AVALANCHESCHEDULERSVC_H
3 
4 // Local includes
5 #include "AlgsExecutionStates.h"
6 #include "EventSlot.h"
7 #include "PrecedenceSvc.h"
8 
9 // Framework include files
13 #include "GaudiKernel/ICondSvc.h"
15 #include "GaudiKernel/IRunable.h"
16 #include "GaudiKernel/IScheduler.h"
18 #include "GaudiKernel/Service.h"
19 
20 // C++ include files
21 #include <functional>
22 #include <queue>
23 #include <string>
24 #include <thread>
25 #include <unordered_map>
26 #include <vector>
27 
28 // External libs
29 #include "tbb/concurrent_priority_queue.h"
30 #include "tbb/concurrent_queue.h"
31 #include "tbb/task.h"
32 
33 class IAlgorithm;
34 
35 //---------------------------------------------------------------------------
36 
// AvalancheSchedulerSvc: a Service that implements the IScheduler interface (see the base list below).
// NOTE(review): this is a Doxygen source listing. The leading number on each line is the line
// number in the original header; gaps in the numbering mark lines (doc comments and some
// declarations) that the listing omits. The brief descriptions added below are taken from the
// member index that follows the listing in this file.
102 class AvalancheSchedulerSvc : public extends<Service, IScheduler> {
103  friend class AlgoExecutionTask;
104 
105 public:
     // Inherit the constructors of the `extends` base.
107  using extends::extends;
108 
     /// Destructor.
110  ~AvalancheSchedulerSvc() override = default;
111 
     /// Initialise.
113  StatusCode initialize() override;
114 
     /// Finalise.
116  StatusCode finalize() override;
117 
     /// Make an event available to the scheduler.
119  StatusCode pushNewEvent( EventContext* eventContext ) override;
120 
121  // Make multiple events available to the scheduler
122  StatusCode pushNewEvents( std::vector<EventContext*>& eventContexts ) override;
123 
     /// Blocks until an event is available.
125  StatusCode popFinishedEvent( EventContext*& eventContext ) override;
126 
     /// Try to fetch an event from the scheduler.
128  StatusCode tryPopFinishedEvent( EventContext*& eventContext ) override;
129 
     /// Get free slots number.
131  unsigned int freeSlots() override;
132 
     /// Method to inform the scheduler about event views.
     /// viewContext is taken by value as a std::unique_ptr, so the caller hands over ownership.
134  virtual StatusCode scheduleEventView( const EventContext* sourceContext, const std::string& nodeName,
135  std::unique_ptr<EventContext> viewContext ) override;
136 
137 private:
     // NOTE(review): original lines 138-139 are absent from this listing. `AState` (used by
     // setAlgState) and `action` (used by m_actionsQueue) are not declared anywhere in the
     // visible text; presumably their aliases live here - confirm against the original header.
140 
     // Activation status values; the member index lists a std::atomic<ActivationState> m_isActive
     // described as "Flag to track if the scheduler is active or not".
141  enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };
142 
     // NOTE(review): the opening of this declaration (original line 143) is missing from the
     // listing. The member index lists `Gaudi::Property< int > m_threadPoolSize`, which
     // presumably belongs here - confirm against the original header.
144  this, "ThreadPoolSize", -1,
145  "Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"};
146  Gaudi::Property<std::string> m_whiteboardSvcName{this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"};
147  Gaudi::Property<std::string> m_IOBoundAlgSchedulerSvcName{this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"};
148  Gaudi::Property<unsigned int> m_maxIOBoundAlgosInFlight{this, "MaxIOBoundAlgosInFlight", 0,
149  "Maximum number of simultaneous I/O-bound algorithms"};
     // NOTE(review): declaration head (original line 150) missing; presumably
     // `Gaudi::Property< bool > m_simulateExecution` from the member index - confirm.
151  this, "SimulateExecution", false,
152  "Flag to perform single-pass simulation of execution flow before the actual execution"};
     // NOTE(review): declaration head (original line 153) missing, including the property name
     // and default value; presumably `Gaudi::Property< std::string > m_optimizationMode` from
     // the member index - confirm.
154  "The following modes are currently available: PCE, COD, DRE, E"};
155  Gaudi::Property<bool> m_dumpIntraEventDynamics{this, "DumpIntraEventDynamics", false,
156  "Dump intra-event concurrency dynamics to csv file"};
157  Gaudi::Property<bool> m_useIOBoundAlgScheduler{this, "PreemptiveIOBoundTasks", false,
158  "Turn on preemptive way of scheduling of I/O-bound algorithms"};
159 
160  Gaudi::Property<bool> m_checkDeps{this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"};
161 
     // NOTE(review): declaration head (original line 162) missing, including the property name
     // and default value; presumably `Gaudi::Property< std::string > m_useDataLoader` from the
     // member index - confirm.
163  "Attribute unmet input dependencies to this DataLoader Algorithm"};
164 
165  Gaudi::Property<bool> m_enableCondSvc{this, "EnableConditions", false, "Enable ConditionsSvc"};
166 
167  Gaudi::Property<bool> m_showDataDeps{this, "ShowDataDependencies", true,
168  "Show the INPUT and OUTPUT data dependencies of Algorithms"};
169 
170  Gaudi::Property<bool> m_showDataFlow{this, "ShowDataFlow", false,
171  "Show the configuration of DataFlow between Algorithms"};
172 
173  Gaudi::Property<bool> m_showControlFlow{this, "ShowControlFlow", false,
174  "Show the configuration of all Algorithms and Sequences"};
175 
176  Gaudi::Property<bool> m_verboseSubSlots{this, "VerboseSubSlots", false, "Dump algorithm states for all sub-slots"};
177 
178  // Utils and shortcuts ----------------------------------------------------
179 
     /// Activate scheduler.
     /// The member index describes m_thread as "the thread in which the activate function runs".
181  void activate();
182 
     // NOTE(review): the numbered-but-empty lines in this section are where the listing dropped
     // declarations. The member index names deactivate(), m_isActive and m_thread, which are
     // presumably among them - confirm.
185 
188 
191 
     /// Convert a name to an integer.
     /// The member index lists m_algname_index_map as a std::unordered_map, so operator[] here
     /// inserts a value-initialised (0) entry when algoname is unknown instead of reporting an error.
193  inline unsigned int algname2index( const std::string& algoname ) { return m_algname_index_map[algoname]; };
194 
197 
     /// Convert an integer to a name.
     /// The lookup is an unchecked operator[] on m_algname_vect (a std::vector per the member
     /// index); index must be within range.
199  inline const std::string& index2algname( unsigned int index ) { return m_algname_vect[index]; };
200 
203 
206 
209 
212 
215 
     /// Atomic to account for asynchronous updates by the scheduler wrt the rest.
217  std::atomic_int m_freeSlots;
218 
     /// Queue of finished events.
220  tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
221 
224 
227 
     /// Number of algorithms presently in flight.
229  unsigned int m_algosInFlight = 0;
230 
     /// The member index gives this the same brief as m_algosInFlight; presumably it counts only
     /// the I/O-bound algorithms in flight - confirm.
232  unsigned int m_IOBoundAlgosInFlight = 0;
233 
234  // States management ------------------------------------------------------
235 
     /// Loop on algorithm in the slots and promote them to successive states.
     /// Every argument defaults to -1. The member-index brief mentions a special meaning of -1
     /// for algo_index, but that text is truncated in this file.
238  StatusCode updateStates( int si = -1, int algo_index = -1, int sub_slot = -1, int source_slot = -1 );
239 
240  // Update algorithm state in the appropriate event slot
241  StatusCode setAlgState( unsigned int iAlgo, EventContext* contextPtr, AState state );
242 
     /// Algorithm promotion.
244  StatusCode enqueue( unsigned int iAlgo, int si, EventContext* );
245  StatusCode promoteToAsyncScheduled( unsigned int iAlgo, int si, EventContext* ); // tests of an asynchronous scheduler
     /// The call to this method is triggered only from within the AlgoExecutionTask.
246  StatusCode promoteToExecuted( unsigned int iAlgo, int si, EventContext* );
     /// The call to this method is triggered only from within the IOBoundAlgTask.
247  StatusCode promoteToAsyncExecuted( unsigned int iAlgo, int si, IAlgorithm* algo,
248  EventContext* ); // tests of an asynchronous scheduler
249  StatusCode promoteToFinished( unsigned int iAlgo, int si );
250 
     /// Check if scheduling in a particular slot is in a stall.
252  bool isStalled( const EventSlot& ) const;
     /// Method to execute if an event failed.
254  void eventFailed( EventContext* eventContext );
255 
     /// Dump the state of the scheduler.
257  void dumpSchedulerState( int iSlot );
258 
259  // Algos Management -------------------------------------------------------
260 
263 
264  // Actions management -----------------------------------------------------
265 
     /// Queue where closures are stored and picked for execution.
267  tbb::concurrent_bounded_queue<action> m_actionsQueue;
     // NOTE(review): the declaration this remark belongs to (original line 270) is missing from
     // the listing; presumably m_actionsCounts from the member index - confirm.
269  // (accessed/modified from the control thread only)
271 
     /// Struct to hold entries in the alg queues.
     // NOTE(review): only algIndex and rank survive in this listing. The member index also names
     // slotIndex, contextPtr and algPtr, presumably the fields of the dropped lines - confirm.
273  struct algQueueEntry {
274  unsigned int algIndex;
277  unsigned int rank;
279  };
280 
     /// Comparison operator to sort the queues.
     /// Strict "less than" on rank. NOTE(review): with a std::priority_queue-style comparator this
     /// puts the highest rank at the front - confirm the same holds for tbb::concurrent_priority_queue.
282  struct algQueueSort {
283  bool operator()( const algQueueEntry& i, const algQueueEntry& j ) const { return ( i.rank < j.rank ); }
284  };
285 
     /// Queues for scheduled algorithms.
287  tbb::concurrent_priority_queue<algQueueEntry, algQueueSort> m_scheduledQueue;
289 
290  // ------------------------------------------------------------------------
291 
     // NOTE(review): the declaration this remark introduces is missing from the listing;
     // presumably m_threadPoolSvc from the member index - confirm.
292  // Service for thread pool initialization
296 };
297 
298 #endif // GAUDIHIVE_AVALANCHESCHEDULERSVC_H
Gaudi::Property< bool > m_showDataFlow
bool operator()(const algQueueEntry &i, const algQueueEntry &j) const
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Class representing an event slot.
Definition: EventSlot.h:14
Gaudi::Property< std::string > m_whiteboardSvcName
unsigned int m_IOBoundAlgosInFlight
Number of I/O-bound algorithms presently in flight.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
unsigned int algIndex
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
Implementation of property with value of concrete type.
Definition: Property.h:352
Gaudi::Property< bool > m_dumpIntraEventDynamics
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Gaudi::Property< bool > m_showDataDeps
int slotIndex
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
Gaudi::Property< std::string > m_optimizationMode
EventContext * contextPtr
Struct to hold entries in the alg queues.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode setAlgState(unsigned int iAlgo, EventContext *contextPtr, AState state)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::queue< algQueueEntry > m_retryQueue
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
unsigned int m_algosInFlight
Number of algorithms presently in flight.
Gaudi::Property< bool > m_verboseSubSlots
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
StatusCode finalize() override
Finalise.
~AvalancheSchedulerSvc() override=default
Destructor.
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
State
Execution states of the algorithms.
Comparison operator to sort the queues.
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
StatusCode promoteToFinished(unsigned int iAlgo, int si)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
STL class.
Gaudi::Property< bool > m_simulateExecution
IAlgorithm * algPtr
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
Gaudi::Property< bool > m_enableCondSvc
StatusCode enqueue(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
unsigned int rank
tbb::concurrent_priority_queue< algQueueEntry, algQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
std::thread m_thread
The thread in which the activate function runs.