The Gaudi Framework  v29r0 (ff2e7097)
AvalancheSchedulerSvc.h
Go to the documentation of this file.
1 #ifndef GAUDIHIVE_AVALANCHESCHEDULERSVC_H
2 #define GAUDIHIVE_AVALANCHESCHEDULERSVC_H
3 
4 // Local includes
5 #include "AlgsExecutionStates.h"
6 #include "EventSlot.h"
7 #include "PrecedenceSvc.h"
8 
9 // Framework include files
10 #include "GaudiKernel/Algorithm.h"
14 #include "GaudiKernel/ICondSvc.h"
16 #include "GaudiKernel/IRunable.h"
17 #include "GaudiKernel/IScheduler.h"
19 #include "GaudiKernel/Service.h"
20 
21 // C++ include files
22 #include <functional>
23 #include <string>
24 #include <thread>
25 #include <unordered_map>
26 #include <vector>
27 
28 // External libs
29 #include "tbb/concurrent_queue.h"
30 #include "tbb/task.h"
31 
34 
35 //---------------------------------------------------------------------------
36 
// Scheduler service declaration: derives from Service and implements the IScheduler
// interface through the extends<> helper.
// NOTE(review): this is a numbered listing with gaps (the embedded line numbers skip),
// so several declaration lines are absent; comments below describe only what is visible
// and hedge wherever the declaration itself is missing.
102 class AvalancheSchedulerSvc : public extends<Service, IScheduler>
103 {
104 public:
 // Inherit the constructors of the extends<> base.
106  using extends::extends;
107 
 // Destructor (defaulted).
109  ~AvalancheSchedulerSvc() override = default;
110 
 // Initialise the service.
112  StatusCode initialize() override;
113 
 // Finalise the service.
115  StatusCode finalize() override;
116 
 // Make an event available to the scheduler.
118  StatusCode pushNewEvent( EventContext* eventContext ) override;
119 
120  // Make multiple events available to the scheduler
121  StatusCode pushNewEvents( std::vector<EventContext*>& eventContexts ) override;
122 
 // Blocks until an event is available; the finished event is handed back through the
 // reference-to-pointer argument.
124  StatusCode popFinishedEvent( EventContext*& eventContext ) override;
125 
 // Try to fetch an event from the scheduler (handed back through the argument).
127  StatusCode tryPopFinishedEvent( EventContext*& eventContext ) override;
128 
 // Get the number of free slots.
130  unsigned int freeSlots() override;
131 
132 private:
 // Activation states of the scheduler.
133  enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };
134 
 // Tail of the "ThreadPoolSize" property initializer; its opening line is absent from this
 // listing (presumably Gaudi::Property<int> m_threadPoolSize - TODO confirm).
136  this, "ThreadPoolSize", -1,
137  "Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"};
 // Configurable properties: each is registered with a property name, a default value and
 // (for most) a description string.
138  Gaudi::Property<std::string> m_whiteboardSvcName{this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"};
139  Gaudi::Property<std::string> m_IOBoundAlgSchedulerSvcName{this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"};
140  Gaudi::Property<unsigned int> m_maxIOBoundAlgosInFlight{this, "MaxIOBoundAlgosInFlight", 0,
141  "Maximum number of simultaneous I/O-bound algorithms"};
 // Tail of the "SimulateExecution" property initializer; opening line absent from this
 // listing (presumably Gaudi::Property<bool> m_simulateExecution - TODO confirm).
143  this, "SimulateExecution", false,
144  "Flag to perform single-pass simulation of execution flow before the actual execution"};
 // Description string of a property whose opening line is absent from this listing
 // (presumably Gaudi::Property<std::string> m_optimizationMode - TODO confirm).
146  "The following modes are currently available: PCE, COD, DRE, E"};
147  Gaudi::Property<bool> m_dumpIntraEventDynamics{this, "DumpIntraEventDynamics", false,
148  "Dump intra-event concurrency dynamics to csv file"};
149  Gaudi::Property<bool> m_useIOBoundAlgScheduler{this, "PreemptiveIOBoundTasks", false,
150  "Turn on preemptive way of scheduling of I/O-bound algorithms"};
151 
152  Gaudi::Property<bool> m_checkDeps{this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"};
153 
 // Description string of a property whose opening line is absent from this listing
 // (presumably Gaudi::Property<std::string> m_useDataLoader - TODO confirm).
155  "Attribute unmet input dependencies to this DataLoader Algorithm"};
156 
157  Gaudi::Property<bool> m_enableCondSvc{this, "EnableConditions", false, "Enable ConditionsSvc"};
158 
 // Note: this is the only "Show..." flag that defaults to true.
159  Gaudi::Property<bool> m_showDataDeps{this, "ShowDataDependencies", true,
160  "Show the INPUT and OUTPUT data dependencies of Algorithms"};
161 
162  Gaudi::Property<bool> m_showDataFlow{this, "ShowDataFlow", false,
163  "Show the configuration of DataFlow between Algorithms"};
164 
165  Gaudi::Property<bool> m_showControlFlow{this, "ShowControlFlow", false,
166  "Show the configuration of all Algorithms and Sequences"};
167 
168  // Utils and shortcuts ----------------------------------------------------
169 
 // Activate the scheduler.
171  void activate();
172 
175 
178 
181 
 // Convert an algorithm name to an integer index.
183  inline unsigned int algname2index( const std::string& algoname );
184 
187 
 // Convert an integer index to an algorithm name.
189  inline const std::string& index2algname( unsigned int index );
190 
193 
196 
199 
202 
205 
 // Atomic counter of free slots; atomic to account for asynchronous updates by the
 // scheduler with respect to the rest.
207  std::atomic_int m_freeSlots;
208 
 // Queue of finished events.
210  tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
211 
 // Check if an event failed and take appropriate actions.
213  StatusCode eventFailed( EventContext* eventContext );
214 
217 
220 
221  // States management ------------------------------------------------------
222 
 // Number of algorithms presently in flight.
224  unsigned int m_algosInFlight = 0;
225 
 // In-flight counter for I/O-bound algorithms (going by the member name - TODO confirm).
227  unsigned int m_IOBoundAlgosInFlight = 0;
228 
 // Loop over the algorithms in the slots and promote them to successive states;
 // si == -1 (the default) means all slots. NOTE(review): the meaning of the default
 // empty algo_name is not visible in this listing - confirm against the implementation.
231  StatusCode updateStates( int si = -1, const std::string& algo_name = std::string() );
232 
 // Algorithm promotion: each method takes the algorithm index (iAlgo) and the slot index (si).
234  StatusCode promoteToScheduled( unsigned int iAlgo, int si );
235  StatusCode promoteToAsyncScheduled( unsigned int iAlgo, int si ); // tests of an asynchronous scheduler
 // The call to this method is triggered only from within the AlgoExecutionTask.
236  StatusCode promoteToExecuted( unsigned int iAlgo, int si, IAlgorithm* algo, EventContext* );
 // The call to this method is triggered only from within the IOBoundAlgTask.
237  StatusCode promoteToAsyncExecuted( unsigned int iAlgo, int si, IAlgorithm* algo,
238  EventContext* ); // tests of an asynchronous scheduler
239  StatusCode promoteToFinished( unsigned int iAlgo, int si );
240 
 // Check if the scheduling is in a stall.
242  StatusCode isStalled( int si );
243 
 // Dump the state of the scheduler.
245  void dumpSchedulerState( int iSlot );
246 
 // Keep track of update actions scheduled.
248  bool m_updateNeeded = true;
249 
250  // Algos Management -------------------------------------------------------
253 
256 
257  // Actions management -----------------------------------------------------
258 
 // Queue where closures are stored and picked for execution.
 // NOTE(review): the declaration of the `action` type is absent from this listing.
260  tbb::concurrent_bounded_queue<action> m_actionsQueue;
261 
262  // helper task to enqueue the scheduler's actions (closures)
263  struct enqueueSchedulerActionTask : public tbb::task {
264 
267 
 // Constructor initializer list; the constructor's signature line and the declarations of
 // m_closure / m_scheduler are absent from this listing.
269  : m_closure( _closure ), m_scheduler( scheduler )
270  {
271  }
272 
 // Pushes the stored closure onto the scheduler's m_actionsQueue and returns nullptr.
273  tbb::task* execute() override
274  {
275  m_scheduler->m_actionsQueue.push( m_closure );
276  return nullptr;
277  }
278  };
279 
280  // ------------------------------------------------------------------------
281 
282  // Service for thread pool initialization
286  bool m_first = true;
287 
 // Body of the SchedulerState helper class (its opening `class` line is absent from this
 // listing). It records an algorithm pointer, a copy of an event context and a pthread_t.
289  {
290 
291  public:
 // Stores the algorithm pointer and thread id, and copies the context by dereferencing e
 // (so e must not be null).
292  SchedulerState( Algorithm* a, EventContext* e, pthread_t t ) : m_a( a ), m_e( *e ), m_t( t ) {}
293 
 // Accessors; ctx() returns the stored context by value.
294  Algorithm* alg() const { return m_a; }
295  EventContext ctx() const { return m_e; }
296  pthread_t thread() const { return m_t; }
297 
 // Body of a stream-output operator whose signature line is absent from this listing:
 // writes the context, the algorithm name and address (hex) and the thread id (hex).
299  {
300  os << ss.ctx() << " a: " << ss.alg()->name() << " [" << std::hex << ss.alg() << std::dec << "] t: 0x"
301  << std::hex << ss.thread() << std::dec;
302  return os;
303  }
304 
 // Equality and ordering consider only the algorithm pointer; context and thread are ignored.
305  bool operator==( const SchedulerState& ss ) const { return ( m_a == ss.alg() ); }
306 
307  bool operator==( Algorithm* a ) const { return ( m_a == a ); }
308 
309  bool operator<( const SchedulerState& rhs ) const { return ( m_a < rhs.alg() ); }
310 
311  private:
 // NOTE(review): the declarations of m_a and m_e are absent from this listing.
314  pthread_t m_t;
315  };
316 
319 
320 public:
 // NOTE(review): addAlg/delAlg presumably add and remove a SchedulerState entry for the
 // given algorithm - confirm against the implementation.
321  void addAlg( Algorithm*, EventContext*, pthread_t );
322  bool delAlg( Algorithm* );
 // Overrides a base-class virtual.
323  void dumpState() override;
324 
325 private:
 // Overload taking an output string stream.
326  void dumpState( std::ostringstream& );
327 };
328 
329 #endif // GAUDIHIVE_AVALANCHESCHEDULERSVC_H
Gaudi::Property< bool > m_showDataFlow
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Gaudi::Property< std::string > m_whiteboardSvcName
unsigned int m_IOBoundAlgosInFlight
Number of I/O-bound algorithms presently in flight.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:731
Implementation of property with value of concrete type.
Definition: Property.h:319
Gaudi::Property< bool > m_dumpIntraEventDynamics
Gaudi::Property< bool > m_showDataDeps
bool operator==(const SchedulerState &ss) const
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
Algorithm promotion.
enqueueSchedulerActionTask(AvalancheSchedulerSvc *scheduler, std::function< StatusCode()> _closure)
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
Gaudi::Property< std::string > m_optimizationMode
friend std::ostream & operator<<(std::ostream &os, const SchedulerState &ss)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
AlgsExecutionStates::State State
void addAlg(Algorithm *, EventContext *, pthread_t)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
bool operator<(const SchedulerState &rhs) const
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
unsigned int m_algosInFlight
Number of algorithms presently in flight.
static std::list< SchedulerState > m_sState
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
SchedulerState(Algorithm *a, EventContext *e, pthread_t t)
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
std::function< StatusCode()> action
StatusCode finalize() override
Finalise.
~AvalancheSchedulerSvc() override=default
Destructor.
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
STL class.
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
StatusCode promoteToFinished(unsigned int iAlgo, int si)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
Gaudi::Property< bool > m_simulateExecution
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
Gaudi::Property< bool > m_enableCondSvc
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
T hex(T...args)
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots...
State
Execution states of the algorithms.
STL class.
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::thread m_thread
The thread in which the activate function runs.
StatusCode m_drain()
Drain the actions present in the queue.