The Gaudi Framework  master (37c0b60a)
IODataManager.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2024 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 // Framework include files
12 #include "IODataManager.h"
14 #include <GaudiKernel/Debugger.h>
16 #include <GaudiKernel/Incident.h>
17 #include <GaudiKernel/MsgStream.h>
18 #include <GaudiKernel/SmartIF.h>
19 #include <GaudiKernel/strcasecmp.h>
21 
22 #include <set>
23 
24 namespace {
25 
/// Function object that projects a std::pair onto its `second` member.
///
/// The call operator is constexpr and noexcept so that the (already constexpr)
/// `select2nd` instance can also be invoked inside constant expressions.
constexpr struct select2nd_t {
  /// @param p pair to project; the result refers into @p p, so @p p must
  ///          outlive any use of the returned reference
  /// @return  const reference to `p.second`
  template <typename S, typename T>
  constexpr const T& operator()( const std::pair<S, T>& p ) const noexcept {
    return p.second;
  }
} select2nd{};
32 
/// Transform every element of [first, last) with @p op and copy the transformed
/// value to @p result only if @p pred accepts it.
///
/// @param first  start of the input range
/// @param last   end of the input range (exclusive)
/// @param result output iterator receiving the accepted values
/// @param op     unary operation applied to each dereferenced input element
/// @param pred   unary predicate evaluated on the *transformed* value
/// @return the output iterator advanced past the last value written
template <typename InputIterator, typename OutputIterator, typename UnaryOperation, typename UnaryPredicate>
OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
                                  UnaryPredicate pred ) {
  for ( ; first != last; ++first ) {
    auto transformed = op( *first );
    // Rejected values are simply dropped; the output position does not move.
    if ( !pred( transformed ) ) continue;
    *result = std::move( transformed );
    ++result;
  }
  return result;
}
43 } // namespace
44 
45 using namespace Gaudi;
46 
48 
// Names of datasets (FIDs/PFNs) that could not be opened. connectDataIO() adds
// entries to this set (in most places only when m_quarantine is set) and checks
// an incoming dataset name against it before attempting to connect.
// Being a file-scope static, the set is shared by all IODataManager instances.
49 static std::set<std::string> s_badFiles;
50 
53  // Initialize base class
55  MsgStream log( msgSvc(), name() );
56  if ( !status.isSuccess() ) {
57  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
58  return status;
59  }
60  // Retrieve conversion service handling event iteration
62  if ( !m_catalog ) {
63  log << MSG::ERROR << "Unable to localize interface IFileCatalog from service:" << m_catalogSvcName << endmsg;
64  return StatusCode::FAILURE;
65  }
66  m_incSvc = serviceLocator()->service( "IncidentSvc" );
67  if ( !m_incSvc ) {
68  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
69  return status;
70  }
71  return status;
72 }
73 
76  m_catalog = nullptr; // release
77  return Service::finalize();
78 }
79 
82  MsgStream log( msgSvc(), name() );
83  log << MSG::ERROR << "Error: " << msg << endmsg;
84  if ( rethrow ) System::breakExecution();
85  return StatusCode::FAILURE;
86 }
87 
89 IODataManager::Connections IODataManager::connections( const IInterface* owner ) const {
90  Connections conns;
91  transform_copy_if(
93  []( ConnectionMap::const_reference i ) { return i.second->connection; },
94  [&]( const IDataConnection* c ) { return !owner || c->owner() == owner; } );
95  return conns;
96 }
97 
99 StatusCode IODataManager::connectRead( bool keep_open, Connection* con ) {
100  if ( !establishConnection( con ) ) {
101  return connectDataIO( UNKNOWN, Connection::READ, con->name(), "UNKNOWN", keep_open, con );
102  }
103  std::string dsn = con ? con->name() : std::string( "Unknown" );
104  return error( "Failed to connect to data:" + dsn, false );
105 }
106 
108 StatusCode IODataManager::connectWrite( Connection* con, IoType mode, CSTR doctype ) {
109  if ( !establishConnection( con ) ) { return connectDataIO( UNKNOWN, mode, con->name(), doctype, true, con ); }
110  std::string dsn = con ? con->name() : std::string( "Unknown" );
111  return error( "Failed to connect to data:" + dsn, false );
112 }
113 
115 StatusCode IODataManager::read( Connection* con, void* const data, size_t len ) {
116  return establishConnection( con ).isSuccess() ? con->read( data, len ) : StatusCode::FAILURE;
117 }
118 
120 StatusCode IODataManager::write( Connection* con, const void* data, int len ) {
121  return establishConnection( con ).isSuccess() ? con->write( data, len ) : StatusCode::FAILURE;
122 }
123 
125 long long int IODataManager::seek( Connection* con, long long int where, int origin ) {
126  return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
127 }
128 
130  if ( con ) {
131  std::string dataset = con->name();
132  std::string dsn = dataset;
133  StatusCode sc = con->disconnect();
134  if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
135  dsn = dataset.substr( 4 );
136  else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
137  dsn = dataset.substr( 4 );
138  else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
139  dsn = dataset.substr( 4 );
140 
141  auto j = m_fidMap.find( dataset );
142  if ( j != m_fidMap.end() ) {
143  std::string fid = j->second;
144  std::string gfal_name = "gfal:guid:" + fid;
145  auto i = m_connectionMap.find( fid );
146  m_fidMap.erase( j );
147  if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
148  if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
149  if ( i != m_connectionMap.end() && i->second ) {
150  IDataConnection* c = i->second->connection;
151  if ( ( j = m_fidMap.find( c->pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
152  if ( c->isConnected() ) {
153  MsgStream log( msgSvc(), name() );
154  c->disconnect().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
155  log << MSG::INFO << "Disconnect from dataset " << dsn << " [" << fid << "]" << endmsg;
156  }
157  delete i->second;
158  m_connectionMap.erase( i );
159  }
160  }
161  return sc;
162  }
163  return StatusCode::FAILURE;
164 }
165 
168  if ( e && e->connection ) {
169  switch ( e->ioType ) {
170  case Connection::READ:
171  sc = e->connection->connectRead();
172  break;
173  case Connection::UPDATE:
174  case Connection::CREATE:
175  case Connection::RECREATE:
176  sc = e->connection->connectWrite( e->ioType );
177  break;
178  default:
179  return StatusCode::FAILURE;
180  }
181  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
182  std::vector<Entry*> to_retire;
183  e->connection->resetAge();
184  transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( to_retire ),
185  select2nd, [&]( Entry* i ) {
187  return e->connection != c && c->isConnected() && !i->keepOpen && c->ageFile() > m_ageLimit;
188  } );
189  if ( !to_retire.empty() ) {
190  MsgStream log( msgSvc(), name() );
191  std::for_each( std::begin( to_retire ), std::end( to_retire ), [&]( Entry* j ) {
192  IDataConnection* c = j->connection;
193  c->disconnect().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
194  log << MSG::INFO << "Disconnect from dataset " << c->pfn() << " [" << c->fid() << "]" << endmsg;
195  } );
196  }
197  }
198  }
199  return sc;
200 }
201 
204  auto j = m_fidMap.find( dataset );
205  if ( j == m_fidMap.end() ) return nullptr;
206  auto i = m_connectionMap.find( j->second );
207  return ( i != m_connectionMap.end() ) ? i->second->connection : nullptr;
208 }
209 
211  if ( !con ) return error( "Severe logic bug: No connection object avalible.", true );
212 
213  if ( con->isConnected() ) {
214  con->resetAge();
215  return StatusCode::SUCCESS;
216  }
217  auto i = m_connectionMap.find( con->name() );
218  if ( i != m_connectionMap.end() ) {
219  Connection* c = i->second->connection;
220  if ( c != con ) {
221  m_incSvc->fireIncident( Incident( con->name(), IncidentType::FailInputFile ) );
222  return error( "Severe logic bug: Twice identical connection object for DSN:" + con->name(), true );
223  }
224  if ( reconnect( i->second ).isSuccess() ) return StatusCode::SUCCESS;
225  }
226  return StatusCode::FAILURE;
227 }
228 
229 StatusCode IODataManager::connectDataIO( int typ, IoType rw, CSTR dataset, CSTR technology, bool keep_open,
230  Connection* connection ) {
231  MsgStream log( msgSvc(), name() );
232  std::string dsn = dataset;
233  try {
235  if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
236  dsn = dataset.substr( 4 ), typ = FID;
237  else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
238  dsn = dataset.substr( 4 ), typ = LFN;
239  else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
240  dsn = dataset.substr( 4 ), typ = PFN;
241  else if ( typ == UNKNOWN )
242  return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
243 
244  if ( std::find( s_badFiles.begin(), s_badFiles.end(), dsn ) != s_badFiles.end() ) {
245  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
247  }
248  if ( typ == FID ) {
249  auto fi = m_connectionMap.find( dsn );
250  if ( fi == m_connectionMap.end() ) {
251  IFileCatalog::Files files;
252  m_catalog->getPFN( dsn, files );
253  if ( files.empty() ) {
254  if ( !m_useGFAL ) {
255  if ( m_quarantine ) s_badFiles.insert( dsn );
256  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
257  error( "connectDataIO> failed to resolve FID:" + dsn, false ).ignore();
259  } else if ( dsn.length() == 36 && dsn[8] == '-' && dsn[13] == '-' ) {
260  std::string gfal_name = "gfal:guid:" + dsn;
261  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
262  sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
263  if ( sc.isSuccess() ) return sc;
264  if ( m_quarantine ) s_badFiles.insert( dsn );
265  }
266  if ( m_quarantine ) s_badFiles.insert( dsn );
267  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
268  error( "connectDataIO> Failed to resolve FID:" + dsn, false ).ignore();
270  }
271  // keep track of the current return code before we start iterating over
272  // replicas
273  auto appmgr = serviceLocator()->as<IProperty>();
274  int origReturnCode = Gaudi::getAppReturnCode( appmgr );
275  for ( auto i = files.cbegin(); i != files.cend(); ++i ) {
276  std::string pfn = i->first;
277  if ( i != files.cbegin() ) {
278  log << MSG::WARNING << "Attempt to connect dsn:" << dsn << " with next entry in data federation:" << pfn
279  << "." << endmsg;
280  }
281  sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
282  if ( !sc.isSuccess() ) {
283  if ( m_quarantine ) s_badFiles.insert( pfn );
284  m_incSvc->fireIncident( Incident( pfn, IncidentType::FailInputFile ) );
285  } else {
286  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
287  // we found a working replica, let's reset the return code to the old value
288  Gaudi::setAppReturnCode( appmgr, origReturnCode, true ).ignore();
289  return sc;
290  }
291  }
292  log << MSG::ERROR << "Failed to open dsn:" << dsn << " Federated file could not be resolved from "
293  << files.size() << " entries." << endmsg;
295  }
296  return StatusCode::FAILURE;
297  }
298  std::string fid;
299  auto j = m_fidMap.find( dsn );
300  if ( j == m_fidMap.end() ) {
301  IFileCatalog::Files files;
302  switch ( typ ) {
303  case LFN:
304  fid = m_catalog->lookupLFN( dsn );
305  if ( fid.empty() ) {
306  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
307  log << MSG::ERROR << "Failed to resolve LFN:" << dsn << " Cannot access this dataset." << endmsg;
309  }
310  break;
311  case PFN:
312  fid = m_catalog->lookupPFN( dsn );
313  if ( !fid.empty() ) m_catalog->getPFN( fid, files );
314  if ( files.empty() ) {
315  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
316  if ( fid.empty() ) fid = m_catalog->createFID();
317  m_catalog->registerPFN( fid, dsn, technology );
318  log << MSG::INFO << "Referring to dataset " << dsn << " by its file ID:" << fid << endmsg;
319  } else {
320  fid = dsn;
321  }
322  }
323  break;
324  }
325  } else {
326  fid = j->second;
327  }
328  if ( typ == PFN ) {
329  // Open PFN
330  auto fi = m_connectionMap.find( fid );
331  if ( fi == m_connectionMap.end() ) {
332  connection->setFID( fid );
333  connection->setPFN( dsn );
334  auto e = new Entry( technology, keep_open, rw, connection );
335  // Here we open the file!
336  if ( !reconnect( e ).isSuccess() ) {
337  delete e;
338  if ( m_quarantine ) s_badFiles.insert( dsn );
339  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
340  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
342  }
343  fid = connection->fid();
344  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
345  if ( !( rw == Connection::CREATE || rw == Connection::RECREATE ) ) {
346  if ( !m_disablePFNWarning && strcasecmp( dsn.c_str(), fid.c_str() ) == 0 ) {
347  log << MSG::ERROR << "Referring to existing dataset " << dsn << " by its physical name." << endmsg;
348  log << "You may not be able to navigate back to the input file"
349  << " -- processing continues" << endmsg;
350  }
351  }
352  m_connectionMap.emplace( fid, e ); // note: only if we disconnect does e get deleted??
353  return StatusCode::SUCCESS;
354  }
355  // Here we open the file!
356  if ( !reconnect( ( *fi ).second ).isSuccess() ) {
357  if ( m_quarantine ) s_badFiles.insert( dsn );
358  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
359  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
361  }
362  return StatusCode::SUCCESS;
363  }
364  sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
365  if ( !sc.isSuccess() && m_quarantine ) {
366  s_badFiles.insert( fid );
367  } else if ( typ == LFN ) {
368  m_fidMap[dataset] = fid;
369  }
370  return sc;
371  } catch ( std::exception& e ) {
372  error( std::string( "connectDataIO> Caught exception:" ) + e.what(), false ).ignore();
373  } catch ( ... ) { error( std::string( "connectDataIO> Caught unknown exception" ), false ).ignore(); }
374  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
375  error( "connectDataIO> The dataset " + dsn + " cannot be opened.", false ).ignore();
376  s_badFiles.insert( dsn );
378 }
Gaudi::IODataManager::Entry
Definition: IODataManager.h:42
std::for_each
T for_each(T... args)
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
std::string
STL class.
std::exception
STL class.
Gaudi.Configuration.log
log
Definition: Configuration.py:28
Gaudi::IODataManager::Entry::connection
IDataConnection * connection
Definition: IODataManager.h:45
AppReturnCode.h
std::move
T move(T... args)
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
MSG::INFO
@ INFO
Definition: IMessageSvc.h:25
Gaudi::IODataManager::m_quarantine
Gaudi::Property< bool > m_quarantine
Definition: IODataManager.h:56
Gaudi::IDataConnection
Definition: IIODataManager.h:34
std::pair
Gaudi::IODataManager::connections
Connections connections(const IInterface *owner) const override
Get connection by owner instance (0=ALL)
Definition: IODataManager.cpp:89
Gaudi::IODataManager::connectRead
StatusCode connectRead(bool keep_open, Connection *ioDesc) override
Open data stream in read mode.
Definition: IODataManager.cpp:99
std::vector
STL class.
std::map::find
T find(T... args)
strcasecmp.h
std::string::length
T length(T... args)
Gaudi::IODataManager::m_useGFAL
Gaudi::Property< bool > m_useGFAL
Definition: IODataManager.h:55
std::back_inserter
T back_inserter(T... args)
Gaudi::getAppReturnCode
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:79
Gaudi::IODataManager::establishConnection
StatusCode establishConnection(Connection *con)
Definition: IODataManager.cpp:210
Gaudi::IODataManager::connectWrite
StatusCode connectWrite(Connection *con, IoType mode=Connection::CREATE, CSTR doctype="UNKNOWN") override
Open data stream in write mode.
Definition: IODataManager.cpp:108
Gaudi::IDataConnection::resetAge
void resetAge()
Reset age.
Definition: IIODataManager.h:71
GaudiMP.FdsRegistry.msg
msg
Definition: FdsRegistry.py:19
std::map::emplace
T emplace(T... args)
Gaudi::IODataManager::Entry::keepOpen
bool keepOpen
Definition: IODataManager.h:46
IODataManager.h
MSG::WARNING
@ WARNING
Definition: IMessageSvc.h:25
gaudirun.c
c
Definition: gaudirun.py:525
Gaudi::IODataManager::error
StatusCode error(CSTR msg, bool rethrow)
Small routine to issue exceptions.
Definition: IODataManager.cpp:81
Service::finalize
StatusCode finalize() override
Definition: Service.cpp:222
AvalancheSchedulerErrorTest.msgSvc
msgSvc
Definition: AvalancheSchedulerErrorTest.py:80
Gaudi::IDataConnection::connectRead
virtual StatusCode connectRead()=0
Open data stream in read mode.
IIncidentSvc.h
System::breakExecution
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
Definition: Debugger.cpp:57
IProperty
Definition: IProperty.h:33
Gaudi::IODataManager::m_ageLimit
Gaudi::Property< int > m_ageLimit
Definition: IODataManager.h:58
SmartIF.h
Gaudi::IODataManager::m_connectionMap
ConnectionMap m_connectionMap
Map with I/O descriptors.
Definition: IODataManager.h:64
Service::name
const std::string & name() const override
Retrieve name of the service
Definition: Service.cpp:332
StatusCode
Definition: StatusCode.h:65
Gaudi::setAppReturnCode
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:59
ProduceConsume.j
j
Definition: ProduceConsume.py:104
Io::UNKNOWN
@ UNKNOWN
Definition: IFileMgr.h:156
std::string::c_str
T c_str(T... args)
Gaudi::IODataManager::connection
Connection * connection(const std::string &dsn) const override
Retrieve known connection.
Definition: IODataManager.cpp:203
Gaudi::IODataManager::read
StatusCode read(Connection *ioDesc, void *const data, size_t len) override
Read raw byte buffer from input stream.
Definition: IODataManager.cpp:115
std::map::erase
T erase(T... args)
Gaudi::IODataManager::initialize
StatusCode initialize() override
IService implementation: initialize the service.
Definition: IODataManager.cpp:52
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202
Gaudi::IODataManager::finalize
StatusCode finalize() override
IService implementation: finalize the service.
Definition: IODataManager.cpp:75
Gaudi::IODataManager::m_catalogSvcName
Gaudi::Property< std::string > m_catalogSvcName
Definition: IODataManager.h:53
Gaudi::IODataManager::write
StatusCode write(Connection *con, const void *data, int len) override
Write raw byte buffer to output stream.
Definition: IODataManager.cpp:120
MsgStream
Definition: MsgStream.h:33
Gaudi::IODataManager::m_catalog
SmartIF< IFileCatalog > m_catalog
Reference to file catalog.
Definition: IODataManager.h:66
Gaudi::IDataConnection::connectWrite
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
Gaudi
This file provides a Grammar for the type Gaudi::Accumulators::Axis It allows to use that type from p...
Definition: __init__.py:1
Gaudi::IODataManager::disconnect
StatusCode disconnect(Connection *ioDesc) override
Release data stream.
Definition: IODataManager.cpp:129
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:139
SmartIF::as
SmartIF< IFace > as() const
return a new SmartIF instance to another interface
Definition: SmartIF.h:117
std::string::substr
T substr(T... args)
Gaudi::IODataManager::reconnect
StatusCode reconnect(Entry *e)
Definition: IODataManager.cpp:166
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:100
Gaudi::IODataManager::seek
long long int seek(Connection *ioDesc, long long int where, int origin) override
Seek on the file described by ioDesc. Arguments as in ::seek()
Definition: IODataManager.cpp:125
Gaudi::IODataManager::m_fidMap
FidMap m_fidMap
Map of FID to PFN.
Definition: IODataManager.h:68
std::begin
T begin(T... args)
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
Io::CREATE
@ CREATE
Definition: IFileMgr.h:41
std::set::insert
T insert(T... args)
MSG::ERROR
@ ERROR
Definition: IMessageSvc.h:25
IInterface
Definition: IInterface.h:239
Gaudi::IODataManager
Definition: IODataManager.h:39
std::vector::empty
T empty(T... args)
Gaudi::IODataManager::m_disablePFNWarning
Gaudi::Property< bool > m_disablePFNWarning
Definition: IODataManager.h:59
IFileCatalog.h
std::end
T end(T... args)
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
Incident.h
Incident
Definition: Incident.h:27
std::set< std::string >
Gaudi::IDataConnection::BAD_DATA_CONNECTION
@ BAD_DATA_CONNECTION
Definition: IIODataManager.h:51
Gaudi::IODataManager::Entry::ioType
IoType ioType
Definition: IODataManager.h:44
Io::READ
@ READ
Definition: IFileMgr.h:37
Gaudi::IODataManager::m_incSvc
SmartIF< IIncidentSvc > m_incSvc
Definition: IODataManager.h:74
Debugger.h
std::exception::what
T what(T... args)
MsgStream.h
Gaudi::IODataManager::connectDataIO
StatusCode connectDataIO(int typ, IoType rw, CSTR fn, CSTR technology, bool keep, Connection *con)
Definition: IODataManager.cpp:229
Service::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
Definition: Service.cpp:335