Loading [MathJax]/extensions/MathMenu.js
The Gaudi Framework  master (d98a2936)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
IODataManager.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 // Framework include files
12 #include "IODataManager.h"
15 #include <GaudiKernel/Incident.h>
16 #include <GaudiKernel/MsgStream.h>
17 #include <GaudiKernel/SmartIF.h>
19 
20 #include <set>
21 #include <strings.h>
22 
namespace {

  /// Function object returning a const reference to the second member of a std::pair.
  struct select2nd_t {
    template <typename First, typename Second>
    const Second& operator()( const std::pair<First, Second>& item ) const {
      return item.second;
    }
  };
  constexpr select2nd_t select2nd{};

  /// Apply `op` to every element of [first, last) and write the transformed value to `result`
  /// only when `pred` accepts it.
  ///
  /// `op` is evaluated exactly once per element; the accepted value is moved into the output.
  /// @return the output iterator positioned after the last element written
  template <typename InputIterator, typename OutputIterator, typename UnaryOperation, typename UnaryPredicate>
  OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
                                    UnaryPredicate pred ) {
    for ( ; first != last; ++first ) {
      auto transformed = op( *first );
      if ( !pred( transformed ) ) continue;
      *result = std::move( transformed );
      ++result;
    }
    return result;
  }
} // namespace
43 
44 using namespace Gaudi;
45 
47 
48 static std::set<std::string> s_badFiles;
49 
52  // Initialize base class
54  MsgStream log( msgSvc(), name() );
55  if ( !status.isSuccess() ) {
56  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
57  return status;
58  }
59  // Retrieve the file catalog service (IFileCatalog)
61  if ( !m_catalog ) {
62  log << MSG::ERROR << "Unable to localize interface IFileCatalog from service:" << m_catalogSvcName << endmsg;
63  return StatusCode::FAILURE;
64  }
65  m_incSvc = serviceLocator()->service( "IncidentSvc" );
66  if ( !m_incSvc ) {
67  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
68  return status;
69  }
70  return status;
71 }
72 
75  m_catalog = nullptr; // release
76  return Service::finalize();
77 }
78 
81  MsgStream log( msgSvc(), name() );
82  log << MSG::ERROR << "Error: " << msg << endmsg;
83  return StatusCode::FAILURE;
84 }
85 
87 IODataManager::Connections IODataManager::connections( const IInterface* owner ) const {
88  Connections conns;
89  transform_copy_if(
90  std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( conns ),
91  []( ConnectionMap::const_reference i ) { return i.second->connection; },
92  [&]( const IDataConnection* c ) { return !owner || c->owner() == owner; } );
93  return conns;
94 }
95 
97 StatusCode IODataManager::connectRead( bool keep_open, Connection* con ) {
98  if ( !establishConnection( con ) ) {
99  return connectDataIO( UNKNOWN, Connection::READ, con->name(), "UNKNOWN", keep_open, con );
100  }
101  std::string dsn = con ? con->name() : std::string( "Unknown" );
102  return error( "Failed to connect to data:" + dsn );
103 }
104 
106 StatusCode IODataManager::connectWrite( Connection* con, IoType mode, CSTR doctype ) {
107  if ( !establishConnection( con ) ) { return connectDataIO( UNKNOWN, mode, con->name(), doctype, true, con ); }
108  std::string dsn = con ? con->name() : std::string( "Unknown" );
109  return error( "Failed to connect to data:" + dsn );
110 }
111 
113 StatusCode IODataManager::read( Connection* con, void* const data, size_t len ) {
114  return establishConnection( con ).isSuccess() ? con->read( data, len ) : StatusCode::FAILURE;
115 }
116 
118 StatusCode IODataManager::write( Connection* con, const void* data, int len ) {
119  return establishConnection( con ).isSuccess() ? con->write( data, len ) : StatusCode::FAILURE;
120 }
121 
123 long long int IODataManager::seek( Connection* con, long long int where, int origin ) {
124  return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
125 }
126 
128  if ( con ) {
129  std::string dataset = con->name();
130  std::string dsn = dataset;
131  StatusCode sc = con->disconnect();
132  if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
133  dsn = dataset.substr( 4 );
134  else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
135  dsn = dataset.substr( 4 );
136  else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
137  dsn = dataset.substr( 4 );
138 
139  auto j = m_fidMap.find( dataset );
140  if ( j != m_fidMap.end() ) {
141  std::string fid = j->second;
142  std::string gfal_name = "gfal:guid:" + fid;
143  auto i = m_connectionMap.find( fid );
144  m_fidMap.erase( j );
145  if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
146  if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
147  if ( i != m_connectionMap.end() && i->second ) {
148  IDataConnection* c = i->second->connection;
149  if ( ( j = m_fidMap.find( c->pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
150  if ( c->isConnected() ) {
151  MsgStream log( msgSvc(), name() );
152  c->disconnect().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
153  log << MSG::INFO << "Disconnect from dataset " << dsn << " [" << fid << "]" << endmsg;
154  }
155  delete i->second;
156  m_connectionMap.erase( i );
157  }
158  }
159  return sc;
160  }
161  return StatusCode::FAILURE;
162 }
163 
166  if ( e && e->connection ) {
167  switch ( e->ioType ) {
168  case Connection::READ:
169  sc = e->connection->connectRead();
170  break;
171  case Connection::UPDATE:
172  case Connection::CREATE:
173  case Connection::RECREATE:
174  sc = e->connection->connectWrite( e->ioType );
175  break;
176  default:
177  return StatusCode::FAILURE;
178  }
179  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
180  std::vector<Entry*> to_retire;
181  e->connection->resetAge();
182  transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( to_retire ),
183  select2nd, [&]( Entry* i ) {
185  return e->connection != c && c->isConnected() && !i->keepOpen && c->ageFile() > m_ageLimit;
186  } );
187  if ( !to_retire.empty() ) {
188  MsgStream log( msgSvc(), name() );
189  std::for_each( std::begin( to_retire ), std::end( to_retire ), [&]( Entry* j ) {
190  IDataConnection* c = j->connection;
191  c->disconnect().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
192  log << MSG::INFO << "Disconnect from dataset " << c->pfn() << " [" << c->fid() << "]" << endmsg;
193  } );
194  }
195  }
196  }
197  return sc;
198 }
199 
202  auto j = m_fidMap.find( dataset );
203  if ( j == m_fidMap.end() ) return nullptr;
204  auto i = m_connectionMap.find( j->second );
205  return ( i != m_connectionMap.end() ) ? i->second->connection : nullptr;
206 }
207 
209  if ( !con ) return error( "Severe logic bug: No connection object avalible." );
210 
211  if ( con->isConnected() ) {
212  con->resetAge();
213  return StatusCode::SUCCESS;
214  }
215  auto i = m_connectionMap.find( con->name() );
216  if ( i != m_connectionMap.end() ) {
217  Connection* c = i->second->connection;
218  if ( c != con ) {
219  m_incSvc->fireIncident( Incident( con->name(), IncidentType::FailInputFile ) );
220  return error( "Severe logic bug: Twice identical connection object for DSN:" + con->name() );
221  }
222  if ( reconnect( i->second ).isSuccess() ) return StatusCode::SUCCESS;
223  }
224  return StatusCode::FAILURE;
225 }
226 
227 StatusCode IODataManager::connectDataIO( int typ, IoType rw, CSTR dataset, CSTR technology, bool keep_open,
228  Connection* connection ) {
229  MsgStream log( msgSvc(), name() );
230  std::string dsn = dataset;
231  try {
233  if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
234  dsn = dataset.substr( 4 ), typ = FID;
235  else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
236  dsn = dataset.substr( 4 ), typ = LFN;
237  else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
238  dsn = dataset.substr( 4 ), typ = PFN;
239  else if ( typ == UNKNOWN )
240  return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
241 
242  if ( std::find( s_badFiles.begin(), s_badFiles.end(), dsn ) != s_badFiles.end() ) {
243  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
245  }
246  if ( typ == FID ) {
247  auto fi = m_connectionMap.find( dsn );
248  if ( fi == m_connectionMap.end() ) {
249  IFileCatalog::Files files;
250  m_catalog->getPFN( dsn, files );
251  if ( files.empty() ) {
252  if ( !m_useGFAL ) {
253  if ( m_quarantine ) s_badFiles.insert( dsn );
254  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
255  error( "connectDataIO> failed to resolve FID:" + dsn ).ignore();
257  } else if ( dsn.length() == 36 && dsn[8] == '-' && dsn[13] == '-' ) {
258  std::string gfal_name = "gfal:guid:" + dsn;
259  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
260  sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
261  if ( sc.isSuccess() ) return sc;
262  if ( m_quarantine ) s_badFiles.insert( dsn );
263  }
264  if ( m_quarantine ) s_badFiles.insert( dsn );
265  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
266  error( "connectDataIO> Failed to resolve FID:" + dsn ).ignore();
268  }
269  // keep track of the current return code before we start iterating over
270  // replicas
271  auto appmgr = serviceLocator()->as<IProperty>();
272  int origReturnCode = Gaudi::getAppReturnCode( appmgr );
273  for ( auto i = files.cbegin(); i != files.cend(); ++i ) {
274  std::string pfn = i->first;
275  if ( i != files.cbegin() ) {
276  log << MSG::WARNING << "Attempt to connect dsn:" << dsn << " with next entry in data federation:" << pfn
277  << "." << endmsg;
278  }
279  sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
280  if ( !sc.isSuccess() ) {
281  if ( m_quarantine ) s_badFiles.insert( pfn );
282  m_incSvc->fireIncident( Incident( pfn, IncidentType::FailInputFile ) );
283  } else {
284  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
285  // we found a working replica, let's reset the return code to the old value
286  Gaudi::setAppReturnCode( appmgr, origReturnCode, true ).ignore();
287  return sc;
288  }
289  }
290  log << MSG::ERROR << "Failed to open dsn:" << dsn << " Federated file could not be resolved from "
291  << files.size() << " entries." << endmsg;
293  }
294  return StatusCode::FAILURE;
295  }
296  std::string fid;
297  auto j = m_fidMap.find( dsn );
298  if ( j == m_fidMap.end() ) {
299  IFileCatalog::Files files;
300  switch ( typ ) {
301  case LFN:
302  fid = m_catalog->lookupLFN( dsn );
303  if ( fid.empty() ) {
304  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
305  log << MSG::ERROR << "Failed to resolve LFN:" << dsn << " Cannot access this dataset." << endmsg;
307  }
308  break;
309  case PFN:
310  fid = m_catalog->lookupPFN( dsn );
311  if ( !fid.empty() ) m_catalog->getPFN( fid, files );
312  if ( files.empty() ) {
313  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
314  if ( fid.empty() ) fid = m_catalog->createFID();
315  m_catalog->registerPFN( fid, dsn, technology );
316  log << MSG::INFO << "Referring to dataset " << dsn << " by its file ID:" << fid << endmsg;
317  } else {
318  fid = dsn;
319  }
320  }
321  break;
322  }
323  } else {
324  fid = j->second;
325  }
326  if ( typ == PFN ) {
327  // Open PFN
328  auto fi = m_connectionMap.find( fid );
329  if ( fi == m_connectionMap.end() ) {
330  connection->setFID( fid );
331  connection->setPFN( dsn );
332  auto e = new Entry( technology, keep_open, rw, connection );
333  // Here we open the file!
334  if ( !reconnect( e ).isSuccess() ) {
335  delete e;
336  if ( m_quarantine ) s_badFiles.insert( dsn );
337  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
338  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid ).ignore();
340  }
341  fid = connection->fid();
342  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
343  if ( !( rw == Connection::CREATE || rw == Connection::RECREATE ) ) {
344  if ( !m_disablePFNWarning && strcasecmp( dsn.c_str(), fid.c_str() ) == 0 ) {
345  log << MSG::ERROR << "Referring to existing dataset " << dsn << " by its physical name." << endmsg;
346  log << "You may not be able to navigate back to the input file"
347  << " -- processing continues" << endmsg;
348  }
349  }
350  m_connectionMap.emplace( fid, e ); // note: only if we disconnect does e get deleted??
351  return StatusCode::SUCCESS;
352  }
353  // Here we open the file!
354  if ( !reconnect( ( *fi ).second ).isSuccess() ) {
355  if ( m_quarantine ) s_badFiles.insert( dsn );
356  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
357  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid ).ignore();
359  }
360  return StatusCode::SUCCESS;
361  }
362  sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
363  if ( !sc.isSuccess() && m_quarantine ) {
364  s_badFiles.insert( fid );
365  } else if ( typ == LFN ) {
366  m_fidMap[dataset] = fid;
367  }
368  return sc;
369  } catch ( std::exception& e ) {
370  error( std::string( "connectDataIO> Caught exception:" ) + e.what() ).ignore();
371  } catch ( ... ) { error( std::string( "connectDataIO> Caught unknown exception" ) ).ignore(); }
372  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
373  error( "connectDataIO> The dataset " + dsn + " cannot be opened." ).ignore();
374  s_badFiles.insert( dsn );
376 }
Gaudi::IODataManager::Entry
Definition: IODataManager.h:41
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
Gaudi.Configuration.log
log
Definition: Configuration.py:28
Gaudi::IODataManager::Entry::connection
IDataConnection * connection
Definition: IODataManager.h:44
AppReturnCode.h
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
MSG::INFO
@ INFO
Definition: IMessageSvc.h:22
Gaudi::IODataManager::m_quarantine
Gaudi::Property< bool > m_quarantine
Definition: IODataManager.h:55
Gaudi::IDataConnection
Definition: IIODataManager.h:33
Gaudi::IODataManager::connections
Connections connections(const IInterface *owner) const override
Get connection by owner instance (0=ALL)
Definition: IODataManager.cpp:87
Gaudi::IODataManager::connectRead
StatusCode connectRead(bool keep_open, Connection *ioDesc) override
Open data stream in read mode.
Definition: IODataManager.cpp:97
Gaudi::IODataManager::CSTR
const std::string & CSTR
Definition: IODataManager.h:40
Gaudi::IODataManager::m_useGFAL
Gaudi::Property< bool > m_useGFAL
Definition: IODataManager.h:54
Gaudi::getAppReturnCode
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:78
Gaudi::IODataManager::establishConnection
StatusCode establishConnection(Connection *con)
Definition: IODataManager.cpp:208
Gaudi::IODataManager::connectWrite
StatusCode connectWrite(Connection *con, IoType mode=Connection::CREATE, CSTR doctype="UNKNOWN") override
Open data stream in write mode.
Definition: IODataManager.cpp:106
Gaudi::IDataConnection::resetAge
void resetAge()
Reset age.
Definition: IIODataManager.h:70
GaudiMP.FdsRegistry.msg
msg
Definition: FdsRegistry.py:19
Gaudi::IODataManager::Entry::keepOpen
bool keepOpen
Definition: IODataManager.h:45
IODataManager.h
MSG::WARNING
@ WARNING
Definition: IMessageSvc.h:22
gaudirun.c
c
Definition: gaudirun.py:525
Gaudi::IODataManager::error
StatusCode error(CSTR msg)
Small routine to log an error message and return StatusCode::FAILURE.
Definition: IODataManager.cpp:80
Service::finalize
StatusCode finalize() override
Definition: Service.cpp:223
AvalancheSchedulerErrorTest.msgSvc
msgSvc
Definition: AvalancheSchedulerErrorTest.py:80
Gaudi::IDataConnection::connectRead
virtual StatusCode connectRead()=0
Open data stream in read mode.
IIncidentSvc.h
IProperty
Definition: IProperty.h:32
Gaudi::IODataManager::m_ageLimit
Gaudi::Property< int > m_ageLimit
Definition: IODataManager.h:57
SmartIF.h
Gaudi::IODataManager::m_connectionMap
ConnectionMap m_connectionMap
Map with I/O descriptors.
Definition: IODataManager.h:63
Gaudi::Utils::begin
AttribStringParser::Iterator begin(const AttribStringParser &parser)
Definition: AttribStringParser.h:135
Service::name
const std::string & name() const override
Retrieve name of the service
Definition: Service.cpp:333
StatusCode
Definition: StatusCode.h:64
Gaudi::cxx::for_each
void for_each(ContainerOfSynced &c, Fun &&f)
Definition: SynchronizedValue.h:98
Gaudi::setAppReturnCode
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:58
ProduceConsume.j
j
Definition: ProduceConsume.py:104
Gaudi::IFileCatalog::Files
std::vector< NamedItem > Files
Definition: IFileCatalog.h:46
Gaudi::IODataManager::connection
Connection * connection(const std::string &dsn) const override
Retrieve known connection.
Definition: IODataManager.cpp:201
Gaudi::IODataManager::read
StatusCode read(Connection *ioDesc, void *const data, size_t len) override
Read raw byte buffer from input stream.
Definition: IODataManager.cpp:113
Gaudi::IODataManager::initialize
StatusCode initialize() override
IService implementation: initialize the service.
Definition: IODataManager.cpp:51
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:198
Gaudi::IODataManager::finalize
StatusCode finalize() override
IService implementation: finalize the service.
Definition: IODataManager.cpp:74
Gaudi::IODataManager::m_catalogSvcName
Gaudi::Property< std::string > m_catalogSvcName
Definition: IODataManager.h:52
Gaudi::IODataManager::write
StatusCode write(Connection *con, const void *data, int len) override
Write raw byte buffer to output stream.
Definition: IODataManager.cpp:118
MsgStream
Definition: MsgStream.h:29
Gaudi::IODataManager::m_catalog
SmartIF< IFileCatalog > m_catalog
Reference to file catalog.
Definition: IODataManager.h:65
Gaudi::IDataConnection::connectWrite
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
Gaudi
This file provides a Grammar for the type Gaudi::Accumulators::Axis It allows to use that type from p...
Definition: __init__.py:1
Gaudi::IODataManager::disconnect
StatusCode disconnect(Connection *ioDesc) override
Release data stream.
Definition: IODataManager.cpp:127
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:139
SmartIF::as
SmartIF< IFace > as() const
return a new SmartIF instance to another interface
Definition: SmartIF.h:110
Gaudi::IODataManager::reconnect
StatusCode reconnect(Entry *e)
Definition: IODataManager.cpp:164
StatusCode::SUCCESS
constexpr static const auto SUCCESS
Definition: StatusCode.h:99
Gaudi::IODataManager::seek
long long int seek(Connection *ioDesc, long long int where, int origin) override
Seek on the file described by ioDesc. Arguments as in ::seek()
Definition: IODataManager.cpp:123
Gaudi::IODataManager::m_fidMap
FidMap m_fidMap
Map of FID to PFN.
Definition: IODataManager.h:67
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:45
Io::CREATE
@ CREATE
Definition: IFileMgr.h:34
MSG::ERROR
@ ERROR
Definition: IMessageSvc.h:22
IInterface
Definition: IInterface.h:225
Gaudi::IODataManager
Definition: IODataManager.h:38
Gaudi::IODataManager::m_disablePFNWarning
Gaudi::Property< bool > m_disablePFNWarning
Definition: IODataManager.h:58
IFileCatalog.h
IOTest.end
end
Definition: IOTest.py:125
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:100
Incident.h
Incident
Definition: Incident.h:24
Gaudi::IDataConnection::BAD_DATA_CONNECTION
@ BAD_DATA_CONNECTION
Definition: IIODataManager.h:50
Gaudi::IODataManager::Entry::ioType
IoType ioType
Definition: IODataManager.h:43
Io::READ
@ READ
Definition: IFileMgr.h:30
Gaudi::IODataManager::m_incSvc
SmartIF< IIncidentSvc > m_incSvc
Definition: IODataManager.h:73
MsgStream.h
Gaudi::IODataManager::connectDataIO
StatusCode connectDataIO(int typ, IoType rw, CSTR fn, CSTR technology, bool keep, Connection *con)
Definition: IODataManager.cpp:227
Service::serviceLocator
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator
Definition: Service.cpp:336