All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
IODataManager.cpp
Go to the documentation of this file.
1 // Framework include files
2 #include "GaudiKernel/Debugger.h"
6 #include "IODataManager.h"
7 #include "GaudiKernel/SmartIF.h"
8 #include "GaudiKernel/Incident.h"
11 
12 #include <set>
13 
namespace {

  /// Function object returning the `second` member of a std::pair
  /// (e.g. the mapped value when iterating a std::map).
  constexpr struct select2nd_t {
    template <typename S, typename T>
    const T& operator()( const std::pair<S, T>& entry ) const {
      return entry.second;
    }
  } select2nd{};

  /// Single-pass "transform, then filter": for every element of [first, last)
  /// compute `op(element)` and write the result to `result` only if
  /// `pred(result_value)` is true. The predicate sees the transformed value,
  /// not the original element.
  /// @return output iterator one past the last element written.
  template <typename InputIterator, typename OutputIterator, typename UnaryOperation, typename UnaryPredicate>
  OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result,
                                    UnaryOperation op, UnaryPredicate pred ) {
    for ( ; first != last; ++first ) {
      auto transformed = op( *first );
      if ( !pred( transformed ) ) continue;
      *result = std::move( transformed );
      ++result;
    }
    return result;
  }
}
35 
36 using namespace Gaudi;
37 
39 
41 
namespace {
  /// Quarantine list shared by all IODataManager instances in the process:
  /// dataset names that failed to open. connectDataIO() refuses any name found
  /// here (firing a FailInputFile incident) instead of retrying it.
  std::set<std::string> s_badFiles;
}
43 
45 StatusCode IODataManager::initialize() {
46  // Initialize base class
48  MsgStream log(msgSvc(), name());
49  if ( !status.isSuccess() ) {
50  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
51  return status;
52  }
53  // Retrieve conversion service handling event iteration
54  m_catalog = serviceLocator()->service(m_catalogSvcName);
55  if( !m_catalog ) {
56  log << MSG::ERROR
57  << "Unable to localize interface IFileCatalog from service:"
58  << m_catalogSvcName << endmsg;
59  return StatusCode::FAILURE;
60  }
61  m_incSvc = serviceLocator()->service("IncidentSvc");
62  if( !m_incSvc ) {
63  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
64  return status;
65  }
66  return status;
67 }
68 
70 StatusCode IODataManager::finalize() {
71  m_catalog = nullptr; // release
72  return Service::finalize();
73 }
74 
76 StatusCode IODataManager::error(CSTR msg, bool rethrow) {
77  MsgStream log(msgSvc(),name());
78  log << MSG::ERROR << "Error: " << msg << endmsg;
79  if ( rethrow ) System::breakExecution();
80  return S_ERROR;
81 }
82 
84 IODataManager::Connections IODataManager::connections(const IInterface* owner) const {
85  Connections conns;
86  transform_copy_if( std::begin(m_connectionMap), std::end(m_connectionMap),
87  std::back_inserter(conns),
88  [](ConnectionMap::const_reference i ) { return i.second->connection; },
89  [&](const IDataConnection* c) { return !owner || c->owner() == owner; } );
90  return conns;
91 }
92 
94 StatusCode IODataManager::connectRead(bool keep_open, Connection* con) {
95  if ( !establishConnection(con) ) {
96  return connectDataIO(UNKNOWN,Connection::READ,con->name(),"UNKNOWN",keep_open,con);
97  }
98  std::string dsn = con ? con->name() : std::string("Unknown");
99  return error("Failed to connect to data:"+dsn,false);
100 }
101 
103 StatusCode IODataManager::connectWrite(Connection* con,IoType mode,CSTR doctype) {
104  if ( !establishConnection(con) ) {
105  return connectDataIO(UNKNOWN,mode,con->name(),doctype,true,con);
106  }
107  std::string dsn = con ? con->name() : std::string("Unknown");
108  return error("Failed to connect to data:"+dsn,false);
109 }
110 
112 StatusCode IODataManager::read(Connection* con, void* const data, size_t len) {
113  return establishConnection(con).isSuccess() ? con->read(data,len) : S_ERROR;
114 }
115 
117 StatusCode IODataManager::write(Connection* con, const void* data, int len) {
118  return establishConnection(con).isSuccess() ? con->write(data,len) : S_ERROR;
119 }
120 
122 long long int IODataManager::seek(Connection* con, long long int where, int origin) {
123  return establishConnection(con).isSuccess() ? con->seek(where,origin) : -1;
124 }
125 
126 StatusCode IODataManager::disconnect(Connection* con) {
127  if ( con ) {
128  std::string dataset = con->name();
129  std::string dsn = dataset;
130  StatusCode sc = con->disconnect();
131  if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
132  dsn = dataset.substr(4);
133  else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
134  dsn = dataset.substr(4);
135  else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
136  dsn = dataset.substr(4);
137 
138  auto j = m_fidMap.find(dataset);
139  if ( j != m_fidMap.end() ) {
140  std::string fid = j->second;
141  std::string gfal_name = "gfal:guid:" + fid;
142  auto i=m_connectionMap.find(fid);
143  m_fidMap.erase(j);
144  if ( (j=m_fidMap.find(fid)) != m_fidMap.end() )
145  m_fidMap.erase(j);
146  if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() )
147  m_fidMap.erase(j);
148  if ( i != m_connectionMap.end() && i->second ) {
149  IDataConnection* c = i->second->connection;
150  if ( (j=m_fidMap.find(c->pfn())) != m_fidMap.end() )
151  m_fidMap.erase(j);
152  if ( c->isConnected() ) {
153  MsgStream log(msgSvc(),name());
154  c->disconnect();
155  log << MSG::INFO << "Disconnect from dataset " << dsn
156  << " [" << fid << "]" << endmsg;
157  }
158  delete i->second;
159  m_connectionMap.erase(i);
160  }
161  }
162  return sc;
163  }
164  return S_ERROR;
165 }
166 
167 StatusCode IODataManager::reconnect(Entry* e) {
168  StatusCode sc = S_ERROR;
169  if ( e && e->connection ) {
170  switch(e->ioType) {
171  case Connection::READ:
172  sc = e->connection->connectRead();
173  break;
174  case Connection::UPDATE:
175  case Connection::CREATE:
176  case Connection::RECREATE:
177  sc = e->connection->connectWrite(e->ioType);
178  break;
179  default:
180  return S_ERROR;
181  }
182  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
183  std::vector<Entry*> to_retire;
184  e->connection->resetAge();
185  transform_copy_if( std::begin(m_connectionMap), std::end(m_connectionMap),
186  std::back_inserter(to_retire),
187  select2nd,
188  [&](Entry* i) {
190  return e->connection!=c && c->isConnected() &&
191  !i->keepOpen && c->ageFile() > m_ageLimit;
192  });
193  if ( !to_retire.empty() ) {
194  MsgStream log(msgSvc(),name());
195  std::for_each( std::begin(to_retire), std::end(to_retire),
196  [&](Entry* j) {
198  c->disconnect();
199  log << MSG::INFO << "Disconnect from dataset " << c->pfn()
200  << " [" << c->fid() << "]" << endmsg;
201  } );
202  }
203  }
204  }
205  return sc;
206 }
207 
209 IIODataManager::Connection* IODataManager::connection(CSTR dataset) const {
210  auto j = m_fidMap.find(dataset);
211  if ( j == m_fidMap.end() ) return nullptr;
212  auto i=m_connectionMap.find(j->second);
213  return (i != m_connectionMap.end()) ? i->second->connection : nullptr;
214 }
215 
216 StatusCode IODataManager::establishConnection(Connection* con) {
217  if ( !con ) return error("Severe logic bug: No connection object avalible.",true);
218 
219  if ( con->isConnected() ) {
220  con->resetAge();
221  return S_OK;
222  }
223  auto i=m_connectionMap.find(con->name());
224  if ( i != m_connectionMap.end() ) {
225  Connection* c = i->second->connection;
226  if ( c != con ) {
227  m_incSvc->fireIncident(Incident(con->name(),IncidentType::FailInputFile));
228  return error("Severe logic bug: Twice identical connection object for DSN:"+con->name(),true);
229  }
230  if ( reconnect(i->second).isSuccess() ) return S_OK;
231  }
232  return S_ERROR;
233 }
234 
236 IODataManager::connectDataIO(int typ, IoType rw, CSTR dataset, CSTR technology,bool keep_open,Connection* connection) {
237  MsgStream log(msgSvc(),name());
238  std::string dsn = dataset;
239  try {
241  if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
242  dsn = dataset.substr(4), typ = FID;
243  else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
244  dsn = dataset.substr(4), typ = LFN;
245  else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
246  dsn = dataset.substr(4), typ = PFN;
247  else if ( typ == UNKNOWN )
248  return connectDataIO(PFN, rw, dsn, technology, keep_open, connection);
249 
250  if(std::find(s_badFiles.begin(),s_badFiles.end(),dsn) != s_badFiles.end()) {
251  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
252  return IDataConnection::BAD_DATA_CONNECTION;
253  }
254  if ( typ == FID ) {
255  auto fi = m_connectionMap.find(dsn);
256  if ( fi == m_connectionMap.end() ) {
257  IFileCatalog::Files files;
258  m_catalog->getPFN(dsn,files);
259  if ( files.empty() ) {
260  if ( !m_useGFAL ) {
261  if ( m_quarantine ) s_badFiles.insert(dsn);
262  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
263  error("connectDataIO> failed to resolve FID:"+dsn,false).ignore();
264  return IDataConnection::BAD_DATA_CONNECTION;
265  }
266  else if ( dsn.length() == 36 && dsn[8]=='-' && dsn[13]=='-' ) {
267  std::string gfal_name = "gfal:guid:" + dsn;
268  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
269  sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection);
270  if ( sc.isSuccess() ) return sc;
271  if ( m_quarantine ) s_badFiles.insert(dsn);
272  }
273  if ( m_quarantine ) s_badFiles.insert(dsn);
274  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
275  error("connectDataIO> Failed to resolve FID:"+dsn,false).ignore();
276  return IDataConnection::BAD_DATA_CONNECTION;
277  }
278  // keep track of the current return code before we start iterating over
279  // replicas
280  auto appmgr = serviceLocator()->as<IProperty>();
281  int origReturnCode = Gaudi::getAppReturnCode(appmgr);
282  for(auto i=files.cbegin(); i!=files.cend(); ++i) {
283  std::string pfn = i->first;
284  if ( i != files.cbegin() ) {
285  log << MSG::WARNING << "Attempt to connect dsn:" << dsn
286  << " with next entry in data federation:" << pfn << "." << endmsg;
287  }
288  sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection);
289  if ( !sc.isSuccess() ) {
290  if ( m_quarantine ) s_badFiles.insert(pfn);
291  m_incSvc->fireIncident(Incident(pfn,IncidentType::FailInputFile));
292  }
293  else {
294  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
295  // we found a working replica, let's reset the return code to the old value
296  Gaudi::setAppReturnCode(appmgr, origReturnCode, true).ignore();
297  return sc;
298  }
299  }
300  log << MSG::ERROR << "Failed to open dsn:" << dsn
301  << " Federated file could not be resolved from "
302  << files.size() << " entries." << endmsg;
303  return IDataConnection::BAD_DATA_CONNECTION;
304  }
305  return S_ERROR;
306  }
307  std::string fid;
308  auto j = m_fidMap.find(dsn);
309  if ( j == m_fidMap.end() ) {
310  IFileCatalog::Files files;
311  switch(typ) {
312  case LFN:
313  fid = m_catalog->lookupLFN(dsn);
314  if ( fid.empty() ) {
315  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
316  log << MSG::ERROR << "Failed to resolve LFN:" << dsn
317  << " Cannot access this dataset." << endmsg;
318  return IDataConnection::BAD_DATA_CONNECTION;
319  }
320  break;
321  case PFN:
322  fid = m_catalog->lookupPFN(dsn);
323  if ( !fid.empty() ) m_catalog->getPFN(fid, files);
324  if ( files.empty() ) {
325  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
326  if ( fid.empty() ) fid = m_catalog->createFID();
327  m_catalog->registerPFN(fid,dsn,technology);
328  log << MSG::INFO << "Referring to dataset " << dsn
329  << " by its file ID:" << fid << endmsg;
330  }
331  else {
332  fid = dsn;
333  }
334  }
335  break;
336  }
337  }
338  else {
339  fid = j->second;
340  }
341  if ( typ == PFN ) {
342  // Open PFN
343  auto fi = m_connectionMap.find(fid);
344  if ( fi == m_connectionMap.end() ) {
345  connection->setFID(fid);
346  connection->setPFN(dsn);
347  auto e = new Entry(technology, keep_open, rw, connection);
348  // Here we open the file!
349  if ( !reconnect(e).isSuccess() ) {
350  delete e;
351  if ( m_quarantine ) s_badFiles.insert(dsn);
352  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
353  error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
354  return IDataConnection::BAD_DATA_CONNECTION;
355  }
356  fid = connection->fid();
357  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
358  if ( !(rw==Connection::CREATE || rw==Connection::RECREATE) ) {
359  if ( ! m_disablePFNWarning && strcasecmp(dsn.c_str(),fid.c_str()) == 0 ) {
360  log << MSG::ERROR << "Referring to existing dataset " << dsn
361  << " by its physical name." << endmsg;
362  log << "You may not be able to navigate back to the input file"
363  << " -- processing continues" << endmsg;
364  }
365  }
366  m_connectionMap.emplace( fid, e ); // note: only if we disconnect does e get deleted??
367  return S_OK;
368  }
369  // Here we open the file!
370  if ( !reconnect((*fi).second).isSuccess() ) {
371  if ( m_quarantine ) s_badFiles.insert(dsn);
372  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
373  error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
374  return IDataConnection::BAD_DATA_CONNECTION;
375  }
376  return S_OK;
377  }
378  sc = connectDataIO(FID, rw, fid, technology, keep_open, connection);
379  if ( !sc.isSuccess() && m_quarantine ) {
380  s_badFiles.insert(fid);
381  }
382  else if ( typ == LFN ) {
383  m_fidMap[dataset] = fid;
384  }
385  return sc;
386  }
387  catch (std::exception& e) {
388  error(std::string("connectDataIO> Caught exception:")+e.what(), false).ignore();
389  }
390  catch(...) {
391  error(std::string("connectDataIO> Caught unknown exception"), false).ignore();
392  }
393  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
394  error("connectDataIO> The dataset "+dsn+" cannot be opened.",false).ignore();
395  s_badFiles.insert(dsn);
396  return IDataConnection::BAD_DATA_CONNECTION;
397 }
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:72
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:64
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
T empty(T...args)
const std::string & fid() const
Access file id.
StatusCode finalize() override
Definition: Service.cpp:174
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
T end(T...args)
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
Definition: Debugger.cpp:47
const std::string & pfn() const
Access physical file name.
#define DECLARE_COMPONENT(type)
Definition: PluginService.h:36
STL class.
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
T what(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
Definition of the basic interface.
Definition: IInterface.h:234
Definition: IODataManager.h:34
STL class.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:51
T move(T...args)
T insert(T...args)
T find(T...args)
T length(T...args)
STL class.
ABC describing basic data connection.
T begin(T...args)
T back_inserter(T...args)
IDataConnection * connection
Definition: IODataManager.h:37
void resetAge()
Reset age.
T c_str(T...args)
Base class for all Incidents (computing events).
Definition: Incident.h:17
T substr(T...args)
void ignore() const
Definition: StatusCode.h:106
bool keepOpen
Definition: IODataManager.h:38
The IProperty is the basic interface for all components which have properties that can be set or get...
Definition: IProperty.h:20
T for_each(T...args)
virtual StatusCode connectRead()=0
Open data stream in read mode.
Helper functions to set/get the application return code.
Definition: __init__.py:1
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
IoType ioType
Definition: IODataManager.h:36