Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  master (d98a2936)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
DataStreamTool.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
16 #include <GaudiKernel/Incident.h>
17 #include <GaudiKernel/MsgStream.h>
19 #include <GaudiKernel/Service.h>
20 #include <GaudiKernel/SmartIF.h>
21 
24  if ( !status.isSuccess() ) {
25  fatal() << "Error. Cannot initialize base class." << endmsg;
26  return status;
27  }
28 
29  // Get the references to the services that are needed by the ApplicationMgr itself
30  m_incidentSvc = serviceLocator()->service( "IncidentSvc" );
31  if ( !m_incidentSvc ) {
32  fatal() << "Error retrieving IncidentSvc." << endmsg;
33  return StatusCode::FAILURE;
34  }
35 
36  return StatusCode::SUCCESS;
37 }
38 
39 StatusCode DataStreamTool::addStream( const std::string& input ) {
40 
41  if ( getStream( input ) ) { warning() << "Input stream " << input << "already in use" << endmsg; }
42 
43  m_streamSpecs.push_back( input );
44 
45  auto strname = name() + '_' + std::to_string( ++m_streamCount );
46  EventSelectorDataStream* s = nullptr;
47 
48  StatusCode status = createStream( strname, input, s );
49 
50  if ( status.isSuccess() && s ) {
51  s->addRef();
52  m_streams.push_back( s );
53  status = StatusCode::SUCCESS;
54  } else {
55  if ( s ) {
56  s->release();
57  error() << "Error connecting/creating Stream: " << s << endmsg;
58  }
59  error() << "Error connecting/creating Stream: " << input << endmsg;
60  status = StatusCode::FAILURE;
61  }
62  return status;
63 }
64 
65 StatusCode DataStreamTool::addStreams( const StreamSpecs& inputs ) {
66 
68  for ( auto& i : inputs ) {
69  status = addStream( i );
70  if ( !status.isSuccess() ) break;
71  }
72  return status;
73 }
74 
76  clear().ignore();
78  return AlgTool::finalize();
79 }
80 
82  IEvtSelector* sel = nullptr;
83  StatusCode status = s->initialize();
84  if ( status.isSuccess() ) {
85  status = createSelector( s->name(), s->selectorType(), sel );
86  if ( status.isSuccess() ) {
87  SmartIF<IProperty> prop( sel ); // Att: IProperty, IService used to point to EventSelector
88  SmartIF<IService> isvc( sel );
89  s->setSelector( sel );
90  sel->release(); // No need for this interface anymore, it is passed to the stream
91  if ( prop && isvc ) {
92  for ( const auto& i : s->properties() ) prop->setProperty( i ).ignore();
93  prop->setProperty( Gaudi::Property<int>( "OutputLevel", msgLevel() ) ).ignore();
94  // FIXME: (MCl) Why do we have to initialize the selector here?
95  return isvc->sysInitialize();
96  }
97  }
98  }
99  return StatusCode::FAILURE;
100 }
101 
102 // Create (sub-) Event selector service
103 StatusCode DataStreamTool::createSelector( const std::string& nam, const std::string& typ, IEvtSelector*& sel ) {
104  auto isvc = make_SmartIF( Service::Factory::create( typ, nam, serviceLocator() ).release() );
105  if ( isvc ) {
106  auto isel = isvc.as<IEvtSelector>();
107  if ( isel ) {
108  sel = isel.get();
109  sel->addRef(); // make sure that sel is not left dangling once isel and isvc go out of scope...
110  return StatusCode::SUCCESS;
111  }
112  }
113  sel = nullptr;
114  error() << "Failed to create IEvtSelector " << typ << "/" << nam << endmsg;
115  return StatusCode::FAILURE;
116 }
117 
119  if ( s ) {
120  IEvtSelector* sel = s->selector();
121  if ( sel ) {
122  SmartIF<IService> isvc( sel );
123  if ( isvc ) {
124  isvc->finalize().ignore();
125  s->finalize().ignore();
126  // Fire EndStream "Incident"
127  m_incidentSvc->fireIncident( Incident( name(), IncidentType::EndStream ) );
128  return StatusCode::SUCCESS;
129  }
130  // Failed to get service interface of sub-event selector
131  return StatusCode::FAILURE;
132  }
133  // No selector (yet) attached - no need to finalize it!
134  return StatusCode::SUCCESS;
135  }
136  return StatusCode::FAILURE;
137 }
138 
139 StatusCode DataStreamTool::eraseStream( const std::string& info ) {
140 
141  auto i = getStreamIterator( info );
142  if ( i != m_streams.end() ) {
143  ( *i )->release();
144  m_streams.erase( i );
145  return StatusCode::SUCCESS;
146  }
147  return StatusCode::FAILURE;
148 }
149 
150 StatusCode DataStreamTool::createStream( const std::string& nam, const std::string& info,
152  stream = new EventSelectorDataStream( nam, info, serviceLocator() );
153  return StatusCode::SUCCESS;
154 }
155 
157  auto i = getStreamIterator( info );
158  return i != m_streams.end() ? *i : nullptr;
159 }
160 
161 DataStreamTool::Streams::iterator DataStreamTool::getStreamIterator( const std::string& info ) {
162  return std::find_if( std::begin( m_streams ), std::end( m_streams ),
163  [&]( const EventSelectorDataStream* i ) { return i->definition() == info; } );
164 }
165 
167  // pos has to point inside the vector
168  return ( ( pos >= 0 ) && ( (size_t)pos < m_streams.size() ) ) ? m_streams[pos] : nullptr;
169 }
170 
172 
174 
175  StatusCode iret, status = StatusCode::SUCCESS;
176  iret.ignore();
177 
178  // disconnect the streams
179  for ( auto& il : m_streamSpecs ) {
181  if ( s ) {
182  if ( s->isInitialized() ) {
183  iret = finalizeStream( s );
184  if ( !iret.isSuccess() ) {
185  error() << "Error finalizing Stream" << il << endmsg;
186  status = iret;
187  }
188  }
189  iret = eraseStream( il );
190  if ( !iret.isSuccess() ) {
191  error() << "Error diconnecting Stream" << il << endmsg;
192  status = iret;
193  }
194  }
195  }
196 
197  m_streamSpecs.clear();
198 
199  return status;
200 }
201 
203 
204  if ( !s ) return StatusCode::FAILURE;
205  s->addRef();
206  m_streams.push_back( s );
207  return StatusCode::SUCCESS;
208 }
209 
210 StatusCode DataStreamTool::connectStream( const std::string& info ) {
211  if ( getStream( info ) ) { warning() << "Input stream " << info << "already in use" << endmsg; }
212  auto nam = name() + '_' + std::to_string( ++m_streamCount );
213  EventSelectorDataStream* s = nullptr;
214  StatusCode status = createStream( nam, info, s );
215  if ( status.isSuccess() ) return connectStream( s );
216  s->release();
217  return status;
218 }
219 
220 /*
221 
222  Taking control over Streams and return them to EventSelector
223 
224 */
225 
227 
228  EventSelectorDataStream* nextStream = getStream( dsid );
229  if ( !nextStream ) return StatusCode::FAILURE; //<-end of streams reached
230 
231  esds = nextStream;
232  ++m_streamID;
233 
234  return StatusCode::SUCCESS;
235 }
236 
238 
239  EventSelectorDataStream* previousStream = getStream( dsid );
240  if ( !previousStream ) return StatusCode::FAILURE; //<-begin of streams reached
241 
242  esds = previousStream;
243  --m_streamID;
244 
245  return StatusCode::SUCCESS;
246 }
DataStreamTool::getNextStream
StatusCode getNextStream(const EventSelectorDataStream *&, size_type &) override
Definition: DataStreamTool.cpp:226
Write.stream
stream
Definition: Write.py:32
EventSelectorDataStream.h
DataStreamTool::clear
StatusCode clear() override
Definition: DataStreamTool.cpp:173
DataStreamTool::getStreamIterator
Streams::iterator getStreamIterator(const std::string &)
Definition: DataStreamTool.cpp:161
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
gaudirun.s
string s
Definition: gaudirun.py:346
SmartIF::reset
void reset(TYPE *ptr=nullptr)
Set the internal pointer to the passed one disposing of the old one.
Definition: SmartIF.h:88
IAddressCreator.h
EventSelectorDataStream::definition
const std::string & definition() const
Retrieve definition string.
Definition: EventSelectorDataStream.h:94
IEvtSelector
Definition: IEvtSelector.h:26
DataStreamTool::initializeStream
StatusCode initializeStream(EventSelectorDataStream *) override
Initialize newly opened stream.
Definition: DataStreamTool.cpp:81
conf.release
string release
Definition: conf.py:27
DataStreamTool::createStream
StatusCode createStream(const std::string &, const std::string &, EventSelectorDataStream *&) override
Definition: DataStreamTool.cpp:150
AlgTool::initialize
StatusCode initialize() override
Definition: AlgTool.cpp:173
CommonMessaging< implements< IAlgTool, IDataHandleHolder, IProperty, IStateful > >::msgLevel
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
Definition: CommonMessaging.h:147
AlgTool::name
const std::string & name() const override
Retrieve full identifying name of the concrete tool object.
Definition: AlgTool.cpp:72
SmartIF.h
Gaudi::Utils::begin
AttribStringParser::Iterator begin(const AttribStringParser &parser)
Definition: AttribStringParser.h:135
AlgTool::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: AlgTool.cpp:78
StatusCode
Definition: StatusCode.h:64
DataStreamTool::finalize
StatusCode finalize() override
Definition: DataStreamTool.cpp:75
DataStreamTool::finalizeStream
StatusCode finalizeStream(EventSelectorDataStream *) override
Finalize no longer needed stream.
Definition: DataStreamTool.cpp:118
DataStreamTool::createSelector
StatusCode createSelector(const std::string &, const std::string &, IEvtSelector *&) override
Definition: DataStreamTool.cpp:103
DataStreamTool::m_incidentSvc
SmartIF< IIncidentSvc > m_incidentSvc
Reference to the incident service.
Definition: DataStreamTool.h:94
DataStreamTool.h
DataStreamTool::connectStream
StatusCode connectStream(EventSelectorDataStream *)
Connect single stream by reference.
Definition: DataStreamTool.cpp:202
DataStreamTool::lastStream
EventSelectorDataStream * lastStream() override
Definition: DataStreamTool.cpp:171
SmartIF< IProperty >
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:198
DataStreamTool::addStream
StatusCode addStream(const std::string &) override
Definition: DataStreamTool.cpp:39
IOTest.sel
sel
Definition: IOTest.py:106
DataStreamTool::initialize
StatusCode initialize() override
Definition: DataStreamTool.cpp:22
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:139
Service.h
DataStreamTool::getStream
EventSelectorDataStream * getStream(const std::string &) override
Retrieve stream by name.
Definition: DataStreamTool.cpp:156
make_SmartIF
SmartIF< IFace > make_SmartIF(IFace *iface)
Definition: SmartIF.h:143
DataStreamTool::eraseStream
StatusCode eraseStream(const std::string &) override
Definition: DataStreamTool.cpp:139
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:99
DataStreamTool::addStreams
StatusCode addStreams(const StreamSpecs &) override
Definition: DataStreamTool.cpp:65
DataStreamTool::m_streamID
size_type m_streamID
Definition: DataStreamTool.h:85
EventSelectorDataStream
Definition of class EventSelectorDataStream.
Definition: EventSelectorDataStream.h:39
DataStreamTool::getPreviousStream
StatusCode getPreviousStream(const EventSelectorDataStream *&, size_type &) override
Definition: DataStreamTool.cpp:237
DataStreamTool::m_streamCount
size_type m_streamCount
Definition: DataStreamTool.h:87
IOTest.end
end
Definition: IOTest.py:125
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:100
ISvcLocator.h
DataStreamTool::m_streamSpecs
StreamSpecs m_streamSpecs
Definition: DataStreamTool.h:91
Incident.h
Incident
Definition: Incident.h:24
PropertyHolder.h
DataStreamTool::m_streams
Streams m_streams
Definition: DataStreamTool.h:89
Gaudi::Property< int >
ISvcManager.h
MsgStream.h
AlgTool::finalize
StatusCode finalize() override
Definition: AlgTool.cpp:225