00001
00002 #define GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_CPP
00003
00004
00005 #include "GaudiKernel/Tokenizer.h"
00006 #include "GaudiKernel/AlgFactory.h"
00007 #include "GaudiKernel/IRegistry.h"
00008 #include "GaudiKernel/IAlgManager.h"
00009 #include "GaudiKernel/ISvcLocator.h"
00010 #include "GaudiKernel/IConversionSvc.h"
00011 #include "GaudiKernel/IDataManagerSvc.h"
00012 #include "GaudiKernel/IDataProviderSvc.h"
00013 #include "GaudiKernel/IPersistencySvc.h"
00014 #include "GaudiKernel/IOpaqueAddress.h"
00015 #include "GaudiKernel/Incident.h"
00016 #include "GaudiKernel/IIncidentSvc.h"
00017
00018 #include "GaudiKernel/MsgStream.h"
00019 #include "GaudiKernel/strcasecmp.h"
00020 #include "GaudiKernel/DataObject.h"
00021 #include "GaudiKernel/DataStoreItem.h"
00022 #include "OutputStream.h"
00023 #include "OutputStreamAgent.h"
00024
00025 #include <set>
00026
00027
00028 DECLARE_ALGORITHM_FACTORY(OutputStream)
00029
00030
00031 OutputStream::OutputStream(const std::string& name, ISvcLocator* pSvcLocator)
00032 : Algorithm(name, pSvcLocator)
00033 {
00034 m_doPreLoad = true;
00035 m_doPreLoadOpt = false;
00036 m_verifyItems = true;
00037 m_output = "";
00038 m_outputName = "";
00039 m_outputType = "UPDATE";
00040 m_storeName = "EventDataSvc";
00041 m_persName = "EventPersistencySvc";
00042 m_agent = new OutputStreamAgent(this);
00043 m_acceptAlgs = new std::vector<Algorithm*>();
00044 m_requireAlgs = new std::vector<Algorithm*>();
00045 m_vetoAlgs = new std::vector<Algorithm*>();
00048 m_fireIncidents = true;
00049 declareProperty("ItemList", m_itemNames);
00050 declareProperty("OptItemList", m_optItemNames);
00051 declareProperty("Preload", m_doPreLoad);
00052 declareProperty("PreloadOptItems", m_doPreLoadOpt);
00053 declareProperty("Output", m_output);
00054 declareProperty("OutputFile", m_outputName);
00055 declareProperty("EvtDataSvc", m_storeName);
00056 declareProperty("EvtConversionSvc", m_persName);
00057 declareProperty("AcceptAlgs", m_acceptNames);
00058 declareProperty("RequireAlgs", m_requireNames);
00059 declareProperty("VetoAlgs", m_vetoNames);
00060 declareProperty("VerifyItems", m_verifyItems);
00063
00064
00065 m_acceptNames.declareUpdateHandler ( &OutputStream::acceptAlgsHandler , this );
00066 m_requireNames.declareUpdateHandler( &OutputStream::requireAlgsHandler, this );
00067 m_vetoNames.declareUpdateHandler ( &OutputStream::vetoAlgsHandler , this );
00068 }
00069
00070
00071 OutputStream::~OutputStream() {
00072 delete m_agent;
00073 delete m_acceptAlgs;
00074 delete m_requireAlgs;
00075 delete m_vetoAlgs;
00076 }
00077
00078
00079 StatusCode OutputStream::initialize() {
00080 MsgStream log(msgSvc(), name());
00081
00082
00083 m_events = 0;
00084
00085 m_pDataManager = serviceLocator()->service(m_storeName);
00086 if( !m_pDataManager.isValid() ) {
00087 log << MSG::FATAL << "Unable to locate IDataManagerSvc interface" << endmsg;
00088 return StatusCode::FAILURE;
00089 }
00090
00091 m_incidentSvc = serviceLocator()->service("IncidentSvc");
00092 if( !m_incidentSvc.isValid() ) {
00093 log << MSG::WARNING << "Error retrieving IncidentSvc." << endmsg;
00094 return StatusCode::FAILURE;
00095 }
00096
00097 m_pDataProvider = serviceLocator()->service(m_storeName);
00098 if( !m_pDataProvider.isValid() ) {
00099 log << MSG::FATAL << "Unable to locate IDataProviderSvc interface of " << m_storeName << endmsg;
00100 return StatusCode::FAILURE;
00101 }
00102 if ( hasInput() ) {
00103 StatusCode status = connectConversionSvc();
00104 if( !status.isSuccess() ) {
00105 log << MSG::FATAL << "Unable to connect to conversion service." << endmsg;
00106 if(m_outputName!="" && m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00107 IncidentType::FailOutputFile));
00108 return status;
00109 }
00110 }
00111
00112
00113 clearItems(m_optItemList);
00114
00115 clearItems(m_itemList);
00116
00117 ItemNames::iterator i;
00118
00119 for(i = m_itemNames.begin(); i != m_itemNames.end(); i++) {
00120 addItem( m_itemList, *i );
00121 }
00122
00123
00124 for(i = m_optItemNames.begin(); i != m_optItemNames.end(); i++) {
00125 addItem( m_optItemList, *i );
00126 }
00127
00128
00129 if ( m_doPreLoad ) {
00130 for(Items::iterator j = m_itemList.begin(); j != m_itemList.end(); j++) {
00131 m_pDataProvider->addPreLoadItem( *(*j) ).ignore();
00132 }
00133
00134 }
00135
00136 if ( m_doPreLoadOpt ) {
00137 for(Items::iterator j=m_optItemList.begin(); j!=m_optItemList.end(); j++) {
00138 m_pDataProvider->addPreLoadItem( *(*j) );
00139 }
00140 }
00141 log << MSG::INFO << "Data source: " << m_storeName << " output: " << m_output << endmsg;
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152 decodeAcceptAlgs ().ignore();
00153 decodeRequireAlgs().ignore();
00154 decodeVetoAlgs ().ignore();
00155 return StatusCode::SUCCESS;
00156 }
00157
00158
00159 StatusCode OutputStream::finalize() {
00160 MsgStream log(msgSvc(), name());
00161 log << MSG::INFO << "Events output: " << m_events << endmsg;
00162 if(m_fireIncidents) m_incidentSvc->fireIncident(Incident(m_outputName,
00163 IncidentType::EndOutputFile));
00164 m_incidentSvc.reset();
00165 m_pDataProvider.reset();
00166 m_pDataManager.reset();
00167 m_pConversionSvc.reset();
00168 clearItems(m_optItemList);
00169 clearItems(m_itemList);
00170 return StatusCode::SUCCESS;
00171 }
00172
00173
00174 StatusCode OutputStream::execute() {
00175
00176 clearSelection();
00177
00178 if ( isEventAccepted() ) {
00179 StatusCode sc = writeObjects();
00180 clearSelection();
00181 m_events++;
00182 if(sc.isSuccess() && m_fireIncidents)
00183 m_incidentSvc->fireIncident(Incident(m_outputName,
00184 IncidentType::WroteToOutputFile));
00185 else if(m_fireIncidents)
00186 m_incidentSvc->fireIncident(Incident(m_outputName,
00187 IncidentType::FailOutputFile));
00188 return sc;
00189 }
00190 return StatusCode::SUCCESS;
00191 }
00192
00193
00194 StatusCode OutputStream::writeObjects() {
00195
00196 StatusCode status = collectObjects();
00197 if ( status.isSuccess() ) {
00198 IDataSelector* sel = selectedObjects();
00199 if ( sel->begin() != sel->end() ) {
00200 status = m_pConversionSvc->connectOutput(m_outputName, m_outputType);
00201 if ( status.isSuccess() ) {
00202
00203 IDataSelector::iterator j;
00204 IOpaqueAddress* pAddress = 0;
00205 for ( j = sel->begin(); j != sel->end(); j++ ) {
00206 StatusCode iret = m_pConversionSvc->createRep( *j, pAddress );
00207 if ( !iret.isSuccess() ) {
00208 status = iret;
00209 continue;
00210 }
00211 IRegistry* pReg = (*j)->registry();
00212 pReg->setAddress(pAddress);
00213 }
00214 for ( j = sel->begin(); j != sel->end(); j++ ) {
00215 IRegistry* pReg = (*j)->registry();
00216 StatusCode iret = m_pConversionSvc->fillRepRefs( pReg->address(), *j );
00217 if ( !iret.isSuccess() ) {
00218 status = iret;
00219 }
00220 }
00221
00222 if ( status.isSuccess() ) {
00223 status = m_pConversionSvc->commitOutput(m_outputName, true);
00224 }
00225 else {
00226 m_pConversionSvc->commitOutput(m_outputName, false).ignore();
00227 }
00228 }
00229 }
00230 }
00231 return status;
00232 }
00233
00234
00235 bool OutputStream::collect(IRegistry* dir, int level) {
00236 if ( level < m_currentItem->depth() ) {
00237 if ( dir->object() != 0 ) {
00238
00239
00240
00241
00242
00243
00244
00245 m_objects.push_back(dir->object());
00246 return true;
00247 }
00248 }
00249 return false;
00250 }
00251
00253 StatusCode OutputStream::collectObjects() {
00254 MsgStream log(msgSvc(), name());
00255 StatusCode status = StatusCode::SUCCESS;
00256 Items::iterator i;
00257
00258 for ( i = m_itemList.begin(); i != m_itemList.end(); i++ ) {
00259 DataObject* obj = 0;
00260 m_currentItem = (*i);
00261 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00262 if ( iret.isSuccess() ) {
00263 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00264 if ( !iret.isSuccess() ) {
00265 status = iret;
00266 }
00267 }
00268 else {
00269 log << MSG::ERROR << "Cannot write mandatory object(s) (Not found) "
00270 << m_currentItem->path() << endmsg;
00271 status = iret;
00272 }
00273 }
00274
00275 for ( i = m_optItemList.begin(); i != m_optItemList.end(); i++ ) {
00276 DataObject* obj = 0;
00277 m_currentItem = (*i);
00278 StatusCode iret = m_pDataProvider->retrieveObject(m_currentItem->path(), obj);
00279 if ( iret.isSuccess() ) {
00280 iret = m_pDataManager->traverseSubTree(obj, m_agent);
00281 }
00282 if ( !iret.isSuccess() ) {
00283 log << MSG::DEBUG << "Ignore request to write non-mandatory object(s) "
00284 << m_currentItem->path() << endmsg;
00285 }
00286 }
00287
00288 if (status.isSuccess()){
00289
00290 std::set<DataObject*> unique;
00291 std::vector<DataObject*> tmp;
00292 tmp.reserve(m_objects.size());
00293 for (std::vector<DataObject*>::iterator o = m_objects.begin(); o != m_objects.end(); ++o) {
00294 if (!unique.count(*o)) {
00295
00296 unique.insert(*o);
00297 tmp.push_back(*o);
00298 }
00299 }
00300 m_objects.swap(tmp);
00301 }
00302
00303 return status;
00304 }
00305
00306
00307 void OutputStream::clearSelection() {
00308 m_objects.erase(m_objects.begin(), m_objects.end());
00309 }
00310
00311
00312 void OutputStream::clearItems(Items& itms) {
00313 for ( Items::iterator i = itms.begin(); i != itms.end(); i++ ) {
00314 delete (*i);
00315 }
00316 itms.erase(itms.begin(), itms.end());
00317 }
00318
00319
00320 DataStoreItem*
00321 OutputStream::findItem(const std::string& path) {
00322 for(Items::const_iterator i=m_itemList.begin(); i != m_itemList.end(); ++i) {
00323 if ( (*i)->path() == path ) return (*i);
00324 }
00325 for(Items::const_iterator j=m_optItemList.begin(); j != m_optItemList.end(); ++j) {
00326 if ( (*j)->path() == path ) return (*j);
00327 }
00328 return 0;
00329 }
00330
00331
00332 void OutputStream::addItem(Items& itms, const std::string& descriptor) {
00333 MsgStream log(msgSvc(), name());
00334 int level = 0;
00335 size_t sep = descriptor.rfind("#");
00336 std::string obj_path (descriptor,0,sep);
00337 std::string slevel (descriptor,sep+1,descriptor.length());
00338 if ( slevel == "*" ) {
00339 level = 9999999;
00340 }
00341 else {
00342 level = atoi(slevel.c_str());
00343 }
00344 if ( m_verifyItems ) {
00345 size_t idx = obj_path.find("/",1);
00346 while(idx != std::string::npos) {
00347 std::string sub_item = obj_path.substr(0,idx);
00348 if ( 0 == findItem(sub_item) ) {
00349 addItem(itms, sub_item+"#1");
00350 }
00351 idx = obj_path.find("/",idx+1);
00352 }
00353 }
00354 DataStoreItem* item = new DataStoreItem(obj_path, level);
00355 log << MSG::DEBUG << "Adding OutputStream item " << item->path()
00356 << " with " << item->depth()
00357 << " level(s)." << endmsg;
00358 itms.push_back( item );
00359 }
00360
00361
00362 StatusCode OutputStream::connectConversionSvc() {
00363 StatusCode status = StatusCode(StatusCode::FAILURE, true);
00364 MsgStream log(msgSvc(), name());
00365
00366 std::string dbType, svc, shr;
00367 Tokenizer tok(true);
00368 tok.analyse(m_output, " ", "", "", "=", "'", "'");
00369 for(Tokenizer::Items::iterator i = tok.items().begin(); i != tok.items().end(); ++i) {
00370 const std::string& tag = (*i).tag();
00371 const std::string& val = (*i).value();
00372 switch( ::toupper(tag[0]) ) {
00373 case 'D':
00374 m_outputName = val;
00375 break;
00376 case 'T':
00377 dbType = val;
00378 break;
00379 case 'S':
00380 switch( ::toupper(tag[1]) ) {
00381 case 'V': svc = val; break;
00382 case 'H': shr = "YES"; break;
00383 }
00384 break;
00385 case 'O':
00386 switch( ::toupper(val[0]) ) {
00387 case 'R':
00388 if ( ::strncasecmp(val.c_str(),"RECREATE",3)==0 )
00389 m_outputType = "RECREATE";
00390 else if ( ::strncasecmp(val.c_str(),"READ",3)==0 )
00391 m_outputType = "READ";
00392 break;
00393 case 'C':
00394 case 'N':
00395 case 'W':
00396 m_outputType = "NEW";
00397 break;
00398 case 'U':
00399 m_outputType = "UPDATE";
00400 break;
00401 default:
00402 m_outputType = "???";
00403 break;
00404 }
00405 break;
00406 default:
00407 break;
00408 }
00409 }
00410 if ( !shr.empty() ) m_outputType += "|SHARED";
00411
00412
00413
00414
00415 if ( dbType.length() > 0 || svc.length() > 0 ) {
00416 std::string typ = dbType.length()>0 ? dbType : svc;
00417 SmartIF<IPersistencySvc> ipers(serviceLocator()->service(m_persName));
00418 if( !ipers.isValid() ) {
00419 log << MSG::FATAL << "Unable to locate IPersistencySvc interface of " << m_persName << endmsg;
00420 return StatusCode::FAILURE;
00421 }
00422 IConversionSvc *cnvSvc = 0;
00423 status = ipers->getService(typ, cnvSvc);
00424 if( !status.isSuccess() ) {
00425 log << MSG::FATAL << "Unable to locate IConversionSvc interface of database type " << typ << endmsg;
00426 return status;
00427 }
00428
00429 m_pConversionSvc = cnvSvc;
00430 }
00431 else {
00432 log << MSG::FATAL
00433 << "Unable to locate IConversionSvc interface (Unknown technology) " << endmsg
00434 << "You either have to specify a technology name or a service name!" << endmsg
00435 << "Please correct the job option \"" << name() << ".Output\" !" << endmsg;
00436 return StatusCode::FAILURE;
00437 }
00438 return StatusCode::SUCCESS;
00439 }
00440
00441 StatusCode OutputStream::decodeAcceptAlgs( ) {
00442 return decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00443 }
00444
00445 void OutputStream::acceptAlgsHandler( Property& ) {
00446 StatusCode sc = decodeAlgorithms( m_acceptNames, m_acceptAlgs );
00447 if (sc.isFailure()) {
00448 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00449 "OutputStream::acceptAlgsHandler",sc);
00450 }
00451 }
00452
00453 StatusCode OutputStream::decodeRequireAlgs( ) {
00454 return decodeAlgorithms( m_requireNames, m_requireAlgs );
00455 }
00456
00457 void OutputStream::requireAlgsHandler( Property& ) {
00458 StatusCode sc = decodeAlgorithms( m_requireNames, m_requireAlgs );
00459 if (sc.isFailure()) {
00460 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00461 "OutputStream::requireAlgsHandler",sc);
00462 }
00463 }
00464
00465 StatusCode OutputStream::decodeVetoAlgs( ) {
00466 return decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00467 }
00468
00469 void OutputStream::vetoAlgsHandler( Property& ) {
00470 StatusCode sc = decodeAlgorithms( m_vetoNames, m_vetoAlgs );
00471 if (sc.isFailure()) {
00472 throw GaudiException("Failure in OutputStream::decodeAlgorithms",
00473 "OutputStream::vetoAlgsHandler",sc);
00474 }
00475 }
00476
00477 StatusCode OutputStream::decodeAlgorithms( StringArrayProperty& theNames,
00478 std::vector<Algorithm*>* theAlgs )
00479 {
00480
00481 theAlgs->clear( );
00482
00483 MsgStream log( msgSvc( ), name( ) );
00484
00485 StatusCode result = StatusCode::FAILURE;
00486
00487 SmartIF<IAlgManager> theAlgMgr(serviceLocator());
00488 if ( theAlgMgr.isValid() ) {
00489
00490 const std::vector<std::string> nameList = theNames.value( );
00491 std::vector<std::string>::const_iterator it;
00492 std::vector<std::string>::const_iterator itend = nameList.end( );
00493 for (it = nameList.begin(); it != itend; ++it) {
00494
00495
00496 const std::string &theName = (*it);
00497 SmartIF<IAlgorithm> &theIAlg = theAlgMgr->algorithm(theName);
00498 Algorithm* theAlgorithm;
00499 if ( theIAlg.isValid() ) {
00500 result = StatusCode::SUCCESS;
00501 try{
00502 theAlgorithm = dynamic_cast<Algorithm*>(theIAlg.get());
00503 } catch(...){
00504 result = StatusCode::FAILURE;
00505 }
00506 }
00507 if ( result.isSuccess( ) ) {
00508
00509 std::vector<Algorithm*>::iterator ita;
00510 std::vector<Algorithm*>::iterator itaend = theAlgs->end( );
00511 for (ita = theAlgs->begin(); ita != itaend; ++ita) {
00512 Algorithm* existAlgorithm = (*ita);
00513 if ( theAlgorithm == existAlgorithm ) {
00514 result = StatusCode::FAILURE;
00515 break;
00516 }
00517 }
00518 if ( result.isSuccess( ) ) {
00519 theAlgorithm->addRef();
00520 theAlgs->push_back( theAlgorithm );
00521 }
00522 }
00523 else {
00524 log << MSG::INFO << theName << " doesn't exist - ignored" << endmsg;
00525 }
00526 }
00527 result = StatusCode::SUCCESS;
00528 }
00529 else {
00530 log << MSG::FATAL << "Can't locate ApplicationMgr!!!" << endmsg;
00531 }
00532 return result;
00533 }
00534
00535 bool OutputStream::isEventAccepted( ) const {
00536 typedef std::vector<Algorithm*>::iterator AlgIter;
00537 bool result = true;
00538
00539
00540
00541
00542
00543 if ( ! m_acceptAlgs->empty() ) {
00544 result = false;
00545 for(AlgIter i=m_acceptAlgs->begin(),end=m_acceptAlgs->end(); i != end; ++i) {
00546 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00547 result = true;
00548 break;
00549 }
00550 }
00551 }
00552
00553
00554
00555
00556
00557 if ( result && ! m_requireAlgs->empty() ) {
00558 for(AlgIter i=m_requireAlgs->begin(),end=m_requireAlgs->end(); i != end; ++i) {
00559 if ( !(*i)->isExecuted() || !(*i)->filterPassed() ) {
00560 result = false;
00561 break;
00562 }
00563 }
00564 }
00565
00566
00567
00568
00569
00570 if ( result && ! m_vetoAlgs->empty() ) {
00571 for(AlgIter i=m_vetoAlgs->begin(),end=m_vetoAlgs->end(); i != end; ++i) {
00572 if ( (*i)->isExecuted() && (*i)->filterPassed() ) {
00573 result = false;
00574 break;
00575 }
00576 }
00577 }
00578 return result;
00579 }
00580
00581 bool OutputStream::hasInput() const {
00582 return !(m_itemNames.empty() && m_optItemNames.empty());
00583 }