00001
00002 #include "GaudiKernel/Debugger.h"
00003 #include "GaudiKernel/MsgStream.h"
00004 #include "GaudiKernel/strcasecmp.h"
00005 #include "GaudiKernel/DeclareFactoryEntries.h"
00006 #include "GaudiUtils/IFileCatalog.h"
00007 #include "IODataManager.h"
00008
00009 #include <set>
00010
00011 DECLARE_NAMESPACE_SERVICE_FACTORY(Gaudi,IODataManager);
00012
00013 using namespace Gaudi;
00014
00015 enum { S_OK = StatusCode::SUCCESS, S_ERROR=StatusCode::FAILURE };
00016
00017 static std::set<std::string> s_badFiles;
00018
00019 IODataManager::IODataManager(CSTR nam, ISvcLocator* svcloc)
00020 : base_class(nam, svcloc), m_ageLimit(2)
00021 {
00022 declareProperty("CatalogType", m_catalogSvcName="Gaudi::MultiFileCatalog/FileCatalog");
00023 declareProperty("UseGFAL", m_useGFAL = true);
00024 declareProperty("QuarantineFiles", m_quarantine = true);
00025 declareProperty("AgeLimit", m_ageLimit = 2);
00026 }
00027
00029 StatusCode IODataManager::initialize() {
00030
00031 StatusCode status = Service::initialize();
00032 MsgStream log(msgSvc(), name());
00033 if ( !status.isSuccess() ) {
00034 log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
00035 return status;
00036 }
00037
00038 m_catalog = serviceLocator()->service(m_catalogSvcName);
00039 if( !m_catalog.isValid() ) {
00040 log << MSG::ERROR
00041 << "Unable to localize interface IFileCatalog from service:"
00042 << m_catalogSvcName << endmsg;
00043 return StatusCode::FAILURE;
00044 }
00045 return status;
00046 }
00047
00049 StatusCode IODataManager::finalize() {
00050 m_catalog = 0;
00051 return Service::finalize();
00052 }
00053
00054
00055 StatusCode IODataManager::error(CSTR msg, bool rethrow) {
00056 MsgStream log(msgSvc(),name());
00057 log << MSG::ERROR << "Error: " << msg << endmsg;
00058 if ( rethrow ) {
00059 System::breakExecution();
00060 }
00061 return S_ERROR;
00062 }
00064 IODataManager::Connections IODataManager::connections(const IInterface* owner) const {
00065 Connections conns;
00066 for(ConnectionMap::const_iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
00067 IDataConnection* c = (*i).second->connection;
00068 if ( 0 == owner || c->owner() == owner )
00069 conns.push_back(c);
00070 }
00071 return conns;
00072 }
00073
00075 StatusCode IODataManager::connectRead(bool keep_open, Connection* con) {
00076 if ( !establishConnection(con) ) {
00077 return connectDataIO(UNKNOWN,Connection::READ,con->name(),"UNKNOWN",keep_open,con);
00078 }
00079 std::string dsn = con ? con->name() : std::string("Unknown");
00080 return error("Failed to connect to data:"+dsn,false);
00081 }
00082
00084 StatusCode IODataManager::connectWrite(Connection* con,IoType mode,CSTR doctype) {
00085 if ( !establishConnection(con) ) {
00086 return connectDataIO(UNKNOWN,mode,con->name(),doctype,true,con);
00087 }
00088 std::string dsn = con ? con->name() : std::string("Unknown");
00089 return error("Failed to connect to data:"+dsn,false);
00090 }
00091
00093 StatusCode IODataManager::read(Connection* con, void* const data, size_t len) {
00094 return establishConnection(con).isSuccess() ? con->read(data,len) : S_ERROR;
00095 }
00096
00098 StatusCode IODataManager::write(Connection* con, const void* data, int len) {
00099 return establishConnection(con).isSuccess() ? con->write(data,len) : S_ERROR;
00100 }
00101
00103 long long int IODataManager::seek(Connection* con, long long int where, int origin) {
00104 return establishConnection(con).isSuccess() ? con->seek(where,origin) : -1;
00105 }
00106
00107 StatusCode IODataManager::disconnect(Connection* con) {
00108 if ( con ) {
00109 std::string dataset = con->name();
00110 std::string dsn = dataset;
00111 StatusCode sc = con->disconnect();
00112 if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
00113 dsn = dataset.substr(4);
00114 else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
00115 dsn = dataset.substr(4);
00116 else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
00117 dsn = dataset.substr(4);
00118
00119 FidMap::iterator j = m_fidMap.find(dataset);
00120 if ( j != m_fidMap.end() ) {
00121 std::string fid = (*j).second;
00122 std::string gfal_name = "gfal:guid:" + fid;
00123 ConnectionMap::iterator i=m_connectionMap.find(fid);
00124 m_fidMap.erase(j);
00125 if ( (j=m_fidMap.find(fid)) != m_fidMap.end() )
00126 m_fidMap.erase(j);
00127 if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() )
00128 m_fidMap.erase(j);
00129 if ( i != m_connectionMap.end() ) {
00130 if ( (*i).second ) {
00131 IDataConnection* c = (*i).second->connection;
00132 std::string pfn = c->pfn();
00133 if ( (j=m_fidMap.find(pfn)) != m_fidMap.end() )
00134 m_fidMap.erase(j);
00135 if ( c->isConnected() ) {
00136 MsgStream log(msgSvc(),name());
00137 c->disconnect();
00138 log << MSG::INFO << "Disconnect from dataset " << dsn
00139 << " [" << fid << "]" << endmsg;
00140 }
00141 delete (*i).second;
00142 m_connectionMap.erase(i);
00143 }
00144 }
00145 }
00146 return sc;
00147 }
00148 return S_ERROR;
00149 }
00150
00151 StatusCode IODataManager::reconnect(Entry* e) {
00152 StatusCode sc = S_ERROR;
00153 if ( e && e->connection ) {
00154 switch(e->ioType) {
00155 case Connection::READ:
00156 sc = e->connection->connectRead();
00157 break;
00158 case Connection::UPDATE:
00159 case Connection::CREATE:
00160 case Connection::RECREATE:
00161 sc = e->connection->connectWrite(e->ioType);
00162 break;
00163 default:
00164 return S_ERROR;
00165 }
00166 if ( sc.isSuccess() && e->ioType == Connection::READ ) {
00167 std::vector<Entry*> to_retire;
00168 e->connection->resetAge();
00169 for(ConnectionMap::iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
00170 IDataConnection* c = (*i).second->connection;
00171 if ( e->connection != c && c->isConnected() && !(*i).second->keepOpen ) {
00172 c->ageFile();
00173 if ( c->age() > m_ageLimit ) {
00174 to_retire.push_back((*i).second);
00175 }
00176 }
00177 }
00178 if ( !to_retire.empty() ) {
00179 MsgStream log(msgSvc(),name());
00180 for(std::vector<Entry*>::iterator j=to_retire.begin(); j!=to_retire.end();++j) {
00181 IDataConnection* c = (*j)->connection;
00182 c->disconnect();
00183 log << MSG::INFO << "Disconnect from dataset " << c->pfn()
00184 << " [" << c->fid() << "]" << endmsg;
00185 }
00186 }
00187 }
00188 }
00189 return sc;
00190 }
00191
00193 IIODataManager::Connection* IODataManager::connection(CSTR dataset) const {
00194 FidMap::const_iterator j = m_fidMap.find(dataset);
00195 if ( j != m_fidMap.end() ) {
00196 ConnectionMap::const_iterator i=m_connectionMap.find((*j).second);
00197 return (i != m_connectionMap.end()) ? (*i).second->connection : 0;
00198 }
00199 return 0;
00200 }
00201
00202 StatusCode IODataManager::establishConnection(Connection* con) {
00203 if ( con ) {
00204 if ( !con->isConnected() ) {
00205 ConnectionMap::const_iterator i=m_connectionMap.find(con->name());
00206 if ( i != m_connectionMap.end() ) {
00207 Connection* c = (*i).second->connection;
00208 if ( c != con ) {
00209 return error("Severe logic bug: Twice identical connection object for DSN:"+con->name(),true);
00210 }
00211 if ( reconnect((*i).second).isSuccess() ) {
00212 return S_OK;
00213 }
00214 }
00215 return S_ERROR;
00216 }
00217 con->resetAge();
00218 return S_OK;
00219 }
00220 return error("Severe logic bug: No connection object avalible.",true);
00221 }
00222
00223 StatusCode
00224 IODataManager::connectDataIO(int typ, IoType rw, CSTR dataset, CSTR technology,bool keep_open,Connection* connection) {
00225 MsgStream log(msgSvc(),name());
00226 std::string dsn = dataset;
00227 try {
00228 StatusCode sc(StatusCode::SUCCESS,true);
00229 if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
00230 dsn = dataset.substr(4), typ = FID;
00231 else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
00232 dsn = dataset.substr(4), typ = LFN;
00233 else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
00234 dsn = dataset.substr(4), typ = PFN;
00235 else if ( typ == UNKNOWN )
00236 return connectDataIO(PFN, rw, dsn, technology, keep_open, connection);
00237
00238 if(std::find(s_badFiles.begin(),s_badFiles.end(),dsn) != s_badFiles.end()) {
00239 return IDataConnection::BAD_DATA_CONNECTION;
00240 }
00241 if ( typ == FID ) {
00242 ConnectionMap::iterator fi = m_connectionMap.find(dsn);
00243 if ( fi == m_connectionMap.end() ) {
00244 IFileCatalog::Files files;
00245 m_catalog->getPFN(dsn,files);
00246 if ( files.size() == 0 ) {
00247 if ( !m_useGFAL ) {
00248 if ( m_quarantine ) s_badFiles.insert(dsn);
00249 error("connectDataIO> failed to resolve FID:"+dsn,false);
00250 return IDataConnection::BAD_DATA_CONNECTION;
00251 }
00252 else if ( dsn.length() == 36 && dsn[8]=='-' && dsn[13]=='-' ) {
00253 std::string gfal_name = "gfal:guid:" + dsn;
00254 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
00255 sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection);
00256 if ( sc.isSuccess() ) return sc;
00257 if ( m_quarantine ) s_badFiles.insert(dsn);
00258 }
00259 if ( m_quarantine ) s_badFiles.insert(dsn);
00260 error("connectDataIO> Failed to resolve FID:"+dsn,false);
00261 return IDataConnection::BAD_DATA_CONNECTION;
00262 }
00263 std::string pfn = files[0].first;
00264 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
00265 sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection);
00266 if ( !sc.isSuccess() ) {
00267 if ( m_quarantine ) s_badFiles.insert(pfn);
00268 return IDataConnection::BAD_DATA_CONNECTION;
00269 }
00270 return sc;
00271 }
00272 return S_ERROR;
00273
00274
00275
00276
00277 }
00278 std::string fid;
00279 FidMap::iterator j = m_fidMap.find(dsn);
00280 if ( j == m_fidMap.end() ) {
00281 IFileCatalog::Files files;
00282 switch(typ) {
00283 case LFN:
00284 fid = m_catalog->lookupLFN(dsn);
00285 if ( fid.empty() ) {
00286 log << MSG::ERROR << "Failed to resolve LFN:" << dsn
00287 << " Cannot access this dataset." << endmsg;
00288 return IDataConnection::BAD_DATA_CONNECTION;
00289 }
00290 break;
00291 case PFN:
00292 fid = m_catalog->lookupPFN(dsn);
00293 if ( !fid.empty() ) m_catalog->getPFN(fid, files);
00294 if ( files.empty() ) {
00295 if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
00296 if ( fid.empty() ) fid = m_catalog->createFID();
00297 m_catalog->registerPFN(fid,dsn,technology);
00298 log << MSG::INFO << "Referring to dataset " << dsn
00299 << " by its file ID:" << fid << endmsg;
00300 }
00301 else {
00302 fid = dsn;
00303 }
00304 }
00305 break;
00306 }
00307 }
00308 else {
00309 fid = (*j).second;
00310 }
00311 if ( typ == PFN ) {
00312
00313 ConnectionMap::iterator fi = m_connectionMap.find(fid);
00314 if ( fi == m_connectionMap.end() ) {
00315 connection->setFID(fid);
00316 connection->setPFN(dsn);
00317 Entry* e = new Entry(technology, keep_open, rw, connection);
00318
00319 if ( !reconnect(e).isSuccess() ) {
00320 delete e;
00321 if ( m_quarantine ) s_badFiles.insert(dsn);
00322 error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false);
00323 return IDataConnection::BAD_DATA_CONNECTION;
00324 }
00325 fid = connection->fid();
00326 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
00327 if ( !(rw==Connection::CREATE || rw==Connection::RECREATE) ) {
00328 if ( strcasecmp(dsn.c_str(),fid.c_str()) == 0 ) {
00329 log << MSG::ERROR << "Referring to existing dataset " << dsn
00330 << " by its physical name." << endmsg;
00331 log << "You may not be able to navigate back to the input file"
00332 << " -- processing continues" << endmsg;
00333 }
00334 }
00335 m_connectionMap.insert(std::make_pair(fid,e)).first;
00336 return S_OK;
00337 }
00338
00339 if ( !reconnect((*fi).second).isSuccess() ) {
00340 if ( m_quarantine ) s_badFiles.insert(dsn);
00341 error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false);
00342 return IDataConnection::BAD_DATA_CONNECTION;
00343 }
00344 return S_OK;
00345 }
00346 sc = connectDataIO(FID, rw, fid, technology, keep_open, connection);
00347 if ( !sc.isSuccess() && m_quarantine ) {
00348 s_badFiles.insert(fid);
00349 }
00350 else if ( typ == LFN ) {
00351 m_fidMap[dataset] = fid;
00352 }
00353 return sc;
00354 }
00355 catch (std::exception& e) {
00356 error(std::string("connectDataIO> Caught exception:")+e.what(), false);
00357 }
00358 catch(...) {
00359 }
00360 error("connectDataIO> The dataset "+dsn+" cannot be opened.",false).ignore();
00361 s_badFiles.insert(dsn);
00362 return IDataConnection::BAD_DATA_CONNECTION;
00363 }