24 #include "tbb/task_scheduler_init.h"
26 #include "tbb/tick_count.h"
36 #include <sys/resource.h>
37 #include <sys/times.h>
43 #define ON_DEBUG if (UNLIKELY(outputLevel() <= MSG::DEBUG))
44 #define ON_VERBOSE if (UNLIKELY(outputLevel() <= MSG::VERBOSE))
46 #define DEBMSG ON_DEBUG debug()
47 #define VERMSG ON_VERBOSE verbose()
101 "Set this property to false to suppress warning messages");
129 DEBMSG <<
"Error Initializing base class MinimalEventLoopMgr." <<
endmsg;
138 fatal() <<
"Error retrieving EventDataSvc interface IDataManagerSvc." <<
endmsg;
143 fatal() <<
"Error retrieving EventDataSvc interface IDataProviderSvc." <<
endmsg;
148 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
156 fatal() <<
"IProperty interface not found in ApplicationMgr." <<
endmsg;
169 fatal() <<
"Can not create the event selector Context." <<
endmsg;
182 warning() <<
"Unable to locate service \"EventSelector\" " <<
endmsg;
183 warning() <<
"No events will be processed from external input." <<
endmsg;
190 fatal() <<
"Error retrieving HistogramDataSvc." <<
endmsg;
196 warning() <<
"Histograms cannot not be saved - though required." <<
endmsg;
203 fatal() <<
"Error retrieving AlgResourcePool" <<
endmsg;
222 DEBMSG <<
"Error Initializing base class MinimalEventLoopMgr." <<
endmsg;
240 sc = theSvc->reinitialize();
242 error() <<
"Failure Reinitializing EventSelector "
243 << theSvc->name( ) <<
endmsg;
250 error() <<
"Failure Initializing EventSelector "
251 << theSvc->name( ) <<
endmsg;
257 error() <<
"Can not create Context " << theSvc->name( ) <<
endmsg;
260 info() <<
"EventSelector service changed to "
261 << theSvc->name( ) <<
endmsg;
270 error() <<
"Can not create Context " << theSvc->name( ) <<
endmsg;
305 error() <<
"Error finalizing base class" <<
endmsg;
316 if ( objects->
size() > 0 ) {
317 IDataSelector::iterator
i;
318 for ( i = objects->
begin(); i != objects->
end(); i++ ) {
322 (*i)->registry()->setAddress(pAddr);
328 for ( i = objects->
begin(); i != objects->
end(); i++ ) {
337 info() <<
"Histograms converted successfully according to request." <<
endmsg;
340 error() <<
"Error while saving Histograms." <<
endmsg;
344 error() <<
"Error while traversing Histogram data store" <<
endmsg;
376 always() <<
"Terminating event processing loop due to a stop scheduled by an incident listener" <<
endmsg;
391 if (
LIKELY(0 != ialg)) ialg->resetExecuted();
395 bool eventfailed =
false;
405 (*ito)->resetExecuted();
407 sc = (*ito)->sysExecute();
409 warning() <<
"Execution of output stream " << (*ito)->name() <<
" failed" <<
endmsg;
418 error() <<
"Error processing event loop." <<
endmsg;
446 auto start_time = tbb::tick_count::now();
447 auto secsFromStart = [&start_time]()->
double{
448 return (tbb::tick_count::now()-start_time).seconds();
460 auto has_finished = []
461 (contextSchedState_tuple evtContext_evtstate)
462 {
return std::get<1>(evtContext_evtstate)->hasFinished();};
465 always() <<
"Running with "
471 int n_processed_events = 0;
480 while( maxevt == -1 ? !eof : n_processed_events < maxevt ){
482 const unsigned int n_events_in_flight = events_in_flight.
size();
483 const unsigned int n_evts_to_process = maxevt - n_processed_events - n_events_in_flight;
485 unsigned int n_acquirable_events =
m_evts_parallel - n_events_in_flight ;
486 if (n_acquirable_events > n_evts_to_process)
487 n_acquirable_events = n_evts_to_process;
489 log <<
MSG::INFO <<
"Evts in flight: " << n_events_in_flight <<
endmsg;
492 log <<
MSG::INFO <<
"Acquirable Events are " << n_acquirable_events <<
endmsg;
497 for (
unsigned int offset=0; offset< n_acquirable_events; ++offset){
500 const int evt_num = n_processed_events + offset + n_events_in_flight;
509 info() <<
"No more events in event selection " <<
endmsg;
516 warning() <<
"Error declaring event root address." <<
endmsg;
523 warning() <<
"Error declaring event root DataObject" <<
endmsg;
529 info() <<
"Started event " << evt_num <<
" at " << secsFromStart() <<
endmsg;
536 auto in_flight_end = events_in_flight.
end();
537 auto in_flight_begin = events_in_flight.
begin();
539 while (in_flight_end == find_if(in_flight_begin, in_flight_end ,has_finished)){
540 bool no_algo_can_run =
true;
541 for (
auto& evtContext_evtstate : events_in_flight){
543 EventContext* event_Context = std::get<0>(evtContext_evtstate);
546 for (
unsigned int algo_counter=0; algo_counter<
m_topAlgList.
size(); algo_counter++) {
555 bool algo_not_started_and_dependencies_there = (algo_requirements.is_subset_of(event_state->state()) &&
556 (event_state->hasStarted(algo_counter) ) ==
false);
559 if (algo_not_started_and_dependencies_there)
560 no_algo_can_run =
false;
561 if (algo_not_started_and_dependencies_there &&
569 log <<
MSG::INFO <<
"Launching algo " << algo_counter<<
" on event " << event_Context->
evt() <<
endmsg;
574 tbb::task* t =
new( tbb::task::allocate_root() )
HiveAlgoTask(ialgo, event_state,
this);
575 tbb::task::enqueue( *t);
577 event_state->algoStarts(algo_counter);
592 warning() <<
"Error getting recent new products (since last time called)" <<
endmsg;
594 for (
const auto& newProduct : new_products) {
595 log <<
MSG::DEBUG <<
"New Product: " << newProduct <<
" in the store." <<
endmsg;
597 log <<
MSG::DEBUG <<
" - Used as input by some algorithm. Updating the event state." <<
endmsg;
619 if (no_algo_can_run &&
621 new_products.size() == 0 &&
622 ! event_state->hasFinished() ){
628 "no algorithm in flight, "
629 "no new products in the store, "
630 "event not complete: this is a stall.");
632 <<
"Algorithms that ran for event " << event_Context->
evt() <<
std::endl;
633 unsigned int algo_counter=0;
635 bool has_started = event_state->hasStarted(algo_counter);
639 fatal() <<
" o " << algo->name() <<
" could NOT run" <<
std::endl;
652 while (it!=events_in_flight.
end()){
654 if (std::get<1>(*it)->hasFinished()){
655 const unsigned int evt_num = std::get<0>(*it)->evt();
656 const unsigned int evt_slot = std::get<0>(*it)->slot();
657 log <<
MSG::INFO <<
"Event "<< evt_num <<
" finished. Events in flight are "
658 << events_in_flight.
size() <<
". Processed events are "
659 << n_processed_events <<
endmsg;
660 info() <<
"Event "<< evt_num <<
" finished. now is " << secsFromStart() <<
endmsg;
663 unsigned int min_event_num=0xFFFFFFFF;
664 unsigned int max_event_num=0;
666 for (
auto& evtContext_evtstate : events_in_flight){
667 const unsigned int evt_num = std::get<0>(evtContext_evtstate)->
evt();
669 if (evt_num > max_event_num) max_event_num=evt_num;
670 if (evt_num < min_event_num) min_event_num=evt_num;
672 unsigned int evt_backlog=max_event_num-min_event_num;
673 info() <<
"Event backlog (max= " << max_event_num <<
", min= "
674 << min_event_num<<
" ) = " << evt_backlog <<
endmsg;
680 (*ito)->resetExecuted();
682 sc = (*ito)->sysExecute();
684 warning() <<
"Execution of output stream " << (*ito)->name() <<
" failed" <<
endmsg;
693 info() <<
"Cleared store " << evt_slot <<
endmsg;
697 delete std::get<0>(*it);
698 delete std::get<1>(*it);
699 it=events_in_flight.erase(it) ;
701 n_processed_events++;
711 always() <<
"---> Loop Finished (seconds): " << secsFromStart() <<
endmsg;
753 unsigned int algo_counter=0;
754 unsigned int input_counter=0;
758 for (
const auto& algoDependencies : m_AlgosDependencies){
760 log <<
MSG::DEBUG <<
"Algorithm " << algo_counter <<
" dependencies: " <<
endmsg;
761 for (
const auto& dependency : algoDependencies){
765 if (ret.second==
true) ++input_counter;
767 requirements[ret.first->second] =
true;
768 log <<
MSG::DEBUG <<
" - Requirements now: " << requirements[ret.first->second] <<
endmsg;
771 all_requirements[algo_counter] = requirements;
778 log <<
MSG::DEBUG <<
" - " << prod_index.first <<
" " << prod_index.second <<
endmsg;
HiveEventLoopMgr(const std::string &nam, ISvcLocator *svcLoc)
Standard Constructor.
HiveAlgoTask(IAlgorithm *algorithm, EventSchedulingState *scheduler, HiveEventLoopMgr *eventloopmanager)
Definition of the MsgStream class used to transmit messages.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Hive whiteboard (IHiveWhiteBoard interface).
Define general base for Gaudi exception.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
virtual StatusCode getProperty(Property *p) const =0
Get the property by property.
void find_dependencies()
Get the input and output collections.
StatusCode finalize() override
implementation of IService::finalize
unsigned int m_numberOfAlgos
Total number of algos.
bool m_scheduledStop
Scheduled stop of event processing.
virtual StatusCode setNumberOfStores(size_t slots)=0
Set the number of 'slots'.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
unsigned int m_max_parallel
Maximum number of parallel running algorithms.
void taskFinished(IAlgorithm *&algo)
Decrement the number of algos in flight and put algo back in manager - maybe private.
bool isSuccess() const
Test for a status code of SUCCESS.
HistogramAgent base in charge of collecting all the references to DataObjects in a transient store tha...
StatusCode initialize() override
implementation of IService::initialize
SmartIF< IAlgResourcePool > m_algResourcePool
Reference to the Algorithm resource pool.
StatusCode setProperty(const Property &p) override
bool m_CloneAlgorithms
Clone algorithms to run them simultaneously.
virtual StatusCode stop()
implementation of IService::stop
bool m_endEventFired
Flag to avoid firing the EndEvent incident twice in a row (and also not before the first event) ...
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress)=0
Convert the transient object to the requested representation.
virtual const std::vector< IAlgorithm * > & getAlgorithms() const =0
Return the list of Algorithms.
SmartIF< IDataManagerSvc > m_evtDataMgrSvc
Reference to the Event Data Service's IDataManagerSvc interface.
This class represents an entry point to all the event specific data.
StatusCode reinitialize() override
implementation of IService::reinitialize
void setContext(EventContext *context)
set the context
virtual StatusCode sysExecute()=0
System execution. This method invokes the execute() method of a concrete algorithm.
StatusCode stop() override
implementation of IService::stop
TYPE * get() const
Get interface pointer.
virtual StatusCode sysInitialize()=0
Initialize Service.
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
const std::string & name() const override
Retrieve name of the service.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
std::string m_abortEventSource
Source of the AbortEvent incident.
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
SmartIF< IConversionSvc > m_histoPersSvc
Reference to the Histogram Persistency Service.
virtual void fireIncident(const Incident &incident)=0
Fire an Incident.
SmartIF< IDataManagerSvc > m_histoDataMgrSvc
Reference to the Histogram Data Service.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
virtual StatusCode clearStore(size_t partitionIndex)=0
Clear a given 'slot'.
virtual StatusCode traverseTree(IDataStoreAgent *pAgent)=0
Analyse by traversing all data objects in the data store.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
virtual StatusCode finalize()
implementation of IService::finalize
unsigned int m_evts_parallel
Number of events in parallel.
std::map< DataObjID, unsigned int > m_product_indices
Register of input products.
This class is used for returning status codes from appropriate routines.
virtual StatusCode next(Context &c) const =0
Fetch the next event or the first event if it will be used soon after the creation of the context...
virtual StatusCode createContext(Context *&c) const =0
Create and return a context object that will keep track of the state of selection.
SmartIF< IEvtSelector > m_evtSelector
Reference to the Event Selector.
virtual StatusCode nextEvent(int maxevt)
implementation of IService::nextEvent
virtual StatusCode releaseContext(Context *&) const =0
Release the Context object.
std::string m_evtsel
Event selector.
HiveEventLoopMgr * m_eventloopmanager
The IRegistry represents the entry door to the environment any data object residing in a transient da...
#define DECLARE_SERVICE_FACTORY(x)
std::vector< state_type > m_all_requirements
All requirements.
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
IDataSelector * selectedObjects()
Return the set of selected DataObjects.
SmartIF< IIncidentSvc > m_incidentSvc
Reference to the incident service.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
The IAlgorithm is the interface implemented by the Algorithm base class.
virtual IOpaqueAddress * address() const =0
Retrieve opaque storage address.
virtual StatusCode setRoot(std::string root_name, DataObject *pObject)=0
Initialize data store for new event by giving new event path.
SmartIF< IDataProviderSvc > m_evtDataSvc
Reference to the Event Data Service's IDataProviderSvc interface.
boost::dynamic_bitset state_type
virtual bool newDataObjectsPresent()=0
Check if something is new in the whiteboard without getting the products.
Base class from which all concrete algorithm classes should be derived.
virtual ~HiveEventLoopMgr()
Standard Destructor.
virtual unsigned long release()=0
Release Interface instance.
ListAlg m_outStreamList
List of output streams.
dirty place for adding an AlgoTask wrapper
StatusCode getEventRoot(IOpaqueAddress *&refpAddr)
Create event address using event selector.
unsigned int m_num_threads
Total number of threads.
virtual size_t allocateStore(int evtnumber)=0
Allocate a store partition for new event.
EventSchedulingState * m_scheduler
bool isValid() const
Allow for check if smart pointer is valid.
virtual StatusCode executeRun(int maxevt)
implementation of IEventProcessor::executeRun()
virtual StatusCode executeEvent(void *par)
implementation of IEventProcessor::executeEvent(void* par)
Base class for all Incidents (computing events).
virtual StatusCode freeStore(size_t partitionIndex)=0
Free a store partition.
SmartIF< IMessageSvc > & msgSvc() const
The standard message service.
std::atomic_uint m_total_algos_in_flight
Total number of algos in flight across all events.
bool m_warnings
Flag to disable warning messages when using external input.
algosDependenciesCollection m_AlgosDependencies
StatusCode service(const std::string &name, const T *&psvc, bool createIf=true) const
Access a service by name, creating it if it doesn't already exist.
std::string m_histPersName
Name of the Hist Pers type.
This is the default processing manager of the application manager.
IEvtSelector::Context * m_evtContext
Event Iterator.
void set(const ContextEvt_t &e=0, const ContextID_t &s=INVALID_CONTEXT_ID, const bool f=false)
virtual StatusCode reinitialize()
implementation of IService::reinitialize
tbb::task_scheduler_init * m_tbb_scheduler_init
Pointer to tbb task scheduler.
Opaque address interface definition.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
Property * declareProperty(const std::string &name, T &property, const std::string &doc="none") const
Declare the named property.
virtual StatusCode initialize()
implementation of IService::initialize
virtual StatusCode createAddress(const Context &c, IOpaqueAddress *&iop) const =0
Create an IOpaqueAddress object from the event fetched.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
MsgStream & always() const
shortcut for the method msgStream(MSG::ALWAYS)
StatusCode executeRun(int maxevt) override
implementation of IEventProcessor::executeRun( )
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject)=0
Resolve the references of the converted object.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
ListAlg m_topAlgList
List of top level algorithms.
bool m_abortEvent
Flag signalling that the event being processed has to be aborted (skip all following top algs)...
bool m_DumpQueues
Dump the algorithm queues.