All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
HiveEventLoopMgr.cpp
Go to the documentation of this file.
1 #include <algorithm>
2 #include <tuple>
3 
4 #include "GaudiKernel/SmartIF.h"
5 #include "GaudiKernel/Incident.h"
6 #include "GaudiKernel/MsgStream.h"
7 #include "GaudiKernel/SvcFactory.h"
8 #include "GaudiKernel/DataObject.h"
9 #include "GaudiKernel/IAlgManager.h"
10 #include "GaudiKernel/IIncidentSvc.h"
11 #include "GaudiKernel/IEvtSelector.h"
12 #include "GaudiKernel/IDataManagerSvc.h"
13 #include "GaudiKernel/IDataProviderSvc.h"
14 #include "GaudiKernel/IConversionSvc.h"
15 #include "GaudiKernel/AppReturnCode.h"
16 #include "GaudiKernel/DataSvc.h"
17 
18 #include "HistogramAgent.h"
19 
20 // For concurrency
23 
24 #include "tbb/task_scheduler_init.h"
25 #include "tbb/task.h"
26 #include "tbb/tick_count.h"
27 
28 #include "GaudiKernel/EventContext.h"
29 #include "GaudiKernel/Algorithm.h"
30 
31 #include <GaudiKernel/GaudiException.h>
32 
33 #include <pthread.h> // only for the tID!
34 
35 
36 #include <sys/resource.h>
37 #include <sys/times.h>
38 
39 // Instantiation of a static factory class used by clients to create instances of this service
41 
42 
43 #define ON_DEBUG if (UNLIKELY(outputLevel() <= MSG::DEBUG))
44 #define ON_VERBOSE if (UNLIKELY(outputLevel() <= MSG::VERBOSE))
45 
46 #define DEBMSG ON_DEBUG debug()
47 #define VERMSG ON_VERBOSE verbose()
48 
51 class HiveAlgoTask : public tbb::task {
52 public:
55  HiveEventLoopMgr* eventloopmanager): m_algorithm(algorithm),
56  m_scheduler(scheduler),
57  m_eventloopmanager(eventloopmanager){};
58  tbb::task* execute();
62 };
63 
64 tbb::task* HiveAlgoTask::execute() {
65  // Algorithm* this_algo = dynamic_cast<Algorithm*>(m_algorithm);
66  // this_algo->getContext()->m_thread_id = pthread_self();
69  // put back the algo into the hive algorithm manager
70  m_eventloopmanager->taskFinished(m_algorithm); // TODO do this with index: put index in context?
71  return NULL;
72 }
73 
75 
76 //--------------------------------------------------------------------------------------------
77 // Standard Constructor
78 //--------------------------------------------------------------------------------------------
79 HiveEventLoopMgr::HiveEventLoopMgr(const std::string& nam, ISvcLocator* svcLoc)
80 : MinimalEventLoopMgr(nam, svcLoc)
81 {
83  m_histoPersSvc = 0;
84  m_evtDataMgrSvc = 0;
85  m_evtDataSvc = 0;
86  m_evtSelector = 0;
87  m_evtContext = 0;
89  m_endEventFired = true;
91  m_max_parallel = 1;
92  m_evts_parallel = 1;
93  m_num_threads = 1;
94  m_DumpQueues = true;
95  m_nProducts = 0;
96 
97  // Declare properties
98  declareProperty("HistogramPersistency", m_histPersName = "");
99  declareProperty("EvtSel", m_evtsel );
100  declareProperty("Warnings",m_warnings=true,
101  "Set this property to false to suppress warning messages");
102  declareProperty("MaxAlgosParallel", m_max_parallel );
103  declareProperty("MaxEventsParallel", m_evts_parallel);
104  declareProperty("NumThreads", m_num_threads);
105  declareProperty("DumpQueues", m_DumpQueues= false);
106  declareProperty("CloneAlgorithms", m_CloneAlgorithms= false);
107  declareProperty("AlgosDependencies", m_AlgosDependencies);
108 }
109 
110 //--------------------------------------------------------------------------------------------
111 // Standard Destructor
112 //--------------------------------------------------------------------------------------------
119  if( m_evtContext ) delete m_evtContext;
120 }
121 
122 //--------------------------------------------------------------------------------------------
123 // implementation of IAppMgrUI::initialize
124 //--------------------------------------------------------------------------------------------
126  // Initialize the base class
128  if( !sc.isSuccess() ) {
129  DEBMSG << "Error Initializing base class MinimalEventLoopMgr." << endmsg;
130  return sc;
131  }
132 
134 
135  // Setup access to event data services
136  m_evtDataMgrSvc = serviceLocator()->service("EventDataSvc");
137  if( !m_evtDataMgrSvc.isValid() ) {
138  fatal() << "Error retrieving EventDataSvc interface IDataManagerSvc." << endmsg;
139  return StatusCode::FAILURE;
140  }
141  m_evtDataSvc = serviceLocator()->service("EventDataSvc");
142  if( !m_evtDataSvc.isValid() ) {
143  fatal() << "Error retrieving EventDataSvc interface IDataProviderSvc." << endmsg;
144  return StatusCode::FAILURE;
145  }
146  m_whiteboard = serviceLocator()->service("EventDataSvc");
147  if( !m_whiteboard.isValid() ) {
148  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
149  return StatusCode::FAILURE;
150  }
152 
153  // Obtain the IProperty of the ApplicationMgr
154  m_appMgrProperty = serviceLocator();
155  if ( ! m_appMgrProperty.isValid() ) {
156  fatal() << "IProperty interface not found in ApplicationMgr." << endmsg;
157  return StatusCode::FAILURE;
158  }
159 
160 // We do not necessarily expect an Event Selector to be declared
161  setProperty(m_appMgrProperty->getProperty("EvtSel")).ignore();
162 
163  if( m_evtsel != "NONE" || m_evtsel.length() == 0) {
164  m_evtSelector = serviceLocator()->service("EventSelector");
165  if( m_evtSelector.isValid() ) {
166  // Setup Event Selector
168  if( !sc.isSuccess() ) {
169  fatal() << "Can not create the event selector Context." << endmsg;
170  return sc;
171  }
172  }
173  else {
174  fatal() << "EventSelector not found." << endmsg;
175  return sc;
176  }
177  }
178  else {
179  m_evtSelector = 0;
180  m_evtContext = 0;
181  if ( m_warnings ) {
182  warning() << "Unable to locate service \"EventSelector\" " << endmsg;
183  warning() << "No events will be processed from external input." << endmsg;
184  }
185  }
186 
187  // Setup access to histogramming services
188  m_histoDataMgrSvc = serviceLocator()->service("HistogramDataSvc");
189  if( !m_histoDataMgrSvc.isValid() ) {
190  fatal() << "Error retrieving HistogramDataSvc." << endmsg;
191  return sc;
192  }
193  // Setup histogram persistency
194  m_histoPersSvc = serviceLocator()->service("HistogramPersistencySvc");
195  if( !m_histoPersSvc.isValid() ) {
196  warning() << "Histograms cannot not be saved - though required." << endmsg;
197  return sc;
198  }
199 
200  // Setup algorithm resource pool
201  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
202  if( !m_algResourcePool.isValid() ) {
203  fatal() << "Error retrieving AlgResourcePool" << endmsg;
204  return StatusCode::FAILURE;
205  }
206 
207  // Setup tbb task scheduler
208  // TODO: shouldn't be in this case
209  // One more for the current thread
210  m_tbb_scheduler_init = new tbb::task_scheduler_init(m_num_threads+1);
211 
212  return StatusCode::SUCCESS;
213 }
214 //--------------------------------------------------------------------------------------------
215 // implementation of IService::reinitialize
216 //--------------------------------------------------------------------------------------------
218 
219  // Initialize the base class
221  if( !sc.isSuccess() ) {
222  DEBMSG << "Error Initializing base class MinimalEventLoopMgr." << endmsg;
223  return sc;
224  }
225 
226  // Check to see whether a new Event Selector has been specified
228  if( m_evtsel != "NONE" || m_evtsel.length() == 0) {
229  SmartIF<IService> theSvc(serviceLocator()->service("EventSelector"));
230  SmartIF<IEvtSelector> theEvtSel(theSvc);
231  if( theEvtSel.isValid() && ( theEvtSel.get() != m_evtSelector.get() ) ) {
232  // Setup Event Selector
233  if ( m_evtSelector.get() && m_evtContext ) {
234  // Need to release context before switching to new event selector
236  m_evtContext = 0;
237  }
238  m_evtSelector = theEvtSel;
239  if (theSvc->FSMState() == Gaudi::StateMachine::INITIALIZED) {
240  sc = theSvc->reinitialize();
241  if( !sc.isSuccess() ) {
242  error() << "Failure Reinitializing EventSelector "
243  << theSvc->name( ) << endmsg;
244  return sc;
245  }
246  }
247  else {
248  sc = theSvc->sysInitialize();
249  if( !sc.isSuccess() ) {
250  error() << "Failure Initializing EventSelector "
251  << theSvc->name( ) << endmsg;
252  return sc;
253  }
254  }
256  if( !sc.isSuccess() ) {
257  error() << "Can not create Context " << theSvc->name( ) << endmsg;
258  return sc;
259  }
260  info() << "EventSelector service changed to "
261  << theSvc->name( ) << endmsg;
262  }
263  else if ( m_evtSelector.isValid() ) {
264  if ( m_evtContext ) {
266  m_evtContext = 0;
267  }
269  if( !sc.isSuccess() ) {
270  error() << "Can not create Context " << theSvc->name( ) << endmsg;
271  return sc;
272  }
273  }
274  }
275  else if ( m_evtSelector.isValid() && m_evtContext ) {
277  m_evtSelector = 0;
278  m_evtContext = 0;
279  }
280  return StatusCode::SUCCESS;
281 }
282 
283 
284 //--------------------------------------------------------------------------------------------
285 // implementation of IService::stop
286 //--------------------------------------------------------------------------------------------
288  if ( ! m_endEventFired ) {
289  // Fire pending EndEvent incident
290  m_incidentSvc->fireIncident(Incident(name(),IncidentType::EndEvent));
291  m_endEventFired = true;
292  }
293  return MinimalEventLoopMgr::stop();
294 }
295 
296 //--------------------------------------------------------------------------------------------
297 // implementation of IAppMgrUI::finalize
298 //--------------------------------------------------------------------------------------------
300  StatusCode sc;
301 
302  // Finalize base class
304  if (! sc.isSuccess()) {
305  error() << "Error finalizing base class" << endmsg;
306  return sc;
307  }
308 
309  // Save Histograms Now
310  if ( m_histoPersSvc != 0 ) {
311  HistogramAgent agent;
312  sc = m_histoDataMgrSvc->traverseTree( &agent );
313  if( sc.isSuccess() ) {
314  IDataSelector* objects = agent.selectedObjects();
315  // skip /stat entry!
316  if ( objects->size() > 0 ) {
317  IDataSelector::iterator i;
318  for ( i = objects->begin(); i != objects->end(); i++ ) {
319  IOpaqueAddress* pAddr = 0;
320  StatusCode iret = m_histoPersSvc->createRep(*i, pAddr);
321  if ( iret.isSuccess() ) {
322  (*i)->registry()->setAddress(pAddr);
323  }
324  else {
325  sc = iret;
326  }
327  }
328  for ( i = objects->begin(); i != objects->end(); i++ ) {
329  IRegistry* reg = (*i)->registry();
330  StatusCode iret = m_histoPersSvc->fillRepRefs(reg->address(), *i);
331  if ( !iret.isSuccess() ) {
332  sc = iret;
333  }
334  }
335  }
336  if ( sc.isSuccess() ) {
337  info() << "Histograms converted successfully according to request." << endmsg;
338  }
339  else {
340  error() << "Error while saving Histograms." << endmsg;
341  }
342  }
343  else {
344  error() << "Error while traversing Histogram data store" << endmsg;
345  }
346  }
347 
348  // Release event selector context
349  if ( m_evtSelector && m_evtContext ) {
351  m_evtContext = 0;
352  }
353 
354  // Release all interfaces...
355  m_histoDataMgrSvc = 0;
356  m_histoPersSvc = 0;
357 
358  m_evtSelector = 0;
359  m_evtDataSvc = 0;
360  m_evtDataMgrSvc = 0;
361 
362  delete m_tbb_scheduler_init;
363 
364  return StatusCode::SUCCESS;
365 }
366 
367 //--------------------------------------------------------------------------------------------
368 // implementation of executeEvent(void* par)
369 //--------------------------------------------------------------------------------------------
371 
372  // Fire BeginEvent "Incident"
373  m_incidentSvc->fireIncident(Incident(name(),IncidentType::BeginEvent));
374 // An incident may schedule a stop, in which case it is better to exit before the actual execution.
375  if ( m_scheduledStop ) {
376  always() << "Terminating event processing loop due to a stop scheduled by an incident listener" << endmsg;
377  return StatusCode::SUCCESS;
378  }
379 
380  // Execute Algorithms
381  m_incidentSvc->fireIncident(Incident(name(), IncidentType::BeginProcessing));
382 
383  // Prepare the event context for concurrency
384 
385 
386  // Call the resetExecuted() method of ALL "known" algorithms
387 // (before we were resetting only the topalgs)
388  SmartIF<IAlgManager> algMan(serviceLocator());
389  if (LIKELY(algMan.isValid())) {
390  for(auto ialg: algMan->getAlgorithms()) {
391  if (LIKELY(0 != ialg)) ialg->resetExecuted();
392  }
393  }
394 
395  bool eventfailed = false;//run_parallel();
396 
397  // ensure that the abortEvent flag is cleared before the next event
398  if (UNLIKELY(m_abortEvent)) {
399  DEBMSG << "AbortEvent incident fired by " << m_abortEventSource << endmsg;
400  m_abortEvent = false;
401  }
402 
403  // Call the execute() method of all output streams
404  for (ListAlg::iterator ito = m_outStreamList.begin(); ito != m_outStreamList.end(); ito++ ) {
405  (*ito)->resetExecuted();
406  StatusCode sc;
407  sc = (*ito)->sysExecute();
408  if (UNLIKELY(!sc.isSuccess())) {
409  warning() << "Execution of output stream " << (*ito)->name() << " failed" << endmsg;
410  eventfailed = true;
411  }
412  }
413 
414  m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing));
415 
416  // Check if there was an error processing current event
417  if (UNLIKELY(eventfailed)){
418  error() << "Error processing event loop." << endmsg;
419  return StatusCode(StatusCode::FAILURE,true);
420  }
421  return StatusCode(StatusCode::SUCCESS,true);
422 
423 }
424 
425 //--------------------------------------------------------------------------------------------
426 // implementation of IEventProcessing::executeRun
427 //--------------------------------------------------------------------------------------------
429  StatusCode sc;
430 // delegate to the base class implementation
431  sc = MinimalEventLoopMgr::executeRun(maxevt);
432  return sc;
433 }
434 
435 //--------------------------------------------------------------------------------------------
436 // implementation of IAppMgrUI::nextEvent
437 //--------------------------------------------------------------------------------------------
438 // Here the loop on the events takes place.
439 // This is also the natural place to put the preparation of the algorithms
440 // contexts, which contain the event specific data.
441 
443  // Collapse executeEvent and run_parallel in the same method
444  // TODO _very_ sporty on conditions and checks!!
445 
446  auto start_time = tbb::tick_count::now();
447  auto secsFromStart = [&start_time]()->double{
448  return (tbb::tick_count::now()-start_time).seconds();
449  };
450 
451  typedef std::tuple<EventContext*,EventSchedulingState*> contextSchedState_tuple;
452 
453  MsgStream log(msgSvc(), name());
454 
455 
456  // Reset the application return code.
458 
459  // Lambda to check if an event has finished
460  auto has_finished = [] // acquire nothing
461  (contextSchedState_tuple evtContext_evtstate) // argument is a tuple
462  { return std::get<1>(evtContext_evtstate)->hasFinished();}; // true if finished
463 
464  // Useful for the Logs
465  always() << "Running with "
466  << m_evts_parallel << " parallel events, "
467  << m_max_parallel << " max concurrent algorithms, "
468  << m_num_threads << " threads."
469  << endmsg;
470 
471  int n_processed_events = 0;
472  bool eof = false;
473  StatusCode sc;
474 
475  // Events in flight
476  std::list<contextSchedState_tuple> events_in_flight;
477 
478  // Loop until no more evts are there
479 
480  while( maxevt == -1 ? !eof : n_processed_events < maxevt ){// TODO Fix the condition in case of -1
481 
482  const unsigned int n_events_in_flight = events_in_flight.size();
483  const unsigned int n_evts_to_process = maxevt - n_processed_events - n_events_in_flight;
484 
485  unsigned int n_acquirable_events = m_evts_parallel - n_events_in_flight ;
486  if (n_acquirable_events > n_evts_to_process)
487  n_acquirable_events = n_evts_to_process;
488 
489  log << MSG::INFO << "Evts in flight: " << n_events_in_flight << endmsg;
490  log << MSG::INFO << "Evts processed: " << n_processed_events<< endmsg;
491  log << MSG::INFO << "Evts parallel: " << m_evts_parallel << endmsg;
492  log << MSG::INFO << "Acquirable Events are " << n_acquirable_events << endmsg;
493 
494  // Initialisation section ------------------------------------------------
495 
496  // Loop on events to be initialised
497  for (unsigned int offset=0; offset< n_acquirable_events; ++offset){
498 
499  EventContext* evtContext(new EventContext);
500  const int evt_num = n_processed_events + offset + n_events_in_flight;
501  evtContext->set(evt_num, m_whiteboard->allocateStore(evt_num) );
502  m_whiteboard->selectStore(evtContext->slot()).ignore();
503 
504  if( m_evtContext ) {
505  //---This is the "event iterator" context from EventSelector
506  IOpaqueAddress* pAddr = 0;
507  sc = getEventRoot(pAddr);
508  if( !sc.isSuccess() ) {
509  info() << "No more events in event selection " << endmsg;
510  eof = true;
511  maxevt = evt_num; // Set the maxevt to the determined maximum
512  break;
513  }
514  sc = m_evtDataMgrSvc->setRoot ("/Event", pAddr);
515  if( !sc.isSuccess() ) {
516  warning() << "Error declaring event root address." << endmsg;
517  }
518  }
519  else {
520  //---In case of no event selector----------------
521  sc = m_evtDataMgrSvc->setRoot ("/Event", new DataObject());
522  if( !sc.isSuccess() ) {
523  warning() << "Error declaring event root DataObject" << endmsg;
524  }
525  }
526 
528  events_in_flight.push_back(std::make_tuple(evtContext,event_state));
529  info() << "Started event " << evt_num << " at " << secsFromStart() << endmsg;
530 
531  }// End initialisation loop on acquired events
532 
533  // End initialisation section --------------------------------------------
534 
535  // Scheduling section ----------------------------------------------------
536  auto in_flight_end = events_in_flight.end();
537  auto in_flight_begin = events_in_flight.begin();
538  // loop until at least one evt finished
539  while (in_flight_end == find_if(in_flight_begin, in_flight_end ,has_finished)){
540  bool no_algo_can_run = true;
541  for (auto& evtContext_evtstate : events_in_flight){ // loop on evts
542 
543  EventContext* event_Context = std::get<0>(evtContext_evtstate);
544  EventSchedulingState* event_state = std::get<1>(evtContext_evtstate);
545 
546  for (unsigned int algo_counter=0; algo_counter<m_topAlgList.size(); algo_counter++) { // loop on algos
547  // check whether all requirements/dependencies for the algorithm are fulfilled...
548  const state_type& algo_requirements = m_all_requirements[algo_counter];
549  // Very verbose!
550  // log << MSG::VERBOSE << "Checking dependencies for algo " << algo_counter << ":\n"
551  // << " o Requirements: " << algo_requirements << std::endl
552  // << " o State: " << event_state->state() << endmsg;
553 
554  // ...and whether the algorithm was already started and if it can be started
555  bool algo_not_started_and_dependencies_there = (algo_requirements.is_subset_of(event_state->state()) &&
556  (event_state->hasStarted(algo_counter) ) == false);
557 
558  // It could run, just the maximum number of algos in flight has been reached
559  if (algo_not_started_and_dependencies_there)
560  no_algo_can_run = false;
561  if (algo_not_started_and_dependencies_there &&
563  // Pick the algorithm if available and if not and requested create one
564  IAlgorithm* ialgo=NULL;
565  // To be transferred to the algomanager, this is inefficient
566  ListAlg::iterator algoIt = m_topAlgList.begin();
567  std::advance(algoIt, algo_counter);
568  if(m_algResourcePool->acquireAlgorithm(algoIt->get()->name(),ialgo)){
569  log << MSG::INFO << "Launching algo " << algo_counter<< " on event " << event_Context->evt() << endmsg;
570  // Attach context to the algo
571  Algorithm* algo = dynamic_cast<Algorithm*> (ialgo);
572  algo->setContext(event_Context);
573 
574  tbb::task* t = new( tbb::task::allocate_root() ) HiveAlgoTask(ialgo, event_state, this);
575  tbb::task::enqueue( *t);
576 
577  event_state->algoStarts(algo_counter);
579 
580  log << MSG::INFO << "Algos in flight: " << m_total_algos_in_flight << endmsg;
581  }
582  } // End scheduling if block
583 
584  }// end loop on algo indices
585 
586  // update the event state with what has been put into the DataSvc
587  std::vector<std::string> new_products;
588  m_whiteboard->selectStore(event_Context->slot()).ignore();
589  sc = m_whiteboard->getNewDataObjects(new_products);
590  if( !sc.isSuccess() ){
591  warning() << "Error getting recent new products (since last time called)" << endmsg;
592  }
593  for (const auto& newProduct : new_products) {
594  log << MSG::DEBUG << "New Product: " << newProduct << " in the store." << endmsg;
595  if (m_product_indices.count( newProduct ) == 1) { // only products with dependencies upon need to be announced to other algos
596  log << MSG::DEBUG << " - Used as input by some algorithm. Updating the event state." << endmsg;
597  event_state->update_state(m_product_indices[newProduct]);
598  }
599  }
600 
601 
602  /* Check if we stall on the current event
603  * One should check if:
604  * - Nothing can run
605  * - Nothing is running
606  * - No new product is available
607  * - The event is not finished
608  * At this point one could claim a stall.
609  * The implementation poses a challenge though, which resides in the
610 * asynchronous termination of algorithms and potential writing in the
611  * store. Therefore one checks the 4 aforementioned conditions.
612  * Then, the store is again checked (without removing the new
613  * elements). If something new is there the stall is not sure
614  * anymore.
615  * Another possibility could be to check if any algo terminated
616  * during the checks made to the wb probably.
617  */
618  if (no_algo_can_run && // nothing could run
619  m_total_algos_in_flight==0 && // nothing is running
620  new_products.size() == 0 && // no new product available
621  ! event_state->hasFinished() ){ // the event is not finished
622 
623  // Check if something arrived on the wb meanwhile
625 
626  std::string errorMessage("No algorithm can run, "
627  "no algorithm in flight, "
628  "no new products in the store, "
629  "event not complete: this is a stall.");
630  fatal() << errorMessage << std::endl
631  << "Algorithms that ran for event " << event_Context->evt() << std::endl;
632  unsigned int algo_counter=0;
633  for (auto& algo : m_topAlgList){
634  bool has_started = event_state->hasStarted(algo_counter);
635  if (has_started)
636  fatal() << " o " << algo->name() << " could run" << std::endl;
637  else
638  fatal() << " o " << algo->name() << " could NOT run" << std::endl;
639  algo_counter++;
640  } // End ofloop on algos
641  fatal() << endmsg;
642  throw GaudiException (errorMessage,"HiveEventLoopMgr",StatusCode::FAILURE);
643  }
644  }
645  }// end loop on evts in flight
646  }// end loop until at least one evt in flight finished
647 
648  // Remove from the in flight events the finished ones
649  std::list<contextSchedState_tuple>::iterator it=events_in_flight.begin();
650 
651  while (it!=events_in_flight.end()){
652  // Now proceed to deletion
653  if (std::get<1>(*it)->hasFinished()){
654  const unsigned int evt_num = std::get<0>(*it)->evt();
655  const unsigned int evt_slot = std::get<0>(*it)->slot();
656  log << MSG::INFO << "Event "<< evt_num << " finished. Events in flight are "
657  << events_in_flight.size() << ". Processed events are "
658  << n_processed_events << endmsg;
659  info() << "Event "<< evt_num << " finished. now is " << secsFromStart() << endmsg;
660 
661  // Calculate min and max event num
662  unsigned int min_event_num=0xFFFFFFFF;
663  unsigned int max_event_num=0;
664 
665  for (auto& evtContext_evtstate : events_in_flight){
666  const unsigned int evt_num = std::get<0>(evtContext_evtstate)->evt();
667  // Update min and max for backlog calculation
668  if (evt_num > max_event_num) max_event_num=evt_num;
669  if (evt_num < min_event_num) min_event_num=evt_num;
670  }
671  unsigned int evt_backlog=max_event_num-min_event_num;
672  info() << "Event backlog (max= " << max_event_num << ", min= "
673  << min_event_num<<" ) = " << evt_backlog << endmsg;
674 
675 
676  // Output
677  // Call the execute() method of all output streams
678  for (ListAlg::iterator ito = m_outStreamList.begin(); ito != m_outStreamList.end(); ito++ ) {
679  (*ito)->resetExecuted();
680  StatusCode sc;
681  sc = (*ito)->sysExecute();
682  if (UNLIKELY(!sc.isSuccess())) {
683  warning() << "Execution of output stream " << (*ito)->name() << " failed" << endmsg;
684  }
685  }
686 
687  sc = m_whiteboard->clearStore(evt_slot);
688  if( !sc.isSuccess() ) {
689  warning() << "Clear of Event data store failed" << endmsg;
690  }
691  else {
692  info() << "Cleared store " << evt_slot << endmsg;
693  }
694  m_whiteboard->freeStore(evt_slot).ignore();
695 
696  delete std::get<0>(*it);
697  delete std::get<1>(*it);
698  it=events_in_flight.erase(it) ;
699 
700  n_processed_events++;
701 
702  } else{
703  ++it;
704  }
705  }
706  // End scheduling session ------------------------------------------------
707 
708  } // End while loop on events
709 
710  always() << "---> Loop Finished (seconds): " << secsFromStart() <<endmsg;
711 
712  return StatusCode::SUCCESS;
713 }
714 
717  refpAddr = 0;
719  if ( !sc.isSuccess() ) {
720  return sc;
721  }
722  // Create root address and assign address to data service
723  sc = m_evtSelector->createAddress(*m_evtContext,refpAddr);
724  if( !sc.isSuccess() ) {
726  if ( sc.isSuccess() ) {
727  sc = m_evtSelector->createAddress(*m_evtContext,refpAddr);
728  if ( !sc.isSuccess() ) {
729  warning() << "Error creating IOpaqueAddress." << endmsg;
730  }
731  }
732  }
733  return sc;
734 }
735 
736 
737 // Here because temporary
738 #include <iostream>
739 
740 
742 void
744 
745  // Count how many products are actually requested
746  for (auto& thisAlgoDependencies : m_AlgosDependencies){
747  m_nProducts += thisAlgoDependencies.size();
748  }
749  const unsigned int n_algos = m_topAlgList.size();
750  std::vector<state_type> all_requirements(n_algos,state_type(m_nProducts));
751 
752  unsigned int algo_counter=0;
753  unsigned int input_counter=0;
754 
755  MsgStream log(msgSvc(), name());
756  // loop on the dependencies
757  for (const auto& algoDependencies : m_AlgosDependencies){ // loop on algo dependencies lists
758  state_type requirements(m_nProducts);
759  log << MSG::DEBUG << "Algorithm " << algo_counter << " dependencies: " << endmsg;
760  for (const auto& dependency : algoDependencies){ // loop on dependencies
761  log << MSG::DEBUG << " - " << dependency << endmsg;
762  auto ret = m_product_indices.insert(std::pair<std::string, unsigned int>("/Event/"+dependency,input_counter));
763  // insert successful means == wasn't known before. So increment counter
764  if (ret.second==true) ++input_counter;
765  // in any case the return value holds the proper product index
766  requirements[ret.first->second] = true;
767  log << MSG::DEBUG << " - Requirements now: " << requirements[ret.first->second] << endmsg;
768  }// end loop on single dependencies
769 
770  all_requirements[algo_counter] = requirements;
771  ++algo_counter;
772  } // end loop on algo dependencies lists
773 
774  // Loop on the product indices
775  log << MSG::DEBUG << "Product indices:" << endmsg;
776  for (auto& prod_index: m_product_indices)
777  log << MSG::DEBUG << " - " << prod_index.first << " " << prod_index.second << endmsg;
778 
779  m_numberOfAlgos = algo_counter;
780  m_all_requirements = all_requirements;
781 
782 }
783 
784 //---------------------------------------------------------------------------
785 
787  m_algResourcePool->releaseAlgorithm(algo->name(),algo);
789  MsgStream log(msgSvc(), name());
790  log << MSG::DEBUG << "[taskFinished] Algos in flight: " << m_total_algos_in_flight << endmsg;
791 }
SmartIF< IIncidentSvc > m_incidentSvc
Reference to the incident service.
HiveEventLoopMgr(const std::string &nam, ISvcLocator *svcLoc)
Standard Constructor.
HiveAlgoTask(IAlgorithm *algorithm, EventSchedulingState *scheduler, HiveEventLoopMgr *eventloopmanager)
virtual StatusCode getNewDataObjects(std::vector< std::string > &products)=0
Get the latest new data objects registered in the store.
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
IAlgorithm * m_algorithm
void set(const long int &e=0, const ID_type &s=0, const bool f=false)
Definition: EventContext.h:42
unsigned int m_nProducts
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Event Data Service's IHiveWhiteBoard interface.
Define general base for Gaudi exception.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
virtual StatusCode getProperty(Property *p) const =0
Get the property by property.
void find_dependencies()
Get the input and output collections.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode finalize() override
implementation of IService::finalize
unsigned int m_numberOfAlgos
Total number of algos.
bool m_scheduledStop
Scheduled stop of event processing.
virtual StatusCode setNumberOfStores(size_t slots)=0
Set the number of 'slots'.
unsigned int m_max_parallel
Maximum number of parallel running algorithms.
void taskFinished(IAlgorithm *&algo)
Decrement the number of algos in flight and put algo back in manager - maybe private.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
HistogramAgent base in charge of collecting all the references to DataObjects in a transient store tha...
StatusCode initialize() override
implementation of IService::initialize
SmartIF< IAlgResourcePool > m_algResourcePool
Reference to the Algorithm resource pool.
bool m_CloneAlgorithms
Clone algorithms to run them simultaneously.
virtual StatusCode stop()
implementation of IService::stop
bool m_endEventFired
Flag to avoid firing the EndEvent incident twice in a row (and also not before the first event) ...
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress)=0
Convert the transient object to the requested representation.
virtual const std::vector< IAlgorithm * > & getAlgorithms() const =0
Return the list of Algorithms.
SmartIF< IDataManagerSvc > m_evtDataMgrSvc
Reference to the Event Data Service's IDataManagerSvc interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:22
std::vector< DataObject * > IDataSelector
This is only a placeholder to allow me compiling until the responsible guy does his work! M...
Definition: IDataSelector.h:8
StatusCode reinitialize() override
implementation of IService::reinitialize
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:556
virtual StatusCode sysExecute()=0
System execution. This method invokes the execute() method of a concrete algorithm.
StatusCode stop() override
implementation of IService::stop
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
virtual StatusCode sysInitialize()=0
Initialize Service.
#define DEBMSG
std::string m_abortEventSource
Source of the AbortEvent incident.
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
SmartIF< IConversionSvc > m_histoPersSvc
Reference to the Histogram Persistency Service.
virtual void fireIncident(const Incident &incident)=0
Fire an Incident.
SmartIF< IDataManagerSvc > m_histoDataMgrSvc
Reference to the Histogram Data Service.
virtual StatusCode clearStore(size_t partitionIndex)=0
Clear a given 'slot'.
virtual StatusCode traverseTree(IDataStoreAgent *pAgent)=0
Analyse by traversing all data objects in the data store.
virtual StatusCode finalize()
implementation of IService::finalize
unsigned int m_evts_parallel
Number of events in parallel.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
virtual StatusCode next(Context &c) const =0
Fetch the next event or the first event if it will be used soon after the creation of the context...
long int evt() const
Definition: EventContext.h:37
virtual StatusCode createContext(Context *&c) const =0
Create and return a context object that will keep track of the state of selection.
SmartIF< IEvtSelector > m_evtSelector
Reference to the Event Selector.
virtual StatusCode nextEvent(int maxevt)
implementation of IService::nextEvent
bool PyHelper() setProperty(IInterface *p, char *name, char *value)
Definition: Bootstrap.cpp:254
virtual StatusCode releaseContext(Context *&) const =0
Release the Context object.
std::string m_evtsel
Event selector.
HiveEventLoopMgr * m_eventloopmanager
The IRegistry represents the entry door to the environment any data object residing in a transient da...
Definition: IRegistry.h:22
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:354
std::vector< state_type > m_all_requirements
All requirements.
tbb::task * execute()
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
IDataSelector * selectedObjects()
Return the set of selected DataObjects.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:50
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:23
virtual IOpaqueAddress * address() const =0
Retrieve opaque storage address.
std::map< std::string, unsigned int > m_product_indices
Register of input products.
virtual StatusCode setRoot(std::string root_name, DataObject *pObject)=0
Initialize data store for new event by giving new event path.
SmartIF< IDataProviderSvc > m_evtDataSvc
Reference to the Event Data Service's IDataProviderSvc interface.
boost::dynamic_bitset state_type
virtual bool newDataObjectsPresent()=0
Check if something is new in the whiteboard without getting the products.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
virtual ~HiveEventLoopMgr()
Standard Destructor.
virtual unsigned long release()=0
Release Interface instance.
ListAlg m_outStreamList
List of output streams.
dirty place for adding an AlgoTask wrapper
StatusCode getEventRoot(IOpaqueAddress *&refpAddr)
Create event address using event selector.
unsigned int m_num_threads
Total number of threads.
virtual size_t allocateStore(int evtnumber)=0
Allocate a store partition for new event.
EventSchedulingState * m_scheduler
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
ID_type slot() const
Definition: EventContext.h:38
virtual StatusCode executeRun(int maxevt)
implementation of IEventProcessor::executeRun()
virtual StatusCode executeEvent(void *par)
implementation of IEventProcessor::executeEvent(void* par)
Base class for all Incidents (computing events).
Definition: Incident.h:16
virtual StatusCode freeStore(size_t partitionIndex)=0
Free a store partition.
#define UNLIKELY(x)
Definition: Kernel.h:126
std::atomic_uint m_total_algos_in_flight
Total number of algos in flight across all events.
bool m_warnings
Flag to disable warning messages when using external input.
algosDependenciesCollection m_AlgosDependencies
std::string m_histPersName
Name of the Hist Pers type.
This is the default processing manager of the application manager.
IEvtSelector::Context * m_evtContext
Event Iterator.
virtual StatusCode reinitialize()
implementation of IService::reinitialize
tbb::task_scheduler_init * m_tbb_scheduler_init
Pointer to tbb task scheduler.
Opaque address interface definition.
void ignore() const
Definition: StatusCode.h:108
constexpr int Success
Definition: AppReturnCode.h:16
virtual StatusCode initialize()
implementation of IService::initialize
virtual StatusCode createAddress(const Context &c, IOpaqueAddress *&iop) const =0
Create an IOpaqueAddress object from the event fetched.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
StatusCode executeRun(int maxevt) override
implementation of IEventProcessor::executeRun( )
#define LIKELY(x)
Definition: Kernel.h:125
list i
Definition: ana.py:128
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject)=0
Resolve the references of the converted object.
ListAlg m_topAlgList
List of top level algorithms.
bool m_abortEvent
Flag signalling that the event being processed has to be aborted (skip all following top algs)...
bool m_DumpQueues
Dump the algorithm queues.