The Gaudi Framework  master (181af51f)
Loading...
Searching...
No Matches
QueueingEventLoopMgr.cpp
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
14#include <GaudiKernel/ThreadLocalContext.h> // for Gaudi::Hive::setCurrentContext
15#include <tbb/concurrent_queue.h>
16#include <thread>
17
// Level guards: each macro expands to a bare (unbraced) `if` on the component's outputLevel(), so the
// single statement written after it only runs when that message level is enabled.
// NOTE: because the expansion is an unbraced `if`, an `else` placed directly after a statement that
// uses one of these macros would bind to the macro's hidden `if`.
18#define ON_DEBUG if ( outputLevel() <= MSG::DEBUG )
19#define ON_VERBOSE if ( outputLevel() <= MSG::VERBOSE )
20
// Guarded stream starters, used as `DEBMSG << ... << endmsg;`: the whole streaming statement is
// skipped when the corresponding level is disabled.
21#define DEBMSG ON_DEBUG debug()
22#define VERMSG ON_VERBOSE verbose()
23
24namespace Gaudi::TestSuite {
26 : public extends<MinimalEventLoopMgr, Gaudi::Interfaces::IQueueingEventProcessor> {
27 public:
28 using extends::extends;
29
 /// Calls the base-class start() and, if that succeeds, launches the event loop thread
 /// (m_evtLoopThread) that consumes contexts from the incoming queue.
30 StatusCode start() override;
 /// Pushes a default-constructed EventContext to make the event loop thread exit, joins the
 /// thread, then returns the result of the base-class stop().
31 StatusCode stop() override;
32
33 // backward compatible EventProcessor implementation.
 // NOTE(review): the signature line of this method is absent from this listing; the body consumes a
 // parameter named `ctx`.
 //
 // Synchronous wrapper over the queueing interface: the context is enqueued with push(), then pop()
 // is polled, sleeping 1 ms between attempts, until a result is available.
35 using namespace std::chrono_literals;
36 push( std::move( ctx ) );
37 std::optional<ResultType> result;
 // pop() yields std::nullopt while no result is ready, which keeps this loop spinning.
 // NOTE(review): pop() hands back whichever result is available next; if other contexts were queued
 // through push() beforehand, the result obtained here may belong to one of them — confirm callers
 // do not mix the two entry points.
38 while ( !( result = pop() ) ) std::this_thread::sleep_for( 1ms );
 // Only element 0 of the result tuple is returned (presumably the StatusCode); the remainder of the
 // result is discarded.
39 return std::get<0>( std::move( *result ) );
40 }
41
42 void push( EventContext&& ctx ) override { m_incoming.push( std::move( ctx ) ); }
43
45 bool empty() const override {
46 // because of the way we count "in flight" (+1 while waiting that we get something from the queue)
47 // and the way tbb:concurrent_bounded_queue reports the size while waiting for pop to return (-1)
48 // this is a correct definition of "empty" (nothing pending, nothing being processed and no results
49 // to be popped)
50 return !( m_inFlight + m_done.size() + m_incoming.size() );
51 }
52
54 std::optional<ResultType> pop() override {
55 ResultType out;
56 if ( m_done.try_pop( out ) )
57 return out;
58 else
59 return std::nullopt;
60 }
61
62 private:
 /// Contexts handed in through push(), waiting to be picked up for processing.
63 tbb::concurrent_bounded_queue<EventContext> m_incoming;
 /// Completed results, waiting to be collected through pop().
64 tbb::concurrent_bounded_queue<ResultType> m_done;
 /// Counter added to both queue sizes in empty() to decide whether the processor is idle.
65 std::atomic<std::size_t> m_inFlight{ 0 };
66
67 // our capacity is the input queue capacity + 1 (N events pending + 1 being processed)
 // NOTE(review): the first line of this property declaration is absent from this listing. What is
 // visible: the property is named "Capacity", defaults to m_incoming.capacity() + 1, and its update
 // handler resizes the incoming queue to m_queueCapacity - 1.
 // NOTE(review): if m_queueCapacity is an unsigned type, a value of 0 would make
 // `m_queueCapacity - 1` wrap around — confirm whether 0 is a legal setting.
69 this, "Capacity", m_incoming.capacity() + 1,
70 [this]( Gaudi::Details::PropertyBase& ) { m_incoming.set_capacity( m_queueCapacity - 1 ); } };
71
 /// Process a single event context and return the outcome together with the context itself.
72 std::tuple<StatusCode, EventContext> processEvent( EventContext&& context );
73
 /// Thread object that is join()ed when the manager is stopped.
74 std::thread m_evtLoopThread;
75 };
76} // namespace Gaudi::TestSuite
77
79
80using namespace Gaudi::TestSuite;
81
82namespace {
 // Guard object that stores an IProperty handle and an integer return code at construction.
 // NOTE(review): this listing is missing lines of the class (a member function between the
 // constructor and the destructor, and the destructor body). Presumably the destructor applies the
 // stored return code to the application unless the guard has been disarmed — confirm against the
 // real source before relying on this description.
85 class RetCodeGuard {
86 public:
 // Takes the handle by value and moves it into the member; `retcode` is stored as given.
87 inline RetCodeGuard( SmartIF<IProperty> appmgr, int retcode )
88 : m_appmgr( std::move( appmgr ) ), m_retcode( retcode ) {}
90 inline ~RetCodeGuard() {
92 }
93
94 private:
 // Handle stored at construction.
95 SmartIF<IProperty> m_appmgr;
 // Return code stored at construction.
96 int m_retcode;
97 };
98} // namespace
99
 // NOTE(review): the signature line of this definition is absent from this listing.
 //
 // Starts the base class first; the event loop thread is only created when that succeeded.
101 auto ok = base_class::start();
102 if ( !ok ) return ok;
103
105
 // Event loop thread: repeatedly takes one context from m_incoming, processes it and publishes the
 // result on m_done. A context for which valid() is false acts as the request to terminate.
 // NOTE(review): processEvent() is called here without a try/catch; an exception escaping it would
 // leave the thread function and end the program via std::terminate.
106 m_evtLoopThread = std::thread( [this]() {
107 while ( true ) {
108 EventContext ctx;
 // The counter is raised before waiting on the queue, so it reads +1 even while this thread is
 // idle; empty() is written with that offset in mind.
109 ++m_inFlight; // yes, this is not accurate.
110 m_incoming.pop( ctx );
111 if ( ctx.valid() ) {
112 // the sleep is not strictly needed, but it should make the output more stable for the test
113 std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
114 m_done.push( processEvent( std::move( ctx ) ) );
 // Lowered only after the result has been queued on m_done.
115 --m_inFlight; // yes, this is not accurate.
116 } else {
117 DEBMSG << "exiting event loop thread" << endmsg;
 // The terminating context is also reported as a result, paired with StatusCode::SUCCESS, so
 // consumers of pop() will see one extra entry carrying an invalid context.
118 m_done.emplace( StatusCode::SUCCESS, std::move( ctx ) );
119 --m_inFlight; // yes, this is not accurate.
120 break;
121 }
122 }
123 } );
124
125 return ok;
126}
127
// Process one event: run every top-level algorithm, record the event status, run every output
// stream, and return the overall outcome together with the (moved) context.
//
// @param context  event context to process; it is moved into the returned tuple.
// @return tuple of the outcome StatusCode and the context. `outcome` is set to StatusCode::FAILURE
//         when any algorithm or output stream was flagged as failed.
//
// NOTE(review): this listing is missing a few lines of the function, among them the declarations of
// the local variables `sc` and `outcome`; their initial values cannot be seen here.
128std::tuple<StatusCode, EventContext> QueueingEventLoopMgr::processEvent( EventContext&& context ) {
129 DEBMSG << "processing " << context << endmsg;
130
 // Becomes true as soon as one algorithm or output stream reports a non-success status.
131 bool eventfailed = false;
132
133 m_aess->reset( context );
135
136 // select the appropriate store
137 if ( m_WB.isValid() ) m_WB->selectStore( context.slot() ).ignore();
138
139 // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard
140 const auto appmgr = serviceLocator()->as<IProperty>();
141 // Call the execute() method of all top algorithms
142 for ( auto& ita : m_topAlgList ) {
144 try {
 // An abort request stops the algorithm loop; the flag is cleared and the `break` skips the
 // failure check below, so the abort alone does not mark the event as failed.
145 if ( m_abortEventListener.abortEvent ) {
146 DEBMSG << "AbortEvent incident fired by " << m_abortEventListener.abortEventSource << endmsg;
147 m_abortEventListener.abortEvent = false;
148 sc.ignore();
149 break;
150 }
 // The guard is created with the UnhandledException code and disarmed right after sysExecute()
 // returns normally, i.e. it stays armed only if sysExecute() throws.
151 RetCodeGuard rcg( appmgr, Gaudi::ReturnCode::UnhandledException );
152 sc = ita->sysExecute( context );
153 rcg.ignore(); // disarm the guard
 // Exceptions are reported and swallowed here; the loop then continues with the check on `sc`.
 // NOTE(review): whether a thrown exception counts as a failure depends on the initial value of
 // `sc`, which is not visible in this listing.
154 } catch ( const GaudiException& Exception ) {
155 fatal() << ".executeEvent(): Exception with tag=" << Exception.tag() << " thrown by " << ita->name() << endmsg;
156 error() << Exception << endmsg;
157 } catch ( const std::exception& Exception ) {
158 fatal() << ".executeEvent(): Standard std::exception thrown by " << ita->name() << endmsg;
159 error() << Exception.what() << endmsg;
160 } catch ( ... ) { fatal() << ".executeEvent(): UNKNOWN Exception thrown by " << ita->name() << endmsg; }
161
 // A failing algorithm marks the event as failed but does not stop the remaining algorithms.
162 if ( !sc.isSuccess() ) {
163 warning() << "Execution of algorithm " << ita->name() << " failed" << endmsg;
164 eventfailed = true;
165 }
166 }
167
 // NOTE(review): the event status is recorded before the output streams run, so a failure of an
 // output stream below is not part of what is passed to updateEventStatus() — confirm intended.
168 m_aess->updateEventStatus( eventfailed, context );
169
170 // ensure that the abortEvent flag is cleared before the next event
171 if ( m_abortEventListener.abortEvent ) {
172 DEBMSG << "AbortEvent incident fired by " << m_abortEventListener.abortEventSource << endmsg;
173 m_abortEventListener.abortEvent = false;
174 }
175
176 // Call the execute() method of all output streams
177 for ( auto& ito : m_outStreamList ) {
 // The filter-passed flag of each output stream is forced to true before it is executed.
178 AlgExecStateRef state = m_aess->algExecState( ito, context );
179 state.setFilterPassed( true );
 // Unlike the algorithm loop above, this call is not wrapped in a try/catch.
180 StatusCode sc = ito->sysExecute( context );
181 if ( !sc.isSuccess() ) {
182 warning() << "Execution of output stream " << ito->name() << " failed" << endmsg;
183 eventfailed = true;
184 }
185 }
186
188 // Check if there was an error processing current event
189 if ( eventfailed ) {
190 error() << "Error processing event loop." << endmsg;
 // The execution-state dump is always built on failure but only printed at DEBUG level.
191 std::ostringstream ost;
192 m_aess->dump( ost, context );
193 DEBMSG << "Dumping AlgExecStateSvc status:\n" << ost.str() << endmsg;
194 outcome = StatusCode::FAILURE;
195 }
196
197 return { std::move( outcome ), std::move( context ) };
198}
199
 // NOTE(review): the signature line of this definition is absent from this listing.
201 // Send an invalid context to stop the processing thread
202 push( EventContext{} );
 // Waits for the event loop thread to finish.
 // NOTE(review): join() is called unconditionally. If the thread was never started (e.g. start()
 // returned early on a base-class failure), std::thread::join() on a non-joinable thread throws
 // std::system_error — consider guarding with m_evtLoopThread.joinable().
203 m_evtLoopThread.join();
204
 // The base class is stopped only after the thread has been joined.
205 return base_class::stop();
206}
#define DEBMSG
#define GAUDI_API
Definition Kernel.h:49
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
#define DECLARE_COMPONENT(type)
wrapper on an Algorithm state.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
This class represents an entry point to all the event specific data.
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
Implementation of property with value of concrete type.
Definition PropertyFwd.h:27
tbb::concurrent_bounded_queue< ResultType > m_done
void push(EventContext &&ctx) override
bool empty() const override
Tell if the processor has events in the queues.
tbb::concurrent_bounded_queue< EventContext > m_incoming
Gaudi::Property< std::size_t > m_queueCapacity
std::optional< ResultType > pop() override
Get the next available result, if any.
std::tuple< StatusCode, EventContext > processEvent(EventContext &&context)
StatusCode executeEvent(EventContext &&ctx) override
Define general base for Gaudi exception.
The IProperty is the basic interface for all components which have properties that can be set or get.
Definition IProperty.h:32
SmartIF< IFace > as()
Definition ISvcLocator.h:64
ListAlg m_outStreamList
List of output streams.
AbortEventListener m_abortEventListener
Instance of the incident listener waiting for AbortEvent.
SmartIF< IAlgExecStateSvc > m_aess
List of top level algorithms.
SmartIF< IHiveWhiteBoard > m_WB
Event data service (whiteboard)
RetCodeGuard(const SmartIF< IProperty > &appmgr, int retcode)
SmartIF< IProperty > m_appmgr
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition Service.cpp:336
Small smart pointer class with automatic reference counting for IInterface.
Definition SmartIF.h:28
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition StatusCode.h:139
bool isSuccess() const
Definition StatusCode.h:314
constexpr static const auto SUCCESS
Definition StatusCode.h:99
constexpr static const auto FAILURE
Definition StatusCode.h:100
Base class used to extend a class implementing other interfaces.
Definition extends.h:19
constexpr int UnhandledException
GAUDI_API void setCurrentContext(const EventContext *ctx)
constexpr int Success
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.