All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
ParallelSequentialSchedulerSvc.cpp
Go to the documentation of this file.
1 // Framework includes
2 #include "GaudiKernel/SvcFactory.h"
3 #include "GaudiKernel/IAlgorithm.h"
4 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
5 #include "GaudiKernel/IProperty.h"
6 #include "GaudiKernel/AppReturnCode.h"
7 #include "GaudiKernel/CommonMessaging.h"
8 #include "GaudiKernel/IDataManagerSvc.h"
9 
10 #include "GaudiKernel/ContextSpecificPtr.h"
11 
12 // C++
13 #include <list>
14 #include <thread>
15 
16 // Local
18 #include "AlgResourcePool.h"
19 #include "RetCodeGuard.h"
20 
21 // Instantiation of a static factory class used by clients to create instances of this service
23 
24 //===========================================================================
25 // Infrastructure methods
26 
28  base_class(name,svcLoc) {
29 
30  // Will disappear when dependencies are properly propagated into the C++ code of the algos
31  declareProperty("AlgosDependencies", m_algosDependencies);
32  declareProperty("UseTopAlgList", m_useTopAlgList = false);
33  declareProperty("ThreadPoolSize", m_threadPoolSize = -1);
34  declareProperty("WhiteboardSvc", m_whiteboardSvcName = "EventDataSvc");
35 
36 }
37 
38 //---------------------------------------------------------------------------
40 //---------------------------------------------------------------------------
41 
43 
44  // Initialise mother class (read properties, ...)
46  if (!sc.isSuccess())
47  warning () << "Base class could not be initialized" << endmsg;
48 
49  // Get the algo resource pool
50  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
51  if (!m_algResourcePool.isValid()){
52  error() << "Error retrieving AlgResourcePool" << endmsg;
53  return StatusCode::FAILURE;
54  }
55 
56  // Get the list of algorithms
58  info() << "Found " << m_algList.size() << " algorithms" << endmsg;
59 
60  // Get Whiteboard
61  m_whiteboard = serviceLocator()->service(m_whiteboardSvcName);
62  if (!m_whiteboard.isValid())
63  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
64 
65  // Check the MaxEventsInFlight parameters and react
66  // Deprecated for the moment
67  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
68 
69  // Set the number of free slots
70  m_freeSlots=numberOfWBSlots;
71 
72  info() << "Allowing " << m_freeSlots << " events in flight" << endmsg;
73 
74  if(m_threadPoolSize == -1)
75  m_threadPoolSize = numberOfWBSlots;
76 
77  debug() << "Initialising a TBB thread pool of size " << m_threadPoolSize << endmsg;
78  m_tbb_sched.reset(new tbb::task_scheduler_init(m_threadPoolSize));
79 
80  // Fill the containers to convert algo names to index
81  m_algname_index_map.reserve(m_algList.size());
82  m_algname_vect.reserve(m_algList.size());
83  unsigned int index=0;
84  for (IAlgorithm* algo : m_algList){
85  const std::string& name = algo->name();
87  m_algname_vect.emplace_back(name);
88  index++;
89  }
90 
91  //initialize control flow manager
92  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
93 
95 
96  const unsigned int algosDependenciesSize=m_algosDependencies.size();
97  info() << "Algodependecies size is " << algosDependenciesSize << endmsg;
98 
99  //get algorithm dependencies
100  /* Dependencies
101  0) Read deps from config file
102  1) Look for handles in algo, if none
103  2) Assume none are required
104  */
105  if (algosDependenciesSize == 0){
106  // Get the event root from the IDataManagerSvc interface of the WhiteBoard
108  std::string rootInTESName(dataMgrSvc->rootName());
109  if ("" != rootInTESName && '/'!=rootInTESName[rootInTESName.size()-1]){
110  rootInTESName = rootInTESName+"/";
111  }
112 
113  for (IAlgorithm* ialgoPtr : m_algList){
114  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr);
115  if (nullptr == algoPtr){
116  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
117  }
118 
119 #pragma GCC diagnostic push
120 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
121  const std::vector<MinimalDataObjectHandle*>& algoHandles(algoPtr->handles());
122 #pragma GCC diagnostic pop
123  std::vector<std::string> algoDependencies;
124  if (!algoHandles.empty()){
125 
126  info() << "Algorithm " << algoPtr->name() << " data dependencies:" << endmsg;
127  for (MinimalDataObjectHandle* handlePtr : algoHandles ){
128  if (handlePtr->accessType() == MinimalDataObjectHandle::AccessType::READ){
129  const std::string& productName = rootInTESName + handlePtr->dataProductName();
130  info() << " o READ Handle found for product " << productName << endmsg;
131  algoDependencies.emplace_back(productName);
132  }
133  }
134  } else {
135  info() << "Algorithm " << algoPtr->name() << " has no data dependencies." << endmsg;
136  }
137 
138  m_algosDependencies.emplace_back(algoDependencies);
139  }
140  }
141 
142  return StatusCode::SUCCESS;
143 
144 }
145 //---------------------------------------------------------------------------
146 
148  m_tbb_sched.reset();
149 
151  if (!sc.isSuccess())
152  warning () << "Base class could not be finalized" << endmsg;
153  return sc;
154 }
155 
160  std::vector<EventContext*> eventContexts;
161  eventContexts.push_back(eventContext);
162  eventContext->setFail(false);
163  return pushNewEvents(eventContexts);
164 }
165 
166 StatusCode ParallelSequentialSchedulerSvc::pushNewEvents(std::vector<EventContext*>& eventContexts){
167 
168  for(auto evt : eventContexts){
169  if(m_freeSlots.load() > 0){
170  //only one thread executes scheduler --> m_freeSlots can only grow if other thread finishes
171  m_freeSlots--;
172 
173  debug() << "Enqueuing event " << evt->evt() << " @ " << evt->slot() << endmsg;
174 
175  tbb::task* t = new( tbb::task::allocate_root() )
176  SequentialTask(serviceLocator(), evt, this, m_algResourcePool);
177  tbb::task::enqueue( *t);
178  } else {
179  return StatusCode::FAILURE;
180  }
181  }
182 
183  return StatusCode::SUCCESS;
184 
185 }
186 
187 //---------------------------------------------------------------------------
192 
193  m_finishedEvents.pop(eventContext);
194  debug() << "Popped slot " << eventContext->slot() << "(event "
195  << eventContext->evt() << ")" << endmsg;
196  m_freeSlots++;
197  return StatusCode::SUCCESS;
198 }
199 
200 //---------------------------------------------------------------------------
205  if (m_finishedEvents.try_pop(eventContext)){
206  debug() << "Try Pop successful slot " << eventContext->slot()
207  << "(event " << eventContext->evt() << ")" << endmsg;
208  m_freeSlots++;
209  return StatusCode::SUCCESS;
210  }
211  return StatusCode::FAILURE;
212 }
213 
214 //---------------------------------------------------------------------------
215 
220 
221 //---------------------------------------------------------------------------
222 
224 
225  // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard
226  const SmartIF<IProperty> appmgr(m_serviceLocator);
228  MsgStream log(messageSvc, "SequentialAlgoExecutionTask");
229  log.activate();
230 
231  StatusCode sc;
232 
233  //initialize control algorithm states and decisions
234  AlgsExecutionStates algStates(m_scheduler->m_algList.size(), messageSvc);
235  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_scheduler->m_algResourcePool.get());
236  std::vector<int> nodeDecisions(algPool->getExecutionFlowGraph()->getControlFlowNodeCounter(), -1);
237 
238  m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions);
239  m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
240 
241  //initialize data flow manager
242  //DataFlowManager dataFlow(m_scheduler->m_algosDependencies);
243 
244  //intitialize context
245  // m_eventContext->m_thread_id = pthread_self();
246  bool eventFailed = false;
248 
249  // loop while algorithms are controlFlowReady and event has not failed
250  while(!eventFailed && algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) ){
251 
252  //std::cout << "[" << m_eventContext->evt() << "] algorithms left" << std::endl;
253 
254  //std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(),
255 
256  //[&] (IAlgorithm* ialgorithm) {
257  for(auto it = algStates.begin(AlgsExecutionStates::State::CONTROLREADY); it != algStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it){
258 
259  uint algIndex = *it;
260 
261  std::string algName = m_scheduler->m_algname_vect[algIndex];
262 
263  //promote algorithm to data ready
264  algStates.updateState(algIndex,AlgsExecutionStates::DATAREADY);
265 
266  //std::cout << "Running algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt() << std::endl;
267  log << MSG::DEBUG << "Running algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt() << endmsg;
268 
269  IAlgorithm* ialgoPtr=nullptr;
270  sc = m_algPool->acquireAlgorithm(algName,ialgoPtr, true); //blocking call
271 
272  if(sc.isFailure() || ialgoPtr == nullptr){
273  log << MSG::ERROR << "Could not acquire algorithm " << algName << endmsg;
274  m_eventContext->setFail(true);
275  } else { // we got an algorithm
276 
277  //promote algorithm to scheduled
278  algStates.updateState(algIndex,AlgsExecutionStates::SCHEDULED);
279 
280  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
281  algoPtr->setContext(m_eventContext);
282 
283  // Call the execute() method
284  try {
286  sc = algoPtr->sysExecute();
287  if (UNLIKELY(!sc.isSuccess())) {
288  log << MSG::WARNING << "Execution of algorithm " << algName << " failed" << endmsg;
289  eventFailed = true;
290  }
291  rcg.ignore(); // disarm the guard
292  } catch ( const GaudiException& Exception ) {
293  log << MSG::ERROR << ".executeEvent(): Exception with tag=" << Exception.tag()
294  << " thrown by " << algName << endmsg;
295  log << MSG::ERROR << Exception << endmsg;
296  } catch ( const std::exception& Exception ) {
297  log << MSG::FATAL << ".executeEvent(): Standard std::exception thrown by "
298  << algName << endmsg;
299  log << MSG::ERROR << Exception.what() << endmsg;
300  } catch(...) {
301  log << MSG::FATAL << ".executeEvent(): UNKNOWN Exception thrown by "
302  << algName << endmsg;
303  }
304 
305  if(sc.isFailure()){
306  eventFailed = true;
307  }
308 
309  //std::cout << "Algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt()
310  // << (eventFailed ? " failed" : " succeeded") << std::endl;
311  log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt()
312  << (eventFailed ? " failed" : " succeded") << endmsg;
313 
315  if (ialgoPtr->filterPassed()){
316  state = AlgsExecutionStates::State::EVTACCEPTED;
317  } else {
318  state = AlgsExecutionStates::State::EVTREJECTED;
319  }
320 
321  //std::cout << "Algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt()
322  // << (ialgoPtr->filterPassed() ? " passed" : " rejected") << std::endl;
323  log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt()
324  << (ialgoPtr->filterPassed() ? " passed" : " rejected") << endmsg;
325 
326  sc = m_algPool->releaseAlgorithm(algName,ialgoPtr);
327 
328  algStates.updateState(algIndex,state);
329 
330  //just for debug: look at products -- not thread safe
331  // Update the catalog: some new products may be there
332  /*m_scheduler->m_whiteboard->selectStore(m_eventContext->slot()).ignore();
333 
334  // update prods in the dataflow
335  // DP: Handles could be used. Just update what the algo wrote
336  std::vector<std::string> new_products;
337  m_scheduler->m_whiteboard->getNewDataObjects(new_products).ignore();
338  for (const auto& new_product : new_products)
339  std::cout << "Found in WB: " << new_product << std::endl;
340  //dataFlow.updateDataObjectsCatalog(new_products);*/
341  }
342 
343  }
344  //);
345 
346  m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions);
347  m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
348 
349  if(eventFailed){
350  m_eventContext->setFail(eventFailed);
351  //std::cout << "ERROR: " << "event " << m_eventContext->evt() << " failed" << std::endl;
352  break;
353  }
354 
355  if(!algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) && !algStates.allAlgsExecuted()){
356  //std::cout << "WARNING: " << " not all algorithms executed for event " << m_eventContext->evt() << std::endl;
357 
358  /*std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(),
359 
360  [&] (IAlgorithm* ialgorithm) {
361  uint algIndex = m_scheduler->m_algname_index_map[ialgorithm->name()];
362 
363  if(AlgsExecutionStates::State::SCHEDULED >= algStates.algorithmState(algIndex))
364  std::cout << "Event [" << m_eventContext->evt() << "] algorithm " << ialgorithm->name()
365  << " NOT executed" << std::endl;
366 
367  });*/
368  }
369  }
370 
372 
373  return nullptr;
374 
375 }
std::list< IAlgorithm * > m_algList
Cache the list of algs to be executed.
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:63
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
virtual tbb::task * execute()
Define general base for Gaudi exception.
Helper class to set the application return code in case of early exit (e.g.
Definition: RetCodeGuard.h:9
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode finalize() override
Definition: Service.cpp:188
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
std::unique_ptr< tbb::task_scheduler_init > m_tbb_sched
SmartIF< ParallelSequentialSchedulerSvc > m_scheduler
STL namespace.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
virtual size_t getNumberOfStores()=0
Get the number of 'slots'.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:22
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
GAUDI_API void setCurrentContextId(ContextIdType newId)
Used by the framework to change the value of the current context id.
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:556
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:919
constexpr int UnhandledException
Definition: AppReturnCode.h:27
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
bool m_useTopAlgList
Decide if the top alglist or its flat version has to be used.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
int m_threadPoolSize
Size of the thread pool initialised by TBB; in initialize(), a value of -1 is replaced by the number of whiteboard slots...
virtual std::list< IAlgorithm * > getTopAlgList()=0
Get top list of algorithms.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
SmartIF< IAlgResourcePool > m_algPool
void promoteToControlReadyState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions, const int &slotNum=-1) const
XXX: CF tests.
virtual StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode sysExecute() override
The actions to be performed by the algorithm on an event.
Definition: Algorithm.cpp:652
long int evt() const
Definition: EventContext.h:37
void setFail(const bool &b=true)
Definition: EventContext.h:54
std::string m_whiteboardSvcName
The whiteboard name.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
virtual const std::string & tag() const
name tag for the exception, or exception type
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:354
virtual unsigned int freeSlots()
Get free slots number.
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:23
virtual const std::string & rootName() const =0
Get Name of root Event.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
ID_type slot() const
Definition: EventContext.h:38
#define UNLIKELY(x)
Definition: Kernel.h:126
SmartIF< ISvcLocator > m_serviceLocator
void ignore()
Definition: RetCodeGuard.h:13
void activate()
Activate MsgStream.
Definition: MsgStream.h:120
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
concurrency::ExecutionFlowManager m_controlFlow
This SchedulerSvc implements the IScheduler interface.
virtual StatusCode initialize()
Initialise.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
virtual StatusCode tryPopFinishedEvent(EventContext *&eventContext)
Try to fetch an event from the scheduler.
std::vector< std::vector< std::string > > m_algosDependencies
Ugly, will disappear when the deps are declared only within the C++ code of the algos.
virtual StatusCode popFinishedEvent(EventContext *&eventContext)
Blocks until an event is available.