00001
00002 #include "GaudiKernel/Debugger.h"
00003 #include "GaudiKernel/MsgStream.h"
00004 #include "GaudiKernel/strcasecmp.h"
00005 #include "GaudiKernel/DeclareFactoryEntries.h"
00006 #include "GaudiUtils/IFileCatalog.h"
00007 #include "IODataManager.h"
00008 #include "GaudiKernel/SmartIF.h"
00009 #include "GaudiKernel/Incident.h"
00010 #include "GaudiKernel/IIncidentSvc.h"
00011
00012 #include <set>
00013
// Factory declaration for Gaudi::IODataManager.
// NOTE(review): presumably registers the service with the framework factory
// mechanism; the macro expansion is not visible in this file -- confirm.
00014 DECLARE_NAMESPACE_SERVICE_FACTORY(Gaudi,IODataManager)
00015
00016 using namespace Gaudi;
00017
// File-local shorthands for StatusCode::SUCCESS / StatusCode::FAILURE, used as
// return values throughout this file.
00018 enum { S_OK = StatusCode::SUCCESS, S_ERROR=StatusCode::FAILURE };
00019
// File-local set of dataset names (FIDs/PFNs) for which a connection attempt
// failed. connectDataIO() inserts into it and refuses any later request whose
// name is found here.
00020 static std::set<std::string> s_badFiles;
00021
00022 IODataManager::IODataManager(CSTR nam, ISvcLocator* svcloc)
00023 : base_class(nam, svcloc), m_ageLimit(2)
00024 {
00025 declareProperty("CatalogType", m_catalogSvcName="Gaudi::MultiFileCatalog/FileCatalog");
00026 declareProperty("UseGFAL", m_useGFAL = true);
00027 declareProperty("QuarantineFiles", m_quarantine = true);
00028 declareProperty("AgeLimit", m_ageLimit = 2);
00029 }
00030
00032 StatusCode IODataManager::initialize() {
00033
00034 StatusCode status = Service::initialize();
00035 MsgStream log(msgSvc(), name());
00036 if ( !status.isSuccess() ) {
00037 log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
00038 return status;
00039 }
00040
00041 m_catalog = serviceLocator()->service(m_catalogSvcName);
00042 if( !m_catalog.isValid() ) {
00043 log << MSG::ERROR
00044 << "Unable to localize interface IFileCatalog from service:"
00045 << m_catalogSvcName << endmsg;
00046 return StatusCode::FAILURE;
00047 }
00048 m_incSvc = serviceLocator()->service("IncidentSvc");
00049 if( !m_incSvc.isValid() ) {
00050 log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
00051 return status;
00052 }
00053
00054 return status;
00055 }
00056
00058 StatusCode IODataManager::finalize() {
00059 m_catalog = 0;
00060 return Service::finalize();
00061 }
00062
00063
00064 StatusCode IODataManager::error(CSTR msg, bool rethrow) {
00065 MsgStream log(msgSvc(),name());
00066 log << MSG::ERROR << "Error: " << msg << endmsg;
00067 if ( rethrow ) {
00068 System::breakExecution();
00069 }
00070 return S_ERROR;
00071 }
00073 IODataManager::Connections IODataManager::connections(const IInterface* owner) const {
00074 Connections conns;
00075 for(ConnectionMap::const_iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
00076 IDataConnection* c = (*i).second->connection;
00077 if ( 0 == owner || c->owner() == owner )
00078 conns.push_back(c);
00079 }
00080 return conns;
00081 }
00082
00084 StatusCode IODataManager::connectRead(bool keep_open, Connection* con) {
00085 if ( !establishConnection(con) ) {
00086 return connectDataIO(UNKNOWN,Connection::READ,con->name(),"UNKNOWN",keep_open,con);
00087 }
00088 std::string dsn = con ? con->name() : std::string("Unknown");
00089 return error("Failed to connect to data:"+dsn,false);
00090 }
00091
00093 StatusCode IODataManager::connectWrite(Connection* con,IoType mode,CSTR doctype) {
00094 if ( !establishConnection(con) ) {
00095 return connectDataIO(UNKNOWN,mode,con->name(),doctype,true,con);
00096 }
00097 std::string dsn = con ? con->name() : std::string("Unknown");
00098 return error("Failed to connect to data:"+dsn,false);
00099 }
00100
00102 StatusCode IODataManager::read(Connection* con, void* const data, size_t len) {
00103 return establishConnection(con).isSuccess() ? con->read(data,len) : S_ERROR;
00104 }
00105
00107 StatusCode IODataManager::write(Connection* con, const void* data, int len) {
00108 return establishConnection(con).isSuccess() ? con->write(data,len) : S_ERROR;
00109 }
00110
00112 long long int IODataManager::seek(Connection* con, long long int where, int origin) {
00113 return establishConnection(con).isSuccess() ? con->seek(where,origin) : -1;
00114 }
00115
00116 StatusCode IODataManager::disconnect(Connection* con) {
00117 if ( con ) {
00118 std::string dataset = con->name();
00119 std::string dsn = dataset;
00120 StatusCode sc = con->disconnect();
00121 if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
00122 dsn = dataset.substr(4);
00123 else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
00124 dsn = dataset.substr(4);
00125 else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
00126 dsn = dataset.substr(4);
00127
00128 FidMap::iterator j = m_fidMap.find(dataset);
00129 if ( j != m_fidMap.end() ) {
00130 std::string fid = (*j).second;
00131 std::string gfal_name = "gfal:guid:" + fid;
00132 ConnectionMap::iterator i=m_connectionMap.find(fid);
00133 m_fidMap.erase(j);
00134 if ( (j=m_fidMap.find(fid)) != m_fidMap.end() )
00135 m_fidMap.erase(j);
00136 if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() )
00137 m_fidMap.erase(j);
00138 if ( i != m_connectionMap.end() ) {
00139 if ( (*i).second ) {
00140 IDataConnection* c = (*i).second->connection;
00141 std::string pfn = c->pfn();
00142 if ( (j=m_fidMap.find(pfn)) != m_fidMap.end() )
00143 m_fidMap.erase(j);
00144 if ( c->isConnected() ) {
00145 MsgStream log(msgSvc(),name());
00146 c->disconnect();
00147 log << MSG::INFO << "Disconnect from dataset " << dsn
00148 << " [" << fid << "]" << endmsg;
00149 }
00150 delete (*i).second;
00151 m_connectionMap.erase(i);
00152 }
00153 }
00154 }
00155 return sc;
00156 }
00157 return S_ERROR;
00158 }
00159
00160 StatusCode IODataManager::reconnect(Entry* e) {
00161 StatusCode sc = S_ERROR;
00162 if ( e && e->connection ) {
00163 switch(e->ioType) {
00164 case Connection::READ:
00165 sc = e->connection->connectRead();
00166 break;
00167 case Connection::UPDATE:
00168 case Connection::CREATE:
00169 case Connection::RECREATE:
00170 sc = e->connection->connectWrite(e->ioType);
00171 break;
00172 default:
00173 return S_ERROR;
00174 }
00175 if ( sc.isSuccess() && e->ioType == Connection::READ ) {
00176 std::vector<Entry*> to_retire;
00177 e->connection->resetAge();
00178 for(ConnectionMap::iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
00179 IDataConnection* c = (*i).second->connection;
00180 if ( e->connection != c && c->isConnected() && !(*i).second->keepOpen ) {
00181 c->ageFile();
00182 if ( c->age() > m_ageLimit ) {
00183 to_retire.push_back((*i).second);
00184 }
00185 }
00186 }
00187 if ( !to_retire.empty() ) {
00188 MsgStream log(msgSvc(),name());
00189 for(std::vector<Entry*>::iterator j=to_retire.begin(); j!=to_retire.end();++j) {
00190 IDataConnection* c = (*j)->connection;
00191 c->disconnect();
00192 log << MSG::INFO << "Disconnect from dataset " << c->pfn()
00193 << " [" << c->fid() << "]" << endmsg;
00194 }
00195 }
00196 }
00197 }
00198 return sc;
00199 }
00200
00202 IIODataManager::Connection* IODataManager::connection(CSTR dataset) const {
00203 FidMap::const_iterator j = m_fidMap.find(dataset);
00204 if ( j != m_fidMap.end() ) {
00205 ConnectionMap::const_iterator i=m_connectionMap.find((*j).second);
00206 return (i != m_connectionMap.end()) ? (*i).second->connection : 0;
00207 }
00208 return 0;
00209 }
00210
00211 StatusCode IODataManager::establishConnection(Connection* con) {
00212 if ( con ) {
00213 if ( !con->isConnected() ) {
00214 ConnectionMap::const_iterator i=m_connectionMap.find(con->name());
00215 if ( i != m_connectionMap.end() ) {
00216 Connection* c = (*i).second->connection;
00217 if ( c != con ) {
00218 m_incSvc->fireIncident(Incident(con->name(),IncidentType::FailInputFile));
00219 return error("Severe logic bug: Twice identical connection object for DSN:"+con->name(),true);
00220 }
00221 if ( reconnect((*i).second).isSuccess() ) {
00222 return S_OK;
00223 }
00224 }
00225 return S_ERROR;
00226 }
00227 con->resetAge();
00228 return S_OK;
00229 }
00230 return error("Severe logic bug: No connection object avalible.",true);
00231 }
00232
00233 StatusCode
00234 IODataManager::connectDataIO(int typ, IoType rw, CSTR dataset, CSTR technology,bool keep_open,Connection* connection) {
00235 MsgStream log(msgSvc(),name());
00236 std::string dsn = dataset;
00237 try {
00238 StatusCode sc(StatusCode::SUCCESS,true);
00239 if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
00240 dsn = dataset.substr(4), typ = FID;
00241 else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
00242 dsn = dataset.substr(4), typ = LFN;
00243 else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
00244 dsn = dataset.substr(4), typ = PFN;
00245 else if ( typ == UNKNOWN )
00246 return connectDataIO(PFN, rw, dsn, technology, keep_open, connection);
00247
00248 if(std::find(s_badFiles.begin(),s_badFiles.end(),dsn) != s_badFiles.end()) {
00249 m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00250 return IDataConnection::BAD_DATA_CONNECTION;
00251 }
00252 if ( typ == FID ) {
00253 ConnectionMap::iterator fi = m_connectionMap.find(dsn);
00254 if ( fi == m_connectionMap.end() ) {
00255 IFileCatalog::Files files;
00256 m_catalog->getPFN(dsn,files);
00257 if ( files.size() == 0 ) {
00258 if ( !m_useGFAL ) {
00259 if ( m_quarantine ) s_badFiles.insert(dsn);
00260 m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00261 error("connectDataIO> failed to resolve FID:"+dsn,false).ignore();
00262 return IDataConnection::BAD_DATA_CONNECTION;
00263 }
00264 else if ( dsn.length() == 36 && dsn[8]=='-' && dsn[13]=='-' ) {
00265 std::string gfal_name = "gfal:guid:" + dsn;
00266 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
00267 sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection);
00268 if ( sc.isSuccess() ) return sc;
00269 if ( m_quarantine ) s_badFiles.insert(dsn);
00270 }
00271 if ( m_quarantine ) s_badFiles.insert(dsn);
00272 m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00273 error("connectDataIO> Failed to resolve FID:"+dsn,false).ignore();
00274 return IDataConnection::BAD_DATA_CONNECTION;
00275 }
00276 std::string pfn = files[0].first;
00277 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
00278 sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection);
00279 if ( !sc.isSuccess() ) {
00280 if ( m_quarantine ) s_badFiles.insert(pfn);
00281 m_incSvc->fireIncident(Incident(pfn,IncidentType::FailInputFile));
00282 return IDataConnection::BAD_DATA_CONNECTION;
00283 }
00284
00285 return sc;
00286 }
00287 return S_ERROR;
00288
00289
00290
00291
00292 }
00293 std::string fid;
00294 FidMap::iterator j = m_fidMap.find(dsn);
00295 if ( j == m_fidMap.end() ) {
00296 IFileCatalog::Files files;
00297 switch(typ) {
00298 case LFN:
00299 fid = m_catalog->lookupLFN(dsn);
00300 if ( fid.empty() ) {
00301 m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00302 log << MSG::ERROR << "Failed to resolve LFN:" << dsn
00303 << " Cannot access this dataset." << endmsg;
00304 return IDataConnection::BAD_DATA_CONNECTION;
00305 }
00306 break;
00307 case PFN:
00308 fid = m_catalog->lookupPFN(dsn);
00309 if ( !fid.empty() ) m_catalog->getPFN(fid, files);
00310 if ( files.empty() ) {
00311 if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
00312 if ( fid.empty() ) fid = m_catalog->createFID();
00313 m_catalog->registerPFN(fid,dsn,technology);
00314 log << MSG::INFO << "Referring to dataset " << dsn
00315 << " by its file ID:" << fid << endmsg;
00316 }
00317 else {
00318 fid = dsn;
00319 }
00320 }
00321 break;
00322 }
00323 }
00324 else {
00325 fid = (*j).second;
00326 }
00327 if ( typ == PFN ) {
00328
00329 ConnectionMap::iterator fi = m_connectionMap.find(fid);
00330 if ( fi == m_connectionMap.end() ) {
00331 connection->setFID(fid);
00332 connection->setPFN(dsn);
00333 Entry* e = new Entry(technology, keep_open, rw, connection);
00334
00335 if ( !reconnect(e).isSuccess() ) {
00336 delete e;
00337 if ( m_quarantine ) s_badFiles.insert(dsn);
00338 m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00339 error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
00340 return IDataConnection::BAD_DATA_CONNECTION;
00341 }
00342 fid = connection->fid();
00343 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
00344 if ( !(rw==Connection::CREATE || rw==Connection::RECREATE) ) {
00345 if ( strcasecmp(dsn.c_str(),fid.c_str()) == 0 ) {
00346 log << MSG::ERROR << "Referring to existing dataset " << dsn
00347 << " by its physical name." << endmsg;
00348 log << "You may not be able to navigate back to the input file"
00349 << " -- processing continues" << endmsg;
00350 }
00351 }
00352 m_connectionMap.insert(std::make_pair(fid,e)).first;
00353 return S_OK;
00354 }
00355
00356 if ( !reconnect((*fi).second).isSuccess() ) {
00357 if ( m_quarantine ) s_badFiles.insert(dsn);
00358 m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00359 error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
00360 return IDataConnection::BAD_DATA_CONNECTION;
00361 }
00362 return S_OK;
00363 }
00364 sc = connectDataIO(FID, rw, fid, technology, keep_open, connection);
00365 if ( !sc.isSuccess() && m_quarantine ) {
00366 s_badFiles.insert(fid);
00367 }
00368 else if ( typ == LFN ) {
00369 m_fidMap[dataset] = fid;
00370 }
00371 return sc;
00372 }
00373 catch (std::exception& e) {
00374 error(std::string("connectDataIO> Caught exception:")+e.what(), false).ignore();
00375 }
00376 catch(...) {
00377 error(std::string("connectDataIO> Caught unknown exception"), false).ignore();
00378 }
00379 m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
00380 error("connectDataIO> The dataset "+dsn+" cannot be opened.",false).ignore();
00381 s_badFiles.insert(dsn);
00382 return IDataConnection::BAD_DATA_CONNECTION;
00383 }