Gaudi Framework, version v23r5

Home   Generated: Wed Nov 28 2012
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
OutputStream.cpp
Go to the documentation of this file.
1 // $Id: OutputStream.cpp,v 1.23 2008/01/15 13:46:52 marcocle Exp $
2 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
3 
4 // Framework include files
15 #include "GaudiKernel/Incident.h"
17 
18 #include "GaudiKernel/MsgStream.h"
19 #include "GaudiKernel/strcasecmp.h"
20 #include "GaudiKernel/DataObject.h"
22 #include "OutputStream.h"
23 #include "OutputStreamAgent.h"
24 
25 #include <set>
26 
27 // Define the algorithm factory for the standard output data writer
29 
30 // Standard Constructor
// Assigns the default values of the stream's data members, allocates the
// store agent and the three algorithm vectors with new, declares the
// job-option properties and attaches update handlers to the three
// algorithm-name properties.
// NOTE(review): the embedded listing numbers skip 46-47 and 61-62, so lines
// of the original file may be absent from this listing.
31 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
32  : Algorithm(name, pSvcLocator)
33 {
34  m_doPreLoad = true;
35  m_doPreLoadOpt = false;
36  m_verifyItems = true;
37  m_output = "";
38  m_outputName = "";
39  m_outputType = "UPDATE";
40  m_storeName = "EventDataSvc";
41  m_persName = "EventPersistencySvc";
  // Heap allocations; the matching delete statements are in the destructor
  // fragment that follows this constructor.
42  m_agent = new OutputStreamAgent(this);
43  m_acceptAlgs = new std::vector<Algorithm*>();
44  m_requireAlgs = new std::vector<Algorithm*>();
45  m_vetoAlgs = new std::vector<Algorithm*>();
48  m_fireIncidents = true;
  // Declare each member under its job-option name.
49  declareProperty("ItemList", m_itemNames);
50  declareProperty("OptItemList", m_optItemNames);
51  declareProperty("Preload", m_doPreLoad);
52  declareProperty("PreloadOptItems", m_doPreLoadOpt);
53  declareProperty("Output", m_output);
54  declareProperty("OutputFile", m_outputName);
55  declareProperty("EvtDataSvc", m_storeName);
56  declareProperty("EvtConversionSvc", m_persName);
57  declareProperty("AcceptAlgs", m_acceptNames);
58  declareProperty("RequireAlgs", m_requireNames);
59  declareProperty("VetoAlgs", m_vetoNames);
60  declareProperty("VerifyItems", m_verifyItems);
63 
64  // Associate action handlers with the AcceptAlgs, RequireAlgs & VetoAlgs properties
65  m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
66  m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
67  m_vetoNames.declareUpdateHandler ( &OutputStream::vetoAlgsHandler , this );
68 }
69 
70 // Standard Destructor
// Deletes the agent and the three algorithm vectors that the constructor
// allocated with new. Only the vectors themselves are deleted; the Algorithm
// pointers stored in them are not touched here.
// NOTE(review): the signature line (listing line 71) is missing from this
// listing. decodeAlgorithms() calls addRef() on every stored Algorithm; no
// matching release is visible in this fragment - confirm where it happens.
72  delete m_agent;
73  delete m_acceptAlgs;
74  delete m_requireAlgs;
75  delete m_vetoAlgs;
76 }
77 
78 // initialize data writer
// Visible steps: reset the event counter, check the data-manager,
// incident-service and data-provider handles, connect the conversion service
// when hasInput() is true, fill m_itemList / m_optItemList from the
// m_itemNames / m_optItemNames properties, register preload items with the
// data provider, and decode the veto algorithm list.
// Returns FAILURE (or the failing status) at the first failed step and
// SUCCESS otherwise.
// NOTE(review): the signature (listing line 79) and listing lines 85, 97,
// 106-107, 113, 115, 117 and 152-153 are missing from this listing; the
// handles checked below and the iterator `i` are set up on those lines.
80  MsgStream log(msgSvc(), name());
81 
82  // Reset the number of events written
83  m_events = 0;
84  // Get access to the DataManagerSvc
86  if( !m_pDataManager.isValid() ) {
87  log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
88  return StatusCode::FAILURE;
89  }
90  // Get access to the IncidentService
91  m_incidentSvc = serviceLocator()->service("IncidentSvc");
92  if( !m_incidentSvc.isValid() ) {
  // NOTE(review): reported at WARNING level although initialisation is
  // aborted with FAILURE, unlike the FATAL messages of the other checks.
93  log << MSG::WARNING << "Error retrieving IncidentSvc." << endmsg;
94  return StatusCode::FAILURE;
95  }
96  // Get access to the assigned data service
98  if( !m_pDataProvider.isValid() ) {
99  log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
100  return StatusCode::FAILURE;
101  }
  // The conversion service is only connected when at least one item name
  // has been configured (see hasInput()).
102  if ( hasInput() ) {
103  StatusCode status = connectConversionSvc();
104  if( !status.isSuccess() ) {
105  log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
108  return status;
109  }
110  }
111 
112  // Clear the list with optional items
114  // Clear the item list
116 
118  // Take the new item list from the properties.
119  for(i = m_itemNames.begin(); i != m_itemNames.end(); i++) {
120  addItem( m_itemList, *i );
121  }
122 
123  // Take the new item list from the properties.
124  for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++) {
125  addItem( m_optItemList, *i );
126  }
127 
128  // Take the item list to the data service preload list.
129  if ( m_doPreLoad ) {
130  for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) {
131  m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
132  }
133  // Not working: bad reference counting! pdataSvc->release();
134  }
135 
  // Same registration for the optional items, controlled by m_doPreLoadOpt.
  // NOTE(review): the return value is dropped here without the .ignore()
  // used in the mandatory loop above.
136  if ( m_doPreLoadOpt ) {
137  for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
138  m_pDataProvider->addPreLoadItem( *(*j) );
139  }
140  }
141  log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endmsg;
142 
143  // Decode the accept, required and veto Algorithms. The logic is the following:
144  // a. The event is accepted if all lists are empty.
145  // b. The event is provisionally accepted if any Algorithm in the accept list
146  // has been executed and has indicated that its filter is passed. This
147  // provisional acceptance can be overridden by the other lists.
148  // c. The event is rejected unless all Algorithms in the required list have
149  // been executed and have indicated that their filter passed.
150  // d. The event is rejected if any Algorithm in the veto list has been
151  // executed and has indicated that its filter has passed.
  // The status of the decoding is deliberately ignored.
154  decodeVetoAlgs ().ignore();
155  return StatusCode::SUCCESS;
156 }
157 
158 // terminate data writer
// Logs the value of the m_events counter at INFO level and returns SUCCESS.
// NOTE(review): the signature (listing line 159) and listing lines 162-169
// are missing from this listing, so further finalisation steps may exist.
160  MsgStream log(msgSvc(), name());
161  log << MSG::INFO << "Events output: " << m_events << endmsg;
170  return StatusCode::SUCCESS;
171 }
172 
173 // Work entry point
// Clears the current selection and, when isEventAccepted() returns true,
// processes the event, clears the selection again, increments m_events and
// fires an incident if m_fireIncidents is set. Returns `sc` for accepted
// events and SUCCESS for rejected ones.
// NOTE(review): the signature (listing line 174), the declaration of `sc`
// (line 179) and the second argument of both Incident constructions
// (lines 184 and 187) are missing from this listing. `sc` is presumably the
// status of writing the objects - confirm against the original file.
175  // Clear any previously existing item list
176  clearSelection();
177  // Test whether this event should be output
178  if ( isEventAccepted() ) {
180  clearSelection();
  // The counter is incremented before `sc` is inspected, i.e. also when
  // `sc` reports a failure.
181  m_events++;
182  if(sc.isSuccess() && m_fireIncidents)
183  m_incidentSvc->fireIncident(Incident(m_outputName,
185  else if(m_fireIncidents)
186  m_incidentSvc->fireIncident(Incident(m_outputName,
188  return sc;
189  }
190  return StatusCode::SUCCESS;
191 }
192 
193 // Select the different objects and write them to file
// Calls collectObjects(); if that succeeds and the selection `sel` is not
// empty, connects the output, creates a representation for every selected
// object, fills the references in a second pass and finally commits the
// output. Returns the last failing status seen, or the status of the commit.
// NOTE(review): the signature (listing line 194) and the declarations of
// `sel` (line 198) and `j` (line 203) are missing from this listing.
195  // Connect the output file to the service
196  StatusCode status = collectObjects();
197  if ( status.isSuccess() ) {
199  if ( sel->begin() != sel->end() ) {
200  status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
201  if ( status.isSuccess() ) {
202  // Now pass the collection to the persistency service
  // pAddress is declared once outside the loop and is not reset between
  // iterations.
204  IOpaqueAddress* pAddress = 0;
205  for ( j = sel->begin(); j != sel->end(); j++ ) {
206  StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
207  if ( !iret.isSuccess() ) {
  // Remember the failure but keep processing the remaining objects.
208  status = iret;
209  continue;
210  }
  // NOTE(review): pReg is dereferenced without a null check - confirm
  // that registry() cannot return 0 for collected objects.
211  IRegistry* pReg = (*j)->registry();
212  pReg->setAddress(pAddress);
213  }
  // Second pass over every selected object, including those for which
  // createRep failed above.
214  for ( j = sel->begin(); j != sel->end(); j++ ) {
215  IRegistry* pReg = (*j)->registry();
216  StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
217  if ( !iret.isSuccess() ) {
218  status = iret;
219  }
220  }
221  // Commit the data if there was no error; otherwise possibly discard
222  if ( status.isSuccess() ) {
223  status = m_pConversionSvc->commitOutput(m_outputName, true);
224  }
225  else {
  // The status of this commit call is ignored so that the earlier
  // failure stored in `status` is what gets returned.
226  m_pConversionSvc->commitOutput(m_outputName, false).ignore();
227  }
228  }
229  }
230  }
231  return status;
232 }
233 
234 // Place holder to create configurable data store agent
// Appends dir->object() to m_objects and returns true when `level` is below
// the depth of m_currentItem and the directory entry holds a non-null
// object; returns false in every other case.
// NOTE(review): the signature (listing line 235) is missing from this
// listing; `dir` and `level` are its parameters. The meaning of the boolean
// for the caller is not visible here - presumably it tells the traversal
// whether to descend further; confirm against the agent interface.
236  if ( level < m_currentItem->depth() ) {
237  if ( dir->object() != 0 ) {
238  /*
239  std::cout << "Analysing ("
240  << dir->name()
241  << ") Object:"
242  << ((dir->object()==0) ? "UNLOADED" : "LOADED")
243  << std::endl;
244  */
245  m_objects.push_back(dir->object());
246  return true;
247  }
248  }
249  return false;
250 }
251 
// Fragment that collects the objects to be written: every entry of m_itemList
// and m_optItemList is retrieved from the data provider and its sub-tree is
// traversed with m_agent. A missing or failing mandatory item is recorded in
// `status`; failures of optional items are only reported at DEBUG level.
// On success, duplicate pointers are removed from m_objects while keeping
// the order of first occurrence. Returns `status`.
// NOTE(review): the signature and the declarations of `status`, `i`,
// `unique` and the de-duplication loop header `o` (listing lines 252-253,
// 255-256, 291 and 294) are missing from this listing.
254  MsgStream log(msgSvc(), name());
257  // Traverse the tree and collect the requested objects
258  for ( i = m_itemList.begin(); i != m_itemList.end(); i++ ) {
259  DataObject* obj = 0;
  // m_currentItem is stored in a member, presumably so that the agent
  // callback can read its depth during the traversal - confirm.
260  m_currentItem = (*i);
261  StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
262  if ( iret.isSuccess() ) {
263  iret = m_pDataManager->traverseSubTree(obj, m_agent);
264  if ( !iret.isSuccess() ) {
265  status = iret;
266  }
267  }
268  else {
269  log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
270  << m_currentItem->path() << endmsg;
271  status = iret;
272  }
273  }
274  // Traverse the tree and collect the requested objects (tolerate missing items here)
275  for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) {
276  DataObject* obj = 0;
277  m_currentItem = (*i);
278  StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
279  if ( iret.isSuccess() ) {
280  iret = m_pDataManager->traverseSubTree(obj, m_agent);
281  }
  // Failures of optional items never modify `status`.
282  if ( !iret.isSuccess() ) {
283  if(log.level() <= MSG::DEBUG )
284  log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
285  << m_currentItem->path() << endmsg;
286  }
287  }
288 
289  if (status.isSuccess()){
290  // Remove duplicates from the list of objects, preserving the order in the list
292  std::vector<DataObject*> tmp; // temporary vector with the reduced list
293  tmp.reserve(m_objects.size());
295  if (!unique.count(*o)) {
296  // if the pointer is not in the set, add it to both the set and the temporary vector
297  unique.insert(*o);
298  tmp.push_back(*o);
299  }
300  }
301  m_objects.swap(tmp); // swap the data of the two vectors
302  }
303 
304  return status;
305 }
306 
307 // Clear collected object list
310 }
311 
312 // Remove all items from the output streamer list;
// Deletes every pointer stored in `itms` and then erases all elements, so
// the container ends up empty.
// NOTE(review): the signature (listing line 313) is missing from this
// listing; `itms` is its parameter.
314  for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) {
315  delete (*i);
316  }
317  itms.erase(itms.begin(), itms.end());
318 }
319 
320 // Find single item identified by its path (exact match)
// Returns the first item whose path() compares equal to `path`, or 0 when
// neither of the two searches finds one.
// NOTE(review): the signature and both loop headers (listing lines 321-323
// and 326) are missing from this listing; the two loops presumably run over
// m_itemList and m_optItemList - confirm against the original file.
324  if ( (*i)->path() == path ) return (*i);
325  }
327  if ( (*j)->path() == path ) return (*j);
328  }
329  return 0;
330 }
331 
332 // Add item to output streamer list
333 void OutputStream::addItem(Items& itms, const std::string& descriptor) {
334  MsgStream log(msgSvc(), name());
335  int level = 0;
336  size_t sep = descriptor.rfind("#");
337  std::string obj_path (descriptor,0,sep);
338  std::string slevel (descriptor,sep+1,descriptor.length());
339  if ( slevel == "*" ) {
340  level = 9999999;
341  }
342  else {
343  level = atoi(slevel.c_str());
344  }
345  if ( m_verifyItems ) {
346  size_t idx = obj_path.find("/",1);
347  while(idx != std::string::npos) {
348  std::string sub_item = obj_path.substr(0,idx);
349  if ( 0 == findItem(sub_item) ) {
350  addItem(itms, sub_item+"#1");
351  }
352  idx = obj_path.find("/",idx+1);
353  }
354  }
355  DataStoreItem* item = new DataStoreItem(obj_path, level);
356  if(log.level() <= MSG::DEBUG )
357  log << MSG::DEBUG << "Adding OutputStream item " << item->path()
358  << " with " << item->depth()
359  << " level(s)." << endmsg;
360  itms.push_back( item );
361 }
362 
363 // Connect to proper conversion service
// Parses the m_output option string into tag/value pairs, derives
// m_outputName and m_outputType from them and obtains the conversion service
// for the requested technology or service name, which is stored in
// m_pConversionSvc. Returns FAILURE (or the failing status) when the
// persistency service or the conversion service cannot be obtained, or when
// neither a technology nor a service name was given; SUCCESS otherwise.
// NOTE(review): the signature (listing line 364) and the declaration of
// `ipers` (line 419) are missing from this listing.
365  StatusCode status = StatusCode(StatusCode::FAILURE, true);
366  MsgStream log(msgSvc(), name());
367  // Get output file from input
368  std::string dbType, svc, shr;
369  Tokenizer tok(true);
370  tok.analyse(m_output, " ", "", "", "=", "'", "'");
371  for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) {
372  const std::string& tag = (*i).tag();
373  const std::string& val = (*i).value();
  // Tags are recognised by their first letter only (case-insensitive):
  // D... -> output name, T... -> technology type, SV... -> service name,
  // SH... -> shared flag, O... -> open mode. Other tags are ignored.
374  switch( ::toupper(tag[0]) ) {
375  case 'D':
376  m_outputName = val;
377  break;
378  case 'T':
379  dbType = val;
380  break;
381  case 'S':
382  switch( ::toupper(tag[1]) ) {
383  case 'V': svc = val; break;
  // The value of an SH... tag is not inspected; its presence alone sets
  // the flag.
384  case 'H': shr = "YES"; break;
385  }
386  break;
387  case 'O': // OPT='<NEW<CREATE,WRITE,RECREATE>, UPDATE>'
388  switch( ::toupper(val[0]) ) {
389  case 'R':
  // Only the first three characters are compared ("REC" / "REA"); any
  // other value starting with R leaves m_outputType unchanged.
390  if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
391  m_outputType = "RECREATE";
392  else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
393  m_outputType = "READ";
394  break;
395  case 'C':
396  case 'N':
397  case 'W':
398  m_outputType = "NEW";
399  break;
400  case 'U':
401  m_outputType = "UPDATE";
402  break;
403  default:
404  m_outputType = "???";
405  break;
406  }
407  break;
408  default:
409  break;
410  }
411  }
412  if ( !shr.empty() ) m_outputType += "|SHARED";
413  // Get access to the default Persistency service
414  // The default service is the same for input as for output.
415  // If this is not desired, then a specialized OutputStream must overwrite
416  // this value.
417  if ( dbType.length() > 0 || svc.length() > 0 ) {
  // The technology type takes precedence over the service name.
418  std::string typ = dbType.length()>0 ? dbType : svc;
420  if( !ipers.isValid() ) {
421  log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
422  return StatusCode::FAILURE;
423  }
424  IConversionSvc *cnvSvc = 0;
425  status = ipers->getService(typ, cnvSvc);
426  if( !status.isSuccess() ) {
427  log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << typ << endmsg;
428  return status;
429  }
430  // Increase reference count and keep service.
431  m_pConversionSvc = cnvSvc;
432  }
433  else {
434  log << MSG::FATAL
435  << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
436  << "You either have to specify a technology name or a service name!" << endmsg
437  << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
438  return StatusCode::FAILURE;
439  }
440  return StatusCode::SUCCESS;
441 }
442 
445 }
446 
// Tail of the update handler for the accept-algorithms property: when the
// status `sc` reports a failure, it is turned into a GaudiException tagged
// "OutputStream::acceptAlgsHandler".
// NOTE(review): the signature and the statement assigning `sc` (listing
// lines 447-448) are missing from this listing.
449  if (sc.isFailure()) {
450  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
451  "OutputStream::acceptAlgsHandler",sc);
452  }
453 }
454 
457 }
458 
// Tail of the update handler for the require-algorithms property: when the
// status `sc` reports a failure, it is turned into a GaudiException tagged
// "OutputStream::requireAlgsHandler".
// NOTE(review): the signature and the statement assigning `sc` (listing
// lines 459-460) are missing from this listing.
461  if (sc.isFailure()) {
462  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
463  "OutputStream::requireAlgsHandler",sc);
464  }
465 }
466 
469 }
470 
// Update handler registered in the constructor for the m_vetoNames property.
// The property argument is unused. When the status `sc` reports a failure,
// it is turned into a GaudiException tagged "OutputStream::vetoAlgsHandler".
// NOTE(review): the statement assigning `sc` (listing line 472) is missing
// from this listing.
471 void OutputStream::vetoAlgsHandler( Property& /* theProp */ ) {
473  if (sc.isFailure()) {
474  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
475  "OutputStream::vetoAlgsHandler",sc);
476  }
477 }
478 
// Rebuilds *theAlgs from the algorithm names held in `theNames`: the vector
// is cleared, each name is looked up through the algorithm manager and every
// algorithm found that is not already in the vector gets an addRef() and is
// appended. Names that cannot be used are reported at INFO level and
// skipped. When the manager is available the function ends with SUCCESS
// regardless of skipped names; otherwise a FATAL message is logged and
// `result` is returned as it was initialised.
// NOTE(review): the first signature line (listing line 479) and the
// declarations of `result`, `theAlgMgr`, `it` and `ita` (lines 487, 489, 493
// and 511) are missing from this listing.
480  std::vector<Algorithm*>* theAlgs )
481 {
482  // Reset the list of Algorithms
483  theAlgs->clear( );
484 
485  MsgStream log( msgSvc( ), name( ) );
486 
488 
490  if ( theAlgMgr.isValid() ) {
491  // Build the list of Algorithms from the names list
492  const std::vector<std::string> nameList = theNames.value( );
494  std::vector<std::string>::const_iterator itend = nameList.end( );
495  for (it = nameList.begin(); it != itend; ++it) {
496  // Check whether the supplied name corresponds to an existing
497  // Algorithm object.
498  const std::string &theName = (*it);
499  SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
  // NOTE(review): theAlgorithm has no initial value. When theIAlg is not
  // valid, neither it nor `result` is assigned in this iteration, so the
  // result.isSuccess() test below sees the value left over from the
  // previous iteration (or from the declaration); if that is a success,
  // theAlgorithm is read uninitialised.
500  Algorithm* theAlgorithm;
501  if ( theIAlg.isValid() ) {
502  result = StatusCode::SUCCESS;
503  try{
  // NOTE(review): assuming get() returns a pointer, a failed
  // dynamic_cast yields a null pointer instead of throwing, so the
  // catch below would not fire; the result is not checked for null
  // before addRef() is called on it.
504  theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
505  } catch(...){
506  result = StatusCode::FAILURE;
507  }
508  }
509  if ( result.isSuccess( ) ) {
510  // Check that the specified algorithm doesn't already exist in the list
512  std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
513  for (ita = theAlgs->begin(); ita != itaend; ++ita) {
514  Algorithm* existAlgorithm = (*ita);
515  if ( theAlgorithm == existAlgorithm ) {
516  result = StatusCode::FAILURE;
517  break;
518  }
519  }
  // A duplicate is skipped silently: no message is logged for it.
520  if ( result.isSuccess( ) ) {
521  theAlgorithm->addRef();
522  theAlgs->push_back( theAlgorithm );
523  }
524  }
525  else {
526  log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
527  }
528  }
  // Per-name failures are discarded here.
529  result = StatusCode::SUCCESS;
530  }
531  else {
532  log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
533  }
534  return result;
535 }
536 
// Decides whether the current event is to be written, based on the three
// algorithm lists: the result starts as true; a non-empty accept list
// requires at least one executed algorithm with its filter passed; a
// non-empty require list requires all of its algorithms to be executed with
// their filter passed; a non-empty veto list rejects the event as soon as
// one of its algorithms is executed with its filter passed. Each later list
// is only consulted while the event is still accepted.
// NOTE(review): the signature (listing line 537) is missing from this
// listing.
538  typedef std::vector<Algorithm*>::iterator AlgIter;
539  bool result = true;
540 
541  // Loop over all Algorithms in the accept list to see
542  // whether any have been executed and have their filter
543  // passed flag set. Any match causes the event to be
544  // provisionally accepted.
545  if ( ! m_acceptAlgs->empty() ) {
546  result = false;
547  for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
548  if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
549  result = true;
550  break;
551  }
552  }
553  }
554 
555  // Loop over all Algorithms in the required list to see
556  // whether all have been executed and have their filter
557  // passed flag set. Any mismatch causes the event to be
558  // rejected.
559  if ( result && ! m_requireAlgs->empty() ) {
560  for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
561  if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
562  result = false;
563  break;
564  }
565  }
566  }
567 
568  // Loop over all Algorithms in the veto list to see
569  // whether any have been executed and have their filter
570  // passed flag set. Any match causes the event to be
571  // rejected.
572  if ( result && ! m_vetoAlgs->empty() ) {
573  for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
574  if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
575  result = false;
576  break;
577  }
578  }
579  }
580  return result;
581 }
582 
// Returns true when at least one of the m_itemNames / m_optItemNames
// properties is non-empty, i.e. when some item has been configured.
// NOTE(review): the signature (listing line 583) is missing from this
// listing.
584  return !(m_itemNames.empty() && m_optItemNames.empty());
585 }

Generated at Wed Nov 28 2012 12:17:11 for Gaudi Framework, version v23r5 by Doxygen version 1.8.2 written by Dimitri van Heesch, © 1997-2004