00001
00002 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
00003
00004
00005 #include "GaudiKernel/Tokenizer.h"
00006 #include "GaudiKernel/AlgFactory.h"
00007 #include "GaudiKernel/IRegistry.h"
00008 #include "GaudiKernel/IAlgManager.h"
00009 #include "GaudiKernel/ISvcLocator.h"
00010 #include "GaudiKernel/IConversionSvc.h"
00011 #include "GaudiKernel/IDataManagerSvc.h"
00012 #include "GaudiKernel/IDataProviderSvc.h"
00013 #include "GaudiKernel/IPersistencySvc.h"
00014 #include "GaudiKernel/IOpaqueAddress.h"
00015 #include "GaudiKernel/Incident.h"
00016 #include "GaudiKernel/IIncidentSvc.h"
00017
00018 #include "GaudiKernel/MsgStream.h"
00019 #include "GaudiKernel/strcasecmp.h"
00020 #include "GaudiKernel/DataObject.h"
00021 #include "GaudiKernel/DataStoreItem.h"
00022 #include "OutputStream.h"
00023 #include "OutputStreamAgent.h"
00024
00025 #include <set>
00026
00027
00028 DECLARE_ALGORITHM_FACTORY(OutputStream)
00029
00030
00031 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
00032 : Algorithm(name, pSvcLocator)
00033 {
00034 m_doPreLoad = true;
00035 m_doPreLoadOpt = false;
00036 m_verifyItems = true;
00037 m_output = "";
00038 m_outputName = "";
00039 m_outputType = "UPDATE";
00040 m_storeName = "EventDataSvc";
00041 m_persName = "EventPersistencySvc";
00042 m_agent = new OutputStreamAgent(this);
00043 m_acceptAlgs = new std::vector<Algorithm*>();
00044 m_requireAlgs = new std::vector<Algorithm*>();
00045 m_vetoAlgs = new std::vector<Algorithm*>();
00048 m_fireIncidents = true;
00049 declareProperty("ItemList", m_itemNames);
00050 declareProperty("OptItemList", m_optItemNames);
00051 declareProperty("Preload", m_doPreLoad);
00052 declareProperty("PreloadOptItems", m_doPreLoadOpt);
00053 declareProperty("Output", m_output);
00054 declareProperty("OutputFile", m_outputName);
00055 declareProperty("EvtDataSvc", m_storeName);
00056 declareProperty("EvtConversionSvc", m_persName);
00057 declareProperty("AcceptAlgs", m_acceptNames);
00058 declareProperty("RequireAlgs", m_requireNames);
00059 declareProperty("VetoAlgs", m_vetoNames);
00060 declareProperty("VerifyItems", m_verifyItems);
00063
00064
00065 m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
00066 m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
00067 m_vetoNames.declareUpdateHandler ( &OutputStream::vetoAlgsHandler , this );
00068 }
00069
00070
00071 OutputStream::~OutputStream() {
00072 delete m_agent;
00073 delete m_acceptAlgs;
00074 delete m_requireAlgs;
00075 delete m_vetoAlgs;
00076 }
00077
00078
00079 StatusCode OutputStream::initialize() {
00080 MsgStream log(msgSvc(), name());
00081
00082
00083 m_events = 0;
00084
00085 m_pDataManager = serviceLocator()->service(m_storeName);
00086 if( !m_pDataManager.isValid() ) {
00087 log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
00088 return StatusCode::FAILURE;
00089 }
00090
00091 m_incidentSvc = serviceLocator()->service("IncidentSvc");
00092 if( !m_incidentSvc.isValid() ) {
00093 log << MSG::WARNING << "Error retrieving IncidentSvc." << endmsg;
00094 return StatusCode::FAILURE;
00095 }
00096
00097 m_pDataProvider = serviceLocator()->service(m_storeName);
00098 if( !m_pDataProvider.isValid() ) {
00099 log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
00100 return StatusCode::FAILURE;
00101 }
00102 if ( hasInput() ) {
00103 StatusCode status = connectConversionSvc();
00104 if( !status.isSuccess() ) {
00105 log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
00106 if(m_outputName!="" && m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00107 IncidentType::FailOutputFile));
00108 return status;
00109 }
00110 }
00111
00112
00113 clearItems(m_optItemList);
00114
00115 clearItems(m_itemList);
00116
00117 ItemNames::iterator i;
00118
00119 for(i = m_itemNames.begin(); i != m_itemNames.end(); i++) {
00120 addItem( m_itemList, *i );
00121 }
00122
00123
00124 for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++) {
00125 addItem( m_optItemList, *i );
00126 }
00127
00128
00129 if ( m_doPreLoad ) {
00130 for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) {
00131 m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
00132 }
00133
00134 }
00135
00136 if ( m_doPreLoadOpt ) {
00137 for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
00138 m_pDataProvider->addPreLoadItem( *(*j) );
00139 }
00140 }
00141 log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endmsg;
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152 decodeAcceptAlgs ().ignore();
00153 decodeRequireAlgs().ignore();
00154 decodeVetoAlgs ().ignore();
00155 return StatusCode::SUCCESS;
00156 }
00157
00158
00159 StatusCode OutputStream::finalize() {
00160 MsgStream log(msgSvc(), name());
00161 log << MSG::INFO << "Events output: " << m_events << endmsg;
00162 if(m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00163 IncidentType::EndOutputFile));
00164 m_incidentSvc.reset();
00165 m_pDataProvider.reset();
00166 m_pDataManager.reset();
00167 m_pConversionSvc.reset();
00168 clearItems(m_optItemList);
00169 clearItems(m_itemList);
00170 return StatusCode::SUCCESS;
00171 }
00172
00173
00174 StatusCode OutputStream::execute() {
00175
00176 clearSelection();
00177
00178 if ( isEventAccepted() ) {
00179 StatusCode sc = writeObjects();
00180 clearSelection();
00181 m_events++;
00182 if(sc.isSuccess() && m_fireIncidents)
00183 m_incidentSvc->fireIncident(Incident(m_outputName,
00184 IncidentType::WroteToOutputFile));
00185 else if(m_fireIncidents)
00186 m_incidentSvc->fireIncident(Incident(m_outputName,
00187 IncidentType::FailOutputFile));
00188 return sc;
00189 }
00190 return StatusCode::SUCCESS;
00191 }
00192
00193
00194 StatusCode OutputStream::writeObjects() {
00195
00196 StatusCode status = collectObjects();
00197 if ( status.isSuccess() ) {
00198 IDataSelector* sel = selectedObjects();
00199 if ( sel->begin() != sel->end() ) {
00200 status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
00201 if ( status.isSuccess() ) {
00202
00203 IDataSelector::iterator j;
00204 IOpaqueAddress* pAddress = 0;
00205 for ( j = sel->begin(); j != sel->end(); j++ ) {
00206 StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
00207 if ( !iret.isSuccess() ) {
00208 status = iret;
00209 continue;
00210 }
00211 IRegistry* pReg = (*j)->registry();
00212 pReg->setAddress(pAddress);
00213 }
00214 for ( j = sel->begin(); j != sel->end(); j++ ) {
00215 IRegistry* pReg = (*j)->registry();
00216 StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
00217 if ( !iret.isSuccess() ) {
00218 status = iret;
00219 }
00220 }
00221
00222 if ( status.isSuccess() ) {
00223 status = m_pConversionSvc->commitOutput(m_outputName, true);
00224 }
00225 else {
00226 m_pConversionSvc->commitOutput(m_outputName, false).ignore();
00227 }
00228 }
00229 }
00230 }
00231 return status;
00232 }
00233
00234
00235 bool OutputStream::collect(IRegistry* dir, int level) {
00236 if ( level < m_currentItem->depth() ) {
00237 if ( dir->object() != 0 ) {
00238
00239
00240
00241
00242
00243
00244
00245 m_objects.push_back(dir->object());
00246 return true;
00247 }
00248 }
00249 return false;
00250 }
00251
00253 StatusCode OutputStream::collectObjects() {
00254 MsgStream log(msgSvc(), name());
00255 StatusCode status = StatusCode::SUCCESS;
00256 Items::iterator i;
00257
00258 for ( i = m_itemList.begin(); i != m_itemList.end(); i++ ) {
00259 DataObject* obj = 0;
00260 m_currentItem = (*i);
00261 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00262 if ( iret.isSuccess() ) {
00263 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00264 if ( !iret.isSuccess() ) {
00265 status = iret;
00266 }
00267 }
00268 else {
00269 log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
00270 << m_currentItem->path() << endmsg;
00271 status = iret;
00272 }
00273 }
00274
00275 for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) {
00276 DataObject* obj = 0;
00277 m_currentItem = (*i);
00278 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00279 if ( iret.isSuccess() ) {
00280 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00281 }
00282 if ( !iret.isSuccess() ) {
00283 if(log.level() <= MSG::DEBUG )
00284 log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
00285 << m_currentItem->path() << endmsg;
00286 }
00287 }
00288
00289 if (status.isSuccess()){
00290
00291 std::set<DataObject*> unique;
00292 std::vector<DataObject*> tmp;
00293 tmp.reserve(m_objects.size());
00294 for (std::vector<DataObject*>::iterator o = m_objects.begin(); o != m_objects.end(); ++o) {
00295 if (!unique.count(*o)) {
00296
00297 unique.insert(*o);
00298 tmp.push_back(*o);
00299 }
00300 }
00301 m_objects.swap(tmp);
00302 }
00303
00304 return status;
00305 }
00306
00307
00308 void OutputStream::clearSelection() {
00309 m_objects.erase(m_objects.begin(), m_objects.end());
00310 }
00311
00312
00313 void OutputStream::clearItems(Items& itms) {
00314 for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) {
00315 delete (*i);
00316 }
00317 itms.erase(itms.begin(), itms.end());
00318 }
00319
00320
00321 DataStoreItem*
00322 OutputStream::findItem(const std::string& path) {
00323 for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i) {
00324 if ( (*i)->path() == path ) return (*i);
00325 }
00326 for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j) {
00327 if ( (*j)->path() == path ) return (*j);
00328 }
00329 return 0;
00330 }
00331
00332
00333 void OutputStream::addItem(Items& itms, const std::string& descriptor) {
00334 MsgStream log(msgSvc(), name());
00335 int level = 0;
00336 size_t sep = descriptor.rfind("#");
00337 std::string obj_path (descriptor,0,sep);
00338 std::string slevel (descriptor,sep+1,descriptor.length());
00339 if ( slevel == "*" ) {
00340 level = 9999999;
00341 }
00342 else {
00343 level = atoi(slevel.c_str());
00344 }
00345 if ( m_verifyItems ) {
00346 size_t idx = obj_path.find("/",1);
00347 while(idx != std::string::npos) {
00348 std::string sub_item = obj_path.substr(0,idx);
00349 if ( 0 == findItem(sub_item) ) {
00350 addItem(itms, sub_item+"#1");
00351 }
00352 idx = obj_path.find("/",idx+1);
00353 }
00354 }
00355 DataStoreItem* item = new DataStoreItem(obj_path, level);
00356 if(log.level() <= MSG::DEBUG )
00357 log << MSG::DEBUG << "Adding OutputStream item " << item->path()
00358 << " with " << item->depth()
00359 << " level(s)." << endmsg;
00360 itms.push_back( item );
00361 }
00362
00363
00364 StatusCode OutputStream::connectConversionSvc() {
00365 StatusCode status = StatusCode(StatusCode::FAILURE, true);
00366 MsgStream log(msgSvc(), name());
00367
00368 std::string dbType, svc, shr;
00369 Tokenizer tok(true);
00370 tok.analyse(m_output, " ", "", "", "=", "'", "'");
00371 for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) {
00372 const std::string& tag = (*i).tag();
00373 const std::string& val = (*i).value();
00374 switch( ::toupper(tag[0]) ) {
00375 case 'D':
00376 m_outputName = val;
00377 break;
00378 case 'T':
00379 dbType = val;
00380 break;
00381 case 'S':
00382 switch( ::toupper(tag[1]) ) {
00383 case 'V': svc = val; break;
00384 case 'H': shr = "YES"; break;
00385 }
00386 break;
00387 case 'O':
00388 switch( ::toupper(val[0]) ) {
00389 case 'R':
00390 if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
00391 m_outputType = "RECREATE";
00392 else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
00393 m_outputType = "READ";
00394 break;
00395 case 'C':
00396 case 'N':
00397 case 'W':
00398 m_outputType = "NEW";
00399 break;
00400 case 'U':
00401 m_outputType = "UPDATE";
00402 break;
00403 default:
00404 m_outputType = "???";
00405 break;
00406 }
00407 break;
00408 default:
00409 break;
00410 }
00411 }
00412 if ( !shr.empty() ) m_outputType += "|SHARED";
00413
00414
00415
00416
00417 if ( dbType.length() > 0 || svc.length() > 0 ) {
00418 std::string typ = dbType.length()>0 ? dbType : svc;
00419 SmartIF<IPersistencySvc> ipers(serviceLocator()->service(m_persName));
00420 if( !ipers.isValid() ) {
00421 log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
00422 return StatusCode::FAILURE;
00423 }
00424 IConversionSvc *cnvSvc = 0;
00425 status = ipers->getService(typ, cnvSvc);
00426 if( !status.isSuccess() ) {
00427 log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << typ << endmsg;
00428 return status;
00429 }
00430
00431 m_pConversionSvc = cnvSvc;
00432 }
00433 else {
00434 log << MSG::FATAL
00435 << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
00436 << "You either have to specify a technology name or a service name!" << endmsg
00437 << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
00438 return StatusCode::FAILURE;
00439 }
00440 return StatusCode::SUCCESS;
00441 }
00442
00443 StatusCode OutputStream::decodeAcceptAlgs( ) {
00444 return decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00445 }
00446
00447 void OutputStream::acceptAlgsHandler( Property& ) {
00448 StatusCode sc = decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00449 if (sc.isFailure()) {
00450 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00451 "OutputStream::acceptAlgsHandler",sc);
00452 }
00453 }
00454
00455 StatusCode OutputStream::decodeRequireAlgs( ) {
00456 return decodeAlgorithms( m_requireNames, m_requireAlgs );
00457 }
00458
00459 void OutputStream::requireAlgsHandler( Property& ) {
00460 StatusCode sc = decodeAlgorithms( m_requireNames, m_requireAlgs );
00461 if (sc.isFailure()) {
00462 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00463 "OutputStream::requireAlgsHandler",sc);
00464 }
00465 }
00466
00467 StatusCode OutputStream::decodeVetoAlgs( ) {
00468 return decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00469 }
00470
00471 void OutputStream::vetoAlgsHandler( Property& ) {
00472 StatusCode sc = decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00473 if (sc.isFailure()) {
00474 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00475 "OutputStream::vetoAlgsHandler",sc);
00476 }
00477 }
00478
00479 StatusCode OutputStream::decodeAlgorithms( StringArrayProperty& theNames,
00480 std::vector<Algorithm*>* theAlgs )
00481 {
00482
00483 theAlgs->clear( );
00484
00485 MsgStream log( msgSvc( ), name( ) );
00486
00487 StatusCode result = StatusCode::FAILURE;
00488
00489 SmartIF<IAlgManager> theAlgMgr(serviceLocator());
00490 if ( theAlgMgr.isValid() ) {
00491
00492 const std::vector<std::string> nameList = theNames.value( );
00493 std::vector<std::string>::const_iterator it;
00494 std::vector<std::string>::const_iterator itend = nameList.end( );
00495 for (it = nameList.begin(); it != itend; ++it) {
00496
00497
00498 const std::string &theName = (*it);
00499 SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
00500 Algorithm* theAlgorithm;
00501 if ( theIAlg.isValid() ) {
00502 result = StatusCode::SUCCESS;
00503 try{
00504 theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
00505 } catch(...){
00506 result = StatusCode::FAILURE;
00507 }
00508 }
00509 if ( result.isSuccess( ) ) {
00510
00511 std::vector<Algorithm*>::iterator ita;
00512 std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
00513 for (ita = theAlgs->begin(); ita != itaend; ++ita) {
00514 Algorithm* existAlgorithm = (*ita);
00515 if ( theAlgorithm == existAlgorithm ) {
00516 result = StatusCode::FAILURE;
00517 break;
00518 }
00519 }
00520 if ( result.isSuccess( ) ) {
00521 theAlgorithm->addRef();
00522 theAlgs->push_back( theAlgorithm );
00523 }
00524 }
00525 else {
00526 log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
00527 }
00528 }
00529 result = StatusCode::SUCCESS;
00530 }
00531 else {
00532 log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
00533 }
00534 return result;
00535 }
00536
00537 bool OutputStream::isEventAccepted( ) const {
00538 typedef std::vector<Algorithm*>::iterator AlgIter;
00539 bool result = true;
00540
00541
00542
00543
00544
00545 if ( ! m_acceptAlgs->empty() ) {
00546 result = false;
00547 for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
00548 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00549 result = true;
00550 break;
00551 }
00552 }
00553 }
00554
00555
00556
00557
00558
00559 if ( result && ! m_requireAlgs->empty() ) {
00560 for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
00561 if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
00562 result = false;
00563 break;
00564 }
00565 }
00566 }
00567
00568
00569
00570
00571
00572 if ( result && ! m_vetoAlgs->empty() ) {
00573 for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
00574 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00575 result = false;
00576 break;
00577 }
00578 }
00579 }
00580 return result;
00581 }
00582
00583 bool OutputStream::hasInput() const {
00584 return !(m_itemNames.empty() && m_optItemNames.empty());
00585 }