SequentialTask Class Reference

#include <src/ParallelSequentialSchedulerSvc.h>

Inheritance diagram for SequentialTask:
Collaboration diagram for SequentialTask:

Public Member Functions

 SequentialTask (ISvcLocator *svcLocator, EventContext *eventContext, ParallelSequentialSchedulerSvc *scheduler, IAlgResourcePool *algPool)
 
virtual tbb::task * execute ()
 

Private Attributes

SmartIF< ISvcLocator > m_serviceLocator
 
EventContext * m_eventContext
 
SmartIF< ParallelSequentialSchedulerSvc > m_scheduler
 
SmartIF< IAlgResourcePool > m_algPool
 

Detailed Description

Definition at line 118 of file ParallelSequentialSchedulerSvc.h.

Constructor & Destructor Documentation

SequentialTask::SequentialTask ( ISvcLocator * svcLocator,
EventContext * eventContext,
ParallelSequentialSchedulerSvc * scheduler,
IAlgResourcePool * algPool
)
inline

Definition at line 120 of file ParallelSequentialSchedulerSvc.h.

123  :
124 
125  m_serviceLocator(svcLocator),
126  m_eventContext(eventContext),
127  m_scheduler(scheduler),
128  m_algPool(algPool){
129 
130  };
SmartIF< ParallelSequentialSchedulerSvc > m_scheduler
SmartIF< IAlgResourcePool > m_algPool
SmartIF< ISvcLocator > m_serviceLocator

Member Function Documentation

tbb::task * SequentialTask::execute ( )
virtual

Definition at line 223 of file ParallelSequentialSchedulerSvc.cpp.

223  {
224 
225  // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard
226  const SmartIF<IProperty> appmgr(m_serviceLocator);
228  MsgStream log(messageSvc, "SequentialAlgoExecutionTask");
229  log.activate();
230 
231  StatusCode sc;
232 
233  //initialize control algorithm states and decisions
234  AlgsExecutionStates algStates(m_scheduler->m_algList.size(), messageSvc);
235  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_scheduler->m_algResourcePool.get());
236  std::vector<int> nodeDecisions(algPool->getExecutionFlowGraph()->getControlFlowNodeCounter(), -1);
237 
238  m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions);
239  m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
240 
241  //initialize data flow manager
242  //DataFlowManager dataFlow(m_scheduler->m_algosDependencies);
243 
244  //intitialize context
245  // m_eventContext->m_thread_id = pthread_self();
246  bool eventFailed = false;
248 
249  // loop while algorithms are controlFlowReady and event has not failed
250  while(!eventFailed && algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) ){
251 
252  //std::cout << "[" << m_eventContext->evt() << "] algorithms left" << std::endl;
253 
254  //std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(),
255 
256  //[&] (IAlgorithm* ialgorithm) {
257  for(auto it = algStates.begin(AlgsExecutionStates::State::CONTROLREADY); it != algStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it){
258 
259  uint algIndex = *it;
260 
261  std::string algName = m_scheduler->m_algname_vect[algIndex];
262 
263  //promote algorithm to data ready
264  algStates.updateState(algIndex,AlgsExecutionStates::DATAREADY);
265 
266  //std::cout << "Running algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt() << std::endl;
267  log << MSG::DEBUG << "Running algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt() << endmsg;
268 
269  IAlgorithm* ialgoPtr=nullptr;
270  sc = m_algPool->acquireAlgorithm(algName,ialgoPtr, true); //blocking call
271 
272  if(sc.isFailure() || ialgoPtr == nullptr){
273  log << MSG::ERROR << "Could not acquire algorithm " << algName << endmsg;
274  m_eventContext->setFail(true);
275  } else { // we got an algorithm
276 
277  //promote algorithm to scheduled
278  algStates.updateState(algIndex,AlgsExecutionStates::SCHEDULED);
279 
280  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
281  algoPtr->setContext(m_eventContext);
282 
283  // Call the execute() method
284  try {
286  sc = algoPtr->sysExecute();
287  if (UNLIKELY(!sc.isSuccess())) {
288  log << MSG::WARNING << "Execution of algorithm " << algName << " failed" << endmsg;
289  eventFailed = true;
290  }
291  rcg.ignore(); // disarm the guard
292  } catch ( const GaudiException& Exception ) {
293  log << MSG::ERROR << ".executeEvent(): Exception with tag=" << Exception.tag()
294  << " thrown by " << algName << endmsg;
295  log << MSG::ERROR << Exception << endmsg;
296  } catch ( const std::exception& Exception ) {
297  log << MSG::FATAL << ".executeEvent(): Standard std::exception thrown by "
298  << algName << endmsg;
299  log << MSG::ERROR << Exception.what() << endmsg;
300  } catch(...) {
301  log << MSG::FATAL << ".executeEvent(): UNKNOWN Exception thrown by "
302  << algName << endmsg;
303  }
304 
305  if(sc.isFailure()){
306  eventFailed = true;
307  }
308 
309  //std::cout << "Algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt()
310  // << (eventFailed ? " failed" : " succeeded") << std::endl;
311  log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt()
312  << (eventFailed ? " failed" : " succeded") << endmsg;
313 
315  if (ialgoPtr->filterPassed()){
316  state = AlgsExecutionStates::State::EVTACCEPTED;
317  } else {
318  state = AlgsExecutionStates::State::EVTREJECTED;
319  }
320 
321  //std::cout << "Algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt()
322  // << (ialgoPtr->filterPassed() ? " passed" : " rejected") << std::endl;
323  log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt()
324  << (ialgoPtr->filterPassed() ? " passed" : " rejected") << endmsg;
325 
326  sc = m_algPool->releaseAlgorithm(algName,ialgoPtr);
327 
328  algStates.updateState(algIndex,state);
329 
330  //just for debug: look at products -- not thread safe
331  // Update the catalog: some new products may be there
332  /*m_scheduler->m_whiteboard->selectStore(m_eventContext->slot()).ignore();
333 
334  // update prods in the dataflow
335  // DP: Handles could be used. Just update what the algo wrote
336  std::vector<std::string> new_products;
337  m_scheduler->m_whiteboard->getNewDataObjects(new_products).ignore();
338  for (const auto& new_product : new_products)
339  std::cout << "Found in WB: " << new_product << std::endl;
340  //dataFlow.updateDataObjectsCatalog(new_products);*/
341  }
342 
343  }
344  //);
345 
346  m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions);
347  m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
348 
349  if(eventFailed){
350  m_eventContext->setFail(eventFailed);
351  //std::cout << "ERROR: " << "event " << m_eventContext->evt() << " failed" << std::endl;
352  break;
353  }
354 
355  if(!algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) && !algStates.allAlgsExecuted()){
356  //std::cout << "WARNING: " << " not all algorithms executed for event " << m_eventContext->evt() << std::endl;
357 
358  /*std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(),
359 
360  [&] (IAlgorithm* ialgorithm) {
361  uint algIndex = m_scheduler->m_algname_index_map[ialgorithm->name()];
362 
363  if(AlgsExecutionStates::State::SCHEDULED >= algStates.algorithmState(algIndex))
364  std::cout << "Event [" << m_eventContext->evt() << "] algorithm " << ialgorithm->name()
365  << " NOT executed" << std::endl;
366 
367  });*/
368  }
369  }
370 
372 
373  return nullptr;
374 
375 }
std::list< IAlgorithm * > m_algList
Cache the list of algs to be executed.
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
Define general base for Gaudi exception.
Helper class to set the application return code in case of early exit (e.g. exception).
Definition: RetCodeGuard.h:9
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
SmartIF< ParallelSequentialSchedulerSvc > m_scheduler
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
GAUDI_API void setCurrentContextId(ContextIdType newId)
Used by the framework to change the value of the current context id.
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:556
constexpr int UnhandledException
Definition: AppReturnCode.h:27
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single event.
SmartIF< IAlgResourcePool > m_algPool
void promoteToControlReadyState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions, const int &slotNum=-1) const
XXX: CF tests.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode sysExecute() override
The actions to be performed by the algorithm on an event.
Definition: Algorithm.cpp:652
long int evt() const
Definition: EventContext.h:37
void setFail(const bool &b=true)
Definition: EventContext.h:54
virtual const std::string & tag() const
name tag for the exception, or exception type
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:23
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
ID_type slot() const
Definition: EventContext.h:38
#define UNLIKELY(x)
Definition: Kernel.h:126
SmartIF< ISvcLocator > m_serviceLocator
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
concurrency::ExecutionFlowManager m_controlFlow
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.

Member Data Documentation

SmartIF<IAlgResourcePool> SequentialTask::m_algPool
private

Definition at line 136 of file ParallelSequentialSchedulerSvc.h.

EventContext* SequentialTask::m_eventContext
private

Definition at line 134 of file ParallelSequentialSchedulerSvc.h.

SmartIF<ParallelSequentialSchedulerSvc> SequentialTask::m_scheduler
private

Definition at line 135 of file ParallelSequentialSchedulerSvc.h.

SmartIF<ISvcLocator> SequentialTask::m_serviceLocator
private

Definition at line 133 of file ParallelSequentialSchedulerSvc.h.


The documentation for this class was generated from the following files: