ParallelSequentialSchedulerSvc.cpp
Go to the documentation of this file.
1 // Framework includes
4 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
9 
12 
13 // C++
14 #include <list>
15 #include <thread>
16 
17 // Local
19 #include "AlgResourcePool.h"
20 #include "RetCodeGuard.h"
21 
22 // Instantiation of a static factory class used by clients to create instances of this service
24 
25 //===========================================================================
26 // Infrastructure methods
27 
29  base_class(name,svcLoc) {
30 
31  declareProperty("UseTopAlgList", m_useTopAlgList = false);
32  declareProperty("ThreadPoolSize", m_threadPoolSize = -1);
33  declareProperty("WhiteboardSvc", m_whiteboardSvcName = "EventDataSvc");
34 
35 }
36 
37 //---------------------------------------------------------------------------
39 //---------------------------------------------------------------------------
40 
42 
43  // Initialise mother class (read properties, ...)
45  if (!sc.isSuccess())
46  warning () << "Base class could not be initialized" << endmsg;
47 
48  // Get the algo resource pool
49  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
50  if (!m_algResourcePool.isValid()){
51  error() << "Error retrieving AlgResourcePool" << endmsg;
52  return StatusCode::FAILURE;
53  }
54 
55  // Get the list of algorithms
57  info() << "Found " << m_algList.size() << " algorithms" << endmsg;
58 
59  // Get Whiteboard
61  if (!m_whiteboard.isValid())
62  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
63 
64  // Check the MaxEventsInFlight parameters and react
65  // Deprecated for the moment
66  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
67 
68  // Set the number of free slots
69  m_freeSlots=numberOfWBSlots;
70 
71  info() << "Allowing " << m_freeSlots << " events in flight" << endmsg;
72 
73  if(m_threadPoolSize == -1)
74  m_threadPoolSize = numberOfWBSlots;
75 
76  debug() << "Initialising a TBB thread pool of size " << m_threadPoolSize << endmsg;
77  m_tbb_sched.reset(new tbb::task_scheduler_init(m_threadPoolSize));
78 
79  // Fill the containers to convert algo names to index
82  unsigned int index=0;
83  for (IAlgorithm* algo : m_algList){
84  const std::string& name = algo->name();
87  index++;
88  }
89 
90  //initialize control flow manager
91  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
92 
94 
95  const unsigned int algosDependenciesSize=0;
96  info() << "Algodependecies size is " << algosDependenciesSize << endmsg;
97 
98  //get algorithm dependencies
99  /* Dependencies
100  0) Read deps from config file
101  1) Look for handles in algo, if none
102  2) Assume none are required
103  */
104  if (algosDependenciesSize == 0){
105  // Get the event root from the IDataManagerSvc interface of the WhiteBoard
107  std::string rootInTESName(dataMgrSvc->rootName());
108  if ("" != rootInTESName && '/'!=rootInTESName[rootInTESName.size()-1]){
109  rootInTESName = rootInTESName+"/";
110  }
111 
112  for (IAlgorithm* ialgoPtr : m_algList){
113  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr);
114  if (nullptr == algoPtr){
115  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
116  }
117 
118  std::vector<Gaudi::DataHandle*> algoHandles(algoPtr->inputHandles());
119  DataObjIDColl algoDependencies;
120  if (!algoHandles.empty()){
121  info() << "Algorithm " << algoPtr->name() << " data dependencies:" << endmsg;
122 
123  DataObjIDColl inputObjs, outputObjs;
124  DHHVisitor avis(inputObjs, outputObjs);
125 
126  algoPtr->acceptDHVisitor( &avis );
127 
128  for (auto id : inputObjs) {
129  const std::string& productName = rootInTESName + id.key();
130  info() << " o Input dep for " << productName << endmsg;
131  algoDependencies.insert(id);
132  }
133 
134 
135  } else {
136  info() << "Algorithm " << algoPtr->name() << " has no data dependencies."
137  << endmsg;
138  }
139 
140  }
141  }
142 
143  return StatusCode::SUCCESS;
144 
145 }
146 //---------------------------------------------------------------------------
147 
149  m_tbb_sched.reset();
150 
152  if (!sc.isSuccess())
153  warning () << "Base class could not be finalized" << endmsg;
154  return sc;
155 }
156 
161  std::vector<EventContext*> eventContexts;
162  eventContexts.push_back(eventContext);
163  eventContext->setFail(false);
164  return pushNewEvents(eventContexts);
165 }
166 
168 
169  for(auto evt : eventContexts){
170  if(m_freeSlots.load() > 0){
171  //only one thread executes scheduler --> m_freeSlots can only grow if other thread finishes
172  m_freeSlots--;
173 
174  debug() << "Enqueuing event " << evt->evt() << " @ " << evt->slot() << endmsg;
175 
176  tbb::task* t = new( tbb::task::allocate_root() )
178  tbb::task::enqueue( *t);
179  } else {
180  return StatusCode::FAILURE;
181  }
182  }
183 
184  return StatusCode::SUCCESS;
185 
186 }
187 
188 //---------------------------------------------------------------------------
193 
194  m_finishedEvents.pop(eventContext);
195  debug() << "Popped slot " << eventContext->slot() << "(event "
196  << eventContext->evt() << ")" << endmsg;
197  m_freeSlots++;
198  return StatusCode::SUCCESS;
199 }
200 
201 //---------------------------------------------------------------------------
206  if (m_finishedEvents.try_pop(eventContext)){
207  debug() << "Try Pop successful slot " << eventContext->slot()
208  << "(event " << eventContext->evt() << ")" << endmsg;
209  m_freeSlots++;
210  return StatusCode::SUCCESS;
211  }
212  return StatusCode::FAILURE;
213 }
214 
215 //---------------------------------------------------------------------------
216 
221 
222 //---------------------------------------------------------------------------
223 
225 
226  // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard
227  const SmartIF<IProperty> appmgr(m_serviceLocator);
229  MsgStream log(messageSvc, "SequentialAlgoExecutionTask");
230  log.activate();
231 
232  StatusCode sc;
233 
234  //initialize control algorithm states and decisions
235  AlgsExecutionStates algStates(m_scheduler->m_algList.size(), messageSvc);
236  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_scheduler->m_algResourcePool.get());
237  std::vector<int> nodeDecisions(algPool->getExecutionFlowGraph()->getControlFlowNodeCounter(), -1);
238 
239  m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions);
240  m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
241 
242  //initialize data flow manager
243  //DataFlowManager dataFlow(m_scheduler->m_algosDependencies);
244 
245  //initialize context
246  // m_eventContext->m_thread_id = pthread_self();
247  bool eventFailed = false;
249 
250  // loop while algorithms are controlFlowReady and event has not failed
251  while(!eventFailed && algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) ){
252 
253  //std::cout << "[" << m_eventContext->evt() << "] algorithms left" << std::endl;
254 
255  //std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(),
256 
257  //[&] (IAlgorithm* ialgorithm) {
258  for(auto it = algStates.begin(AlgsExecutionStates::State::CONTROLREADY); it != algStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it){
259 
260  uint algIndex = *it;
261 
262  std::string algName = m_scheduler->m_algname_vect[algIndex];
263 
264  //promote algorithm to data ready
265  algStates.updateState(algIndex,AlgsExecutionStates::DATAREADY);
266 
267  //std::cout << "Running algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt() << std::endl;
268  log << MSG::DEBUG << "Running algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt() << endmsg;
269 
270  IAlgorithm* ialgoPtr=nullptr;
271  sc = m_algPool->acquireAlgorithm(algName,ialgoPtr, true); //blocking call
272 
273  if(sc.isFailure() || ialgoPtr == nullptr){
274  log << MSG::ERROR << "Could not acquire algorithm " << algName << endmsg;
275  m_eventContext->setFail(true);
276  } else { // we got an algorithm
277 
278  //promote algorithm to scheduled
279  algStates.updateState(algIndex,AlgsExecutionStates::SCHEDULED);
280 
281  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
282  algoPtr->setContext(m_eventContext);
283 
284  // Call the execute() method
285  try {
287  sc = algoPtr->sysExecute();
288  if (UNLIKELY(!sc.isSuccess())) {
289  log << MSG::WARNING << "Execution of algorithm " << algName << " failed" << endmsg;
290  eventFailed = true;
291  }
292  rcg.ignore(); // disarm the guard
293  } catch ( const GaudiException& Exception ) {
294  log << MSG::ERROR << ".executeEvent(): Exception with tag=" << Exception.tag()
295  << " thrown by " << algName << endmsg;
296  log << MSG::ERROR << Exception << endmsg;
297  } catch ( const std::exception& Exception ) {
298  log << MSG::FATAL << ".executeEvent(): Standard std::exception thrown by "
299  << algName << endmsg;
300  log << MSG::ERROR << Exception.what() << endmsg;
301  } catch(...) {
302  log << MSG::FATAL << ".executeEvent(): UNKNOWN Exception thrown by "
303  << algName << endmsg;
304  }
305 
306  if(sc.isFailure()){
307  eventFailed = true;
308  }
309 
310  //std::cout << "Algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt()
311  // << (eventFailed ? " failed" : " succeeded") << std::endl;
312  log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt()
313  << (eventFailed ? " failed" : " succeded") << endmsg;
314 
316  if (ialgoPtr->filterPassed()){
317  state = AlgsExecutionStates::State::EVTACCEPTED;
318  } else {
319  state = AlgsExecutionStates::State::EVTREJECTED;
320  }
321 
322  //std::cout << "Algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt()
323  // << (ialgoPtr->filterPassed() ? " passed" : " rejected") << std::endl;
324  log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt()
325  << (ialgoPtr->filterPassed() ? " passed" : " rejected") << endmsg;
326 
327  sc = m_algPool->releaseAlgorithm(algName,ialgoPtr);
328 
329  algStates.updateState(algIndex,state);
330 
331  //just for debug: look at products -- not thread safe
332  // Update the catalog: some new products may be there
333  /*m_scheduler->m_whiteboard->selectStore(m_eventContext->slot()).ignore();
334 
335  // update prods in the dataflow
336  // DP: Handles could be used. Just update what the algo wrote
337  std::vector<std::string> new_products;
338  m_scheduler->m_whiteboard->getNewDataObjects(new_products).ignore();
339  for (const auto& new_product : new_products)
340  std::cout << "Found in WB: " << new_product << std::endl;
341  //dataFlow.updateDataObjectsCatalog(new_products);*/
342  }
343 
344  }
345  //);
346 
347  m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions);
348  m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
349 
350  if(eventFailed){
351  m_eventContext->setFail(eventFailed);
352  //std::cout << "ERROR: " << "event " << m_eventContext->evt() << " failed" << std::endl;
353  break;
354  }
355 
356  if(!algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) && !algStates.allAlgsExecuted()){
357  //std::cout << "WARNING: " << " not all algorithms executed for event " << m_eventContext->evt() << std::endl;
358 
359  /*std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(),
360 
361  [&] (IAlgorithm* ialgorithm) {
362  uint algIndex = m_scheduler->m_algname_index_map[ialgorithm->name()];
363 
364  if(AlgsExecutionStates::State::SCHEDULED >= algStates.algorithmState(algIndex))
365  std::cout << "Event [" << m_eventContext->evt() << "] algorithm " << ialgorithm->name()
366  << " NOT executed" << std::endl;
367 
368  });*/
369  }
370  }
371 
373 
374  return nullptr;
375 
376 }
#define UNLIKELY(x)
Definition: Kernel.h:126
std::list< IAlgorithm * > m_algList
Cache the list of algs to be executed.
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:68
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:324
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
virtual tbb::task * execute()
Define general base for Gaudi exception.
Helper class to set the application return code in case of early exit (e.g.
Definition: RetCodeGuard.h:9
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
StatusCode finalize() override
Definition: Service.cpp:193
GAUDI_API void setCurrentContextId(ContextIdType newId)
Used by the framework to change the value of the current context id.
ContextID_t slot() const
Definition: EventContext.h:41
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
std::unique_ptr< tbb::task_scheduler_init > m_tbb_sched
SmartIF< ParallelSequentialSchedulerSvc > m_scheduler
STL namespace.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
virtual size_t getNumberOfStores()=0
Get the number of 'slots'.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:574
ContextEvt_t evt() const
Definition: EventContext.h:40
STL class.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:820
constexpr int UnhandledException
Definition: AppReturnCode.h:27
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:78
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:319
bool m_useTopAlgList
Decide if the top alglist or its flat version has to be used.
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
virtual std::list< IAlgorithm * > getTopAlgList()=0
Get top list of algorithms.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
SmartIF< IAlgResourcePool > m_algPool
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
T what(T...args)
void promoteToControlReadyState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions, const int &slotNum=-1) const
XXX: CF tests.
virtual StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode sysExecute() override
The actions to be performed by the algorithm on an event.
Definition: Algorithm.cpp:567
void setFail(const bool &b=true)
Definition: EventContext.h:65
std::string m_whiteboardSvcName
The whiteboard name.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
virtual const std::string & tag() const
name tag for the exception, or exception type
T reset(T...args)
virtual unsigned int freeSlots()
Get free slots number.
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:361
STL class.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:25
virtual const std::string & rootName() const =0
Get Name of root Event.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
T insert(T...args)
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:74
T size(T...args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
virtual void acceptDHVisitor(IDataHandleVisitor *) const override
Definition: Algorithm.cpp:226
SmartIF< ISvcLocator > m_serviceLocator
void ignore()
Definition: RetCodeGuard.h:13
void activate()
Activate MsgStream.
Definition: MsgStream.h:120
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
concurrency::ExecutionFlowManager m_controlFlow
This SchedulerSvc implements the IScheduler interface.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
virtual StatusCode initialize()
Initialise.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
virtual StatusCode tryPopFinishedEvent(EventContext *&eventContext)
Try to fetch an event from the scheduler.
T reserve(T...args)
virtual std::vector< Gaudi::DataHandle * > inputHandles() const override
Definition: Algorithm.h:587
virtual StatusCode popFinishedEvent(EventContext *&eventContext)
Blocks until an event is available.
T emplace_back(T...args)