Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v31r0 (aeb156f0)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
IODataManager.cpp
Go to the documentation of this file.
1 // Framework include files
2 #include "IODataManager.h"
4 #include "GaudiKernel/Debugger.h"
6 #include "GaudiKernel/Incident.h"
8 #include "GaudiKernel/SmartIF.h"
11 
12 #include <set>
13 
namespace {

  /// Function object projecting a std::pair onto its `second` member
  /// (for a std::map element: the mapped value).
  constexpr struct select2nd_t {
    template <typename S, typename T>
    const T& operator()( const std::pair<S, T>& entry ) const {
      return entry.second;
    }
  } select2nd{};

  /// Fused transform + copy_if.
  /// For every element of [first, last), `op` is applied exactly once; the transformed
  /// value is then tested with `pred` and, if accepted, moved into `result`.
  /// Note that `pred` sees the transformed value, not the original element.
  /// @return the output iterator one past the last element written.
  template <typename InputIterator, typename OutputIterator, typename UnaryOperation, typename UnaryPredicate>
  OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
                                    UnaryPredicate pred ) {
    for ( ; first != last; ++first ) {
      auto transformed = op( *first );
      if ( !pred( transformed ) ) continue;
      *result = std::move( transformed );
      ++result;
    }
    return result;
  }
} // namespace
34 
35 using namespace Gaudi;
36 
38 
39 static std::set<std::string> s_badFiles;
40 
42 StatusCode IODataManager::initialize() {
43  // Initialize base class
45  MsgStream log( msgSvc(), name() );
46  if ( !status.isSuccess() ) {
47  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
48  return status;
49  }
50  // Retrieve conversion service handling event iteration
51  m_catalog = serviceLocator()->service( m_catalogSvcName );
52  if ( !m_catalog ) {
53  log << MSG::ERROR << "Unable to localize interface IFileCatalog from service:" << m_catalogSvcName << endmsg;
54  return StatusCode::FAILURE;
55  }
56  m_incSvc = serviceLocator()->service( "IncidentSvc" );
57  if ( !m_incSvc ) {
58  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
59  return status;
60  }
61  return status;
62 }
63 
65 StatusCode IODataManager::finalize() {
66  m_catalog = nullptr; // release
67  return Service::finalize();
68 }
69 
71 StatusCode IODataManager::error( CSTR msg, bool rethrow ) {
72  MsgStream log( msgSvc(), name() );
73  log << MSG::ERROR << "Error: " << msg << endmsg;
74  if ( rethrow ) System::breakExecution();
75  return StatusCode::FAILURE;
76 }
77 
79 IODataManager::Connections IODataManager::connections( const IInterface* owner ) const {
80  Connections conns;
81  transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( conns ),
82  []( ConnectionMap::const_reference i ) { return i.second->connection; },
83  [&]( const IDataConnection* c ) { return !owner || c->owner() == owner; } );
84  return conns;
85 }
86 
88 StatusCode IODataManager::connectRead( bool keep_open, Connection* con ) {
89  if ( !establishConnection( con ) ) {
90  return connectDataIO( UNKNOWN, Connection::READ, con->name(), "UNKNOWN", keep_open, con );
91  }
92  std::string dsn = con ? con->name() : std::string( "Unknown" );
93  return error( "Failed to connect to data:" + dsn, false );
94 }
95 
97 StatusCode IODataManager::connectWrite( Connection* con, IoType mode, CSTR doctype ) {
98  if ( !establishConnection( con ) ) { return connectDataIO( UNKNOWN, mode, con->name(), doctype, true, con ); }
99  std::string dsn = con ? con->name() : std::string( "Unknown" );
100  return error( "Failed to connect to data:" + dsn, false );
101 }
102 
104 StatusCode IODataManager::read( Connection* con, void* const data, size_t len ) {
105  return establishConnection( con ).isSuccess() ? con->read( data, len ) : StatusCode::FAILURE;
106 }
107 
109 StatusCode IODataManager::write( Connection* con, const void* data, int len ) {
110  return establishConnection( con ).isSuccess() ? con->write( data, len ) : StatusCode::FAILURE;
111 }
112 
114 long long int IODataManager::seek( Connection* con, long long int where, int origin ) {
115  return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
116 }
117 
118 StatusCode IODataManager::disconnect( Connection* con ) {
119  if ( con ) {
120  std::string dataset = con->name();
121  std::string dsn = dataset;
122  StatusCode sc = con->disconnect();
123  if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
124  dsn = dataset.substr( 4 );
125  else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
126  dsn = dataset.substr( 4 );
127  else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
128  dsn = dataset.substr( 4 );
129 
130  auto j = m_fidMap.find( dataset );
131  if ( j != m_fidMap.end() ) {
132  std::string fid = j->second;
133  std::string gfal_name = "gfal:guid:" + fid;
134  auto i = m_connectionMap.find( fid );
135  m_fidMap.erase( j );
136  if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
137  if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
138  if ( i != m_connectionMap.end() && i->second ) {
139  IDataConnection* c = i->second->connection;
140  if ( ( j = m_fidMap.find( c->pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
141  if ( c->isConnected() ) {
142  MsgStream log( msgSvc(), name() );
143  c->disconnect();
144  log << MSG::INFO << "Disconnect from dataset " << dsn << " [" << fid << "]" << endmsg;
145  }
146  delete i->second;
147  m_connectionMap.erase( i );
148  }
149  }
150  return sc;
151  }
152  return StatusCode::FAILURE;
153 }
154 
155 StatusCode IODataManager::reconnect( Entry* e ) {
157  if ( e && e->connection ) {
158  switch ( e->ioType ) {
159  case Connection::READ:
160  sc = e->connection->connectRead();
161  break;
162  case Connection::UPDATE:
163  case Connection::CREATE:
164  case Connection::RECREATE:
165  sc = e->connection->connectWrite( e->ioType );
166  break;
167  default:
168  return StatusCode::FAILURE;
169  }
170  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
171  std::vector<Entry*> to_retire;
172  e->connection->resetAge();
173  transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( to_retire ),
174  select2nd, [&]( Entry* i ) {
176  return e->connection != c && c->isConnected() && !i->keepOpen && c->ageFile() > m_ageLimit;
177  } );
178  if ( !to_retire.empty() ) {
179  MsgStream log( msgSvc(), name() );
180  std::for_each( std::begin( to_retire ), std::end( to_retire ), [&]( Entry* j ) {
182  c->disconnect();
183  log << MSG::INFO << "Disconnect from dataset " << c->pfn() << " [" << c->fid() << "]" << endmsg;
184  } );
185  }
186  }
187  }
188  return sc;
189 }
190 
192 IIODataManager::Connection* IODataManager::connection( CSTR dataset ) const {
193  auto j = m_fidMap.find( dataset );
194  if ( j == m_fidMap.end() ) return nullptr;
195  auto i = m_connectionMap.find( j->second );
196  return ( i != m_connectionMap.end() ) ? i->second->connection : nullptr;
197 }
198 
199 StatusCode IODataManager::establishConnection( Connection* con ) {
200  if ( !con ) return error( "Severe logic bug: No connection object avalible.", true );
201 
202  if ( con->isConnected() ) {
203  con->resetAge();
204  return StatusCode::SUCCESS;
205  }
206  auto i = m_connectionMap.find( con->name() );
207  if ( i != m_connectionMap.end() ) {
208  Connection* c = i->second->connection;
209  if ( c != con ) {
210  m_incSvc->fireIncident( Incident( con->name(), IncidentType::FailInputFile ) );
211  return error( "Severe logic bug: Twice identical connection object for DSN:" + con->name(), true );
212  }
213  if ( reconnect( i->second ).isSuccess() ) return StatusCode::SUCCESS;
214  }
215  return StatusCode::FAILURE;
216 }
217 
218 StatusCode IODataManager::connectDataIO( int typ, IoType rw, CSTR dataset, CSTR technology, bool keep_open,
219  Connection* connection ) {
220  MsgStream log( msgSvc(), name() );
221  std::string dsn = dataset;
222  try {
223  StatusCode sc( StatusCode::SUCCESS, true );
224  if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
225  dsn = dataset.substr( 4 ), typ = FID;
226  else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
227  dsn = dataset.substr( 4 ), typ = LFN;
228  else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
229  dsn = dataset.substr( 4 ), typ = PFN;
230  else if ( typ == UNKNOWN )
231  return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
232 
233  if ( std::find( s_badFiles.begin(), s_badFiles.end(), dsn ) != s_badFiles.end() ) {
234  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
235  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
236  }
237  if ( typ == FID ) {
238  auto fi = m_connectionMap.find( dsn );
239  if ( fi == m_connectionMap.end() ) {
240  IFileCatalog::Files files;
241  m_catalog->getPFN( dsn, files );
242  if ( files.empty() ) {
243  if ( !m_useGFAL ) {
244  if ( m_quarantine ) s_badFiles.insert( dsn );
245  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
246  error( "connectDataIO> failed to resolve FID:" + dsn, false ).ignore();
247  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
248  } else if ( dsn.length() == 36 && dsn[8] == '-' && dsn[13] == '-' ) {
249  std::string gfal_name = "gfal:guid:" + dsn;
250  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
251  sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
252  if ( sc.isSuccess() ) return sc;
253  if ( m_quarantine ) s_badFiles.insert( dsn );
254  }
255  if ( m_quarantine ) s_badFiles.insert( dsn );
256  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
257  error( "connectDataIO> Failed to resolve FID:" + dsn, false ).ignore();
258  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
259  }
260  // keep track of the current return code before we start iterating over
261  // replicas
262  auto appmgr = serviceLocator()->as<IProperty>();
263  int origReturnCode = Gaudi::getAppReturnCode( appmgr );
264  for ( auto i = files.cbegin(); i != files.cend(); ++i ) {
265  std::string pfn = i->first;
266  if ( i != files.cbegin() ) {
267  log << MSG::WARNING << "Attempt to connect dsn:" << dsn << " with next entry in data federation:" << pfn
268  << "." << endmsg;
269  }
270  sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
271  if ( !sc.isSuccess() ) {
272  if ( m_quarantine ) s_badFiles.insert( pfn );
273  m_incSvc->fireIncident( Incident( pfn, IncidentType::FailInputFile ) );
274  } else {
275  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
276  // we found a working replica, let's reset the return code to the old value
277  Gaudi::setAppReturnCode( appmgr, origReturnCode, true ).ignore();
278  return sc;
279  }
280  }
281  log << MSG::ERROR << "Failed to open dsn:" << dsn << " Federated file could not be resolved from "
282  << files.size() << " entries." << endmsg;
283  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
284  }
285  return StatusCode::FAILURE;
286  }
287  std::string fid;
288  auto j = m_fidMap.find( dsn );
289  if ( j == m_fidMap.end() ) {
290  IFileCatalog::Files files;
291  switch ( typ ) {
292  case LFN:
293  fid = m_catalog->lookupLFN( dsn );
294  if ( fid.empty() ) {
295  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
296  log << MSG::ERROR << "Failed to resolve LFN:" << dsn << " Cannot access this dataset." << endmsg;
297  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
298  }
299  break;
300  case PFN:
301  fid = m_catalog->lookupPFN( dsn );
302  if ( !fid.empty() ) m_catalog->getPFN( fid, files );
303  if ( files.empty() ) {
304  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
305  if ( fid.empty() ) fid = m_catalog->createFID();
306  m_catalog->registerPFN( fid, dsn, technology );
307  log << MSG::INFO << "Referring to dataset " << dsn << " by its file ID:" << fid << endmsg;
308  } else {
309  fid = dsn;
310  }
311  }
312  break;
313  }
314  } else {
315  fid = j->second;
316  }
317  if ( typ == PFN ) {
318  // Open PFN
319  auto fi = m_connectionMap.find( fid );
320  if ( fi == m_connectionMap.end() ) {
321  connection->setFID( fid );
322  connection->setPFN( dsn );
323  auto e = new Entry( technology, keep_open, rw, connection );
324  // Here we open the file!
325  if ( !reconnect( e ).isSuccess() ) {
326  delete e;
327  if ( m_quarantine ) s_badFiles.insert( dsn );
328  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
329  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
330  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
331  }
332  fid = connection->fid();
333  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
334  if ( !( rw == Connection::CREATE || rw == Connection::RECREATE ) ) {
335  if ( !m_disablePFNWarning && strcasecmp( dsn.c_str(), fid.c_str() ) == 0 ) {
336  log << MSG::ERROR << "Referring to existing dataset " << dsn << " by its physical name." << endmsg;
337  log << "You may not be able to navigate back to the input file"
338  << " -- processing continues" << endmsg;
339  }
340  }
341  m_connectionMap.emplace( fid, e ); // note: only if we disconnect does e get deleted??
342  return StatusCode::SUCCESS;
343  }
344  // Here we open the file!
345  if ( !reconnect( ( *fi ).second ).isSuccess() ) {
346  if ( m_quarantine ) s_badFiles.insert( dsn );
347  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
348  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
349  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
350  }
351  return StatusCode::SUCCESS;
352  }
353  sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
354  if ( !sc.isSuccess() && m_quarantine ) {
355  s_badFiles.insert( fid );
356  } else if ( typ == LFN ) {
357  m_fidMap[dataset] = fid;
358  }
359  return sc;
360  } catch ( std::exception& e ) {
361  error( std::string( "connectDataIO> Caught exception:" ) + e.what(), false ).ignore();
362  } catch ( ... ) { error( std::string( "connectDataIO> Caught unknown exception" ), false ).ignore(); }
363  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
364  error( "connectDataIO> The dataset " + dsn + " cannot be opened.", false ).ignore();
365  s_badFiles.insert( dsn );
366  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
367 }
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:69
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:60
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
T empty(T...args)
const std::string & fid() const
Access file id.
StatusCode finalize() override
Definition: Service.cpp:164
bool isSuccess() const
Definition: StatusCode.h:267
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
STL namespace.
def read(f, regex='.*', skipevents=0)
Definition: hivetimeline.py:22
T end(T...args)
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
Definition: Debugger.cpp:47
const std::string & pfn() const
Access physical file name.
STL class.
#define DECLARE_COMPONENT(type)
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
T what(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
Definition of the basic interface.
Definition: IInterface.h:244
Definition: IODataManager.h:32
STL class.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:49
T move(T...args)
T insert(T...args)
T find(T...args)
T length(T...args)
STL class.
ABC describing basic data connection.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
T begin(T...args)
T back_inserter(T...args)
IDataConnection * connection
Definition: IODataManager.h:35
void resetAge()
Reset age.
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:153
T c_str(T...args)
Base class for all Incidents (computing events).
Definition: Incident.h:17
constexpr static const auto FAILURE
Definition: StatusCode.h:86
T substr(T...args)
bool keepOpen
Definition: IODataManager.h:36
The IProperty is the basic interface for all components which have properties that can be set or get...
Definition: IProperty.h:20
T for_each(T...args)
virtual StatusCode connectRead()=0
Open data stream in read mode.
Helper functions to set/get the application return code.
Definition: __init__.py:1
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
IoType ioType
Definition: IODataManager.h:34