Gaudi Framework, version v24r2

Home   Generated: Wed Dec 4 2013
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
OutputStream.cpp
Go to the documentation of this file.
1 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
2 
3 // Framework include files
14 #include "GaudiKernel/Incident.h"
16 
17 #include "GaudiKernel/MsgStream.h"
18 #include "GaudiKernel/strcasecmp.h"
19 #include "GaudiKernel/DataObject.h"
21 #include "OutputStream.h"
22 #include "OutputStreamAgent.h"
23 
24 #include <set>
25 
26 // Define the algorithm factory for the standard output data writer
28 
29 // Standard Constructor
30 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
31 : Algorithm(name, pSvcLocator)
32 {
33  m_doPreLoad = true;
34  m_doPreLoadOpt = false;
35  m_verifyItems = true;
36  m_output = "";
37  m_outputName = "";
38  m_outputType = "UPDATE";
39  m_storeName = "EventDataSvc";
40  m_persName = "EventPersistencySvc";
41  m_agent = new OutputStreamAgent(this);
42  m_acceptAlgs = new std::vector<Algorithm*>();
43  m_requireAlgs = new std::vector<Algorithm*>();
44  m_vetoAlgs = new std::vector<Algorithm*>();
47  m_fireIncidents = true;
48  declareProperty("ItemList", m_itemNames);
49  declareProperty("OptItemList", m_optItemNames);
50  declareProperty("Preload", m_doPreLoad);
51  declareProperty("PreloadOptItems", m_doPreLoadOpt);
52  declareProperty("Output", m_output);
53  declareProperty("OutputFile", m_outputName);
54  declareProperty("EvtDataSvc", m_storeName);
55  declareProperty("EvtConversionSvc", m_persName);
56  declareProperty("AcceptAlgs", m_acceptNames);
57  declareProperty("RequireAlgs", m_requireNames);
58  declareProperty("VetoAlgs", m_vetoNames);
59  declareProperty("VerifyItems", m_verifyItems);
62 
63  // Associate action handlers with the AcceptAlgs, RequireAlgs & VetoAlgs properties
64  m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
65  m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
66  m_vetoNames.declareUpdateHandler ( &OutputStream::vetoAlgsHandler , this );
67 }
68 
69 // Standard Destructor
71  delete m_agent;
72  delete m_acceptAlgs;
73  delete m_requireAlgs;
74  delete m_vetoAlgs;
75 }
76 
77 // initialize data writer
79  MsgStream log(msgSvc(), name());
80 
81  // Reset the number of events written
82  m_events = 0;
83  // Get access to the DataManagerSvc
85  if( !m_pDataManager.isValid() ) {
86  log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
87  return StatusCode::FAILURE;
88  }
89  // Get access to the IncidentService
90  m_incidentSvc = serviceLocator()->service("IncidentSvc");
91  if( !m_incidentSvc.isValid() ) {
92  log << MSG::WARNING << "Error retrieving IncidentSvc." << endmsg;
93  return StatusCode::FAILURE;
94  }
95  // Get access to the assigned data service
97  if( !m_pDataProvider.isValid() ) {
98  log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
99  return StatusCode::FAILURE;
100  }
101  if ( hasInput() ) {
102  StatusCode status = connectConversionSvc();
103  if( !status.isSuccess() ) {
104  log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
107  return status;
108  }
109  }
110 
111  // Clear the list with optional items
113  // Clear the item list
115 
117  // Take the new item list from the properties.
118  for(i = m_itemNames.begin(); i != m_itemNames.end(); i++) {
119  addItem( m_itemList, *i );
120  }
121 
122  // Take the new item list from the properties.
123  for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++) {
124  addItem( m_optItemList, *i );
125  }
126 
127  // Take the item list to the data service preload list.
128  if ( m_doPreLoad ) {
129  for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) {
130  m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
131  }
132  // Not working: bad reference counting! pdataSvc->release();
133  }
134 
135  if ( m_doPreLoadOpt ) {
136  for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
137  m_pDataProvider->addPreLoadItem( *(*j) );
138  }
139  }
140  log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endmsg;
141 
142  // Decode the accept, required and veto Algorithms. The logic is the following:
143  // a. The event is accepted if all lists are empty.
144  // b. The event is provisionally accepted if any Algorithm in the accept list
145  // has been executed and has indicated that its filter is passed. This
146  // provisional acceptance can be overridden by the other lists.
147  // c. The event is rejected unless all Algorithms in the required list have
148  // been executed and have indicated that their filter passed.
149  // d. The event is rejected if any Algorithm in the veto list has been
150  // executed and has indicated that its filter has passed.
153  decodeVetoAlgs ().ignore();
154  return StatusCode::SUCCESS;
155 }
156 
157 // terminate data writer
159  MsgStream log(msgSvc(), name());
160  log << MSG::INFO << "Events output: " << m_events << endmsg;
169  return StatusCode::SUCCESS;
170 }
171 
172 // Work entry point
174 {
175  // Clear any previously existing item list
176  clearSelection();
177  // Test whether this event should be output
178  if ( isEventAccepted() )
179  {
180  const StatusCode sc = writeObjects();
181  clearSelection();
182  ++m_events;
183  if ( sc.isSuccess() && m_fireIncidents )
184  {
185  m_incidentSvc->fireIncident(Incident(m_outputName,
187  }
188  else if ( m_fireIncidents )
189  {
190  m_incidentSvc->fireIncident(Incident(m_outputName,
192  }
193  return sc;
194  }
195  return StatusCode::SUCCESS;
196 }
197 
198 // Select the different objects and write them to file
200 {
201  // Connect the output file to the service
202  StatusCode status = collectObjects();
203  if ( status.isSuccess() )
204  {
206  if ( sel->begin() != sel->end() )
207  {
208  status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
209  if ( status.isSuccess() )
210  {
211  // Now pass the collection to the persistency service
212  IOpaqueAddress* pAddress = NULL;
213  for ( IDataSelector::iterator j = sel->begin(); j != sel->end(); ++j )
214  {
215  try
216  {
217  const StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
218  if ( !iret.isSuccess() )
219  {
220  status = iret;
221  continue;
222  }
223  IRegistry* pReg = (*j)->registry();
224  pReg->setAddress(pAddress);
225  }
226  catch ( const std::exception & excpt )
227  {
228  MsgStream log( msgSvc(), name() );
229  const std::string loc = ( (*j)->registry() ?
230  (*j)->registry()->identifier() : "UnRegistered" );
231  log << MSG::FATAL
232  << "std::exception during createRep for '" << loc << "' "
233  << System::typeinfoName( typeid(**j) )
234  << endmsg;
235  log << MSG::FATAL << excpt.what() << endmsg;
236  throw;
237  }
238  }
239  for ( IDataSelector::iterator j = sel->begin(); j != sel->end(); ++j )
240  {
241  try
242  {
243  IRegistry* pReg = (*j)->registry();
244  const StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
245  if ( !iret.isSuccess() )
246  {
247  status = iret;
248  }
249  }
250  catch ( const std::exception & excpt )
251  {
252  MsgStream log( msgSvc(), name() );
253  const std::string loc = ( (*j)->registry() ?
254  (*j)->registry()->identifier() : "UnRegistered" );
255  log << MSG::FATAL
256  << "std::exception during fillRepRefs for '" << loc << "'"
257  << System::typeinfoName( typeid(**j) )
258  << endmsg;
259  log << MSG::FATAL << excpt.what() << endmsg;
260  throw;
261  }
262  }
263  // Commit the data if there was no error; otherwise possibly discard
264  if ( status.isSuccess() )
265  {
266  status = m_pConversionSvc->commitOutput(m_outputName, true);
267  }
268  else
269  {
270  m_pConversionSvc->commitOutput(m_outputName, false).ignore();
271  }
272  }
273  }
274  }
275  return status;
276 }
277 
278 // Place holder to create configurable data store agent
280  if ( level < m_currentItem->depth() ) {
281  if ( dir->object() != 0 ) {
282  /*
283  std::cout << "Analysing ("
284  << dir->name()
285  << ") Object:"
286  << ((dir->object()==0) ? "UNLOADED" : "LOADED")
287  << std::endl;
288  */
289  m_objects.push_back(dir->object());
290  return true;
291  }
292  }
293  return false;
294 }
295 
298  MsgStream log(msgSvc(), name());
301  // Traverse the tree and collect the requested objects
302  for ( i = m_itemList.begin(); i != m_itemList.end(); i++ ) {
303  DataObject* obj = 0;
304  m_currentItem = (*i);
305  StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
306  if ( iret.isSuccess() ) {
307  iret = m_pDataManager->traverseSubTree(obj, m_agent);
308  if ( !iret.isSuccess() ) {
309  status = iret;
310  }
311  }
312  else {
313  log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
314  << m_currentItem->path() << endmsg;
315  status = iret;
316  }
317  }
318  // Traverse the tree and collect the requested objects (tolerate missing items here)
319  for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) {
320  DataObject* obj = 0;
321  m_currentItem = (*i);
322  StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
323  if ( iret.isSuccess() ) {
324  iret = m_pDataManager->traverseSubTree(obj, m_agent);
325  }
326  if ( !iret.isSuccess() ) {
327  if(log.level() <= MSG::DEBUG )
328  log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
329  << m_currentItem->path() << endmsg;
330  }
331  }
332 
333  if (status.isSuccess()){
334  // Remove duplicates from the list of objects, preserving the order in the list
336  std::vector<DataObject*> tmp; // temporary vector with the reduced list
337  tmp.reserve(m_objects.size());
339  if (!unique.count(*o)) {
340  // if the pointer is not in the set, add it to both the set and the temporary vector
341  unique.insert(*o);
342  tmp.push_back(*o);
343  }
344  }
345  m_objects.swap(tmp); // swap the data of the two vectors
346  }
347 
348  return status;
349 }
350 
351 // Clear collected object list
354 }
355 
356 // Remove all items from the output streamer list;
358  for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) {
359  delete (*i);
360  }
361  itms.erase(itms.begin(), itms.end());
362 }
363 
364 // Find single item identified by its path (exact match)
368  if ( (*i)->path() == path ) return (*i);
369  }
371  if ( (*j)->path() == path ) return (*j);
372  }
373  return 0;
374 }
375 
376 // Add item to output streamer list
377 void OutputStream::addItem(Items& itms, const std::string& descriptor) {
378  MsgStream log(msgSvc(), name());
379  int level = 0;
380  size_t sep = descriptor.rfind("#");
381  std::string obj_path (descriptor,0,sep);
382  std::string slevel (descriptor,sep+1,descriptor.length());
383  if ( slevel == "*" ) {
384  level = 9999999;
385  }
386  else {
387  level = atoi(slevel.c_str());
388  }
389  if ( m_verifyItems ) {
390  size_t idx = obj_path.find("/",1);
391  while(idx != std::string::npos) {
392  std::string sub_item = obj_path.substr(0,idx);
393  if ( 0 == findItem(sub_item) ) {
394  addItem(itms, sub_item+"#1");
395  }
396  idx = obj_path.find("/",idx+1);
397  }
398  }
399  DataStoreItem* item = new DataStoreItem(obj_path, level);
400  if(log.level() <= MSG::DEBUG )
401  log << MSG::DEBUG << "Adding OutputStream item " << item->path()
402  << " with " << item->depth()
403  << " level(s)." << endmsg;
404  itms.push_back( item );
405 }
406 
407 // Connect to proper conversion service
409  StatusCode status = StatusCode(StatusCode::FAILURE, true);
410  MsgStream log(msgSvc(), name());
411  // Get output file from input
412  std::string dbType, svc, shr;
413  Tokenizer tok(true);
414  tok.analyse(m_output, " ", "", "", "=", "'", "'");
415  for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) {
416  const std::string& tag = (*i).tag();
417  const std::string& val = (*i).value();
418  switch( ::toupper(tag[0]) ) {
419  case 'D':
420  m_outputName = val;
421  break;
422  case 'T':
423  dbType = val;
424  break;
425  case 'S':
426  switch( ::toupper(tag[1]) ) {
427  case 'V': svc = val; break;
428  case 'H': shr = "YES"; break;
429  }
430  break;
431  case 'O': // OPT='<NEW<CREATE,WRITE,RECREATE>, UPDATE>'
432  switch( ::toupper(val[0]) ) {
433  case 'R':
434  if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
435  m_outputType = "RECREATE";
436  else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
437  m_outputType = "READ";
438  break;
439  case 'C':
440  case 'N':
441  case 'W':
442  m_outputType = "NEW";
443  break;
444  case 'U':
445  m_outputType = "UPDATE";
446  break;
447  default:
448  m_outputType = "???";
449  break;
450  }
451  break;
452  default:
453  break;
454  }
455  }
456  if ( !shr.empty() ) m_outputType += "|SHARED";
457  // Get access to the default Persistency service
458  // The default service is the same for input as for output.
459  // If this is not desired, then a specialized OutputStream must overwrite
460  // this value.
461  if ( dbType.length() > 0 || svc.length() > 0 ) {
462  std::string typ = dbType.length()>0 ? dbType : svc;
464  if( !ipers.isValid() ) {
465  log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
466  return StatusCode::FAILURE;
467  }
468  IConversionSvc *cnvSvc = 0;
469  status = ipers->getService(typ, cnvSvc);
470  if( !status.isSuccess() ) {
471  log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << typ << endmsg;
472  return status;
473  }
474  // Increase reference count and keep service.
475  m_pConversionSvc = cnvSvc;
476  }
477  else {
478  log << MSG::FATAL
479  << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
480  << "You either have to specify a technology name or a service name!" << endmsg
481  << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
482  return StatusCode::FAILURE;
483  }
484  return StatusCode::SUCCESS;
485 }
486 
489 }
490 
493  if (sc.isFailure()) {
494  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
495  "OutputStream::acceptAlgsHandler",sc);
496  }
497 }
498 
501 }
502 
505  if (sc.isFailure()) {
506  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
507  "OutputStream::requireAlgsHandler",sc);
508  }
509 }
510 
513 }
514 
515 void OutputStream::vetoAlgsHandler( Property& /* theProp */ ) {
517  if (sc.isFailure()) {
518  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
519  "OutputStream::vetoAlgsHandler",sc);
520  }
521 }
522 
524  std::vector<Algorithm*>* theAlgs )
525 {
526  // Reset the list of Algorithms
527  theAlgs->clear( );
528 
529  MsgStream log( msgSvc( ), name( ) );
530 
532 
534  if ( theAlgMgr.isValid() ) {
535  // Build the list of Algorithms from the names list
536  const std::vector<std::string> nameList = theNames.value( );
538  std::vector<std::string>::const_iterator itend = nameList.end( );
539  for (it = nameList.begin(); it != itend; ++it) {
540  // Check whether the supplied name corresponds to an existing
541  // Algorithm object.
542  const std::string &theName = (*it);
543  SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
544  Algorithm* theAlgorithm;
545  if ( theIAlg.isValid() ) {
546  result = StatusCode::SUCCESS;
547  try{
548  theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
549  } catch(...){
550  result = StatusCode::FAILURE;
551  }
552  }
553  if ( result.isSuccess( ) ) {
554  // Check that the specified algorithm doesn't already exist in the list
556  std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
557  for (ita = theAlgs->begin(); ita != itaend; ++ita) {
558  Algorithm* existAlgorithm = (*ita);
559  if ( theAlgorithm == existAlgorithm ) {
560  result = StatusCode::FAILURE;
561  break;
562  }
563  }
564  if ( result.isSuccess( ) ) {
565  theAlgorithm->addRef();
566  theAlgs->push_back( theAlgorithm );
567  }
568  }
569  else {
570  log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
571  }
572  }
573  result = StatusCode::SUCCESS;
574  }
575  else {
576  log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
577  }
578  return result;
579 }
580 
582  typedef std::vector<Algorithm*>::iterator AlgIter;
583  bool result = true;
584 
585  // Loop over all Algorithms in the accept list to see
586  // whether any have been executed and have their filter
587  // passed flag set. Any match causes the event to be
588  // provisionally accepted.
589  if ( ! m_acceptAlgs->empty() ) {
590  result = false;
591  for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
592  if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
593  result = true;
594  break;
595  }
596  }
597  }
598 
599  // Loop over all Algorithms in the required list to see
600  // whether all have been executed and have their filter
601  // passed flag set. Any mismatch causes the event to be
602  // rejected.
603  if ( result && ! m_requireAlgs->empty() ) {
604  for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
605  if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
606  result = false;
607  break;
608  }
609  }
610  }
611 
612  // Loop over all Algorithms in the veto list to see
613  // whether any have been executed and have their filter
614  // passed flag set. Any match causes the event to be
615  // rejected.
616  if ( result && ! m_vetoAlgs->empty() ) {
617  for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
618  if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
619  result = false;
620  break;
621  }
622  }
623  }
624  return result;
625 }
626 
628  return !(m_itemNames.empty() && m_optItemNames.empty());
629 }

Generated at Wed Dec 4 2013 14:33:07 for Gaudi Framework, version v24r2 by Doxygen version 1.8.2 written by Dimitri van Heesch, © 1997-2004