Gaudi Framework, version v21r4

Home   Generated: 7 Sep 2009

OutputStream.cpp

Go to the documentation of this file.
00001 // $Id: OutputStream.cpp,v 1.23 2008/01/15 13:46:52 marcocle Exp $
00002 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
00003 
00004 // Framework include files
00005 #include "GaudiKernel/Tokenizer.h"
00006 #include "GaudiKernel/AlgFactory.h"
00007 #include "GaudiKernel/IRegistry.h"
00008 #include "GaudiKernel/IAlgManager.h"
00009 #include "GaudiKernel/ISvcLocator.h"
00010 #include "GaudiKernel/IConversionSvc.h"
00011 #include "GaudiKernel/IDataManagerSvc.h"
00012 #include "GaudiKernel/IDataProviderSvc.h"
00013 #include "GaudiKernel/IPersistencySvc.h"
00014 #include "GaudiKernel/IOpaqueAddress.h"
00015 
00016 #include "GaudiKernel/MsgStream.h"
00017 #include "GaudiKernel/strcasecmp.h"
00018 #include "GaudiKernel/DataObject.h"
00019 #include "GaudiKernel/DataStoreItem.h"
00020 #include "OutputStream.h"
00021 #include "OutputStreamAgent.h"
00022 
00023 // Define the algorithm factory for the standard output data writer
00024 DECLARE_ALGORITHM_FACTORY(OutputStream)
00025 
00026 // Standard Constructor
00027 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
00028  : Algorithm(name, pSvcLocator)
00029 {
00030   m_doPreLoad      = true;
00031   m_doPreLoadOpt   = false;
00032   m_verifyItems    = true;
00033   m_output         = "";
00034   m_outputName     = "";
00035   m_outputType     = "UPDATE";
00036   m_storeName      = "EventDataSvc";
00037   m_persName       = "EventPersistencySvc";
00038   m_agent          = new OutputStreamAgent(this);
00039   m_acceptAlgs     = new std::vector<Algorithm*>();
00040   m_requireAlgs    = new std::vector<Algorithm*>();
00041   m_vetoAlgs       = new std::vector<Algorithm*>();
00042   declareProperty("ItemList",         m_itemNames);
00043   declareProperty("OptItemList",      m_optItemNames);
00044   declareProperty("Preload",          m_doPreLoad);
00045   declareProperty("PreloadOptItems",  m_doPreLoadOpt);
00046   declareProperty("Output",           m_output);
00047   declareProperty("OutputFile",       m_outputName);
00048   declareProperty("EvtDataSvc",       m_storeName);
00049   declareProperty("EvtConversionSvc", m_persName);
00050   declareProperty("AcceptAlgs",       m_acceptNames);
00051   declareProperty("RequireAlgs",      m_requireNames);
00052   declareProperty("VetoAlgs",         m_vetoNames);
00053   declareProperty("VerifyItems",      m_verifyItems);
00054 
00055   // Associate action handlers with the AcceptAlgs, RequireAlgs & VetoAlgs properties
00056   m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
00057   m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
00058   m_vetoNames.declareUpdateHandler   ( &OutputStream::vetoAlgsHandler   , this );
00059 }
00060 
00061 // Standard Destructor
00062 OutputStream::~OutputStream()   {
00063   delete m_agent;
00064   delete m_acceptAlgs;
00065   delete m_requireAlgs;
00066   delete m_vetoAlgs;
00067 }
00068 
00069 // initialize data writer
00070 StatusCode OutputStream::initialize() {
00071   MsgStream log(msgSvc(), name());
00072 
00073   // Reset the number of events written
00074   m_events = 0;
00075   // Get access to the DataManagerSvc
00076   m_pDataManager = serviceLocator()->service(m_storeName);
00077   if( !m_pDataManager.isValid() )   {
00078     log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
00079     return StatusCode::FAILURE;
00080   }
00081   // Get access to the assigned data service
00082   m_pDataProvider = serviceLocator()->service(m_storeName);
00083   if( !m_pDataProvider.isValid() )   {
00084     log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
00085     return StatusCode::FAILURE;
00086   }
00087   if ( !(m_itemNames.empty() && m_optItemNames.empty()) )  {
00088     StatusCode status = connectConversionSvc();
00089     if( !status.isSuccess() )   {
00090       log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
00091       return status;
00092     }
00093   }
00094 
00095   // Clear the list with optional items
00096   clearItems(m_optItemList);
00097   // Clear the item list
00098   clearItems(m_itemList);
00099 
00100   ItemNames::iterator i;
00101   // Take the new item list from the properties.
00102   for(i = m_itemNames.begin(); i != m_itemNames.end(); i++)   {
00103     addItem( m_itemList, *i );
00104   }
00105 
00106   // Take the new item list from the properties.
00107   for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++)   {
00108     addItem( m_optItemList, *i );
00109   }
00110 
00111   // Take the item list to the data service preload list.
00112   if ( m_doPreLoad )    {
00113     for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++)   {
00114       m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
00115     }
00116     // Not working: bad reference counting! pdataSvc->release();
00117   }
00118 
00119   if ( m_doPreLoadOpt )    {
00120     for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
00121       m_pDataProvider->addPreLoadItem( *(*j) );
00122     }
00123   }
00124   log << MSG::INFO << "Data source: " << m_storeName  << " output: " << m_output << endmsg;
00125 
00126   // Decode the accept, required and veto Algorithms. The logic is the following:
00127   //  a. The event is accepted if all lists are empty.
00128   //  b. The event is provisionally accepted if any Algorithm in the accept list
00129   //     has been executed and has indicated that its filter is passed. This
00130   //     provisional acceptance can be overridden by the other lists.
00131   //  c. The event is rejected unless all Algorithms in the required list have
00132   //     been executed and have indicated that their filter passed.
00133   //  d. The event is rejected if any Algorithm in the veto list has been
00134   //     executed and has indicated that its filter has passed.
00135   decodeAcceptAlgs ().ignore();
00136   decodeRequireAlgs().ignore();
00137   decodeVetoAlgs   ().ignore();
00138   return StatusCode::SUCCESS;
00139 }
00140 
00141 // terminate data writer
00142 StatusCode OutputStream::finalize() {
00143   MsgStream log(msgSvc(), name());
00144   log << MSG::INFO << "Events output: " << m_events << endmsg;
00145   if ( m_pDataProvider ) m_pDataProvider->release();
00146   m_pDataProvider = 0;
00147   if ( m_pDataManager ) m_pDataManager->release();
00148   m_pDataManager = 0;
00149   if ( m_pConversionSvc ) m_pConversionSvc->release();
00150   m_pConversionSvc = 0;
00151   clearItems(m_optItemList);
00152   clearItems(m_itemList);
00153   return StatusCode::SUCCESS;
00154 }
00155 
00156 // Work entry point
00157 StatusCode OutputStream::execute() {
00158   // Clear any previously existing item list
00159   clearSelection();
00160   // Test whether this event should be output
00161   if ( isEventAccepted() )  {
00162     StatusCode sc = writeObjects();
00163     clearSelection();
00164     m_events++;
00165     return sc;
00166   }
00167   return StatusCode::SUCCESS;
00168 }
00169 
00170 // Select the different objects and write them to file
00171 StatusCode OutputStream::writeObjects()  {
00172   // Connect the output file to the service
00173   StatusCode status = collectObjects();
00174   if ( status.isSuccess() )   {
00175     IDataSelector*  sel = selectedObjects();
00176     if ( sel->begin() != sel->end() )  {
00177       status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
00178       if ( status.isSuccess() )   {
00179         // Now pass the collection to the persistency service
00180         IDataSelector::iterator j;
00181         IOpaqueAddress* pAddress = 0;
00182         for ( j = sel->begin(); j != sel->end(); j++ )    {
00183           StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
00184           if ( !iret.isSuccess() )      {
00185             status = iret;
00186             continue;
00187           }
00188           IRegistry* pReg = (*j)->registry();
00189           pReg->setAddress(pAddress);
00190         }
00191         for ( j = sel->begin(); j != sel->end(); j++ )    {
00192           IRegistry* pReg = (*j)->registry();
00193           StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
00194           if ( !iret.isSuccess() )      {
00195             status = iret;
00196           }
00197         }
00198               // Commit the data if there was no error; otherwise possibly discard
00199         if ( status.isSuccess() )  {
00200           status = m_pConversionSvc->commitOutput(m_outputName, true);
00201         }
00202         else   {
00203           m_pConversionSvc->commitOutput(m_outputName, false);
00204         }
00205       }
00206     }
00207   }
00208   return status;
00209 }
00210 
00211 // Place holder to create configurable data store agent
00212 bool OutputStream::collect(IRegistry* dir, int level)    {
00213   if ( level < m_currentItem->depth() )   {
00214     if ( dir->object() != 0 )   {
00215       /*
00216       std::cout << "Analysing ("
00217                 << dir->name()
00218                 << ") Object:"
00219                 << ((dir->object()==0) ? "UNLOADED" : "LOADED")
00220                 << std::endl;
00221       */
00222       m_objects.push_back(dir->object());
00223       return true;
00224     }
00225   }
00226   return false;
00227 }
00228 
00230 StatusCode OutputStream::collectObjects()   {
00231   MsgStream log(msgSvc(), name());
00232   StatusCode status = StatusCode::SUCCESS;
00233   Items::iterator i;
00234   // Traverse the tree and collect the requested objects
00235   for ( i = m_itemList.begin(); i != m_itemList.end(); i++ )    {
00236     DataObject* obj = 0;
00237     m_currentItem = (*i);
00238     StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00239     if ( iret.isSuccess() )  {
00240       iret = m_pDataManager->traverseSubTree(obj, m_agent);
00241       if ( !iret.isSuccess() )  {
00242         status = iret;
00243       }
00244     }
00245     else  {
00246       log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
00247           << m_currentItem->path() << endmsg;
00248       status = iret;
00249     }
00250   }
00251   // Traverse the tree and collect the requested objects (tolerate missing itmes here)
00252   for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ )    {
00253     DataObject* obj = 0;
00254     m_currentItem = (*i);
00255     StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00256     if ( iret.isSuccess() )  {
00257       iret = m_pDataManager->traverseSubTree(obj, m_agent);
00258     }
00259     if ( !iret.isSuccess() )    {
00260       log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
00261           << m_currentItem->path() << endmsg;
00262     }
00263   }
00264   return status;
00265 }
00266 
00267 // Clear collected object list
00268 void OutputStream::clearSelection()     {
00269   m_objects.erase(m_objects.begin(), m_objects.end());
00270 }
00271 
00272 // Remove all items from the output streamer list;
00273 void OutputStream::clearItems(Items& itms)     {
00274   for ( Items::iterator i = itms.begin(); i != itms.end(); i++ )    {
00275     delete (*i);
00276   }
00277   itms.erase(itms.begin(), itms.end());
00278 }
00279 
00280 // Find single item identified by its path (exact match)
00281 DataStoreItem*
00282 OutputStream::findItem(const std::string& path)  {
00283   for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i)  {
00284     if ( (*i)->path() == path )  return (*i);
00285   }
00286   for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j)  {
00287     if ( (*j)->path() == path )  return (*j);
00288   }
00289   return 0;
00290 }
00291 
00292 // Add item to output streamer list
00293 void OutputStream::addItem(Items& itms, const std::string& descriptor)   {
00294         MsgStream log(msgSvc(), name());
00295   int level = 0;
00296   size_t sep = descriptor.rfind("#");
00297   std::string obj_path (descriptor,0,sep);
00298   std::string slevel   (descriptor,sep+1,descriptor.length());
00299   if ( slevel == "*" )  {
00300     level = 9999999;
00301   }
00302   else   {
00303     level = atoi(slevel.c_str());
00304   }
00305   if ( m_verifyItems )  {
00306     size_t idx = obj_path.find("/",1);
00307     while(idx != std::string::npos)  {
00308       std::string sub_item = obj_path.substr(0,idx);
00309       if ( 0 == findItem(sub_item) )   {
00310         addItem(itms, sub_item+"#1");
00311       }
00312       idx = obj_path.find("/",idx+1);
00313     }
00314   }
00315   DataStoreItem* item = new DataStoreItem(obj_path, level);
00316   log << MSG::DEBUG << "Adding OutputStream item " << item->path()
00317       << " with " << item->depth()
00318       << " level(s)." << endmsg;
00319   itms.push_back( item );
00320 }
00321 
00322 // Connect to proper conversion service
00323 StatusCode OutputStream::connectConversionSvc()   {
00324   StatusCode status = StatusCode(StatusCode::FAILURE, true);
00325   MsgStream log(msgSvc(), name());
00326   // Get output file from input
00327   std::string dbType, svc, shr;
00328   Tokenizer tok(true);
00329   tok.analyse(m_output, " ", "", "", "=", "'", "'");
00330   for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i)   {
00331     const std::string& tag = (*i).tag();
00332     const std::string& val = (*i).value();
00333     switch( ::toupper(tag[0]) )    {
00334     case 'D':
00335       m_outputName = val;
00336       break;
00337     case 'T':
00338       dbType = val;
00339       break;
00340     case 'S':
00341       switch( ::toupper(tag[1]) )   {
00342       case 'V':    svc = val;      break;
00343       case 'H':    shr = "YES";    break;
00344       }
00345       break;
00346     case 'O':   // OPT='<NEW<CREATE,WRITE,RECREATE>, UPDATE>'
00347       switch( ::toupper(val[0]) )   {
00348       case 'R':
00349         if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
00350           m_outputType = "RECREATE";
00351         else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
00352           m_outputType = "READ";
00353         break;
00354       case 'C':
00355       case 'N':
00356       case 'W':
00357         m_outputType = "NEW";
00358         break;
00359       case 'U':
00360         m_outputType = "UPDATE";
00361         break;
00362       default:
00363         m_outputType = "???";
00364         break;
00365       }
00366       break;
00367     default:
00368       break;
00369     }
00370   }
00371   if ( !shr.empty() ) m_outputType += "|SHARED";
00372   // Get access to the default Persistency service
00373   // The default service is the same for input as for output.
00374   // If this is not desired, then a specialized OutputStream must overwrite
00375   // this value.
00376   if ( dbType.length() > 0 && svc.length()==0 )   {
00377     SmartIF<IPersistencySvc> ipers(serviceLocator()->service(m_persName));
00378     if( !ipers.isValid() )   {
00379       log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
00380       return StatusCode::FAILURE;
00381     }
00382     IConversionSvc *cnvSvc = 0;
00383     status = ipers->getService(dbType, cnvSvc);
00384     if( !status.isSuccess() )   {
00385       log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << dbType << endmsg;
00386       return status;
00387     }
00388     // Increase reference count and keep service.
00389     m_pConversionSvc = cnvSvc;
00390   }
00391   else if ( svc.length() > 0 )    {
00392     // On success reference count is automatically increased.
00393     m_pConversionSvc = serviceLocator()->service(svc);
00394     if( !m_pConversionSvc.isValid() )   {
00395       log << MSG::FATAL << "Unable to locate IConversionSvc interface of " << svc << endmsg;
00396       return StatusCode::FAILURE;
00397     }
00398   }
00399   else    {
00400     log << MSG::FATAL
00401         << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
00402         << "You either have to specify a technology name or a service name!" << endmsg
00403         << "Please correct the job option \"" << name() << ".Output\" !"     << endmsg;
00404     return StatusCode::FAILURE;
00405   }
00406   return StatusCode::SUCCESS;
00407 }
00408 
00409 StatusCode OutputStream::decodeAcceptAlgs( ) {
00410   return decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00411 }
00412 
00413 void OutputStream::acceptAlgsHandler( Property& /* theProp */ )  {
00414   StatusCode sc = decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00415   if (sc.isFailure()) {
00416     throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00417                          "OutputStream::acceptAlgsHandler",sc);
00418   }
00419 }
00420 
00421 StatusCode OutputStream::decodeRequireAlgs( )  {
00422   return decodeAlgorithms( m_requireNames, m_requireAlgs );
00423 }
00424 
00425 void OutputStream::requireAlgsHandler( Property& /* theProp */ )  {
00426   StatusCode sc = decodeAlgorithms( m_requireNames, m_requireAlgs );
00427   if (sc.isFailure()) {
00428     throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00429                          "OutputStream::requireAlgsHandler",sc);
00430   }
00431 }
00432 
00433 StatusCode OutputStream::decodeVetoAlgs( )  {
00434   return decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00435 }
00436 
00437 void OutputStream::vetoAlgsHandler( Property& /* theProp */ )  {
00438   StatusCode sc = decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00439   if (sc.isFailure()) {
00440     throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00441                          "OutputStream::vetoAlgsHandler",sc);
00442   }
00443 }
00444 
00445 StatusCode OutputStream::decodeAlgorithms( StringArrayProperty& theNames,
00446                                            std::vector<Algorithm*>* theAlgs )
00447 {
00448   // Reset the list of Algorithms
00449   theAlgs->clear( );
00450 
00451   MsgStream log( msgSvc( ), name( ) );
00452 
00453   StatusCode result = StatusCode::FAILURE;
00454 
00455   SmartIF<IAlgManager> theAlgMgr(serviceLocator());
00456   if ( theAlgMgr.isValid() ) {
00457     // Build the list of Algorithms from the names list
00458     const std::vector<std::string> nameList = theNames.value( );
00459     std::vector<std::string>::const_iterator it;
00460     std::vector<std::string>::const_iterator itend = nameList.end( );
00461     for (it = nameList.begin(); it != itend; ++it) {
00462       // Check whether the supplied name corresponds to an existing
00463       // Algorithm object.
00464       const std::string &theName = (*it);
00465       SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
00466       Algorithm*  theAlgorithm;
00467       if ( theIAlg.isValid() ) {
00468         result = StatusCode::SUCCESS;
00469         try{
00470           theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
00471         } catch(...){
00472           result = StatusCode::FAILURE;
00473         }
00474       }
00475       if ( result.isSuccess( ) ) {
00476         // Check that the specified algorithm doesn't already exist in the list
00477         std::vector<Algorithm*>::iterator ita;
00478         std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
00479         for (ita = theAlgs->begin(); ita != itaend; ++ita) {
00480           Algorithm* existAlgorithm = (*ita);
00481           if ( theAlgorithm == existAlgorithm ) {
00482             result = StatusCode::FAILURE;
00483             break;
00484           }
00485         }
00486         if ( result.isSuccess( ) ) {
00487           theAlgorithm->addRef();
00488           theAlgs->push_back( theAlgorithm );
00489         }
00490       }
00491       else {
00492         log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
00493       }
00494     }
00495     result = StatusCode::SUCCESS;
00496   }
00497   else {
00498     log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
00499   }
00500   return result;
00501 }
00502 
00503 bool OutputStream::isEventAccepted( ) const  {
00504   typedef std::vector<Algorithm*>::iterator AlgIter;
00505   bool result = true;
00506 
00507   // Loop over all Algorithms in the accept list to see
00508   // whether any have been executed and have their filter
00509   // passed flag set. Any match causes the event to be
00510   // provisionally accepted.
00511   if ( ! m_acceptAlgs->empty() ) {
00512     result = false;
00513     for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
00514       if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00515         result = true;
00516         break;
00517       }
00518     }
00519   }
00520 
00521   // Loop over all Algorithms in the required list to see
00522   // whether all have been executed and have their filter
00523   // passed flag set. Any mismatch causes the event to be
00524   // rejected.
00525   if ( result && ! m_requireAlgs->empty() ) {
00526     for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
00527       if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
00528         result = false;
00529         break;
00530       }
00531     }
00532   }
00533 
00534   // Loop over all Algorithms in the veto list to see
00535   // whether any have been executed and have their filter
00536   // passed flag set. Any match causes the event to be
00537   // rejected.
00538   if ( result && ! m_vetoAlgs->empty() ) {
00539     for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
00540       if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00541         result = false;
00542         break;
00543       }
00544     }
00545   }
00546   return result;
00547 }

Generated at Mon Sep 7 18:05:50 2009 for Gaudi Framework, version v21r4 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004