Gaudi Framework, version v21r6

Home   Generated: 11 Nov 2009

OutputStream.cpp

Go to the documentation of this file.
00001 // $Id: OutputStream.cpp,v 1.23 2008/01/15 13:46:52 marcocle Exp $
00002 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
00003 
00004 // Framework include files
00005 #include "GaudiKernel/Tokenizer.h"
00006 #include "GaudiKernel/AlgFactory.h"
00007 #include "GaudiKernel/IRegistry.h"
00008 #include "GaudiKernel/IAlgManager.h"
00009 #include "GaudiKernel/ISvcLocator.h"
00010 #include "GaudiKernel/IConversionSvc.h"
00011 #include "GaudiKernel/IDataManagerSvc.h"
00012 #include "GaudiKernel/IDataProviderSvc.h"
00013 #include "GaudiKernel/IPersistencySvc.h"
00014 #include "GaudiKernel/IOpaqueAddress.h"
00015 #include "GaudiKernel/Incident.h"
00016 #include "GaudiKernel/IIncidentSvc.h"
00017 
00018 #include "GaudiKernel/MsgStream.h"
00019 #include "GaudiKernel/strcasecmp.h"
00020 #include "GaudiKernel/DataObject.h"
00021 #include "GaudiKernel/DataStoreItem.h"
00022 #include "OutputStream.h"
00023 #include "OutputStreamAgent.h"
00024 
00025 // Define the algorithm factory for the standard output data writer
00026 DECLARE_ALGORITHM_FACTORY(OutputStream)
00027 
00028 // Standard Constructor
00029 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
00030  : Algorithm(name, pSvcLocator)
00031 {
00032   m_doPreLoad      = true;
00033   m_doPreLoadOpt   = false;
00034   m_verifyItems    = true;
00035   m_output         = "";
00036   m_outputName     = "";
00037   m_outputType     = "UPDATE";
00038   m_storeName      = "EventDataSvc";
00039   m_persName       = "EventPersistencySvc";
00040   m_agent          = new OutputStreamAgent(this);
00041   m_acceptAlgs     = new std::vector<Algorithm*>();
00042   m_requireAlgs    = new std::vector<Algorithm*>();
00043   m_vetoAlgs       = new std::vector<Algorithm*>();
00046   m_fireIncidents  = true;
00047   declareProperty("ItemList",         m_itemNames);
00048   declareProperty("OptItemList",      m_optItemNames);
00049   declareProperty("Preload",          m_doPreLoad);
00050   declareProperty("PreloadOptItems",  m_doPreLoadOpt);
00051   declareProperty("Output",           m_output);
00052   declareProperty("OutputFile",       m_outputName);
00053   declareProperty("EvtDataSvc",       m_storeName);
00054   declareProperty("EvtConversionSvc", m_persName);
00055   declareProperty("AcceptAlgs",       m_acceptNames);
00056   declareProperty("RequireAlgs",      m_requireNames);
00057   declareProperty("VetoAlgs",         m_vetoNames);
00058   declareProperty("VerifyItems",      m_verifyItems);
00061 
00062   // Associate action handlers with the AcceptAlgs, RequireAlgs & VetoAlgs properties
00063   m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
00064   m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
00065   m_vetoNames.declareUpdateHandler   ( &OutputStream::vetoAlgsHandler   , this );
00066 }
00067 
00068 // Standard Destructor
00069 OutputStream::~OutputStream()   {
00070   delete m_agent;
00071   delete m_acceptAlgs;
00072   delete m_requireAlgs;
00073   delete m_vetoAlgs;
00074 }
00075 
00076 // initialize data writer
00077 StatusCode OutputStream::initialize() {
00078   MsgStream log(msgSvc(), name());
00079 
00080   // Reset the number of events written
00081   m_events = 0;
00082   // Get access to the DataManagerSvc
00083   m_pDataManager = serviceLocator()->service(m_storeName);
00084   if( !m_pDataManager.isValid() )   {
00085     log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
00086     return StatusCode::FAILURE;
00087   }
00088   // Get access to the IncidentService
00089   m_incidentSvc = serviceLocator()->service("IncidentSvc");
00090   if( !m_incidentSvc.isValid() )  {
00091     log << MSG::WARNING << "Error retrieving IncidentSvc." << endmsg;
00092     return StatusCode::FAILURE;
00093   }
00094   // Get access to the assigned data service
00095   m_pDataProvider = serviceLocator()->service(m_storeName);
00096   if( !m_pDataProvider.isValid() )   {
00097     log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
00098     return StatusCode::FAILURE;
00099   }
00100   if ( !(m_itemNames.empty() && m_optItemNames.empty()) )  {
00101     StatusCode status = connectConversionSvc();
00102     if( !status.isSuccess() )   {
00103       log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
00104       if(m_outputName!="" && m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00105                                            IncidentType::FailOutputFile));
00106       return status;
00107     }
00108   }
00109 
00110   // Clear the list with optional items
00111   clearItems(m_optItemList);
00112   // Clear the item list
00113   clearItems(m_itemList);
00114 
00115   ItemNames::iterator i;
00116   // Take the new item list from the properties.
00117   for(i = m_itemNames.begin(); i != m_itemNames.end(); i++)   {
00118     addItem( m_itemList, *i );
00119   }
00120 
00121   // Take the new item list from the properties.
00122   for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++)   {
00123     addItem( m_optItemList, *i );
00124   }
00125 
00126   // Take the item list to the data service preload list.
00127   if ( m_doPreLoad )    {
00128     for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++)   {
00129       m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
00130     }
00131     // Not working: bad reference counting! pdataSvc->release();
00132   }
00133 
00134   if ( m_doPreLoadOpt )    {
00135     for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
00136       m_pDataProvider->addPreLoadItem( *(*j) );
00137     }
00138   }
00139   log << MSG::INFO << "Data source: " << m_storeName  << " output: " << m_output << endmsg;
00140 
00141   // Decode the accept, required and veto Algorithms. The logic is the following:
00142   //  a. The event is accepted if all lists are empty.
00143   //  b. The event is provisionally accepted if any Algorithm in the accept list
00144   //     has been executed and has indicated that its filter is passed. This
00145   //     provisional acceptance can be overridden by the other lists.
00146   //  c. The event is rejected unless all Algorithms in the required list have
00147   //     been executed and have indicated that their filter passed.
00148   //  d. The event is rejected if any Algorithm in the veto list has been
00149   //     executed and has indicated that its filter has passed.
00150   decodeAcceptAlgs ().ignore();
00151   decodeRequireAlgs().ignore();
00152   decodeVetoAlgs   ().ignore();
00153   return StatusCode::SUCCESS;
00154 }
00155 
00156 // terminate data writer
00157 StatusCode OutputStream::finalize() {
00158   MsgStream log(msgSvc(), name());
00159   log << MSG::INFO << "Events output: " << m_events << endmsg;
00160   if(m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00161                                        IncidentType::EndOutputFile));
00162   m_incidentSvc.reset();
00163   m_pDataProvider.reset();
00164   m_pDataManager.reset();
00165   m_pConversionSvc.reset();
00166   clearItems(m_optItemList);
00167   clearItems(m_itemList);
00168   return StatusCode::SUCCESS;
00169 }
00170 
00171 // Work entry point
00172 StatusCode OutputStream::execute() {
00173   // Clear any previously existing item list
00174   clearSelection();
00175   // Test whether this event should be output
00176   if ( isEventAccepted() )  {
00177     StatusCode sc = writeObjects();
00178     clearSelection();
00179     m_events++;
00180     if(sc.isSuccess() && m_fireIncidents) 
00181       m_incidentSvc->fireIncident(Incident(m_outputName,
00182                                            IncidentType::WroteToOutputFile));
00183     else if(m_fireIncidents) 
00184       m_incidentSvc->fireIncident(Incident(m_outputName,
00185                                            IncidentType::FailOutputFile));
00186     return sc;
00187   }
00188   return StatusCode::SUCCESS;
00189 }
00190 
00191 // Select the different objects and write them to file
00192 StatusCode OutputStream::writeObjects()  {
00193   // Connect the output file to the service
00194   StatusCode status = collectObjects();
00195   if ( status.isSuccess() )   {
00196     IDataSelector*  sel = selectedObjects();
00197     if ( sel->begin() != sel->end() )  {
00198       status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
00199       if ( status.isSuccess() )   {
00200         // Now pass the collection to the persistency service
00201         IDataSelector::iterator j;
00202         IOpaqueAddress* pAddress = 0;
00203         for ( j = sel->begin(); j != sel->end(); j++ )    {
00204           StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
00205           if ( !iret.isSuccess() )      {
00206             status = iret;
00207             continue;
00208           }
00209           IRegistry* pReg = (*j)->registry();
00210           pReg->setAddress(pAddress);
00211         }
00212         for ( j = sel->begin(); j != sel->end(); j++ )    {
00213           IRegistry* pReg = (*j)->registry();
00214           StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
00215           if ( !iret.isSuccess() )      {
00216             status = iret;
00217           }
00218         }
00219               // Commit the data if there was no error; otherwise possibly discard
00220         if ( status.isSuccess() )  {
00221           status = m_pConversionSvc->commitOutput(m_outputName, true);
00222         }
00223         else   {
00224           m_pConversionSvc->commitOutput(m_outputName, false);
00225         }
00226       }
00227     }
00228   }
00229   return status;
00230 }
00231 
00232 // Place holder to create configurable data store agent
00233 bool OutputStream::collect(IRegistry* dir, int level)    {
00234   if ( level < m_currentItem->depth() )   {
00235     if ( dir->object() != 0 )   {
00236       /*
00237       std::cout << "Analysing ("
00238                 << dir->name()
00239                 << ") Object:"
00240                 << ((dir->object()==0) ? "UNLOADED" : "LOADED")
00241                 << std::endl;
00242       */
00243       m_objects.push_back(dir->object());
00244       return true;
00245     }
00246   }
00247   return false;
00248 }
00249 
00251 StatusCode OutputStream::collectObjects()   {
00252   MsgStream log(msgSvc(), name());
00253   StatusCode status = StatusCode::SUCCESS;
00254   Items::iterator i;
00255   // Traverse the tree and collect the requested objects
00256   for ( i = m_itemList.begin(); i != m_itemList.end(); i++ )    {
00257     DataObject* obj = 0;
00258     m_currentItem = (*i);
00259     StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00260     if ( iret.isSuccess() )  {
00261       iret = m_pDataManager->traverseSubTree(obj, m_agent);
00262       if ( !iret.isSuccess() )  {
00263         status = iret;
00264       }
00265     }
00266     else  {
00267       log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
00268           << m_currentItem->path() << endmsg;
00269       status = iret;
00270     }
00271   }
00272   // Traverse the tree and collect the requested objects (tolerate missing itmes here)
00273   for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ )    {
00274     DataObject* obj = 0;
00275     m_currentItem = (*i);
00276     StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00277     if ( iret.isSuccess() )  {
00278       iret = m_pDataManager->traverseSubTree(obj, m_agent);
00279     }
00280     if ( !iret.isSuccess() )    {
00281       log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
00282           << m_currentItem->path() << endmsg;
00283     }
00284   }
00285   return status;
00286 }
00287 
00288 // Clear collected object list
00289 void OutputStream::clearSelection()     {
00290   m_objects.erase(m_objects.begin(), m_objects.end());
00291 }
00292 
00293 // Remove all items from the output streamer list;
00294 void OutputStream::clearItems(Items& itms)     {
00295   for ( Items::iterator i = itms.begin(); i != itms.end(); i++ )    {
00296     delete (*i);
00297   }
00298   itms.erase(itms.begin(), itms.end());
00299 }
00300 
00301 // Find single item identified by its path (exact match)
00302 DataStoreItem*
00303 OutputStream::findItem(const std::string& path)  {
00304   for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i)  {
00305     if ( (*i)->path() == path )  return (*i);
00306   }
00307   for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j)  {
00308     if ( (*j)->path() == path )  return (*j);
00309   }
00310   return 0;
00311 }
00312 
00313 // Add item to output streamer list
00314 void OutputStream::addItem(Items& itms, const std::string& descriptor)   {
00315         MsgStream log(msgSvc(), name());
00316   int level = 0;
00317   size_t sep = descriptor.rfind("#");
00318   std::string obj_path (descriptor,0,sep);
00319   std::string slevel   (descriptor,sep+1,descriptor.length());
00320   if ( slevel == "*" )  {
00321     level = 9999999;
00322   }
00323   else   {
00324     level = atoi(slevel.c_str());
00325   }
00326   if ( m_verifyItems )  {
00327     size_t idx = obj_path.find("/",1);
00328     while(idx != std::string::npos)  {
00329       std::string sub_item = obj_path.substr(0,idx);
00330       if ( 0 == findItem(sub_item) )   {
00331         addItem(itms, sub_item+"#1");
00332       }
00333       idx = obj_path.find("/",idx+1);
00334     }
00335   }
00336   DataStoreItem* item = new DataStoreItem(obj_path, level);
00337   log << MSG::DEBUG << "Adding OutputStream item " << item->path()
00338       << " with " << item->depth()
00339       << " level(s)." << endmsg;
00340   itms.push_back( item );
00341 }
00342 
00343 // Connect to proper conversion service
00344 StatusCode OutputStream::connectConversionSvc()   {
00345   StatusCode status = StatusCode(StatusCode::FAILURE, true);
00346   MsgStream log(msgSvc(), name());
00347   // Get output file from input
00348   std::string dbType, svc, shr;
00349   Tokenizer tok(true);
00350   tok.analyse(m_output, " ", "", "", "=", "'", "'");
00351   for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i)   {
00352     const std::string& tag = (*i).tag();
00353     const std::string& val = (*i).value();
00354     switch( ::toupper(tag[0]) )    {
00355     case 'D':
00356       m_outputName = val;
00357       break;
00358     case 'T':
00359       dbType = val;
00360       break;
00361     case 'S':
00362       switch( ::toupper(tag[1]) )   {
00363       case 'V':    svc = val;      break;
00364       case 'H':    shr = "YES";    break;
00365       }
00366       break;
00367     case 'O':   // OPT='<NEW<CREATE,WRITE,RECREATE>, UPDATE>'
00368       switch( ::toupper(val[0]) )   {
00369       case 'R':
00370         if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
00371           m_outputType = "RECREATE";
00372         else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
00373           m_outputType = "READ";
00374         break;
00375       case 'C':
00376       case 'N':
00377       case 'W':
00378         m_outputType = "NEW";
00379         break;
00380       case 'U':
00381         m_outputType = "UPDATE";
00382         break;
00383       default:
00384         m_outputType = "???";
00385         break;
00386       }
00387       break;
00388     default:
00389       break;
00390     }
00391   }
00392   if ( !shr.empty() ) m_outputType += "|SHARED";
00393   // Get access to the default Persistency service
00394   // The default service is the same for input as for output.
00395   // If this is not desired, then a specialized OutputStream must overwrite
00396   // this value.
00397   if ( dbType.length() > 0 && svc.length()==0 )   {
00398     SmartIF<IPersistencySvc> ipers(serviceLocator()->service(m_persName));
00399     if( !ipers.isValid() )   {
00400       log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
00401       return StatusCode::FAILURE;
00402     }
00403     IConversionSvc *cnvSvc = 0;
00404     status = ipers->getService(dbType, cnvSvc);
00405     if( !status.isSuccess() )   {
00406       log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << dbType << endmsg;
00407       return status;
00408     }
00409     // Increase reference count and keep service.
00410     m_pConversionSvc = cnvSvc;
00411   }
00412   else if ( svc.length() > 0 )    {
00413     // On success reference count is automatically increased.
00414     m_pConversionSvc = serviceLocator()->service(svc);
00415     if( !m_pConversionSvc.isValid() )   {
00416       log << MSG::FATAL << "Unable to locate IConversionSvc interface of " << svc << endmsg;
00417       return StatusCode::FAILURE;
00418     }
00419   }
00420   else    {
00421     log << MSG::FATAL
00422         << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
00423         << "You either have to specify a technology name or a service name!" << endmsg
00424         << "Please correct the job option \"" << name() << ".Output\" !"     << endmsg;
00425     return StatusCode::FAILURE;
00426   }
00427   return StatusCode::SUCCESS;
00428 }
00429 
00430 StatusCode OutputStream::decodeAcceptAlgs( ) {
00431   return decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00432 }
00433 
00434 void OutputStream::acceptAlgsHandler( Property& /* theProp */ )  {
00435   StatusCode sc = decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00436   if (sc.isFailure()) {
00437     throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00438                          "OutputStream::acceptAlgsHandler",sc);
00439   }
00440 }
00441 
00442 StatusCode OutputStream::decodeRequireAlgs( )  {
00443   return decodeAlgorithms( m_requireNames, m_requireAlgs );
00444 }
00445 
00446 void OutputStream::requireAlgsHandler( Property& /* theProp */ )  {
00447   StatusCode sc = decodeAlgorithms( m_requireNames, m_requireAlgs );
00448   if (sc.isFailure()) {
00449     throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00450                          "OutputStream::requireAlgsHandler",sc);
00451   }
00452 }
00453 
00454 StatusCode OutputStream::decodeVetoAlgs( )  {
00455   return decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00456 }
00457 
00458 void OutputStream::vetoAlgsHandler( Property& /* theProp */ )  {
00459   StatusCode sc = decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00460   if (sc.isFailure()) {
00461     throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00462                          "OutputStream::vetoAlgsHandler",sc);
00463   }
00464 }
00465 
00466 StatusCode OutputStream::decodeAlgorithms( StringArrayProperty& theNames,
00467                                            std::vector<Algorithm*>* theAlgs )
00468 {
00469   // Reset the list of Algorithms
00470   theAlgs->clear( );
00471 
00472   MsgStream log( msgSvc( ), name( ) );
00473 
00474   StatusCode result = StatusCode::FAILURE;
00475 
00476   SmartIF<IAlgManager> theAlgMgr(serviceLocator());
00477   if ( theAlgMgr.isValid() ) {
00478     // Build the list of Algorithms from the names list
00479     const std::vector<std::string> nameList = theNames.value( );
00480     std::vector<std::string>::const_iterator it;
00481     std::vector<std::string>::const_iterator itend = nameList.end( );
00482     for (it = nameList.begin(); it != itend; ++it) {
00483       // Check whether the supplied name corresponds to an existing
00484       // Algorithm object.
00485       const std::string &theName = (*it);
00486       SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
00487       Algorithm*  theAlgorithm;
00488       if ( theIAlg.isValid() ) {
00489         result = StatusCode::SUCCESS;
00490         try{
00491           theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
00492         } catch(...){
00493           result = StatusCode::FAILURE;
00494         }
00495       }
00496       if ( result.isSuccess( ) ) {
00497         // Check that the specified algorithm doesn't already exist in the list
00498         std::vector<Algorithm*>::iterator ita;
00499         std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
00500         for (ita = theAlgs->begin(); ita != itaend; ++ita) {
00501           Algorithm* existAlgorithm = (*ita);
00502           if ( theAlgorithm == existAlgorithm ) {
00503             result = StatusCode::FAILURE;
00504             break;
00505           }
00506         }
00507         if ( result.isSuccess( ) ) {
00508           theAlgorithm->addRef();
00509           theAlgs->push_back( theAlgorithm );
00510         }
00511       }
00512       else {
00513         log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
00514       }
00515     }
00516     result = StatusCode::SUCCESS;
00517   }
00518   else {
00519     log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
00520   }
00521   return result;
00522 }
00523 
00524 bool OutputStream::isEventAccepted( ) const  {
00525   typedef std::vector<Algorithm*>::iterator AlgIter;
00526   bool result = true;
00527 
00528   // Loop over all Algorithms in the accept list to see
00529   // whether any have been executed and have their filter
00530   // passed flag set. Any match causes the event to be
00531   // provisionally accepted.
00532   if ( ! m_acceptAlgs->empty() ) {
00533     result = false;
00534     for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
00535       if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00536         result = true;
00537         break;
00538       }
00539     }
00540   }
00541 
00542   // Loop over all Algorithms in the required list to see
00543   // whether all have been executed and have their filter
00544   // passed flag set. Any mismatch causes the event to be
00545   // rejected.
00546   if ( result && ! m_requireAlgs->empty() ) {
00547     for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
00548       if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
00549         result = false;
00550         break;
00551       }
00552     }
00553   }
00554 
00555   // Loop over all Algorithms in the veto list to see
00556   // whether any have been executed and have their filter
00557   // passed flag set. Any match causes the event to be
00558   // rejected.
00559   if ( result && ! m_vetoAlgs->empty() ) {
00560     for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
00561       if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00562         result = false;
00563         break;
00564       }
00565     }
00566   }
00567   return result;
00568 }

Generated at Wed Nov 11 16:23:14 2009 for Gaudi Framework, version v21r6 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004