The Gaudi Framework  master (181af51f)
Loading...
Searching...
No Matches
QueueingApplication.cpp
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
30
31#include <Gaudi/Application.h>
33#include <Gaudi/Property.h>
40#include <GaudiKernel/SmartIF.h>
41#include <queue>
42#include <thread>
43
44using namespace std::literals::chrono_literals;
45using std::this_thread::sleep_for;
46
47namespace Gaudi::TestSuite {
49 // - nothing special to add to the base constructor (which implements the default configuration)
51
52 // this method is used to implement the main application logic (prepare to run, loop over events, terminate)
53 int run() override {
54 // - parameters for the job
55 const std::size_t n_of_batches = 2;
56 const std::size_t evts_in_batch = 5;
57
58 // - get ready to process events
59 app->initialize().ignore();
60
61 // - this MsgStream is useful to have uniform printout
62 MsgStream log( app.as<IMessageSvc>(), "<main>" );
63
64 // we print the parameters here so that they appear close to the Capacity in the log
65 log << MSG::INFO << " n_of_batches: " << n_of_batches << endmsg;
66 log << MSG::INFO << " evts_in_batch: " << evts_in_batch << endmsg;
67
68 app->start().ignore(); // this starts the QueueingEventLoopMgr processing thread
69
70 // - main processing loop
71 {
72 // - get the IQueueingEventProcessor interface of the application
74
75 // - processing state informations
76 // - events ready to be processed
77 std::queue<EventContext> ready;
78 // - count of events enqueued
79 std::size_t evt_count = 0;
80
81 // - loop over input batches
82 for ( std::size_t batch = 1; batch <= n_of_batches; ++batch ) {
83 // - prepare the batch
84 log << MSG::INFO << "prepare batch of events n. " << batch << endmsg;
85 if ( batch == 2 ) {
86 log << MSG::INFO << " (pretend we need time so that the processing thread drains the input queue)"
87 << endmsg;
88 sleep_for( 4s );
89 log << MSG::INFO << " (all events in the queue should have been processed by now)" << endmsg;
90 }
91 for ( std::size_t i = 0; i < evts_in_batch; ++i ) {
92 // - create a new EventContext for each event in the batch
93 auto ctx = qep->createEventContext();
94 // ... here you can do something with the context ... like setting I/O related stuff
95 // - declare the event as ready to be enqueued
96 ready.push( std::move( ctx ) );
97 }
98
99 // - once the batch is ready we can push all events, relying on the push to block if needed
100 log << MSG::INFO << "looping over the batch" << endmsg;
101 while ( !ready.empty() ) {
102 ++evt_count;
103 log << MSG::INFO << "- pushing event " << evt_count << " (" << ready.front() << ")..." << endmsg;
104 qep->push( std::move( ready.front() ) ); // this blocks if the system is full
105 ready.pop();
106 log << MSG::INFO << " ... event " << evt_count << " pushed" << endmsg;
107
108 // - for each push we try a pop, to avoid that the output queue gets too big
109 log << MSG::INFO << "- checking for completed events" << endmsg;
110 if ( auto result = qep->pop() ) { // this never blocks, but evaulates to false if there was nothing to pop
111 // - if an event completed, we can do something (e.g. I/O)
112 auto&& [sc, ctx] = std::move( *result );
113 log << MSG::INFO << " " << ctx << " -> " << sc << endmsg;
114 }
115 }
116 }
117 log << MSG::INFO << "no more inputs: let's drain the output queue" << endmsg;
118 while ( !qep->empty() ) {
119 if ( auto result = qep->pop() ) {
120 auto&& [sc, ctx] = std::move( *result );
121 log << MSG::INFO << " " << ctx << " -> " << sc << endmsg;
122 } else {
123 sleep_for( 10ms );
124 }
125 }
126
127 // - nothing else to do on the events
128 log << MSG::INFO << "all done" << endmsg;
129 }
130
131 // - terminate the application
132 app->stop().ignore(); // this stops the QueueingEventLoopMgr processing thread
133 app->finalize().ignore();
134
135 // - get and propagate the return code the ApplicationMgr whishes to expose
136 return getAppReturnCode( app.as<IProperty>() );
137 }
138 }; // class QueueingApplication
139
// NOTE(review): presumably registers QueueingApplication as a Gaudi component so it can be instantiated by name -- confirm against the DECLARE_COMPONENT macro definition
140 DECLARE_COMPONENT( QueueingApplication )
141} // namespace Gaudi::TestSuite
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
#define DECLARE_COMPONENT(type)
Gaudi application entry point.
Definition Application.h:27
Application(Options opts)
Construct and configure the application from the provided options.
SmartIF< IStateful > app
Handle to the ApplicationMgr instance.
Definition Application.h:50
The IMessage is the interface implemented by the message service.
Definition IMessageSvc.h:34
The IProperty is the basic interface for all components which have properties that can be set or retrieved.
Definition IProperty.h:32
Definition of the MsgStream class used to transmit messages.
Definition MsgStream.h:29
Small smart pointer class with automatic reference counting for IInterface.
Definition SmartIF.h:28
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
@ INFO
Definition IMessageSvc.h:22