ForwardSchedulerSvc.h
Go to the documentation of this file.
1 #ifndef GAUDIHIVE_FORWARDSCHEDULERSVC_H
2 #define GAUDIHIVE_FORWARDSCHEDULERSVC_H
3 
4 // Framework include files
6 #include "GaudiKernel/IRunable.h"
7 #include "GaudiKernel/Service.h"
11 
12 // Local includes
13 #include "AlgsExecutionStates.h"
14 #include "EventSlot.h"
15 #include "ExecutionFlowManager.h"
16 #include "DataFlowManager.h"
17 
18 // C++ include files
19 #include <vector>
20 #include <string>
21 #include <unordered_map>
22 #include <functional>
23 #include <thread>
24 
25 // External libs
26 #include "tbb/concurrent_queue.h"
27 
29 
30 //---------------------------------------------------------------------------
31 
73 class ForwardSchedulerSvc: public extends<Service,
74  IScheduler> {
75 public:
78 
81 
83  virtual StatusCode initialize();
84 
86  virtual StatusCode finalize();
87 
89  virtual StatusCode pushNewEvent(EventContext* eventContext);
90 
91  // Make multiple events available to the scheduler
92  virtual StatusCode pushNewEvents(std::vector<EventContext*>& eventContexts);
93 
95  virtual StatusCode popFinishedEvent(EventContext*& eventContext);
96 
98  virtual StatusCode tryPopFinishedEvent(EventContext*& eventContext);
99 
101  virtual unsigned int freeSlots();
102 
103 
104 private:
105 
107  INACTIVE = 0,
108  ACTIVE = 1,
110  };
111 
112  // Utils and shortcuts ----------------------------------------------------
113 
115  void activate();
116 
119 
122 
125 
127  inline unsigned int algname2index(const std::string& algoname);
128 
131 
133  inline const std::string& index2algname (unsigned int index);
134 
137 
140 
143 
146 
149 
151  std::atomic_int m_freeSlots;
152 
154  tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
155 
157  StatusCode eventFailed(EventContext* eventContext);
158 
159 
160  // States management ------------------------------------------------------
161 
163  unsigned int m_maxAlgosInFlight;
164 
166  unsigned int m_algosInFlight;
167 
170  StatusCode updateStates(int si=-1, const std::string& algo_name=std::string());
171 
173  StatusCode promoteToControlReady(unsigned int iAlgo, int si);
174  StatusCode promoteToDataReady(unsigned int iAlgo, int si);
175  StatusCode promoteToScheduled(unsigned int iAlgo, int si);
176  StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm* algo);
177  StatusCode promoteToFinished(unsigned int iAlgo, int si);
178 
180  StatusCode isStalled(int si);
181 
183  void dumpSchedulerState(int iSlot);
184 
187 
188  // Algos Management -------------------------------------------------------
191 
194 
197 
200 
201  // Actions management -----------------------------------------------------
202 
204 
206  tbb::concurrent_bounded_queue<action> m_actionsQueue;
207 
210  // XXX: CF tests. Temporary property to switch between ControlFlow implementations
211  bool m_CFNext;
212  // XXX: CF tests. Temporary property to switch between DataFlow implementations
213  bool m_DFNext;
214  // Flag to perform single-pass simulation of execution flow before the actual execution
216  // Optimization mode in which algorithms, ready for execution, are prioritized in special way
218  // Dump intra-event concurrency dynamics to csv file
220 
221  // Needed to queue actions on algorithm finishing and decrement algos in flight
222  friend class AlgoExecutionTask;
223 
224  // Service for thread pool initialization
226 
227  bool m_first;
228 
230 
231  public:
232  SchedulerState(Algorithm* a, EventContext* e, pthread_t t):
233  m_a(a), m_e(*e), m_t(t)
234  {
235  }
236 
237  Algorithm* alg() const { return m_a; }
238  EventContext ctx() const { return m_e; }
239  pthread_t thread() const { return m_t; }
240 
242  os << ss.ctx()
243  << " a: " << ss.alg()->name()
244  << " [" << std::hex << ss.alg() << std::dec
245  << "] t: 0x" << std::hex << ss.thread() << std::dec;
246  return os;
247  }
248 
249  bool operator== (const SchedulerState& ss) const {
250  return ( m_a == ss.alg() );
251  }
252 
253  bool operator== (Algorithm* a) const {
254  return ( m_a == a );
255  }
256 
257  bool operator< ( const SchedulerState& rhs) const {
258  return ( m_a < rhs.alg() );
259  }
260 
261  private:
264  pthread_t m_t;
265 
266  };
267 
270 
271 public:
272  void addAlg(Algorithm*, EventContext*, pthread_t);
273  bool delAlg(Algorithm*);
274  void dumpState();
275 
276 private:
278 
280 
281 
282 };
283 
284 #endif // GAUDIHIVE_FORWARDSCHEDULERSVC_H
StatusCode deactivate()
Deactivate scheduler.
SchedulerState(Algorithm *a, EventContext *e, pthread_t t)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
ForwardSchedulerSvc(const std::string &name, ISvcLocator *svc)
Constructor.
virtual StatusCode initialize()
Initialise.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
SmartIF< IThreadPoolSvc > m_threadPoolSvc
virtual StatusCode popFinishedEvent(EventContext *&eventContext)
Blocks until an event is availble.
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
AlgsExecutionStates::State State
The SchedulerSvc implements the IScheduler interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo)
The call to this method is triggered only from within the AlgoExecutionTask.
virtual StatusCode tryPopFinishedEvent(EventContext *&eventContext)
Try to fetch an event from the scheduler.
StatusCode m_drain()
Drain the actions present in the queue.
STL class.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:820
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
~ForwardSchedulerSvc()
Destructor.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
void addAlg(Algorithm *, EventContext *, pthread_t)
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:319
unsigned int m_algosInFlight
Number of algoritms presently in flight.
Manage the execution flow using an execution flow graph Once initialized, the graph is const and can ...
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
bool operator<(const SchedulerState &rhs) const
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
static std::list< SchedulerState > m_sState
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
StatusCode promoteToFinished(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::string m_whiteboardSvcName
The whiteboard name.
virtual StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts)
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
bool operator==(const SchedulerState &ss) const
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
STL class.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:25
virtual unsigned int freeSlots()
Get free slots number.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
std::function< StatusCode()> action
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:74
void activate()
Activate scheduler.
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
T hex(T...args)
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
virtual StatusCode finalize()
Finalise.
friend std::ostream & operator<<(std::ostream &os, const SchedulerState &ss)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
State
Execution states of the algorithms.
STL class.
STL class.
std::vector< std::vector< std::string > > m_algosDependencies
DEPRECATED!
StatusCode promoteToDataReady(unsigned int iAlgo, int si)