All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
DataStreamTool.cpp
Go to the documentation of this file.
1 // Include files
2 
3 // from Gaudi
4 #include "GaudiKernel/MsgStream.h"
5 #include "GaudiKernel/xtoa.h"
6 #include "GaudiKernel/SmartIF.h"
7 #include "GaudiKernel/Incident.h"
8 #include "GaudiKernel/Tokenizer.h"
9 #include "GaudiKernel/MsgStream.h"
10 #include "GaudiKernel/IIncidentSvc.h"
11 #include "GaudiKernel/ISvcLocator.h"
12 #include "GaudiKernel/ISvcManager.h"
13 #include "GaudiKernel/IAddressCreator.h"
14 #include "GaudiKernel/PropertyMgr.h"
15 #include "GaudiKernel/EventSelectorDataStream.h"
16 #include "GaudiKernel/DataStreamTool.h"
17 #include "GaudiKernel/ToolFactory.h"
18 #include "GaudiKernel/Service.h"
19 
20 #include <sstream>
21 
22 //-----------------------------------------------------------------------------
23 // Implementation file for class : DataStreamTool
24 //
25 // 2006-09-21 : Andres Felipe Osorio Oliveros
26 //-----------------------------------------------------------------------------
27 
28 //=============================================================================
29 // Standard constructor, initializes variables
30 //=============================================================================
32  const std::string& name,
33  const IInterface* parent )
34  : base_class ( type, name , parent )
35 {
36  //declareInterface<IDataStreamTool>(this);
37 
38  m_incidentSvc = 0;
39  m_streamCount = 0;
40  m_streamID = 0;
41 
42 }
43 //=============================================================================
44 // Destructor
45 //=============================================================================
47 }
48 
49 //=============================================================================
51 
53 
55  if( !status.isSuccess() ) {
56  logger << MSG::FATAL << "Error. Cannot initialize base class." << endmsg;
57  return status;
58  }
59 
60  // Get the references to the services that are needed by the ApplicationMgr itself
61  m_incidentSvc = serviceLocator()->service("IncidentSvc");
62  if( !m_incidentSvc.isValid() ) {
63  logger << MSG::FATAL << "Error retrieving IncidentSvc." << endmsg;
64  return StatusCode::FAILURE;
65  }
66 
67  return StatusCode::SUCCESS;
68 
69 }
70 
71 StatusCode DataStreamTool::addStream(const std::string & input) {
72 
73  if ( NULL != getStream(input) ) {
74  MsgStream log(msgSvc(), name());
75  log << MSG::WARNING << "Input stream " << input << "already in use" << endmsg;
76  }
77 
78  m_streamSpecs.push_back(input);
79 
80  std::ostringstream strname;
81  strname << name() << '_' << ++m_streamCount;
82 
84 
85  StatusCode status = createStream(strname.str(), input , s );
86 
87  if( status.isSuccess() && 0 != s ) {
88  s->addRef();
89  m_streams.push_back(s);
90  status = StatusCode::SUCCESS;
91  }
92  else {
93  MsgStream log(msgSvc(), name());
94  if (s) {
95  s->release();
96  log << MSG::ERROR << "Error connecting/creating Stream: " << s << endmsg;
97  }
98  log << MSG::ERROR << "Error connecting/creating Stream: " << input << endmsg;
99  status = StatusCode::FAILURE;
100  }
101 
102  return status;
103 
104 }
105 
107 
109 
110  for ( StreamSpecs::const_iterator itr = inputs.begin(); itr != inputs.end() && status.isSuccess(); ++itr ) {
111 
112  status = addStream(*itr);
113 
114  }
115 
116  return status;
117 
118 }
119 
121  clear().ignore();
122  m_incidentSvc = 0; // release
123 
124  return AlgTool::finalize();
125 }
126 
128  IEvtSelector* sel = 0;
129  StatusCode status = s->initialize();
130  if ( status.isSuccess() ) {
131  status = createSelector(s->name(), s->selectorType(), sel);
132  if ( status.isSuccess() ) {
133  SmartIF<IProperty> prop(sel); //Att: IProperty, IService used to point to EventSelector
134  SmartIF<IService> isvc(sel);
135  s->setSelector(sel);
136  sel->release(); // No need of this interface anymore, it is passed to the stream
137  if ( prop.isValid( ) && isvc.isValid( ) ) {
138  const Properties& p = s->properties();
139  for(Properties::const_iterator i=p.begin(); i!=p.end(); i++) {
140  prop->setProperty((*i)).ignore();
141  }
142  int output_level = this->outputLevel();
143  prop->setProperty(IntegerProperty("OutputLevel",output_level)).ignore();
144  // FIXME: (MCl) Why do we have to initialize the selector here?
145  return isvc->sysInitialize();
146  }
147  }
148  }
149  return StatusCode::FAILURE;
150 }
151 
152 // Create (sub-) Event selector service
153 StatusCode DataStreamTool::createSelector(const std::string& nam, const std::string& typ, IEvtSelector*& sel) {
154  IService* isvc = Service::Factory::create(typ, nam, serviceLocator());
155  if ( isvc ) {
156  StatusCode status = isvc->queryInterface(IEvtSelector::interfaceID(), (void**)&sel);
157  if ( status.isSuccess() ) {
158  return status;
159  }
160  sel = 0;
161  isvc->release();
162  }
163  MsgStream log(msgSvc(), name());
164  log << MSG::ERROR << "Failed to create IEvtSelector " << typ << "/" << nam << endmsg;
165  return StatusCode::FAILURE;
166 }
167 
168 
170  if ( s ) {
171  IEvtSelector* sel = const_cast<IEvtSelector*>(s->selector());
172  if ( sel ) {
173  SmartIF<IService> isvc(sel);
174  if ( isvc.isValid() ) {
175  isvc->finalize().ignore();
176  s->finalize().ignore();
177  // Fire EndStream "Incident"
179  return StatusCode::SUCCESS;
180  }
181  // Failed to get service interface of sub-event selector
182  return StatusCode::FAILURE;
183  }
184  // No selector (yet) attached - no need to finalize it!
185  return StatusCode::SUCCESS;
186  }
187  return StatusCode::FAILURE;
188 }
189 
190 
191 StatusCode DataStreamTool::eraseStream ( const std::string& info ) {
192 
193  Streams::iterator i = getStreamIterator(info);
194 
195  if ( m_streams.end() != i ) {
196  (*i)->release();
197  m_streams.erase(i);
198  return StatusCode::SUCCESS;
199  }
200 
201  return StatusCode::FAILURE;
202 }
203 
204 StatusCode DataStreamTool::createStream(const std::string& nam, const std::string& info,
205  EventSelectorDataStream*& stream) {
206  stream = new EventSelectorDataStream(nam, info, serviceLocator());
207 
208  return StatusCode::SUCCESS;
209 }
210 
211 
213  Streams::iterator i = getStreamIterator(info);
214  if ( m_streams.end() == i ) return NULL;
215  return *i;
216 }
217 
218 DataStreamTool::Streams::iterator DataStreamTool::getStreamIterator ( const std::string& info ) {
219  for ( Streams::iterator i = m_streams.begin(); i != m_streams.end(); i++ ) {
220  if ( (*i)->definition() == info ) {
221  return i;
222  }
223  }
224  return m_streams.end();
225 }
226 
228  if ( (pos >= 0) && ((size_t)pos < m_streams.size()) ) // pos has to point inside the vector
229  return m_streams[pos];
230  else
231  return 0;
232 }
233 
235 {
236  if (m_streams.size() > 1 )
237  return *(--m_streams.end());
238  else return *m_streams.begin();
239 
240 }
241 
242 
243 
245 {
246 
247  StatusCode iret, status = StatusCode::SUCCESS;
248  iret.ignore();
249 
250  MsgStream log(msgSvc(), name());
251 
252  // disconnect the streams
253  for ( StreamSpecs::const_iterator il = m_streamSpecs.begin(); il != m_streamSpecs.end(); il++ ) {
255  if ( NULL != s ) {
256  if ( s->isInitialized() ) {
257  iret = finalizeStream(s);
258  if ( !iret.isSuccess() ) {
259  log << MSG::ERROR << "Error finalizing Stream" << *il << endmsg;
260  status = iret;
261  }
262  }
263  iret = eraseStream( *il );
264  if ( !iret.isSuccess() ) {
265  log << MSG::ERROR << "Error diconnecting Stream" << *il << endmsg;
266  status = iret;
267  }
268  }
269  }
270 
271  m_streamSpecs.clear();
272 
273  return status;
274 }
275 
276 
278 {
279 
280  if ( 0 != s ) {
281  s->addRef();
282  m_streams.push_back(s);
283  return StatusCode::SUCCESS;
284  }
285 
286  return StatusCode::FAILURE;
287 
288 }
289 
290 StatusCode DataStreamTool::connectStream( const std::string & info )
291 {
292 
293  if ( NULL != getStream(info) ) {
294  MsgStream log(msgSvc(), name());
295  log << MSG::WARNING << "Input stream " << info << "already in use" << endmsg;
296  }
297  std::ostringstream nam;
298  nam << name() << '_' << ++m_streamCount;
300  StatusCode status = createStream(nam.str(), info, s);
301  if ( status.isSuccess() ) {
302  return connectStream(s);
303  }
304  s->release();
305  return status;
306 
307 
308 }
309 
310 /*
311 
312  Taking control over Streams and return them to EventSelector
313 
314 */
315 
316 
318 {
319 
320  EventSelectorDataStream * nextStream = getStream(dsid);
321  if ( NULL == nextStream ) return StatusCode::FAILURE; //<-end of streams reached
322 
323  esds = nextStream;
324  ++m_streamID;
325 
326  return StatusCode::SUCCESS;
327 
328 }
329 
331 {
332 
333  EventSelectorDataStream * previousStream = getStream(dsid);
334  if ( NULL == previousStream ) return StatusCode::FAILURE; //<-begin of streams reached
335 
336  esds = previousStream;
337  --m_streamID;
338 
339  return StatusCode::SUCCESS;
340 
341 }
342 
343 
344