The Gaudi Framework  v36r1 (3e2fb5a8)
QueueingEventLoopMgr.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
14 #include <GaudiKernel/ThreadLocalContext.h> // for Gaudi::Hive::setCurrentContext
15 #include <tbb/concurrent_queue.h>
16 #include <thread>
17 
// Guard macro: the statement that follows runs only when outputLevel()
// reports MSG::DEBUG or lower. It expands to a bare `if`, so do not place it
// directly in front of an `else` branch.
18 #define ON_DEBUG if ( UNLIKELY( outputLevel() <= MSG::DEBUG ) )
// Same guard, for the MSG::VERBOSE threshold.
19 #define ON_VERBOSE if ( UNLIKELY( outputLevel() <= MSG::VERBOSE ) )
20 
// Level-guarded message streams, used as `DEBMSG << ... << endmsg;`.
// The whole streaming statement becomes the body of the guard's `if`.
21 #define DEBMSG ON_DEBUG debug()
22 #define VERMSG ON_VERBOSE verbose()
23 
24 namespace Gaudi::Examples {
26  : public extends<MinimalEventLoopMgr, Gaudi::Interfaces::IQueueingEventProcessor> {
27  public:
28  using extends::extends;
29 
30  StatusCode start() override;
31  StatusCode stop() override;
32 
33  // backward compatible EventProcessor implementation.
35  using namespace std::chrono_literals;
36  push( std::move( ctx ) );
37  std::optional<ResultType> result;
38  while ( !( result = pop() ) ) std::this_thread::sleep_for( 1ms );
39  return std::get<0>( std::move( *result ) );
40  }
41 
// Schedule an event for processing by moving its context into m_incoming.
// NOTE(review): m_incoming is a tbb::concurrent_bounded_queue, whose push() is
// documented to wait while the queue is at capacity -- confirm callers expect that.
42  void push( EventContext&& ctx ) override { m_incoming.push( std::move( ctx ) ); }
43 
// Tell if the processor has events in the queues.
// Returns true only when the in-flight counter plus the sizes of the done and
// incoming queues add up to zero.
45  bool empty() const override {
46  // Because of the way we count "in flight" (+1 while waiting to get something from the queue)
47  // and the way tbb::concurrent_bounded_queue reports its size while waiting for pop to return (-1),
48  // this is a correct definition of "empty" (nothing pending, nothing being processed and no results
49  // to be popped).
50  return !( m_inFlight + m_done.size() + m_incoming.size() );
51  }
52 
// Get the next available result, if any.
// Calls try_pop on m_done; returns the popped result when that succeeds and
// std::nullopt otherwise, so callers have to poll for completed events.
54  std::optional<ResultType> pop() override {
55  ResultType out;
56  if ( m_done.try_pop( out ) ) // a finished result was available
57  return out;
58  else
59  return std::nullopt; // nothing to hand back right now
60  }
61 
62  private:
63  tbb::concurrent_bounded_queue<EventContext> m_incoming;
64  tbb::concurrent_bounded_queue<ResultType> m_done;
65  std::atomic<std::size_t> m_inFlight{0};
66 
67  // our capacity is the input queue capacity + 1 (N events pending + 1 being processed)
69  this, "Capacity", m_incoming.capacity() + 1,
70  [this]( Gaudi::Details::PropertyBase& ) { m_incoming.set_capacity( m_queueCapacity - 1 ); }};
71 
72  std::tuple<StatusCode, EventContext> processEvent( EventContext&& context );
73 
75  };
76 } // namespace Gaudi::Examples
77 
79 
80 using namespace Gaudi::Examples;
81 
82 namespace {
// Scope guard that sets the application return code on destruction unless it
// has been disarmed with ignore().
// Usage in this file: construct it with the code to report, run the operation
// that may throw, then call ignore() once the operation has returned.
85  class RetCodeGuard {
86  public:
// appmgr:  IProperty interface handed to Gaudi::setAppReturnCode.
// retcode: value to report if the guard is destroyed while still armed.
87  inline RetCodeGuard( SmartIF<IProperty> appmgr, int retcode )
88  : m_appmgr( std::move( appmgr ) ), m_retcode( retcode ) {}
// Disarm the guard: with the stored code set to Success the destructor does nothing.
89  inline void ignore() { m_retcode = Gaudi::ReturnCode::Success; }
// If still armed (stored code differs from Success), push the stored code to
// the application; the StatusCode returned by setAppReturnCode is discarded.
90  inline ~RetCodeGuard() {
91  if ( UNLIKELY( Gaudi::ReturnCode::Success != m_retcode ) ) {
92  Gaudi::setAppReturnCode( m_appmgr, m_retcode ).ignore();
93  }
94  }
95 
96  private:
// Target of the return-code update.
97  SmartIF<IProperty> m_appmgr;
// Pending return code; Gaudi::ReturnCode::Success means "disarmed".
98  int m_retcode;
99  };
100 } // namespace
101 
103  auto ok = base_class::start();
104  if ( !ok ) return ok;
105 
106  info() << m_queueCapacity << endmsg;
107 
108  m_evtLoopThread = std::thread( [this]() {
109  while ( true ) {
111  ++m_inFlight; // yes, this is not accurate.
112  m_incoming.pop( ctx );
113  if ( LIKELY( ctx.valid() ) ) {
114  // the sleep is not strictly needed, but it should make the output more stable for the test
115  std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
116  m_done.push( processEvent( std::move( ctx ) ) );
117  --m_inFlight; // yes, this is not accurate.
118  } else {
119  DEBMSG << "exiting event loop thread" << endmsg;
120  m_done.emplace( StatusCode::SUCCESS, std::move( ctx ) );
121  --m_inFlight; // yes, this is not accurate.
122  break;
123  }
124  }
125  } );
126 
127  return ok;
128 }
129 
131  DEBMSG << "processing " << context << endmsg;
132 
133  bool eventfailed = false;
134 
135  m_aess->reset( context );
137 
138  // select the appropriate store
139  if ( m_WB.isValid() ) m_WB->selectStore( context.slot() ).ignore();
140 
141  // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard
142  const auto appmgr = serviceLocator()->as<IProperty>();
143  // Call the execute() method of all top algorithms
144  for ( auto& ita : m_topAlgList ) {
146  try {
148  DEBMSG << "AbortEvent incident fired by " << m_abortEventListener.abortEventSource << endmsg;
150  sc.ignore();
151  break;
152  }
153  RetCodeGuard rcg( appmgr, Gaudi::ReturnCode::UnhandledException );
154  sc = ita->sysExecute( context );
155  rcg.ignore(); // disarm the guard
156  } catch ( const GaudiException& Exception ) {
157  fatal() << ".executeEvent(): Exception with tag=" << Exception.tag() << " thrown by " << ita->name() << endmsg;
158  error() << Exception << endmsg;
159  } catch ( const std::exception& Exception ) {
160  fatal() << ".executeEvent(): Standard std::exception thrown by " << ita->name() << endmsg;
161  error() << Exception.what() << endmsg;
162  } catch ( ... ) { fatal() << ".executeEvent(): UNKNOWN Exception thrown by " << ita->name() << endmsg; }
163 
164  if ( UNLIKELY( !sc.isSuccess() ) ) {
165  warning() << "Execution of algorithm " << ita->name() << " failed" << endmsg;
166  eventfailed = true;
167  }
168  }
169 
170  m_aess->updateEventStatus( eventfailed, context );
171 
172  // ensure that the abortEvent flag is cleared before the next event
174  DEBMSG << "AbortEvent incident fired by " << m_abortEventListener.abortEventSource << endmsg;
176  }
177 
178  // Call the execute() method of all output streams
179  for ( auto& ito : m_outStreamList ) {
180  AlgExecState& state = m_aess->algExecState( ito, context );
181  state.setFilterPassed( true );
182  StatusCode sc = ito->sysExecute( context );
183  if ( UNLIKELY( !sc.isSuccess() ) ) {
184  warning() << "Execution of output stream " << ito->name() << " failed" << endmsg;
185  eventfailed = true;
186  }
187  }
188 
190  // Check if there was an error processing current event
191  if ( UNLIKELY( eventfailed ) ) {
192  error() << "Error processing event loop." << endmsg;
193  std::ostringstream ost;
194  m_aess->dump( ost, context );
195  DEBMSG << "Dumping AlgExecStateSvc status:\n" << ost.str() << endmsg;
196  outcome = StatusCode::FAILURE;
197  }
198 
199  return {std::move( outcome ), std::move( context )};
200 }
201 
203  // Send an invalid context to stop the processing thread
204  push( EventContext{} );
206 
207  return base_class::stop();
208 }
Gaudi::ReturnCode::Success
constexpr int Success
Definition: AppReturnCode.h:26
Gaudi::Details::PropertyBase
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
Definition: PropertyBase.h:35
Gaudi::Hive::setCurrentContext
GAUDI_API void setCurrentContext(const EventContext *ctx)
Definition: ThreadLocalContext.cpp:41
std::this_thread::sleep_for
T sleep_for(T... args)
Gaudi::Examples::QueueingEventLoopMgr::pop
std::optional< ResultType > pop() override
Get the next available result, if any.
Definition: QueueingEventLoopMgr.cpp:54
std::exception
STL class.
AppReturnCode.h
std::move
T move(T... args)
MinimalEventLoopMgr::AbortEventListener::abortEventSource
std::string abortEventSource
Source of the AbortEvent incident.
Definition: MinimalEventLoopMgr.h:51
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:355
MinimalEventLoopMgr::AbortEventListener::abortEvent
bool abortEvent
Flag signalling that the event being processed has to be aborted (skip all following top algs).
Definition: MinimalEventLoopMgr.h:49
SmartIF::reset
void reset(TYPE *ptr=nullptr)
Set the internal pointer to the passed one disposing of the old one.
Definition: SmartIF.h:96
GaudiException
Definition: GaudiException.h:31
IQueueingEventProcessor.h
gaudirun.retcode
retcode
Definition: gaudirun.py:620
Gaudi::Examples::QueueingEventLoopMgr::stop
StatusCode stop() override
Definition: QueueingEventLoopMgr.cpp:202
std::tuple
Gaudi::Examples::QueueingEventLoopMgr::m_evtLoopThread
std::thread m_evtLoopThread
Definition: QueueingEventLoopMgr.cpp:74
IOTest.start
start
Definition: IOTest.py:108
Gaudi::Units::ms
constexpr double ms
Definition: SystemOfUnits.h:154
IProperty
Definition: IProperty.h:33
Gaudi::Examples::QueueingEventLoopMgr::m_inFlight
std::atomic< std::size_t > m_inFlight
Definition: QueueingEventLoopMgr.cpp:65
MinimalEventLoopMgr::m_aess
SmartIF< IAlgExecStateSvc > m_aess
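Interface to the algorithm execution state service (IAlgExecStateSvc). The original brief here, "List of top level algorithms.", describes m_topAlgList instead.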
Definition: MinimalEventLoopMgr.h:75
SmartIF::isValid
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:72
GaudiPython.Pythonizations.ctx
ctx
Definition: Pythonizations.py:566
Gaudi::Examples
Definition: ConditionAccessorHolder.h:21
StatusCode
Definition: StatusCode.h:65
std::thread
STL class.
MinimalEventLoopMgr::m_outStreamList
ListAlg m_outStreamList
List of output streams.
Definition: MinimalEventLoopMgr.h:78
Gaudi::setAppReturnCode
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:59
LIKELY
#define LIKELY(x)
Definition: Kernel.h:105
Gaudi::Examples::QueueingEventLoopMgr::executeEvent
StatusCode executeEvent(EventContext &&ctx) override
Definition: QueueingEventLoopMgr.cpp:34
Gaudi::Examples::QueueingEventLoopMgr::push
void push(EventContext &&ctx) override
Definition: QueueingEventLoopMgr.cpp:42
DEBMSG
#define DEBMSG
Definition: QueueingEventLoopMgr.cpp:21
SmartIF< IProperty >
Gaudi::ReturnCode::UnhandledException
constexpr int UnhandledException
Definition: AppReturnCode.h:37
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:203
std::atomic< std::size_t >
extends
Base class used to extend a class implementing other interfaces.
Definition: extends.h:20
Gaudi::Examples::QueueingEventLoopMgr::processEvent
std::tuple< StatusCode, EventContext > processEvent(EventContext &&context)
Definition: QueueingEventLoopMgr.cpp:130
MinimalEventLoopMgr::m_WB
SmartIF< IHiveWhiteBoard > m_WB
Event data service (whiteboard)
Definition: MinimalEventLoopMgr.h:140
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:156
SmartIF::as
SmartIF< IFace > as() const
return a new SmartIF instance to another interface
Definition: SmartIF.h:117
std::ostringstream
STL class.
ThreadLocalContext.h
MinimalEventLoopMgr::m_topAlgList
ListAlg m_topAlgList
Definition: MinimalEventLoopMgr.h:76
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
Gaudi::Examples::QueueingEventLoopMgr::m_incoming
tbb::concurrent_bounded_queue< EventContext > m_incoming
Definition: QueueingEventLoopMgr.cpp:63
compareRootHistos.state
def state
Definition: compareRootHistos.py:468
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
Gaudi::Examples::QueueingEventLoopMgr::empty
bool empty() const override
Tell if the processor has events in the queues.
Definition: QueueingEventLoopMgr.cpp:45
MinimalEventLoopMgr.h
EventContext
Definition: EventContext.h:34
Gaudi::Examples::QueueingEventLoopMgr::start
StatusCode start() override
Definition: QueueingEventLoopMgr.cpp:102
std::ostringstream::str
T str(T... args)
MinimalEventLoopMgr::m_abortEventListener
AbortEventListener m_abortEventListener
Instance of the incident listener waiting for AbortEvent.
Definition: MinimalEventLoopMgr.h:84
Gaudi::Examples::QueueingEventLoopMgr::m_queueCapacity
Gaudi::Property< std::size_t > m_queueCapacity
Definition: QueueingEventLoopMgr.cpp:68
AlgExecState
Definition: IAlgExecStateSvc.h:37
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
UNLIKELY
#define UNLIKELY(x)
Definition: Kernel.h:106
Gaudi::Examples::QueueingEventLoopMgr
Definition: QueueingEventLoopMgr.cpp:26
Gaudi::Examples::QueueingEventLoopMgr::m_done
tbb::concurrent_bounded_queue< ResultType > m_done
Definition: QueueingEventLoopMgr.cpp:64
GAUDI_API
#define GAUDI_API
Definition: Kernel.h:81
Gaudi::Property< std::size_t >
std::thread::join
T join(T... args)
Service::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
Definition: Service.cpp:335
PrepareBase.out
out
Definition: PrepareBase.py:20