All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
OutputStream.cpp
Go to the documentation of this file.
1 // Framework include files
2 #include "GaudiKernel/Tokenizer.h"
3 #include "GaudiKernel/IRegistry.h"
4 #include "GaudiKernel/IAlgManager.h"
5 #include "GaudiKernel/ISvcLocator.h"
6 #include "GaudiKernel/IConversionSvc.h"
7 #include "GaudiKernel/IDataManagerSvc.h"
8 #include "GaudiKernel/IDataProviderSvc.h"
9 #include "GaudiKernel/IPersistencySvc.h"
10 #include "GaudiKernel/IOpaqueAddress.h"
11 #include "GaudiKernel/Incident.h"
12 #include "GaudiKernel/IIncidentSvc.h"
13 
14 #include "GaudiKernel/MsgStream.h"
15 #include "GaudiKernel/strcasecmp.h"
16 #include "GaudiKernel/DataObject.h"
17 #include "GaudiKernel/DataStoreItem.h"
18 #include "OutputStream.h"
19 #include "OutputStreamAgent.h"
20 
21 #include <set>
22 
23 // Define the algorithm factory for the standard output data writer
25 
26 #define ON_DEBUG if (log.level() <= MSG::DEBUG)
27 
28 // Standard Constructor
29 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
30 : Algorithm(name, pSvcLocator)
31 {
32  m_doPreLoad = true;
33  m_doPreLoadOpt = false;
34  m_verifyItems = true;
35  m_output = "";
36  m_outputName = "";
37  m_outputType = "UPDATE";
38  m_storeName = "EventDataSvc";
39  m_persName = "EventPersistencySvc";
40  m_agent = new OutputStreamAgent(this);
41  m_acceptAlgs = new std::vector<Algorithm*>();
42  m_requireAlgs = new std::vector<Algorithm*>();
43  m_vetoAlgs = new std::vector<Algorithm*>();
46  m_fireIncidents = true;
47  declareProperty("ItemList", m_itemNames);
48  declareProperty("OptItemList", m_optItemNames);
49  declareProperty("AlgDependentItemList", m_algDependentItemList);
50  declareProperty("Preload", m_doPreLoad);
51  declareProperty("PreloadOptItems", m_doPreLoadOpt);
52  declareProperty("Output", m_output);
53  declareProperty("OutputFile", m_outputName);
54  declareProperty("EvtDataSvc", m_storeName);
55  declareProperty("EvtConversionSvc", m_persName);
56  declareProperty("AcceptAlgs", m_acceptNames);
57  declareProperty("RequireAlgs", m_requireNames);
58  declareProperty("VetoAlgs", m_vetoNames);
59  declareProperty("VerifyItems", m_verifyItems);
62 
63  // Associate action handlers with the AcceptAlgs, RequireAlgs and VetoAlgs.
67 
68  //setProperty( "OutputLevel", 2 );
69 
70 }
71 
72 // Standard Destructor
74  delete m_agent;
75  delete m_acceptAlgs;
76  delete m_requireAlgs;
77  delete m_vetoAlgs;
78 }
79 
80 // initialize data writer
82  MsgStream log(msgSvc(), name());
83 
84  // Reset the number of events written
85  m_events = 0;
86  // Get access to the DataManagerSvc
88  if( !m_pDataManager.isValid() ) {
89  log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
90  return StatusCode::FAILURE;
91  }
92  // Get access to the IncidentService
93  m_incidentSvc = serviceLocator()->service("IncidentSvc");
94  if( !m_incidentSvc.isValid() ) {
95  log << MSG::WARNING << "Error retrieving IncidentSvc." << endmsg;
96  return StatusCode::FAILURE;
97  }
98  // Get access to the assigned data service
100  if( !m_pDataProvider.isValid() ) {
101  log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
102  return StatusCode::FAILURE;
103  }
104  if ( hasInput() ) {
105  StatusCode status = connectConversionSvc();
106  if( !status.isSuccess() ) {
107  log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
110  return status;
111  }
112  }
113 
114  // Clear the list with optional items
116  // Clear the item list
118 
119  // Take the new item list from the properties.
120  ON_DEBUG log << MSG::DEBUG << "ItemList : " << m_itemNames << endmsg;
121  for( ItemNames::const_iterator i = m_itemNames.begin();
122  i != m_itemNames.end(); ++i )
123  {
124  addItem( m_itemList, *i );
125  }
126 
127  // Take the new item list from the properties.
128  ON_DEBUG log << MSG::DEBUG << "OptItemList : " << m_optItemNames << endmsg;
129  for( ItemNames::const_iterator i = m_optItemNames.begin();
130  i != m_optItemNames.end(); ++i )
131  {
132  addItem( m_optItemList, *i );
133  }
134 
135  // prepare the algorithm selected dependent locations
136  ON_DEBUG log << MSG::DEBUG << "AlgDependentItemList : " << m_algDependentItemList << endmsg;
137  for ( AlgDependentItemNames::const_iterator a = m_algDependentItemList.begin();
138  a != m_algDependentItemList.end(); ++a )
139  {
140  // Get the algorithm pointer
141  Algorithm * theAlgorithm = decodeAlgorithm( a->first );
142  if ( theAlgorithm )
143  {
144  // Get the item list for this alg
145  Items& items = m_algDependentItems[theAlgorithm];
146  // Clear the list for this alg
147  clearItems( items );
148  // fill the list again
149  for ( ItemNames::const_iterator i = a->second.begin();
150  i != a->second.end(); ++i )
151  {
152  addItem( items, *i );
153  }
154  }
155  }
156 
157  // Take the item list to the data service preload list.
158  if ( m_doPreLoad ) {
159  for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) {
160  m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
161  }
162  // Not working: bad reference counting! pdataSvc->release();
163  }
164 
165  if ( m_doPreLoadOpt ) {
166  for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
167  m_pDataProvider->addPreLoadItem( *(*j) );
168  }
169  }
170  log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endmsg;
171 
172  // Decode the accept, required and veto Algorithms. The logic is the following:
173  // a. The event is accepted if all lists are empty.
174  // b. The event is provisionally accepted if any Algorithm in the accept list
175  // has been executed and has indicated that its filter is passed. This
176  // provisional acceptance can be overridden by the other lists.
177  // c. The event is rejected unless all Algorithms in the required list have
178  // been executed and have indicated that their filter passed.
179  // d. The event is rejected if any Algorithm in the veto list has been
180  // executed and has indicated that its filter has passed.
183  decodeVetoAlgs ().ignore();
184  return StatusCode::SUCCESS;
185 }
186 
187 // terminate data writer
189  MsgStream log(msgSvc(), name());
190  log << MSG::INFO << "Events output: " << m_events << endmsg;
199  return StatusCode::SUCCESS;
200 }
201 
202 // Work entry point
204 {
205  // Clear any previously existing item list
206  clearSelection();
207  // Test whether this event should be output
208  if ( isEventAccepted() )
209  {
210  const StatusCode sc = writeObjects();
211  clearSelection();
212  ++m_events;
213  if ( sc.isSuccess() && m_fireIncidents )
214  {
215  m_incidentSvc->fireIncident(Incident(m_outputName,
217  }
218  else if ( m_fireIncidents )
219  {
220  m_incidentSvc->fireIncident(Incident(m_outputName,
222  }
223  return sc;
224  }
225  return StatusCode::SUCCESS;
226 }
227 
228 // Select the different objects and write them to file
230 {
231  // Connect the output file to the service
232  StatusCode status = collectObjects();
233  if ( status.isSuccess() )
234  {
236  if ( sel->begin() != sel->end() )
237  {
238  status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
239  if ( status.isSuccess() )
240  {
241  // Now pass the collection to the persistency service
242  IOpaqueAddress* pAddress = NULL;
243  for ( IDataSelector::iterator j = sel->begin(); j != sel->end(); ++j )
244  {
245  try
246  {
247  const StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
248  if ( !iret.isSuccess() )
249  {
250  status = iret;
251  continue;
252  }
253  IRegistry* pReg = (*j)->registry();
254  pReg->setAddress(pAddress);
255  }
256  catch ( const std::exception & excpt )
257  {
258  MsgStream log( msgSvc(), name() );
259  const std::string loc = ( (*j)->registry() ?
260  (*j)->registry()->identifier() : "UnRegistered" );
261  log << MSG::FATAL
262  << "std::exception during createRep for '" << loc << "' "
263  << System::typeinfoName( typeid(**j) )
264  << endmsg;
265  log << MSG::FATAL << excpt.what() << endmsg;
266  throw;
267  }
268  }
269  for ( IDataSelector::iterator j = sel->begin(); j != sel->end(); ++j )
270  {
271  try
272  {
273  IRegistry* pReg = (*j)->registry();
274  const StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
275  if ( !iret.isSuccess() )
276  {
277  status = iret;
278  }
279  }
280  catch ( const std::exception & excpt )
281  {
282  MsgStream log( msgSvc(), name() );
283  const std::string loc = ( (*j)->registry() ?
284  (*j)->registry()->identifier() : "UnRegistered" );
285  log << MSG::FATAL
286  << "std::exception during fillRepRefs for '" << loc << "'"
287  << System::typeinfoName( typeid(**j) )
288  << endmsg;
289  log << MSG::FATAL << excpt.what() << endmsg;
290  throw;
291  }
292  }
293  // Commit the data if there was no error; otherwise possibly discard
294  if ( status.isSuccess() )
295  {
296  status = m_pConversionSvc->commitOutput(m_outputName, true);
297  }
298  else
299  {
300  m_pConversionSvc->commitOutput(m_outputName, false).ignore();
301  }
302  }
303  }
304  }
305  return status;
306 }
307 
308 // Place holder to create configurable data store agent
310  if ( level < m_currentItem->depth() ) {
311  if ( dir->object() != 0 ) {
312  /*
313  std::cout << "Analysing ("
314  << dir->name()
315  << ") Object:"
316  << ((dir->object()==0) ? "UNLOADED" : "LOADED")
317  << std::endl;
318  */
319  m_objects.push_back(dir->object());
320  return true;
321  }
322  }
323  return false;
324 }
325 
328  MsgStream log(msgSvc(), name());
330 
331  // Traverse the tree and collect the requested objects
332  for ( Items::iterator i = m_itemList.begin(); i != m_itemList.end(); i++ ) {
333  DataObject* obj = 0;
334  m_currentItem = (*i);
335  StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
336  if ( iret.isSuccess() ) {
337  iret = m_pDataManager->traverseSubTree(obj, m_agent);
338  if ( !iret.isSuccess() ) {
339  status = iret;
340  }
341  }
342  else {
343  log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
344  << m_currentItem->path() << endmsg;
345  status = iret;
346  }
347  }
348 
349  // Traverse the tree and collect the requested objects (tolerate missing items here)
350  for ( Items::iterator i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) {
351  DataObject* obj = 0;
352  m_currentItem = (*i);
353  StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
354  if ( iret.isSuccess() ) {
355  iret = m_pDataManager->traverseSubTree(obj, m_agent);
356  }
357  if ( !iret.isSuccess() ) {
358  ON_DEBUG
359  log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
360  << m_currentItem->path() << endmsg;
361  }
362  }
363 
364  // Collect objects dependent on particular algorithms
365  for ( AlgDependentItems::const_iterator iAlgItems = m_algDependentItems.begin();
366  iAlgItems != m_algDependentItems.end(); ++iAlgItems )
367  {
368  Algorithm * alg = iAlgItems->first;
369  const Items& items = iAlgItems->second;
370  if ( alg->isExecuted() && alg->filterPassed() )
371  {
372  ON_DEBUG
373  log << MSG::DEBUG << "Algorithm '" << alg->name() << "' fired. Adding " << items << endmsg;
374  for ( Items::const_iterator i = items.begin(); i != items.end(); ++i )
375  {
376  DataObject* obj = NULL;
377  m_currentItem = (*i);
378  StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(),obj);
379  if ( iret.isSuccess() )
380  {
381  iret = m_pDataManager->traverseSubTree(obj,m_agent);
382  if ( !iret.isSuccess() ) { status = iret; }
383  }
384  else
385  {
386  log << MSG::ERROR << "Cannot write mandatory (algorithm dependent) object(s) (Not found) "
387  << m_currentItem->path() << endmsg;
388  status = iret;
389  }
390  }
391  }
392  }
393 
394  if (status.isSuccess())
395  {
396  // Remove duplicates from the list of objects, preserving the order in the list
397  std::set<DataObject*> unique;
398  std::vector<DataObject*> tmp; // temporary vector with the reduced list
399  tmp.reserve(m_objects.size());
400  for (std::vector<DataObject*>::iterator o = m_objects.begin(); o != m_objects.end(); ++o) {
401  if (!unique.count(*o)) {
402  // if the pointer is not in the set, add it to both the set and the temporary vector
403  unique.insert(*o);
404  tmp.push_back(*o);
405  }
406  }
407  m_objects.swap(tmp); // swap the data of the two vectors
408  }
409 
410  return status;
411 }
412 
413 // Clear collected object list
415  m_objects.erase(m_objects.begin(), m_objects.end());
416 }
417 
418 // Remove all items from the output streamer list
420  for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) {
421  delete (*i);
422  }
423  itms.erase(itms.begin(), itms.end());
424 }
425 
426 // Find single item identified by its path (exact match)
428 OutputStream::findItem(const std::string& path) {
429  for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i) {
430  if ( (*i)->path() == path ) return (*i);
431  }
432  for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j) {
433  if ( (*j)->path() == path ) return (*j);
434  }
435  return 0;
436 }
437 
438 // Add item to output streamer list
439 void OutputStream::addItem(Items& itms, const std::string& descriptor) {
440  MsgStream log(msgSvc(), name());
441  int level = 0;
442  size_t sep = descriptor.rfind("#");
443  std::string obj_path (descriptor,0,sep);
444  std::string slevel (descriptor,sep+1,descriptor.length());
445  if ( slevel == "*" ) {
446  level = 9999999;
447  }
448  else {
449  level = atoi(slevel.c_str());
450  }
451  if ( m_verifyItems ) {
452  size_t idx = obj_path.find("/",1);
453  while(idx != std::string::npos) {
454  std::string sub_item = obj_path.substr(0,idx);
455  if ( 0 == findItem(sub_item) ) {
456  addItem(itms, sub_item+"#1");
457  }
458  idx = obj_path.find("/",idx+1);
459  }
460  }
461  DataStoreItem* item = new DataStoreItem(obj_path, level);
462  ON_DEBUG
463  log << MSG::DEBUG << "Adding OutputStream item " << item->path()
464  << " with " << item->depth()
465  << " level(s)." << endmsg;
466  itms.push_back( item );
467 }
468 
469 // Connect to proper conversion service
471  StatusCode status = StatusCode(StatusCode::FAILURE, true);
472  MsgStream log(msgSvc(), name());
473  // Get output file from input
474  std::string dbType, svc, shr;
475  Tokenizer tok(true);
476  tok.analyse(m_output, " ", "", "", "=", "'", "'");
477  for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) {
478  const std::string& tag = (*i).tag();
479  const std::string& val = (*i).value();
480  switch( ::toupper(tag[0]) ) {
481  case 'D':
482  m_outputName = val;
483  break;
484  case 'T':
485  dbType = val;
486  break;
487  case 'S':
488  switch( ::toupper(tag[1]) ) {
489  case 'V': svc = val; break;
490  case 'H': shr = "YES"; break;
491  }
492  break;
493  case 'O': // OPT='<NEW<CREATE,WRITE,RECREATE>, UPDATE>'
494  switch( ::toupper(val[0]) ) {
495  case 'R':
496  if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
497  m_outputType = "RECREATE";
498  else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
499  m_outputType = "READ";
500  break;
501  case 'C':
502  case 'N':
503  case 'W':
504  m_outputType = "NEW";
505  break;
506  case 'U':
507  m_outputType = "UPDATE";
508  break;
509  default:
510  m_outputType = "???";
511  break;
512  }
513  break;
514  default:
515  break;
516  }
517  }
518  if ( !shr.empty() ) m_outputType += "|SHARED";
519  // Get access to the default Persistency service
520  // The default service is the same for input as for output.
521  // If this is not desired, then a specialized OutputStream must overwrite
522  // this value.
523  if ( dbType.length() > 0 || svc.length() > 0 ) {
524  std::string typ = dbType.length()>0 ? dbType : svc;
526  if( !ipers.isValid() ) {
527  log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
528  return StatusCode::FAILURE;
529  }
530  IConversionSvc *cnvSvc = 0;
531  status = ipers->getService(typ, cnvSvc);
532  if( !status.isSuccess() ) {
533  log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << typ << endmsg;
534  return status;
535  }
536  // Increase reference count and keep service.
537  m_pConversionSvc = cnvSvc;
538  }
539  else
540  {
541  log << MSG::FATAL
542  << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
543  << "You either have to specify a technology name or a service name!" << endmsg
544  << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
545  return StatusCode::FAILURE;
546  }
547  return StatusCode::SUCCESS;
548 }
549 
551  MsgStream log(msgSvc(), name());
552  ON_DEBUG
553  log << MSG::DEBUG << "AcceptAlgs : " << m_acceptNames.value() << endmsg;
555 }
556 
559  if (sc.isFailure()) {
560  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
561  "OutputStream::acceptAlgsHandler",sc);
562  }
563 }
564 
566  MsgStream log(msgSvc(), name());
567  ON_DEBUG
568  log << MSG::DEBUG << "RequireAlgs : " << m_requireNames.value() << endmsg;
570 }
571 
574  if (sc.isFailure()) {
575  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
576  "OutputStream::requireAlgsHandler",sc);
577  }
578 }
579 
581  MsgStream log(msgSvc(), name());
582  ON_DEBUG
583  log << MSG::DEBUG << "VetoAlgs : " << m_vetoNames.value() << endmsg;
585 }
586 
587 void OutputStream::vetoAlgsHandler( Property& /* theProp */ ) {
589  if (sc.isFailure()) {
590  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
591  "OutputStream::vetoAlgsHandler",sc);
592  }
593 }
594 
595 Algorithm* OutputStream::decodeAlgorithm( const std::string& theName )
596 {
597  Algorithm * theAlgorithm = NULL;
598 
600  if ( theAlgMgr.isValid() )
601  {
602  // Check whether the supplied name corresponds to an existing
603  // Algorithm object.
604  SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
605  if ( theIAlg.isValid() )
606  {
607  try
608  {
609  theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
610  }
611  catch(...)
612  {
613  // do nothing
614  }
615  }
616  }
617  else
618  {
619  MsgStream log( msgSvc( ), name( ) );
620  log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
621  }
622 
623  if ( !theAlgorithm )
624  {
625  MsgStream log( msgSvc( ), name( ) );
626  log << MSG::WARNING
627  << "Failed to decode Algorithm name " << theName << endmsg;
628  }
629 
630  return theAlgorithm;
631 }
632 
634  std::vector<Algorithm*>* theAlgs )
635 {
636  // Reset the list of Algorithms
637  theAlgs->clear( );
638 
640 
641  // Build the list of Algorithms from the names list
642  const std::vector<std::string> nameList = theNames.value( );
643  for ( std::vector<std::string>::const_iterator it = nameList.begin();
644  it != nameList.end(); ++it )
645  {
646 
647  Algorithm * theAlgorithm = decodeAlgorithm( *it );
648  if ( theAlgorithm )
649  {
650  // Check that the specified algorithm doesn't already exist in the list
651  for ( std::vector<Algorithm*>::iterator ita = theAlgs->begin();
652  ita != theAlgs->end(); ++ita )
653  {
654  Algorithm * existAlgorithm = (*ita);
655  if ( theAlgorithm == existAlgorithm )
656  {
657  result = StatusCode::FAILURE;
658  break;
659  }
660  }
661  if ( result.isSuccess( ) )
662  {
663  theAlgorithm->addRef();
664  theAlgs->push_back( theAlgorithm );
665  }
666  }
667  else
668  {
669  MsgStream log( msgSvc( ), name( ) );
670  log << MSG::INFO << *it << " doesn't exist - ignored" << endmsg;
671  }
672 
673  }
674  result = StatusCode::SUCCESS;
675 
676  return result;
677 }
678 
680  typedef std::vector<Algorithm*>::iterator AlgIter;
681  bool result = true;
682 
683  // Loop over all Algorithms in the accept list to see
684  // whether any have been executed and have their filter
685  // passed flag set. Any match causes the event to be
686  // provisionally accepted.
687  if ( ! m_acceptAlgs->empty() ) {
688  result = false;
689  for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
690  if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
691  result = true;
692  break;
693  }
694  }
695  }
696 
697  // Loop over all Algorithms in the required list to see
698  // whether all have been executed and have their filter
699  // passed flag set. Any mismatch causes the event to be
700  // rejected.
701  if ( result && ! m_requireAlgs->empty() ) {
702  for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
703  if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
704  result = false;
705  break;
706  }
707  }
708  }
709 
710  // Loop over all Algorithms in the veto list to see
711  // whether any have been executed and have their filter
712  // passed flag set. Any match causes the event to be
713  // rejected.
714  if ( result && ! m_vetoAlgs->empty() ) {
715  for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
716  if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
717  result = false;
718  break;
719  }
720  }
721  }
722  return result;
723 }
724 
726  return !(m_itemNames.empty() && m_optItemNames.empty() &&
727  m_algDependentItemList.empty());
728 }