The Gaudi Framework
master (37c0b60a)
QueueingApplication.cpp
Go to the documentation of this file.
1
/***********************************************************************************\
2
* (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3
* *
4
* This software is distributed under the terms of the Apache version 2 licence, *
5
* copied verbatim in the file "LICENSE". *
6
* *
7
* In applying this licence, CERN does not waive the privileges and immunities *
8
* granted to it by virtue of its status as an Intergovernmental Organization *
9
* or submit itself to any jurisdiction. *
10
\***********************************************************************************/
31
#include <
Gaudi/Application.h
>
32
#include <
Gaudi/Interfaces/IQueueingEventProcessor.h
>
33
#include <
Gaudi/Property.h
>
34
#include <
GaudiKernel/AppReturnCode.h
>
35
#include <
GaudiKernel/IAppMgrUI.h
>
36
#include <
GaudiKernel/IMessageSvc.h
>
37
#include <
GaudiKernel/IProperty.h
>
38
#include <
GaudiKernel/IStateful.h
>
39
#include <
GaudiKernel/ISvcLocator.h
>
40
#include <
GaudiKernel/SmartIF.h
>
41
#include <queue>
42
#include <thread>
43
44
using namespace
std::literals::chrono_literals;
45
using
std::this_thread::sleep_for
;
46
47
namespace
Gaudi::TestSuite
{
48
class
QueueingApplication
:
public
Gaudi::Application
{
49
// - nothing special to add to the base constructor (which implements the default configuration)
50
using
Application::Application;
51
52
// this method is used to implement the main application logic (prepare to run, loop over events, terminate)
53
int
run
()
override
{
54
// - parameters for the job
55
const
std::size_t
n_of_batches = 2;
56
const
std::size_t
evts_in_batch = 5;
57
58
// - get ready to process events
59
app
->initialize().ignore();
60
61
// - this MsgStream is useful to have uniform printout
62
MsgStream
log
(
app
.as<
IMessageSvc
>(),
"<main>"
);
63
64
// we print the parameters here so that they appear close to the Capacity in the log
65
log
<<
MSG::INFO
<<
" n_of_batches: "
<< n_of_batches <<
endmsg
;
66
log
<<
MSG::INFO
<<
" evts_in_batch: "
<< evts_in_batch <<
endmsg
;
67
68
app
->start().ignore();
// this starts the QueueingEventLoopMgr processing thread
69
70
// - main processing loop
71
{
72
// - get the IQueueingEventProcessor interface of the application
73
SmartIF<Gaudi::Interfaces::IQueueingEventProcessor>
qep{
app
};
74
75
// - processing state informations
76
// - events ready to be processed
77
std::queue<EventContext>
ready;
78
// - count of events enqueued
79
std::size_t
evt_count = 0;
80
81
// - loop over input batches
82
for
(
std::size_t
batch = 1; batch <= n_of_batches; ++batch ) {
83
// - prepare the batch
84
log
<<
MSG::INFO
<<
"prepare batch of events n. "
<< batch <<
endmsg
;
85
if
( batch == 2 ) {
86
log
<<
MSG::INFO
<<
" (pretend we need time so that the processing thread drains the input queue)"
87
<<
endmsg
;
88
sleep_for( 4
s
);
89
log
<<
MSG::INFO
<<
" (all events in the queue should have been processed by now)"
<<
endmsg
;
90
}
91
for
(
std::size_t
i = 0; i < evts_in_batch; ++i ) {
92
// - create a new EventContext for each event in the batch
93
auto
ctx
= qep->createEventContext();
94
// ... here you can do something with the context ... like setting I/O related stuff
95
// - declare the event as ready to be enqueued
96
ready.
push
(
std::move
(
ctx
) );
97
}
98
99
// - once the batch is ready we can push all events, relying on the push to block if needed
100
log
<<
MSG::INFO
<<
"looping over the batch"
<<
endmsg
;
101
while
( !ready.
empty
() ) {
102
++evt_count;
103
log
<<
MSG::INFO
<<
"- pushing event "
<< evt_count <<
" ("
<< ready.
front
() <<
")..."
<<
endmsg
;
104
qep->push(
std::move
( ready.
front
() ) );
// this blocks if the system is full
105
ready.
pop
();
106
log
<<
MSG::INFO
<<
" ... event "
<< evt_count <<
" pushed"
<<
endmsg
;
107
108
// - for each push we try a pop, to avoid that the output queue gets too big
109
log
<<
MSG::INFO
<<
"- checking for completed events"
<<
endmsg
;
110
if
(
auto
result = qep->pop() ) {
// this never blocks, but evaulates to false if there was nothing to pop
111
// - if an event completed, we can do something (e.g. I/O)
112
auto
&& [sc,
ctx
] =
std::move
( *result );
113
log
<<
MSG::INFO
<<
" "
<<
ctx
<<
" -> "
<< sc <<
endmsg
;
114
}
115
}
116
}
117
log
<<
MSG::INFO
<<
"no more inputs: let's drain the output queue"
<<
endmsg
;
118
while
( !qep->empty() ) {
119
if
(
auto
result = qep->pop() ) {
120
auto
&& [sc,
ctx
] =
std::move
( *result );
121
log
<<
MSG::INFO
<<
" "
<<
ctx
<<
" -> "
<< sc <<
endmsg
;
122
}
else
{
123
sleep_for( 10
ms
);
124
}
125
}
126
127
// - nothing else to do on the events
128
log
<<
MSG::INFO
<<
"all done"
<<
endmsg
;
129
}
130
131
// - terminate the application
132
app
->stop().ignore();
// this stops the QueueingEventLoopMgr processing thread
133
app
->finalize().ignore();
134
135
// - get and propagate the return code the ApplicationMgr whishes to expose
136
return
getAppReturnCode
(
app
.as<
IProperty
>() );
137
}
138
};
// namespace Gaudi::TestSuite
139
140
DECLARE_COMPONENT
( QueueingApplication )
141
}
// namespace Gaudi::TestSuite
std::this_thread::sleep_for
T sleep_for(T... args)
IMessageSvc
Definition:
IMessageSvc.h:47
Read.app
app
Definition:
Read.py:36
AppReturnCode.h
std::move
T move(T... args)
Gaudi.Application
Gaudi application entry point.
Definition:
__init__.py:87
MSG::INFO
@ INFO
Definition:
IMessageSvc.h:25
gaudirun.s
string s
Definition:
gaudirun.py:346
Gaudi::getAppReturnCode
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition:
AppReturnCode.h:79
IQueueingEventProcessor.h
std::queue
STL class.
IAppMgrUI.h
std::queue::front
T front(T... args)
IMessageSvc.h
Gaudi::Units::ms
constexpr double ms
Definition:
SystemOfUnits.h:154
IProperty
Definition:
IProperty.h:33
SmartIF.h
GaudiPython.Pythonizations.ctx
ctx
Definition:
Pythonizations.py:578
std::log
T log(T... args)
std::queue::pop
T pop(T... args)
IStateful.h
Application.h
SmartIF
Definition:
IConverter.h:25
Gaudi::TestSuite
Definition:
ConditionAccessorHolder.h:21
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition:
MsgStream.h:202
MsgStream
Definition:
MsgStream.h:33
Gaudi::TestSuite::QueueingApplication::run
int run() override
Implement the application main logic:
Definition:
QueueingApplication.cpp:53
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition:
PluginServiceV1.h:46
std::queue::empty
T empty(T... args)
std::queue::push
T push(T... args)
std::size_t
IProperty.h
ISvcLocator.h
Property.h
Gaudi::TestSuite::QueueingApplication
Definition:
QueueingApplication.cpp:48
GaudiTestSuite
src
QueueingEventProcessor
QueueingApplication.cpp
Generated on Thu Dec 19 2024 15:35:07 for The Gaudi Framework by
1.8.18