The Gaudi Framework  master (37c0b60a)
DataStreamTool.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2024 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 // Include files
12 
13 // from Gaudi
19 #include <GaudiKernel/Incident.h>
20 #include <GaudiKernel/MsgStream.h>
22 #include <GaudiKernel/Service.h>
23 #include <GaudiKernel/SmartIF.h>
24 
25 //-----------------------------------------------------------------------------
26 // Implementation file for class : DataStreamTool
27 //
28 // 2006-09-21 : Andres Felipe Osorio Oliveros
29 //-----------------------------------------------------------------------------
30 
31 //=============================================================================
33 
35  if ( !status.isSuccess() ) {
36  fatal() << "Error. Cannot initialize base class." << endmsg;
37  return status;
38  }
39 
40  // Get the references to the services that are needed by the ApplicationMgr itself
41  m_incidentSvc = serviceLocator()->service( "IncidentSvc" );
42  if ( !m_incidentSvc ) {
43  fatal() << "Error retrieving IncidentSvc." << endmsg;
44  return StatusCode::FAILURE;
45  }
46 
47  return StatusCode::SUCCESS;
48 }
49 
51 
52  if ( getStream( input ) ) { warning() << "Input stream " << input << "already in use" << endmsg; }
53 
54  m_streamSpecs.push_back( input );
55 
56  auto strname = name() + '_' + std::to_string( ++m_streamCount );
57  EventSelectorDataStream* s = nullptr;
58 
59  StatusCode status = createStream( strname, input, s );
60 
61  if ( status.isSuccess() && s ) {
62  s->addRef();
64  status = StatusCode::SUCCESS;
65  } else {
66  if ( s ) {
67  s->release();
68  error() << "Error connecting/creating Stream: " << s << endmsg;
69  }
70  error() << "Error connecting/creating Stream: " << input << endmsg;
71  status = StatusCode::FAILURE;
72  }
73  return status;
74 }
75 
76 StatusCode DataStreamTool::addStreams( const StreamSpecs& inputs ) {
77 
79  for ( auto& i : inputs ) {
80  status = addStream( i );
81  if ( !status.isSuccess() ) break;
82  }
83  return status;
84 }
85 
87  clear().ignore();
89  return AlgTool::finalize();
90 }
91 
93  IEvtSelector* sel = nullptr;
94  StatusCode status = s->initialize();
95  if ( status.isSuccess() ) {
96  status = createSelector( s->name(), s->selectorType(), sel );
97  if ( status.isSuccess() ) {
98  SmartIF<IProperty> prop( sel ); // Att: IProperty, IService used to point to EventSelector
99  SmartIF<IService> isvc( sel );
100  s->setSelector( sel );
101  sel->release(); // No need for this interface anymore, it is passed to the stream
102  if ( prop && isvc ) {
103  for ( const auto& i : s->properties() ) prop->setProperty( i ).ignore();
104  prop->setProperty( Gaudi::Property<int>( "OutputLevel", msgLevel() ) ).ignore();
105  // FIXME: (MCl) Why do we have to initialize the selector here?
106  return isvc->sysInitialize();
107  }
108  }
109  }
110  return StatusCode::FAILURE;
111 }
112 
113 // Create (sub-) Event selector service
115  auto isvc = make_SmartIF( Service::Factory::create( typ, nam, serviceLocator() ).release() );
116  if ( isvc ) {
117  auto isel = isvc.as<IEvtSelector>();
118  if ( isel ) {
119  sel = isel.get();
120  sel->addRef(); // make sure that sel is not left dangling once isel and isvc go out of scope...
121  return StatusCode::SUCCESS;
122  }
123  }
124  sel = nullptr;
125  error() << "Failed to create IEvtSelector " << typ << "/" << nam << endmsg;
126  return StatusCode::FAILURE;
127 }
128 
130  if ( s ) {
131  IEvtSelector* sel = s->selector();
132  if ( sel ) {
133  SmartIF<IService> isvc( sel );
134  if ( isvc ) {
135  isvc->finalize().ignore();
136  s->finalize().ignore();
137  // Fire EndStream "Incident"
138  m_incidentSvc->fireIncident( Incident( name(), IncidentType::EndStream ) );
139  return StatusCode::SUCCESS;
140  }
141  // Failed to get service interface of sub-event selector
142  return StatusCode::FAILURE;
143  }
144  // No selector (yet) attached - no need to finalize it!
145  return StatusCode::SUCCESS;
146  }
147  return StatusCode::FAILURE;
148 }
149 
151 
152  auto i = getStreamIterator( info );
153  if ( i != m_streams.end() ) {
154  ( *i )->release();
155  m_streams.erase( i );
156  return StatusCode::SUCCESS;
157  }
158  return StatusCode::FAILURE;
159 }
160 
163  stream = new EventSelectorDataStream( nam, info, serviceLocator() );
164  return StatusCode::SUCCESS;
165 }
166 
168  auto i = getStreamIterator( info );
169  return i != m_streams.end() ? *i : nullptr;
170 }
171 
172 DataStreamTool::Streams::iterator DataStreamTool::getStreamIterator( const std::string& info ) {
174  [&]( const EventSelectorDataStream* i ) { return i->definition() == info; } );
175 }
176 
178  // pos has to point inside the vector
179  return ( ( pos >= 0 ) && ( (size_t)pos < m_streams.size() ) ) ? m_streams[pos] : nullptr;
180 }
181 
183 
185 
186  StatusCode iret, status = StatusCode::SUCCESS;
187  iret.ignore();
188 
189  // disconnect the streams
190  for ( auto& il : m_streamSpecs ) {
192  if ( s ) {
193  if ( s->isInitialized() ) {
194  iret = finalizeStream( s );
195  if ( !iret.isSuccess() ) {
196  error() << "Error finalizing Stream" << il << endmsg;
197  status = iret;
198  }
199  }
200  iret = eraseStream( il );
201  if ( !iret.isSuccess() ) {
202  error() << "Error diconnecting Stream" << il << endmsg;
203  status = iret;
204  }
205  }
206  }
207 
208  m_streamSpecs.clear();
209 
210  return status;
211 }
212 
214 
215  if ( !s ) return StatusCode::FAILURE;
216  s->addRef();
217  m_streams.push_back( s );
218  return StatusCode::SUCCESS;
219 }
220 
222  if ( getStream( info ) ) { warning() << "Input stream " << info << "already in use" << endmsg; }
223  auto nam = name() + '_' + std::to_string( ++m_streamCount );
224  EventSelectorDataStream* s = nullptr;
225  StatusCode status = createStream( nam, info, s );
226  if ( status.isSuccess() ) return connectStream( s );
227  s->release();
228  return status;
229 }
230 
231 /*
232 
233  Taking control over Streams and return them to EventSelector
234 
235 */
236 
238 
239  EventSelectorDataStream* nextStream = getStream( dsid );
240  if ( !nextStream ) return StatusCode::FAILURE; //<-end of streams reached
241 
242  esds = nextStream;
243  ++m_streamID;
244 
245  return StatusCode::SUCCESS;
246 }
247 
249 
250  EventSelectorDataStream* previousStream = getStream( dsid );
251  if ( !previousStream ) return StatusCode::FAILURE; //<-begin of streams reached
252 
253  esds = previousStream;
254  --m_streamID;
255 
256  return StatusCode::SUCCESS;
257 }
DataStreamTool::getNextStream
StatusCode getNextStream(const EventSelectorDataStream *&, size_type &) override
Definition: DataStreamTool.cpp:237
Write.stream
stream
Definition: Write.py:32
std::string
STL class.
EventSelectorDataStream.h
DataStreamTool::clear
StatusCode clear() override
Definition: DataStreamTool.cpp:184
DataStreamTool::getStreamIterator
Streams::iterator getStreamIterator(const std::string &)
Definition: DataStreamTool.cpp:172
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
gaudirun.s
string s
Definition: gaudirun.py:346
SmartIF::reset
void reset(TYPE *ptr=nullptr)
Set the internal pointer to the passed one disposing of the old one.
Definition: SmartIF.h:96
std::find_if
T find_if(T... args)
std::vector::size
T size(T... args)
IAddressCreator.h
EventSelectorDataStream::definition
const std::string & definition() const
Retrieve definition string.
Definition: EventSelectorDataStream.h:110
IEvtSelector
Definition: IEvtSelector.h:28
DataStreamTool::initializeStream
StatusCode initializeStream(EventSelectorDataStream *) override
Initialize newly opened stream.
Definition: DataStreamTool.cpp:92
conf.release
string release
Definition: conf.py:27
std::vector::back
T back(T... args)
DataStreamTool::createStream
StatusCode createStream(const std::string &, const std::string &, EventSelectorDataStream *&) override
Definition: DataStreamTool.cpp:161
AlgTool::initialize
StatusCode initialize() override
Definition: AlgTool.cpp:199
CommonMessaging< implements< IAlgTool, IDataHandleHolder, IProperty, IStateful > >::msgLevel
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
Definition: CommonMessaging.h:148
AlgTool::name
const std::string & name() const override
Retrieve full identifying name of the concrete tool object.
Definition: AlgTool.cpp:67
std::vector::push_back
T push_back(T... args)
SmartIF.h
AlgTool::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: AlgTool.cpp:88
StatusCode
Definition: StatusCode.h:65
DataStreamTool::finalize
StatusCode finalize() override
Definition: DataStreamTool.cpp:86
DataStreamTool::finalizeStream
StatusCode finalizeStream(EventSelectorDataStream *) override
Finalize no longer needed stream.
Definition: DataStreamTool.cpp:129
DataStreamTool::createSelector
StatusCode createSelector(const std::string &, const std::string &, IEvtSelector *&) override
Definition: DataStreamTool.cpp:114
DataStreamTool::m_incidentSvc
SmartIF< IIncidentSvc > m_incidentSvc
Reference to the incident service.
Definition: DataStreamTool.h:101
DataStreamTool.h
DataStreamTool::connectStream
StatusCode connectStream(EventSelectorDataStream *)
Connect single stream by reference.
Definition: DataStreamTool.cpp:213
std::to_string
T to_string(T... args)
DataStreamTool::lastStream
EventSelectorDataStream * lastStream() override
Definition: DataStreamTool.cpp:182
std::vector::erase
T erase(T... args)
SmartIF< IProperty >
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202
DataStreamTool::addStream
StatusCode addStream(const std::string &) override
Definition: DataStreamTool.cpp:50
IOTest.sel
sel
Definition: IOTest.py:106
DataStreamTool::initialize
StatusCode initialize() override
Definition: DataStreamTool.cpp:32
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:139
Service.h
DataStreamTool::getStream
EventSelectorDataStream * getStream(const std::string &) override
Retrieve stream by name.
Definition: DataStreamTool.cpp:167
make_SmartIF
SmartIF< IFace > make_SmartIF(IFace *iface)
Definition: SmartIF.h:150
DataStreamTool::eraseStream
StatusCode eraseStream(const std::string &) override
Definition: DataStreamTool.cpp:150
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
std::begin
T begin(T... args)
DataStreamTool::addStreams
StatusCode addStreams(const StreamSpecs &) override
Definition: DataStreamTool.cpp:76
DataStreamTool::m_streamID
size_type m_streamID
Definition: DataStreamTool.h:92
EventSelectorDataStream
Definition of class EventSelectorDataStream.
Definition: EventSelectorDataStream.h:55
DataStreamTool::getPreviousStream
StatusCode getPreviousStream(const EventSelectorDataStream *&, size_type &) override
Definition: DataStreamTool.cpp:248
DataStreamTool::m_streamCount
size_type m_streamCount
Definition: DataStreamTool.h:94
std::vector::end
T end(T... args)
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
ISvcLocator.h
DataStreamTool::m_streamSpecs
StreamSpecs m_streamSpecs
Definition: DataStreamTool.h:98
Incident.h
Incident
Definition: Incident.h:27
PropertyHolder.h
DataStreamTool::m_streams
Streams m_streams
Definition: DataStreamTool.h:96
Gaudi::Property< int >
ISvcManager.h
MsgStream.h
AlgTool::finalize
StatusCode finalize() override
Definition: AlgTool.cpp:266