The Gaudi Framework  v30r0 (c919700c)
AvalancheSchedulerSvc.h
Go to the documentation of this file.
1 #ifndef GAUDIHIVE_AVALANCHESCHEDULERSVC_H
2 #define GAUDIHIVE_AVALANCHESCHEDULERSVC_H
3 
4 // Local includes
5 #include "AlgsExecutionStates.h"
6 #include "EventSlot.h"
7 #include "PrecedenceSvc.h"
8 
9 // Framework include files
10 #include "GaudiKernel/Algorithm.h"
14 #include "GaudiKernel/ICondSvc.h"
16 #include "GaudiKernel/IRunable.h"
17 #include "GaudiKernel/IScheduler.h"
19 #include "GaudiKernel/Service.h"
20 
21 // C++ include files
22 #include <functional>
23 #include <string>
24 #include <thread>
25 #include <unordered_map>
26 #include <vector>
27 
28 // External libs
29 #include "tbb/concurrent_queue.h"
30 #include "tbb/task.h"
31 
34 
35 //---------------------------------------------------------------------------
36 
102 class AvalancheSchedulerSvc : public extends<Service, IScheduler>
103 {
104 public:
106  using extends::extends;
107 
109  ~AvalancheSchedulerSvc() override = default;
110 
112  StatusCode initialize() override;
113 
115  StatusCode finalize() override;
116 
118  StatusCode pushNewEvent( EventContext* eventContext ) override;
119 
120  // Make multiple events available to the scheduler
121  StatusCode pushNewEvents( std::vector<EventContext*>& eventContexts ) override;
122 
124  StatusCode popFinishedEvent( EventContext*& eventContext ) override;
125 
127  StatusCode tryPopFinishedEvent( EventContext*& eventContext ) override;
128 
130  unsigned int freeSlots() override;
131 
132 private:
133  enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };
134 
136  this, "ThreadPoolSize", -1,
137  "Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"};
138  Gaudi::Property<std::string> m_whiteboardSvcName{this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"};
139  Gaudi::Property<std::string> m_IOBoundAlgSchedulerSvcName{this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"};
140  Gaudi::Property<unsigned int> m_maxIOBoundAlgosInFlight{this, "MaxIOBoundAlgosInFlight", 0,
141  "Maximum number of simultaneous I/O-bound algorithms"};
143  this, "SimulateExecution", false,
144  "Flag to perform single-pass simulation of execution flow before the actual execution"};
146  "The following modes are currently available: PCE, COD, DRE, E"};
147  Gaudi::Property<bool> m_dumpIntraEventDynamics{this, "DumpIntraEventDynamics", false,
148  "Dump intra-event concurrency dynamics to csv file"};
149  Gaudi::Property<bool> m_useIOBoundAlgScheduler{this, "PreemptiveIOBoundTasks", false,
150  "Turn on preemptive way of scheduling of I/O-bound algorithms"};
151 
152  Gaudi::Property<bool> m_checkDeps{this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"};
153 
155  "Attribute unmet input dependencies to this DataLoader Algorithm"};
156 
157  Gaudi::Property<bool> m_enableCondSvc{this, "EnableConditions", false, "Enable ConditionsSvc"};
158 
159  Gaudi::Property<bool> m_showDataDeps{this, "ShowDataDependencies", true,
160  "Show the INPUT and OUTPUT data dependencies of Algorithms"};
161 
162  Gaudi::Property<bool> m_showDataFlow{this, "ShowDataFlow", false,
163  "Show the configuration of DataFlow between Algorithms"};
164 
165  Gaudi::Property<bool> m_showControlFlow{this, "ShowControlFlow", false,
166  "Show the configuration of all Algorithms and Sequences"};
167 
168  // Utils and shortcuts ----------------------------------------------------
169 
171  void activate();
172 
175 
178 
181 
183  inline unsigned int algname2index( const std::string& algoname );
184 
187 
189  inline const std::string& index2algname( unsigned int index );
190 
193 
196 
199 
202 
205 
207  std::atomic_int m_freeSlots;
208 
210  tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
211 
213  StatusCode eventFailed( EventContext* eventContext );
214 
217 
220 
221  // States management ------------------------------------------------------
222 
224  unsigned int m_algosInFlight = 0;
225 
227  unsigned int m_IOBoundAlgosInFlight = 0;
228 
231  StatusCode updateStates( int si = -1, int algo_index = -1 );
232 
234  StatusCode promoteToScheduled( unsigned int iAlgo, int si );
235  StatusCode promoteToAsyncScheduled( unsigned int iAlgo, int si ); // tests of an asynchronous scheduler
236  StatusCode promoteToExecuted( unsigned int iAlgo, int si, IAlgorithm* algo, EventContext* );
237  StatusCode promoteToAsyncExecuted( unsigned int iAlgo, int si, IAlgorithm* algo,
238  EventContext* ); // tests of an asynchronous scheduler
239  StatusCode promoteToFinished( unsigned int iAlgo, int si );
240 
242  StatusCode isStalled( int si );
243 
245  void dumpSchedulerState( int iSlot );
246 
247  // Algos Management -------------------------------------------------------
250 
253 
254  // Actions management -----------------------------------------------------
255 
257  tbb::concurrent_bounded_queue<action> m_actionsQueue;
258 
259  // ------------------------------------------------------------------------
260 
261  // Service for thread pool initialization
265  bool m_first = true;
266 
268  {
269 
270  public:
271  SchedulerState( Algorithm* a, EventContext* e, pthread_t t ) : m_a( a ), m_e( *e ), m_t( t ) {}
272 
273  Algorithm* alg() const { return m_a; }
274  EventContext ctx() const { return m_e; }
275  pthread_t thread() const { return m_t; }
276 
278  {
279  os << ss.ctx() << " a: " << ss.alg()->name() << " [" << std::hex << ss.alg() << std::dec << "] t: 0x"
280  << std::hex << ss.thread() << std::dec;
281  return os;
282  }
283 
284  bool operator==( const SchedulerState& ss ) const { return ( m_a == ss.alg() ); }
285 
286  bool operator==( Algorithm* a ) const { return ( m_a == a ); }
287 
288  bool operator<( const SchedulerState& rhs ) const { return ( m_a < rhs.alg() ); }
289 
290  private:
293  pthread_t m_t;
294  };
295 
298 
299 public:
300  void addAlg( Algorithm*, EventContext*, pthread_t );
301  bool delAlg( Algorithm* );
302  void dumpState() override;
303 
304 private:
305  void dumpState( std::ostringstream& );
306 };
307 
308 #endif // GAUDIHIVE_AVALANCHESCHEDULERSVC_H
Gaudi::Property< bool > m_showDataFlow
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Gaudi::Property< std::string > m_whiteboardSvcName
unsigned int m_IOBoundAlgosInFlight
Number of algoritms presently in flight.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:737
Implementation of property with value of concrete type.
Definition: Property.h:319
Gaudi::Property< bool > m_dumpIntraEventDynamics
Gaudi::Property< bool > m_showDataDeps
bool operator==(const SchedulerState &ss) const
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
Algorithm promotion.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
Gaudi::Property< std::string > m_optimizationMode
friend std::ostream & operator<<(std::ostream &os, const SchedulerState &ss)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
AlgsExecutionStates::State State
void addAlg(Algorithm *, EventContext *, pthread_t)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
Gaudi::Property< bool > m_checkDeps
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
bool operator<(const SchedulerState &rhs) const
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
unsigned int m_algosInFlight
Number of algoritms presently in flight.
static std::list< SchedulerState > m_sState
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is availble.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
SchedulerState(Algorithm *a, EventContext *e, pthread_t t)
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
std::function< StatusCode()> action
StatusCode finalize() override
Finalise.
~AvalancheSchedulerSvc() override=default
Destructor.
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
STL class.
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
StatusCode promoteToFinished(unsigned int iAlgo, int si)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
Gaudi::Property< bool > m_simulateExecution
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
Gaudi::Property< bool > m_enableCondSvc
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
T hex(T...args)
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
StatusCode updateStates(int si=-1, int algo_index=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
State
Execution states of the algorithms.
STL class.
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
std::thread m_thread
The thread in which the activate function runs.
StatusCode m_drain()
Drain the actions present in the queue.