Go to the documentation of this file.
15 #include <tbb/concurrent_queue.h>
18 #define ON_DEBUG if ( outputLevel() <= MSG::DEBUG )
19 #define ON_VERBOSE if ( outputLevel() <= MSG::VERBOSE )
21 #define DEBMSG ON_DEBUG debug()
22 #define VERMSG ON_VERBOSE verbose()
26 :
public extends<MinimalEventLoopMgr, Gaudi::Interfaces::IQueueingEventProcessor> {
28 using extends::extends;
35 using namespace std::chrono_literals;
37 std::optional<ResultType> result;
39 return std::get<0>(
std::move( *result ) );
50 return !( m_inFlight + m_done.size() + m_incoming.size() );
54 std::optional<ResultType>
pop()
override {
56 if ( m_done.try_pop(
out ) )
64 tbb::concurrent_bounded_queue<ResultType>
m_done;
69 this,
"Capacity", m_incoming.capacity() + 1,
90 inline ~RetCodeGuard() {
102 if ( !ok )
return ok;
113 std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
114 m_done.push( processEvent( std::move( ctx ) ) );
117 DEBMSG <<
"exiting event loop thread" << endmsg;
118 m_done.emplace( StatusCode::SUCCESS, std::move( ctx ) );
131 bool eventfailed =
false;
152 sc = ita->sysExecute( context );
155 fatal() <<
".executeEvent(): Exception with tag=" << Exception.tag() <<
" thrown by " << ita->name() <<
endmsg;
156 error() << Exception <<
endmsg;
158 fatal() <<
".executeEvent(): Standard std::exception thrown by " << ita->name() <<
endmsg;
159 error() << Exception.what() <<
endmsg;
160 }
catch ( ... ) { fatal() <<
".executeEvent(): UNKNOWN Exception thrown by " << ita->name() <<
endmsg; }
163 warning() <<
"Execution of algorithm " << ita->name() <<
" failed" <<
endmsg;
168 m_aess->updateEventStatus( eventfailed, context );
179 state.setFilterPassed(
true );
182 warning() <<
"Execution of output stream " << ito->name() <<
" failed" <<
endmsg;
190 error() <<
"Error processing event loop." <<
endmsg;
192 m_aess->dump( ost, context );
205 return base_class::stop();
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
GAUDI_API void setCurrentContext(const EventContext *ctx)
tbb::concurrent_bounded_queue< ResultType > m_done
std::string abortEventSource
Source of the AbortEvent incident.
bool abortEvent
Flag signalling that the event being processed has to be aborted (skip all following top algs).
void reset(TYPE *ptr=nullptr)
Set the internal pointer to the passed one disposing of the old one.
std::thread m_evtLoopThread
StatusCode executeEvent(EventContext &&ctx) override
RetCodeGuard(const SmartIF< IProperty > &appmgr, int retcode)
SmartIF< IAlgExecStateSvc > m_aess
List of top level algorithms.
bool isValid() const
Allow for check if smart pointer is valid.
ListAlg m_outStreamList
List of output streams.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
StatusCode start() override
std::atomic< std::size_t > m_inFlight
constexpr int UnhandledException
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Base class used to extend a class implementing other interfaces.
SmartIF< IHiveWhiteBoard > m_WB
< Event data service (whiteboard)
tbb::concurrent_bounded_queue< EventContext > m_incoming
std::tuple< StatusCode, EventContext > processEvent(EventContext &&context)
void push(EventContext &&ctx) override
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
SmartIF< IFace > as() const
return a new SmartIF instance to another interface
constexpr static const auto SUCCESS
bool empty() const override
Tell if the processor has events in the queues.
#define DECLARE_COMPONENT(type)
Gaudi::Property< std::size_t > m_queueCapacity
AbortEventListener m_abortEventListener
Instance of the incident listener waiting for AbortEvent.
constexpr static const auto FAILURE
StatusCode stop() override
std::optional< ResultType > pop() override
Get the next available result, if any.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator