Gaudi Framework, version v25r0

Home   Generated: Mon Feb 17 2014
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
OutputStream.cpp
Go to the documentation of this file.
1 // Framework include files
11 #include "GaudiKernel/Incident.h"
13 
14 #include "GaudiKernel/MsgStream.h"
15 #include "GaudiKernel/strcasecmp.h"
16 #include "GaudiKernel/DataObject.h"
18 #include "OutputStream.h"
19 #include "OutputStreamAgent.h"
20 
21 #include <set>
22 
23 // Define the algorithm factory for the standard output data writer
25 
26 // Standard Constructor
27 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
28 : Algorithm(name, pSvcLocator)
29 {
30  m_doPreLoad = true;
31  m_doPreLoadOpt = false;
32  m_verifyItems = true;
33  m_output = "";
34  m_outputName = "";
35  m_outputType = "UPDATE";
36  m_storeName = "EventDataSvc";
37  m_persName = "EventPersistencySvc";
38  m_agent = new OutputStreamAgent(this);
39  m_acceptAlgs = new std::vector<Algorithm*>();
40  m_requireAlgs = new std::vector<Algorithm*>();
41  m_vetoAlgs = new std::vector<Algorithm*>();
44  m_fireIncidents = true;
45  declareProperty("ItemList", m_itemNames);
46  declareProperty("OptItemList", m_optItemNames);
47  declareProperty("Preload", m_doPreLoad);
48  declareProperty("PreloadOptItems", m_doPreLoadOpt);
49  declareProperty("Output", m_output);
50  declareProperty("OutputFile", m_outputName);
51  declareProperty("EvtDataSvc", m_storeName);
52  declareProperty("EvtConversionSvc", m_persName);
53  declareProperty("AcceptAlgs", m_acceptNames);
54  declareProperty("RequireAlgs", m_requireNames);
55  declareProperty("VetoAlgs", m_vetoNames);
56  declareProperty("VerifyItems", m_verifyItems);
59 
60  // Associate action handlers with the AcceptAlgs, RequireAlgs & VetoAlgs properties
61  m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
62  m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
63  m_vetoNames.declareUpdateHandler ( &OutputStream::vetoAlgsHandler , this );
64 }
65 
66 // Standard Destructor
68  delete m_agent;
69  delete m_acceptAlgs;
70  delete m_requireAlgs;
71  delete m_vetoAlgs;
72 }
73 
74 // initialize data writer
76  MsgStream log(msgSvc(), name());
77 
78  // Reset the number of events written
79  m_events = 0;
80  // Get access to the DataManagerSvc
82  if( !m_pDataManager.isValid() ) {
83  log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
84  return StatusCode::FAILURE;
85  }
86  // Get access to the IncidentService
87  m_incidentSvc = serviceLocator()->service("IncidentSvc");
88  if( !m_incidentSvc.isValid() ) {
89  log << MSG::WARNING << "Error retrieving IncidentSvc." << endmsg;
90  return StatusCode::FAILURE;
91  }
92  // Get access to the assigned data service
94  if( !m_pDataProvider.isValid() ) {
95  log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
96  return StatusCode::FAILURE;
97  }
98  if ( hasInput() ) {
100  if( !status.isSuccess() ) {
101  log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
104  return status;
105  }
106  }
107 
108  // Clear the list with optional items
110  // Clear the item list
112 
114  // Take the new item list from the properties.
115  for(i = m_itemNames.begin(); i != m_itemNames.end(); i++) {
116  addItem( m_itemList, *i );
117  }
118 
119  // Take the new item list from the properties.
120  for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++) {
121  addItem( m_optItemList, *i );
122  }
123 
124  // Take the item list to the data service preload list.
125  if ( m_doPreLoad ) {
126  for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) {
127  m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
128  }
129  // Not working: bad reference counting! pdataSvc->release();
130  }
131 
132  if ( m_doPreLoadOpt ) {
133  for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
134  m_pDataProvider->addPreLoadItem( *(*j) );
135  }
136  }
137  log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endmsg;
138 
139  // Decode the accept, required and veto Algorithms. The logic is the following:
140  // a. The event is accepted if all lists are empty.
141  // b. The event is provisionally accepted if any Algorithm in the accept list
142  // has been executed and has indicated that its filter is passed. This
143  // provisional acceptance can be overridden by the other lists.
144  // c. The event is rejected unless all Algorithms in the required list have
145  // been executed and have indicated that their filter passed.
146  // d. The event is rejected if any Algorithm in the veto list has been
147  // executed and has indicated that its filter has passed.
150  decodeVetoAlgs ().ignore();
151  return StatusCode::SUCCESS;
152 }
153 
154 // terminate data writer
156  MsgStream log(msgSvc(), name());
157  log << MSG::INFO << "Events output: " << m_events << endmsg;
166  return StatusCode::SUCCESS;
167 }
168 
169 // Work entry point
171 {
172  // Clear any previously existing item list
173  clearSelection();
174  // Test whether this event should be output
175  if ( isEventAccepted() )
176  {
177  const StatusCode sc = writeObjects();
178  clearSelection();
179  ++m_events;
180  if ( sc.isSuccess() && m_fireIncidents )
181  {
182  m_incidentSvc->fireIncident(Incident(m_outputName,
184  }
185  else if ( m_fireIncidents )
186  {
187  m_incidentSvc->fireIncident(Incident(m_outputName,
189  }
190  return sc;
191  }
192  return StatusCode::SUCCESS;
193 }
194 
195 // Select the different objects and write them to file
197 {
198  // Connect the output file to the service
199  StatusCode status = collectObjects();
200  if ( status.isSuccess() )
201  {
203  if ( sel->begin() != sel->end() )
204  {
205  status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
206  if ( status.isSuccess() )
207  {
208  // Now pass the collection to the persistency service
209  IOpaqueAddress* pAddress = NULL;
210  for ( IDataSelector::iterator j = sel->begin(); j != sel->end(); ++j )
211  {
212  try
213  {
214  const StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
215  if ( !iret.isSuccess() )
216  {
217  status = iret;
218  continue;
219  }
220  IRegistry* pReg = (*j)->registry();
221  pReg->setAddress(pAddress);
222  }
223  catch ( const std::exception & excpt )
224  {
225  MsgStream log( msgSvc(), name() );
226  const std::string loc = ( (*j)->registry() ?
227  (*j)->registry()->identifier() : "UnRegistered" );
228  log << MSG::FATAL
229  << "std::exception during createRep for '" << loc << "' "
230  << System::typeinfoName( typeid(**j) )
231  << endmsg;
232  log << MSG::FATAL << excpt.what() << endmsg;
233  throw;
234  }
235  }
236  for ( IDataSelector::iterator j = sel->begin(); j != sel->end(); ++j )
237  {
238  try
239  {
240  IRegistry* pReg = (*j)->registry();
241  const StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
242  if ( !iret.isSuccess() )
243  {
244  status = iret;
245  }
246  }
247  catch ( const std::exception & excpt )
248  {
249  MsgStream log( msgSvc(), name() );
250  const std::string loc = ( (*j)->registry() ?
251  (*j)->registry()->identifier() : "UnRegistered" );
252  log << MSG::FATAL
253  << "std::exception during fillRepRefs for '" << loc << "'"
254  << System::typeinfoName( typeid(**j) )
255  << endmsg;
256  log << MSG::FATAL << excpt.what() << endmsg;
257  throw;
258  }
259  }
260  // Commit the data if there was no error; otherwise possibly discard
261  if ( status.isSuccess() )
262  {
263  status = m_pConversionSvc->commitOutput(m_outputName, true);
264  }
265  else
266  {
267  m_pConversionSvc->commitOutput(m_outputName, false).ignore();
268  }
269  }
270  }
271  }
272  return status;
273 }
274 
275 // Place holder to create configurable data store agent
277  if ( level < m_currentItem->depth() ) {
278  if ( dir->object() != 0 ) {
279  /*
280  std::cout << "Analysing ("
281  << dir->name()
282  << ") Object:"
283  << ((dir->object()==0) ? "UNLOADED" : "LOADED")
284  << std::endl;
285  */
286  m_objects.push_back(dir->object());
287  return true;
288  }
289  }
290  return false;
291 }
292 
295  MsgStream log(msgSvc(), name());
298  // Traverse the tree and collect the requested objects
299  for ( i = m_itemList.begin(); i != m_itemList.end(); i++ ) {
300  DataObject* obj = 0;
301  m_currentItem = (*i);
302  StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
303  if ( iret.isSuccess() ) {
304  iret = m_pDataManager->traverseSubTree(obj, m_agent);
305  if ( !iret.isSuccess() ) {
306  status = iret;
307  }
308  }
309  else {
310  log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
311  << m_currentItem->path() << endmsg;
312  status = iret;
313  }
314  }
315  // Traverse the tree and collect the requested objects (tolerate missing items here)
316  for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) {
317  DataObject* obj = 0;
318  m_currentItem = (*i);
319  StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
320  if ( iret.isSuccess() ) {
321  iret = m_pDataManager->traverseSubTree(obj, m_agent);
322  }
323  if ( !iret.isSuccess() ) {
324  if(log.level() <= MSG::DEBUG )
325  log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
326  << m_currentItem->path() << endmsg;
327  }
328  }
329 
330  if (status.isSuccess()){
331  // Remove duplicates from the list of objects, preserving the order in the list
333  std::vector<DataObject*> tmp; // temporary vector with the reduced list
334  tmp.reserve(m_objects.size());
336  if (!unique.count(*o)) {
337  // if the pointer is not in the set, add it to both the set and the temporary vector
338  unique.insert(*o);
339  tmp.push_back(*o);
340  }
341  }
342  m_objects.swap(tmp); // swap the data of the two vectors
343  }
344 
345  return status;
346 }
347 
348 // Clear collected object list
351 }
352 
353 // Remove all items from the output streamer list;
355  for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) {
356  delete (*i);
357  }
358  itms.erase(itms.begin(), itms.end());
359 }
360 
361 // Find single item identified by its path (exact match)
365  if ( (*i)->path() == path ) return (*i);
366  }
368  if ( (*j)->path() == path ) return (*j);
369  }
370  return 0;
371 }
372 
373 // Add item to output streamer list
374 void OutputStream::addItem(Items& itms, const std::string& descriptor) {
375  MsgStream log(msgSvc(), name());
376  int level = 0;
377  size_t sep = descriptor.rfind("#");
378  std::string obj_path (descriptor,0,sep);
379  std::string slevel (descriptor,sep+1,descriptor.length());
380  if ( slevel == "*" ) {
381  level = 9999999;
382  }
383  else {
384  level = atoi(slevel.c_str());
385  }
386  if ( m_verifyItems ) {
387  size_t idx = obj_path.find("/",1);
388  while(idx != std::string::npos) {
389  std::string sub_item = obj_path.substr(0,idx);
390  if ( 0 == findItem(sub_item) ) {
391  addItem(itms, sub_item+"#1");
392  }
393  idx = obj_path.find("/",idx+1);
394  }
395  }
396  DataStoreItem* item = new DataStoreItem(obj_path, level);
397  if(log.level() <= MSG::DEBUG )
398  log << MSG::DEBUG << "Adding OutputStream item " << item->path()
399  << " with " << item->depth()
400  << " level(s)." << endmsg;
401  itms.push_back( item );
402 }
403 
404 // Connect to proper conversion service
406  StatusCode status = StatusCode(StatusCode::FAILURE, true);
407  MsgStream log(msgSvc(), name());
408  // Get output file from input
409  std::string dbType, svc, shr;
410  Tokenizer tok(true);
411  tok.analyse(m_output, " ", "", "", "=", "'", "'");
412  for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) {
413  const std::string& tag = (*i).tag();
414  const std::string& val = (*i).value();
415  switch( ::toupper(tag[0]) ) {
416  case 'D':
417  m_outputName = val;
418  break;
419  case 'T':
420  dbType = val;
421  break;
422  case 'S':
423  switch( ::toupper(tag[1]) ) {
424  case 'V': svc = val; break;
425  case 'H': shr = "YES"; break;
426  }
427  break;
428  case 'O': // OPT='<NEW<CREATE,WRITE,RECREATE>, UPDATE>'
429  switch( ::toupper(val[0]) ) {
430  case 'R':
431  if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
432  m_outputType = "RECREATE";
433  else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
434  m_outputType = "READ";
435  break;
436  case 'C':
437  case 'N':
438  case 'W':
439  m_outputType = "NEW";
440  break;
441  case 'U':
442  m_outputType = "UPDATE";
443  break;
444  default:
445  m_outputType = "???";
446  break;
447  }
448  break;
449  default:
450  break;
451  }
452  }
453  if ( !shr.empty() ) m_outputType += "|SHARED";
454  // Get access to the default Persistency service
455  // The default service is the same for input as for output.
456  // If this is not desired, then a specialized OutputStream must overwrite
457  // this value.
458  if ( dbType.length() > 0 || svc.length() > 0 ) {
459  std::string typ = dbType.length()>0 ? dbType : svc;
461  if( !ipers.isValid() ) {
462  log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
463  return StatusCode::FAILURE;
464  }
465  IConversionSvc *cnvSvc = 0;
466  status = ipers->getService(typ, cnvSvc);
467  if( !status.isSuccess() ) {
468  log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << typ << endmsg;
469  return status;
470  }
471  // Increase reference count and keep service.
472  m_pConversionSvc = cnvSvc;
473  }
474  else {
475  log << MSG::FATAL
476  << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
477  << "You either have to specify a technology name or a service name!" << endmsg
478  << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
479  return StatusCode::FAILURE;
480  }
481  return StatusCode::SUCCESS;
482 }
483 
486 }
487 
490  if (sc.isFailure()) {
491  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
492  "OutputStream::acceptAlgsHandler",sc);
493  }
494 }
495 
498 }
499 
502  if (sc.isFailure()) {
503  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
504  "OutputStream::requireAlgsHandler",sc);
505  }
506 }
507 
510 }
511 
512 void OutputStream::vetoAlgsHandler( Property& /* theProp */ ) {
514  if (sc.isFailure()) {
515  throw GaudiException("Failure in OutputStream::decodeAlgorithms",
516  "OutputStream::vetoAlgsHandler",sc);
517  }
518 }
519 
521  std::vector<Algorithm*>* theAlgs )
522 {
523  // Reset the list of Algorithms
524  theAlgs->clear( );
525 
526  MsgStream log( msgSvc( ), name( ) );
527 
529 
531  if ( theAlgMgr.isValid() ) {
532  // Build the list of Algorithms from the names list
533  const std::vector<std::string> nameList = theNames.value( );
535  std::vector<std::string>::const_iterator itend = nameList.end( );
536  for (it = nameList.begin(); it != itend; ++it) {
537  // Check whether the supplied name corresponds to an existing
538  // Algorithm object.
539  const std::string &theName = (*it);
540  SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
541  Algorithm* theAlgorithm;
542  if ( theIAlg.isValid() ) {
543  result = StatusCode::SUCCESS;
544  try{
545  theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
546  } catch(...){
547  result = StatusCode::FAILURE;
548  }
549  }
550  if ( result.isSuccess( ) ) {
551  // Check that the specified algorithm doesn't already exist in the list
553  std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
554  for (ita = theAlgs->begin(); ita != itaend; ++ita) {
555  Algorithm* existAlgorithm = (*ita);
556  if ( theAlgorithm == existAlgorithm ) {
557  result = StatusCode::FAILURE;
558  break;
559  }
560  }
561  if ( result.isSuccess( ) ) {
562  theAlgorithm->addRef();
563  theAlgs->push_back( theAlgorithm );
564  }
565  }
566  else {
567  log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
568  }
569  }
570  result = StatusCode::SUCCESS;
571  }
572  else {
573  log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
574  }
575  return result;
576 }
577 
579  typedef std::vector<Algorithm*>::iterator AlgIter;
580  bool result = true;
581 
582  // Loop over all Algorithms in the accept list to see
583  // whether any have been executed and have their filter
584  // passed flag set. Any match causes the event to be
585  // provisionally accepted.
586  if ( ! m_acceptAlgs->empty() ) {
587  result = false;
588  for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
589  if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
590  result = true;
591  break;
592  }
593  }
594  }
595 
596  // Loop over all Algorithms in the required list to see
597  // whether all have been executed and have their filter
598  // passed flag set. Any mismatch causes the event to be
599  // rejected.
600  if ( result && ! m_requireAlgs->empty() ) {
601  for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
602  if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
603  result = false;
604  break;
605  }
606  }
607  }
608 
609  // Loop over all Algorithms in the veto list to see
610  // whether any have been executed and have their filter
611  // passed flag set. Any match causes the event to be
612  // rejected.
613  if ( result && ! m_vetoAlgs->empty() ) {
614  for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
615  if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
616  result = false;
617  break;
618  }
619  }
620  }
621  return result;
622 }
623 
625  return !(m_itemNames.empty() && m_optItemNames.empty());
626 }

Generated at Mon Feb 17 2014 14:37:40 for Gaudi Framework, version v25r0 by Doxygen version 1.8.2 written by Dimitri van Heesch, © 1997-2004