SequentialTask Class Reference

#include <src/ParallelSequentialSchedulerSvc.h>

Inheritance diagram for SequentialTask:
Collaboration diagram for SequentialTask:

Public Member Functions

 SequentialTask (ISvcLocator *svcLocator, EventContext *eventContext, ParallelSequentialSchedulerSvc *scheduler, IAlgResourcePool *algPool)
 
virtual tbb::task * execute ()
 

Private Attributes

SmartIF< ISvcLocatorm_serviceLocator
 
EventContextm_eventContext
 
SmartIF< ParallelSequentialSchedulerSvcm_scheduler
 
SmartIF< IAlgResourcePoolm_algPool
 

Detailed Description

Definition at line 116 of file ParallelSequentialSchedulerSvc.h.

Constructor & Destructor Documentation

SequentialTask::SequentialTask ( ISvcLocator svcLocator,
EventContext eventContext,
ParallelSequentialSchedulerSvc scheduler,
IAlgResourcePool algPool 
)
inline

Definition at line 118 of file ParallelSequentialSchedulerSvc.h.

121  :
122 
123  m_serviceLocator(svcLocator),
124  m_eventContext(eventContext),
125  m_scheduler(scheduler),
126  m_algPool(algPool){
127 
128  };
SmartIF< ParallelSequentialSchedulerSvc > m_scheduler
SmartIF< IAlgResourcePool > m_algPool
SmartIF< ISvcLocator > m_serviceLocator

Member Function Documentation

tbb::task * SequentialTask::execute ( )
virtual

Definition at line 224 of file ParallelSequentialSchedulerSvc.cpp.

224  {
225 
226  // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard
227  const SmartIF<IProperty> appmgr(m_serviceLocator);
229  MsgStream log(messageSvc, "SequentialAlgoExecutionTask");
230  log.activate();
231 
232  StatusCode sc;
233 
234  //initialize control algorithm states and decisions
235  AlgsExecutionStates algStates(m_scheduler->m_algList.size(), messageSvc);
236  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_scheduler->m_algResourcePool.get());
237  std::vector<int> nodeDecisions(algPool->getExecutionFlowGraph()->getControlFlowNodeCounter(), -1);
238 
239  m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions);
240  m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
241 
242  //initialize data flow manager
243  //DataFlowManager dataFlow(m_scheduler->m_algosDependencies);
244 
245  //intitialize context
246  // m_eventContext->m_thread_id = pthread_self();
247  bool eventFailed = false;
249 
250  // loop while algorithms are controlFlowReady and event has not failed
251  while(!eventFailed && algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) ){
252 
253  //std::cout << "[" << m_eventContext->evt() << "] algorithms left" << std::endl;
254 
255  //std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(),
256 
257  //[&] (IAlgorithm* ialgorithm) {
258  for(auto it = algStates.begin(AlgsExecutionStates::State::CONTROLREADY); it != algStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it){
259 
260  uint algIndex = *it;
261 
262  std::string algName = m_scheduler->m_algname_vect[algIndex];
263 
264  //promote algorithm to data ready
265  algStates.updateState(algIndex,AlgsExecutionStates::DATAREADY);
266 
267  //std::cout << "Running algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt() << std::endl;
268  log << MSG::DEBUG << "Running algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt() << endmsg;
269 
270  IAlgorithm* ialgoPtr=nullptr;
271  sc = m_algPool->acquireAlgorithm(algName,ialgoPtr, true); //blocking call
272 
273  if(sc.isFailure() || ialgoPtr == nullptr){
274  log << MSG::ERROR << "Could not acquire algorithm " << algName << endmsg;
275  m_eventContext->setFail(true);
276  } else { // we got an algorithm
277 
278  //promote algorithm to scheduled
279  algStates.updateState(algIndex,AlgsExecutionStates::SCHEDULED);
280 
281  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
282  algoPtr->setContext(m_eventContext);
283 
284  // Call the execute() method
285  try {
287  sc = algoPtr->sysExecute();
288  if (UNLIKELY(!sc.isSuccess())) {
289  log << MSG::WARNING << "Execution of algorithm " << algName << " failed" << endmsg;
290  eventFailed = true;
291  }
292  rcg.ignore(); // disarm the guard
293  } catch ( const GaudiException& Exception ) {
294  log << MSG::ERROR << ".executeEvent(): Exception with tag=" << Exception.tag()
295  << " thrown by " << algName << endmsg;
296  log << MSG::ERROR << Exception << endmsg;
297  } catch ( const std::exception& Exception ) {
298  log << MSG::FATAL << ".executeEvent(): Standard std::exception thrown by "
299  << algName << endmsg;
300  log << MSG::ERROR << Exception.what() << endmsg;
301  } catch(...) {
302  log << MSG::FATAL << ".executeEvent(): UNKNOWN Exception thrown by "
303  << algName << endmsg;
304  }
305 
306  if(sc.isFailure()){
307  eventFailed = true;
308  }
309 
310  //std::cout << "Algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt()
311  // << (eventFailed ? " failed" : " succeeded") << std::endl;
312  log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt()
313  << (eventFailed ? " failed" : " succeded") << endmsg;
314 
316  if (ialgoPtr->filterPassed()){
317  state = AlgsExecutionStates::State::EVTACCEPTED;
318  } else {
319  state = AlgsExecutionStates::State::EVTREJECTED;
320  }
321 
322  //std::cout << "Algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt()
323  // << (ialgoPtr->filterPassed() ? " passed" : " rejected") << std::endl;
324  log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt()
325  << (ialgoPtr->filterPassed() ? " passed" : " rejected") << endmsg;
326 
327  sc = m_algPool->releaseAlgorithm(algName,ialgoPtr);
328 
329  algStates.updateState(algIndex,state);
330 
331  //just for debug: look at products -- not thread safe
332  // Update the catalog: some new products may be there
333  /*m_scheduler->m_whiteboard->selectStore(m_eventContext->slot()).ignore();
334 
335  // update prods in the dataflow
336  // DP: Handles could be used. Just update what the algo wrote
337  std::vector<std::string> new_products;
338  m_scheduler->m_whiteboard->getNewDataObjects(new_products).ignore();
339  for (const auto& new_product : new_products)
340  std::cout << "Found in WB: " << new_product << std::endl;
341  //dataFlow.updateDataObjectsCatalog(new_products);*/
342  }
343 
344  }
345  //);
346 
347  m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions);
348  m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
349 
350  if(eventFailed){
351  m_eventContext->setFail(eventFailed);
352  //std::cout << "ERROR: " << "event " << m_eventContext->evt() << " failed" << std::endl;
353  break;
354  }
355 
356  if(!algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) && !algStates.allAlgsExecuted()){
357  //std::cout << "WARNING: " << " not all algorithms executed for event " << m_eventContext->evt() << std::endl;
358 
359  /*std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(),
360 
361  [&] (IAlgorithm* ialgorithm) {
362  uint algIndex = m_scheduler->m_algname_index_map[ialgorithm->name()];
363 
364  if(AlgsExecutionStates::State::SCHEDULED >= algStates.algorithmState(algIndex))
365  std::cout << "Event [" << m_eventContext->evt() << "] algorithm " << ialgorithm->name()
366  << " NOT executed" << std::endl;
367 
368  });*/
369  }
370  }
371 
373 
374  return nullptr;
375 
376 }
#define UNLIKELY(x)
Definition: Kernel.h:126
std::list< IAlgorithm * > m_algList
Cache the list of algs to be executed.
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
Define general base for Gaudi exception.
Helper class to set the application return code in case of early exit (e.g.
Definition: RetCodeGuard.h:9
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
GAUDI_API void setCurrentContextId(ContextIdType newId)
Used by the framework to change the value of the current context id.
ContextID_t slot() const
Definition: EventContext.h:41
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
SmartIF< ParallelSequentialSchedulerSvc > m_scheduler
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:574
ContextEvt_t evt() const
Definition: EventContext.h:40
STL class.
constexpr int UnhandledException
Definition: AppReturnCode.h:27
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
SmartIF< IAlgResourcePool > m_algPool
T what(T...args)
void promoteToControlReadyState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions, const int &slotNum=-1) const
XXX: CF tests.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode sysExecute() override
The actions to be performed by the algorithm on an event.
Definition: Algorithm.cpp:567
void setFail(const bool &b=true)
Definition: EventContext.h:65
virtual const std::string & tag() const
name tag for the exception, or exception type
STL class.
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:25
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:74
T size(T...args)
SmartIF< ISvcLocator > m_serviceLocator
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
concurrency::ExecutionFlowManager m_controlFlow
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.

Member Data Documentation

SmartIF<IAlgResourcePool> SequentialTask::m_algPool
private

Definition at line 134 of file ParallelSequentialSchedulerSvc.h.

EventContext* SequentialTask::m_eventContext
private

Definition at line 132 of file ParallelSequentialSchedulerSvc.h.

SmartIF<ParallelSequentialSchedulerSvc> SequentialTask::m_scheduler
private

Definition at line 133 of file ParallelSequentialSchedulerSvc.h.

SmartIF<ISvcLocator> SequentialTask::m_serviceLocator
private

Definition at line 131 of file ParallelSequentialSchedulerSvc.h.


The documentation for this class was generated from the following files: