ForwardSchedulerSvc.h
Go to the documentation of this file.
1 #ifndef GAUDIHIVE_FORWARDSCHEDULERSVC_H
2 #define GAUDIHIVE_FORWARDSCHEDULERSVC_H
3 
4 // Framework include files
5 #include "GaudiKernel/IScheduler.h"
6 #include "GaudiKernel/IRunable.h"
7 #include "GaudiKernel/Service.h"
8 #include "GaudiKernel/IAlgResourcePool.h"
9 #include "GaudiKernel/IHiveWhiteBoard.h"
10 
11 // Local includes
12 #include "AlgsExecutionStates.h"
13 #include "EventSlot.h"
14 #include "ExecutionFlowManager.h"
15 #include "DataFlowManager.h"
16 
17 // C++ include files
18 #include <vector>
19 #include <string>
20 #include <unordered_map>
21 #include <functional>
22 #include <thread>
23 
24 // External libs
25 #include "tbb/concurrent_queue.h"
26 
28 
29 //---------------------------------------------------------------------------
30 
/// Scheduler service implementing the IScheduler interface
/// (declared as extends1<Service, IScheduler>).
///
/// NOTE(review): the member index that follows this listing describes further
/// members (~ForwardSchedulerSvc(), deactivate(), m_isActive, m_algResourcePool,
/// m_whiteboard, m_drain(), m_threadPoolSize, m_updateNeeded,
/// m_maxEventsInFlight, m_efManager) whose declarations are not visible in this
/// listing -- confirm against the real header before relying on this text.
72 class ForwardSchedulerSvc: public extends1<Service, IScheduler> {
73 public:
    /// Constructor.
75  ForwardSchedulerSvc( const std::string& name, ISvcLocator* svc );
76 
79 
    /// Initialise.
81  virtual StatusCode initialize();
82 
    /// Finalise.
84  virtual StatusCode finalize();
85 
    /// Make an event available to the scheduler.
87  virtual StatusCode pushNewEvent(EventContext* eventContext);
88 
89  // Make multiple events available to the scheduler
90  virtual StatusCode pushNewEvents(std::vector<EventContext*>& eventContexts);
91 
    /// Blocks until an event is available; the event is handed back through
    /// the eventContext reference-to-pointer parameter.
93  virtual StatusCode popFinishedEvent(EventContext*& eventContext);
94 
    /// Try to fetch an event from the scheduler.
96  virtual StatusCode tryPopFinishedEvent(EventContext*& eventContext);
97 
    /// Get free slots number.
99  virtual unsigned int freeSlots();
100 
101 
102 private:
103 
104  // Utils and shortcuts ----------------------------------------------------
105 
    /// Activate scheduler.
107  void activate();
108 
111 
114 
    /// The thread in which the activate function runs.
116  std::thread m_thread;
117 
    /// Convert a name to an integer.
119  inline unsigned int algname2index(const std::string& algoname);
120 
    /// Map to bookkeep the information necessary to the name2index conversion.
122  std::unordered_map<std::string,unsigned int> m_algname_index_map;
123 
    /// Convert an integer to a name.
125  inline const std::string& index2algname (unsigned int index);
126 
    /// Vector to bookkeep the information necessary to the index2name conversion.
128  std::vector<std::string> m_algname_vect;
129 
132 
    /// The whiteboard name.
134  std::string m_whiteboardSvcName;
135 
    /// Vector of event slots.
137  std::vector<EventSlot> m_eventSlots;
138 
141 
    /// Atomic to account for asynchronous updates by the scheduler wrt the rest.
    /// NOTE(review): std::atomic_int needs <atomic>, which is not among the
    /// includes visible in this file -- presumably pulled in transitively; confirm.
143  std::atomic_int m_freeSlots;
144 
    /// Queue of finished events.
146  tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
147 
    /// Method to check if an event failed and take appropriate actions.
149  StatusCode eventFailed(EventContext* eventContext);
150 
151 
152  // States management ------------------------------------------------------
153 
    /// Maximum number of simultaneous algorithms.
155  unsigned int m_maxAlgosInFlight;
156 
    /// Number of algorithms presently in flight.
158  unsigned int m_algosInFlight;
159 
    /// Loop on algorithms in the slots and promote them to successive states;
    /// si = -1 (the default) means all slots.
162  StatusCode updateStates(int si=-1, const std::string& algo_name=std::string());
163 
    /// Algorithm promotion: accepted by the control flow.
165  StatusCode promoteToControlReady(unsigned int iAlgo, int si);
166  StatusCode promoteToDataReady(unsigned int iAlgo, int si);
167  StatusCode promoteToScheduled(unsigned int iAlgo, int si);
    /// The call to this method is triggered only from within the AlgoExecutionTask.
168  StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm* algo);
169  StatusCode promoteToFinished(unsigned int iAlgo, int si);
170 
    /// Check if the scheduling is in a stall.
172  StatusCode isStalled(int si);
173 
    /// Dump the state of the scheduler.
175  void dumpSchedulerState(int iSlot);
176 
179 
180  // Algos Management -------------------------------------------------------
183 
    /// Ugly, will disappear when the deps are declared only within the C++ code of the algos.
185  std::vector<std::vector<std::string>> m_algosDependencies;
186 
189 
192 
193  // Actions management -----------------------------------------------------
194 
    /// Closure type stored in the actions queue: a callable returning a StatusCode.
195  typedef std::function<StatusCode ()> action;
196 
    /// Queue where closures are stored and picked for execution.
198  tbb::concurrent_bounded_queue<action> m_actionsQueue;
199 
202  // XXX: CF tests. Temporary property to switch between ControlFlow implementations
203  bool m_CFNext;
204  // XXX: CF tests. Temporary property to switch between DataFlow implementations
205  bool m_DFNext;
206  // Flag to perform single-pass simulation of execution flow before the actual execution
    // NOTE(review): the declaration this comment describes is not visible in this listing.
208  // Optimization mode in which algorithms, ready for execution, are prioritized in special way
209  std::string m_optimizationMode;
210  // Dump intra-event concurrency dynamics to csv file
    // NOTE(review): the declaration this comment describes is not visible in this listing.
212 
213  // Needed to queue actions on algorithm finishing and decrement algos in flight
214  friend class AlgoExecutionTask;
215 
216 };
217 
218 #endif // GAUDIHIVE_FORWARDSCHEDULERSVC_H
StatusCode deactivate()
Deactivate scheduler.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
ForwardSchedulerSvc(const std::string &name, ISvcLocator *svc)
Constructor.
virtual StatusCode initialize()
Initialise.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
bool m_isActive
Flag to track if the scheduler is active or not.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
virtual StatusCode popFinishedEvent(EventContext *&eventContext)
Blocks until an event is available.
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
AlgsExecutionStates::State State
The SchedulerSvc implements the IScheduler interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:22
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo)
The call to this method is triggered only from within the AlgoExecutionTask.
virtual StatusCode tryPopFinishedEvent(EventContext *&eventContext)
Try to fetch an event from the scheduler.
StatusCode m_drain()
Drain the actions present in the queue.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
~ForwardSchedulerSvc()
Destructor.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
unsigned int m_algosInFlight
Number of algorithms presently in flight.
Manage the execution flow using an execution flow graph. Once initialized, the graph is const and can ...
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
std::vector< EventSlot > m_eventSlots
Vector of event slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
StatusCode promoteToFinished(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::string m_whiteboardSvcName
The whiteboard name.
virtual StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts)
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithms in the slots and promote them to successive states (-1 means all slots...
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:23
virtual unsigned int freeSlots()
Get free slots number.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
std::function< StatusCode()> action
void activate()
Activate scheduler.
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
int m_maxEventsInFlight
Maximum number of events processed simultaneously.
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
virtual StatusCode finalize()
Finalise.
State
Execution states of the algorithms.
std::vector< std::vector< std::string > > m_algosDependencies
Ugly, will disappear when the deps are declared only within the C++ code of the algos.
StatusCode promoteToDataReady(unsigned int iAlgo, int si)