The Gaudi Framework  v32r2 (46d42edc)
IODataManager.cpp
Go to the documentation of this file.
1 // Framework include files
2 #include "IODataManager.h"
4 #include "GaudiKernel/Debugger.h"
6 #include "GaudiKernel/Incident.h"
8 #include "GaudiKernel/SmartIF.h"
11 
12 #include <set>
13 
14 namespace {
15 
// Function object that projects a std::pair onto its second member.
// The result is a const reference into the pair that was passed in, so it
// must not be used after that pair has gone away.
struct select2nd_t {
  template <typename First, typename Second>
  const Second& operator()( const std::pair<First, Second>& entry ) const {
    return std::get<1>( entry );
  }
};
// Ready-made instance, usable directly as the projection argument of an algorithm.
constexpr select2nd_t select2nd{};
22 
// Apply `op` to every element of [first, last) and write the transformed
// value to `result` only when `pred` accepts it.
//
// `pred` is evaluated on the transformed value, not on the input element.
// Accepted values are moved into the output. Returns the output iterator
// positioned one past the last element written.
template <typename InputIterator, typename OutputIterator, typename UnaryOperation, typename UnaryPredicate>
OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
                                  UnaryPredicate pred ) {
  for ( ; first != last; ++first ) {
    auto candidate = op( *first );
    if ( !pred( candidate ) ) continue;
    *result = std::move( candidate );
    ++result;
  }
  return result;
}
33 } // namespace
34 
35 using namespace Gaudi;
36 
38 
// File-local set of dataset identifiers (FIDs / PFNs / plain names) that could
// not be opened. connectDataIO() adds entries on failure and looks names up
// here before attempting a connection.
39 static std::set<std::string> s_badFiles;
40 
43  // Initialize base class
45  MsgStream log( msgSvc(), name() );
46  if ( !status.isSuccess() ) {
47  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
48  return status;
49  }
50  // Retrieve conversion service handling event iteration
52  if ( !m_catalog ) {
53  log << MSG::ERROR << "Unable to localize interface IFileCatalog from service:" << m_catalogSvcName << endmsg;
54  return StatusCode::FAILURE;
55  }
56  m_incSvc = serviceLocator()->service( "IncidentSvc" );
57  if ( !m_incSvc ) {
58  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
59  return status;
60  }
61  return status;
62 }
63 
66  m_catalog = nullptr; // release
67  return Service::finalize();
68 }
69 
72  MsgStream log( msgSvc(), name() );
73  log << MSG::ERROR << "Error: " << msg << endmsg;
74  if ( rethrow ) System::breakExecution();
75  return StatusCode::FAILURE;
76 }
77 
// Collect data connections into a Connections container, optionally
// restricted to those belonging to `owner`.
//
// owner: when null every connection passes the filter; otherwise only
//        connections whose owner() equals `owner` are kept.
79 IODataManager::Connections IODataManager::connections( const IInterface* owner ) const {
80  Connections conns;
// NOTE(review): the first argument line of this call (input range and output
// iterator) is not visible in this listing.
81  transform_copy_if(
// Projection: map entry -> the connection pointer stored in the entry.
83  []( ConnectionMap::const_reference i ) { return i.second->connection; },
// Filter applied to the projected connection pointer.
84  [&]( const IDataConnection* c ) { return !owner || c->owner() == owner; } );
85  return conns;
86 }
87 
89 StatusCode IODataManager::connectRead( bool keep_open, Connection* con ) {
90  if ( !establishConnection( con ) ) {
91  return connectDataIO( UNKNOWN, Connection::READ, con->name(), "UNKNOWN", keep_open, con );
92  }
93  std::string dsn = con ? con->name() : std::string( "Unknown" );
94  return error( "Failed to connect to data:" + dsn, false );
95 }
96 
98 StatusCode IODataManager::connectWrite( Connection* con, IoType mode, CSTR doctype ) {
99  if ( !establishConnection( con ) ) { return connectDataIO( UNKNOWN, mode, con->name(), doctype, true, con ); }
100  std::string dsn = con ? con->name() : std::string( "Unknown" );
101  return error( "Failed to connect to data:" + dsn, false );
102 }
103 
105 StatusCode IODataManager::read( Connection* con, void* const data, size_t len ) {
106  return establishConnection( con ).isSuccess() ? con->read( data, len ) : StatusCode::FAILURE;
107 }
108 
110 StatusCode IODataManager::write( Connection* con, const void* data, int len ) {
111  return establishConnection( con ).isSuccess() ? con->write( data, len ) : StatusCode::FAILURE;
112 }
113 
115 long long int IODataManager::seek( Connection* con, long long int where, int origin ) {
116  return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
117 }
118 
120  if ( con ) {
121  std::string dataset = con->name();
122  std::string dsn = dataset;
123  StatusCode sc = con->disconnect();
124  if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
125  dsn = dataset.substr( 4 );
126  else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
127  dsn = dataset.substr( 4 );
128  else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
129  dsn = dataset.substr( 4 );
130 
131  auto j = m_fidMap.find( dataset );
132  if ( j != m_fidMap.end() ) {
133  std::string fid = j->second;
134  std::string gfal_name = "gfal:guid:" + fid;
135  auto i = m_connectionMap.find( fid );
136  m_fidMap.erase( j );
137  if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
138  if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
139  if ( i != m_connectionMap.end() && i->second ) {
140  IDataConnection* c = i->second->connection;
141  if ( ( j = m_fidMap.find( c->pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
142  if ( c->isConnected() ) {
143  MsgStream log( msgSvc(), name() );
144  c->disconnect();
145  log << MSG::INFO << "Disconnect from dataset " << dsn << " [" << fid << "]" << endmsg;
146  }
147  delete i->second;
148  m_connectionMap.erase( i );
149  }
150  }
151  return sc;
152  }
153  return StatusCode::FAILURE;
154 }
155 
158  if ( e && e->connection ) {
159  switch ( e->ioType ) {
160  case Connection::READ:
161  sc = e->connection->connectRead();
162  break;
163  case Connection::UPDATE:
164  case Connection::CREATE:
165  case Connection::RECREATE:
166  sc = e->connection->connectWrite( e->ioType );
167  break;
168  default:
169  return StatusCode::FAILURE;
170  }
171  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
172  std::vector<Entry*> to_retire;
173  e->connection->resetAge();
174  transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( to_retire ),
175  select2nd, [&]( Entry* i ) {
177  return e->connection != c && c->isConnected() && !i->keepOpen && c->ageFile() > m_ageLimit;
178  } );
179  if ( !to_retire.empty() ) {
180  MsgStream log( msgSvc(), name() );
181  std::for_each( std::begin( to_retire ), std::end( to_retire ), [&]( Entry* j ) {
183  c->disconnect();
184  log << MSG::INFO << "Disconnect from dataset " << c->pfn() << " [" << c->fid() << "]" << endmsg;
185  } );
186  }
187  }
188  }
189  return sc;
190 }
191 
194  auto j = m_fidMap.find( dataset );
195  if ( j == m_fidMap.end() ) return nullptr;
196  auto i = m_connectionMap.find( j->second );
197  return ( i != m_connectionMap.end() ) ? i->second->connection : nullptr;
198 }
199 
201  if ( !con ) return error( "Severe logic bug: No connection object avalible.", true );
202 
203  if ( con->isConnected() ) {
204  con->resetAge();
205  return StatusCode::SUCCESS;
206  }
207  auto i = m_connectionMap.find( con->name() );
208  if ( i != m_connectionMap.end() ) {
209  Connection* c = i->second->connection;
210  if ( c != con ) {
211  m_incSvc->fireIncident( Incident( con->name(), IncidentType::FailInputFile ) );
212  return error( "Severe logic bug: Twice identical connection object for DSN:" + con->name(), true );
213  }
214  if ( reconnect( i->second ).isSuccess() ) return StatusCode::SUCCESS;
215  }
216  return StatusCode::FAILURE;
217 }
218 
// Resolve a dataset name and open (or re-open) the matching data connection.
//
// typ:        kind of name in `dataset` (UNKNOWN, FID, LFN or PFN); it is
//             overridden when `dataset` carries an explicit "FID:", "LFN:"
//             or "PFN:" prefix (matched case-insensitively).
// rw:         I/O type; CREATE/RECREATE additionally allow registering a new
//             PFN in the file catalog.
// dataset:    dataset name as given by the caller, with or without prefix.
// technology: passed to the catalog on registration and stored in the Entry.
// keep_open:  stored in the Entry created for a new connection.
// connection: connection object that gets its FID/PFN set and is opened.
//
// NOTE(review): several statements of this function are not visible in this
// listing (presumably early returns following the error reports). The
// comments below describe only what is visible.
219 StatusCode IODataManager::connectDataIO( int typ, IoType rw, CSTR dataset, CSTR technology, bool keep_open,
220  Connection* connection ) {
221  MsgStream log( msgSvc(), name() );
222  std::string dsn = dataset;
223  try {
224  StatusCode sc( StatusCode::SUCCESS, true );
 // Strip a recognised 4-character prefix and let it decide the name type.
225  if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
226  dsn = dataset.substr( 4 ), typ = FID;
227  else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
228  dsn = dataset.substr( 4 ), typ = LFN;
229  else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
230  dsn = dataset.substr( 4 ), typ = PFN;
 // No prefix and no type given: retry, treating the name as a PFN.
231  else if ( typ == UNKNOWN )
232  return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
233 
 // Names recorded in s_badFiles trigger a FailInputFile incident.
 // NOTE(review): std::find performs a linear scan over the std::set;
 // s_badFiles.count( dsn ) would be a logarithmic lookup.
234  if ( std::find( s_badFiles.begin(), s_badFiles.end(), dsn ) != s_badFiles.end() ) {
235  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
237  }
238  if ( typ == FID ) {
239  auto fi = m_connectionMap.find( dsn );
 // No connection entry known for this FID yet: ask the catalog for its PFNs.
240  if ( fi == m_connectionMap.end() ) {
241  IFileCatalog::Files files;
242  m_catalog->getPFN( dsn, files );
243  if ( files.empty() ) {
244  if ( !m_useGFAL ) {
245  if ( m_quarantine ) s_badFiles.insert( dsn );
246  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
247  error( "connectDataIO> failed to resolve FID:" + dsn, false ).ignore();
 // GFAL enabled and the name has the length/dash layout checked here:
 // try to open it under the "gfal:guid:" name.
249  } else if ( dsn.length() == 36 && dsn[8] == '-' && dsn[13] == '-' ) {
250  std::string gfal_name = "gfal:guid:" + dsn;
251  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
252  sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
253  if ( sc.isSuccess() ) return sc;
254  if ( m_quarantine ) s_badFiles.insert( dsn );
255  }
256  if ( m_quarantine ) s_badFiles.insert( dsn );
257  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
258  error( "connectDataIO> Failed to resolve FID:" + dsn, false ).ignore();
260  }
261  // keep track of the current return code before we start iterating over
262  // replicas
263  auto appmgr = serviceLocator()->as<IProperty>();
264  int origReturnCode = Gaudi::getAppReturnCode( appmgr );
 // Try the catalog entries in order; the first one that opens wins.
265  for ( auto i = files.cbegin(); i != files.cend(); ++i ) {
266  std::string pfn = i->first;
267  if ( i != files.cbegin() ) {
268  log << MSG::WARNING << "Attempt to connect dsn:" << dsn << " with next entry in data federation:" << pfn
269  << "." << endmsg;
270  }
271  sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
272  if ( !sc.isSuccess() ) {
273  if ( m_quarantine ) s_badFiles.insert( pfn );
274  m_incSvc->fireIncident( Incident( pfn, IncidentType::FailInputFile ) );
275  } else {
 // Remember that dsn, the caller's name and this PFN all map to the FID.
276  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
277  // we found a working replica, let's reset the return code to the old value
278  Gaudi::setAppReturnCode( appmgr, origReturnCode, true ).ignore();
279  return sc;
280  }
281  }
282  log << MSG::ERROR << "Failed to open dsn:" << dsn << " Federated file could not be resolved from "
283  << files.size() << " entries." << endmsg;
285  }
286  return StatusCode::FAILURE;
287  }
 // LFN / PFN: determine the FID, from m_fidMap if known, else from the catalog.
288  std::string fid;
289  auto j = m_fidMap.find( dsn );
290  if ( j == m_fidMap.end() ) {
291  IFileCatalog::Files files;
292  switch ( typ ) {
293  case LFN:
294  fid = m_catalog->lookupLFN( dsn );
295  if ( fid.empty() ) {
296  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
297  log << MSG::ERROR << "Failed to resolve LFN:" << dsn << " Cannot access this dataset." << endmsg;
299  }
300  break;
301  case PFN:
302  fid = m_catalog->lookupPFN( dsn );
303  if ( !fid.empty() ) m_catalog->getPFN( fid, files );
304  if ( files.empty() ) {
 // PFN without catalog entries: register it when creating a file,
 // otherwise use the physical name itself as the FID.
305  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
306  if ( fid.empty() ) fid = m_catalog->createFID();
307  m_catalog->registerPFN( fid, dsn, technology );
308  log << MSG::INFO << "Referring to dataset " << dsn << " by its file ID:" << fid << endmsg;
309  } else {
310  fid = dsn;
311  }
312  }
313  break;
314  }
315  } else {
316  fid = j->second;
317  }
318  if ( typ == PFN ) {
319  // Open PFN
320  auto fi = m_connectionMap.find( fid );
321  if ( fi == m_connectionMap.end() ) {
322  connection->setFID( fid );
323  connection->setPFN( dsn );
324  auto e = new Entry( technology, keep_open, rw, connection );
325  // Here we open the file!
326  if ( !reconnect( e ).isSuccess() ) {
327  delete e;
328  if ( m_quarantine ) s_badFiles.insert( dsn );
329  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
330  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
332  }
 // Re-read the FID from the connection after opening and record all aliases.
333  fid = connection->fid();
334  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
335  if ( !( rw == Connection::CREATE || rw == Connection::RECREATE ) ) {
 // FID equal to the physical name (case-insensitive): emit the PFN notice
 // unless it has been disabled.
336  if ( !m_disablePFNWarning && strcasecmp( dsn.c_str(), fid.c_str() ) == 0 ) {
337  log << MSG::ERROR << "Referring to existing dataset " << dsn << " by its physical name." << endmsg;
338  log << "You may not be able to navigate back to the input file"
339  << " -- processing continues" << endmsg;
340  }
341  }
342  m_connectionMap.emplace( fid, e ); // note: only if we disconnect does e get deleted??
343  return StatusCode::SUCCESS;
344  }
345  // Here we open the file!
346  if ( !reconnect( ( *fi ).second ).isSuccess() ) {
347  if ( m_quarantine ) s_badFiles.insert( dsn );
348  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
349  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
351  }
352  return StatusCode::SUCCESS;
353  }
 // Not a PFN: continue the resolution through the FID branch.
354  sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
355  if ( !sc.isSuccess() && m_quarantine ) {
356  s_badFiles.insert( fid );
357  } else if ( typ == LFN ) {
358  m_fidMap[dataset] = fid;
359  }
360  return sc;
361  } catch ( std::exception& e ) {
362  error( std::string( "connectDataIO> Caught exception:" ) + e.what(), false ).ignore();
363  } catch ( ... ) { error( std::string( "connectDataIO> Caught unknown exception" ), false ).ignore(); }
 // Reached after an exception was caught: report, and record the name as bad
 // regardless of m_quarantine.
364  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
365  error( "connectDataIO> The dataset " + dsn + " cannot be opened.", false ).ignore();
366  s_badFiles.insert( dsn );
368 }
StatusCode finalize() override
IService implementation: finalize the service.
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:69
Gaudi::Property< int > m_ageLimit
Definition: IODataManager.h:48
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:60
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:277
T empty(T... args)
const SmartIF< IMessageSvc > & msgSvc() const
The standard message service.
StatusCode finalize() override
Definition: Service.cpp:164
StatusCode disconnect(Connection *ioDesc) override
Release data stream.
Gaudi::Property< std::string > m_catalogSvcName
Definition: IODataManager.h:43
SmartIF< IIncidentSvc > m_incSvc
Definition: IODataManager.h:64
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
StatusCode establishConnection(Connection *con)
T end(T... args)
SmartIF< IFace > as()
Definition: ISvcLocator.h:103
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
Definition: Debugger.cpp:47
StatusCode read(Connection *ioDesc, void *const data, size_t len) override
Read raw byte buffer from input stream.
STL class.
#define DECLARE_COMPONENT(type)
Connection * connection(const std::string &dsn) const override
Retrieve known connection.
Gaudi::Property< bool > m_quarantine
Definition: IODataManager.h:46
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
Definition: ISvcLocator.h:76
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:274
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
virtual void fireIncident(const Incident &incident)=0
Fire an Incident.
T what(T... args)
StatusCode initialize() override
IService implementation: initialize the service.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
SmartIF< IFileCatalog > m_catalog
Reference to file catalog.
Definition: IODataManager.h:56
Definition of the basic interface.
Definition: IInterface.h:244
T erase(T... args)
StatusCode write(Connection *con, const void *data, int len) override
Write raw byte buffer to output stream.
Definition: IODataManager.h:32
Gaudi::Property< bool > m_useGFAL
Definition: IODataManager.h:45
StatusCode connectWrite(Connection *con, IoType mode=Connection::CREATE, CSTR doctype="UNKNOWN") override
Open data stream in write mode.
bool isSuccess() const
Definition: StatusCode.h:267
STL class.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:49
T move(T... args)
T insert(T... args)
T find(T... args)
T length(T... args)
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:153
STL class.
ABC describing basic data connection.
MsgStream & msg() const
shortcut for the method msgStream(MSG::INFO)
T begin(T... args)
T back_inserter(T... args)
ConnectionMap m_connectionMap
Map with I/O descriptors.
Definition: IODataManager.h:54
IDataConnection * connection
Definition: IODataManager.h:35
void resetAge()
Reset age.
T c_str(T... args)
Base class for all Incidents (computing events).
Definition: Incident.h:17
Gaudi::Property< bool > m_disablePFNWarning
Definition: IODataManager.h:49
T emplace(T... args)
constexpr static const auto FAILURE
Definition: StatusCode.h:86
StatusCode connectDataIO(int typ, IoType rw, CSTR fn, CSTR technology, bool keep, Connection *con)
T substr(T... args)
StatusCode connectRead(bool keep_open, Connection *ioDesc) override
Open data stream in read mode.
bool keepOpen
Definition: IODataManager.h:36
The IProperty is the basic interface for all components which have properties that can be set or get.
Definition: IProperty.h:20
T for_each(T... args)
virtual StatusCode connectRead()=0
Open data stream in read mode.
Connections connections(const IInterface *owner) const override
Get connection by owner instance (0=ALL)
long long int seek(Connection *ioDesc, long long int where, int origin) override
Seek on the file described by ioDesc. Arguments as in ::seek()
Header file for std::chrono::duration-based Counters.
Definition: __init__.py:1
StatusCode reconnect(Entry *e)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
IoType ioType
Definition: IODataManager.h:34
FidMap m_fidMap
Map of FID to PFN.
Definition: IODataManager.h:58