ForwardSchedulerSvc.h
Go to the documentation of this file.
1 #ifndef GAUDIHIVE_FORWARDSCHEDULERSVC_H
2 #define GAUDIHIVE_FORWARDSCHEDULERSVC_H
3 
4 #include <functional>
5 #include <string>
6 #include <thread>
7 #include <unordered_map>
8 #include <vector>
9 
10 // External libs
11 #include "tbb/concurrent_queue.h"
12 
13 // Framework include files
18 #include "GaudiKernel/IRunable.h"
19 #include "GaudiKernel/IScheduler.h"
21 #include "GaudiKernel/Service.h"
22 
23 // Local includes
24 #include "AlgsExecutionStates.h"
25 #include "DataFlowManager.h"
26 #include "EventSlot.h"
27 #include "ExecutionFlowManager.h"
28 
29 // C++ include files
30 #include <functional>
31 #include <string>
32 #include <thread>
33 #include <unordered_map>
34 #include <vector>
35 
36 // External libs
37 #include "tbb/concurrent_queue.h"
38 
40 
41 //---------------------------------------------------------------------------
42 
/// The SchedulerSvc implements the IScheduler interface.
///
/// NOTE(review): this listing comes from rendered Doxygen output. The leading
/// numbers are Doxygen's own line numbers, and wherever they skip (e.g. 119,
/// 124, 135, 138, 144, 252, 262, 276-277) the original line is absent from
/// this extract. The tooltip index below the listing names members that have
/// no visible declaration here (m_threadPoolSize, m_maxAlgosInFlight,
/// m_simulateExecution, m_optimizationMode, m_algosDependencies, deactivate(),
/// m_drain(), m_thread, m_isActive, m_eventSlots, m_whiteboard, ...), so treat
/// the class body as incomplete and confirm against the real header.
84 class ForwardSchedulerSvc : public extends<Service, IScheduler>
85 {
86 public:
    /// Inherit the constructors of the extends<> base.
88  using extends::extends;
89 
    /// Destructor.
91  ~ForwardSchedulerSvc() override = default;
92 
    /// Initialise.
94  StatusCode initialize() override;
95 
    /// Finalise.
97  StatusCode finalize() override;
98 
    /// Make an event available to the scheduler.
100  StatusCode pushNewEvent( EventContext* eventContext ) override;
101 
102  // Make multiple events available to the scheduler
103  StatusCode pushNewEvents( std::vector<EventContext*>& eventContexts ) override;
104 
    /// Blocks until an event is available.
106  StatusCode popFinishedEvent( EventContext*& eventContext ) override;
107 
    /// Try to fetch an event from the scheduler.
109  StatusCode tryPopFinishedEvent( EventContext*& eventContext ) override;
110 
    /// Get free slots number.
112  unsigned int freeSlots() override;
113 
114 private:
    /// Activation states; the tooltip index lists std::atomic<ActivationState> m_isActive
    /// as the "flag to track if the scheduler is active or not".
115  enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };
116 
117  Gaudi::Property<int> m_maxEventsInFlight{this, "MaxEventsInFlight", 0,
118  "Maximum number of event processed simultaneously"};
    // NOTE(review): the opening of the next declaration (listing line 119) is absent;
    // presumably Gaudi::Property<int> m_threadPoolSize, per the tooltip index -- confirm.
120  this, "ThreadPoolSize", -1,
121  "Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"};
122  Gaudi::Property<std::string> m_whiteboardSvcName{this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"};
123  Gaudi::Property<std::string> m_IOBoundAlgSchedulerSvcName{this, "IOBoundAlgSchedulerSvc", "IOBoundAlgSchedulerSvc"};
    // NOTE(review): listing line 124 is absent; only the tail of a property declaration follows.
125  "[[deprecated]] Taken from the whiteboard"};
126 
127  Gaudi::Property<unsigned int> m_maxIOBoundAlgosInFlight{this, "MaxIOBoundAlgosInFlight", 0,
128  "Maximum number of simultaneous I/O-bound algorithms"};
129  // XXX: CF tests. Temporary property to switch between ControlFlow implementations
130  Gaudi::Property<bool> m_CFNext{this, "useGraphFlowManagement", false,
131  "Temporary property to switch between ControlFlow implementations"};
132  // XXX: CF tests. Temporary property to switch between DataFlow implementations
133  Gaudi::Property<bool> m_DFNext{this, "DataFlowManagerNext", false,
134  "Temporary property to switch between DataFlow implementations"};
    // NOTE(review): listing line 135 is absent; presumably Gaudi::Property<bool>
    // m_simulateExecution, per the tooltip index -- confirm.
136  this, "SimulateExecution", false,
137  "Flag to perform single-pass simulation of execution flow before the actual execution"};
    // NOTE(review): listing line 138 is absent; only the tail of a property declaration follows.
139  "The following modes are currently available: PCE, COD, DRE, E"};
140  Gaudi::Property<bool> m_dumpIntraEventDynamics{this, "DumpIntraEventDynamics", false,
141  "Dump intra-event concurrency dynamics to csv file"};
142  Gaudi::Property<bool> m_useIOBoundAlgScheduler{this, "PreemptiveIOBoundTasks", false,
143  "Turn on preemptive way of scheduling of I/O-bound algorithms"};
    // NOTE(review): listing line 144 is absent; presumably the m_algosDependencies
    // property named in the tooltip index -- confirm.
145  this, "AlgosDependencies", {}, "[[deprecated]]"};
146  Gaudi::Property<bool> m_checkDeps{this, "CheckDependencies", false, "[[deprecated]]"};
147 
148  // Utils and shortcuts ----------------------------------------------------
149 
    /// Activate scheduler.
151  void activate();
152 
155 
158 
161 
    /// Convert a name to an integer.
163  inline unsigned int algname2index( const std::string& algoname );
164 
167 
    /// Convert an integer to a name.
169  inline const std::string& index2algname( unsigned int index );
170 
173 
176 
179 
182 
    /// Atomic to account for asynchronous updates by the scheduler wrt the rest.
    // NOTE(review): <atomic> is not among the includes visible in this listing;
    // presumably it arrives transitively -- confirm.
184  std::atomic_int m_freeSlots;
185 
    /// Queue of finished events.
187  tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
188 
    /// Method to check if an event failed and take appropriate actions.
190  StatusCode eventFailed( EventContext* eventContext );
191 
194 
195  // States management ------------------------------------------------------
196 
    /// Number of algorithms presently in flight.
198  unsigned int m_algosInFlight = 0;
199 
    /// Number of algorithms presently in flight (the I/O-bound ones, judging by the name).
201  unsigned int m_IOBoundAlgosInFlight = 0;
202 
    /// Loop over the algorithms in the slots and promote them to successive states
    /// (si = -1 means all slots).
205  StatusCode updateStates( int si = -1, const std::string& algo_name = std::string() );
206 
    /// Algorithm promotion: accepted by the control flow.
208  StatusCode promoteToControlReady( unsigned int iAlgo, int si );
209  StatusCode promoteToDataReady( unsigned int iAlgo, int si );
210  StatusCode promoteToScheduled( unsigned int iAlgo, int si );
211  StatusCode promoteToAsyncScheduled( unsigned int iAlgo, int si ); // tests of an asynchronous scheduler
    /// The call to this method is triggered only from within the AlgoExecutionTask.
212  StatusCode promoteToExecuted( unsigned int iAlgo, int si, IAlgorithm* algo, EventContext* );
    /// The call to this method is triggered only from within the IOBoundAlgTask.
213  StatusCode promoteToAsyncExecuted( unsigned int iAlgo, int si, IAlgorithm* algo,
214  EventContext* ); // tests of an asynchronous scheduler
215  StatusCode promoteToFinished( unsigned int iAlgo, int si );
216 
    /// Check if the scheduling is in a stall.
218  StatusCode isStalled( int si );
219 
    /// Dump the state of the scheduler.
221  void dumpSchedulerState( int iSlot );
222 
    /// Keep track of update actions scheduled.
224  bool m_updateNeeded = true;
225 
226  // Algos Management -------------------------------------------------------
229 
232 
233  // Actions management -----------------------------------------------------
234 
236 
    /// Queue where closures are stored and picked for execution
    /// (the tooltip index gives action = std::function<StatusCode()>).
238  tbb::concurrent_bounded_queue<action> m_actionsQueue;
239 
242 
243  // Needed to queue actions on algorithm finishing and decrement algos in flight
244  friend class AlgoExecutionTask;
245  friend class IOBoundAlgTask;
246 
247  // Service for thread pool initialization
249 
250  bool m_first = true;
251 
    // NOTE(review): the header of the nested class (listing line 252) is absent; the
    // constructor below shows it is SchedulerState. It bundles an algorithm pointer, the
    // event context passed in and a thread id.
253  {
254 
255  public:
    // Stores the algorithm pointer, *e and the thread id; e is dereferenced unconditionally,
    // so it must not be null. (The m_a / m_e member declarations are not visible here.)
256  SchedulerState( Algorithm* a, EventContext* e, pthread_t t ) : m_a( a ), m_e( *e ), m_t( t ) {}
257 
258  Algorithm* alg() const { return m_a; }
259  EventContext ctx() const { return m_e; }
260  pthread_t thread() const { return m_t; }
261 
    // NOTE(review): the signature line (262) is absent; per the tooltip index this is
    // friend std::ostream& operator<<( std::ostream& os, const SchedulerState& ss ).
    // Prints the context, the algorithm name and address, and the thread id in hex.
263  {
264  os << ss.ctx() << " a: " << ss.alg()->name() << " [" << std::hex << ss.alg() << std::dec << "] t: 0x"
265  << std::hex << ss.thread() << std::dec;
266  return os;
267  }
268 
    // Equality and ordering look at the algorithm pointer only; context and thread are ignored.
269  bool operator==( const SchedulerState& ss ) const { return ( m_a == ss.alg() ); }
270 
271  bool operator==( Algorithm* a ) const { return ( m_a == a ); }
272 
273  bool operator<( const SchedulerState& rhs ) const { return ( m_a < rhs.alg() ); }
274 
275  private:
278  pthread_t m_t;
279  };
280 
283 
284 public:
285  void addAlg( Algorithm*, EventContext*, pthread_t );
286  bool delAlg( Algorithm* );
287  void dumpState() override;
288 
289 private:
290  void dumpState( std::ostringstream& );
291 };
292 
293 #endif // GAUDIHIVE_FORWARDSCHEDULERSVC_H
StatusCode deactivate()
Deactivate scheduler.
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
Wrapper around I/O-bound Gaudi-algorithms.
SchedulerState(Algorithm *a, EventContext *e, pthread_t t)
StatusCode initialize() override
Initialise.
Gaudi::Property< bool > m_CFNext
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
Gaudi::Property< bool > m_simulateExecution
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:727
Implementation of property with value of concrete type.
Definition: Property.h:314
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
StatusCode finalize() override
Finalise.
unsigned int m_IOBoundAlgosInFlight
Number of I/O-bound algorithms presently in flight.
SmartIF< IThreadPoolSvc > m_threadPoolSvc
AlgsExecutionStates::State State
Gaudi::Property< bool > m_checkDeps
The SchedulerSvc implements the IScheduler interface.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode m_drain()
Drain the actions present in the queue.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
Gaudi::Property< std::string > m_optimizationMode
STL class.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
void addAlg(Algorithm *, EventContext *, pthread_t)
unsigned int m_algosInFlight
Number of algorithms presently in flight.
Manage the execution flow using an execution flow graph. Once initialized, the graph is const and can ...
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
bool operator<(const SchedulerState &rhs) const
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
static std::list< SchedulerState > m_sState
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
StatusCode promoteToFinished(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
bool operator==(Algorithm *a) const
Gaudi::Property< std::string > m_whiteboardSvcName
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
bool operator==(const SchedulerState &ss) const
~ForwardSchedulerSvc() override=default
Destructor.
unsigned int freeSlots() override
Get free slots number.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
Gaudi::Property< int > m_maxEventsInFlight
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots...
STL class.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
Gaudi::Property< bool > m_DFNext
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
Gaudi::Property< bool > m_useIOBoundAlgScheduler
STL class.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Gaudi::Property< bool > m_dumpIntraEventDynamics
T hex(T...args)
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
friend std::ostream & operator<<(std::ostream &os, const SchedulerState &ss)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::function< StatusCode()> action
State
Execution states of the algorithms.
Gaudi::Property< int > m_threadPoolSize
STL class.
STL class.
StatusCode promoteToDataReady(unsigned int iAlgo, int si)