The Gaudi Framework  v29r0 (ff2e7097)
ForwardSchedulerSvc.h
Go to the documentation of this file.
1 #ifndef GAUDIHIVE_FORWARDSCHEDULERSVC_H
2 #define GAUDIHIVE_FORWARDSCHEDULERSVC_H
3 
4 // Local includes
5 #include "AlgsExecutionStates.h"
6 #include "DataFlowManager.h"
7 #include "EventSlot.h"
8 #include "ExecutionFlowManager.h"
9 
10 // Framework include files
14 #include "GaudiKernel/IRunable.h"
15 #include "GaudiKernel/IScheduler.h"
17 #include "GaudiKernel/Service.h"
18 
19 // C++ include files
20 #include <functional>
21 #include <string>
22 #include <thread>
23 #include <unordered_map>
24 #include <vector>
25 
26 // External libs
27 #include "tbb/concurrent_queue.h"
28 #include "tbb/task.h"
29 
32 
33 //---------------------------------------------------------------------------
34 
/// The SchedulerSvc implements the IScheduler interface.
///
/// NOTE(review): this listing is a rendered view of the header; several
/// declaration lines (and all original doc comments) are not visible here, so
/// a few fragments below appear without their opening line. The comments added
/// in this block only restate what the visible declarations and the member
/// briefs of this page establish.
83 class ForwardSchedulerSvc : public extends<Service, IScheduler>
84 {
85 public:
    /// Inherit the constructors of the extends<> base class.
87  using extends::extends;
88 
    /// Destructor.
90  ~ForwardSchedulerSvc() override = default;
91 
    /// Initialise.
93  StatusCode initialize() override;
94 
    /// Finalise.
96  StatusCode finalize() override;
97 
    /// Make an event available to the scheduler.
99  StatusCode pushNewEvent( EventContext* eventContext ) override;
100 
101  // Make multiple events available to the scheduler
102  StatusCode pushNewEvents( std::vector<EventContext*>& eventContexts ) override;
103 
    /// Blocks until an event is available.
105  StatusCode popFinishedEvent( EventContext*& eventContext ) override;
106 
    /// Try to fetch an event from the scheduler.
108  StatusCode tryPopFinishedEvent( EventContext*& eventContext ) override;
109 
    /// Get free slots number.
111  unsigned int freeSlots() override;
112 
113 private:
    /// Activation states of the scheduler.
114  enum ActivationState { INACTIVE = 0, ACTIVE = 1, FAILURE = 2 };
115 
    // Configurable properties. Each Gaudi::Property is declared with its owner
    // (this), its property name, its default value and a description string.
116  Gaudi::Property<int> m_maxEventsInFlight{this, "MaxEventsInFlight", 0,
117  "Maximum number of event processed simultaneously"};
    // NOTE(review): the opening line of the next declaration is not visible in
    // this listing; presumably it is the Gaudi::Property<int> m_threadPoolSize
    // member listed for this class — confirm against the real header.
119  this, "ThreadPoolSize", -1,
120  "Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose"};
121  Gaudi::Property<std::string> m_whiteboardSvcName{this, "WhiteboardSvc", "EventDataSvc", "The whiteboard name"};
    // NOTE(review): opening lines of the two deprecated properties below are
    // not visible in this listing.
123  "[[deprecated]] Taken from the whiteboard"};
125  this, "AlgosDependencies", {}, "[[deprecated]]"};
126  Gaudi::Property<bool> m_checkDeps{this, "CheckDependencies", false, "Runtime check of Algorithm Data Dependencies"};
    // NOTE(review): opening line of the following property is not visible in
    // this listing.
128  "Attribute unmet input dependencies to this DataLoader Algorithm"};
129 
130  Gaudi::Property<bool> m_showDataDeps{this, "ShowDataDependencies", true,
131  "Show the INPUT and OUTPUT data dependencies of Algorithms"};
132  Gaudi::Property<bool> m_showDataFlow{this, "ShowDataFlow", false,
133  "Show the configuration of DataFlow between Algorithms"};
134  Gaudi::Property<bool> m_showControlFlow{this, "ShowControlFlow", false,
135  "Show the configuration of all Algorithms and Sequences"};
136 
137  // Utils and shortcuts ----------------------------------------------------
138 
    /// Activate scheduler.
140  void activate();
141 
144 
147 
150 
    /// Convert a name to an integer.
152  inline unsigned int algname2index( const std::string& algoname );
153 
156 
    /// Convert an integer to a name.
158  inline const std::string& index2algname( unsigned int index );
159 
162 
165 
168 
    /// Atomic to account for asynchronous updates by the scheduler wrt the rest.
170  std::atomic_int m_freeSlots;
171 
    /// Queue of finished events.
173  tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents;
174 
    /// Method to check if an event failed and take appropriate actions.
176  StatusCode eventFailed( EventContext* eventContext );
177 
180 
181  // States management ------------------------------------------------------
182 
    /// Number of algorithms presently in flight.
184  unsigned int m_algosInFlight = 0;
185 
    /// Loop over the algorithms in the slots and promote them to successive
    /// states (si = -1 means all slots).
188  StatusCode updateStates( int si = -1 );
189 
    /// Algorithm promotion: Accepted by the control flow.
191  StatusCode promoteToControlReady( unsigned int iAlgo, int si );
    // Further promotion steps for algorithm iAlgo in slot si.
192  StatusCode promoteToDataReady( unsigned int iAlgo, int si );
193  StatusCode promoteToScheduled( unsigned int iAlgo, int si );
194  StatusCode promoteToExecuted( unsigned int iAlgo, int si, IAlgorithm* algo, EventContext* );
195  StatusCode promoteToFinished( unsigned int iAlgo, int si );
196 
    /// Check if the scheduling is in a stall.
198  StatusCode isStalled( int si );
199 
    /// Dump the state of the scheduler.
201  void dumpSchedulerState( int iSlot );
202 
    /// Keep track of update actions scheduled.
204  bool m_updateNeeded = true;
205 
206  // Algos Management -------------------------------------------------------
209 
212 
213  // Actions management -----------------------------------------------------
214 
    /// Queue where closures are stored and picked for execution.
216  tbb::concurrent_bounded_queue<action> m_actionsQueue;
217 
218  // helper task to enqueue the scheduler's actions (closures)
219  struct enqueueSchedulerActionTask : public tbb::task {
220 
223 
    // NOTE(review): the constructor signature and the member declarations are
    // not visible in this listing; the initialiser list below stores the
    // closure and the scheduler pointer.
225  : m_closure( _closure ), m_scheduler( scheduler )
226  {
227  }
228 
    // Pushes the stored closure onto the scheduler's action queue; returns
    // nullptr, i.e. no follow-up task is handed back.
229  tbb::task* execute() override
230  {
231  m_scheduler->m_actionsQueue.push( m_closure );
232  return nullptr;
233  }
234  };
235 
236  // ------------------------------------------------------------------------
237 
240 
241  // Service for thread pool initialization
243 
244  bool m_first = true;
245 
    // NOTE(review): the header of the nested class opened here is not visible
    // in this listing; from the constructor below it is SchedulerState, a
    // record tying an algorithm to an event context and a thread.
247  {
248 
249  public:
    // Stores the algorithm pointer, a copy of the event context (*e is
    // dereferenced, so e must not be null) and the thread handle.
250  SchedulerState( Algorithm* a, EventContext* e, pthread_t t ) : m_a( a ), m_e( *e ), m_t( t ) {}
251 
    // Accessors; ctx() returns the stored context by value.
252  Algorithm* alg() const { return m_a; }
253  EventContext ctx() const { return m_e; }
254  pthread_t thread() const { return m_t; }
255 
    // NOTE(review): the signature of the stream operator is not visible in this
    // listing. The body prints the event context, the algorithm name, the
    // algorithm pointer in hex and the thread in hex, then returns the stream.
257  {
258  os << ss.ctx() << " a: " << ss.alg()->name() << " [" << std::hex << ss.alg() << std::dec << "] t: 0x"
259  << std::hex << ss.thread() << std::dec;
260  return os;
261  }
262 
    // Equality and ordering consider the algorithm pointer only; the event
    // context and the thread are ignored.
263  bool operator==( const SchedulerState& ss ) const { return ( m_a == ss.alg() ); }
264 
265  bool operator==( Algorithm* a ) const { return ( m_a == a ); }
266 
267  bool operator<( const SchedulerState& rhs ) const { return ( m_a < rhs.alg() ); }
268 
269  private:
    // NOTE(review): the declarations of m_a and m_e are not visible in this
    // listing.
272  pthread_t m_t;
273  };
274 
277 
278 public:
    // NOTE(review): presumably addAlg/delAlg register and remove a
    // SchedulerState entry for an algorithm — confirm against the
    // implementation, which is not part of this header.
279  void addAlg( Algorithm*, EventContext*, pthread_t );
280  bool delAlg( Algorithm* );
281  void dumpState() override;
282 
283 private:
    // Overload of dumpState taking an output string stream.
284  void dumpState( std::ostringstream& );
286 };
287 
288 #endif // GAUDIHIVE_FORWARDSCHEDULERSVC_H
StatusCode deactivate()
Deactivate scheduler.
SchedulerState(Algorithm *a, EventContext *e, pthread_t t)
Gaudi::Property< bool > m_showControlFlow
StatusCode initialize() override
Initialise.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:731
Implementation of property with value of concrete type.
Definition: Property.h:319
concurrency::recursive_CF::ExecutionFlowManager m_efManager
Member to take care of the control flow.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
StatusCode updateStates(int si=-1)
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
StatusCode finalize() override
Finalise.
SmartIF< IThreadPoolSvc > m_threadPoolSvc
AlgsExecutionStates::State State
Gaudi::Property< bool > m_checkDeps
The SchedulerSvc implements the IScheduler interface.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< std::vector< std::vector< std::string > > > m_algosDependencies
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode m_drain()
Drain the actions present in the queue.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
STL class.
enqueueSchedulerActionTask(ForwardSchedulerSvc *scheduler, std::function< StatusCode()> _closure)
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
void addAlg(Algorithm *, EventContext *, pthread_t)
Gaudi::Property< bool > m_showDataFlow
unsigned int m_algosInFlight
Number of algorithms presently in flight.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
bool operator<(const SchedulerState &rhs) const
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
static std::list< SchedulerState > m_sState
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
StatusCode promoteToFinished(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
bool operator==(Algorithm *a) const
Gaudi::Property< std::string > m_whiteboardSvcName
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
bool operator==(const SchedulerState &ss) const
~ForwardSchedulerSvc() override=default
Destructor.
unsigned int freeSlots() override
Get free slots number.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
Gaudi::Property< int > m_maxEventsInFlight
STL class.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
concurrency::recursive_CF::ControlFlowGraph * m_efg
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
STL class.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
T hex(T...args)
std::function< StatusCode()> action
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
bool delAlg(Algorithm *)
friend std::ostream & operator<<(std::ostream &os, const SchedulerState &ss)
Gaudi::Property< bool > m_showDataDeps
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
Manage control flow part of the execution flow.
State
Execution states of the algorithms.
Gaudi::Property< int > m_threadPoolSize
STL class.
STL class.
StatusCode promoteToDataReady(unsigned int iAlgo, int si)