00001
00002 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
00003
00004
00005 #include "GaudiKernel/Tokenizer.h"
00006 #include "GaudiKernel/AlgFactory.h"
00007 #include "GaudiKernel/IRegistry.h"
00008 #include "GaudiKernel/IAlgManager.h"
00009 #include "GaudiKernel/ISvcLocator.h"
00010 #include "GaudiKernel/IConversionSvc.h"
00011 #include "GaudiKernel/IDataManagerSvc.h"
00012 #include "GaudiKernel/IDataProviderSvc.h"
00013 #include "GaudiKernel/IPersistencySvc.h"
00014 #include "GaudiKernel/IOpaqueAddress.h"
00015 #include "GaudiKernel/Incident.h"
00016 #include "GaudiKernel/IIncidentSvc.h"
00017
00018 #include "GaudiKernel/MsgStream.h"
00019 #include "GaudiKernel/strcasecmp.h"
00020 #include "GaudiKernel/DataObject.h"
00021 #include "GaudiKernel/DataStoreItem.h"
00022 #include "OutputStream.h"
00023 #include "OutputStreamAgent.h"
00024
00025 #include <set>
00026
00027
00028 DECLARE_ALGORITHM_FACTORY(OutputStream)
00029
00030
00031 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
00032 : Algorithm(name, pSvcLocator)
00033 {
00034 m_doPreLoad = true;
00035 m_doPreLoadOpt = false;
00036 m_verifyItems = true;
00037 m_output = "";
00038 m_outputName = "";
00039 m_outputType = "UPDATE";
00040 m_storeName = "EventDataSvc";
00041 m_persName = "EventPersistencySvc";
00042 m_agent = new OutputStreamAgent(this);
00043 m_acceptAlgs = new std::vector<Algorithm*>();
00044 m_requireAlgs = new std::vector<Algorithm*>();
00045 m_vetoAlgs = new std::vector<Algorithm*>();
00048 m_fireIncidents = true;
00049 declareProperty("ItemList", m_itemNames);
00050 declareProperty("OptItemList", m_optItemNames);
00051 declareProperty("Preload", m_doPreLoad);
00052 declareProperty("PreloadOptItems", m_doPreLoadOpt);
00053 declareProperty("Output", m_output);
00054 declareProperty("OutputFile", m_outputName);
00055 declareProperty("EvtDataSvc", m_storeName);
00056 declareProperty("EvtConversionSvc", m_persName);
00057 declareProperty("AcceptAlgs", m_acceptNames);
00058 declareProperty("RequireAlgs", m_requireNames);
00059 declareProperty("VetoAlgs", m_vetoNames);
00060 declareProperty("VerifyItems", m_verifyItems);
00063
00064
00065 m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
00066 m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
00067 m_vetoNames.declareUpdateHandler ( &OutputStream::vetoAlgsHandler , this );
00068 }
00069
00070
00071 OutputStream::~OutputStream() {
00072 delete m_agent;
00073 delete m_acceptAlgs;
00074 delete m_requireAlgs;
00075 delete m_vetoAlgs;
00076 }
00077
00078
00079 StatusCode OutputStream::initialize() {
00080 MsgStream log(msgSvc(), name());
00081
00082
00083 m_events = 0;
00084
00085 m_pDataManager = serviceLocator()->service(m_storeName);
00086 if( !m_pDataManager.isValid() ) {
00087 log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
00088 return StatusCode::FAILURE;
00089 }
00090
00091 m_incidentSvc = serviceLocator()->service("IncidentSvc");
00092 if( !m_incidentSvc.isValid() ) {
00093 log << MSG::WARNING << "Error retrieving IncidentSvc." << endmsg;
00094 return StatusCode::FAILURE;
00095 }
00096
00097 m_pDataProvider = serviceLocator()->service(m_storeName);
00098 if( !m_pDataProvider.isValid() ) {
00099 log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
00100 return StatusCode::FAILURE;
00101 }
00102 if ( hasInput() ) {
00103 StatusCode status = connectConversionSvc();
00104 if( !status.isSuccess() ) {
00105 log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
00106 if(m_outputName!="" && m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00107 IncidentType::FailOutputFile));
00108 return status;
00109 }
00110 }
00111
00112
00113 clearItems(m_optItemList);
00114
00115 clearItems(m_itemList);
00116
00117 ItemNames::iterator i;
00118
00119 for(i = m_itemNames.begin(); i != m_itemNames.end(); i++) {
00120 addItem( m_itemList, *i );
00121 }
00122
00123
00124 for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++) {
00125 addItem( m_optItemList, *i );
00126 }
00127
00128
00129 if ( m_doPreLoad ) {
00130 for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) {
00131 m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
00132 }
00133
00134 }
00135
00136 if ( m_doPreLoadOpt ) {
00137 for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
00138 m_pDataProvider->addPreLoadItem( *(*j) );
00139 }
00140 }
00141 log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endmsg;
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152 decodeAcceptAlgs ().ignore();
00153 decodeRequireAlgs().ignore();
00154 decodeVetoAlgs ().ignore();
00155 return StatusCode::SUCCESS;
00156 }
00157
00158
00159 StatusCode OutputStream::finalize() {
00160 MsgStream log(msgSvc(), name());
00161 log << MSG::INFO << "Events output: " << m_events << endmsg;
00162 if(m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00163 IncidentType::EndOutputFile));
00164 m_incidentSvc.reset();
00165 m_pDataProvider.reset();
00166 m_pDataManager.reset();
00167 m_pConversionSvc.reset();
00168 clearItems(m_optItemList);
00169 clearItems(m_itemList);
00170 return StatusCode::SUCCESS;
00171 }
00172
00173
00174 StatusCode OutputStream::execute() {
00175
00176 clearSelection();
00177
00178 if ( isEventAccepted() ) {
00179 StatusCode sc = writeObjects();
00180 clearSelection();
00181 m_events++;
00182 if(sc.isSuccess() && m_fireIncidents)
00183 m_incidentSvc->fireIncident(Incident(m_outputName,
00184 IncidentType::WroteToOutputFile));
00185 else if(m_fireIncidents)
00186 m_incidentSvc->fireIncident(Incident(m_outputName,
00187 IncidentType::FailOutputFile));
00188 return sc;
00189 }
00190 return StatusCode::SUCCESS;
00191 }
00192
00193
00194 StatusCode OutputStream::writeObjects() {
00195
00196 StatusCode status = collectObjects();
00197 if ( status.isSuccess() ) {
00198 IDataSelector* sel = selectedObjects();
00199 if ( sel->begin() != sel->end() ) {
00200 status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
00201 if ( status.isSuccess() ) {
00202
00203 IDataSelector::iterator j;
00204 IOpaqueAddress* pAddress = 0;
00205 for ( j = sel->begin(); j != sel->end(); j++ ) {
00206 StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
00207 if ( !iret.isSuccess() ) {
00208 status = iret;
00209 continue;
00210 }
00211 IRegistry* pReg = (*j)->registry();
00212 pReg->setAddress(pAddress);
00213 }
00214 for ( j = sel->begin(); j != sel->end(); j++ ) {
00215 IRegistry* pReg = (*j)->registry();
00216 StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
00217 if ( !iret.isSuccess() ) {
00218 status = iret;
00219 }
00220 }
00221
00222 if ( status.isSuccess() ) {
00223 status = m_pConversionSvc->commitOutput(m_outputName, true);
00224 }
00225 else {
00226 m_pConversionSvc->commitOutput(m_outputName, false);
00227 }
00228 }
00229 }
00230 }
00231 return status;
00232 }
00233
00234
00235 bool OutputStream::collect(IRegistry* dir, int level) {
00236 if ( level < m_currentItem->depth() ) {
00237 if ( dir->object() != 0 ) {
00238
00239
00240
00241
00242
00243
00244
00245 m_objects.push_back(dir->object());
00246 return true;
00247 }
00248 }
00249 return false;
00250 }
00251
00253 StatusCode OutputStream::collectObjects() {
00254 MsgStream log(msgSvc(), name());
00255 StatusCode status = StatusCode::SUCCESS;
00256 Items::iterator i;
00257
00258 for ( i = m_itemList.begin(); i != m_itemList.end(); i++ ) {
00259 DataObject* obj = 0;
00260 m_currentItem = (*i);
00261 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00262 if ( iret.isSuccess() ) {
00263 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00264 if ( !iret.isSuccess() ) {
00265 status = iret;
00266 }
00267 }
00268 else {
00269 log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
00270 << m_currentItem->path() << endmsg;
00271 status = iret;
00272 }
00273 }
00274
00275 for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) {
00276 DataObject* obj = 0;
00277 m_currentItem = (*i);
00278 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00279 if ( iret.isSuccess() ) {
00280 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00281 }
00282 if ( !iret.isSuccess() ) {
00283 log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
00284 << m_currentItem->path() << endmsg;
00285 }
00286 }
00287
00288 if (status.isSuccess()){
00289
00290 std::set<DataObject*> unique;
00291 std::vector<DataObject*> tmp;
00292 tmp.reserve(m_objects.size());
00293 for (std::vector<DataObject*>::iterator o = m_objects.begin(); o != m_objects.end(); ++o) {
00294 if (!unique.count(*o)) {
00295
00296 unique.insert(*o);
00297 tmp.push_back(*o);
00298 }
00299 }
00300 m_objects.swap(tmp);
00301 }
00302
00303 return status;
00304 }
00305
00306
00307 void OutputStream::clearSelection() {
00308 m_objects.erase(m_objects.begin(), m_objects.end());
00309 }
00310
00311
00312 void OutputStream::clearItems(Items& itms) {
00313 for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) {
00314 delete (*i);
00315 }
00316 itms.erase(itms.begin(), itms.end());
00317 }
00318
00319
00320 DataStoreItem*
00321 OutputStream::findItem(const std::string& path) {
00322 for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i) {
00323 if ( (*i)->path() == path ) return (*i);
00324 }
00325 for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j) {
00326 if ( (*j)->path() == path ) return (*j);
00327 }
00328 return 0;
00329 }
00330
00331
00332 void OutputStream::addItem(Items& itms, const std::string& descriptor) {
00333 MsgStream log(msgSvc(), name());
00334 int level = 0;
00335 size_t sep = descriptor.rfind("#");
00336 std::string obj_path (descriptor,0,sep);
00337 std::string slevel (descriptor,sep+1,descriptor.length());
00338 if ( slevel == "*" ) {
00339 level = 9999999;
00340 }
00341 else {
00342 level = atoi(slevel.c_str());
00343 }
00344 if ( m_verifyItems ) {
00345 size_t idx = obj_path.find("/",1);
00346 while(idx != std::string::npos) {
00347 std::string sub_item = obj_path.substr(0,idx);
00348 if ( 0 == findItem(sub_item) ) {
00349 addItem(itms, sub_item+"#1");
00350 }
00351 idx = obj_path.find("/",idx+1);
00352 }
00353 }
00354 DataStoreItem* item = new DataStoreItem(obj_path, level);
00355 log << MSG::DEBUG << "Adding OutputStream item " << item->path()
00356 << " with " << item->depth()
00357 << " level(s)." << endmsg;
00358 itms.push_back( item );
00359 }
00360
00361
00362 StatusCode OutputStream::connectConversionSvc() {
00363 StatusCode status = StatusCode(StatusCode::FAILURE, true);
00364 MsgStream log(msgSvc(), name());
00365
00366 std::string dbType, svc, shr;
00367 Tokenizer tok(true);
00368 tok.analyse(m_output, " ", "", "", "=", "'", "'");
00369 for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) {
00370 const std::string& tag = (*i).tag();
00371 const std::string& val = (*i).value();
00372 switch( ::toupper(tag[0]) ) {
00373 case 'D':
00374 m_outputName = val;
00375 break;
00376 case 'T':
00377 dbType = val;
00378 break;
00379 case 'S':
00380 switch( ::toupper(tag[1]) ) {
00381 case 'V': svc = val; break;
00382 case 'H': shr = "YES"; break;
00383 }
00384 break;
00385 case 'O':
00386 switch( ::toupper(val[0]) ) {
00387 case 'R':
00388 if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
00389 m_outputType = "RECREATE";
00390 else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
00391 m_outputType = "READ";
00392 break;
00393 case 'C':
00394 case 'N':
00395 case 'W':
00396 m_outputType = "NEW";
00397 break;
00398 case 'U':
00399 m_outputType = "UPDATE";
00400 break;
00401 default:
00402 m_outputType = "???";
00403 break;
00404 }
00405 break;
00406 default:
00407 break;
00408 }
00409 }
00410 if ( !shr.empty() ) m_outputType += "|SHARED";
00411
00412
00413
00414
00415 if ( dbType.length() > 0 && svc.length()==0 ) {
00416 SmartIF<IPersistencySvc> ipers(serviceLocator()->service(m_persName));
00417 if( !ipers.isValid() ) {
00418 log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
00419 return StatusCode::FAILURE;
00420 }
00421 IConversionSvc *cnvSvc = 0;
00422 status = ipers->getService(dbType, cnvSvc);
00423 if( !status.isSuccess() ) {
00424 log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << dbType << endmsg;
00425 return status;
00426 }
00427
00428 m_pConversionSvc = cnvSvc;
00429 }
00430 else if ( svc.length() > 0 ) {
00431
00432 m_pConversionSvc = serviceLocator()->service(svc);
00433 if( !m_pConversionSvc.isValid() ) {
00434 log << MSG::FATAL << "Unable to locate IConversionSvc interface of " << svc << endmsg;
00435 return StatusCode::FAILURE;
00436 }
00437 }
00438 else {
00439 log << MSG::FATAL
00440 << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
00441 << "You either have to specify a technology name or a service name!" << endmsg
00442 << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
00443 return StatusCode::FAILURE;
00444 }
00445 return StatusCode::SUCCESS;
00446 }
00447
00448 StatusCode OutputStream::decodeAcceptAlgs( ) {
00449 return decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00450 }
00451
00452 void OutputStream::acceptAlgsHandler( Property& ) {
00453 StatusCode sc = decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00454 if (sc.isFailure()) {
00455 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00456 "OutputStream::acceptAlgsHandler",sc);
00457 }
00458 }
00459
00460 StatusCode OutputStream::decodeRequireAlgs( ) {
00461 return decodeAlgorithms( m_requireNames, m_requireAlgs );
00462 }
00463
00464 void OutputStream::requireAlgsHandler( Property& ) {
00465 StatusCode sc = decodeAlgorithms( m_requireNames, m_requireAlgs );
00466 if (sc.isFailure()) {
00467 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00468 "OutputStream::requireAlgsHandler",sc);
00469 }
00470 }
00471
00472 StatusCode OutputStream::decodeVetoAlgs( ) {
00473 return decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00474 }
00475
00476 void OutputStream::vetoAlgsHandler( Property& ) {
00477 StatusCode sc = decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00478 if (sc.isFailure()) {
00479 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00480 "OutputStream::vetoAlgsHandler",sc);
00481 }
00482 }
00483
00484 StatusCode OutputStream::decodeAlgorithms( StringArrayProperty& theNames,
00485 std::vector<Algorithm*>* theAlgs )
00486 {
00487
00488 theAlgs->clear( );
00489
00490 MsgStream log( msgSvc( ), name( ) );
00491
00492 StatusCode result = StatusCode::FAILURE;
00493
00494 SmartIF<IAlgManager> theAlgMgr(serviceLocator());
00495 if ( theAlgMgr.isValid() ) {
00496
00497 const std::vector<std::string> nameList = theNames.value( );
00498 std::vector<std::string>::const_iterator it;
00499 std::vector<std::string>::const_iterator itend = nameList.end( );
00500 for (it = nameList.begin(); it != itend; ++it) {
00501
00502
00503 const std::string &theName = (*it);
00504 SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
00505 Algorithm* theAlgorithm;
00506 if ( theIAlg.isValid() ) {
00507 result = StatusCode::SUCCESS;
00508 try{
00509 theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
00510 } catch(...){
00511 result = StatusCode::FAILURE;
00512 }
00513 }
00514 if ( result.isSuccess( ) ) {
00515
00516 std::vector<Algorithm*>::iterator ita;
00517 std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
00518 for (ita = theAlgs->begin(); ita != itaend; ++ita) {
00519 Algorithm* existAlgorithm = (*ita);
00520 if ( theAlgorithm == existAlgorithm ) {
00521 result = StatusCode::FAILURE;
00522 break;
00523 }
00524 }
00525 if ( result.isSuccess( ) ) {
00526 theAlgorithm->addRef();
00527 theAlgs->push_back( theAlgorithm );
00528 }
00529 }
00530 else {
00531 log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
00532 }
00533 }
00534 result = StatusCode::SUCCESS;
00535 }
00536 else {
00537 log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
00538 }
00539 return result;
00540 }
00541
00542 bool OutputStream::isEventAccepted( ) const {
00543 typedef std::vector<Algorithm*>::iterator AlgIter;
00544 bool result = true;
00545
00546
00547
00548
00549
00550 if ( ! m_acceptAlgs->empty() ) {
00551 result = false;
00552 for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
00553 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00554 result = true;
00555 break;
00556 }
00557 }
00558 }
00559
00560
00561
00562
00563
00564 if ( result && ! m_requireAlgs->empty() ) {
00565 for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
00566 if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
00567 result = false;
00568 break;
00569 }
00570 }
00571 }
00572
00573
00574
00575
00576
00577 if ( result && ! m_vetoAlgs->empty() ) {
00578 for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
00579 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00580 result = false;
00581 break;
00582 }
00583 }
00584 }
00585 return result;
00586 }
00587
00588 bool OutputStream::hasInput() const {
00589 return !(m_itemNames.empty() && m_optItemNames.empty());
00590 }