// Generated: 8 Jan 2009
00001 // $Id: OutputStream.cpp,v 1.23 2008/01/15 13:46:52 marcocle Exp $ 00002 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP 00003 00004 // Framework include files 00005 #include "GaudiKernel/Tokenizer.h" 00006 #include "GaudiKernel/AlgFactory.h" 00007 #include "GaudiKernel/IRegistry.h" 00008 #include "GaudiKernel/IAlgManager.h" 00009 #include "GaudiKernel/ISvcLocator.h" 00010 #include "GaudiKernel/IConversionSvc.h" 00011 #include "GaudiKernel/IDataManagerSvc.h" 00012 #include "GaudiKernel/IDataProviderSvc.h" 00013 #include "GaudiKernel/IPersistencySvc.h" 00014 #include "GaudiKernel/IOpaqueAddress.h" 00015 00016 #include "GaudiKernel/MsgStream.h" 00017 #include "GaudiKernel/strcasecmp.h" 00018 #include "GaudiKernel/DataObject.h" 00019 #include "GaudiKernel/DataStoreItem.h" 00020 #include "OutputStream.h" 00021 #include "OutputStreamAgent.h" 00022 00023 // Define the algorithm factory for the standard output data writer 00024 DECLARE_ALGORITHM_FACTORY(OutputStream) 00025 00026 // Standard Constructor 00027 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator) 00028 : Algorithm(name, pSvcLocator) 00029 { 00030 m_doPreLoad = true; 00031 m_doPreLoadOpt = false; 00032 m_verifyItems = true; 00033 m_output = ""; 00034 m_outputName = ""; 00035 m_outputType = "UPDATE"; 00036 m_storeName = "EventDataSvc"; 00037 m_persName = "EventPersistencySvc"; 00038 m_agent = new OutputStreamAgent(this); 00039 m_pDataManager = 0; 00040 m_pDataProvider = 0; 00041 m_pConversionSvc = 0; 00042 m_acceptAlgs = new std::vector<Algorithm*>(); 00043 m_requireAlgs = new std::vector<Algorithm*>(); 00044 m_vetoAlgs = new std::vector<Algorithm*>(); 00045 declareProperty("ItemList", m_itemNames); 00046 declareProperty("OptItemList", m_optItemNames); 00047 declareProperty("Preload", m_doPreLoad); 00048 declareProperty("PreloadOptItems", m_doPreLoadOpt); 00049 declareProperty("Output", m_output); 00050 declareProperty("OutputFile", m_outputName); 00051 
declareProperty("EvtDataSvc", m_storeName); 00052 declareProperty("EvtConversionSvc", m_persName); 00053 declareProperty("AcceptAlgs", m_acceptNames); 00054 declareProperty("RequireAlgs", m_requireNames); 00055 declareProperty("VetoAlgs", m_vetoNames); 00056 declareProperty("VerifyItems", m_verifyItems); 00057 00058 // Associate action handlers with the AcceptAlgs, RequireAlgs & VetoAlgs properties 00059 m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this ); 00060 m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this ); 00061 m_vetoNames.declareUpdateHandler ( &OutputStream::vetoAlgsHandler , this ); 00062 } 00063 00064 // Standard Destructor 00065 OutputStream::~OutputStream() { 00066 delete m_agent; 00067 delete m_acceptAlgs; 00068 delete m_requireAlgs; 00069 delete m_vetoAlgs; 00070 } 00071 00072 // initialize data writer 00073 StatusCode OutputStream::initialize() { 00074 StatusCode status = StatusCode::SUCCESS; 00075 MsgStream log(msgSvc(), name()); 00076 00077 // Reset the number of events written 00078 m_events = 0; 00079 // Get access to the DataManagerSvc 00080 status = serviceLocator()->service(m_storeName, m_pDataManager, true); 00081 if( !status.isSuccess() ) { 00082 log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endreq; 00083 return status; 00084 } 00085 // Get access to the assigned data service 00086 status = serviceLocator()->service(m_storeName, m_pDataProvider, true); 00087 if( !status.isSuccess() ) { 00088 log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endreq; 00089 return status; 00090 } 00091 if ( !(m_itemNames.empty() && m_optItemNames.empty()) ) { 00092 status = connectConversionSvc(); 00093 if( !status.isSuccess() ) { 00094 log << MSG::FATAL << "Unable to connect to conversion service." 
<< endreq; 00095 return status; 00096 } 00097 } 00098 00099 // Clear the list with optional items 00100 clearItems(m_optItemList); 00101 // Clear the item list 00102 clearItems(m_itemList); 00103 00104 ItemNames::iterator i; 00105 // Take the new item list from the properties. 00106 for(i = m_itemNames.begin(); i != m_itemNames.end(); i++) { 00107 addItem( m_itemList, *i ); 00108 } 00109 00110 // Take the new item list from the properties. 00111 for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++) { 00112 addItem( m_optItemList, *i ); 00113 } 00114 00115 // Take the item list to the data service preload list. 00116 if ( m_doPreLoad ) { 00117 for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) { 00118 m_pDataProvider->addPreLoadItem( *(*j) ).ignore(); 00119 } 00120 // Not working: bad reference counting! pdataSvc->release(); 00121 } 00122 00123 if ( m_doPreLoadOpt ) { 00124 for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) { 00125 m_pDataProvider->addPreLoadItem( *(*j) ); 00126 } 00127 } 00128 log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endreq; 00129 00130 // Decode the accept, required and veto Algorithms. The logic is the following: 00131 // a. The event is accepted if all lists are empty. 00132 // b. The event is provisionally accepted if any Algorithm in the accept list 00133 // has been executed and has indicated that its filter is passed. This 00134 // provisional acceptance can be overridden by the other lists. 00135 // c. The event is rejected unless all Algorithms in the required list have 00136 // been executed and have indicated that their filter passed. 00137 // d. The event is rejected if any Algorithm in the veto list has been 00138 // executed and has indicated that its filter has passed. 
00139 decodeAcceptAlgs ().ignore(); 00140 decodeRequireAlgs().ignore(); 00141 decodeVetoAlgs ().ignore(); 00142 return status; 00143 } 00144 00145 // terminate data writer 00146 StatusCode OutputStream::finalize() { 00147 MsgStream log(msgSvc(), name()); 00148 log << MSG::INFO << "Events output: " << m_events << endreq; 00149 if ( m_pDataProvider ) m_pDataProvider->release(); 00150 m_pDataProvider = 0; 00151 if ( m_pDataManager ) m_pDataManager->release(); 00152 m_pDataManager = 0; 00153 if ( m_pConversionSvc ) m_pConversionSvc->release(); 00154 m_pConversionSvc = 0; 00155 clearItems(m_optItemList); 00156 clearItems(m_itemList); 00157 return StatusCode::SUCCESS; 00158 } 00159 00160 // Work entry point 00161 StatusCode OutputStream::execute() { 00162 // Clear any previously existing item list 00163 clearSelection(); 00164 // Test whether this event should be output 00165 if ( isEventAccepted() ) { 00166 StatusCode sc = writeObjects(); 00167 clearSelection(); 00168 m_events++; 00169 return sc; 00170 } 00171 return StatusCode::SUCCESS; 00172 } 00173 00174 // Select the different objects and write them to file 00175 StatusCode OutputStream::writeObjects() { 00176 // Connect the output file to the service 00177 StatusCode status = collectObjects(); 00178 if ( status.isSuccess() ) { 00179 IDataSelector* sel = selectedObjects(); 00180 if ( sel->begin() != sel->end() ) { 00181 status = m_pConversionSvc->connectOutput(m_outputName, m_outputType); 00182 if ( status.isSuccess() ) { 00183 // Now pass the collection to the persistency service 00184 IDataSelector::iterator j; 00185 IOpaqueAddress* pAddress = 0; 00186 for ( j = sel->begin(); j != sel->end(); j++ ) { 00187 StatusCode iret = m_pConversionSvc->createRep( *j, pAddress ); 00188 if ( !iret.isSuccess() ) { 00189 status = iret; 00190 continue; 00191 } 00192 IRegistry* pReg = (*j)->registry(); 00193 pReg->setAddress(pAddress); 00194 } 00195 for ( j = sel->begin(); j != sel->end(); j++ ) { 00196 IRegistry* pReg = 
(*j)->registry(); 00197 StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j ); 00198 if ( !iret.isSuccess() ) { 00199 status = iret; 00200 } 00201 } 00202 // Commit the data if there was no error; otherwise possibly discard 00203 if ( status.isSuccess() ) { 00204 status = m_pConversionSvc->commitOutput(m_outputName, true); 00205 } 00206 else { 00207 m_pConversionSvc->commitOutput(m_outputName, false); 00208 } 00209 } 00210 } 00211 } 00212 return status; 00213 } 00214 00215 // Place holder to create configurable data store agent 00216 bool OutputStream::collect(IRegistry* dir, int level) { 00217 if ( level < m_currentItem->depth() ) { 00218 if ( dir->object() != 0 ) { 00219 /* 00220 std::cout << "Analysing (" 00221 << dir->name() 00222 << ") Object:" 00223 << ((dir->object()==0) ? "UNLOADED" : "LOADED") 00224 << std::endl; 00225 */ 00226 m_objects.push_back(dir->object()); 00227 return true; 00228 } 00229 } 00230 return false; 00231 } 00232 00234 StatusCode OutputStream::collectObjects() { 00235 MsgStream log(msgSvc(), name()); 00236 StatusCode status = StatusCode::SUCCESS; 00237 Items::iterator i; 00238 // Traverse the tree and collect the requested objects 00239 for ( i = m_itemList.begin(); i != m_itemList.end(); i++ ) { 00240 DataObject* obj = 0; 00241 m_currentItem = (*i); 00242 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj); 00243 if ( iret.isSuccess() ) { 00244 iret = m_pDataManager->traverseSubTree(obj, m_agent); 00245 if ( !iret.isSuccess() ) { 00246 status = iret; 00247 } 00248 } 00249 else { 00250 log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) " 00251 << m_currentItem->path() << endreq; 00252 status = iret; 00253 } 00254 } 00255 // Traverse the tree and collect the requested objects (tolerate missing itmes here) 00256 for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) { 00257 DataObject* obj = 0; 00258 m_currentItem = (*i); 00259 StatusCode iret = 
m_pDataProvider->retrieveObject(m_currentItem->path(), obj); 00260 if ( iret.isSuccess() ) { 00261 iret = m_pDataManager->traverseSubTree(obj, m_agent); 00262 } 00263 if ( !iret.isSuccess() ) { 00264 log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) " 00265 << m_currentItem->path() << endreq; 00266 } 00267 } 00268 return status; 00269 } 00270 00271 // Clear collected object list 00272 void OutputStream::clearSelection() { 00273 m_objects.erase(m_objects.begin(), m_objects.end()); 00274 } 00275 00276 // Remove all items from the output streamer list; 00277 void OutputStream::clearItems(Items& itms) { 00278 for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) { 00279 delete (*i); 00280 } 00281 itms.erase(itms.begin(), itms.end()); 00282 } 00283 00284 // Find single item identified by its path (exact match) 00285 DataStoreItem* 00286 OutputStream::findItem(const std::string& path) { 00287 for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i) { 00288 if ( (*i)->path() == path ) return (*i); 00289 } 00290 for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j) { 00291 if ( (*j)->path() == path ) return (*j); 00292 } 00293 return 0; 00294 } 00295 00296 // Add item to output streamer list 00297 void OutputStream::addItem(Items& itms, const std::string& descriptor) { 00298 MsgStream log(msgSvc(), name()); 00299 int level = 0; 00300 size_t sep = descriptor.rfind("#"); 00301 std::string obj_path (descriptor,0,sep); 00302 std::string slevel (descriptor,sep+1,descriptor.length()); 00303 if ( slevel == "*" ) { 00304 level = 9999999; 00305 } 00306 else { 00307 level = atoi(slevel.c_str()); 00308 } 00309 if ( m_verifyItems ) { 00310 size_t idx = obj_path.find("/",1); 00311 while(idx != std::string::npos) { 00312 std::string sub_item = obj_path.substr(0,idx); 00313 if ( 0 == findItem(sub_item) ) { 00314 addItem(itms, sub_item+"#1"); 00315 } 00316 idx = obj_path.find("/",idx+1); 00317 } 00318 } 00319 
DataStoreItem* item = new DataStoreItem(obj_path, level); 00320 log << MSG::DEBUG << "Adding OutputStream item " << item->path() 00321 << " with " << item->depth() 00322 << " level(s)." << endreq; 00323 itms.push_back( item ); 00324 } 00325 00326 // Connect to proper conversion service 00327 StatusCode OutputStream::connectConversionSvc() { 00328 StatusCode status = StatusCode::FAILURE; 00329 MsgStream log(msgSvc(), name()); 00330 // Get output file from input 00331 std::string dbType, svc, shr; 00332 Tokenizer tok(true); 00333 tok.analyse(m_output, " ", "", "", "=", "'", "'"); 00334 for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) { 00335 const std::string& tag = (*i).tag(); 00336 const std::string& val = (*i).value(); 00337 switch( ::toupper(tag[0]) ) { 00338 case 'D': 00339 m_outputName = val; 00340 break; 00341 case 'T': 00342 dbType = val; 00343 break; 00344 case 'S': 00345 switch( ::toupper(tag[1]) ) { 00346 case 'V': svc = val; break; 00347 case 'H': shr = "YES"; break; 00348 } 00349 break; 00350 case 'O': // OPT='<NEW<CREATE,WRITE,RECREATE>, UPDATE>' 00351 switch( ::toupper(val[0]) ) { 00352 case 'R': 00353 if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 ) 00354 m_outputType = "RECREATE"; 00355 else if ( ::strncasecmp(val.c_str(),"READ",3)==0 ) 00356 m_outputType = "READ"; 00357 break; 00358 case 'C': 00359 case 'N': 00360 case 'W': 00361 m_outputType = "NEW"; 00362 break; 00363 case 'U': 00364 m_outputType = "UPDATE"; 00365 break; 00366 default: 00367 m_outputType = "???"; 00368 break; 00369 } 00370 break; 00371 default: 00372 break; 00373 } 00374 } 00375 if ( !shr.empty() ) m_outputType += "|SHARED"; 00376 // Get access to the default Persistency service 00377 // The default service is the same for input as for output. 00378 // If this is not desired, then a specialized OutputStream must overwrite 00379 // this value. 
00380 if ( dbType.length() > 0 && svc.length()==0 ) { 00381 IPersistencySvc* ipers = 0; 00382 status = serviceLocator()->service(m_persName, ipers ); 00383 if( !status.isSuccess() ) { 00384 log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endreq; 00385 return status; 00386 } 00387 status = ipers->getService(dbType, m_pConversionSvc); 00388 if( !status.isSuccess() ) { 00389 log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << dbType << endreq; 00390 return status; 00391 } 00392 // Increase reference count and keep service. 00393 m_pConversionSvc->addRef(); 00394 } 00395 else if ( svc.length() > 0 ) { 00396 // On success reference count is automatically increased. 00397 status = serviceLocator()->service(svc, m_pConversionSvc, true ); 00398 if( !status.isSuccess() ) { 00399 log << MSG::FATAL << "Unable to locate IConversionSvc interface of " << svc << endreq; 00400 return status; 00401 } 00402 } 00403 else { 00404 log << MSG::FATAL 00405 << "Unable to locate IConversionSvc interface (Unknown technology) " << endreq 00406 << "You either have to specify a technology name or a service name!" << endreq 00407 << "Please correct the job option \"" << name() << ".Output\" !" 
<< endreq; 00408 status = StatusCode::FAILURE; 00409 } 00410 return status; 00411 } 00412 00413 StatusCode OutputStream::decodeAcceptAlgs( ) { 00414 return decodeAlgorithms( m_acceptNames, m_acceptAlgs ); 00415 } 00416 00417 void OutputStream::acceptAlgsHandler( Property& /* theProp */ ) { 00418 StatusCode sc = decodeAlgorithms( m_acceptNames, m_acceptAlgs ); 00419 if (sc.isFailure()) { 00420 throw GaudiException("Failure in OutputStream::decodeAlgorithms", 00421 "OutputStream::acceptAlgsHandler",sc); 00422 } 00423 } 00424 00425 StatusCode OutputStream::decodeRequireAlgs( ) { 00426 return decodeAlgorithms( m_requireNames, m_requireAlgs ); 00427 } 00428 00429 void OutputStream::requireAlgsHandler( Property& /* theProp */ ) { 00430 StatusCode sc = decodeAlgorithms( m_requireNames, m_requireAlgs ); 00431 if (sc.isFailure()) { 00432 throw GaudiException("Failure in OutputStream::decodeAlgorithms", 00433 "OutputStream::requireAlgsHandler",sc); 00434 } 00435 } 00436 00437 StatusCode OutputStream::decodeVetoAlgs( ) { 00438 return decodeAlgorithms( m_vetoNames, m_vetoAlgs ); 00439 } 00440 00441 void OutputStream::vetoAlgsHandler( Property& /* theProp */ ) { 00442 StatusCode sc = decodeAlgorithms( m_vetoNames, m_vetoAlgs ); 00443 if (sc.isFailure()) { 00444 throw GaudiException("Failure in OutputStream::decodeAlgorithms", 00445 "OutputStream::vetoAlgsHandler",sc); 00446 } 00447 } 00448 00449 StatusCode OutputStream::decodeAlgorithms( StringArrayProperty& theNames, 00450 std::vector<Algorithm*>* theAlgs ) 00451 { 00452 // Reset the list of Algorithms 00453 theAlgs->clear( ); 00454 00455 MsgStream log( msgSvc( ), name( ) ); 00456 00457 IAlgManager* theAlgMgr; 00458 StatusCode result = serviceLocator()->getService( "ApplicationMgr", 00459 IID_IAlgManager, 00460 *pp_cast<IInterface>(&theAlgMgr) ); 00461 if ( result.isSuccess( ) ) { 00462 // Build the list of Algorithms from the names list 00463 const std::vector<std::string> nameList = theNames.value( ); 00464 
std::vector<std::string>::const_iterator it; 00465 std::vector<std::string>::const_iterator itend = nameList.end( ); 00466 for (it = nameList.begin(); it != itend; it++) { 00467 // Check whether the suppied name corresponds to an existing 00468 // Algorithm object. 00469 std::string theName = (*it); 00470 IAlgorithm* theIAlg; 00471 Algorithm* theAlgorithm; 00472 result = theAlgMgr->getAlgorithm( theName, theIAlg ); 00473 if ( result.isSuccess( ) ) { 00474 try{ 00475 theAlgorithm = dynamic_cast<Algorithm*>(theIAlg); 00476 } catch(...){ 00477 result = StatusCode::FAILURE; 00478 } 00479 } 00480 if ( result.isSuccess( ) ) { 00481 // Check that the specified algorithm doesn't already exist in the list 00482 std::vector<Algorithm*>::iterator ita; 00483 std::vector<Algorithm*>::iterator itaend = theAlgs->end( ); 00484 for (ita = theAlgs->begin(); ita != itaend; ita++) { 00485 Algorithm* existAlgorithm = (*ita); 00486 if ( theAlgorithm == existAlgorithm ) { 00487 result = StatusCode::FAILURE; 00488 break; 00489 } 00490 } 00491 if ( result.isSuccess( ) ) { 00492 theAlgs->push_back( theAlgorithm ); 00493 } 00494 } 00495 else { 00496 log << MSG::INFO << theName << " doesn't exist - ignored" << endreq; 00497 } 00498 } 00499 result = StatusCode::SUCCESS; 00500 } 00501 else { 00502 log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endreq; 00503 } 00504 return result; 00505 } 00506 00507 bool OutputStream::isEventAccepted( ) const { 00508 typedef std::vector<Algorithm*>::iterator AlgIter; 00509 bool result = true; 00510 00511 // Loop over all Algorithms in the accept list to see 00512 // whether any have been executed and have their filter 00513 // passed flag set. Any match causes the event to be 00514 // provisionally accepted. 00515 if ( ! 
m_acceptAlgs->empty() ) { 00516 result = false; 00517 for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) { 00518 if ( (*i)->isExecuted() && (*i)->filterPassed() ) { 00519 result = true; 00520 break; 00521 } 00522 } 00523 } 00524 00525 // Loop over all Algorithms in the required list to see 00526 // whether all have been executed and have their filter 00527 // passed flag set. Any mismatch causes the event to be 00528 // rejected. 00529 if ( result && ! m_requireAlgs->empty() ) { 00530 for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) { 00531 if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) { 00532 result = false; 00533 break; 00534 } 00535 } 00536 } 00537 00538 // Loop over all Algorithms in the veto list to see 00539 // whether any have been executed and have their filter 00540 // passed flag set. Any match causes the event to be 00541 // rejected. 00542 if ( result && ! m_vetoAlgs->empty() ) { 00543 for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) { 00544 if ( (*i)->isExecuted() && (*i)->filterPassed() ) { 00545 result = false; 00546 break; 00547 } 00548 } 00549 } 00550 return result; 00551 }