All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
IODataManager.cpp
Go to the documentation of this file.
1 // Framework include files
2 #include "GaudiKernel/Debugger.h"
6 #include "IODataManager.h"
7 #include "GaudiKernel/SmartIF.h"
8 #include "GaudiKernel/Incident.h"
11 
12 #include <set>
13 
14 using namespace Gaudi;
15 
17 
19 
20 static std::set<std::string> s_badFiles;
21 
23  : base_class(nam, svcloc), m_ageLimit(2)
24 {
25  declareProperty("CatalogType", m_catalogSvcName="Gaudi::MultiFileCatalog/FileCatalog");
26  declareProperty("UseGFAL", m_useGFAL = true);
27  declareProperty("QuarantineFiles", m_quarantine = true);
28  declareProperty("AgeLimit", m_ageLimit = 2);
29  declareProperty("DisablePFNWarning", m_disablePFNWarning = false,
30  "if set to True, we will not report when a file "
31  "is opened by it's physical name");
32 }
33 
36  // Initialize base class
38  MsgStream log(msgSvc(), name());
39  if ( !status.isSuccess() ) {
40  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
41  return status;
42  }
43  // Retrieve conversion service handling event iteration
45  if( !m_catalog.isValid() ) {
46  log << MSG::ERROR
47  << "Unable to localize interface IFileCatalog from service:"
49  return StatusCode::FAILURE;
50  }
51  m_incSvc = serviceLocator()->service("IncidentSvc");
52  if( !m_incSvc.isValid() ) {
53  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
54  return status;
55  }
56 
57  return status;
58 }
59 
62  m_catalog = 0; // release
63  return Service::finalize();
64 }
65 
68  MsgStream log(msgSvc(),name());
69  log << MSG::ERROR << "Error: " << msg << endmsg;
70  if ( rethrow ) {
72  }
73  return S_ERROR;
74 }
75 
77 IODataManager::Connections IODataManager::connections(const IInterface* owner) const {
78  Connections conns;
79  for(ConnectionMap::const_iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
80  IDataConnection* c = (*i).second->connection;
81  if ( 0 == owner || c->owner() == owner )
82  conns.push_back(c);
83  }
84  return conns;
85 }
86 
88 StatusCode IODataManager::connectRead(bool keep_open, Connection* con) {
89  if ( !establishConnection(con) ) {
90  return connectDataIO(UNKNOWN,Connection::READ,con->name(),"UNKNOWN",keep_open,con);
91  }
92  std::string dsn = con ? con->name() : std::string("Unknown");
93  return error("Failed to connect to data:"+dsn,false);
94 }
95 
97 StatusCode IODataManager::connectWrite(Connection* con,IoType mode,CSTR doctype) {
98  if ( !establishConnection(con) ) {
99  return connectDataIO(UNKNOWN,mode,con->name(),doctype,true,con);
100  }
101  std::string dsn = con ? con->name() : std::string("Unknown");
102  return error("Failed to connect to data:"+dsn,false);
103 }
104 
106 StatusCode IODataManager::read(Connection* con, void* const data, size_t len) {
107  return establishConnection(con).isSuccess() ? con->read(data,len) : S_ERROR;
108 }
109 
111 StatusCode IODataManager::write(Connection* con, const void* data, int len) {
112  return establishConnection(con).isSuccess() ? con->write(data,len) : S_ERROR;
113 }
114 
116 long long int IODataManager::seek(Connection* con, long long int where, int origin) {
117  return establishConnection(con).isSuccess() ? con->seek(where,origin) : -1;
118 }
119 
121  if ( con ) {
122  std::string dataset = con->name();
123  std::string dsn = dataset;
124  StatusCode sc = con->disconnect();
125  if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
126  dsn = dataset.substr(4);
127  else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
128  dsn = dataset.substr(4);
129  else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
130  dsn = dataset.substr(4);
131 
132  FidMap::iterator j = m_fidMap.find(dataset);
133  if ( j != m_fidMap.end() ) {
134  std::string fid = (*j).second;
135  std::string gfal_name = "gfal:guid:" + fid;
136  ConnectionMap::iterator i=m_connectionMap.find(fid);
137  m_fidMap.erase(j);
138  if ( (j=m_fidMap.find(fid)) != m_fidMap.end() )
139  m_fidMap.erase(j);
140  if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() )
141  m_fidMap.erase(j);
142  if ( i != m_connectionMap.end() ) {
143  if ( (*i).second ) {
144  IDataConnection* c = (*i).second->connection;
145  std::string pfn = c->pfn();
146  if ( (j=m_fidMap.find(pfn)) != m_fidMap.end() )
147  m_fidMap.erase(j);
148  if ( c->isConnected() ) {
149  MsgStream log(msgSvc(),name());
150  c->disconnect();
151  log << MSG::INFO << "Disconnect from dataset " << dsn
152  << " [" << fid << "]" << endmsg;
153  }
154  delete (*i).second;
155  m_connectionMap.erase(i);
156  }
157  }
158  }
159  return sc;
160  }
161  return S_ERROR;
162 }
163 
166  if ( e && e->connection ) {
167  switch(e->ioType) {
168  case Connection::READ:
169  sc = e->connection->connectRead();
170  break;
171  case Connection::UPDATE:
172  case Connection::CREATE:
173  case Connection::RECREATE:
174  sc = e->connection->connectWrite(e->ioType);
175  break;
176  default:
177  return S_ERROR;
178  }
179  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
180  std::vector<Entry*> to_retire;
181  e->connection->resetAge();
182  for(ConnectionMap::iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
183  IDataConnection* c = (*i).second->connection;
184  if ( e->connection != c && c->isConnected() && !(*i).second->keepOpen ) {
185  c->ageFile();
186  if ( c->age() > m_ageLimit ) {
187  to_retire.push_back((*i).second);
188  }
189  }
190  }
191  if ( !to_retire.empty() ) {
192  MsgStream log(msgSvc(),name());
193  for(std::vector<Entry*>::iterator j=to_retire.begin(); j!=to_retire.end();++j) {
194  IDataConnection* c = (*j)->connection;
195  c->disconnect();
196  log << MSG::INFO << "Disconnect from dataset " << c->pfn()
197  << " [" << c->fid() << "]" << endmsg;
198  }
199  }
200  }
201  }
202  return sc;
203 }
204 
207  FidMap::const_iterator j = m_fidMap.find(dataset);
208  if ( j != m_fidMap.end() ) {
209  ConnectionMap::const_iterator i=m_connectionMap.find((*j).second);
210  return (i != m_connectionMap.end()) ? (*i).second->connection : 0;
211  }
212  return 0;
213 }
214 
216  if ( con ) {
217  if ( !con->isConnected() ) {
218  ConnectionMap::const_iterator i=m_connectionMap.find(con->name());
219  if ( i != m_connectionMap.end() ) {
220  Connection* c = (*i).second->connection;
221  if ( c != con ) {
222  m_incSvc->fireIncident(Incident(con->name(),IncidentType::FailInputFile));
223  return error("Severe logic bug: Twice identical connection object for DSN:"+con->name(),true);
224  }
225  if ( reconnect((*i).second).isSuccess() ) {
226  return S_OK;
227  }
228  }
229  return S_ERROR;
230  }
231  con->resetAge();
232  return S_OK;
233  }
234  return error("Severe logic bug: No connection object avalible.",true);
235 }
236 
238 IODataManager::connectDataIO(int typ, IoType rw, CSTR dataset, CSTR technology,bool keep_open,Connection* connection) {
239  MsgStream log(msgSvc(),name());
240  std::string dsn = dataset;
241  try {
243  if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
244  dsn = dataset.substr(4), typ = FID;
245  else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
246  dsn = dataset.substr(4), typ = LFN;
247  else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
248  dsn = dataset.substr(4), typ = PFN;
249  else if ( typ == UNKNOWN )
250  return connectDataIO(PFN, rw, dsn, technology, keep_open, connection);
251 
252  if(std::find(s_badFiles.begin(),s_badFiles.end(),dsn) != s_badFiles.end()) {
253  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
255  }
256  if ( typ == FID ) {
257  ConnectionMap::iterator fi = m_connectionMap.find(dsn);
258  if ( fi == m_connectionMap.end() ) {
259  IFileCatalog::Files files;
260  m_catalog->getPFN(dsn,files);
261  if ( files.size() == 0 ) {
262  if ( !m_useGFAL ) {
263  if ( m_quarantine ) s_badFiles.insert(dsn);
264  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
265  error("connectDataIO> failed to resolve FID:"+dsn,false).ignore();
267  }
268  else if ( dsn.length() == 36 && dsn[8]=='-' && dsn[13]=='-' ) {
269  std::string gfal_name = "gfal:guid:" + dsn;
270  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
271  sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection);
272  if ( sc.isSuccess() ) return sc;
273  if ( m_quarantine ) s_badFiles.insert(dsn);
274  }
275  if ( m_quarantine ) s_badFiles.insert(dsn);
276  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
277  error("connectDataIO> Failed to resolve FID:"+dsn,false).ignore();
279  }
280  // keep track of the current return code before we start iterating over
281  // replicas
283  int origReturnCode = Gaudi::getAppReturnCode(appmgr);
284  for(IFileCatalog::Files::const_iterator i=files.begin(); i!=files.end(); ++i) {
285  std::string pfn = (*i).first;
286  if ( i != files.begin() ) {
287  log << MSG::WARNING << "Attempt to connect dsn:" << dsn
288  << " with next entry in data federation:" << pfn << "." << endmsg;
289  }
290  sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection);
291  if ( !sc.isSuccess() ) {
292  if ( m_quarantine ) s_badFiles.insert(pfn);
293  m_incSvc->fireIncident(Incident(pfn,IncidentType::FailInputFile));
294  }
295  else {
296  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
297  // we found a working replica, let's reset the return code to the old value
298  Gaudi::setAppReturnCode(appmgr, origReturnCode, true).ignore();
299  return sc;
300  }
301  }
302  log << MSG::ERROR << "Failed to open dsn:" << dsn
303  << " Federated file could not be resolved from "
304  << files.size() << " entries." << endmsg;
306  }
307  return S_ERROR;
308  }
309  std::string fid;
310  FidMap::iterator j = m_fidMap.find(dsn);
311  if ( j == m_fidMap.end() ) {
312  IFileCatalog::Files files;
313  switch(typ) {
314  case LFN:
315  fid = m_catalog->lookupLFN(dsn);
316  if ( fid.empty() ) {
317  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
318  log << MSG::ERROR << "Failed to resolve LFN:" << dsn
319  << " Cannot access this dataset." << endmsg;
321  }
322  break;
323  case PFN:
324  fid = m_catalog->lookupPFN(dsn);
325  if ( !fid.empty() ) m_catalog->getPFN(fid, files);
326  if ( files.empty() ) {
327  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
328  if ( fid.empty() ) fid = m_catalog->createFID();
329  m_catalog->registerPFN(fid,dsn,technology);
330  log << MSG::INFO << "Referring to dataset " << dsn
331  << " by its file ID:" << fid << endmsg;
332  }
333  else {
334  fid = dsn;
335  }
336  }
337  break;
338  }
339  }
340  else {
341  fid = (*j).second;
342  }
343  if ( typ == PFN ) {
344  // Open PFN
345  ConnectionMap::iterator fi = m_connectionMap.find(fid);
346  if ( fi == m_connectionMap.end() ) {
347  connection->setFID(fid);
348  connection->setPFN(dsn);
349  Entry* e = new Entry(technology, keep_open, rw, connection);
350  // Here we open the file!
351  if ( !reconnect(e).isSuccess() ) {
352  delete e;
353  if ( m_quarantine ) s_badFiles.insert(dsn);
354  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
355  error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
357  }
358  fid = connection->fid();
359  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
360  if ( !(rw==Connection::CREATE || rw==Connection::RECREATE) ) {
361  if ( ! m_disablePFNWarning && strcasecmp(dsn.c_str(),fid.c_str()) == 0 ) {
362  log << MSG::ERROR << "Referring to existing dataset " << dsn
363  << " by its physical name." << endmsg;
364  log << "You may not be able to navigate back to the input file"
365  << " -- processing continues" << endmsg;
366  }
367  }
368  m_connectionMap.insert(std::make_pair(fid,e));
369  return S_OK;
370  }
371  // Here we open the file!
372  if ( !reconnect((*fi).second).isSuccess() ) {
373  if ( m_quarantine ) s_badFiles.insert(dsn);
374  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
375  error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
377  }
378  return S_OK;
379  }
380  sc = connectDataIO(FID, rw, fid, technology, keep_open, connection);
381  if ( !sc.isSuccess() && m_quarantine ) {
382  s_badFiles.insert(fid);
383  }
384  else if ( typ == LFN ) {
385  m_fidMap[dataset] = fid;
386  }
387  return sc;
388  }
389  catch (std::exception& e) {
390  error(std::string("connectDataIO> Caught exception:")+e.what(), false).ignore();
391  }
392  catch(...) {
393  error(std::string("connectDataIO> Caught unknown exception"), false).ignore();
394  }
395  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
396  error("connectDataIO> The dataset "+dsn+" cannot be opened.",false).ignore();
397  s_badFiles.insert(dsn);
399 }
int age() const
Access age counter.
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:70
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
virtual Connections connections(const IInterface *owner) const
Get connection by owner instance (0=ALL)
int m_ageLimit
Property: Age limit.
Definition: IODataManager.h:47
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:26
virtual StatusCode read(Connection *ioDesc, void *const data, size_t len)
Read raw byte buffer from input stream.
virtual bool isConnected() const =0
Check if connected to data source.
bool m_useGFAL
Property: Flag for auto gfal data access.
Definition: IODataManager.h:49
const std::string & fid() const
Access file id.
virtual Connection * connection(const std::string &dsn) const
Retrieve known connection.
SmartIF< IMessageSvc > & msgSvc() const
The standard message service.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:75
tuple c
Definition: gaudirun.py:341
std::vector< NamedItem > Files
Definition: IFileCatalog.h:39
SmartIF< IIncidentSvc > m_incSvc
Definition: IODataManager.h:67
StatusCode establishConnection(Connection *con)
const std::string FailInputFile
could not open or read from this file
Definition: Incident.h:78
virtual StatusCode write(Connection *con, const void *data, int len)
Write raw byte buffer to output stream.
std::string m_catalogSvcName
Property: Name of the file catalog service.
Definition: IODataManager.h:45
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
Definition: Debugger.cpp:47
const std::string & pfn() const
Access physical file name.
virtual StatusCode disconnect(Connection *ioDesc)
Release data stream.
bool m_disablePFNWarning
Property DisablePFNWarning: if set to True, no report is made when a file is opened by its physical name.
Definition: IODataManager.h:54
#define DECLARE_COMPONENT(type)
Definition: PluginService.h:36
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:51
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:30
SmartIF< IFileCatalog > m_catalog
Reference to file catalog.
Definition: IODataManager.h:59
Definition of the basic interface.
Definition: IInterface.h:160
virtual StatusCode connectRead(bool keep_open, Connection *ioDesc)
Open data stream in read mode.
virtual StatusCode connectWrite(Connection *con, IoType mode=Connection::CREATE, CSTR doctype="UNKNOWN")
Open data stream in write mode.
virtual long long int seek(Connection *ioDesc, long long int where, int origin)
Seek on the file described by ioDesc. Arguments as in ::seek()
Definition: IODataManager.h:32
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:50
bool m_quarantine
Property: Flag whether inaccessible files should be quarantined in the job.
Definition: IODataManager.h:51
virtual const std::string & name() const
Retrieve name of the service.
Definition: Service.cpp:331
const IInterface * owner() const
Owner instance.
virtual StatusCode initialize()
Initialization (from CONFIGURED to INITIALIZED).
Definition: Service.cpp:74
ABC describing basic data connection.
ConnectionMap m_connectionMap
Map with I/O descriptors.
Definition: IODataManager.h:57
IDataConnection * connection
Definition: IODataManager.h:35
const std::string & CSTR
Definition: IODataManager.h:31
void resetAge()
Reset age.
Base class for all Incidents (computing events).
Definition: Incident.h:16
Templated class to add the standard messaging functionalities.
StatusCode connectDataIO(int typ, IoType rw, CSTR fn, CSTR technology, bool keep, Connection *con)
virtual StatusCode initialize()
IService implementation: initialize the service.
void ignore() const
Definition: StatusCode.h:107
Property * declareProperty(const std::string &name, T &property, const std::string &doc="none") const
Declare the named property.
Definition: Service.h:211
This is a number of static methods for bootstrapping the Gaudi framework.
Definition: Bootstrap.h:14
void ageFile()
Increase age of I/O source.
virtual StatusCode connectRead()=0
Open data stream in read mode.
list i
Definition: ana.py:128
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
StatusCode reconnect(Entry *e)
virtual StatusCode finalize()
Finalize (from INITIALIZED to CONFIGURED).
Definition: Service.cpp:199
IODataManager(CSTR nam, ISvcLocator *loc)
the incident service
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
virtual StatusCode finalize()
IService implementation: finalize the service.
SmartIF< ISvcLocator > & serviceLocator() const
Retrieve pointer to service locator.
Definition: Service.cpp:336
IoType ioType
Definition: IODataManager.h:34
FidMap m_fidMap
Map of FID to PFN.
Definition: IODataManager.h:61