26 #include <condition_variable> 31 #include "boost/algorithm/string.hpp" 32 #include "boost/optional.hpp" 33 #include "boost/thread.hpp" 34 #include "boost/tokenizer.hpp" 35 #include "tbb/task_scheduler_init.h" 44 struct DataObjIDDotRepr {
48 return os <<
'\"' << repr.parent.fullKey() <<
'\"';
52 struct DataObjIDSorter {
78 : m_algorithms( algorithms )
79 , m_evtCtx(
std::move( ctx ) )
81 , m_serviceLocator( svcLocator )
89 return MsgStream( messageSvc,
"HLTExecutionTask" );
94 bool eventfailed =
false;
102 ialg->whiteboard()->selectStore( m_evtCtx->
valid() ? m_evtCtx->
slot() : 0 ).ignore();
108 sc = ialg->sysExecute( *m_evtCtx );
115 log() <<
MSG::FATAL <<
".executeEvent(): Exception with tag=" << Exception.
tag() <<
" thrown by " 119 log() <<
MSG::FATAL <<
".executeEvent(): Standard std::exception thrown by " << ialg->name() <<
endmsg 123 log() <<
MSG::FATAL <<
".executeEvent(): UNKNOWN Exception thrown by " << ialg->name() <<
endmsg;
134 if ( !ialg->filterPassed() )
break;
148 error() <<
"Failed to initialize Service Base class." <<
endmsg;
152 m_evtDataMgrSvc = serviceLocator()->service<
IDataManagerSvc>(
"EventDataSvc" );
153 if ( !m_evtDataMgrSvc ) {
154 fatal() <<
"Error retrieving EventDataSvc interface IDataManagerSvc." <<
endmsg;
157 m_whiteboard = serviceLocator()->service<
IHiveWhiteBoard>(
"EventDataSvc" );
158 if ( !m_whiteboard ) {
159 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
164 if ( !appMgrProperty ) {
165 fatal() <<
"IProperty interface not found in ApplicationMgr." <<
endmsg;
171 if ( m_evtsel !=
"NONE" || m_evtsel.length() == 0 ) {
172 m_evtSelector = serviceLocator()->service<
IEvtSelector>(
"EventSelector" );
174 m_evtSelector =
nullptr;
175 warning() <<
"Unable to locate service \"EventSelector\" " <<
endmsg;
176 warning() <<
"No events will be processed from external input." <<
endmsg;
180 m_histoDataMgrSvc = serviceLocator()->service<
IDataManagerSvc>(
"HistogramDataSvc" );
181 if ( !m_histoDataMgrSvc ) {
182 fatal() <<
"Error retrieving HistogramDataSvc." <<
endmsg;
186 m_histoPersSvc = serviceLocator()->service<
IConversionSvc>(
"HistogramPersistencySvc" );
187 if ( !m_histoPersSvc ) {
188 warning() <<
"Histograms cannot not be saved - though required." <<
endmsg;
192 m_algExecStateSvc = serviceLocator()->service<
IAlgExecStateSvc>(
"AlgExecStateSvc" );
193 if ( !m_algExecStateSvc ) {
194 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
198 if ( !m_topAlgs.empty() ) {
199 auto databroker = serviceLocator()->service<
IDataBroker>(
"HiveDataBrokerSvc" );
203 for (
const auto& alg : m_topAlgs.value() ) {
212 if ( !algResourcePool ) {
213 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
219 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
221 m_algos.push_back( algoPtr );
231 boost::optional<std::ofstream> dot{boost::in_place_init_if, !m_dotfile.empty(), m_dotfile.value()};
234 *dot <<
"digraph G {\n";
235 for (
auto* a : m_algos ) {
236 bool is_consumer = a->dataDependencies( AccessMode::Write ).
empty();
237 *dot <<
'\"' << a->name() <<
"\" [shape=box" << ( is_consumer ?
",style=filled" :
"" ) <<
"];\n";
244 for (
auto id : algoPtr->dataDependencies( AccessMode::Write ) ) {
245 auto r = globalOutp.
insert(
id );
246 producers[id] = algoPtr;
248 warning() <<
"multiple algorithms declare " <<
id 249 <<
" as output! could be a single instance in multiple paths " 250 "though, or control flow may guarantee only one runs...!" 258 algosDependencies.
reserve( m_algos.size() );
260 info() <<
"Data Dependencies for Algorithms:";
263 info() <<
"\n " << algoPtr->name();
266 if ( !algoPtr->dataDependencies( AccessMode::Read ).empty() ||
267 !algoPtr->dataDependencies( AccessMode::Write ).empty() ) {
268 for (
const DataObjID* idp : sortedDataObjIDColl( algoPtr->dataDependencies( AccessMode::Read ) ) ) {
270 info() <<
"\n o INPUT " << id;
271 if (
id.key().find(
":" ) != std::string::npos ) {
272 info() <<
" contains alternatives which require resolution...\n";
273 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.
key(), boost::char_separator<char>{
":"}};
277 if ( itok != tokens.end() ) {
278 info() <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
279 id.updateKey( *itok );
281 error() <<
"failed to find alternate in global output list" 282 <<
" for id: " <<
id <<
" in Alg " << algoPtr->name() <<
endmsg;
285 algoDependencies.
insert(
id );
286 if ( dot ) *dot << DataObjIDDotRepr{
id} <<
" -> \"" << algoPtr->name() <<
"\";\n";
289 for (
const DataObjID*
id : sortedDataObjIDColl( algoPtr->dataDependencies( AccessMode::Write ) ) ) {
290 info() <<
"\n o OUTPUT " << *id;
291 if ( id->key().find(
":" ) != std::string::npos ) {
292 error() <<
" in Alg " << algoPtr->name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 295 if ( dot ) *dot <<
'\"' << algoPtr->name() <<
"\" -> " << DataObjIDDotRepr{*
id} <<
";\n";
301 algo2Deps[algoPtr] = &algosDependencies.
back();
304 for (
const auto& t : globalOutp ) {
305 if ( globalInp.
find( t ) == globalInp.
end() ) *dot << DataObjIDDotRepr{t} <<
" [style=filled];\n";
313 for (
auto o : globalInp ) {
314 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
316 if ( unmetDep.
size() > 0 ) {
318 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
319 ost <<
"\n o " << *o <<
" required by Algorithm: ";
320 for (
const auto& i : algo2Deps ) {
321 if ( i.second->find( *o ) != i.second->end() ) {
322 ost <<
"\n * " << i.first->name();
326 fatal() <<
"The following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
329 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
333 auto start = m_algos.begin();
334 auto end = m_algos.end();
339 while ( current !=
end ) {
350 info() <<
"Concurrency level information:" <<
endmsg;
351 info() <<
" o Number of events slots: " << m_whiteboard->getNumberOfStores() <<
endmsg;
352 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
355 debug() <<
"Order of algo execution :" <<
endmsg;
356 for (
const Algorithm* algo : m_algos ) debug() <<
" . " << algo->name() <<
endmsg;
365 if ( m_histoPersSvc ) {
367 sc = m_histoDataMgrSvc->traverseTree( &agent );
373 StatusCode iret = m_histoPersSvc->createRep( i, pAddr );
374 if ( iret.
isSuccess() ) i->registry()->setAddress( pAddr );
382 if ( sc.isSuccess() ) {
383 info() <<
"Histograms converted successfully according to request." <<
endmsg;
385 error() <<
"Error while saving Histograms." <<
endmsg;
388 error() <<
"Error while traversing Histogram data store" <<
endmsg;
399 int& createdEvts = *
reinterpret_cast<int*
>( createdEvts_IntPtr );
401 auto evtContext = std::make_unique<EventContext>();
402 evtContext->set( createdEvts, m_whiteboard->allocateStore( createdEvts ) );
403 m_algExecStateSvc->reset( *evtContext );
405 StatusCode sc = m_whiteboard->selectStore( evtContext->slot() );
407 fatal() <<
"Slot " << evtContext->slot() <<
" could not be selected for the WhiteBoard\n" 408 <<
"Impossible to create event context" <<
endmsg;
412 StatusCode declEvtRootSc = declareEventRootAddress();
420 debug() <<
"Event " << evtContext->evt() <<
" submitting in slot " << evtContext->slot() <<
endmsg;
423 tbb::task* task =
new ( tbb::task::allocate_root() )
425 tbb::task::enqueue( *task );
428 debug() <<
"All algorithms were submitted on event " << evtContext->evt() <<
" in slot " << evtContext->slot()
438 auto appmgr = serviceLocator()->as<
IProperty>();
451 auto appmgr = serviceLocator()->as<
IProperty>();
457 StatusCode sc = m_evtSelector->createContext( m_evtSelContext );
459 fatal() <<
"Can not create the event selector Context." <<
endmsg;
464 tbb::task_scheduler_init tbbSchedInit( m_threadPoolSize.value() + 1 );
468 bool newEvtAllowed =
false;
470 info() <<
"Starting loop on events" <<
endmsg;
471 auto start_time = Clock::now();
472 while ( maxevt < 0 || m_finishedEvt < (
unsigned int)maxevt ) {
474 if ( !( ( newEvtAllowed || createdEvts == 0 ) &&
478 ( createdEvts < maxevt || maxevt < 0 ) &&
480 m_whiteboard->freeSlots() > 0 ) ) {
483 using namespace std::chrono_literals;
484 m_createEventCond.wait_for( lock, 1
ms, [
this, newEvtAllowed, createdEvts, maxevt] {
485 return ( newEvtAllowed || createdEvts == 0 ) &&
489 ( createdEvts < maxevt || maxevt < 0 ) &&
491 this->m_whiteboard->freeSlots() > 0;
495 if ( 1 == createdEvts )
496 start_time = Clock::now();
500 sc = executeEvent( &createdEvts );
502 newEvtAllowed =
true;
504 auto end_time = Clock::now();
506 delete m_evtSelContext;
507 m_evtSelContext =
nullptr;
509 constexpr
double oneOver1024 = 1. / 1024.;
510 info() <<
"---> Loop Finished (skipping 1st evt) - " 519 StatusCode sc = m_evtSelector->next( *m_evtSelContext );
522 sc = m_evtSelector->createAddress( *m_evtSelContext, pAddr );
524 sc = m_evtSelector->next( *m_evtSelContext );
526 sc = m_evtSelector->createAddress( *m_evtSelContext, pAddr );
527 if ( !sc.
isSuccess() ) warning() <<
"Error creating IOpaqueAddress." <<
endmsg;
532 info() <<
"No more events in event selection " <<
endmsg;
535 sc = m_evtDataMgrSvc->setRoot(
"/Event", pAddr );
537 warning() <<
"Error declaring event root address." <<
endmsg;
549 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " << eventContext->
slot() <<
" failed! ***" <<
endmsg;
551 m_algExecStateSvc->dump( ost, *eventContext );
552 info() <<
"Dumping Alg Exec State for slot " << eventContext->
slot() <<
":\n" << ost.
str() <<
endmsg;
560 eventFailed( eventContext.
get() ).ignore();
561 int si = eventContext->
slot();
569 debug() <<
"Event " << eventContext->
evt() <<
" finished (slot " << si <<
")." <<
endmsg;
571 fatal() <<
"Failed event detected on " << *eventContext <<
endmsg;
574 debug() <<
"Clearing slot " << si <<
" (event " << eventContext->
evt() <<
") of the whiteboard" <<
endmsg;
576 StatusCode sc = m_whiteboard->clearStore( si );
577 if ( !sc.
isSuccess() ) warning() <<
"Clear of Event data store failed" <<
endmsg;
578 sc = m_whiteboard->freeStore( si );
579 if ( !sc.
isSuccess() ) error() <<
"Whiteboard slot " << eventContext->
slot() <<
" could not be properly cleared";
581 m_createEventCond.notify_all();
IAlgExecStateSvc * m_aess
StatusCode setProperty(IProperty *component, const std::string &name, const TYPE &value, const std::string &doc)
simple function to set the property of the given object from the value
constexpr static const auto FAILURE
Definition of the MsgStream class used to transmit messages.
StatusCode initialize() override
Define general base for Gaudi exception.
Helper class to set the application return code in case of early exit (e.g.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
StatusCode finalize() override
IDataHandleMetadata::AccessMode AccessMode
The Event Selector Interface.
StatusCode finalize() override
implementation of IService::finalize
HistogramAgent base in charge of collecting all the references to DataObjects in a transient store th...
Header file for class GaudiAlgorithm.
T duration_cast(T...args)
virtual StatusCode getProperty(Gaudi::Details::PropertyBase *p) const =0
Get the property by property.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
This class represents an entry point to all the event specific data.
constexpr int ScheduledStop
The IAlgResourcePool is the interface for managing algorithm instances, in particular if clones of th...
#define DECLARE_COMPONENT(type)
constexpr int UnhandledException
tbb::task * execute() override
GAUDI_API ISvcLocator * svcLocator()
Abstract interface for a service that manages the Algorithm execution states.
constexpr static const auto RECOVERABLE
This class is used for returning status codes from appropriate routines.
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
const std::string & key() const
The IRegistry represents the entry door to the environment any data object residing in a transient da...
const IDataSelector & selectedObjects() const
Return the set of selected DataObjects.
virtual const std::string & tag() const
name tag for the exception, or exception type
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
HLTExecutionTask(std::vector< Algorithm * > &algorithms, std::unique_ptr< EventContext > ctx, ISvcLocator *svcLocator, IAlgExecStateSvc *aem, const HLTEventLoopMgr *parent)
GAUDI_API void setCurrentContext(const EventContext *ctx)
constexpr static const auto SUCCESS
virtual void updateEventStatus(const bool &b, const EventContext &ctx)=0
const HLTEventLoopMgr * m_parent
Base class from which all concrete algorithm classes should be derived.
SmartIF< ISvcLocator > m_serviceLocator
void promoteToExecuted(std::unique_ptr< EventContext > eventContext) const
GAUDI_API long mappedMemory(MemoryUnit unit=kByte, InfoType fetch=Memory, long pid=-1)
Basic Process Information: priority boost.
virtual std::vector< Algorithm * > algorithmsRequiredFor(const DataObjIDColl &requested, const std::vector< std::string > &stoppers={}) const =0
Get the (ordered!) list of algorithms required to provide a given DataObjIDColl.
StatusCode nextEvent(int maxevt) override
implementation of IEventProcessor::nextEvent
bool empty() const
Tell if this DataObjID has an empty key.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
StatusCode initialize() override
implementation of IService::initialize
T back_inserter(T...args)
const StatusCode & ignore() const
Ignore/check StatusCode.
StatusCode executeEvent(void *par) override
implementation of IEventProcessor::executeEvent(void* par)
GAUDI_API void setCurrentContextEvt(long int evtN)
StatusCode stopRun() override
implementation of IEventProcessor::stopRun()
AttribStringParser::Iterator begin(const AttribStringParser &parser)
Opaque address interface definition.
StatusCode declareEventRootAddress()
Declare the root address of the event.
The IProperty is the basic interface for all components which have properties that can be set or get...
std::string fullKey() const
StatusCode eventFailed(EventContext *eventContext) const
Method to check if an event failed and take appropriate actions.
virtual IOpaqueAddress * address() const =0
Retrieve opaque storage address.
std::vector< Algorithm * > & m_algorithms
std::unique_ptr< EventContext > m_evtCtx
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
std::ostream & operator<<(std::ostream &str, const GaudiAlg::ID &id)
Operator overloading for ostream.