00001
00002 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
00003
00004
00005 #include "GaudiKernel/Tokenizer.h"
00006 #include "GaudiKernel/AlgFactory.h"
00007 #include "GaudiKernel/IRegistry.h"
00008 #include "GaudiKernel/IAlgManager.h"
00009 #include "GaudiKernel/ISvcLocator.h"
00010 #include "GaudiKernel/IConversionSvc.h"
00011 #include "GaudiKernel/IDataManagerSvc.h"
00012 #include "GaudiKernel/IDataProviderSvc.h"
00013 #include "GaudiKernel/IPersistencySvc.h"
00014 #include "GaudiKernel/IOpaqueAddress.h"
00015 #include "GaudiKernel/Incident.h"
00016 #include "GaudiKernel/IIncidentSvc.h"
00017
00018 #include "GaudiKernel/MsgStream.h"
00019 #include "GaudiKernel/strcasecmp.h"
00020 #include "GaudiKernel/DataObject.h"
00021 #include "GaudiKernel/DataStoreItem.h"
00022 #include "OutputStream.h"
00023 #include "OutputStreamAgent.h"
00024
00025
00026 DECLARE_ALGORITHM_FACTORY(OutputStream)
00027
00028
00029 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
00030 : Algorithm(name, pSvcLocator)
00031 {
00032 m_doPreLoad = true;
00033 m_doPreLoadOpt = false;
00034 m_verifyItems = true;
00035 m_output = "";
00036 m_outputName = "";
00037 m_outputType = "UPDATE";
00038 m_storeName = "EventDataSvc";
00039 m_persName = "EventPersistencySvc";
00040 m_agent = new OutputStreamAgent(this);
00041 m_acceptAlgs = new std::vector<Algorithm*>();
00042 m_requireAlgs = new std::vector<Algorithm*>();
00043 m_vetoAlgs = new std::vector<Algorithm*>();
00046 m_fireIncidents = true;
00047 declareProperty("ItemList", m_itemNames);
00048 declareProperty("OptItemList", m_optItemNames);
00049 declareProperty("Preload", m_doPreLoad);
00050 declareProperty("PreloadOptItems", m_doPreLoadOpt);
00051 declareProperty("Output", m_output);
00052 declareProperty("OutputFile", m_outputName);
00053 declareProperty("EvtDataSvc", m_storeName);
00054 declareProperty("EvtConversionSvc", m_persName);
00055 declareProperty("AcceptAlgs", m_acceptNames);
00056 declareProperty("RequireAlgs", m_requireNames);
00057 declareProperty("VetoAlgs", m_vetoNames);
00058 declareProperty("VerifyItems", m_verifyItems);
00061
00062
00063 m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
00064 m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
00065 m_vetoNames.declareUpdateHandler ( &OutputStream::vetoAlgsHandler , this );
00066 }
00067
00068
00069 OutputStream::~OutputStream() {
00070 delete m_agent;
00071 delete m_acceptAlgs;
00072 delete m_requireAlgs;
00073 delete m_vetoAlgs;
00074 }
00075
00076
00077 StatusCode OutputStream::initialize() {
00078 MsgStream log(msgSvc(), name());
00079
00080
00081 m_events = 0;
00082
00083 m_pDataManager = serviceLocator()->service(m_storeName);
00084 if( !m_pDataManager.isValid() ) {
00085 log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
00086 return StatusCode::FAILURE;
00087 }
00088
00089 m_incidentSvc = serviceLocator()->service("IncidentSvc");
00090 if( !m_incidentSvc.isValid() ) {
00091 log << MSG::WARNING << "Error retrieving IncidentSvc." << endmsg;
00092 return StatusCode::FAILURE;
00093 }
00094
00095 m_pDataProvider = serviceLocator()->service(m_storeName);
00096 if( !m_pDataProvider.isValid() ) {
00097 log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
00098 return StatusCode::FAILURE;
00099 }
00100 if ( !(m_itemNames.empty() && m_optItemNames.empty()) ) {
00101 StatusCode status = connectConversionSvc();
00102 if( !status.isSuccess() ) {
00103 log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
00104 if(m_outputName!="" && m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00105 IncidentType::FailOutputFile));
00106 return status;
00107 }
00108 }
00109
00110
00111 clearItems(m_optItemList);
00112
00113 clearItems(m_itemList);
00114
00115 ItemNames::iterator i;
00116
00117 for(i = m_itemNames.begin(); i != m_itemNames.end(); i++) {
00118 addItem( m_itemList, *i );
00119 }
00120
00121
00122 for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++) {
00123 addItem( m_optItemList, *i );
00124 }
00125
00126
00127 if ( m_doPreLoad ) {
00128 for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) {
00129 m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
00130 }
00131
00132 }
00133
00134 if ( m_doPreLoadOpt ) {
00135 for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
00136 m_pDataProvider->addPreLoadItem( *(*j) );
00137 }
00138 }
00139 log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endmsg;
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 decodeAcceptAlgs ().ignore();
00151 decodeRequireAlgs().ignore();
00152 decodeVetoAlgs ().ignore();
00153 return StatusCode::SUCCESS;
00154 }
00155
00156
00157 StatusCode OutputStream::finalize() {
00158 MsgStream log(msgSvc(), name());
00159 log << MSG::INFO << "Events output: " << m_events << endmsg;
00160 if(m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00161 IncidentType::EndOutputFile));
00162 m_incidentSvc.reset();
00163 m_pDataProvider.reset();
00164 m_pDataManager.reset();
00165 m_pConversionSvc.reset();
00166 clearItems(m_optItemList);
00167 clearItems(m_itemList);
00168 return StatusCode::SUCCESS;
00169 }
00170
00171
00172 StatusCode OutputStream::execute() {
00173
00174 clearSelection();
00175
00176 if ( isEventAccepted() ) {
00177 StatusCode sc = writeObjects();
00178 clearSelection();
00179 m_events++;
00180 if(sc.isSuccess() && m_fireIncidents)
00181 m_incidentSvc->fireIncident(Incident(m_outputName,
00182 IncidentType::WroteToOutputFile));
00183 else if(m_fireIncidents)
00184 m_incidentSvc->fireIncident(Incident(m_outputName,
00185 IncidentType::FailOutputFile));
00186 return sc;
00187 }
00188 return StatusCode::SUCCESS;
00189 }
00190
00191
00192 StatusCode OutputStream::writeObjects() {
00193
00194 StatusCode status = collectObjects();
00195 if ( status.isSuccess() ) {
00196 IDataSelector* sel = selectedObjects();
00197 if ( sel->begin() != sel->end() ) {
00198 status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
00199 if ( status.isSuccess() ) {
00200
00201 IDataSelector::iterator j;
00202 IOpaqueAddress* pAddress = 0;
00203 for ( j = sel->begin(); j != sel->end(); j++ ) {
00204 StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
00205 if ( !iret.isSuccess() ) {
00206 status = iret;
00207 continue;
00208 }
00209 IRegistry* pReg = (*j)->registry();
00210 pReg->setAddress(pAddress);
00211 }
00212 for ( j = sel->begin(); j != sel->end(); j++ ) {
00213 IRegistry* pReg = (*j)->registry();
00214 StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
00215 if ( !iret.isSuccess() ) {
00216 status = iret;
00217 }
00218 }
00219
00220 if ( status.isSuccess() ) {
00221 status = m_pConversionSvc->commitOutput(m_outputName, true);
00222 }
00223 else {
00224 m_pConversionSvc->commitOutput(m_outputName, false);
00225 }
00226 }
00227 }
00228 }
00229 return status;
00230 }
00231
00232
00233 bool OutputStream::collect(IRegistry* dir, int level) {
00234 if ( level < m_currentItem->depth() ) {
00235 if ( dir->object() != 0 ) {
00236
00237
00238
00239
00240
00241
00242
00243 m_objects.push_back(dir->object());
00244 return true;
00245 }
00246 }
00247 return false;
00248 }
00249
00251 StatusCode OutputStream::collectObjects() {
00252 MsgStream log(msgSvc(), name());
00253 StatusCode status = StatusCode::SUCCESS;
00254 Items::iterator i;
00255
00256 for ( i = m_itemList.begin(); i != m_itemList.end(); i++ ) {
00257 DataObject* obj = 0;
00258 m_currentItem = (*i);
00259 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00260 if ( iret.isSuccess() ) {
00261 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00262 if ( !iret.isSuccess() ) {
00263 status = iret;
00264 }
00265 }
00266 else {
00267 log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
00268 << m_currentItem->path() << endmsg;
00269 status = iret;
00270 }
00271 }
00272
00273 for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) {
00274 DataObject* obj = 0;
00275 m_currentItem = (*i);
00276 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00277 if ( iret.isSuccess() ) {
00278 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00279 }
00280 if ( !iret.isSuccess() ) {
00281 log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
00282 << m_currentItem->path() << endmsg;
00283 }
00284 }
00285 return status;
00286 }
00287
00288
00289 void OutputStream::clearSelection() {
00290 m_objects.erase(m_objects.begin(), m_objects.end());
00291 }
00292
00293
00294 void OutputStream::clearItems(Items& itms) {
00295 for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) {
00296 delete (*i);
00297 }
00298 itms.erase(itms.begin(), itms.end());
00299 }
00300
00301
00302 DataStoreItem*
00303 OutputStream::findItem(const std::string& path) {
00304 for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i) {
00305 if ( (*i)->path() == path ) return (*i);
00306 }
00307 for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j) {
00308 if ( (*j)->path() == path ) return (*j);
00309 }
00310 return 0;
00311 }
00312
00313
00314 void OutputStream::addItem(Items& itms, const std::string& descriptor) {
00315 MsgStream log(msgSvc(), name());
00316 int level = 0;
00317 size_t sep = descriptor.rfind("#");
00318 std::string obj_path (descriptor,0,sep);
00319 std::string slevel (descriptor,sep+1,descriptor.length());
00320 if ( slevel == "*" ) {
00321 level = 9999999;
00322 }
00323 else {
00324 level = atoi(slevel.c_str());
00325 }
00326 if ( m_verifyItems ) {
00327 size_t idx = obj_path.find("/",1);
00328 while(idx != std::string::npos) {
00329 std::string sub_item = obj_path.substr(0,idx);
00330 if ( 0 == findItem(sub_item) ) {
00331 addItem(itms, sub_item+"#1");
00332 }
00333 idx = obj_path.find("/",idx+1);
00334 }
00335 }
00336 DataStoreItem* item = new DataStoreItem(obj_path, level);
00337 log << MSG::DEBUG << "Adding OutputStream item " << item->path()
00338 << " with " << item->depth()
00339 << " level(s)." << endmsg;
00340 itms.push_back( item );
00341 }
00342
00343
00344 StatusCode OutputStream::connectConversionSvc() {
00345 StatusCode status = StatusCode(StatusCode::FAILURE, true);
00346 MsgStream log(msgSvc(), name());
00347
00348 std::string dbType, svc, shr;
00349 Tokenizer tok(true);
00350 tok.analyse(m_output, " ", "", "", "=", "'", "'");
00351 for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) {
00352 const std::string& tag = (*i).tag();
00353 const std::string& val = (*i).value();
00354 switch( ::toupper(tag[0]) ) {
00355 case 'D':
00356 m_outputName = val;
00357 break;
00358 case 'T':
00359 dbType = val;
00360 break;
00361 case 'S':
00362 switch( ::toupper(tag[1]) ) {
00363 case 'V': svc = val; break;
00364 case 'H': shr = "YES"; break;
00365 }
00366 break;
00367 case 'O':
00368 switch( ::toupper(val[0]) ) {
00369 case 'R':
00370 if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
00371 m_outputType = "RECREATE";
00372 else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
00373 m_outputType = "READ";
00374 break;
00375 case 'C':
00376 case 'N':
00377 case 'W':
00378 m_outputType = "NEW";
00379 break;
00380 case 'U':
00381 m_outputType = "UPDATE";
00382 break;
00383 default:
00384 m_outputType = "???";
00385 break;
00386 }
00387 break;
00388 default:
00389 break;
00390 }
00391 }
00392 if ( !shr.empty() ) m_outputType += "|SHARED";
00393
00394
00395
00396
00397 if ( dbType.length() > 0 && svc.length()==0 ) {
00398 SmartIF<IPersistencySvc> ipers(serviceLocator()->service(m_persName));
00399 if( !ipers.isValid() ) {
00400 log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
00401 return StatusCode::FAILURE;
00402 }
00403 IConversionSvc *cnvSvc = 0;
00404 status = ipers->getService(dbType, cnvSvc);
00405 if( !status.isSuccess() ) {
00406 log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << dbType << endmsg;
00407 return status;
00408 }
00409
00410 m_pConversionSvc = cnvSvc;
00411 }
00412 else if ( svc.length() > 0 ) {
00413
00414 m_pConversionSvc = serviceLocator()->service(svc);
00415 if( !m_pConversionSvc.isValid() ) {
00416 log << MSG::FATAL << "Unable to locate IConversionSvc interface of " << svc << endmsg;
00417 return StatusCode::FAILURE;
00418 }
00419 }
00420 else {
00421 log << MSG::FATAL
00422 << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
00423 << "You either have to specify a technology name or a service name!" << endmsg
00424 << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
00425 return StatusCode::FAILURE;
00426 }
00427 return StatusCode::SUCCESS;
00428 }
00429
00430 StatusCode OutputStream::decodeAcceptAlgs( ) {
00431 return decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00432 }
00433
00434 void OutputStream::acceptAlgsHandler( Property& ) {
00435 StatusCode sc = decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00436 if (sc.isFailure()) {
00437 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00438 "OutputStream::acceptAlgsHandler",sc);
00439 }
00440 }
00441
00442 StatusCode OutputStream::decodeRequireAlgs( ) {
00443 return decodeAlgorithms( m_requireNames, m_requireAlgs );
00444 }
00445
00446 void OutputStream::requireAlgsHandler( Property& ) {
00447 StatusCode sc = decodeAlgorithms( m_requireNames, m_requireAlgs );
00448 if (sc.isFailure()) {
00449 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00450 "OutputStream::requireAlgsHandler",sc);
00451 }
00452 }
00453
00454 StatusCode OutputStream::decodeVetoAlgs( ) {
00455 return decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00456 }
00457
00458 void OutputStream::vetoAlgsHandler( Property& ) {
00459 StatusCode sc = decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00460 if (sc.isFailure()) {
00461 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00462 "OutputStream::vetoAlgsHandler",sc);
00463 }
00464 }
00465
00466 StatusCode OutputStream::decodeAlgorithms( StringArrayProperty& theNames,
00467 std::vector<Algorithm*>* theAlgs )
00468 {
00469
00470 theAlgs->clear( );
00471
00472 MsgStream log( msgSvc( ), name( ) );
00473
00474 StatusCode result = StatusCode::FAILURE;
00475
00476 SmartIF<IAlgManager> theAlgMgr(serviceLocator());
00477 if ( theAlgMgr.isValid() ) {
00478
00479 const std::vector<std::string> nameList = theNames.value( );
00480 std::vector<std::string>::const_iterator it;
00481 std::vector<std::string>::const_iterator itend = nameList.end( );
00482 for (it = nameList.begin(); it != itend; ++it) {
00483
00484
00485 const std::string &theName = (*it);
00486 SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
00487 Algorithm* theAlgorithm;
00488 if ( theIAlg.isValid() ) {
00489 result = StatusCode::SUCCESS;
00490 try{
00491 theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
00492 } catch(...){
00493 result = StatusCode::FAILURE;
00494 }
00495 }
00496 if ( result.isSuccess( ) ) {
00497
00498 std::vector<Algorithm*>::iterator ita;
00499 std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
00500 for (ita = theAlgs->begin(); ita != itaend; ++ita) {
00501 Algorithm* existAlgorithm = (*ita);
00502 if ( theAlgorithm == existAlgorithm ) {
00503 result = StatusCode::FAILURE;
00504 break;
00505 }
00506 }
00507 if ( result.isSuccess( ) ) {
00508 theAlgorithm->addRef();
00509 theAlgs->push_back( theAlgorithm );
00510 }
00511 }
00512 else {
00513 log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
00514 }
00515 }
00516 result = StatusCode::SUCCESS;
00517 }
00518 else {
00519 log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
00520 }
00521 return result;
00522 }
00523
00524 bool OutputStream::isEventAccepted( ) const {
00525 typedef std::vector<Algorithm*>::iterator AlgIter;
00526 bool result = true;
00527
00528
00529
00530
00531
00532 if ( ! m_acceptAlgs->empty() ) {
00533 result = false;
00534 for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
00535 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00536 result = true;
00537 break;
00538 }
00539 }
00540 }
00541
00542
00543
00544
00545
00546 if ( result && ! m_requireAlgs->empty() ) {
00547 for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
00548 if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
00549 result = false;
00550 break;
00551 }
00552 }
00553 }
00554
00555
00556
00557
00558
00559 if ( result && ! m_vetoAlgs->empty() ) {
00560 for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
00561 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00562 result = false;
00563 break;
00564 }
00565 }
00566 }
00567 return result;
00568 }