4 #include "GaudiKernel/SmartIF.h"
5 #include "GaudiKernel/Incident.h"
6 #include "GaudiKernel/MsgStream.h"
7 #include "GaudiKernel/SvcFactory.h"
8 #include "GaudiKernel/DataObject.h"
9 #include "GaudiKernel/IAlgManager.h"
10 #include "GaudiKernel/IIncidentSvc.h"
11 #include "GaudiKernel/IEvtSelector.h"
12 #include "GaudiKernel/IDataManagerSvc.h"
13 #include "GaudiKernel/IDataProviderSvc.h"
14 #include "GaudiKernel/IConversionSvc.h"
15 #include "GaudiKernel/AppReturnCode.h"
16 #include "GaudiKernel/DataSvc.h"
24 #include "tbb/task_scheduler_init.h"
26 #include "tbb/tick_count.h"
28 #include "GaudiKernel/EventContext.h"
29 #include "GaudiKernel/Algorithm.h"
31 #include <GaudiKernel/GaudiException.h>
36 #include <sys/resource.h>
37 #include <sys/times.h>
43 #define ON_DEBUG if (UNLIKELY(outputLevel() <= MSG::DEBUG))
44 #define ON_VERBOSE if (UNLIKELY(outputLevel() <= MSG::VERBOSE))
46 #define DEBMSG ON_DEBUG debug()
47 #define VERMSG ON_VERBOSE verbose()
99 declareProperty(
"EvtSel",
m_evtsel );
101 "Set this property to false to suppress warning messages");
129 DEBMSG <<
"Error Initializing base class MinimalEventLoopMgr." <<
endmsg;
138 fatal() <<
"Error retrieving EventDataSvc interface IDataManagerSvc." <<
endmsg;
141 m_evtDataSvc = serviceLocator()->service(
"EventDataSvc");
143 fatal() <<
"Error retrieving EventDataSvc interface IDataProviderSvc." <<
endmsg;
146 m_whiteboard = serviceLocator()->service(
"EventDataSvc");
148 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
156 fatal() <<
"IProperty interface not found in ApplicationMgr." <<
endmsg;
169 fatal() <<
"Can not create the event selector Context." <<
endmsg;
174 fatal() <<
"EventSelector not found." <<
endmsg;
182 warning() <<
"Unable to locate service \"EventSelector\" " <<
endmsg;
183 warning() <<
"No events will be processed from external input." <<
endmsg;
190 fatal() <<
"Error retrieving HistogramDataSvc." <<
endmsg;
194 m_histoPersSvc = serviceLocator()->service(
"HistogramPersistencySvc");
196 warning() <<
"Histograms cannot not be saved - though required." <<
endmsg;
203 fatal() <<
"Error retrieving AlgResourcePool" <<
endmsg;
222 DEBMSG <<
"Error Initializing base class MinimalEventLoopMgr." <<
endmsg;
240 sc = theSvc->reinitialize();
242 error() <<
"Failure Reinitializing EventSelector "
243 << theSvc->name( ) <<
endmsg;
250 error() <<
"Failure Initializing EventSelector "
251 << theSvc->name( ) <<
endmsg;
257 error() <<
"Can not create Context " << theSvc->name( ) <<
endmsg;
260 info() <<
"EventSelector service changed to "
261 << theSvc->name( ) <<
endmsg;
270 error() <<
"Can not create Context " << theSvc->name( ) <<
endmsg;
305 error() <<
"Error finalizing base class" <<
endmsg;
316 if ( objects->size() > 0 ) {
317 IDataSelector::iterator
i;
318 for ( i = objects->begin(); i != objects->end(); i++ ) {
322 (*i)->registry()->setAddress(pAddr);
328 for ( i = objects->begin(); i != objects->end(); i++ ) {
337 info() <<
"Histograms converted successfully according to request." <<
endmsg;
340 error() <<
"Error while saving Histograms." <<
endmsg;
344 error() <<
"Error while traversing Histogram data store" <<
endmsg;
376 always() <<
"Terminating event processing loop due to a stop scheduled by an incident listener" <<
endmsg;
391 if (
LIKELY(0 != ialg)) ialg->resetExecuted();
395 bool eventfailed =
false;
405 (*ito)->resetExecuted();
407 sc = (*ito)->sysExecute();
409 warning() <<
"Execution of output stream " << (*ito)->name() <<
" failed" <<
endmsg;
418 error() <<
"Error processing event loop." <<
endmsg;
446 auto start_time = tbb::tick_count::now();
447 auto secsFromStart = [&start_time]()->
double{
448 return (tbb::tick_count::now()-start_time).seconds();
451 typedef std::tuple<EventContext*,EventSchedulingState*> contextSchedState_tuple;
460 auto has_finished = []
461 (contextSchedState_tuple evtContext_evtstate)
462 {
return std::get<1>(evtContext_evtstate)->hasFinished();};
465 always() <<
"Running with "
471 int n_processed_events = 0;
476 std::list<contextSchedState_tuple> events_in_flight;
480 while( maxevt == -1 ? !eof : n_processed_events < maxevt ){
482 const unsigned int n_events_in_flight = events_in_flight.size();
483 const unsigned int n_evts_to_process = maxevt - n_processed_events - n_events_in_flight;
485 unsigned int n_acquirable_events =
m_evts_parallel - n_events_in_flight ;
486 if (n_acquirable_events > n_evts_to_process)
487 n_acquirable_events = n_evts_to_process;
489 log <<
MSG::INFO <<
"Evts in flight: " << n_events_in_flight <<
endmsg;
492 log <<
MSG::INFO <<
"Acquirable Events are " << n_acquirable_events <<
endmsg;
497 for (
unsigned int offset=0; offset< n_acquirable_events; ++offset){
500 const int evt_num = n_processed_events + offset + n_events_in_flight;
509 info() <<
"No more events in event selection " <<
endmsg;
516 warning() <<
"Error declaring event root address." <<
endmsg;
523 warning() <<
"Error declaring event root DataObject" <<
endmsg;
528 events_in_flight.push_back(std::make_tuple(evtContext,event_state));
529 info() <<
"Started event " << evt_num <<
" at " << secsFromStart() <<
endmsg;
536 auto in_flight_end = events_in_flight.end();
537 auto in_flight_begin = events_in_flight.begin();
539 while (in_flight_end == find_if(in_flight_begin, in_flight_end ,has_finished)){
540 bool no_algo_can_run =
true;
541 for (
auto& evtContext_evtstate : events_in_flight){
543 EventContext* event_Context = std::get<0>(evtContext_evtstate);
546 for (
unsigned int algo_counter=0; algo_counter<
m_topAlgList.size(); algo_counter++) {
555 bool algo_not_started_and_dependencies_there = (algo_requirements.is_subset_of(event_state->state()) &&
556 (event_state->hasStarted(algo_counter) ) ==
false);
559 if (algo_not_started_and_dependencies_there)
560 no_algo_can_run =
false;
561 if (algo_not_started_and_dependencies_there &&
567 std::advance(algoIt, algo_counter);
569 log <<
MSG::INFO <<
"Launching algo " << algo_counter<<
" on event " << event_Context->
evt() <<
endmsg;
574 tbb::task* t =
new( tbb::task::allocate_root() )
HiveAlgoTask(ialgo, event_state,
this);
575 tbb::task::enqueue( *t);
577 event_state->algoStarts(algo_counter);
587 std::vector<std::string> new_products;
591 warning() <<
"Error getting recent new products (since last time called)" <<
endmsg;
593 for (
const auto& newProduct : new_products) {
594 log <<
MSG::DEBUG <<
"New Product: " << newProduct <<
" in the store." <<
endmsg;
596 log <<
MSG::DEBUG <<
" - Used as input by some algorithm. Updating the event state." <<
endmsg;
618 if (no_algo_can_run &&
620 new_products.size() == 0 &&
621 ! event_state->hasFinished() ){
626 std::string errorMessage(
"No algorithm can run, "
627 "no algorithm in flight, "
628 "no new products in the store, "
629 "event not complete: this is a stall.");
630 fatal() << errorMessage << std::endl
631 <<
"Algorithms that ran for event " << event_Context->
evt() << std::endl;
632 unsigned int algo_counter=0;
634 bool has_started = event_state->hasStarted(algo_counter);
636 fatal() <<
" o " << algo->name() <<
" could run" << std::endl;
638 fatal() <<
" o " << algo->name() <<
" could NOT run" << std::endl;
649 std::list<contextSchedState_tuple>::iterator it=events_in_flight.begin();
651 while (it!=events_in_flight.end()){
653 if (std::get<1>(*it)->hasFinished()){
654 const unsigned int evt_num = std::get<0>(*it)->evt();
655 const unsigned int evt_slot = std::get<0>(*it)->slot();
656 log <<
MSG::INFO <<
"Event "<< evt_num <<
" finished. Events in flight are "
657 << events_in_flight.size() <<
". Processed events are "
658 << n_processed_events <<
endmsg;
659 info() <<
"Event "<< evt_num <<
" finished. now is " << secsFromStart() <<
endmsg;
662 unsigned int min_event_num=0xFFFFFFFF;
663 unsigned int max_event_num=0;
665 for (
auto& evtContext_evtstate : events_in_flight){
666 const unsigned int evt_num = std::get<0>(evtContext_evtstate)->
evt();
668 if (evt_num > max_event_num) max_event_num=evt_num;
669 if (evt_num < min_event_num) min_event_num=evt_num;
671 unsigned int evt_backlog=max_event_num-min_event_num;
672 info() <<
"Event backlog (max= " << max_event_num <<
", min= "
673 << min_event_num<<
" ) = " << evt_backlog <<
endmsg;
679 (*ito)->resetExecuted();
681 sc = (*ito)->sysExecute();
683 warning() <<
"Execution of output stream " << (*ito)->name() <<
" failed" <<
endmsg;
689 warning() <<
"Clear of Event data store failed" <<
endmsg;
692 info() <<
"Cleared store " << evt_slot <<
endmsg;
696 delete std::get<0>(*it);
697 delete std::get<1>(*it);
698 it=events_in_flight.erase(it) ;
700 n_processed_events++;
710 always() <<
"---> Loop Finished (seconds): " << secsFromStart() <<
endmsg;
729 warning() <<
"Error creating IOpaqueAddress." <<
endmsg;
752 unsigned int algo_counter=0;
753 unsigned int input_counter=0;
757 for (
const auto& algoDependencies : m_AlgosDependencies){
759 log <<
MSG::DEBUG <<
"Algorithm " << algo_counter <<
" dependencies: " <<
endmsg;
760 for (
const auto& dependency : algoDependencies){
762 auto ret =
m_product_indices.insert(std::pair<std::string, unsigned int>(
"/Event/"+dependency,input_counter));
764 if (ret.second==
true) ++input_counter;
766 requirements[ret.first->second] =
true;
767 log <<
MSG::DEBUG <<
" - Requirements now: " << requirements[ret.first->second] <<
endmsg;
770 all_requirements[algo_counter] = requirements;
777 log <<
MSG::DEBUG <<
" - " << prod_index.first <<
" " << prod_index.second <<
endmsg;
SmartIF< IIncidentSvc > m_incidentSvc
Reference to the incident service.
HiveEventLoopMgr(const std::string &nam, ISvcLocator *svcLoc)
Standard Constructor.
HiveAlgoTask(IAlgorithm *algorithm, EventSchedulingState *scheduler, HiveEventLoopMgr *eventloopmanager)
virtual StatusCode getNewDataObjects(std::vector< std::string > &products)=0
Get the latest new data objects registered in the store.
Definition of the MsgStream class used to transmit messages.
void set(const long int &e=0, const ID_type &s=0, const bool f=false)
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Event Data Service's IHiveWhiteBoard interface.
Define general base for Gaudi exception.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
virtual StatusCode getProperty(Property *p) const =0
Get the property by property.
void find_dependencies()
Get the input and output collections.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
StatusCode finalize() override
implementation of IService::finalize
unsigned int m_numberOfAlgos
Total number of algos.
bool m_scheduledStop
Scheduled stop of event processing.
virtual StatusCode setNumberOfStores(size_t slots)=0
Set the number of 'slots'.
unsigned int m_max_parallel
Maximum number of parallel running algorithms.
void taskFinished(IAlgorithm *&algo)
Decrement the number of algos in flight and put algo back in manager - maybe private.
bool isSuccess() const
Test for a status code of SUCCESS.
HistogramAgent base in charge of collecting all the references to DataObjects in a transient store tha...
StatusCode initialize() override
implementation of IService::initialize
SmartIF< IAlgResourcePool > m_algResourcePool
Reference to the Algorithm resource pool.
bool m_CloneAlgorithms
Clone algorithms to run them simultaneously.
virtual StatusCode stop()
implementation of IService::stop
bool m_endEventFired
Flag to avoid firing the EndEvent incident twice in a row (and also not before the first event) ...
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress)=0
Convert the transient object to the requested representation.
virtual const std::vector< IAlgorithm * > & getAlgorithms() const =0
Return the list of Algorithms.
SmartIF< IDataManagerSvc > m_evtDataMgrSvc
Reference to the Event Data Service's IDataManagerSvc interface.
This class represents an entry point to all the event specific data.
std::vector< DataObject * > IDataSelector
This is only a placeholder to allow me compiling until the responsible guy does his work! M...
StatusCode reinitialize() override
implementation of IService::reinitialize
void setContext(EventContext *context)
set the context
virtual StatusCode sysExecute()=0
System execution. This method invokes the execute() method of a concrete algorithm.
StatusCode stop() override
implementation of IService::stop
TYPE * get() const
Get interface pointer.
virtual StatusCode sysInitialize()=0
Initialize Service.
std::string m_abortEventSource
Source of the AbortEvent incident.
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
SmartIF< IConversionSvc > m_histoPersSvc
Reference to the Histogram Persistency Service.
virtual void fireIncident(const Incident &incident)=0
Fire an Incident.
SmartIF< IDataManagerSvc > m_histoDataMgrSvc
Reference to the Histogram Data Service.
virtual StatusCode clearStore(size_t partitionIndex)=0
Clear a given 'slot'.
virtual StatusCode traverseTree(IDataStoreAgent *pAgent)=0
Analyse by traversing all data objects in the data store.
virtual StatusCode finalize()
implementation of IService::finalize
unsigned int m_evts_parallel
Number of events in parallel.
This class is used for returning status codes from appropriate routines.
virtual StatusCode next(Context &c) const =0
Fetch the next event or the first event if it will be used soon after the creation of the context...
virtual StatusCode createContext(Context *&c) const =0
Create and return a context object that will keep track of the state of selection.
SmartIF< IEvtSelector > m_evtSelector
Reference to the Event Selector.
virtual StatusCode nextEvent(int maxevt)
implementation of IService::nextEvent
bool PyHelper() setProperty(IInterface *p, char *name, char *value)
virtual StatusCode releaseContext(Context *&) const =0
Release the Context object.
std::string m_evtsel
Event selector.
HiveEventLoopMgr * m_eventloopmanager
The IRegistry represents the entry door to the environment any data object residing in a transient da...
#define DECLARE_SERVICE_FACTORY(x)
std::vector< state_type > m_all_requirements
All requirements.
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
IDataSelector * selectedObjects()
Return the set of selected DataObjects.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
The IAlgorithm is the interface implemented by the Algorithm base class.
virtual IOpaqueAddress * address() const =0
Retrieve opaque storage address.
std::map< std::string, unsigned int > m_product_indices
Register of input products.
virtual StatusCode setRoot(std::string root_name, DataObject *pObject)=0
Initialize data store for new event by giving new event path.
SmartIF< IDataProviderSvc > m_evtDataSvc
Reference to the Event Data Service's IDataProviderSvc interface.
boost::dynamic_bitset state_type
virtual bool newDataObjectsPresent()=0
Check if something is new in the whiteboard without getting the products.
Base class from which all concrete algorithm classes should be derived.
virtual ~HiveEventLoopMgr()
Standard Destructor.
virtual unsigned long release()=0
Release Interface instance.
ListAlg m_outStreamList
List of output streams.
dirty place for adding an AlgoTask wrapper
StatusCode getEventRoot(IOpaqueAddress *&refpAddr)
Create event address using event selector.
unsigned int m_num_threads
Total numbers of threads.
virtual size_t allocateStore(int evtnumber)=0
Allocate a store partition for new event.
EventSchedulingState * m_scheduler
bool isValid() const
Allow for check if smart pointer is valid.
virtual StatusCode executeRun(int maxevt)
implementation of IEventProcessor::executeRun()
virtual StatusCode executeEvent(void *par)
implementation of IEventProcessor::executeEvent(void* par)
Base class for all Incidents (computing events).
virtual StatusCode freeStore(size_t partitionIndex)=0
Free a store partition.
std::atomic_uint m_total_algos_in_flight
Total number of algos in flight across all events.
bool m_warnings
Flag to disable warning messages when using external input.
algosDependenciesCollection m_AlgosDependencies
std::string m_histPersName
Name of the Hist Pers type.
This is the default processing manager of the application manager.
IEvtSelector::Context * m_evtContext
Event Iterator.
virtual StatusCode reinitialize()
implementation of IService::reinitialize
tbb::task_scheduler_init * m_tbb_scheduler_init
Pointer to tbb task scheduler.
Opaque address interface definition.
virtual StatusCode initialize()
implementation of IService::initialize
virtual StatusCode createAddress(const Context &c, IOpaqueAddress *&iop) const =0
Create an IOpaqueAddress object from the event fetched.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
StatusCode executeRun(int maxevt) override
implementation of IEventProcessor::executeRun( )
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject)=0
Resolve the references of the converted object.
ListAlg m_topAlgList
List of top level algorithms.
bool m_abortEvent
Flag signalling that the event being processed has to be aborted (skip all following top algs)...
bool m_DumpQueues
Dump the algorithm queues.