26 #include <condition_variable> 31 #include "boost/algorithm/string.hpp" 32 #include "boost/optional.hpp" 33 #include "boost/thread.hpp" 34 #include "boost/tokenizer.hpp" 35 #include "tbb/task_scheduler_init.h" 44 struct DataObjIDDotRepr {
48 return os <<
'\"' << repr.parent.fullKey() <<
'\"';
52 struct DataObjIDSorter {
78 : m_algorithms( algorithms )
79 , m_evtCtx(
std::move( ctx ) )
81 , m_serviceLocator( svcLocator )
89 return MsgStream( messageSvc,
"HLTExecutionTask" );
94 bool eventfailed =
false;
102 ialg->whiteboard()->selectStore( m_evtCtx->
valid() ? m_evtCtx->
slot() : 0 ).ignore();
108 sc = ialg->sysExecute( *m_evtCtx );
115 log() <<
MSG::FATAL <<
".executeEvent(): Exception with tag=" << Exception.
tag() <<
" thrown by " 119 log() <<
MSG::FATAL <<
".executeEvent(): Standard std::exception thrown by " << ialg->name() <<
endmsg 123 log() <<
MSG::FATAL <<
".executeEvent(): UNKNOWN Exception thrown by " << ialg->name() <<
endmsg;
134 if ( !ialg->filterPassed() )
break;
148 error() <<
"Failed to initialize Service Base class." <<
endmsg;
152 m_evtDataMgrSvc = serviceLocator()->service<
IDataManagerSvc>(
"EventDataSvc" );
153 if ( !m_evtDataMgrSvc ) {
154 fatal() <<
"Error retrieving EventDataSvc interface IDataManagerSvc." <<
endmsg;
157 m_whiteboard = serviceLocator()->service<
IHiveWhiteBoard>(
"EventDataSvc" );
158 if ( !m_whiteboard ) {
159 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
164 if ( !appMgrProperty ) {
165 fatal() <<
"IProperty interface not found in ApplicationMgr." <<
endmsg;
171 if ( m_evtsel !=
"NONE" || m_evtsel.length() == 0 ) {
172 m_evtSelector = serviceLocator()->service<
IEvtSelector>(
"EventSelector" );
174 m_evtSelector =
nullptr;
175 warning() <<
"Unable to locate service \"EventSelector\" " <<
endmsg;
176 warning() <<
"No events will be processed from external input." <<
endmsg;
180 m_histoDataMgrSvc = serviceLocator()->service<
IDataManagerSvc>(
"HistogramDataSvc" );
181 if ( !m_histoDataMgrSvc ) {
182 fatal() <<
"Error retrieving HistogramDataSvc." <<
endmsg;
186 m_histoPersSvc = serviceLocator()->service<
IConversionSvc>(
"HistogramPersistencySvc" );
187 if ( !m_histoPersSvc ) {
188 warning() <<
"Histograms cannot not be saved - though required." <<
endmsg;
192 m_algExecStateSvc = serviceLocator()->service<
IAlgExecStateSvc>(
"AlgExecStateSvc" );
193 if ( !m_algExecStateSvc ) {
194 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
198 if ( !m_topAlgs.empty() ) {
199 auto databroker = serviceLocator()->service<
IDataBroker>(
"HiveDataBrokerSvc" );
203 for (
const auto& alg : m_topAlgs.value() ) {
212 if ( !algResourcePool ) {
213 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
219 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
221 m_algos.push_back( algoPtr );
229 boost::optional<std::ofstream> dot{boost::in_place_init_if, !m_dotfile.
empty(), m_dotfile.value()};
232 *dot <<
"digraph G {\n";
233 for (
auto* a : m_algos ) {
234 bool is_consumer = a->outputDataObjs().empty();
235 *dot <<
'\"' << a->name() <<
"\" [shape=box" << ( is_consumer ?
",style=filled" :
"" ) <<
"];\n";
242 for (
auto id : algoPtr->outputDataObjs() ) {
243 auto r = globalOutp.
insert(
id );
244 producers[id] = algoPtr;
246 warning() <<
"multiple algorithms declare " <<
id 247 <<
" as output! could be a single instance in multiple paths " 248 "though, or control flow may guarantee only one runs...!" 256 algosDependencies.
reserve( m_algos.size() );
258 info() <<
"Data Dependencies for Algorithms:";
261 info() <<
"\n " << algoPtr->name();
264 if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
265 for (
const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
267 info() <<
"\n o INPUT " << id;
268 if (
id.key().find(
":" ) != std::string::npos ) {
269 info() <<
" contains alternatives which require resolution...\n";
270 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.
key(), boost::char_separator<char>{
":"}};
274 if ( itok != tokens.end() ) {
275 info() <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
276 id.updateKey( *itok );
278 error() <<
"failed to find alternate in global output list" 279 <<
" for id: " <<
id <<
" in Alg " << algoPtr->name() <<
endmsg;
282 algoDependencies.
insert(
id );
283 if ( dot ) *dot << DataObjIDDotRepr{
id} <<
" -> \"" << algoPtr->name() <<
"\";\n";
286 for (
const DataObjID*
id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
287 info() <<
"\n o OUTPUT " << *id;
288 if ( id->key().find(
":" ) != std::string::npos ) {
289 error() <<
" in Alg " << algoPtr->name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 292 if ( dot ) *dot <<
'\"' << algoPtr->name() <<
"\" -> " << DataObjIDDotRepr{*
id} <<
";\n";
298 algo2Deps[algoPtr] = &algosDependencies.
back();
301 for (
const auto& t : globalOutp ) {
302 if ( globalInp.
find( t ) == globalInp.
end() ) *dot << DataObjIDDotRepr{t} <<
" [style=filled];\n";
310 for (
auto o : globalInp ) {
311 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
313 if ( unmetDep.
size() > 0 ) {
315 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
316 ost <<
"\n o " << *o <<
" required by Algorithm: ";
317 for (
const auto& i : algo2Deps ) {
318 if ( i.second->find( *o ) != i.second->end() ) {
319 ost <<
"\n * " << i.first->name();
323 fatal() <<
"The following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
326 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
330 auto start = m_algos.begin();
331 auto end = m_algos.end();
336 while ( current !=
end ) {
347 info() <<
"Concurrency level information:" <<
endmsg;
348 info() <<
" o Number of events slots: " << m_whiteboard->getNumberOfStores() <<
endmsg;
349 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
352 debug() <<
"Order of algo execution :" <<
endmsg;
353 for (
const Algorithm* algo : m_algos ) debug() <<
" . " << algo->name() <<
endmsg;
362 if ( m_histoPersSvc ) {
364 sc = m_histoDataMgrSvc->traverseTree( &agent );
370 StatusCode iret = m_histoPersSvc->createRep( i, pAddr );
371 if ( iret.
isSuccess() ) i->registry()->setAddress( pAddr );
379 if ( sc.isSuccess() ) {
380 info() <<
"Histograms converted successfully according to request." <<
endmsg;
382 error() <<
"Error while saving Histograms." <<
endmsg;
385 error() <<
"Error while traversing Histogram data store" <<
endmsg;
396 int& createdEvts = *
reinterpret_cast<int*
>( createdEvts_IntPtr );
398 auto evtContext = std::make_unique<EventContext>();
399 evtContext->set( createdEvts, m_whiteboard->allocateStore( createdEvts ) );
400 m_algExecStateSvc->reset( *evtContext );
402 StatusCode sc = m_whiteboard->selectStore( evtContext->slot() );
404 fatal() <<
"Slot " << evtContext->slot() <<
" could not be selected for the WhiteBoard\n" 405 <<
"Impossible to create event context" <<
endmsg;
409 StatusCode declEvtRootSc = declareEventRootAddress();
417 debug() <<
"Event " << evtContext->evt() <<
" submitting in slot " << evtContext->slot() <<
endmsg;
420 tbb::task* task =
new ( tbb::task::allocate_root() )
422 tbb::task::enqueue( *task );
425 debug() <<
"All algorithms were submitted on event " << evtContext->evt() <<
" in slot " << evtContext->slot()
435 auto appmgr = serviceLocator()->as<
IProperty>();
448 auto appmgr = serviceLocator()->as<
IProperty>();
454 StatusCode sc = m_evtSelector->createContext( m_evtSelContext );
456 fatal() <<
"Can not create the event selector Context." <<
endmsg;
461 tbb::task_scheduler_init tbbSchedInit( m_threadPoolSize.value() + 1 );
465 bool newEvtAllowed =
false;
467 info() <<
"Starting loop on events" <<
endmsg;
468 auto start_time = Clock::now();
469 while ( maxevt < 0 || m_finishedEvt < (
unsigned int)maxevt ) {
471 if ( !( ( newEvtAllowed || createdEvts == 0 ) &&
475 ( createdEvts < maxevt || maxevt < 0 ) &&
477 m_whiteboard->freeSlots() > 0 ) ) {
480 using namespace std::chrono_literals;
481 m_createEventCond.wait_for( lock, 1
ms, [
this, newEvtAllowed, createdEvts, maxevt] {
482 return ( newEvtAllowed || createdEvts == 0 ) &&
486 ( createdEvts < maxevt || maxevt < 0 ) &&
488 this->m_whiteboard->freeSlots() > 0;
492 if ( 1 == createdEvts )
493 start_time = Clock::now();
497 sc = executeEvent( &createdEvts );
499 newEvtAllowed =
true;
501 auto end_time = Clock::now();
503 delete m_evtSelContext;
504 m_evtSelContext =
nullptr;
506 constexpr
double oneOver1024 = 1. / 1024.;
507 info() <<
"---> Loop Finished (skipping 1st evt) - " 516 StatusCode sc = m_evtSelector->next( *m_evtSelContext );
519 sc = m_evtSelector->createAddress( *m_evtSelContext, pAddr );
521 sc = m_evtSelector->next( *m_evtSelContext );
523 sc = m_evtSelector->createAddress( *m_evtSelContext, pAddr );
524 if ( !sc.
isSuccess() ) warning() <<
"Error creating IOpaqueAddress." <<
endmsg;
529 info() <<
"No more events in event selection " <<
endmsg;
532 sc = m_evtDataMgrSvc->setRoot(
"/Event", pAddr );
534 warning() <<
"Error declaring event root address." <<
endmsg;
546 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " << eventContext->
slot() <<
" failed! ***" <<
endmsg;
548 m_algExecStateSvc->dump( ost, *eventContext );
549 info() <<
"Dumping Alg Exec State for slot " << eventContext->
slot() <<
":\n" << ost.
str() <<
endmsg;
557 eventFailed( eventContext.
get() ).ignore();
558 int si = eventContext->
slot();
566 debug() <<
"Event " << eventContext->
evt() <<
" finished (slot " << si <<
")." <<
endmsg;
568 fatal() <<
"Failed event detected on " << *eventContext <<
endmsg;
571 debug() <<
"Clearing slot " << si <<
" (event " << eventContext->
evt() <<
") of the whiteboard" <<
endmsg;
573 StatusCode sc = m_whiteboard->clearStore( si );
574 if ( !sc.
isSuccess() ) warning() <<
"Clear of Event data store failed" <<
endmsg;
575 sc = m_whiteboard->freeStore( si );
576 if ( !sc.
isSuccess() ) error() <<
"Whiteboard slot " << eventContext->
slot() <<
" could not be properly cleared";
578 m_createEventCond.notify_all();
IAlgExecStateSvc * m_aess
StatusCode setProperty(IProperty *component, const std::string &name, const TYPE &value, const std::string &doc)
simple function to set the property of the given object from the value
constexpr static const auto FAILURE
Definition of the MsgStream class used to transmit messages.
StatusCode initialize() override
Define general base for Gaudi exception.
Helper class to set the application return code in case of early exit (e.g.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
StatusCode finalize() override
The Event Selector Interface.
StatusCode finalize() override
implementation of IService::finalize
HistogramAgent base in charge of collecting all the references to DataObjects in a transient store th...
Header file for class GaudiAlgorithm.
T duration_cast(T...args)
virtual StatusCode getProperty(Gaudi::Details::PropertyBase *p) const =0
Get the property by property.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
This class represents an entry point to all the event specific data.
constexpr int ScheduledStop
The IAlgResourcePool is the interface for managing algorithm instances, in particular if clones of th...
#define DECLARE_COMPONENT(type)
constexpr int UnhandledException
tbb::task * execute() override
GAUDI_API ISvcLocator * svcLocator()
Abstract interface for a service that manages the Algorithm execution states.
constexpr static const auto RECOVERABLE
This class is used for returning status codes from appropriate routines.
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
const std::string & key() const
The IRegistry represents the entry door to the environment any data object residing in a transient da...
const IDataSelector & selectedObjects() const
Return the set of selected DataObjects.
virtual const std::string & tag() const
name tag for the exception, or exception type
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
HLTExecutionTask(std::vector< Algorithm * > &algorithms, std::unique_ptr< EventContext > ctx, ISvcLocator *svcLocator, IAlgExecStateSvc *aem, const HLTEventLoopMgr *parent)
GAUDI_API void setCurrentContext(const EventContext *ctx)
constexpr static const auto SUCCESS
virtual void updateEventStatus(const bool &b, const EventContext &ctx)=0
const HLTEventLoopMgr * m_parent
Base class from which all concrete algorithm classes should be derived.
SmartIF< ISvcLocator > m_serviceLocator
void promoteToExecuted(std::unique_ptr< EventContext > eventContext) const
GAUDI_API long mappedMemory(MemoryUnit unit=kByte, InfoType fetch=Memory, long pid=-1)
Basic Process Information: priority boost.
virtual std::vector< Algorithm * > algorithmsRequiredFor(const DataObjIDColl &requested, const std::vector< std::string > &stoppers={}) const =0
Get the (ordered!) list of algorithms required to provide a given DataObjIDColl.
StatusCode nextEvent(int maxevt) override
implementation of IEventProcessor::nextEvent
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
StatusCode initialize() override
implementation of IService::initialize
T back_inserter(T...args)
const StatusCode & ignore() const
Ignore/check StatusCode.
StatusCode executeEvent(void *par) override
implementation of IEventProcessor::executeEvent(void* par)
GAUDI_API void setCurrentContextEvt(long int evtN)
StatusCode stopRun() override
implementation of IEventProcessor::stopRun()
AttribStringParser::Iterator begin(const AttribStringParser &parser)
Opaque address interface definition.
StatusCode declareEventRootAddress()
Declare the root address of the event.
The IProperty is the basic interface for all components which have properties that can be set or get...
std::string fullKey() const
StatusCode eventFailed(EventContext *eventContext) const
Method to check if an event failed and take appropriate actions.
virtual IOpaqueAddress * address() const =0
Retrieve opaque storage address.
std::vector< Algorithm * > & m_algorithms
std::unique_ptr< EventContext > m_evtCtx
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
std::ostream & operator<<(std::ostream &str, const GaudiAlg::ID &id)
Operator overloading for ostream.