Gaudi Framework, version v22r0

Home   Generated: 9 Feb 2011

IODataManager.cpp

Go to the documentation of this file.
00001 // Framework include files
00002 #include "GaudiKernel/Debugger.h"
00003 #include "GaudiKernel/MsgStream.h"
00004 #include "GaudiKernel/strcasecmp.h"
00005 #include "GaudiKernel/DeclareFactoryEntries.h"
00006 #include "GaudiUtils/IFileCatalog.h"
00007 #include "IODataManager.h"
00008 #include "GaudiKernel/SmartIF.h"
00009 #include "GaudiKernel/Incident.h"
00010 #include "GaudiKernel/IIncidentSvc.h"
00011 
00012 #include <set>
00013 
00014 DECLARE_NAMESPACE_SERVICE_FACTORY(Gaudi,IODataManager)
00015 
00016 using namespace Gaudi;
00017 
00018 enum { S_OK = StatusCode::SUCCESS, S_ERROR=StatusCode::FAILURE };
00019 
00020 static std::set<std::string>    s_badFiles;
00021 
00022 IODataManager::IODataManager(CSTR nam, ISvcLocator* svcloc)
00023   : base_class(nam, svcloc), m_ageLimit(2)
00024 {
00025   declareProperty("CatalogType",     m_catalogSvcName="Gaudi::MultiFileCatalog/FileCatalog");
00026   declareProperty("UseGFAL",         m_useGFAL = true);
00027   declareProperty("QuarantineFiles", m_quarantine = true);
00028   declareProperty("AgeLimit",        m_ageLimit = 2);
00029 }
00030 
00032 StatusCode IODataManager::initialize()  {
00033   // Initialize base class
00034   StatusCode status = Service::initialize();
00035   MsgStream log(msgSvc(), name());
00036   if ( !status.isSuccess() )    {
00037     log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
00038     return status;
00039   }
00040   // Retrieve conversion service handling event iteration
00041   m_catalog = serviceLocator()->service(m_catalogSvcName);
00042   if( !m_catalog.isValid() ) {
00043     log << MSG::ERROR
00044         << "Unable to localize interface IFileCatalog from service:"
00045         << m_catalogSvcName << endmsg;
00046     return StatusCode::FAILURE;
00047   }
00048   m_incSvc = serviceLocator()->service("IncidentSvc");
00049   if( !m_incSvc.isValid() ) {
00050     log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
00051     return status;
00052   }
00053   
00054   return status;
00055 }
00056 
00058 StatusCode IODataManager::finalize()  {
00059   m_catalog = 0; // release
00060   return Service::finalize();
00061 }
00062 
00063 // Small routine to issue exceptions
00064 StatusCode IODataManager::error(CSTR msg, bool rethrow)  {
00065   MsgStream log(msgSvc(),name());
00066   log << MSG::ERROR << "Error: " << msg << endmsg;
00067   if ( rethrow )  {
00068     System::breakExecution();
00069   }
00070   return S_ERROR;
00071 }
00073 IODataManager::Connections IODataManager::connections(const IInterface* owner) const  {
00074   Connections conns;
00075   for(ConnectionMap::const_iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
00076     IDataConnection* c = (*i).second->connection;
00077     if ( 0 == owner || c->owner() == owner )
00078       conns.push_back(c);
00079   }
00080   return conns;
00081 }
00082 
00084 StatusCode IODataManager::connectRead(bool keep_open, Connection* con)  {
00085   if ( !establishConnection(con) )  {
00086     return connectDataIO(UNKNOWN,Connection::READ,con->name(),"UNKNOWN",keep_open,con);
00087   }
00088   std::string dsn = con ? con->name() : std::string("Unknown");
00089   return error("Failed to connect to data:"+dsn,false);
00090 }
00091 
00093 StatusCode IODataManager::connectWrite(Connection* con,IoType mode,CSTR doctype)  {
00094   if ( !establishConnection(con) )  {
00095     return connectDataIO(UNKNOWN,mode,con->name(),doctype,true,con);
00096   }
00097   std::string dsn = con ? con->name() : std::string("Unknown");
00098   return error("Failed to connect to data:"+dsn,false);
00099 }
00100 
00102 StatusCode IODataManager::read(Connection* con, void* const data, size_t len)  {
00103   return establishConnection(con).isSuccess() ? con->read(data,len) : S_ERROR;
00104 }
00105 
00107 StatusCode IODataManager::write(Connection* con, const void* data, int len)  {
00108   return establishConnection(con).isSuccess() ? con->write(data,len) : S_ERROR;
00109 }
00110 
00112 long long int IODataManager::seek(Connection* con, long long int where, int origin)  {
00113   return establishConnection(con).isSuccess() ? con->seek(where,origin) : -1;
00114 }
00115 
00116 StatusCode IODataManager::disconnect(Connection* con)    {
00117   if ( con )  {
00118     std::string dataset = con->name();
00119     std::string dsn = dataset;
00120     StatusCode sc = con->disconnect();
00121     if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
00122       dsn = dataset.substr(4);
00123     else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
00124       dsn = dataset.substr(4);
00125     else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
00126       dsn = dataset.substr(4);
00127 
00128     FidMap::iterator j = m_fidMap.find(dataset);
00129     if ( j != m_fidMap.end() )  {
00130       std::string fid = (*j).second;
00131       std::string gfal_name = "gfal:guid:" + fid;
00132       ConnectionMap::iterator i=m_connectionMap.find(fid);
00133       m_fidMap.erase(j);
00134       if ( (j=m_fidMap.find(fid)) != m_fidMap.end() )
00135         m_fidMap.erase(j);
00136       if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() )
00137         m_fidMap.erase(j);
00138       if ( i != m_connectionMap.end() ) {
00139         if ( (*i).second )  {
00140           IDataConnection* c = (*i).second->connection;
00141           std::string pfn = c->pfn();
00142           if ( (j=m_fidMap.find(pfn)) != m_fidMap.end() )
00143             m_fidMap.erase(j);
00144           if ( c->isConnected() )  {
00145             MsgStream log(msgSvc(),name());
00146             c->disconnect();
00147             log << MSG::INFO << "Disconnect from dataset " << dsn
00148                 << " [" << fid << "]" << endmsg;
00149           }
00150           delete (*i).second;
00151           m_connectionMap.erase(i);
00152         }
00153       }
00154     }
00155     return sc;
00156   }
00157   return S_ERROR;
00158 }
00159 
00160 StatusCode IODataManager::reconnect(Entry* e)  {
00161   StatusCode sc = S_ERROR;
00162   if ( e && e->connection )  {
00163     switch(e->ioType)  {
00164     case Connection::READ:
00165       sc = e->connection->connectRead();
00166       break;
00167     case Connection::UPDATE:
00168     case Connection::CREATE:
00169     case Connection::RECREATE:
00170       sc = e->connection->connectWrite(e->ioType);
00171       break;
00172     default:
00173       return S_ERROR;
00174     }
00175     if ( sc.isSuccess() && e->ioType == Connection::READ )  {
00176       std::vector<Entry*> to_retire;
00177       e->connection->resetAge();
00178       for(ConnectionMap::iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
00179         IDataConnection* c = (*i).second->connection;
00180         if ( e->connection != c && c->isConnected() && !(*i).second->keepOpen )  {
00181           c->ageFile();
00182           if ( c->age() > m_ageLimit ) {
00183             to_retire.push_back((*i).second);
00184           }
00185         }
00186       }
00187       if ( !to_retire.empty() )  {
00188         MsgStream log(msgSvc(),name());
00189         for(std::vector<Entry*>::iterator j=to_retire.begin(); j!=to_retire.end();++j)  {
00190           IDataConnection* c = (*j)->connection;
00191           c->disconnect();
00192           log << MSG::INFO << "Disconnect from dataset " << c->pfn()
00193               << " [" << c->fid() << "]" << endmsg;
00194         }
00195       }
00196     }
00197   }
00198   return sc;
00199 }
00200 
00202 IIODataManager::Connection* IODataManager::connection(CSTR dataset) const  {
00203   FidMap::const_iterator j = m_fidMap.find(dataset);
00204   if ( j != m_fidMap.end() )  {
00205     ConnectionMap::const_iterator i=m_connectionMap.find((*j).second);
00206     return (i != m_connectionMap.end()) ? (*i).second->connection : 0;
00207   }
00208   return 0;
00209 }
00210 
00211 StatusCode IODataManager::establishConnection(Connection* con)  {
00212   if ( con )  {
00213     if ( !con->isConnected() )  {
00214       ConnectionMap::const_iterator i=m_connectionMap.find(con->name());
00215       if ( i != m_connectionMap.end() )  {
00216         Connection* c = (*i).second->connection;
00217         if ( c != con )  { 
00218           m_incSvc->fireIncident(Incident(con->name(),IncidentType::FailInputFile));
00219           return error("Severe logic bug: Twice identical connection object for DSN:"+con->name(),true);
00220         }
00221         if ( reconnect((*i).second).isSuccess() ) {
00222           return S_OK;
00223         }
00224       }
00225       return S_ERROR;
00226     }
00227     con->resetAge();
00228     return S_OK;
00229   }
00230   return error("Severe logic bug: No connection object avalible.",true);
00231 }
00232 
00233 StatusCode
00234 IODataManager::connectDataIO(int typ, IoType rw, CSTR dataset, CSTR technology,bool keep_open,Connection* connection)  {
00235   MsgStream log(msgSvc(),name());
00236   std::string dsn = dataset;
00237   try  {
00238     StatusCode sc(StatusCode::SUCCESS,true);
00239     if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
00240       dsn = dataset.substr(4), typ = FID;
00241     else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
00242       dsn = dataset.substr(4), typ = LFN;
00243     else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
00244       dsn = dataset.substr(4), typ = PFN;
00245     else if ( typ == UNKNOWN )
00246       return connectDataIO(PFN, rw, dsn, technology, keep_open, connection);
00247 
00248     if(std::find(s_badFiles.begin(),s_badFiles.end(),dsn) != s_badFiles.end())  {
00249       m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00250       return IDataConnection::BAD_DATA_CONNECTION;
00251     }
00252     if ( typ == FID )  {
00253       ConnectionMap::iterator fi = m_connectionMap.find(dsn);
00254       if ( fi == m_connectionMap.end() )  {
00255         IFileCatalog::Files files;
00256         m_catalog->getPFN(dsn,files);
00257         if ( files.size() == 0 ) {
00258           if ( !m_useGFAL )   {
00259             if ( m_quarantine ) s_badFiles.insert(dsn);
00260             m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00261             error("connectDataIO> failed to resolve FID:"+dsn,false).ignore();
00262             return IDataConnection::BAD_DATA_CONNECTION;
00263           }
00264           else if ( dsn.length() == 36 && dsn[8]=='-' && dsn[13]=='-' )  {
00265             std::string gfal_name = "gfal:guid:" + dsn;
00266             m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
00267             sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection);
00268             if ( sc.isSuccess() ) return sc;
00269             if ( m_quarantine ) s_badFiles.insert(dsn);
00270           }
00271           if ( m_quarantine ) s_badFiles.insert(dsn);
00272           m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00273           error("connectDataIO> Failed to resolve FID:"+dsn,false).ignore();
00274           return IDataConnection::BAD_DATA_CONNECTION;
00275         }
00276         std::string pfn = files[0].first;
00277         m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
00278         sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection);
00279         if ( !sc.isSuccess() )  {
00280           if ( m_quarantine ) s_badFiles.insert(pfn);
00281           m_incSvc->fireIncident(Incident(pfn,IncidentType::FailInputFile));
00282           return IDataConnection::BAD_DATA_CONNECTION;
00283         }
00284         
00285         return sc;
00286       }
00287       return S_ERROR;
00288       //Connection* c = (*fi).second->connection;
00289       //sc = connectDataIO(PFN, rw, c->pfn(), technology, keep_open, connection);
00290       //if ( !sc.isSuccess() && m_quarantine ) s_badFiles.insert(c->pfn());
00291       //return sc;
00292     }
00293     std::string fid;
00294     FidMap::iterator j = m_fidMap.find(dsn);
00295     if ( j == m_fidMap.end() )  {
00296       IFileCatalog::Files files;
00297       switch(typ)  {
00298       case LFN:
00299         fid = m_catalog->lookupLFN(dsn);
00300         if ( fid.empty() )  {
00301           m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00302           log << MSG::ERROR << "Failed to resolve LFN:" << dsn
00303               << " Cannot access this dataset." << endmsg;
00304           return IDataConnection::BAD_DATA_CONNECTION;
00305         }
00306         break;
00307       case PFN:
00308         fid = m_catalog->lookupPFN(dsn);
00309         if ( !fid.empty() ) m_catalog->getPFN(fid, files);
00310         if ( files.empty() )   {
00311           if ( rw == Connection::CREATE || rw == Connection::RECREATE )  {
00312             if ( fid.empty() ) fid = m_catalog->createFID();
00313             m_catalog->registerPFN(fid,dsn,technology);
00314             log << MSG::INFO << "Referring to dataset " << dsn
00315                 << " by its file ID:" << fid << endmsg;
00316           }
00317           else  {
00318             fid = dsn;
00319           }
00320         }
00321         break;
00322       }
00323     }
00324     else {
00325       fid = (*j).second;
00326     }
00327     if ( typ == PFN )  {
00328       // Open PFN
00329       ConnectionMap::iterator fi = m_connectionMap.find(fid);
00330       if ( fi == m_connectionMap.end() )  {
00331         connection->setFID(fid);
00332         connection->setPFN(dsn);
00333         Entry* e = new Entry(technology, keep_open, rw, connection);
00334         // Here we open the file!
00335         if ( !reconnect(e).isSuccess() )   {
00336           delete e;
00337           if ( m_quarantine ) s_badFiles.insert(dsn);
00338           m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00339           error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
00340           return IDataConnection::BAD_DATA_CONNECTION;
00341         }
00342         fid = connection->fid();
00343         m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
00344         if (  !(rw==Connection::CREATE || rw==Connection::RECREATE) )  {
00345           if ( strcasecmp(dsn.c_str(),fid.c_str()) == 0 )  {
00346             log << MSG::ERROR << "Referring to existing dataset " << dsn
00347                 << " by its physical name." << endmsg;
00348             log << "You may not be able to navigate back to the input file"
00349                 << " -- processing continues" << endmsg;
00350           }
00351         }
00352         m_connectionMap.insert(std::make_pair(fid,e)).first;
00353         return S_OK;
00354       }
00355       // Here we open the file!
00356       if ( !reconnect((*fi).second).isSuccess() )   {
00357         if ( m_quarantine ) s_badFiles.insert(dsn);
00358         m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00359         error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
00360         return IDataConnection::BAD_DATA_CONNECTION;
00361       }
00362       return S_OK;
00363     }
00364     sc = connectDataIO(FID, rw, fid, technology, keep_open, connection);
00365     if ( !sc.isSuccess() && m_quarantine ) {
00366       s_badFiles.insert(fid);
00367     }
00368     else if ( typ == LFN ) {
00369       m_fidMap[dataset] = fid;
00370     }
00371     return sc;
00372   }
00373   catch (std::exception& e)  {
00374     error(std::string("connectDataIO> Caught exception:")+e.what(), false).ignore();
00375   }
00376   catch(...) {
00377     error(std::string("connectDataIO> Caught unknown exception"), false).ignore();
00378   }
00379   m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00380   error("connectDataIO> The dataset "+dsn+" cannot be opened.",false).ignore();
00381   s_badFiles.insert(dsn);
00382   return IDataConnection::BAD_DATA_CONNECTION;
00383 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Defines

Generated at Wed Feb 9 16:25:04 2011 for Gaudi Framework, version v22r0 by Doxygen version 1.6.2 written by Dimitri van Heesch, © 1997-2004