Gaudi Framework, version v20r4

Generated: 8 Jan 2009

OutputStream.cpp

Go to the documentation of this file.
00001 // $Id: OutputStream.cpp,v 1.23 2008/01/15 13:46:52 marcocle Exp $
00002 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
00003 
00004 // Framework include files
00005 #include "GaudiKernel/Tokenizer.h"
00006 #include "GaudiKernel/AlgFactory.h"
00007 #include "GaudiKernel/IRegistry.h"
00008 #include "GaudiKernel/IAlgManager.h"
00009 #include "GaudiKernel/ISvcLocator.h"
00010 #include "GaudiKernel/IConversionSvc.h"
00011 #include "GaudiKernel/IDataManagerSvc.h"
00012 #include "GaudiKernel/IDataProviderSvc.h"
00013 #include "GaudiKernel/IPersistencySvc.h"
00014 #include "GaudiKernel/IOpaqueAddress.h"
00015 
00016 #include "GaudiKernel/MsgStream.h"
00017 #include "GaudiKernel/strcasecmp.h"
00018 #include "GaudiKernel/DataObject.h"
00019 #include "GaudiKernel/DataStoreItem.h"
00020 #include "OutputStream.h"
00021 #include "OutputStreamAgent.h"
00022 
00023 // Define the algorithm factory for the standard output data writer
00024 DECLARE_ALGORITHM_FACTORY(OutputStream)
00025 
00026 // Standard Constructor
00027 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
00028  : Algorithm(name, pSvcLocator)
00029 {
00030   m_doPreLoad      = true;
00031   m_doPreLoadOpt   = false;
00032   m_verifyItems    = true;
00033   m_output         = "";
00034   m_outputName     = "";
00035   m_outputType     = "UPDATE";
00036   m_storeName      = "EventDataSvc";
00037   m_persName       = "EventPersistencySvc";
00038   m_agent          = new OutputStreamAgent(this);
00039   m_pDataManager   = 0;
00040   m_pDataProvider  = 0;
00041   m_pConversionSvc = 0;
00042   m_acceptAlgs     = new std::vector<Algorithm*>();
00043   m_requireAlgs    = new std::vector<Algorithm*>();
00044   m_vetoAlgs       = new std::vector<Algorithm*>();
00045   declareProperty("ItemList",         m_itemNames);
00046   declareProperty("OptItemList",      m_optItemNames);
00047   declareProperty("Preload",          m_doPreLoad);
00048   declareProperty("PreloadOptItems",  m_doPreLoadOpt);
00049   declareProperty("Output",           m_output);
00050   declareProperty("OutputFile",       m_outputName);
00051   declareProperty("EvtDataSvc",       m_storeName);
00052   declareProperty("EvtConversionSvc", m_persName);
00053   declareProperty("AcceptAlgs",       m_acceptNames);
00054   declareProperty("RequireAlgs",      m_requireNames);
00055   declareProperty("VetoAlgs",         m_vetoNames);
00056   declareProperty("VerifyItems",      m_verifyItems);
00057 
00058   // Associate action handlers with the AcceptAlgs, RequireAlgs & VetoAlgs properties
00059   m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
00060   m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
00061   m_vetoNames.declareUpdateHandler   ( &OutputStream::vetoAlgsHandler   , this );
00062 }
00063 
00064 // Standard Destructor
00065 OutputStream::~OutputStream()   {
00066   delete m_agent;
00067   delete m_acceptAlgs;
00068   delete m_requireAlgs;
00069   delete m_vetoAlgs;
00070 }
00071 
00072 // initialize data writer
00073 StatusCode OutputStream::initialize() {
00074   StatusCode status = StatusCode::SUCCESS;
00075   MsgStream log(msgSvc(), name());
00076 
00077   // Reset the number of events written
00078   m_events = 0;
00079   // Get access to the DataManagerSvc
00080   status = serviceLocator()->service(m_storeName, m_pDataManager, true);
00081   if( !status.isSuccess() )   {
00082     log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endreq;
00083     return status;
00084   }
00085   // Get access to the assigned data service
00086   status = serviceLocator()->service(m_storeName, m_pDataProvider, true);
00087   if( !status.isSuccess() )   {
00088     log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endreq;
00089     return status;
00090   }
00091   if ( !(m_itemNames.empty() && m_optItemNames.empty()) )  {
00092     status = connectConversionSvc();
00093     if( !status.isSuccess() )   {
00094       log << MSG::FATAL << "Unable to connect to conversion service." << endreq;
00095       return status;
00096     }
00097   }
00098 
00099   // Clear the list with optional items
00100   clearItems(m_optItemList);
00101   // Clear the item list
00102   clearItems(m_itemList);
00103 
00104   ItemNames::iterator i;
00105   // Take the new item list from the properties.
00106   for(i = m_itemNames.begin(); i != m_itemNames.end(); i++)   {
00107     addItem( m_itemList, *i );
00108   }
00109 
00110   // Take the new item list from the properties.
00111   for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++)   {
00112     addItem( m_optItemList, *i );
00113   }
00114 
00115   // Take the item list to the data service preload list.
00116   if ( m_doPreLoad )    {
00117     for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++)   {
00118       m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
00119     }
00120     // Not working: bad reference counting! pdataSvc->release();
00121   }
00122 
00123   if ( m_doPreLoadOpt )    {
00124     for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
00125       m_pDataProvider->addPreLoadItem( *(*j) );
00126     }
00127   }
00128   log << MSG::INFO << "Data source: " << m_storeName  << " output: " << m_output << endreq;
00129 
00130   // Decode the accept, required and veto Algorithms. The logic is the following:
00131   //  a. The event is accepted if all lists are empty.
00132   //  b. The event is provisionally accepted if any Algorithm in the accept list
00133   //     has been executed and has indicated that its filter is passed. This
00134   //     provisional acceptance can be overridden by the other lists.
00135   //  c. The event is rejected unless all Algorithms in the required list have
00136   //     been executed and have indicated that their filter passed.
00137   //  d. The event is rejected if any Algorithm in the veto list has been
00138   //     executed and has indicated that its filter has passed.
00139   decodeAcceptAlgs ().ignore();
00140   decodeRequireAlgs().ignore();
00141   decodeVetoAlgs   ().ignore();
00142   return status;
00143 }
00144 
00145 // terminate data writer
00146 StatusCode OutputStream::finalize() {
00147   MsgStream log(msgSvc(), name());
00148   log << MSG::INFO << "Events output: " << m_events << endreq;
00149   if ( m_pDataProvider ) m_pDataProvider->release();
00150   m_pDataProvider = 0;
00151   if ( m_pDataManager ) m_pDataManager->release();
00152   m_pDataManager = 0;
00153   if ( m_pConversionSvc ) m_pConversionSvc->release();
00154   m_pConversionSvc = 0;
00155   clearItems(m_optItemList);
00156   clearItems(m_itemList);
00157         return StatusCode::SUCCESS;
00158 }
00159 
00160 // Work entry point
00161 StatusCode OutputStream::execute() {
00162   // Clear any previously existing item list
00163   clearSelection();
00164   // Test whether this event should be output
00165   if ( isEventAccepted() )  {
00166     StatusCode sc = writeObjects();
00167     clearSelection();
00168     m_events++;
00169     return sc;
00170   }
00171   return StatusCode::SUCCESS;
00172 }
00173 
00174 // Select the different objects and write them to file 
00175 StatusCode OutputStream::writeObjects()  {
00176   // Connect the output file to the service
00177   StatusCode status = collectObjects();
00178   if ( status.isSuccess() )   {
00179     IDataSelector*  sel = selectedObjects();
00180     if ( sel->begin() != sel->end() )  {
00181       status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
00182       if ( status.isSuccess() )   {
00183         // Now pass the collection to the persistency service
00184         IDataSelector::iterator j;
00185         IOpaqueAddress* pAddress = 0;
00186         for ( j = sel->begin(); j != sel->end(); j++ )    {
00187           StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
00188           if ( !iret.isSuccess() )      {
00189             status = iret;
00190             continue;
00191           }
00192           IRegistry* pReg = (*j)->registry();
00193           pReg->setAddress(pAddress);
00194         }
00195         for ( j = sel->begin(); j != sel->end(); j++ )    {
00196           IRegistry* pReg = (*j)->registry();
00197           StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
00198           if ( !iret.isSuccess() )      {
00199             status = iret;
00200           }
00201         }
00202               // Commit the data if there was no error; otherwise possibly discard
00203         if ( status.isSuccess() )  {
00204           status = m_pConversionSvc->commitOutput(m_outputName, true);
00205         }
00206         else   {
00207           m_pConversionSvc->commitOutput(m_outputName, false);
00208         }
00209       }
00210     }
00211   }
00212   return status;
00213 }
00214 
00215 // Place holder to create configurable data store agent
00216 bool OutputStream::collect(IRegistry* dir, int level)    {
00217   if ( level < m_currentItem->depth() )   {
00218     if ( dir->object() != 0 )   {
00219       /*
00220       std::cout << "Analysing (" 
00221                 << dir->name() 
00222                 << ") Object:" 
00223                 << ((dir->object()==0) ? "UNLOADED" : "LOADED") 
00224                 << std::endl;
00225       */
00226       m_objects.push_back(dir->object());
00227       return true;
00228     }
00229   }
00230   return false;
00231 }
00232 
00234 StatusCode OutputStream::collectObjects()   {
00235   MsgStream log(msgSvc(), name());
00236   StatusCode status = StatusCode::SUCCESS;
00237   Items::iterator i;
00238   // Traverse the tree and collect the requested objects
00239   for ( i = m_itemList.begin(); i != m_itemList.end(); i++ )    {
00240     DataObject* obj = 0;
00241     m_currentItem = (*i);
00242     StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00243     if ( iret.isSuccess() )  {
00244       iret = m_pDataManager->traverseSubTree(obj, m_agent);
00245       if ( !iret.isSuccess() )  {
00246         status = iret;
00247       }
00248     }
00249     else  {
00250       log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) " 
00251           << m_currentItem->path() << endreq;
00252       status = iret;
00253     }
00254   }
00255   // Traverse the tree and collect the requested objects (tolerate missing itmes here)
00256   for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ )    {
00257     DataObject* obj = 0;
00258     m_currentItem = (*i);
00259     StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00260     if ( iret.isSuccess() )  {
00261       iret = m_pDataManager->traverseSubTree(obj, m_agent);
00262     }
00263     if ( !iret.isSuccess() )    {
00264       log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) " 
00265           << m_currentItem->path() << endreq;
00266     }
00267   }
00268   return status;
00269 }
00270 
00271 // Clear collected object list
00272 void OutputStream::clearSelection()     {
00273   m_objects.erase(m_objects.begin(), m_objects.end());
00274 }
00275 
00276 // Remove all items from the output streamer list;
00277 void OutputStream::clearItems(Items& itms)     {
00278   for ( Items::iterator i = itms.begin(); i != itms.end(); i++ )    {  
00279     delete (*i);
00280   }
00281   itms.erase(itms.begin(), itms.end());
00282 }
00283 
00284 // Find single item identified by its path (exact match)
00285 DataStoreItem* 
00286 OutputStream::findItem(const std::string& path)  {
00287   for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i)  {
00288     if ( (*i)->path() == path )  return (*i);
00289   }
00290   for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j)  {
00291     if ( (*j)->path() == path )  return (*j);
00292   }
00293   return 0;
00294 }
00295 
00296 // Add item to output streamer list
00297 void OutputStream::addItem(Items& itms, const std::string& descriptor)   {
00298         MsgStream log(msgSvc(), name());
00299   int level = 0;
00300   size_t sep = descriptor.rfind("#");
00301   std::string obj_path (descriptor,0,sep);
00302   std::string slevel   (descriptor,sep+1,descriptor.length());
00303   if ( slevel == "*" )  {
00304     level = 9999999;
00305   }
00306   else   {
00307     level = atoi(slevel.c_str());
00308   }
00309   if ( m_verifyItems )  {
00310     size_t idx = obj_path.find("/",1);
00311     while(idx != std::string::npos)  {
00312       std::string sub_item = obj_path.substr(0,idx);
00313       if ( 0 == findItem(sub_item) )   {
00314         addItem(itms, sub_item+"#1");
00315       }
00316       idx = obj_path.find("/",idx+1);
00317     }
00318   }
00319   DataStoreItem* item = new DataStoreItem(obj_path, level);
00320   log << MSG::DEBUG << "Adding OutputStream item " << item->path()
00321       << " with " << item->depth() 
00322       << " level(s)." << endreq;
00323   itms.push_back( item );
00324 }
00325 
00326 // Connect to proper conversion service
00327 StatusCode OutputStream::connectConversionSvc()   {
00328   StatusCode status = StatusCode::FAILURE;
00329         MsgStream log(msgSvc(), name());
00330   // Get output file from input
00331   std::string dbType, svc, shr;
00332   Tokenizer tok(true);
00333   tok.analyse(m_output, " ", "", "", "=", "'", "'");
00334   for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i)   {
00335     const std::string& tag = (*i).tag();
00336     const std::string& val = (*i).value();
00337     switch( ::toupper(tag[0]) )    {
00338     case 'D':
00339       m_outputName = val;
00340       break;
00341     case 'T':
00342       dbType = val;
00343       break;
00344     case 'S':
00345       switch( ::toupper(tag[1]) )   {
00346       case 'V':    svc = val;      break;
00347       case 'H':    shr = "YES";    break;
00348       }
00349       break;
00350     case 'O':   // OPT='<NEW<CREATE,WRITE,RECREATE>, UPDATE>'
00351       switch( ::toupper(val[0]) )   {
00352       case 'R':
00353         if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
00354           m_outputType = "RECREATE";
00355         else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
00356           m_outputType = "READ";
00357         break;
00358       case 'C':
00359       case 'N':
00360       case 'W':
00361         m_outputType = "NEW";
00362         break;
00363       case 'U':
00364         m_outputType = "UPDATE";
00365         break;
00366       default:
00367         m_outputType = "???";
00368         break;
00369       }
00370       break;
00371     default:
00372       break;
00373     }
00374   }
00375   if ( !shr.empty() ) m_outputType += "|SHARED";
00376   // Get access to the default Persistency service
00377   // The default service is the same for input as for output.
00378   // If this is not desired, then a specialized OutputStream must overwrite 
00379   // this value.
00380   if ( dbType.length() > 0 && svc.length()==0 )   {
00381     IPersistencySvc* ipers = 0;
00382     status = serviceLocator()->service(m_persName, ipers );
00383     if( !status.isSuccess() )   {
00384       log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endreq;
00385       return status;
00386     }
00387     status = ipers->getService(dbType, m_pConversionSvc);
00388     if( !status.isSuccess() )   {
00389       log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << dbType << endreq;
00390       return status;
00391     }
00392     // Increase reference count and keep service.
00393     m_pConversionSvc->addRef();
00394   }
00395   else if ( svc.length() > 0 )    {
00396     // On success reference count is automatically increased.
00397     status = serviceLocator()->service(svc, m_pConversionSvc, true );
00398     if( !status.isSuccess() )   {
00399       log << MSG::FATAL << "Unable to locate IConversionSvc interface of " << svc << endreq;
00400       return status;
00401     }
00402   }
00403   else    {
00404     log << MSG::FATAL
00405         << "Unable to locate IConversionSvc interface (Unknown technology) " << endreq
00406         << "You either have to specify a technology name or a service name!" << endreq
00407         << "Please correct the job option \"" << name() << ".Output\" !"     << endreq;
00408     status = StatusCode::FAILURE;
00409   }
00410   return status;
00411 }
00412 
00413 StatusCode OutputStream::decodeAcceptAlgs( ) {
00414   return decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00415 }
00416 
00417 void OutputStream::acceptAlgsHandler( Property& /* theProp */ )  {
00418   StatusCode sc = decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00419   if (sc.isFailure()) {
00420     throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00421                          "OutputStream::acceptAlgsHandler",sc);
00422   }
00423 }
00424 
00425 StatusCode OutputStream::decodeRequireAlgs( )  {
00426   return decodeAlgorithms( m_requireNames, m_requireAlgs );
00427 }
00428 
00429 void OutputStream::requireAlgsHandler( Property& /* theProp */ )  {
00430   StatusCode sc = decodeAlgorithms( m_requireNames, m_requireAlgs );
00431   if (sc.isFailure()) {
00432     throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00433                          "OutputStream::requireAlgsHandler",sc);
00434   }
00435 }
00436 
00437 StatusCode OutputStream::decodeVetoAlgs( )  {
00438   return decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00439 }
00440 
00441 void OutputStream::vetoAlgsHandler( Property& /* theProp */ )  {
00442   StatusCode sc = decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00443   if (sc.isFailure()) {
00444     throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00445                          "OutputStream::vetoAlgsHandler",sc);
00446   }
00447 }
00448 
00449 StatusCode OutputStream::decodeAlgorithms( StringArrayProperty& theNames,
00450                                            std::vector<Algorithm*>* theAlgs )
00451 {
00452   // Reset the list of Algorithms
00453   theAlgs->clear( );
00454 
00455   MsgStream log( msgSvc( ), name( ) );
00456 
00457   IAlgManager* theAlgMgr;
00458   StatusCode result = serviceLocator()->getService( "ApplicationMgr",
00459     IID_IAlgManager,
00460     *pp_cast<IInterface>(&theAlgMgr) );
00461   if ( result.isSuccess( ) ) {
00462     // Build the list of Algorithms from the names list
00463     const std::vector<std::string> nameList = theNames.value( );
00464     std::vector<std::string>::const_iterator it;
00465     std::vector<std::string>::const_iterator itend = nameList.end( );
00466     for (it = nameList.begin(); it != itend; it++) {
00467       // Check whether the suppied name corresponds to an existing
00468       // Algorithm object.
00469       std::string theName = (*it);
00470       IAlgorithm* theIAlg;
00471       Algorithm*  theAlgorithm;
00472       result = theAlgMgr->getAlgorithm( theName, theIAlg );
00473       if ( result.isSuccess( ) ) {
00474         try{
00475           theAlgorithm = dynamic_cast<Algorithm*>(theIAlg);
00476         } catch(...){
00477           result = StatusCode::FAILURE;
00478         }
00479       }
00480       if ( result.isSuccess( ) ) {
00481         // Check that the specified algorithm doesn't already exist in the list
00482         std::vector<Algorithm*>::iterator ita;
00483         std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
00484         for (ita = theAlgs->begin(); ita != itaend; ita++) {
00485           Algorithm* existAlgorithm = (*ita);
00486           if ( theAlgorithm == existAlgorithm ) {
00487             result = StatusCode::FAILURE;
00488             break;
00489           }
00490         }
00491         if ( result.isSuccess( ) ) {
00492           theAlgs->push_back( theAlgorithm );
00493         }
00494       }
00495       else {
00496         log << MSG::INFO << theName << " doesn't exist - ignored" << endreq;
00497       }
00498     }
00499     result = StatusCode::SUCCESS;
00500   }
00501   else {
00502     log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endreq;
00503   }
00504   return result;
00505 }
00506 
00507 bool OutputStream::isEventAccepted( ) const  {
00508   typedef std::vector<Algorithm*>::iterator AlgIter;
00509   bool result = true;
00510 
00511   // Loop over all Algorithms in the accept list to see
00512   // whether any have been executed and have their filter
00513   // passed flag set. Any match causes the event to be
00514   // provisionally accepted.
00515   if ( ! m_acceptAlgs->empty() ) {
00516     result = false;
00517     for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
00518       if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00519         result = true;
00520         break;
00521       }
00522     }
00523   }
00524 
00525   // Loop over all Algorithms in the required list to see
00526   // whether all have been executed and have their filter
00527   // passed flag set. Any mismatch causes the event to be
00528   // rejected.
00529   if ( result && ! m_requireAlgs->empty() ) {
00530     for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
00531       if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
00532         result = false;
00533         break;
00534       }
00535     }
00536   }
00537 
00538   // Loop over all Algorithms in the veto list to see
00539   // whether any have been executed and have their filter
00540   // passed flag set. Any match causes the event to be
00541   // rejected.
00542   if ( result && ! m_vetoAlgs->empty() ) {
00543     for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
00544       if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00545         result = false;
00546         break;
00547       }
00548     }
00549   }
00550   return result;
00551 }

Generated at Thu Jan 8 17:44:24 2009 for Gaudi Framework, version v20r4 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004