The Gaudi Framework  master (37c0b60a)
QueueingApplication.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
31 #include <Gaudi/Application.h>
33 #include <Gaudi/Property.h>
35 #include <GaudiKernel/IAppMgrUI.h>
37 #include <GaudiKernel/IProperty.h>
38 #include <GaudiKernel/IStateful.h>
40 #include <GaudiKernel/SmartIF.h>
41 #include <queue>
42 #include <thread>
43 
44 using namespace std::literals::chrono_literals;
46 
47 namespace Gaudi::TestSuite {
49  // - nothing special to add to the base constructor (which implements the default configuration)
50  using Application::Application;
51 
52  // this method is used to implement the main application logic (prepare to run, loop over events, terminate)
53  int run() override {
54  // - parameters for the job
55  const std::size_t n_of_batches = 2;
56  const std::size_t evts_in_batch = 5;
57 
58  // - get ready to process events
59  app->initialize().ignore();
60 
61  // - this MsgStream is useful to have uniform printout
62  MsgStream log( app.as<IMessageSvc>(), "<main>" );
63 
64  // we print the parameters here so that they appear close to the Capacity in the log
65  log << MSG::INFO << " n_of_batches: " << n_of_batches << endmsg;
66  log << MSG::INFO << " evts_in_batch: " << evts_in_batch << endmsg;
67 
68  app->start().ignore(); // this starts the QueueingEventLoopMgr processing thread
69 
70  // - main processing loop
71  {
72  // - get the IQueueingEventProcessor interface of the application
74 
75  // - processing state informations
76  // - events ready to be processed
78  // - count of events enqueued
79  std::size_t evt_count = 0;
80 
81  // - loop over input batches
82  for ( std::size_t batch = 1; batch <= n_of_batches; ++batch ) {
83  // - prepare the batch
84  log << MSG::INFO << "prepare batch of events n. " << batch << endmsg;
85  if ( batch == 2 ) {
86  log << MSG::INFO << " (pretend we need time so that the processing thread drains the input queue)"
87  << endmsg;
88  sleep_for( 4s );
89  log << MSG::INFO << " (all events in the queue should have been processed by now)" << endmsg;
90  }
91  for ( std::size_t i = 0; i < evts_in_batch; ++i ) {
92  // - create a new EventContext for each event in the batch
93  auto ctx = qep->createEventContext();
94  // ... here you can do something with the context ... like setting I/O related stuff
95  // - declare the event as ready to be enqueued
96  ready.push( std::move( ctx ) );
97  }
98 
99  // - once the batch is ready we can push all events, relying on the push to block if needed
100  log << MSG::INFO << "looping over the batch" << endmsg;
101  while ( !ready.empty() ) {
102  ++evt_count;
103  log << MSG::INFO << "- pushing event " << evt_count << " (" << ready.front() << ")..." << endmsg;
104  qep->push( std::move( ready.front() ) ); // this blocks if the system is full
105  ready.pop();
106  log << MSG::INFO << " ... event " << evt_count << " pushed" << endmsg;
107 
108  // - for each push we try a pop, to avoid that the output queue gets too big
109  log << MSG::INFO << "- checking for completed events" << endmsg;
110  if ( auto result = qep->pop() ) { // this never blocks, but evaulates to false if there was nothing to pop
111  // - if an event completed, we can do something (e.g. I/O)
112  auto&& [sc, ctx] = std::move( *result );
113  log << MSG::INFO << " " << ctx << " -> " << sc << endmsg;
114  }
115  }
116  }
117  log << MSG::INFO << "no more inputs: let's drain the output queue" << endmsg;
118  while ( !qep->empty() ) {
119  if ( auto result = qep->pop() ) {
120  auto&& [sc, ctx] = std::move( *result );
121  log << MSG::INFO << " " << ctx << " -> " << sc << endmsg;
122  } else {
123  sleep_for( 10ms );
124  }
125  }
126 
127  // - nothing else to do on the events
128  log << MSG::INFO << "all done" << endmsg;
129  }
130 
131  // - terminate the application
132  app->stop().ignore(); // this stops the QueueingEventLoopMgr processing thread
133  app->finalize().ignore();
134 
135  // - get and propagate the return code the ApplicationMgr whishes to expose
136  return getAppReturnCode( app.as<IProperty>() );
137  }
138  }; // namespace Gaudi::TestSuite
139 
140  DECLARE_COMPONENT( QueueingApplication )
141 } // namespace Gaudi::TestSuite
std::this_thread::sleep_for
T sleep_for(T... args)
IMessageSvc
Definition: IMessageSvc.h:47
Read.app
app
Definition: Read.py:36
AppReturnCode.h
std::move
T move(T... args)
Gaudi.Application
Gaudi application entry point.
Definition: __init__.py:87
MSG::INFO
@ INFO
Definition: IMessageSvc.h:25
gaudirun.s
string s
Definition: gaudirun.py:346
Gaudi::getAppReturnCode
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:79
IQueueingEventProcessor.h
std::queue
STL class.
IAppMgrUI.h
std::queue::front
T front(T... args)
IMessageSvc.h
Gaudi::Units::ms
constexpr double ms
Definition: SystemOfUnits.h:154
IProperty
Definition: IProperty.h:33
SmartIF.h
GaudiPython.Pythonizations.ctx
ctx
Definition: Pythonizations.py:578
std::log
T log(T... args)
std::queue::pop
T pop(T... args)
IStateful.h
Application.h
SmartIF
Definition: IConverter.h:25
Gaudi::TestSuite
Definition: ConditionAccessorHolder.h:21
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202
MsgStream
Definition: MsgStream.h:33
Gaudi::TestSuite::QueueingApplication::run
int run() override
Implement the application main logic:
Definition: QueueingApplication.cpp:53
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
std::queue::empty
T empty(T... args)
std::queue::push
T push(T... args)
std::size_t
IProperty.h
ISvcLocator.h
Property.h
Gaudi::TestSuite::QueueingApplication
Definition: QueueingApplication.cpp:48