Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v38r0 (2143aa4c)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
main.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
31 #include <Gaudi/Application.h>
35 #include <GaudiKernel/IProperty.h>
36 #include <queue>
37 #include <thread>
38 
39 using namespace std::literals::chrono_literals;
41 
42 int main() {
43  { // useless scope, just to get the same indentation as QueueingApplication.cpp (most of the code is identical)
44 
46  { "ApplicationMgr.JobOptionsType", "\"NONE\"" },
47  { "ApplicationMgr.EventLoop", "\"Gaudi::Examples::QueueingEventLoopMgr/QueueingEventLoopMgr\"" },
48  { "ApplicationMgr.OutputLevel", "3" },
49  { "ApplicationMgr.TopAlg", "['GaudiTesting::SleepyAlg/Alg1']" },
50  { "Alg1.SleepTime", "1" },
51  { "QueueingEventLoopMgr.OutputLevel", "2" },
52  { "QueueingEventLoopMgr.Capacity", "3" } };
53 
54  auto app = Gaudi::Application( std::move( opts ) );
55 
56  return app.run( []( SmartIF<IStateful>& app ) -> int {
57  // - parameters for the job
58  const std::size_t n_of_batches = 2;
59  const std::size_t evts_in_batch = 5;
60 
61  // - get ready to process events
62  app->initialize().ignore();
63 
64  // - this MsgStream is useful to have uniform printout
65  MsgStream log( app.as<IMessageSvc>(), "<main>" );
66 
67  // we print the parameters here so that they appear close to the Capacity in the log
68  log << MSG::INFO << " n_of_batches: " << n_of_batches << endmsg;
69  log << MSG::INFO << " evts_in_batch: " << evts_in_batch << endmsg;
70 
71  app->start().ignore(); // this starts the QueueingEventLoopMgr processing thread
72 
73  // - main processing loop
74  {
75  // - get the IQueueingEventProcessor interface of the application
77 
78  // - processing state informations
79  // - events ready to be processed
81  // - count of events enqueued
82  std::size_t evt_count = 0;
83 
84  // - loop over input batches
85  for ( std::size_t batch = 1; batch <= n_of_batches; ++batch ) {
86  // - prepare the batch
87  log << MSG::INFO << "prepare batch of events n. " << batch << endmsg;
88  if ( batch == 2 ) {
89  log << MSG::INFO << " (pretend we need time so that the processing thread drains the input queue)"
90  << endmsg;
91  sleep_for( 4s );
92  log << MSG::INFO << " (all events in the queue should have been processed by now)" << endmsg;
93  }
94  for ( std::size_t i = 0; i < evts_in_batch; ++i ) {
95  // - create a new EventContext for each event in the batch
96  auto ctx = qep->createEventContext();
97  // ... here you can do something with the context ... like setting I/O related stuff
98  // - declare the event as ready to be enqueued
99  ready.push( std::move( ctx ) );
100  }
101 
102  // - once the batch is ready we can push all events, relying on the push to block if needed
103  log << MSG::INFO << "looping over the batch" << endmsg;
104  while ( !ready.empty() ) {
105  ++evt_count;
106  log << MSG::INFO << "- pushing event " << evt_count << " (" << ready.front() << ")..." << endmsg;
107  qep->push( std::move( ready.front() ) ); // this blocks if the system is full
108  ready.pop();
109  log << MSG::INFO << " ... event " << evt_count << " pushed" << endmsg;
110 
111  // - for each push we try a pop, to avoid that the output queue gets too big
112  log << MSG::INFO << "- checking for completed events" << endmsg;
113  if ( auto result = qep->pop() ) { // this never blocks, but evaulates to false if there was nothing to pop
114  // - if an event completed, we can do something (e.g. I/O)
115  auto&& [sc, ctx] = std::move( *result );
116  log << MSG::INFO << " " << ctx << " -> " << sc << endmsg;
117  }
118  }
119  }
120  log << MSG::INFO << "no more inputs: let's drain the output queue" << endmsg;
121  while ( !qep->empty() ) {
122  if ( auto result = qep->pop() ) {
123  auto&& [sc, ctx] = std::move( *result );
124  log << MSG::INFO << " " << ctx << " -> " << sc << endmsg;
125  } else {
126  sleep_for( 10ms );
127  }
128  }
129 
130  // - nothing else to do on the events
131  log << MSG::INFO << "all done" << endmsg;
132  }
133 
134  // - terminate the application
135  app->stop().ignore(); // this stops the QueueingEventLoopMgr processing thread
136  app->finalize().ignore();
137 
138  // - get and propagate the return code the ApplicationMgr whishes to expose
139  return Gaudi::getAppReturnCode( app.as<IProperty>() );
140  } );
141  }
142 }
std::this_thread::sleep_for
T sleep_for(T... args)
IMessageSvc
Definition: IMessageSvc.h:47
Read.app
app
Definition: Read.py:36
AppReturnCode.h
std::move
T move(T... args)
Gaudi.Application
Gaudi application entry point.
Definition: __init__.py:87
MSG::INFO
@ INFO
Definition: IMessageSvc.h:25
gaudirun.s
string s
Definition: gaudirun.py:346
Gaudi::getAppReturnCode
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:79
IQueueingEventProcessor.h
std::queue
STL class.
std::queue::front
T front(T... args)
IMessageSvc.h
Gaudi::Units::ms
constexpr double ms
Definition: SystemOfUnits.h:154
IProperty
Definition: IProperty.h:33
GaudiPython.Pythonizations.ctx
ctx
Definition: Pythonizations.py:578
gaudirun.opts
opts
Definition: gaudirun.py:336
std::log
T log(T... args)
std::queue::pop
T pop(T... args)
Application.h
SmartIF< IStateful >
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:203
std::map< std::string, std::string >
MsgStream
Definition: MsgStream.h:34
std::queue::empty
T empty(T... args)
std::queue::push
T push(T... args)
std::size_t
IProperty.h
main
int main(int argc, char *argv[])
Definition: main.cpp:14