The Gaudi Framework  master (37c0b60a)
QueueingEventLoopMgr.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
14 #include <GaudiKernel/ThreadLocalContext.h> // for Gaudi::Hive::setCurrentContext
15 #include <tbb/concurrent_queue.h>
16 #include <thread>
17 
// Level guards: each expands to an unbraced `if` on the current output level, so the statement
// that follows the macro only runs when DEBUG (resp. VERBOSE) output is enabled.
18 #define ON_DEBUG if ( outputLevel() <= MSG::DEBUG )
19 #define ON_VERBOSE if ( outputLevel() <= MSG::VERBOSE )
20 
// Guarded message streams: `DEBMSG << ... << endmsg;` streams to debug() only when the DEBUG level
// is enabled (likewise VERMSG with verbose()). Because the expansion is a bare `if`, use these as a
// complete statement inside braces; a following `else` would bind to the hidden `if`.
21 #define DEBMSG ON_DEBUG debug()
22 #define VERMSG ON_VERBOSE verbose()
23 
24 namespace Gaudi::TestSuite {
26  : public extends<MinimalEventLoopMgr, Gaudi::Interfaces::IQueueingEventProcessor> {
27  public:
28  using extends::extends;
29 
30  StatusCode start() override;
31  StatusCode stop() override;
32 
// Synchronous entry point built on top of the queueing interface: the context is queued with
// push(), then pop() is polled (sleeping 1 ms between attempts) until a result is available, and
// element 0 of that result is returned.
// NOTE(review): the signature line (original line 34) is not present in this extract; the symbol
// index of the page gives it as `StatusCode executeEvent(EventContext &&ctx) override` - confirm
// against the real file.
33  // backward compatible EventProcessor implementation.
35  using namespace std::chrono_literals;
36  push( std::move( ctx ) );
37  std::optional<ResultType> result;
// pop() is non-blocking, hence the polling loop.
38  while ( !( result = pop() ) ) std::this_thread::sleep_for( 1ms );
39  return std::get<0>( std::move( *result ) );
40  }
41 
42  void push( EventContext&& ctx ) override { m_incoming.push( std::move( ctx ) ); }
43 
45  bool empty() const override {
46  // because of the way we count "in flight" (+1 while waiting that we get something from the queue)
47  // and the way tbb:concurrent_bounded_queue reports the size while waiting for pop to return (-1)
48  // this is a correct definition of "empty" (nothing pending, nothing being processed and no results
49  // to be popped)
50  return !( m_inFlight + m_done.size() + m_incoming.size() );
51  }
52 
54  std::optional<ResultType> pop() override {
55  ResultType out;
56  if ( m_done.try_pop( out ) )
57  return out;
58  else
59  return std::nullopt;
60  }
61 
62  private:
63  tbb::concurrent_bounded_queue<EventContext> m_incoming;
64  tbb::concurrent_bounded_queue<ResultType> m_done;
65  std::atomic<std::size_t> m_inFlight{ 0 };
66 
67  // our capacity is the input queue capacity + 1 (N events pending + 1 being processed)
69  this, "Capacity", m_incoming.capacity() + 1,
70  [this]( Gaudi::Details::PropertyBase& ) { m_incoming.set_capacity( m_queueCapacity - 1 ); } };
71 
72  std::tuple<StatusCode, EventContext> processEvent( EventContext&& context );
73 
75  };
76 } // namespace Gaudi::TestSuite
77 
79 
80 using namespace Gaudi::TestSuite;
81 
82 namespace {
85  class RetCodeGuard {
86  public:
87  inline RetCodeGuard( SmartIF<IProperty> appmgr, int retcode )
88  : m_appmgr( std::move( appmgr ) ), m_retcode( retcode ) {}
89  inline void ignore() { m_retcode = Gaudi::ReturnCode::Success; }
90  inline ~RetCodeGuard() {
91  if ( Gaudi::ReturnCode::Success != m_retcode ) { Gaudi::setAppReturnCode( m_appmgr, m_retcode ).ignore(); }
92  }
93 
94  private:
95  SmartIF<IProperty> m_appmgr;
96  int m_retcode;
97  };
98 } // namespace
99 
// Body of QueueingEventLoopMgr::start(): starts the base class, prints the queue capacity and
// launches the event loop thread that consumes m_incoming and publishes results on m_done.
// NOTE(review): the function header (original line 100) and the declaration of `ctx` (original
// line 108) are not present in this extract - confirm against the real file.
101  auto ok = base_class::start();
// Nothing is launched if the base class failed to start.
102  if ( !ok ) return ok;
103 
104  info() << m_queueCapacity << endmsg;
105 
106  m_evtLoopThread = std::thread( [this]() {
107  while ( true ) {
// The counter is raised before the pop on the incoming queue, i.e. also while merely waiting;
// empty() relies on this (see the comment there).
109  ++m_inFlight; // yes, this is not accurate.
110  m_incoming.pop( ctx );
111  if ( ctx.valid() ) {
112  // the sleep is not strictly needed, but it should make the output more stable for the test
113  std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
// The result is published before the in-flight counter is lowered.
114  m_done.push( processEvent( std::move( ctx ) ) );
115  --m_inFlight; // yes, this is not accurate.
116  } else {
// An invalid context is the request to terminate (stop() pushes one): a SUCCESS entry carrying
// that context is still placed on m_done before leaving the loop.
117  DEBMSG << "exiting event loop thread" << endmsg;
118  m_done.emplace( StatusCode::SUCCESS, std::move( ctx ) );
119  --m_inFlight; // yes, this is not accurate.
120  break;
121  }
122  }
123  } );
124 
125  return ok;
126 }
127 
// Body of QueueingEventLoopMgr::processEvent(): runs every top algorithm and then every output
// stream on `context`, and returns the overall status together with the context.
// NOTE(review): the function header (original line 128) and original lines 134, 143, 145, 147,
// 171, 173 and 187 are not present in this extract. Among them are the declarations of `sc` and
// `outcome` and the openings of the two AbortEvent checks - confirm against the real file before
// relying on the control flow shown here.
129  DEBMSG << "processing " << context << endmsg;
130 
131  bool eventfailed = false;
132 
133  m_aess->reset( context );
135 
136  // select the appropriate store
137  if ( m_WB.isValid() ) m_WB->selectStore( context.slot() ).ignore();
138 
139  // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard
140  const auto appmgr = serviceLocator()->as<IProperty>();
141  // Call the execute() method of all top algorithms
142  for ( auto& ita : m_topAlgList ) {
144  try {
// NOTE(review): the condition guarding this debug/ignore/break sequence is on a missing line;
// presumably it tests the AbortEvent flag of m_abortEventListener.
146  DEBMSG << "AbortEvent incident fired by " << m_abortEventListener.abortEventSource << endmsg;
148  sc.ignore();
149  break;
150  }
// The guard is armed with UnhandledException while the algorithm runs and disarmed right after a
// normal return; if sysExecute throws, the guard's destructor still sees the armed code.
151  RetCodeGuard rcg( appmgr, Gaudi::ReturnCode::UnhandledException );
152  sc = ita->sysExecute( context );
153  rcg.ignore(); // disarm the guard
// Exceptions are logged and not rethrown; `sc` is not assigned on these paths.
154  } catch ( const GaudiException& Exception ) {
155  fatal() << ".executeEvent(): Exception with tag=" << Exception.tag() << " thrown by " << ita->name() << endmsg;
156  error() << Exception << endmsg;
157  } catch ( const std::exception& Exception ) {
158  fatal() << ".executeEvent(): Standard std::exception thrown by " << ita->name() << endmsg;
159  error() << Exception.what() << endmsg;
160  } catch ( ... ) { fatal() << ".executeEvent(): UNKNOWN Exception thrown by " << ita->name() << endmsg; }
161 
// A failing algorithm marks the event as failed; the loop continues with the next algorithm.
162  if ( !sc.isSuccess() ) {
163  warning() << "Execution of algorithm " << ita->name() << " failed" << endmsg;
164  eventfailed = true;
165  }
166  }
167 
// The event status is recorded here, before the output streams run, so a failure flagged by an
// output stream below is not part of this call.
168  m_aess->updateEventStatus( eventfailed, context );
169 
170  // ensure that the abortEvent flag is cleared before the next event
172  DEBMSG << "AbortEvent incident fired by " << m_abortEventListener.abortEventSource << endmsg;
174  }
175 
176  // Call the execute() method of all output streams
177  for ( auto& ito : m_outStreamList ) {
178  AlgExecState& state = m_aess->algExecState( ito, context );
179  state.setFilterPassed( true );
180  StatusCode sc = ito->sysExecute( context );
181  if ( !sc.isSuccess() ) {
182  warning() << "Execution of output stream " << ito->name() << " failed" << endmsg;
183  eventfailed = true;
184  }
185  }
186 
188  // Check if there was an error processing current event
189  if ( eventfailed ) {
190  error() << "Error processing event loop." << endmsg;
191  std::ostringstream ost;
192  m_aess->dump( ost, context );
193  DEBMSG << "Dumping AlgExecStateSvc status:\n" << ost.str() << endmsg;
194  outcome = StatusCode::FAILURE;
195  }
196 
// The context is moved back to the caller together with the status.
197  return { std::move( outcome ), std::move( context ) };
198 }
199 
// Body of QueueingEventLoopMgr::stop(): asks the event loop thread to terminate by queueing a
// default-constructed (invalid) context, then stops the base class.
// NOTE(review): the function header (original line 200) and original line 203 are not present in
// this extract. The page's symbol index lists std::thread::join, so presumably the event loop
// thread is joined on the missing line - confirm against the real file.
201  // Send an invalid context to stop the processing thread
202  push( EventContext{} );
204 
205  return base_class::stop();
206 }
Gaudi::ReturnCode::Success
constexpr int Success
Definition: AppReturnCode.h:26
Gaudi::Details::PropertyBase
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
Definition: PropertyBase.h:35
Gaudi::Hive::setCurrentContext
GAUDI_API void setCurrentContext(const EventContext *ctx)
Definition: ThreadLocalContext.cpp:41
std::this_thread::sleep_for
T sleep_for(T... args)
Gaudi::TestSuite::QueueingEventLoopMgr::m_done
tbb::concurrent_bounded_queue< ResultType > m_done
Definition: QueueingEventLoopMgr.cpp:64
std::exception
STL class.
AppReturnCode.h
std::move
T move(T... args)
MinimalEventLoopMgr::AbortEventListener::abortEventSource
std::string abortEventSource
Source of the AbortEvent incident.
Definition: MinimalEventLoopMgr.h:51
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
MinimalEventLoopMgr::AbortEventListener::abortEvent
bool abortEvent
Flag signalling that the event being processed has to be aborted (skip all following top algs).
Definition: MinimalEventLoopMgr.h:49
SmartIF::reset
void reset(TYPE *ptr=nullptr)
Set the internal pointer to the passed one disposing of the old one.
Definition: SmartIF.h:96
GaudiException
Definition: GaudiException.h:31
IQueueingEventProcessor.h
gaudirun.retcode
retcode
Definition: gaudirun.py:651
std::tuple
IOTest.start
start
Definition: IOTest.py:110
Gaudi::TestSuite::QueueingEventLoopMgr::m_evtLoopThread
std::thread m_evtLoopThread
Definition: QueueingEventLoopMgr.cpp:74
Gaudi::Units::ms
constexpr double ms
Definition: SystemOfUnits.h:154
IProperty
Definition: IProperty.h:33
Gaudi::TestSuite::QueueingEventLoopMgr::executeEvent
StatusCode executeEvent(EventContext &&ctx) override
Definition: QueueingEventLoopMgr.cpp:34
RetCodeGuard::RetCodeGuard
RetCodeGuard(const SmartIF< IProperty > &appmgr, int retcode)
Definition: RetCodeGuard.h:21
MinimalEventLoopMgr::m_aess
SmartIF< IAlgExecStateSvc > m_aess
List of top level algorithms.
Definition: MinimalEventLoopMgr.h:73
SmartIF::isValid
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:72
GaudiPython.Pythonizations.ctx
ctx
Definition: Pythonizations.py:578
StatusCode
Definition: StatusCode.h:65
std::thread
STL class.
MinimalEventLoopMgr::m_outStreamList
ListAlg m_outStreamList
List of output streams.
Definition: MinimalEventLoopMgr.h:76
Gaudi::setAppReturnCode
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:59
Gaudi::TestSuite::QueueingEventLoopMgr::start
StatusCode start() override
Definition: QueueingEventLoopMgr.cpp:100
Gaudi::TestSuite::QueueingEventLoopMgr::m_inFlight
std::atomic< std::size_t > m_inFlight
Definition: QueueingEventLoopMgr.cpp:65
DEBMSG
#define DEBMSG
Definition: QueueingEventLoopMgr.cpp:21
SmartIF< IProperty >
Gaudi::ReturnCode::UnhandledException
constexpr int UnhandledException
Definition: AppReturnCode.h:37
Gaudi::TestSuite
Definition: ConditionAccessorHolder.h:21
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202
std::atomic< std::size_t >
extends
Base class used to extend a class implementing other interfaces.
Definition: extends.h:20
MinimalEventLoopMgr::m_WB
SmartIF< IHiveWhiteBoard > m_WB
Event data service (whiteboard)
Definition: MinimalEventLoopMgr.h:138
Gaudi::TestSuite::QueueingEventLoopMgr::m_incoming
tbb::concurrent_bounded_queue< EventContext > m_incoming
Definition: QueueingEventLoopMgr.cpp:63
Gaudi::TestSuite::QueueingEventLoopMgr::processEvent
std::tuple< StatusCode, EventContext > processEvent(EventContext &&context)
Definition: QueueingEventLoopMgr.cpp:128
Gaudi::TestSuite::QueueingEventLoopMgr::push
void push(EventContext &&ctx) override
Definition: QueueingEventLoopMgr.cpp:42
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:139
SmartIF::as
SmartIF< IFace > as() const
return a new SmartIF instance to another interface
Definition: SmartIF.h:117
std::ostringstream
STL class.
ThreadLocalContext.h
MinimalEventLoopMgr::m_topAlgList
ListAlg m_topAlgList
Definition: MinimalEventLoopMgr.h:74
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
Gaudi::TestSuite::QueueingEventLoopMgr::empty
bool empty() const override
Tell if the processor has events in the queues.
Definition: QueueingEventLoopMgr.cpp:45
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
MinimalEventLoopMgr.h
EventContext
Definition: EventContext.h:34
std::ostringstream::str
T str(T... args)
Gaudi::TestSuite::QueueingEventLoopMgr::m_queueCapacity
Gaudi::Property< std::size_t > m_queueCapacity
Definition: QueueingEventLoopMgr.cpp:68
MinimalEventLoopMgr::m_abortEventListener
AbortEventListener m_abortEventListener
Instance of the incident listener waiting for AbortEvent.
Definition: MinimalEventLoopMgr.h:82
AlgExecState
Definition: IAlgExecStateSvc.h:36
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
compareRootHistos.state
state
Definition: compareRootHistos.py:496
Gaudi::TestSuite::QueueingEventLoopMgr::stop
StatusCode stop() override
Definition: QueueingEventLoopMgr.cpp:200
Gaudi::TestSuite::QueueingEventLoopMgr
Definition: QueueingEventLoopMgr.cpp:26
GAUDI_API
#define GAUDI_API
Definition: Kernel.h:81
Gaudi::Property< std::size_t >
Gaudi::TestSuite::QueueingEventLoopMgr::pop
std::optional< ResultType > pop() override
Get the next available result, if any.
Definition: QueueingEventLoopMgr.cpp:54
std::thread::join
T join(T... args)
Service::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
Definition: Service.cpp:335
PrepareBase.out
out
Definition: PrepareBase.py:20