All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
IODataManager.cpp
Go to the documentation of this file.
1 // Framework include files
2 #include "GaudiKernel/Debugger.h"
3 #include "GaudiKernel/MsgStream.h"
4 #include "GaudiKernel/strcasecmp.h"
5 #include "GaudiUtils/IFileCatalog.h"
6 #include "IODataManager.h"
7 #include "GaudiKernel/SmartIF.h"
8 #include "GaudiKernel/Incident.h"
9 #include "GaudiKernel/IIncidentSvc.h"
10 #include "GaudiKernel/AppReturnCode.h"
11 
12 #include <set>
13 
namespace {

/// Function object yielding the `second` member of a std::pair by const
/// reference, e.g. the mapped value when iterating a std::map.
constexpr struct select2nd_t {
  template <typename S, typename T>
  const T& operator()(const std::pair<S, T>& entry) const {
    return entry.second;
  }
} select2nd{};

/// Transform every element of [first, last) with `op` and copy the
/// transformed values accepted by `pred` to `result`.
///
/// Unlike std::copy_if, the predicate sees the *transformed* value, and each
/// element is transformed exactly once.
/// @return the output iterator just past the last value written
template <typename InputIterator, typename OutputIterator,
          typename UnaryOperation, typename UnaryPredicate>
OutputIterator transform_copy_if(InputIterator first, InputIterator last,
                                 OutputIterator result, UnaryOperation op,
                                 UnaryPredicate pred) {
  for (; first != last; ++first) {
    auto transformed = op(*first);
    if (!pred(transformed)) {
      continue;
    }
    *result++ = std::move(transformed);
  }
  return result;
}

} // namespace
35 
36 using namespace Gaudi;
37 
39 
41 
namespace {
  /// Names of datasets that connectDataIO() has recorded as failing to open.
  /// Being a file-scope object, the set is shared by every IODataManager
  /// instance in the process.
  std::set<std::string> s_badFiles;
}
43 
// Constructor: forwards the service name and locator to the base class and
// declares the job-option properties.
// NOTE(review): the signature line (source line 44) is missing from this
// listing; presumably IODataManager::IODataManager(CSTR nam, ISvcLocator* svcloc).
45  : base_class(nam, svcloc)
46 {
// The value assigned inline to each member is that property's default.
// CatalogType: name of the service retrieved as the IFileCatalog in initialize().
// UseGFAL: for a GUID-shaped FID without catalog replicas, try "gfal:guid:<FID>".
// QuarantineFiles: record names of datasets that failed to open in s_badFiles.
// AgeLimit: after a successful read connect, other non-keep-open connections
//           whose age exceeds this value are disconnected.
47  declareProperty("CatalogType", m_catalogSvcName="Gaudi::MultiFileCatalog/FileCatalog");
48  declareProperty("UseGFAL", m_useGFAL = true);
49  declareProperty("QuarantineFiles", m_quarantine = true);
50  declareProperty("AgeLimit", m_ageLimit = 2);
51  declareProperty("DisablePFNWarning", m_disablePFNWarning = false,
52  "if set to True, we will not report when a file "
53  "is opened by its physical name");
54 }
55 
// IService::initialize: initialize the base Service, then locate the file
// catalog (property CatalogType) and the IncidentSvc.
// NOTE(review): source lines 57 (signature) and 59 are missing from this
// listing; `status` is presumably the result of Service::initialize().
58  // Initialize base class
60  MsgStream log(msgSvc(), name());
61  if ( !status.isSuccess() ) {
62  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
63  return status;
64  }
65  // Retrieve the file catalog service named by the CatalogType property
66  m_catalog = serviceLocator()->service(m_catalogSvcName);
67  if( !m_catalog ) {
68  log << MSG::ERROR
69  << "Unable to localize interface IFileCatalog from service:"
// NOTE(review): source line 70 (the end of this log statement) is missing here.
71  return StatusCode::FAILURE;
72  }
73  m_incSvc = serviceLocator()->service("IncidentSvc");
74  if( !m_incSvc ) {
75  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
// NOTE(review): `status` is known to be successful here (checked above), so a
// missing IncidentSvc does NOT fail initialization. Other methods call
// m_incSvc->fireIncident() without a null check, so returning
// StatusCode::FAILURE here is probably what was intended.
76  return status;
77  }
78  return status;
79 }
80 
// IService::finalize: drop the file-catalog reference, then finalize the base Service.
// NOTE(review): the signature line (source line 82) is missing from this listing.
// NOTE(review): only m_catalog is released; m_incSvc stays set. Confirm this is intended.
83  m_catalog = nullptr; // release
84  return Service::finalize();
85 }
86 
// Report `msg` at ERROR level. When `rethrow` is true, also call
// System::breakExecution(); despite the parameter name, no C++ exception is
// thrown. Always returns S_ERROR, so callers can write `return error(...)`.
// NOTE(review): the signature line (source line 88) is missing from this listing.
89  MsgStream log(msgSvc(),name());
90  log << MSG::ERROR << "Error: " << msg << endmsg;
91  if ( rethrow ) System::breakExecution();
92  return S_ERROR;
93 }
94 
96 IODataManager::Connections IODataManager::connections(const IInterface* owner) const {
97  Connections conns;
98  transform_copy_if( std::begin(m_connectionMap), std::end(m_connectionMap),
99  std::back_inserter(conns),
100  [](ConnectionMap::const_reference i ) { return i.second->connection; },
101  [&](const IDataConnection* c) { return !owner || c->owner() == owner; } );
102  return conns;
103 }
104 
106 StatusCode IODataManager::connectRead(bool keep_open, Connection* con) {
107  if ( !establishConnection(con) ) {
108  return connectDataIO(UNKNOWN,Connection::READ,con->name(),"UNKNOWN",keep_open,con);
109  }
110  std::string dsn = con ? con->name() : std::string("Unknown");
111  return error("Failed to connect to data:"+dsn,false);
112 }
113 
115 StatusCode IODataManager::connectWrite(Connection* con,IoType mode,CSTR doctype) {
116  if ( !establishConnection(con) ) {
117  return connectDataIO(UNKNOWN,mode,con->name(),doctype,true,con);
118  }
119  std::string dsn = con ? con->name() : std::string("Unknown");
120  return error("Failed to connect to data:"+dsn,false);
121 }
122 
124 StatusCode IODataManager::read(Connection* con, void* const data, size_t len) {
125  return establishConnection(con).isSuccess() ? con->read(data,len) : S_ERROR;
126 }
127 
129 StatusCode IODataManager::write(Connection* con, const void* data, int len) {
130  return establishConnection(con).isSuccess() ? con->write(data,len) : S_ERROR;
131 }
132 
134 long long int IODataManager::seek(Connection* con, long long int where, int origin) {
135  return establishConnection(con).isSuccess() ? con->seek(where,origin) : -1;
136 }
137 
// Release data stream: disconnect `con` and drop the bookkeeping kept for it.
// If con->name() is registered in m_fidMap, the entries for that name, the
// bare FID, "gfal:guid:<FID>" and the stored connection's PFN are erased.
// The Entry kept under the FID is disconnected (if still connected), deleted
// and removed from m_connectionMap.
// Returns the status of con->disconnect(), or S_ERROR for a null `con`.
// NOTE(review): the signature line (source line 138) is missing from this listing.
139  if ( con ) {
140  std::string dataset = con->name();
141  std::string dsn = dataset;
142  StatusCode sc = con->disconnect();
// Strip a FID:/LFN:/PFN: prefix; `dsn` is only used in the log message below.
143  if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
144  dsn = dataset.substr(4);
145  else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
146  dsn = dataset.substr(4);
147  else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
148  dsn = dataset.substr(4);
149 
150  auto j = m_fidMap.find(dataset);
151  if ( j != m_fidMap.end() ) {
152  std::string fid = j->second;
153  std::string gfal_name = "gfal:guid:" + fid;
// Look the Entry up before erasing the aliases; m_connectionMap is keyed by FID.
154  auto i=m_connectionMap.find(fid);
155  m_fidMap.erase(j);
156  if ( (j=m_fidMap.find(fid)) != m_fidMap.end() )
157  m_fidMap.erase(j);
158  if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() )
159  m_fidMap.erase(j);
160  if ( i != m_connectionMap.end() && i->second ) {
161  IDataConnection* c = i->second->connection;
162  if ( (j=m_fidMap.find(c->pfn())) != m_fidMap.end() )
163  m_fidMap.erase(j);
// `c` may differ from `con` (or `con` may already be disconnected above),
// so close it explicitly if it is still open.
164  if ( c->isConnected() ) {
165  MsgStream log(msgSvc(),name());
166  c->disconnect();
167  log << MSG::INFO << "Disconnect from dataset " << dsn
168  << " [" << fid << "]" << endmsg;
169  }
170  delete i->second;
171  m_connectionMap.erase(i);
172  }
173  }
174  return sc;
175  }
176  return S_ERROR;
177 }
178 
// (Re)open the connection held by Entry `e` according to e->ioType:
// READ -> connectRead(); UPDATE/CREATE/RECREATE -> connectWrite(ioType);
// any other type returns S_ERROR. After a successful READ, this connection's
// age is reset and other idle connections are disconnected (see below).
// NOTE(review): source lines 179-180 (signature and, presumably, the
// declaration/initialisation of `sc`) are missing from this listing.
181  if ( e && e->connection ) {
182  switch(e->ioType) {
183  case Connection::READ:
184  sc = e->connection->connectRead();
185  break;
186  case Connection::UPDATE:
187  case Connection::CREATE:
188  case Connection::RECREATE:
189  sc = e->connection->connectWrite(e->ioType);
190  break;
191  default:
192  return S_ERROR;
193  }
194  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
195  std::vector<Entry*> to_retire;
196  e->connection->resetAge();
// Collect every other connected, non-keep-open entry whose age exceeds
// m_ageLimit. Because of short-circuit evaluation, ageFile() (which increases
// the age) is only called for connected, non-keep-open entries other than `e`.
197  transform_copy_if( std::begin(m_connectionMap), std::end(m_connectionMap),
198  std::back_inserter(to_retire),
199  select2nd,
200  [&](Entry* i) {
// NOTE(review): source line 201 (presumably defining `c` from i->connection) is missing here.
202  return e->connection!=c && c->isConnected() &&
203  !i->keepOpen && c->ageFile() > m_ageLimit;
204  });
// Retired entries are only disconnected; they stay in m_connectionMap.
205  if ( !to_retire.empty() ) {
206  MsgStream log(msgSvc(),name());
207  std::for_each( std::begin(to_retire), std::end(to_retire),
208  [&](Entry* j) {
// NOTE(review): source line 209 (presumably defining `c` from j->connection) is missing here.
210  c->disconnect();
211  log << MSG::INFO << "Disconnect from dataset " << c->pfn()
212  << " [" << c->fid() << "]" << endmsg;
213  } );
214  }
215  }
216  }
217  return sc;
218 }
219 
// Retrieve a known connection: map the dataset name to its FID via m_fidMap,
// then return the connection stored under that FID in m_connectionMap.
// Returns nullptr when either lookup fails.
// NOTE(review): the signature lines (source lines 220-221) are missing from this listing.
222  auto j = m_fidMap.find(dataset);
223  if ( j == m_fidMap.end() ) return nullptr;
224  auto i=m_connectionMap.find(j->second);
225  return (i != m_connectionMap.end()) ? i->second->connection : nullptr;
226 }
227 
// Make sure `con` is usable. An already-connected `con` just has its age reset.
// Otherwise the Entry registered in m_connectionMap under con->name() is
// reconnected. Finding a *different* connection object under that key is
// treated as a logic error (FailInputFile incident plus error() with break).
// Returns S_OK on success, otherwise S_ERROR (or error()'s result).
// NOTE(review): the signature line (source line 228) is missing from this listing.
229  if ( !con ) return error("Severe logic bug: No connection object avalible.",true);
230 
231  if ( con->isConnected() ) {
232  con->resetAge();
233  return S_OK;
234  }
// NOTE(review): connectDataIO() stores entries with m_connectionMap.emplace(fid, e),
// so this lookup only hits when con->name() equals that FID. Confirm this is intended.
235  auto i=m_connectionMap.find(con->name());
236  if ( i != m_connectionMap.end() ) {
237  Connection* c = i->second->connection;
238  if ( c != con ) {
239  m_incSvc->fireIncident(Incident(con->name(),IncidentType::FailInputFile));
240  return error("Severe logic bug: Twice identical connection object for DSN:"+con->name(),true);
241  }
242  if ( reconnect(i->second).isSuccess() ) return S_OK;
243  }
244  return S_ERROR;
245 }
246 
// Open `dataset` through `connection`, resolving it with the file catalog.
// A "FID:", "LFN:" or "PFN:" prefix (case-insensitive) selects the name type;
// an unprefixed name passed with typ == UNKNOWN is retried as a PFN.
//  - FID: if not yet in m_connectionMap, fetch its replicas from the catalog
//    and try each PFN in turn. With no replicas and UseGFAL set, a FID that
//    looks like a GUID (36 chars, '-' at positions 8 and 13) is tried as
//    "gfal:guid:<FID>". The application return code saved before the
//    replica loop is restored once a replica opens.
//  - LFN: look up the FID in the catalog, then recurse with typ == FID.
//  - PFN: look up the FID. For CREATE/RECREATE without catalog replicas, a
//    FID is created if needed and the PFN registered; otherwise an unknown
//    PFN is used as its own FID. The file is then opened via reconnect(),
//    either as a new Entry stored in m_connectionMap or by reconnecting the
//    Entry already kept under that FID.
// Most failure paths fire a FailInputFile incident and, when QuarantineFiles
// is set, add the name to s_badFiles.
// NOTE(review): this listing lacks source line 247 (presumably the StatusCode
// return type), line 252 (presumably the declaration of `sc`) and the
// early-return lines 264, 276, 288, 315, 330, 366, 386 and 408.
248 IODataManager::connectDataIO(int typ, IoType rw, CSTR dataset, CSTR technology,bool keep_open,Connection* connection) {
249  MsgStream log(msgSvc(),name());
250  std::string dsn = dataset;
251  try {
253  if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
254  dsn = dataset.substr(4), typ = FID;
255  else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
256  dsn = dataset.substr(4), typ = LFN;
257  else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
258  dsn = dataset.substr(4), typ = PFN;
259  else if ( typ == UNKNOWN )
260  return connectDataIO(PFN, rw, dsn, technology, keep_open, connection);
261 
// NOTE(review): std::find scans the std::set linearly; s_badFiles.count(dsn)
// (or s_badFiles.find(dsn)) gives the same answer in O(log n).
262  if(std::find(s_badFiles.begin(),s_badFiles.end(),dsn) != s_badFiles.end()) {
263  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
265  }
266  if ( typ == FID ) {
267  auto fi = m_connectionMap.find(dsn);
268  if ( fi == m_connectionMap.end() ) {
269  IFileCatalog::Files files;
270  m_catalog->getPFN(dsn,files);
271  if ( files.empty() ) {
272  if ( !m_useGFAL ) {
273  if ( m_quarantine ) s_badFiles.insert(dsn);
274  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
275  error("connectDataIO> failed to resolve FID:"+dsn,false).ignore();
277  }
278  else if ( dsn.length() == 36 && dsn[8]=='-' && dsn[13]=='-' ) {
279  std::string gfal_name = "gfal:guid:" + dsn;
280  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
281  sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection);
282  if ( sc.isSuccess() ) return sc;
283  if ( m_quarantine ) s_badFiles.insert(dsn);
284  }
// After a failed gfal attempt dsn is already in the set; inserting again is a no-op.
285  if ( m_quarantine ) s_badFiles.insert(dsn);
286  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
287  error("connectDataIO> Failed to resolve FID:"+dsn,false).ignore();
289  }
290  // keep track of the current return code before we start iterating over
291  // replicas
292  auto appmgr = serviceLocator()->as<IProperty>();
293  int origReturnCode = Gaudi::getAppReturnCode(appmgr);
294  for(auto i=files.cbegin(); i!=files.cend(); ++i) {
295  std::string pfn = i->first;
296  if ( i != files.cbegin() ) {
297  log << MSG::WARNING << "Attempt to connect dsn:" << dsn
298  << " with next entry in data federation:" << pfn << "." << endmsg;
299  }
300  sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection);
301  if ( !sc.isSuccess() ) {
302  if ( m_quarantine ) s_badFiles.insert(pfn);
303  m_incSvc->fireIncident(Incident(pfn,IncidentType::FailInputFile));
304  }
305  else {
306  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
307  // we found a working replica, let's reset the return code to the old value
308  Gaudi::setAppReturnCode(appmgr, origReturnCode, true).ignore();
309  return sc;
310  }
311  }
312  log << MSG::ERROR << "Failed to open dsn:" << dsn
313  << " Federated file could not be resolved from "
314  << files.size() << " entries." << endmsg;
316  }
// A FID that already has an Entry in m_connectionMap is reported as a
// failure here rather than being reused.
317  return S_ERROR;
318  }
319  std::string fid;
320  auto j = m_fidMap.find(dsn);
321  if ( j == m_fidMap.end() ) {
322  IFileCatalog::Files files;
323  switch(typ) {
324  case LFN:
325  fid = m_catalog->lookupLFN(dsn);
326  if ( fid.empty() ) {
327  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
328  log << MSG::ERROR << "Failed to resolve LFN:" << dsn
329  << " Cannot access this dataset." << endmsg;
331  }
332  break;
333  case PFN:
334  fid = m_catalog->lookupPFN(dsn);
335  if ( !fid.empty() ) m_catalog->getPFN(fid, files);
336  if ( files.empty() ) {
337  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
338  if ( fid.empty() ) fid = m_catalog->createFID();
339  m_catalog->registerPFN(fid,dsn,technology);
340  log << MSG::INFO << "Referring to dataset " << dsn
341  << " by its file ID:" << fid << endmsg;
342  }
343  else {
// Not known to the catalog and not being created: use the PFN itself as FID.
344  fid = dsn;
345  }
346  }
347  break;
348  }
349  }
350  else {
351  fid = j->second;
352  }
353  if ( typ == PFN ) {
354  // Open PFN
355  auto fi = m_connectionMap.find(fid);
356  if ( fi == m_connectionMap.end() ) {
357  connection->setFID(fid);
358  connection->setPFN(dsn);
359  auto e = new Entry(technology, keep_open, rw, connection);
360  // Here we open the file!
361  if ( !reconnect(e).isSuccess() ) {
362  delete e;
363  if ( m_quarantine ) s_badFiles.insert(dsn);
364  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
365  error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
367  }
// Re-read the FID from the connection after opening it.
368  fid = connection->fid();
369  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
370  if ( !(rw==Connection::CREATE || rw==Connection::RECREATE) ) {
// A FID equal (case-insensitively) to the PFN typically means the PFN was
// used as its own FID because the catalog did not know it.
371  if ( ! m_disablePFNWarning && strcasecmp(dsn.c_str(),fid.c_str()) == 0 ) {
372  log << MSG::ERROR << "Referring to existing dataset " << dsn
373  << " by its physical name." << endmsg;
374  log << "You may not be able to navigate back to the input file"
375  << " -- processing continues" << endmsg;
376  }
377  }
// m_connectionMap now owns `e`; in this file it is deleted only by disconnect().
// NOTE(review): std::map::emplace does not insert when `fid` (re-read from the
// connection above) is already a key, in which case `e` would leak. Confirm.
378  m_connectionMap.emplace( fid, e ); // ownership of e passes to m_connectionMap
379  return S_OK;
380  }
381  // Here we open the file!
382  if ( !reconnect((*fi).second).isSuccess() ) {
383  if ( m_quarantine ) s_badFiles.insert(dsn);
384  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
385  error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
387  }
388  return S_OK;
389  }
390  sc = connectDataIO(FID, rw, fid, technology, keep_open, connection);
// NOTE(review): the else-if binds to the combined condition, so with
// QuarantineFiles off a *failed* LFN open still records dataset -> fid.
391  if ( !sc.isSuccess() && m_quarantine ) {
392  s_badFiles.insert(fid);
393  }
394  else if ( typ == LFN ) {
395  m_fidMap[dataset] = fid;
396  }
397  return sc;
398  }
399  catch (std::exception& e) {
400  error(std::string("connectDataIO> Caught exception:")+e.what(), false).ignore();
401  }
402  catch(...) {
403  error(std::string("connectDataIO> Caught unknown exception"), false).ignore();
404  }
405  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
406  error("connectDataIO> The dataset "+dsn+" cannot be opened.",false).ignore();
// NOTE(review): unlike the other failure paths, this insert ignores m_quarantine.
407  s_badFiles.insert(dsn);
409 }
StatusCode finalize() override
IService implementation: finalize the service.
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:70
tuple c
Definition: gaudirun.py:391
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:63
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
int m_ageLimit
Property: Age limit.
Definition: IODataManager.h:47
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
virtual bool isConnected() const =0
Check if connected to data source.
bool m_useGFAL
Property: Flag for auto gfal data access.
Definition: IODataManager.h:49
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
const std::string & fid() const
Access file id.
StatusCode finalize() override
Definition: Service.cpp:188
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
auto begin(reverse_wrapper< T > &w)
Definition: reverse.h:45
std::vector< NamedItem > Files
Definition: IFileCatalog.h:39
StatusCode disconnect(Connection *ioDesc) override
Release data stream.
SmartIF< IIncidentSvc > m_incSvc
Definition: IODataManager.h:67
StatusCode establishConnection(Connection *con)
std::string m_catalogSvcName
Property: Name of the file catalog service.
Definition: IODataManager.h:45
StatusCode error(CSTR msg, bool rethrow)
Small routine to report an error; optionally breaks into the debugger and always returns a failure code.
const std::string & pfn() const
Access physical file name.
bool m_disablePFNWarning
Property DisablePFNWarning: if set to True, we will not report when a file is opened by its physical name.
Definition: IODataManager.h:54
StatusCode read(Connection *ioDesc, void *const data, size_t len) override
Read raw byte buffer from input stream.
Connection * connection(const std::string &dsn) const override
Retrieve known connection.
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
Definition: Debugger.cpp:47
virtual void fireIncident(const Incident &incident)=0
Fire an Incident.
int ageFile()
Increase age of I/O source.
StatusCode initialize() override
IService implementation: initialize the service.
auto end(reverse_wrapper< T > &w)
Definition: reverse.h:47
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
SmartIF< IFileCatalog > m_catalog
Reference to file catalog.
Definition: IODataManager.h:59
#define DECLARE_COMPONENT(type)
Definition: PluginService.h:36
Definition of the basic interface.
Definition: IInterface.h:234
StatusCode write(Connection *con, const void *data, int len) override
Write raw byte buffer to output stream.
Definition: IODataManager.h:32
StatusCode connectWrite(Connection *con, IoType mode=Connection::CREATE, CSTR doctype="UNKNOWN") override
Open data stream in write mode.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:50
bool m_quarantine
Property: Flag if inaccessible files should be quarantined in the job.
Definition: IODataManager.h:51
ABC describing basic data connection.
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
ConnectionMap m_connectionMap
Map with I/O descriptors.
Definition: IODataManager.h:57
IDataConnection * connection
Definition: IODataManager.h:35
const std::string & CSTR
Definition: IODataManager.h:31
void resetAge()
Reset age.
Base class for all Incidents (computing events).
Definition: Incident.h:16
StatusCode connectDataIO(int typ, IoType rw, CSTR fn, CSTR technology, bool keep, Connection *con)
StatusCode connectRead(bool keep_open, Connection *ioDesc) override
Open data stream in read mode.
void ignore() const
Definition: StatusCode.h:108
bool keepOpen
Definition: IODataManager.h:36
The IProperty is the basic interface for all components which have properties that can be set or get...
Definition: IProperty.h:21
virtual StatusCode connectRead()=0
Open data stream in read mode.
list i
Definition: ana.py:128
Connections connections(const IInterface *owner) const override
Get connections by owner instance (nullptr = all).
long long int seek(Connection *ioDesc, long long int where, int origin) override
Seek on the file described by ioDesc. Arguments as in ::seek()
Helper functions to set/get the application return code.
Definition: __init__.py:1
StatusCode reconnect(Entry *e)
IODataManager(CSTR nam, ISvcLocator *loc)
the incident service
IoType ioType
Definition: IODataManager.h:34
FidMap m_fidMap
Map of dataset name (FID, LFN or PFN) to FID.
Definition: IODataManager.h:61