![]() |
|
|
Generated: 8 Jan 2009 |
00001 // Framework include files 00002 #include "GaudiKernel/Debugger.h" 00003 #include "GaudiKernel/MsgStream.h" 00004 #include "GaudiKernel/strcasecmp.h" 00005 #include "GaudiKernel/DeclareFactoryEntries.h" 00006 #include "GaudiUtils/IFileCatalog.h" 00007 #include "IODataManager.h" 00008 00009 #include <set> 00010 00011 DECLARE_NAMESPACE_SERVICE_FACTORY(Gaudi,IODataManager); 00012 00013 using namespace Gaudi; 00014 00015 enum { S_OK = StatusCode::SUCCESS, S_ERROR=StatusCode::FAILURE }; 00016 00017 static std::set<std::string> s_badFiles; 00018 00019 IODataManager::IODataManager(CSTR nam, ISvcLocator* svcloc) 00020 : Service( nam, svcloc), m_ageLimit(2), m_catalog(0) 00021 { 00022 declareProperty("CatalogType", m_catalogSvcName="Gaudi::MultiFileCatalog/FileCatalog"); 00023 declareProperty("UseGFAL", m_useGFAL = true); 00024 declareProperty("QuarantineFiles", m_quarantine = true); 00025 declareProperty("AgeLimit", m_ageLimit = 2); 00026 } 00027 00028 // IInterface::queryInterface 00029 StatusCode IODataManager::queryInterface(const InterfaceID& riid, void** ppvIf) { 00030 if ( riid == IID_IIODataManager ) { 00031 *ppvIf = (IIODataManager*)this; 00032 addRef(); 00033 return S_OK; 00034 } 00035 return Service::queryInterface(riid, ppvIf); 00036 } 00037 00039 StatusCode IODataManager::initialize() { 00040 // Initialize base class 00041 StatusCode status = Service::initialize(); 00042 MsgStream log(msgSvc(), name()); 00043 if ( !status.isSuccess() ) { 00044 log << MSG::ERROR << "Error initializing base class Service!" << endreq; 00045 return status; 00046 } 00047 // Retrieve conversion service handling event iteration 00048 status = serviceLocator()->service(m_catalogSvcName, m_catalog); 00049 if( !status.isSuccess() ) { 00050 log << MSG::ERROR 00051 << "Unable to localize interface IID_FileCatalog from service:" 00052 << m_catalogSvcName << endreq; 00053 return status; 00054 } 00055 return status; 00056 } 00057 00059 StatusCode IODataManager::finalize() { 00060 if ( m_catalog ) { 00061 m_catalog->release(); 00062 m_catalog = 0; 00063 } 00064 return Service::finalize(); 00065 } 00066 00067 // Small routine to issue exceptions 00068 StatusCode IODataManager::error(CSTR msg, bool rethrow) { 00069 MsgStream log(msgSvc(),name()); 00070 log << MSG::ERROR << "Error: " << msg << endmsg; 00071 if ( rethrow ) { 00072 System::breakExecution(); 00073 } 00074 return S_ERROR; 00075 } 00077 IODataManager::Connections IODataManager::connections(const IInterface* owner) const { 00078 Connections conns; 00079 for(ConnectionMap::const_iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) { 00080 IDataConnection* c = (*i).second->connection; 00081 if ( 0 == owner || c->owner() == owner ) 00082 conns.push_back(c); 00083 } 00084 return conns; 00085 } 00086 00088 StatusCode IODataManager::connectRead(bool keep_open, Connection* con) { 00089 if ( !establishConnection(con) ) { 00090 return connectDataIO(UNKNOWN,Connection::READ,con->name(),"UNKNOWN",keep_open,con); 00091 } 00092 std::string dsn = con ? con->name() : std::string("Unknown"); 00093 return error("Failed to connect to data:"+dsn,false); 00094 } 00095 00097 StatusCode IODataManager::connectWrite(Connection* con,IoType mode,CSTR doctype) { 00098 if ( !establishConnection(con) ) { 00099 return connectDataIO(UNKNOWN,mode,con->name(),doctype,true,con); 00100 } 00101 std::string dsn = con ? con->name() : std::string("Unknown"); 00102 return error("Failed to connect to data:"+dsn,false); 00103 } 00104 00106 StatusCode IODataManager::read(Connection* con, void* const data, size_t len) { 00107 return establishConnection(con).isSuccess() ? con->read(data,len) : S_ERROR; 00108 } 00109 00111 StatusCode IODataManager::write(Connection* con, const void* data, int len) { 00112 return establishConnection(con).isSuccess() ? con->write(data,len) : S_ERROR; 00113 } 00114 00116 long long int IODataManager::seek(Connection* con, long long int where, int origin) { 00117 return establishConnection(con).isSuccess() ? con->seek(where,origin) : -1; 00118 } 00119 00120 StatusCode IODataManager::disconnect(Connection* con) { 00121 if ( con ) { 00122 std::string dataset = con->name(); 00123 std::string dsn = dataset; 00124 StatusCode sc = con->disconnect(); 00125 if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 ) 00126 dsn = dataset.substr(4); 00127 else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 ) 00128 dsn = dataset.substr(4); 00129 else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 ) 00130 dsn = dataset.substr(4); 00131 00132 FidMap::iterator j = m_fidMap.find(dataset); 00133 if ( j != m_fidMap.end() ) { 00134 std::string fid = (*j).second; 00135 std::string gfal_name = "gfal:guid:" + fid; 00136 ConnectionMap::iterator i=m_connectionMap.find(fid); 00137 m_fidMap.erase(j); 00138 if ( (j=m_fidMap.find(fid)) != m_fidMap.end() ) 00139 m_fidMap.erase(j); 00140 if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() ) 00141 m_fidMap.erase(j); 00142 if ( i != m_connectionMap.end() ) { 00143 if ( (*i).second ) { 00144 IDataConnection* c = (*i).second->connection; 00145 std::string pfn = c->pfn(); 00146 if ( (j=m_fidMap.find(pfn)) != m_fidMap.end() ) 00147 m_fidMap.erase(j); 00148 if ( c->isConnected() ) { 00149 MsgStream log(msgSvc(),name()); 00150 c->disconnect(); 00151 log << MSG::INFO << "Disconnect from dataset " << dsn 00152 << " [" << fid << "]" << endmsg; 00153 } 00154 delete (*i).second; 00155 m_connectionMap.erase(i); 00156 } 00157 } 00158 } 00159 return sc; 00160 } 00161 return S_ERROR; 00162 } 00163 00164 StatusCode IODataManager::reconnect(Entry* e) { 00165 StatusCode sc = S_ERROR; 00166 if ( e && e->connection ) { 00167 switch(e->ioType) { 00168 case Connection::READ: 00169 sc = e->connection->connectRead(); 00170 break; 00171 case Connection::UPDATE: 00172 case Connection::CREATE: 00173 case Connection::RECREATE: 00174 sc = e->connection->connectWrite(e->ioType); 00175 break; 00176 default: 00177 return S_ERROR; 00178 } 00179 if ( sc.isSuccess() && e->ioType == Connection::READ ) { 00180 std::vector<Entry*> to_retire; 00181 e->connection->resetAge(); 00182 for(ConnectionMap::iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) { 00183 IDataConnection* c = (*i).second->connection; 00184 if ( e->connection != c && c->isConnected() && !(*i).second->keepOpen ) { 00185 c->ageFile(); 00186 if ( c->age() > m_ageLimit ) { 00187 to_retire.push_back((*i).second); 00188 } 00189 } 00190 } 00191 if ( !to_retire.empty() ) { 00192 MsgStream log(msgSvc(),name()); 00193 for(std::vector<Entry*>::iterator j=to_retire.begin(); j!=to_retire.end();++j) { 00194 IDataConnection* c = (*j)->connection; 00195 c->disconnect(); 00196 log << MSG::INFO << "Disconnect from dataset " << c->pfn() 00197 << " [" << c->fid() << "]" << endmsg; 00198 } 00199 } 00200 } 00201 } 00202 return sc; 00203 } 00204 00206 IIODataManager::Connection* IODataManager::connection(CSTR dataset) const { 00207 FidMap::const_iterator j = m_fidMap.find(dataset); 00208 if ( j != m_fidMap.end() ) { 00209 ConnectionMap::const_iterator i=m_connectionMap.find((*j).second); 00210 return (i != m_connectionMap.end()) ? (*i).second->connection : 0; 00211 } 00212 return 0; 00213 } 00214 00215 StatusCode IODataManager::establishConnection(Connection* con) { 00216 if ( con ) { 00217 if ( !con->isConnected() ) { 00218 ConnectionMap::const_iterator i=m_connectionMap.find(con->name()); 00219 if ( i != m_connectionMap.end() ) { 00220 Connection* c = (*i).second->connection; 00221 if ( c != con ) { 00222 return error("Severe logic bug: Twice identical connection object for DSN:"+con->name(),true); 00223 } 00224 if ( reconnect((*i).second).isSuccess() ) { 00225 return S_OK; 00226 } 00227 } 00228 return S_ERROR; 00229 } 00230 con->resetAge(); 00231 return S_OK; 00232 } 00233 return error("Severe logic bug: No connection object avalible.",true); 00234 } 00235 00236 StatusCode 00237 IODataManager::connectDataIO(int typ, IoType rw, CSTR dataset, CSTR technology,bool keep_open,Connection* connection) { 00238 MsgStream log(msgSvc(),name()); 00239 std::string dsn = dataset; 00240 try { 00241 StatusCode sc(StatusCode::SUCCESS,true); 00242 if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 ) 00243 dsn = dataset.substr(4), typ = FID; 00244 else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 ) 00245 dsn = dataset.substr(4), typ = LFN; 00246 else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 ) 00247 dsn = dataset.substr(4), typ = PFN; 00248 else if ( typ == UNKNOWN ) 00249 return connectDataIO(PFN, rw, dsn, technology, keep_open, connection); 00250 00251 if(std::find(s_badFiles.begin(),s_badFiles.end(),dsn) != s_badFiles.end()) { 00252 return IDataConnection::BAD_DATA_CONNECTION; 00253 } 00254 if ( typ == FID ) { 00255 ConnectionMap::iterator fi = m_connectionMap.find(dsn); 00256 if ( fi == m_connectionMap.end() ) { 00257 IFileCatalog::Files files; 00258 m_catalog->getPFN(dsn,files); 00259 if ( files.size() == 0 ) { 00260 if ( !m_useGFAL ) { 00261 if ( m_quarantine ) s_badFiles.insert(dsn); 00262 error("connectDataIO> failed to resolve FID:"+dsn,false); 00263 return IDataConnection::BAD_DATA_CONNECTION; 00264 } 00265 else if ( dsn.length() == 36 && dsn[8]=='-' && dsn[13]=='-' ) { 00266 std::string gfal_name = "gfal:guid:" + dsn; 00267 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn; 00268 sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection); 00269 if ( sc.isSuccess() ) return sc; 00270 if ( m_quarantine ) s_badFiles.insert(dsn); 00271 } 00272 if ( m_quarantine ) s_badFiles.insert(dsn); 00273 error("connectDataIO> Failed to resolve FID:"+dsn,false); 00274 return IDataConnection::BAD_DATA_CONNECTION; 00275 } 00276 std::string pfn = files[0].first; 00277 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn; 00278 sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection); 00279 if ( !sc.isSuccess() ) { 00280 if ( m_quarantine ) s_badFiles.insert(pfn); 00281 return IDataConnection::BAD_DATA_CONNECTION; 00282 } 00283 return sc; 00284 } 00285 return S_ERROR; 00286 //Connection* c = (*fi).second->connection; 00287 //sc = connectDataIO(PFN, rw, c->pfn(), technology, keep_open, connection); 00288 //if ( !sc.isSuccess() && m_quarantine ) s_badFiles.insert(c->pfn()); 00289 //return sc; 00290 } 00291 std::string fid; 00292 FidMap::iterator j = m_fidMap.find(dsn); 00293 if ( j == m_fidMap.end() ) { 00294 IFileCatalog::Files files; 00295 switch(typ) { 00296 case LFN: 00297 fid = m_catalog->lookupLFN(dsn); 00298 if ( fid.empty() ) { 00299 log << MSG::ERROR << "Failed to resolve LFN:" << dsn 00300 << " Cannot access this dataset." << endmsg; 00301 return IDataConnection::BAD_DATA_CONNECTION; 00302 } 00303 break; 00304 case PFN: 00305 fid = m_catalog->lookupPFN(dsn); 00306 if ( !fid.empty() ) m_catalog->getPFN(fid, files); 00307 if ( files.empty() ) { 00308 if ( rw == Connection::CREATE || rw == Connection::RECREATE ) { 00309 if ( fid.empty() ) fid = m_catalog->createFID(); 00310 m_catalog->registerPFN(fid,dsn,technology); 00311 log << MSG::INFO << "Referring to dataset " << dsn 00312 << " by its file ID:" << fid << endmsg; 00313 } 00314 else { 00315 fid = dsn; 00316 } 00317 } 00318 break; 00319 } 00320 } 00321 else { 00322 fid = (*j).second; 00323 } 00324 if ( typ == PFN ) { 00325 // Open PFN 00326 ConnectionMap::iterator fi = m_connectionMap.find(fid); 00327 if ( fi == m_connectionMap.end() ) { 00328 connection->setFID(fid); 00329 connection->setPFN(dsn); 00330 Entry* e = new Entry(technology, keep_open, rw, connection); 00331 // Here we open the file! 00332 if ( !reconnect(e).isSuccess() ) { 00333 delete e; 00334 if ( m_quarantine ) s_badFiles.insert(dsn); 00335 error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false); 00336 return IDataConnection::BAD_DATA_CONNECTION; 00337 } 00338 fid = connection->fid(); 00339 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid; 00340 if ( !(rw==Connection::CREATE || rw==Connection::RECREATE) ) { 00341 if ( strcasecmp(dsn.c_str(),fid.c_str()) == 0 ) { 00342 log << MSG::ERROR << "Referring to existing dataset " << dsn 00343 << " by its physical name." << endmsg; 00344 log << "You may not be able to nagivate back to the input file" 00345 << " -- processing continues" << endmsg; 00346 } 00347 } 00348 m_connectionMap.insert(std::make_pair(fid,e)).first; 00349 return S_OK; 00350 } 00351 // Here we open the file! 00352 if ( !reconnect((*fi).second).isSuccess() ) { 00353 if ( m_quarantine ) s_badFiles.insert(dsn); 00354 error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false); 00355 return IDataConnection::BAD_DATA_CONNECTION; 00356 } 00357 return S_OK; 00358 } 00359 sc = connectDataIO(FID, rw, fid, technology, keep_open, connection); 00360 if ( !sc.isSuccess() && m_quarantine ) { 00361 s_badFiles.insert(fid); 00362 } 00363 else if ( typ == LFN ) { 00364 m_fidMap[dataset] = fid; 00365 } 00366 return sc; 00367 } 00368 catch (std::exception& e) { 00369 error(std::string("connectDataIO> Caught exception:")+e.what(), false); 00370 } 00371 catch(...) { 00372 } 00373 error("connectDataIO> The dataset "+dsn+" cannot be opened.",false).ignore(); 00374 s_badFiles.insert(dsn); 00375 return IDataConnection::BAD_DATA_CONNECTION; 00376 }