All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
DataStreamTool.cpp
Go to the documentation of this file.
1 // Include files
2 
3 // from Gaudi
4 #include "GaudiKernel/MsgStream.h"
5 #include "GaudiKernel/SmartIF.h"
6 #include "GaudiKernel/Incident.h"
7 #include "GaudiKernel/MsgStream.h"
8 #include "GaudiKernel/ISvcLocator.h"
9 #include "GaudiKernel/ISvcManager.h"
10 #include "GaudiKernel/IAddressCreator.h"
11 #include "GaudiKernel/PropertyMgr.h"
12 #include "GaudiKernel/EventSelectorDataStream.h"
13 #include "GaudiKernel/DataStreamTool.h"
14 #include "GaudiKernel/Service.h"
15 
16 //-----------------------------------------------------------------------------
17 // Implementation file for class : DataStreamTool
18 //
19 // 2006-09-21 : Andres Felipe Osorio Oliveros
20 //-----------------------------------------------------------------------------
21 
22 //=============================================================================
23 // Standard constructor, initializes variables
24 //=============================================================================
26  const std::string& name,
27  const IInterface* parent )
28  : base_class ( type, name , parent )
29 {
30  //declareInterface<IDataStreamTool>(this);
31 }
32 //=============================================================================
34 
36 
38  if( !status.isSuccess() ) {
39  logger << MSG::FATAL << "Error. Cannot initialize base class." << endmsg;
40  return status;
41  }
42 
43  // Get the references to the services that are needed by the ApplicationMgr itself
44  m_incidentSvc = serviceLocator()->service("IncidentSvc");
45  if( !m_incidentSvc ) {
46  logger << MSG::FATAL << "Error retrieving IncidentSvc." << endmsg;
47  return StatusCode::FAILURE;
48  }
49 
50  return StatusCode::SUCCESS;
51 
52 }
53 
54 StatusCode DataStreamTool::addStream(const std::string & input) {
55 
56  if ( getStream(input) ) {
57  MsgStream log(msgSvc(), name());
58  log << MSG::WARNING << "Input stream " << input << "already in use" << endmsg;
59  }
60 
61  m_streamSpecs.push_back(input);
62 
63  auto strname = name() + '_' + std::to_string( ++m_streamCount );
64  EventSelectorDataStream* s = nullptr;
65 
66  StatusCode status = createStream(strname, input , s );
67 
68  if( status.isSuccess() && s ) {
69  s->addRef();
70  m_streams.push_back(s);
71  status = StatusCode::SUCCESS;
72  } else {
73  MsgStream log(msgSvc(), name());
74  if (s) {
75  s->release();
76  log << MSG::ERROR << "Error connecting/creating Stream: " << s << endmsg;
77  }
78  log << MSG::ERROR << "Error connecting/creating Stream: " << input << endmsg;
79  status = StatusCode::FAILURE;
80  }
81  return status;
82 }
83 
84 StatusCode DataStreamTool::addStreams(const StreamSpecs & inputs) {
85 
87  for ( auto& i : inputs ) {
88  status = addStream(i);
89  if (!status.isSuccess()) break;
90  }
91  return status;
92 
93 }
94 
96  clear().ignore();
98  return AlgTool::finalize();
99 }
100 
102  IEvtSelector* sel = nullptr;
103  StatusCode status = s->initialize();
104  if ( status.isSuccess() ) {
105  status = createSelector(s->name(), s->selectorType(), sel);
106  if ( status.isSuccess() ) {
107  SmartIF<IProperty> prop(sel); //Att: IProperty, IService used to point to EventSelector
108  SmartIF<IService> isvc(sel);
109  s->setSelector(sel);
110  sel->release(); // No need for this interface anymore, it is passed to the stream
111  if ( prop && isvc ) {
112  for( const auto& i : s->properties() ) prop->setProperty(i).ignore();
113  int output_level = this->outputLevel();
114  prop->setProperty(IntegerProperty("OutputLevel",output_level)).ignore();
115  // FIXME: (MCl) Why do we have to initialize the selector here?
116  return isvc->sysInitialize();
117  }
118  }
119  }
120  return StatusCode::FAILURE;
121 }
122 
123 // Create (sub-) Event selector service
124 StatusCode DataStreamTool::createSelector(const std::string& nam, const std::string& typ, IEvtSelector*& sel) {
125  auto isvc = make_SmartIF( Service::Factory::create(typ, nam, serviceLocator()) );
126  if ( isvc ) {
127  auto isel = isvc.as<IEvtSelector>();
128  if ( isel ) {
129  sel = isel.get();
130  sel->addRef(); // make sure that sel is not left dangling once isel and isvc go out of scope...
131  return StatusCode::SUCCESS;
132  }
133  }
134  sel = nullptr;
135  MsgStream log(msgSvc(), name());
136  log << MSG::ERROR << "Failed to create IEvtSelector " << typ << "/" << nam << endmsg;
137  return StatusCode::FAILURE;
138 }
139 
140 
142  if ( s ) {
143  IEvtSelector* sel = s->selector();
144  if ( sel ) {
145  SmartIF<IService> isvc(sel);
146  if ( isvc ) {
147  isvc->finalize().ignore();
148  s->finalize().ignore();
149  // Fire EndStream "Incident"
150  m_incidentSvc->fireIncident(Incident(name(),IncidentType::EndStream));
151  return StatusCode::SUCCESS;
152  }
153  // Failed to get service interface of sub-event selector
154  return StatusCode::FAILURE;
155  }
156  // No selector (yet) attached - no need to finalize it!
157  return StatusCode::SUCCESS;
158  }
159  return StatusCode::FAILURE;
160 }
161 
162 
163 StatusCode DataStreamTool::eraseStream ( const std::string& info ) {
164 
165  auto i = getStreamIterator(info);
166  if ( i != m_streams.end() ) {
167  (*i)->release();
168  m_streams.erase(i);
169  return StatusCode::SUCCESS;
170  }
171  return StatusCode::FAILURE;
172 }
173 
174 StatusCode DataStreamTool::createStream(const std::string& nam, const std::string& info,
175  EventSelectorDataStream*& stream) {
176  stream = new EventSelectorDataStream(nam, info, serviceLocator());
177  return StatusCode::SUCCESS;
178 }
179 
180 
182  auto i = getStreamIterator(info);
183  return i != m_streams.end() ? *i : nullptr;
184 }
185 
186 DataStreamTool::Streams::iterator DataStreamTool::getStreamIterator ( const std::string& info ) {
187  return std::find_if( std::begin(m_streams), std::end(m_streams),
188  [&](const EventSelectorDataStream* i) {
189  return i->definition()==info;
190  });
191 }
192 
194  // pos has to point inside the vector
195  return ( (pos >= 0) && ((size_t)pos < m_streams.size()) ) ? m_streams[pos]
196  : nullptr;
197 }
198 
200 {
201  return m_streams.back();
202 }
203 
204 
205 
207 {
208 
209  StatusCode iret, status = StatusCode::SUCCESS;
210  iret.ignore();
211 
212  MsgStream log(msgSvc(), name());
213 
214  // disconnect the streams
215  for ( auto& il : m_streamSpecs ) {
217  if ( s ) {
218  if ( s->isInitialized() ) {
219  iret = finalizeStream(s);
220  if ( !iret.isSuccess() ) {
221  log << MSG::ERROR << "Error finalizing Stream" << il << endmsg;
222  status = iret;
223  }
224  }
225  iret = eraseStream( il );
226  if ( !iret.isSuccess() ) {
227  log << MSG::ERROR << "Error diconnecting Stream" << il << endmsg;
228  status = iret;
229  }
230  }
231  }
232 
233  m_streamSpecs.clear();
234 
235  return status;
236 }
237 
238 
240 {
241 
242  if ( !s ) return StatusCode::FAILURE;
243  s->addRef();
244  m_streams.push_back(s);
245  return StatusCode::SUCCESS;
246 
247 }
248 
249 StatusCode DataStreamTool::connectStream( const std::string & info )
250 {
251  if ( getStream(info) ) {
252  MsgStream log(msgSvc(), name());
253  log << MSG::WARNING << "Input stream " << info << "already in use" << endmsg;
254  }
255  auto nam = name() + '_' + std::to_string( ++m_streamCount);
256  EventSelectorDataStream* s = nullptr;
257  StatusCode status = createStream(nam, info, s);
258  if ( status.isSuccess() ) return connectStream(s);
259  s->release();
260  return status;
261 }
262 
263 /*
264 
265  Take control over the streams and return them to the EventSelector
266 
267 */
268 
269 
271 {
272 
273  EventSelectorDataStream * nextStream = getStream(dsid);
274  if ( !nextStream ) return StatusCode::FAILURE; //<-end of streams reached
275 
276  esds = nextStream;
277  ++m_streamID;
278 
279  return StatusCode::SUCCESS;
280 
281 }
282 
284 {
285 
286  EventSelectorDataStream * previousStream = getStream(dsid);
287  if ( !previousStream ) return StatusCode::FAILURE; //<-begin of streams reached
288 
289  esds = previousStream;
290  --m_streamID;
291 
292  return StatusCode::SUCCESS;
293 
294 }
bool isInitialized() const
Check initialization status.
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
string to_string(const T &value)
Definition: mergesort.cpp:40
Streams::iterator getStreamIterator(const std::string &)
const std::string & name() const
Retrieve stream name.
StatusCode getNextStream(const EventSelectorDataStream *&, size_type &) override
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode initialize() override
Definition: AlgTool.cpp:325
StatusCode clear() override
The Event Selector Interface.
Definition: IEvtSelector.h:18
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
auto begin(reverse_wrapper< T > &w)
Definition: reverse.h:45
IEvtSelector * selector() const
Retrieve event selector object.
StatusCode createStream(const std::string &, const std::string &, EventSelectorDataStream *&) override
GAUDIPS_API Logger & logger()
Return the current logger instance.
StatusCode initializeStream(EventSelectorDataStream *) override
Initialize newly opened stream.
StatusCode finalize() override
StatusCode finalize() override
Definition: AlgTool.cpp:395
virtual StatusCode sysInitialize()=0
Initialize Service.
virtual StatusCode initialize()
Parse input criteria.
virtual void fireIncident(const Incident &incident)=0
Fire an Incident.
EventSelectorDataStream * lastStream() override
unsigned long addRef() override
Reference Interface instance.
Definition: implements.h:44
auto end(reverse_wrapper< T > &w)
Definition: reverse.h:47
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Definition of the basic interface.
Definition: IInterface.h:234
SimpleProperty< int > IntegerProperty
Definition: Property.h:708
virtual StatusCode setProperty(const Property &p)=0
Set the property by property.
virtual StatusCode finalize()
Finalize stream and release resources.
StatusCode finalizeStream(EventSelectorDataStream *) override
Finalize no longer needed stream.
StatusCode createSelector(const std::string &, const std::string &, IEvtSelector *&) override
StatusCode connectStream(EventSelectorDataStream *)
Connect single stream by reference.
DataStreamTool(const std::string &type, const std::string &name, const IInterface *parent)
Standard constructor.
EventSelectorDataStream * getStream(const std::string &) override
Retrieve stream by name.
StatusCode addStream(const std::string &) override
StatusCode eraseStream(const std::string &) override
const std::string & definition() const
Retrieve definition string.
SmartIF< IIncidentSvc > m_incidentSvc
Reference to the incident service.
virtual unsigned long release()=0
Release Interface instance.
StatusCode initialize() override
size_type m_streamID
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
Definition of class EventSelectorDataStream.
StatusCode getPreviousStream(const EventSelectorDataStream *&, size_type &) override
const std::string & selectorType() const
Retrieve event selector type.
Base class for all Incidents (computing events).
Definition: Incident.h:16
size_type m_streamCount
string s
Definition: gaudirun.py:245
virtual unsigned long addRef()=0
Increment the reference count of Interface instance.
StatusCode addStreams(const StreamSpecs &) override
const Properties & properties()
Access properties.
void reset(TYPE *ptr=nullptr)
Set the internal pointer to the passed one disposing of the old one.
Definition: SmartIF.h:88
void ignore() const
Definition: StatusCode.h:108
list i
Definition: ana.py:128
SmartIF< IFace > make_SmartIF(IFace *iface)
Definition: SmartIF.h:143
virtual void setSelector(IEvtSelector *pSelector)
Attach event selector object.
StreamSpecs m_streamSpecs
unsigned long release() override
Release Interface instance.
Definition: implements.h:46
string type
Definition: gaudirun.py:151