Gaudi Framework, version v25r0

Home   Generated: Mon Feb 17 2014
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
IODataManager.cpp
Go to the documentation of this file.
1 // Framework include files
2 #include "GaudiKernel/Debugger.h"
6 #include "IODataManager.h"
7 #include "GaudiKernel/SmartIF.h"
8 #include "GaudiKernel/Incident.h"
10 
11 #include <set>
12 
13 using namespace Gaudi;
14 
16 
18 
20 
21 IODataManager::IODataManager(CSTR nam, ISvcLocator* svcloc)
22  : base_class(nam, svcloc), m_ageLimit(2)
23 {
24  declareProperty("CatalogType", m_catalogSvcName="Gaudi::MultiFileCatalog/FileCatalog");
25  declareProperty("UseGFAL", m_useGFAL = true);
26  declareProperty("QuarantineFiles", m_quarantine = true);
27  declareProperty("AgeLimit", m_ageLimit = 2);
28  declareProperty("DisablePFNWarning", m_disablePFNWarning = false,
29  "if set to True, we will not report when a file "
30  "is opened by it's physical name");
31 }
32 
// Body of the service initialisation: checks `status`, then looks up the file
// catalog service and the incident service through the service locator.
// NOTE(review): the listing omits original lines 33-34 and 36, so the function
// header and the declaration of `status` are not visible here. Presumably
// `status` holds the result of the base class initialisation; confirm against
// the complete source.
35  // Initialize base class
37  MsgStream log(msgSvc(), name());
38  if ( !status.isSuccess() ) {
39  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
40  return status;
41  }
42  // Retrieve the file catalog service named by the CatalogType property (m_catalogSvcName)
43  m_catalog = serviceLocator()->service(m_catalogSvcName);
44  if( !m_catalog.isValid() ) {
45  log << MSG::ERROR
46  << "Unable to localize interface IFileCatalog from service:"
47  << m_catalogSvcName << endmsg;
48  return StatusCode::FAILURE;
49  }
// Retrieve the incident service; it is used elsewhere in this file to fire
// IncidentType::FailInputFile incidents.
50  m_incSvc = serviceLocator()->service("IncidentSvc");
51  if( !m_incSvc.isValid() ) {
52  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
// NOTE(review): `status` passed the isSuccess() check above and is not visibly
// reassigned, so this error path appears to return a success status.
// StatusCode::FAILURE (as in the catalog branch) looks intended; confirm.
53  return status;
54  }
55 
// Both services were found: hand back the (successful) status checked above.
56  return status;
57 }
58 
60 StatusCode IODataManager::finalize() {
61  m_catalog = 0; // release
62  return Service::finalize();
63 }
64 
// Small error helper: prints `msg` at MSG::ERROR level (prefixed with
// "Error: ") under this service's name and always returns S_ERROR.
//   msg      text to report
//   rethrow  selects the extra action in the branch below
66 StatusCode IODataManager::error(CSTR msg, bool rethrow) {
67  MsgStream log(msgSvc(),name());
68  log << MSG::ERROR << "Error: " << msg << endmsg;
69  if ( rethrow ) {
// NOTE(review): the listing omits original line 70, so what happens when
// `rethrow` is true is not visible here; confirm against the complete source.
71  }
72  return S_ERROR;
73 }
74 
76 IODataManager::Connections IODataManager::connections(const IInterface* owner) const {
77  Connections conns;
78  for(ConnectionMap::const_iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
79  IDataConnection* c = (*i).second->connection;
80  if ( 0 == owner || c->owner() == owner )
81  conns.push_back(c);
82  }
83  return conns;
84 }
85 
87 StatusCode IODataManager::connectRead(bool keep_open, Connection* con) {
88  if ( !establishConnection(con) ) {
89  return connectDataIO(UNKNOWN,Connection::READ,con->name(),"UNKNOWN",keep_open,con);
90  }
91  std::string dsn = con ? con->name() : std::string("Unknown");
92  return error("Failed to connect to data:"+dsn,false);
93 }
94 
96 StatusCode IODataManager::connectWrite(Connection* con,IoType mode,CSTR doctype) {
97  if ( !establishConnection(con) ) {
98  return connectDataIO(UNKNOWN,mode,con->name(),doctype,true,con);
99  }
100  std::string dsn = con ? con->name() : std::string("Unknown");
101  return error("Failed to connect to data:"+dsn,false);
102 }
103 
105 StatusCode IODataManager::read(Connection* con, void* const data, size_t len) {
106  return establishConnection(con).isSuccess() ? con->read(data,len) : S_ERROR;
107 }
108 
110 StatusCode IODataManager::write(Connection* con, const void* data, int len) {
111  return establishConnection(con).isSuccess() ? con->write(data,len) : S_ERROR;
112 }
113 
115 long long int IODataManager::seek(Connection* con, long long int where, int origin) {
116  return establishConnection(con).isSuccess() ? con->seek(where,origin) : -1;
117 }
118 
119 StatusCode IODataManager::disconnect(Connection* con) {
120  if ( con ) {
121  std::string dataset = con->name();
122  std::string dsn = dataset;
123  StatusCode sc = con->disconnect();
124  if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
125  dsn = dataset.substr(4);
126  else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
127  dsn = dataset.substr(4);
128  else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
129  dsn = dataset.substr(4);
130 
131  FidMap::iterator j = m_fidMap.find(dataset);
132  if ( j != m_fidMap.end() ) {
133  std::string fid = (*j).second;
134  std::string gfal_name = "gfal:guid:" + fid;
135  ConnectionMap::iterator i=m_connectionMap.find(fid);
136  m_fidMap.erase(j);
137  if ( (j=m_fidMap.find(fid)) != m_fidMap.end() )
138  m_fidMap.erase(j);
139  if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() )
140  m_fidMap.erase(j);
141  if ( i != m_connectionMap.end() ) {
142  if ( (*i).second ) {
143  IDataConnection* c = (*i).second->connection;
144  std::string pfn = c->pfn();
145  if ( (j=m_fidMap.find(pfn)) != m_fidMap.end() )
146  m_fidMap.erase(j);
147  if ( c->isConnected() ) {
148  MsgStream log(msgSvc(),name());
149  c->disconnect();
150  log << MSG::INFO << "Disconnect from dataset " << dsn
151  << " [" << fid << "]" << endmsg;
152  }
153  delete (*i).second;
154  m_connectionMap.erase(i);
155  }
156  }
157  }
158  return sc;
159  }
160  return S_ERROR;
161 }
162 
// (Re)open the connection held by entry `e` according to e->ioType:
// READ calls connectRead(), UPDATE/CREATE/RECREATE call
// connectWrite(e->ioType), any other type returns S_ERROR.
// After a successful READ open the age of this connection is reset, every
// other connected entry that is not marked keepOpen is aged, and those whose
// age then exceeds m_ageLimit are disconnected.
// Returns `sc`, which the visible code only assigns inside the switch.
163 StatusCode IODataManager::reconnect(Entry* e) {
// NOTE(review): the listing omits original line 164; `sc` is presumably
// declared (with the value returned for a null entry/connection) on that
// line. Confirm against the complete source.
165  if ( e && e->connection ) {
166  switch(e->ioType) {
167  case Connection::READ:
168  sc = e->connection->connectRead();
169  break;
170  case Connection::UPDATE:
171  case Connection::CREATE:
172  case Connection::RECREATE:
173  sc = e->connection->connectWrite(e->ioType);
174  break;
175  default:
176  return S_ERROR;
177  }
// Ageing is only performed after a successful open for reading.
178  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
179  std::vector<Entry*> to_retire;
180  e->connection->resetAge();
// First pass: age the other open connections and remember the ones past the limit.
181  for(ConnectionMap::iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
182  IDataConnection* c = (*i).second->connection;
183  if ( e->connection != c && c->isConnected() && !(*i).second->keepOpen ) {
184  c->ageFile();
185  if ( c->age() > m_ageLimit ) {
186  to_retire.push_back((*i).second);
187  }
188  }
189  }
// Second pass: disconnect the remembered connections; their entries stay in
// m_connectionMap (nothing is erased here).
190  if ( !to_retire.empty() ) {
191  MsgStream log(msgSvc(),name());
192  for(std::vector<Entry*>::iterator j=to_retire.begin(); j!=to_retire.end();++j) {
193  IDataConnection* c = (*j)->connection;
194  c->disconnect();
195  log << MSG::INFO << "Disconnect from dataset " << c->pfn()
196  << " [" << c->fid() << "]" << endmsg;
197  }
198  }
199  }
200  }
201  return sc;
202 }
203 
205 IIODataManager::Connection* IODataManager::connection(CSTR dataset) const {
206  FidMap::const_iterator j = m_fidMap.find(dataset);
207  if ( j != m_fidMap.end() ) {
208  ConnectionMap::const_iterator i=m_connectionMap.find((*j).second);
209  return (i != m_connectionMap.end()) ? (*i).second->connection : 0;
210  }
211  return 0;
212 }
213 
214 StatusCode IODataManager::establishConnection(Connection* con) {
215  if ( con ) {
216  if ( !con->isConnected() ) {
217  ConnectionMap::const_iterator i=m_connectionMap.find(con->name());
218  if ( i != m_connectionMap.end() ) {
219  Connection* c = (*i).second->connection;
220  if ( c != con ) {
221  m_incSvc->fireIncident(Incident(con->name(),IncidentType::FailInputFile));
222  return error("Severe logic bug: Twice identical connection object for DSN:"+con->name(),true);
223  }
224  if ( reconnect((*i).second).isSuccess() ) {
225  return S_OK;
226  }
227  }
228  return S_ERROR;
229  }
230  con->resetAge();
231  return S_OK;
232  }
233  return error("Severe logic bug: No connection object avalible.",true);
234 }
235 
// Resolve a dataset name and open `connection` for it.
//   typ         kind of name (FID / LFN / PFN / UNKNOWN); overridden when the
//               name carries a recognised prefix
//   rw          I/O mode stored in the connection entry
//   dataset     name as given by the caller, possibly with a prefix
//   technology  passed to the catalog registration and to the entry
//   keep_open   stored in the connection entry
//   connection  connection object to open
// A "FID:", "LFN:" or "PFN:" prefix (compared case-insensitively) selects the
// type and is stripped. FID and LFN requests are resolved with the help of
// m_catalog and continue as recursive calls; the file itself is opened in the
// PFN branch through reconnect(). Most failure paths fire an
// IncidentType::FailInputFile incident and return
// IDataConnection::BAD_DATA_CONNECTION.
// NOTE(review): the listing omits original lines 236, 241 and 251, so the
// return type, the declaration of `sc` and the condition guarding the early
// return at listing lines 252-254 are not visible here.
237 IODataManager::connectDataIO(int typ, IoType rw, CSTR dataset, CSTR technology,bool keep_open,Connection* connection) {
238  MsgStream log(msgSvc(),name());
239  std::string dsn = dataset;
240  try {
// Strip a recognised prefix and set the type accordingly.
242  if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
243  dsn = dataset.substr(4), typ = FID;
244  else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
245  dsn = dataset.substr(4), typ = LFN;
246  else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
247  dsn = dataset.substr(4), typ = PFN;
// No prefix and no type given: treat the name as a PFN.
248  else if ( typ == UNKNOWN )
249  return connectDataIO(PFN, rw, dsn, technology, keep_open, connection);
250 
// NOTE(review): the `if` that opens this block is on the omitted line 251.
252  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
253  return IDataConnection::BAD_DATA_CONNECTION;
254  }
// FID request: only processed when no connection is registered under this
// file ID yet.
255  if ( typ == FID ) {
256  ConnectionMap::iterator fi = m_connectionMap.find(dsn);
257  if ( fi == m_connectionMap.end() ) {
// Ask the catalog for the physical file names recorded for this file ID.
258  IFileCatalog::Files files;
259  m_catalog->getPFN(dsn,files);
// The catalog knows no physical file for this FID.
260  if ( files.size() == 0 ) {
261  if ( !m_useGFAL ) {
262  if ( m_quarantine ) s_badFiles.insert(dsn);
263  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
264  error("connectDataIO> failed to resolve FID:"+dsn,false).ignore();
265  return IDataConnection::BAD_DATA_CONNECTION;
266  }
// GFAL enabled and the FID is 36 characters long with '-' at positions 8 and
// 13: retry with the name "gfal:guid:<FID>" as a PFN.
267  else if ( dsn.length() == 36 && dsn[8]=='-' && dsn[13]=='-' ) {
268  std::string gfal_name = "gfal:guid:" + dsn;
269  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
270  sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection);
271  if ( sc.isSuccess() ) return sc;
272  if ( m_quarantine ) s_badFiles.insert(dsn);
273  }
274  if ( m_quarantine ) s_badFiles.insert(dsn);
275  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
276  error("connectDataIO> Failed to resolve FID:"+dsn,false).ignore();
277  return IDataConnection::BAD_DATA_CONNECTION;
278  }
// Try the catalog entries one after the other until one of them opens.
279  for(IFileCatalog::Files::const_iterator i=files.begin(); i!=files.end(); ++i) {
280  std::string pfn = (*i).first;
281  if ( i != files.begin() ) {
282  log << MSG::WARNING << "Attempt to connect dsn:" << dsn
283  << " with next entry in data federation:" << pfn << "." << endmsg;
284  }
285  sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection);
286  if ( !sc.isSuccess() ) {
287  if ( m_quarantine ) s_badFiles.insert(pfn);
288  m_incSvc->fireIncident(Incident(pfn,IncidentType::FailInputFile));
289  }
290  else {
// Success: record FID, original name and PFN as aliases of this file ID.
291  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
292  return sc;
293  }
294  }
// None of the catalog entries could be opened.
295  log << MSG::ERROR << "Failed to open dsn:" << dsn
296  << " Federated file could not be resolved from "
297  << files.size() << " entries." << endmsg;
298  return IDataConnection::BAD_DATA_CONNECTION;
299  }
// A connection is already registered under this file ID.
300  return S_ERROR;
301  }
// LFN / PFN request: reuse the file ID recorded in m_fidMap for this name,
// otherwise ask the catalog.
302  std::string fid;
303  FidMap::iterator j = m_fidMap.find(dsn);
304  if ( j == m_fidMap.end() ) {
305  IFileCatalog::Files files;
306  switch(typ) {
307  case LFN:
308  fid = m_catalog->lookupLFN(dsn);
309  if ( fid.empty() ) {
310  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
311  log << MSG::ERROR << "Failed to resolve LFN:" << dsn
312  << " Cannot access this dataset." << endmsg;
313  return IDataConnection::BAD_DATA_CONNECTION;
314  }
315  break;
316  case PFN:
317  fid = m_catalog->lookupPFN(dsn);
318  if ( !fid.empty() ) m_catalog->getPFN(fid, files);
319  if ( files.empty() ) {
// File not in the catalog: when creating, register it under a (possibly newly
// created) file ID; otherwise the name itself serves as file ID.
320  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
321  if ( fid.empty() ) fid = m_catalog->createFID();
322  m_catalog->registerPFN(fid,dsn,technology);
323  log << MSG::INFO << "Referring to dataset " << dsn
324  << " by its file ID:" << fid << endmsg;
325  }
326  else {
327  fid = dsn;
328  }
329  }
330  break;
331  }
332  }
333  else {
334  fid = (*j).second;
335  }
336  if ( typ == PFN ) {
337  // Open PFN
338  ConnectionMap::iterator fi = m_connectionMap.find(fid);
// No entry registered for this file ID yet: create one and open it.
339  if ( fi == m_connectionMap.end() ) {
340  connection->setFID(fid);
341  connection->setPFN(dsn);
342  Entry* e = new Entry(technology, keep_open, rw, connection);
343  // Here we open the file!
344  if ( !reconnect(e).isSuccess() ) {
345  delete e;
346  if ( m_quarantine ) s_badFiles.insert(dsn);
347  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
348  error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
349  return IDataConnection::BAD_DATA_CONNECTION;
350  }
// Use the file ID reported by the connection after the open and record the
// aliases under it.
351  fid = connection->fid();
352  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
// For non-creating modes, report when name and file ID are the same string
// (compared case-insensitively), unless DisablePFNWarning is set.
353  if ( !(rw==Connection::CREATE || rw==Connection::RECREATE) ) {
354  if ( ! m_disablePFNWarning && strcasecmp(dsn.c_str(),fid.c_str()) == 0 ) {
355  log << MSG::ERROR << "Referring to existing dataset " << dsn
356  << " by its physical name." << endmsg;
357  log << "You may not be able to navigate back to the input file"
358  << " -- processing continues" << endmsg;
359  }
360  }
// The new entry is now owned by the connection map.
361  m_connectionMap.insert(std::make_pair(fid,e));
362  return S_OK;
363  }
// An entry already exists for this file ID: reopen it.
364  // Here we open the file!
365  if ( !reconnect((*fi).second).isSuccess() ) {
366  if ( m_quarantine ) s_badFiles.insert(dsn);
367  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
368  error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
369  return IDataConnection::BAD_DATA_CONNECTION;
370  }
371  return S_OK;
372  }
// Not a PFN (FID requests returned above): continue with the resolved file ID.
373  sc = connectDataIO(FID, rw, fid, technology, keep_open, connection);
// NOTE(review): the `else if` below also runs when the call failed but
// m_quarantine is false, so a failed LFN would still be recorded in m_fidMap.
// Confirm whether that is intended.
374  if ( !sc.isSuccess() && m_quarantine ) {
375  s_badFiles.insert(fid);
376  }
377  else if ( typ == LFN ) {
378  m_fidMap[dataset] = fid;
379  }
380  return sc;
381  }
382  catch (std::exception& e) {
383  error(std::string("connectDataIO> Caught exception:")+e.what(), false).ignore();
384  }
385  catch(...) {
386  error(std::string("connectDataIO> Caught unknown exception"), false).ignore();
387  }
// Every visible path inside the try block returns, so this tail appears to be
// reached only after one of the handlers above ran.
// NOTE(review): unlike the other failure paths, the insert into s_badFiles
// here is not guarded by m_quarantine; confirm intended.
388  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
389  error("connectDataIO> The dataset "+dsn+" cannot be opened.",false).ignore();
390  s_badFiles.insert(dsn);
391  return IDataConnection::BAD_DATA_CONNECTION;
392 }

Generated at Mon Feb 17 2014 14:37:49 for Gaudi Framework, version v25r0 by Doxygen version 1.8.2 written by Dimitri van Heesch, © 1997-2004