26 #include <condition_variable> 30 #include "boost/algorithm/string.hpp" 31 #include "boost/thread.hpp" 32 #include "boost/tokenizer.hpp" 33 #include "tbb/task_scheduler_init.h" 42 struct DataObjIDSorter {
62 error() <<
"Failed to initialize Service Base class." <<
endmsg;
66 m_evtDataMgrSvc = serviceLocator()->service<
IDataManagerSvc>(
"EventDataSvc" );
67 if ( !m_evtDataMgrSvc ) {
68 fatal() <<
"Error retrieving EventDataSvc interface IDataManagerSvc." <<
endmsg;
71 m_whiteboard = serviceLocator()->service<
IHiveWhiteBoard>(
"EventDataSvc" );
72 if ( !m_whiteboard ) {
73 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
78 if ( !appMgrProperty ) {
79 fatal() <<
"IProperty interface not found in ApplicationMgr." <<
endmsg;
85 if ( m_evtsel !=
"NONE" || m_evtsel.length() == 0 ) {
86 m_evtSelector = serviceLocator()->service<
IEvtSelector>(
"EventSelector" );
89 warning() <<
"Unable to locate service \"EventSelector\" " <<
endmsg;
90 warning() <<
"No events will be processed from external input." <<
endmsg;
94 m_histoDataMgrSvc = serviceLocator()->service<
IDataManagerSvc>(
"HistogramDataSvc" );
95 if ( !m_histoDataMgrSvc ) {
96 fatal() <<
"Error retrieving HistogramDataSvc." <<
endmsg;
100 m_histoPersSvc = serviceLocator()->service<
IConversionSvc>(
"HistogramPersistencySvc" );
101 if ( !m_histoPersSvc ) {
102 warning() <<
"Histograms cannot not be saved - though required." <<
endmsg;
106 m_algExecStateSvc = serviceLocator()->service<
IAlgExecStateSvc>(
"AlgExecStateSvc" );
107 if ( !m_algExecStateSvc ) {
108 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
113 m_precSvc = serviceLocator()->service<
IPrecedenceSvc>(
"PrecedenceSvc" );
115 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
120 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
124 m_algExecStateSvc = serviceLocator()->service<
IAlgExecStateSvc>(
"AlgExecStateSvc" );
125 if ( !m_algExecStateSvc ) {
126 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
132 if ( !algResourcePool ) {
133 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
137 m_algos.push_back( alg );
139 const unsigned int algsNumber = m_algos.size();
152 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
155 auto r = globalOutp.
insert(
id );
156 producers[id] = algoPtr;
158 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths " 159 "though, or control flow may guarantee only one runs...!" 168 info() <<
"Data Dependencies for Algorithms:";
172 info() <<
"\n " << algoPtr->
name();
178 info() <<
"\n o INPUT " << id;
179 if (
id.key().find(
":" ) != std::string::npos ) {
180 info() <<
" contains alternatives which require resolution...\n";
181 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.
key(), boost::char_separator<char>{
":"}};
185 if ( itok != tokens.end() ) {
186 info() <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
187 id.updateKey( *itok );
189 error() <<
"failed to find alternate in global output list" 190 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
193 algoDependencies.
insert(
id );
197 info() <<
"\n o OUTPUT " << *id;
198 if ( id->key().find(
":" ) != std::string::npos ) {
199 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 207 algo2Index[ialgoPtr] =
n;
214 for (
auto o : globalInp ) {
215 if ( globalOutp.
find( o ) == globalOutp.
end() ) {
219 if ( unmetDep.
size() > 0 ) {
221 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
222 ost <<
"\n o " << *o <<
" required by Algorithm: ";
223 for (
size_t i = 0; i < algosDependencies.
size(); ++i ) {
224 if ( algosDependencies[i].find( *o ) != algosDependencies[i].
end() ) {
225 ost <<
"\n * " << m_algname_vect[i];
229 fatal() <<
"The following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
232 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
236 auto messageSvc = serviceLocator()->service<
IMessageSvc>(
"MessageSvc" );
237 m_eventSlots.assign( m_whiteboard->getNumberOfStores(),
241 info() <<
"Concurrency level information:" <<
endmsg;
242 info() <<
" o Number of events slots: " << m_whiteboard->getNumberOfStores() <<
endmsg;
243 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
246 auto start = m_algos.begin();
247 auto end = m_algos.end();
249 return algosDependencies[algo2Index[algo]].
empty();
253 while ( current !=
end ) {
255 current,
end, [
start, current, &producers, &algosDependencies, &algo2Index](
const IAlgorithm* algo ) {
256 return std::none_of( algosDependencies[algo2Index[algo]].
begin(), algosDependencies[algo2Index[algo]].
end(),
264 debug() <<
"Order of algo execution :" <<
endmsg;
265 m_algname_vect.resize( algsNumber );
269 m_algname_index_map[
name] = index;
270 m_algname_vect.at( index ) =
name;
271 debug() <<
" . " << algo->name() <<
endmsg;
281 if ( m_histoPersSvc ) {
283 sc = m_histoDataMgrSvc->traverseTree( &agent );
289 StatusCode iret = m_histoPersSvc->createRep( i, pAddr );
290 if ( iret.
isSuccess() ) i->registry()->setAddress( pAddr );
298 if ( sc.isSuccess() ) {
299 info() <<
"Histograms converted successfully according to request." <<
endmsg;
301 error() <<
"Error while saving Histograms." <<
endmsg;
304 error() <<
"Error while traversing Histogram data store" <<
endmsg;
315 int& createdEvts = *( (
int*)createdEvts_IntPtr );
318 evtContext->
set( createdEvts, m_whiteboard->allocateStore( createdEvts ) );
319 m_algExecStateSvc->reset( *evtContext.
get() );
323 fatal() <<
"Slot " << evtContext->
slot() <<
" could not be selected for the WhiteBoard\n" 324 <<
"Impossible to create event context" <<
endmsg;
328 StatusCode declEvtRootSc = declareEventRootAddress();
335 verbose() <<
"Adding event " << evtContext->
evt() <<
", slot " << evtContext->
slot() <<
" to the scheduler" <<
endmsg;
338 const unsigned int thisSlotNum = evtContext->
slot();
339 m_eventSlots[thisSlotNum].reset( evtContext.
get() );
342 this->promoteToExecuted(
std::move( evtContext ) );
346 m_algos,
std::move( evtContext ), serviceLocator(), m_algExecStateSvc, promote2ExecutedClosure );
347 tbb::task::enqueue( *task );
350 debug() <<
"All algorithms were submitted on event " << evtContext->
evt() <<
" in slot " << thisSlotNum <<
endmsg;
359 auto appmgr = serviceLocator()->as<
IProperty>();
372 auto appmgr = serviceLocator()->as<
IProperty>();
378 StatusCode sc = m_evtSelector->createContext( m_evtSelContext );
380 fatal() <<
"Can not create the event selector Context." <<
endmsg;
385 tbb::task_scheduler_init tbbSchedInit( m_threadPoolSize.value() + 1 );
389 bool newEvtAllowed =
false;
391 info() <<
"Starting loop on events" <<
endmsg;
392 auto start_time = Clock::now();
393 while ( maxevt < 0 || m_finishedEvt < (
unsigned int)maxevt ) {
395 if ( !( ( newEvtAllowed || createdEvts == 0 ) &&
399 ( createdEvts < maxevt || maxevt < 0 ) &&
401 m_whiteboard->freeSlots() > 0 ) ) {
404 using namespace std::chrono_literals;
405 m_createEventCond.wait_for( lock, 1
ms, [
this, newEvtAllowed, createdEvts, maxevt] {
406 return ( newEvtAllowed || createdEvts == 0 ) &&
410 ( createdEvts < maxevt || maxevt < 0 ) &&
412 this->m_whiteboard->freeSlots() > 0;
416 if ( 1 == createdEvts )
417 start_time = Clock::now();
421 sc = executeEvent( &createdEvts );
425 newEvtAllowed =
true;
427 auto end_time = Clock::now();
429 delete m_evtSelContext;
430 m_evtSelContext =
nullptr;
432 constexpr
double oneOver1024 = 1. / 1024.;
433 info() <<
"---> Loop Finished (skipping 1st evt) - " 442 StatusCode sc = m_evtSelector->next( *m_evtSelContext );
445 sc = m_evtSelector->createAddress( *m_evtSelContext, pAddr );
447 sc = m_evtSelector->next( *m_evtSelContext );
449 sc = m_evtSelector->createAddress( *m_evtSelContext, pAddr );
450 if ( !sc.
isSuccess() ) warning() <<
"Error creating IOpaqueAddress." <<
endmsg;
455 info() <<
"No more events in event selection " <<
endmsg;
458 sc = m_evtDataMgrSvc->setRoot(
"/Event", pAddr );
460 warning() <<
"Error declaring event root address." <<
endmsg;
472 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " << eventContext->
slot() <<
" failed! ***" <<
endmsg;
474 m_algExecStateSvc->dump( ost, *eventContext );
475 info() <<
"Dumping Alg Exec State for slot " << eventContext->
slot() <<
":\n" << ost.
str() <<
endmsg;
483 eventFailed( eventContext.
get() ).ignore();
484 int si = eventContext->
slot();
487 debug() <<
"Event " << eventContext->
evt() <<
" executed in slot " << si <<
"." <<
endmsg;
492 debug() <<
"Event " << eventContext->
evt() <<
" finished (slot " << si <<
")." <<
endmsg;
494 fatal() <<
"Failed event detected on " << *eventContext <<
endmsg;
497 debug() <<
"Clearing slot " << si <<
" (event " << eventContext->
evt() <<
") of the whiteboard" <<
endmsg;
499 StatusCode sc = m_whiteboard->clearStore( si );
501 warning() <<
"Clear of Event data store failed" <<
endmsg;
503 m_eventSlots[si].eventContext =
nullptr;
504 sc = m_whiteboard->freeStore( si );
506 error() <<
"Whiteboard slot " << eventContext->
slot() <<
" could not be properly cleared";
509 m_createEventCond.notify_all();
514 bool eventfailed =
false;
531 m_aess->algExecState( ialg, *m_evtCtx ).setState( AlgExecState::State::Executing );
532 sc = ialg->sysExecute( *m_evtCtx );
539 log() <<
MSG::FATAL <<
".executeEvent(): Exception with tag=" << Exception.
tag() <<
" thrown by " << ialg->name()
543 log() <<
MSG::FATAL <<
".executeEvent(): Standard std::exception thrown by " << ialg->name() <<
endmsg 547 log() <<
MSG::FATAL <<
".executeEvent(): UNKNOWN Exception thrown by " << ialg->name() <<
endmsg;
557 m_aess->algExecState( ialg, *m_evtCtx ).setState( AlgExecState::State::Done, sc );
558 m_aess->updateEventStatus( eventfailed, *m_evtCtx );
566 m_promote2ExecutedClosure(
std::move( m_evtCtx ) );
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
StatusCode setProperty(IProperty *component, const std::string &name, const TYPE &value, const std::string &doc)
simple function to set the property of the given object from the value
constexpr static const auto FAILURE
StatusCode initialize() override
const unsigned int & getAlgoIndex() const
Get algorithm index.
Define general base for Gaudi exception.
Helper class to set the application return code in case of early exit (e.g.
void commitHandles() override
const std::string & name() const override
The identifying name of the algorithm object.
void set(const ContextEvt_t &e=0, const ContextID_t &s=INVALID_CONTEXT_ID)
StatusCode finalize() override
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
The Event Selector Interface.
StatusCode finalize() override
implementation of IService::finalize
const DataObjIDColl & outputDataObjs() const override
HistogramAgent base in charge of collecting all the refereces to DataObjects in a transient store tha...
A service to resolve the task execution precedence.
Header file for class GaudiAlgorithm.
Abstract interface for a service that manages tasks' precedence.
T duration_cast(T...args)
virtual StatusCode getProperty(Gaudi::Details::PropertyBase *p) const =0
Get the property by property.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
This class represents an entry point to all the event specific data.
constexpr int ScheduledStop
#define DECLARE_COMPONENT(type)
The IAlgResourcePool is the interface for managing algorithm instances, in particular if clones of th...
constexpr int UnhandledException
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate an given 'slot' for all subsequent calls within the same thread id.
tbb::task * execute() override
Abstract interface for a service that manages the Algorithm execution states.
constexpr static const auto RECOVERABLE
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
const std::string & key() const
The IMessage is the interface implemented by the message service.
The IRegistry represents the entry door to the environment any data object residing in a transient da...
virtual const std::string & tag() const
name tag for the exception, or exception type
IDataSelector * selectedObjects()
Return the set of selected DataObjects.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
The IAlgorithm is the interface implemented by the Algorithm base class.
GAUDI_API void setCurrentContext(const EventContext *ctx)
constexpr static const auto SUCCESS
SmartIF< IHiveWhiteBoard > & whiteboard() const
Base class from which all concrete algorithm classes should be derived.
GAUDI_API long mappedMemory(MemoryUnit unit=kByte, InfoType fetch=Memory, long pid=-1)
Basic Process Information: priority boost.
StatusCode nextEvent(int maxevt) override
implementation of IEventProcessor::nextEvent
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
StatusCode initialize() override
implementation of IService::initialize
const StatusCode & ignore() const
Ignore/check StatusCode.
bool filterPassed() const override
Did this algorithm pass or fail its filter criterion for the last event?
Class representing the event slot.
StatusCode executeEvent(void *par) override
implementation of IEventProcessor::executeEvent(void* par)
GAUDI_API void setCurrentContextEvt(long int evtN)
void promoteToExecuted(std::unique_ptr< EventContext > eventContext)
Algorithm promotion.
StatusCode stopRun() override
implementation of IEventProcessor::stopRun()
AttribStringParser::Iterator begin(const AttribStringParser &parser)
Opaque address interface definition.
StatusCode declareEventRootAddress()
Declare the root address of the event.
The IProperty is the basic interface for all components which have properties that can be set or get...
std::string fullKey() const
virtual IOpaqueAddress * address() const =0
Retrieve opaque storage address.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.