The Gaudi Framework  v29r0 (ff2e7097)
IODataManager.cpp
Go to the documentation of this file.
1 // Framework include files
2 #include "IODataManager.h"
4 #include "GaudiKernel/Debugger.h"
6 #include "GaudiKernel/Incident.h"
8 #include "GaudiKernel/SmartIF.h"
11 
12 #include <set>
13 
namespace
{

  /// Function object that yields the second member of a std::pair
  /// (e.g. the mapped value of a map element), by const reference.
  constexpr struct select2nd_t {
    template <typename S, typename T>
    const T& operator()( const std::pair<S, T>& kv ) const
    {
      return kv.second;
    }
  } select2nd{};

  /// Apply `op` to every element of [first, last) and write the transformed
  /// value to `result` only when `pred` accepts it.
  /// @return the output iterator one past the last value written
  template <typename InputIterator, typename OutputIterator, typename UnaryOperation, typename UnaryPredicate>
  OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
                                    UnaryPredicate pred )
  {
    for ( ; first != last; ++first ) {
      auto mapped = op( *first );
      if ( !pred( mapped ) ) continue;
      *result = std::move( mapped );
      ++result;
    }
    return result;
  }
}
37 
38 using namespace Gaudi;
39 
41 
43 
44 static std::set<std::string> s_badFiles;
45 
47 StatusCode IODataManager::initialize()
48 {
49  // Initialize base class
51  MsgStream log( msgSvc(), name() );
52  if ( !status.isSuccess() ) {
53  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
54  return status;
55  }
56  // Retrieve conversion service handling event iteration
57  m_catalog = serviceLocator()->service( m_catalogSvcName );
58  if ( !m_catalog ) {
59  log << MSG::ERROR << "Unable to localize interface IFileCatalog from service:" << m_catalogSvcName << endmsg;
60  return StatusCode::FAILURE;
61  }
62  m_incSvc = serviceLocator()->service( "IncidentSvc" );
63  if ( !m_incSvc ) {
64  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
65  return status;
66  }
67  return status;
68 }
69 
71 StatusCode IODataManager::finalize()
72 {
73  m_catalog = nullptr; // release
74  return Service::finalize();
75 }
76 
78 StatusCode IODataManager::error( CSTR msg, bool rethrow )
79 {
80  MsgStream log( msgSvc(), name() );
81  log << MSG::ERROR << "Error: " << msg << endmsg;
82  if ( rethrow ) System::breakExecution();
83  return S_ERROR;
84 }
85 
87 IODataManager::Connections IODataManager::connections( const IInterface* owner ) const
88 {
89  Connections conns;
90  transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( conns ),
91  []( ConnectionMap::const_reference i ) { return i.second->connection; },
92  [&]( const IDataConnection* c ) { return !owner || c->owner() == owner; } );
93  return conns;
94 }
95 
97 StatusCode IODataManager::connectRead( bool keep_open, Connection* con )
98 {
99  if ( !establishConnection( con ) ) {
100  return connectDataIO( UNKNOWN, Connection::READ, con->name(), "UNKNOWN", keep_open, con );
101  }
102  std::string dsn = con ? con->name() : std::string( "Unknown" );
103  return error( "Failed to connect to data:" + dsn, false );
104 }
105 
107 StatusCode IODataManager::connectWrite( Connection* con, IoType mode, CSTR doctype )
108 {
109  if ( !establishConnection( con ) ) {
110  return connectDataIO( UNKNOWN, mode, con->name(), doctype, true, con );
111  }
112  std::string dsn = con ? con->name() : std::string( "Unknown" );
113  return error( "Failed to connect to data:" + dsn, false );
114 }
115 
117 StatusCode IODataManager::read( Connection* con, void* const data, size_t len )
118 {
119  return establishConnection( con ).isSuccess() ? con->read( data, len ) : S_ERROR;
120 }
121 
123 StatusCode IODataManager::write( Connection* con, const void* data, int len )
124 {
125  return establishConnection( con ).isSuccess() ? con->write( data, len ) : S_ERROR;
126 }
127 
129 long long int IODataManager::seek( Connection* con, long long int where, int origin )
130 {
131  return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
132 }
133 
134 StatusCode IODataManager::disconnect( Connection* con )
135 {
136  if ( con ) {
137  std::string dataset = con->name();
138  std::string dsn = dataset;
139  StatusCode sc = con->disconnect();
140  if (::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
141  dsn = dataset.substr( 4 );
142  else if (::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
143  dsn = dataset.substr( 4 );
144  else if (::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
145  dsn = dataset.substr( 4 );
146 
147  auto j = m_fidMap.find( dataset );
148  if ( j != m_fidMap.end() ) {
149  std::string fid = j->second;
150  std::string gfal_name = "gfal:guid:" + fid;
151  auto i = m_connectionMap.find( fid );
152  m_fidMap.erase( j );
153  if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
154  if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
155  if ( i != m_connectionMap.end() && i->second ) {
156  IDataConnection* c = i->second->connection;
157  if ( ( j = m_fidMap.find( c->pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
158  if ( c->isConnected() ) {
159  MsgStream log( msgSvc(), name() );
160  c->disconnect();
161  log << MSG::INFO << "Disconnect from dataset " << dsn << " [" << fid << "]" << endmsg;
162  }
163  delete i->second;
164  m_connectionMap.erase( i );
165  }
166  }
167  return sc;
168  }
169  return S_ERROR;
170 }
171 
172 StatusCode IODataManager::reconnect( Entry* e )
173 {
174  StatusCode sc = S_ERROR;
175  if ( e && e->connection ) {
176  switch ( e->ioType ) {
177  case Connection::READ:
178  sc = e->connection->connectRead();
179  break;
180  case Connection::UPDATE:
181  case Connection::CREATE:
182  case Connection::RECREATE:
183  sc = e->connection->connectWrite( e->ioType );
184  break;
185  default:
186  return S_ERROR;
187  }
188  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
189  std::vector<Entry*> to_retire;
190  e->connection->resetAge();
191  transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( to_retire ),
192  select2nd, [&]( Entry* i ) {
194  return e->connection != c && c->isConnected() && !i->keepOpen && c->ageFile() > m_ageLimit;
195  } );
196  if ( !to_retire.empty() ) {
197  MsgStream log( msgSvc(), name() );
198  std::for_each( std::begin( to_retire ), std::end( to_retire ), [&]( Entry* j ) {
200  c->disconnect();
201  log << MSG::INFO << "Disconnect from dataset " << c->pfn() << " [" << c->fid() << "]" << endmsg;
202  } );
203  }
204  }
205  }
206  return sc;
207 }
208 
210 IIODataManager::Connection* IODataManager::connection( CSTR dataset ) const
211 {
212  auto j = m_fidMap.find( dataset );
213  if ( j == m_fidMap.end() ) return nullptr;
214  auto i = m_connectionMap.find( j->second );
215  return ( i != m_connectionMap.end() ) ? i->second->connection : nullptr;
216 }
217 
218 StatusCode IODataManager::establishConnection( Connection* con )
219 {
220  if ( !con ) return error( "Severe logic bug: No connection object avalible.", true );
221 
222  if ( con->isConnected() ) {
223  con->resetAge();
224  return S_OK;
225  }
226  auto i = m_connectionMap.find( con->name() );
227  if ( i != m_connectionMap.end() ) {
228  Connection* c = i->second->connection;
229  if ( c != con ) {
230  m_incSvc->fireIncident( Incident( con->name(), IncidentType::FailInputFile ) );
231  return error( "Severe logic bug: Twice identical connection object for DSN:" + con->name(), true );
232  }
233  if ( reconnect( i->second ).isSuccess() ) return S_OK;
234  }
235  return S_ERROR;
236 }
237 
238 StatusCode IODataManager::connectDataIO( int typ, IoType rw, CSTR dataset, CSTR technology, bool keep_open,
239  Connection* connection )
240 {
241  MsgStream log( msgSvc(), name() );
242  std::string dsn = dataset;
243  try {
244  StatusCode sc( StatusCode::SUCCESS, true );
245  if (::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
246  dsn = dataset.substr( 4 ), typ = FID;
247  else if (::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
248  dsn = dataset.substr( 4 ), typ = LFN;
249  else if (::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
250  dsn = dataset.substr( 4 ), typ = PFN;
251  else if ( typ == UNKNOWN )
252  return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
253 
254  if ( std::find( s_badFiles.begin(), s_badFiles.end(), dsn ) != s_badFiles.end() ) {
255  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
256  return IDataConnection::BAD_DATA_CONNECTION;
257  }
258  if ( typ == FID ) {
259  auto fi = m_connectionMap.find( dsn );
260  if ( fi == m_connectionMap.end() ) {
261  IFileCatalog::Files files;
262  m_catalog->getPFN( dsn, files );
263  if ( files.empty() ) {
264  if ( !m_useGFAL ) {
265  if ( m_quarantine ) s_badFiles.insert( dsn );
266  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
267  error( "connectDataIO> failed to resolve FID:" + dsn, false ).ignore();
268  return IDataConnection::BAD_DATA_CONNECTION;
269  } else if ( dsn.length() == 36 && dsn[8] == '-' && dsn[13] == '-' ) {
270  std::string gfal_name = "gfal:guid:" + dsn;
271  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
272  sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
273  if ( sc.isSuccess() ) return sc;
274  if ( m_quarantine ) s_badFiles.insert( dsn );
275  }
276  if ( m_quarantine ) s_badFiles.insert( dsn );
277  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
278  error( "connectDataIO> Failed to resolve FID:" + dsn, false ).ignore();
279  return IDataConnection::BAD_DATA_CONNECTION;
280  }
281  // keep track of the current return code before we start iterating over
282  // replicas
283  auto appmgr = serviceLocator()->as<IProperty>();
284  int origReturnCode = Gaudi::getAppReturnCode( appmgr );
285  for ( auto i = files.cbegin(); i != files.cend(); ++i ) {
286  std::string pfn = i->first;
287  if ( i != files.cbegin() ) {
288  log << MSG::WARNING << "Attempt to connect dsn:" << dsn << " with next entry in data federation:" << pfn
289  << "." << endmsg;
290  }
291  sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
292  if ( !sc.isSuccess() ) {
293  if ( m_quarantine ) s_badFiles.insert( pfn );
294  m_incSvc->fireIncident( Incident( pfn, IncidentType::FailInputFile ) );
295  } else {
296  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
297  // we found a working replica, let's reset the return code to the old value
298  Gaudi::setAppReturnCode( appmgr, origReturnCode, true ).ignore();
299  return sc;
300  }
301  }
302  log << MSG::ERROR << "Failed to open dsn:" << dsn << " Federated file could not be resolved from "
303  << files.size() << " entries." << endmsg;
304  return IDataConnection::BAD_DATA_CONNECTION;
305  }
306  return S_ERROR;
307  }
308  std::string fid;
309  auto j = m_fidMap.find( dsn );
310  if ( j == m_fidMap.end() ) {
311  IFileCatalog::Files files;
312  switch ( typ ) {
313  case LFN:
314  fid = m_catalog->lookupLFN( dsn );
315  if ( fid.empty() ) {
316  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
317  log << MSG::ERROR << "Failed to resolve LFN:" << dsn << " Cannot access this dataset." << endmsg;
318  return IDataConnection::BAD_DATA_CONNECTION;
319  }
320  break;
321  case PFN:
322  fid = m_catalog->lookupPFN( dsn );
323  if ( !fid.empty() ) m_catalog->getPFN( fid, files );
324  if ( files.empty() ) {
325  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
326  if ( fid.empty() ) fid = m_catalog->createFID();
327  m_catalog->registerPFN( fid, dsn, technology );
328  log << MSG::INFO << "Referring to dataset " << dsn << " by its file ID:" << fid << endmsg;
329  } else {
330  fid = dsn;
331  }
332  }
333  break;
334  }
335  } else {
336  fid = j->second;
337  }
338  if ( typ == PFN ) {
339  // Open PFN
340  auto fi = m_connectionMap.find( fid );
341  if ( fi == m_connectionMap.end() ) {
342  connection->setFID( fid );
343  connection->setPFN( dsn );
344  auto e = new Entry( technology, keep_open, rw, connection );
345  // Here we open the file!
346  if ( !reconnect( e ).isSuccess() ) {
347  delete e;
348  if ( m_quarantine ) s_badFiles.insert( dsn );
349  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
350  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
351  return IDataConnection::BAD_DATA_CONNECTION;
352  }
353  fid = connection->fid();
354  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
355  if ( !( rw == Connection::CREATE || rw == Connection::RECREATE ) ) {
356  if ( !m_disablePFNWarning && strcasecmp( dsn.c_str(), fid.c_str() ) == 0 ) {
357  log << MSG::ERROR << "Referring to existing dataset " << dsn << " by its physical name." << endmsg;
358  log << "You may not be able to navigate back to the input file"
359  << " -- processing continues" << endmsg;
360  }
361  }
362  m_connectionMap.emplace( fid, e ); // note: only if we disconnect does e get deleted??
363  return S_OK;
364  }
365  // Here we open the file!
366  if ( !reconnect( ( *fi ).second ).isSuccess() ) {
367  if ( m_quarantine ) s_badFiles.insert( dsn );
368  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
369  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
370  return IDataConnection::BAD_DATA_CONNECTION;
371  }
372  return S_OK;
373  }
374  sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
375  if ( !sc.isSuccess() && m_quarantine ) {
376  s_badFiles.insert( fid );
377  } else if ( typ == LFN ) {
378  m_fidMap[dataset] = fid;
379  }
380  return sc;
381  } catch ( std::exception& e ) {
382  error( std::string( "connectDataIO> Caught exception:" ) + e.what(), false ).ignore();
383  } catch ( ... ) {
384  error( std::string( "connectDataIO> Caught unknown exception" ), false ).ignore();
385  }
386  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
387  error( "connectDataIO> The dataset " + dsn + " cannot be opened.", false ).ignore();
388  s_badFiles.insert( dsn );
389  return IDataConnection::BAD_DATA_CONNECTION;
390 }
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:72
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:64
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
T empty(T...args)
const std::string & fid() const
Access file id.
StatusCode finalize() override
Definition: Service.cpp:174
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:75
def read(f, regex='.*', skipevents=0)
Definition: hivetimeline.py:22
T end(T...args)
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
Definition: Debugger.cpp:48
const std::string & pfn() const
Access physical file name.
#define DECLARE_COMPONENT(type)
Definition: PluginService.h:33
STL class.
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
T what(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
Definition of the basic interface.
Definition: IInterface.h:277
Definition: IODataManager.h:34
STL class.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:51
T move(T...args)
T insert(T...args)
T find(T...args)
T length(T...args)
STL class.
ABC describing basic data connection.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
T begin(T...args)
T back_inserter(T...args)
IDataConnection * connection
Definition: IODataManager.h:37
void resetAge()
Reset age.
T c_str(T...args)
Base class for all Incidents (computing events).
Definition: Incident.h:17
T substr(T...args)
void ignore() const
Definition: StatusCode.h:109
bool keepOpen
Definition: IODataManager.h:38
The IProperty is the basic interface for all components which have properties that can be set or get...
Definition: IProperty.h:20
T for_each(T...args)
virtual StatusCode connectRead()=0
Open data stream in read mode.
Helper functions to set/get the application return code.
Definition: __init__.py:1
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
IoType ioType
Definition: IODataManager.h:36