00001
00002 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
00003
00004
00005 #include "GaudiKernel/Tokenizer.h"
00006 #include "GaudiKernel/AlgFactory.h"
00007 #include "GaudiKernel/IRegistry.h"
00008 #include "GaudiKernel/IAlgManager.h"
00009 #include "GaudiKernel/ISvcLocator.h"
00010 #include "GaudiKernel/IConversionSvc.h"
00011 #include "GaudiKernel/IDataManagerSvc.h"
00012 #include "GaudiKernel/IDataProviderSvc.h"
00013 #include "GaudiKernel/IPersistencySvc.h"
00014 #include "GaudiKernel/IOpaqueAddress.h"
00015
00016 #include "GaudiKernel/MsgStream.h"
00017 #include "GaudiKernel/strcasecmp.h"
00018 #include "GaudiKernel/DataObject.h"
00019 #include "GaudiKernel/DataStoreItem.h"
00020 #include "OutputStream.h"
00021 #include "OutputStreamAgent.h"
00022
00023
00024 DECLARE_ALGORITHM_FACTORY(OutputStream)
00025
00026
00027 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
00028 : Algorithm(name, pSvcLocator)
00029 {
00030 m_doPreLoad = true;
00031 m_doPreLoadOpt = false;
00032 m_verifyItems = true;
00033 m_output = "";
00034 m_outputName = "";
00035 m_outputType = "UPDATE";
00036 m_storeName = "EventDataSvc";
00037 m_persName = "EventPersistencySvc";
00038 m_agent = new OutputStreamAgent(this);
00039 m_acceptAlgs = new std::vector<Algorithm*>();
00040 m_requireAlgs = new std::vector<Algorithm*>();
00041 m_vetoAlgs = new std::vector<Algorithm*>();
00042 declareProperty("ItemList", m_itemNames);
00043 declareProperty("OptItemList", m_optItemNames);
00044 declareProperty("Preload", m_doPreLoad);
00045 declareProperty("PreloadOptItems", m_doPreLoadOpt);
00046 declareProperty("Output", m_output);
00047 declareProperty("OutputFile", m_outputName);
00048 declareProperty("EvtDataSvc", m_storeName);
00049 declareProperty("EvtConversionSvc", m_persName);
00050 declareProperty("AcceptAlgs", m_acceptNames);
00051 declareProperty("RequireAlgs", m_requireNames);
00052 declareProperty("VetoAlgs", m_vetoNames);
00053 declareProperty("VerifyItems", m_verifyItems);
00054
00055
00056 m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
00057 m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
00058 m_vetoNames.declareUpdateHandler ( &OutputStream::vetoAlgsHandler , this );
00059 }
00060
00061
00062 OutputStream::~OutputStream() {
00063 delete m_agent;
00064 delete m_acceptAlgs;
00065 delete m_requireAlgs;
00066 delete m_vetoAlgs;
00067 }
00068
00069
00070 StatusCode OutputStream::initialize() {
00071 MsgStream log(msgSvc(), name());
00072
00073
00074 m_events = 0;
00075
00076 m_pDataManager = serviceLocator()->service(m_storeName);
00077 if( !m_pDataManager.isValid() ) {
00078 log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
00079 return StatusCode::FAILURE;
00080 }
00081
00082 m_pDataProvider = serviceLocator()->service(m_storeName);
00083 if( !m_pDataProvider.isValid() ) {
00084 log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
00085 return StatusCode::FAILURE;
00086 }
00087 if ( !(m_itemNames.empty() && m_optItemNames.empty()) ) {
00088 StatusCode status = connectConversionSvc();
00089 if( !status.isSuccess() ) {
00090 log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
00091 return status;
00092 }
00093 }
00094
00095
00096 clearItems(m_optItemList);
00097
00098 clearItems(m_itemList);
00099
00100 ItemNames::iterator i;
00101
00102 for(i = m_itemNames.begin(); i != m_itemNames.end(); i++) {
00103 addItem( m_itemList, *i );
00104 }
00105
00106
00107 for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++) {
00108 addItem( m_optItemList, *i );
00109 }
00110
00111
00112 if ( m_doPreLoad ) {
00113 for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) {
00114 m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
00115 }
00116
00117 }
00118
00119 if ( m_doPreLoadOpt ) {
00120 for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
00121 m_pDataProvider->addPreLoadItem( *(*j) );
00122 }
00123 }
00124 log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endmsg;
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135 decodeAcceptAlgs ().ignore();
00136 decodeRequireAlgs().ignore();
00137 decodeVetoAlgs ().ignore();
00138 return StatusCode::SUCCESS;
00139 }
00140
00141
00142 StatusCode OutputStream::finalize() {
00143 MsgStream log(msgSvc(), name());
00144 log << MSG::INFO << "Events output: " << m_events << endmsg;
00145 if ( m_pDataProvider ) m_pDataProvider->release();
00146 m_pDataProvider = 0;
00147 if ( m_pDataManager ) m_pDataManager->release();
00148 m_pDataManager = 0;
00149 if ( m_pConversionSvc ) m_pConversionSvc->release();
00150 m_pConversionSvc = 0;
00151 clearItems(m_optItemList);
00152 clearItems(m_itemList);
00153 return StatusCode::SUCCESS;
00154 }
00155
00156
00157 StatusCode OutputStream::execute() {
00158
00159 clearSelection();
00160
00161 if ( isEventAccepted() ) {
00162 StatusCode sc = writeObjects();
00163 clearSelection();
00164 m_events++;
00165 return sc;
00166 }
00167 return StatusCode::SUCCESS;
00168 }
00169
00170
00171 StatusCode OutputStream::writeObjects() {
00172
00173 StatusCode status = collectObjects();
00174 if ( status.isSuccess() ) {
00175 IDataSelector* sel = selectedObjects();
00176 if ( sel->begin() != sel->end() ) {
00177 status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
00178 if ( status.isSuccess() ) {
00179
00180 IDataSelector::iterator j;
00181 IOpaqueAddress* pAddress = 0;
00182 for ( j = sel->begin(); j != sel->end(); j++ ) {
00183 StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
00184 if ( !iret.isSuccess() ) {
00185 status = iret;
00186 continue;
00187 }
00188 IRegistry* pReg = (*j)->registry();
00189 pReg->setAddress(pAddress);
00190 }
00191 for ( j = sel->begin(); j != sel->end(); j++ ) {
00192 IRegistry* pReg = (*j)->registry();
00193 StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
00194 if ( !iret.isSuccess() ) {
00195 status = iret;
00196 }
00197 }
00198
00199 if ( status.isSuccess() ) {
00200 status = m_pConversionSvc->commitOutput(m_outputName, true);
00201 }
00202 else {
00203 m_pConversionSvc->commitOutput(m_outputName, false);
00204 }
00205 }
00206 }
00207 }
00208 return status;
00209 }
00210
00211
00212 bool OutputStream::collect(IRegistry* dir, int level) {
00213 if ( level < m_currentItem->depth() ) {
00214 if ( dir->object() != 0 ) {
00215
00216
00217
00218
00219
00220
00221
00222 m_objects.push_back(dir->object());
00223 return true;
00224 }
00225 }
00226 return false;
00227 }
00228
00230 StatusCode OutputStream::collectObjects() {
00231 MsgStream log(msgSvc(), name());
00232 StatusCode status = StatusCode::SUCCESS;
00233 Items::iterator i;
00234
00235 for ( i = m_itemList.begin(); i != m_itemList.end(); i++ ) {
00236 DataObject* obj = 0;
00237 m_currentItem = (*i);
00238 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00239 if ( iret.isSuccess() ) {
00240 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00241 if ( !iret.isSuccess() ) {
00242 status = iret;
00243 }
00244 }
00245 else {
00246 log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
00247 << m_currentItem->path() << endmsg;
00248 status = iret;
00249 }
00250 }
00251
00252 for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) {
00253 DataObject* obj = 0;
00254 m_currentItem = (*i);
00255 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00256 if ( iret.isSuccess() ) {
00257 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00258 }
00259 if ( !iret.isSuccess() ) {
00260 log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
00261 << m_currentItem->path() << endmsg;
00262 }
00263 }
00264 return status;
00265 }
00266
00267
00268 void OutputStream::clearSelection() {
00269 m_objects.erase(m_objects.begin(), m_objects.end());
00270 }
00271
00272
00273 void OutputStream::clearItems(Items& itms) {
00274 for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) {
00275 delete (*i);
00276 }
00277 itms.erase(itms.begin(), itms.end());
00278 }
00279
00280
00281 DataStoreItem*
00282 OutputStream::findItem(const std::string& path) {
00283 for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i) {
00284 if ( (*i)->path() == path ) return (*i);
00285 }
00286 for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j) {
00287 if ( (*j)->path() == path ) return (*j);
00288 }
00289 return 0;
00290 }
00291
00292
00293 void OutputStream::addItem(Items& itms, const std::string& descriptor) {
00294 MsgStream log(msgSvc(), name());
00295 int level = 0;
00296 size_t sep = descriptor.rfind("#");
00297 std::string obj_path (descriptor,0,sep);
00298 std::string slevel (descriptor,sep+1,descriptor.length());
00299 if ( slevel == "*" ) {
00300 level = 9999999;
00301 }
00302 else {
00303 level = atoi(slevel.c_str());
00304 }
00305 if ( m_verifyItems ) {
00306 size_t idx = obj_path.find("/",1);
00307 while(idx != std::string::npos) {
00308 std::string sub_item = obj_path.substr(0,idx);
00309 if ( 0 == findItem(sub_item) ) {
00310 addItem(itms, sub_item+"#1");
00311 }
00312 idx = obj_path.find("/",idx+1);
00313 }
00314 }
00315 DataStoreItem* item = new DataStoreItem(obj_path, level);
00316 log << MSG::DEBUG << "Adding OutputStream item " << item->path()
00317 << " with " << item->depth()
00318 << " level(s)." << endmsg;
00319 itms.push_back( item );
00320 }
00321
00322
00323 StatusCode OutputStream::connectConversionSvc() {
00324 StatusCode status = StatusCode(StatusCode::FAILURE, true);
00325 MsgStream log(msgSvc(), name());
00326
00327 std::string dbType, svc, shr;
00328 Tokenizer tok(true);
00329 tok.analyse(m_output, " ", "", "", "=", "'", "'");
00330 for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) {
00331 const std::string& tag = (*i).tag();
00332 const std::string& val = (*i).value();
00333 switch( ::toupper(tag[0]) ) {
00334 case 'D':
00335 m_outputName = val;
00336 break;
00337 case 'T':
00338 dbType = val;
00339 break;
00340 case 'S':
00341 switch( ::toupper(tag[1]) ) {
00342 case 'V': svc = val; break;
00343 case 'H': shr = "YES"; break;
00344 }
00345 break;
00346 case 'O':
00347 switch( ::toupper(val[0]) ) {
00348 case 'R':
00349 if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
00350 m_outputType = "RECREATE";
00351 else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
00352 m_outputType = "READ";
00353 break;
00354 case 'C':
00355 case 'N':
00356 case 'W':
00357 m_outputType = "NEW";
00358 break;
00359 case 'U':
00360 m_outputType = "UPDATE";
00361 break;
00362 default:
00363 m_outputType = "???";
00364 break;
00365 }
00366 break;
00367 default:
00368 break;
00369 }
00370 }
00371 if ( !shr.empty() ) m_outputType += "|SHARED";
00372
00373
00374
00375
00376 if ( dbType.length() > 0 && svc.length()==0 ) {
00377 SmartIF<IPersistencySvc> ipers(serviceLocator()->service(m_persName));
00378 if( !ipers.isValid() ) {
00379 log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
00380 return StatusCode::FAILURE;
00381 }
00382 IConversionSvc *cnvSvc = 0;
00383 status = ipers->getService(dbType, cnvSvc);
00384 if( !status.isSuccess() ) {
00385 log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << dbType << endmsg;
00386 return status;
00387 }
00388
00389 m_pConversionSvc = cnvSvc;
00390 }
00391 else if ( svc.length() > 0 ) {
00392
00393 m_pConversionSvc = serviceLocator()->service(svc);
00394 if( !m_pConversionSvc.isValid() ) {
00395 log << MSG::FATAL << "Unable to locate IConversionSvc interface of " << svc << endmsg;
00396 return StatusCode::FAILURE;
00397 }
00398 }
00399 else {
00400 log << MSG::FATAL
00401 << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
00402 << "You either have to specify a technology name or a service name!" << endmsg
00403 << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
00404 return StatusCode::FAILURE;
00405 }
00406 return StatusCode::SUCCESS;
00407 }
00408
00409 StatusCode OutputStream::decodeAcceptAlgs( ) {
00410 return decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00411 }
00412
00413 void OutputStream::acceptAlgsHandler( Property& ) {
00414 StatusCode sc = decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00415 if (sc.isFailure()) {
00416 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00417 "OutputStream::acceptAlgsHandler",sc);
00418 }
00419 }
00420
00421 StatusCode OutputStream::decodeRequireAlgs( ) {
00422 return decodeAlgorithms( m_requireNames, m_requireAlgs );
00423 }
00424
00425 void OutputStream::requireAlgsHandler( Property& ) {
00426 StatusCode sc = decodeAlgorithms( m_requireNames, m_requireAlgs );
00427 if (sc.isFailure()) {
00428 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00429 "OutputStream::requireAlgsHandler",sc);
00430 }
00431 }
00432
00433 StatusCode OutputStream::decodeVetoAlgs( ) {
00434 return decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00435 }
00436
00437 void OutputStream::vetoAlgsHandler( Property& ) {
00438 StatusCode sc = decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00439 if (sc.isFailure()) {
00440 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00441 "OutputStream::vetoAlgsHandler",sc);
00442 }
00443 }
00444
00445 StatusCode OutputStream::decodeAlgorithms( StringArrayProperty& theNames,
00446 std::vector<Algorithm*>* theAlgs )
00447 {
00448
00449 theAlgs->clear( );
00450
00451 MsgStream log( msgSvc( ), name( ) );
00452
00453 StatusCode result = StatusCode::FAILURE;
00454
00455 SmartIF<IAlgManager> theAlgMgr(serviceLocator());
00456 if ( theAlgMgr.isValid() ) {
00457
00458 const std::vector<std::string> nameList = theNames.value( );
00459 std::vector<std::string>::const_iterator it;
00460 std::vector<std::string>::const_iterator itend = nameList.end( );
00461 for (it = nameList.begin(); it != itend; ++it) {
00462
00463
00464 const std::string &theName = (*it);
00465 SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
00466 Algorithm* theAlgorithm;
00467 if ( theIAlg.isValid() ) {
00468 result = StatusCode::SUCCESS;
00469 try{
00470 theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
00471 } catch(...){
00472 result = StatusCode::FAILURE;
00473 }
00474 }
00475 if ( result.isSuccess( ) ) {
00476
00477 std::vector<Algorithm*>::iterator ita;
00478 std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
00479 for (ita = theAlgs->begin(); ita != itaend; ++ita) {
00480 Algorithm* existAlgorithm = (*ita);
00481 if ( theAlgorithm == existAlgorithm ) {
00482 result = StatusCode::FAILURE;
00483 break;
00484 }
00485 }
00486 if ( result.isSuccess( ) ) {
00487 theAlgorithm->addRef();
00488 theAlgs->push_back( theAlgorithm );
00489 }
00490 }
00491 else {
00492 log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
00493 }
00494 }
00495 result = StatusCode::SUCCESS;
00496 }
00497 else {
00498 log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
00499 }
00500 return result;
00501 }
00502
00503 bool OutputStream::isEventAccepted( ) const {
00504 typedef std::vector<Algorithm*>::iterator AlgIter;
00505 bool result = true;
00506
00507
00508
00509
00510
00511 if ( ! m_acceptAlgs->empty() ) {
00512 result = false;
00513 for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
00514 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00515 result = true;
00516 break;
00517 }
00518 }
00519 }
00520
00521
00522
00523
00524
00525 if ( result && ! m_requireAlgs->empty() ) {
00526 for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
00527 if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
00528 result = false;
00529 break;
00530 }
00531 }
00532 }
00533
00534
00535
00536
00537
00538 if ( result && ! m_vetoAlgs->empty() ) {
00539 for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
00540 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00541 result = false;
00542 break;
00543 }
00544 }
00545 }
00546 return result;
00547 }