The Gaudi Framework  v30r3 (a5ef0a68)
IODataManager.cpp
Go to the documentation of this file.
1 // Framework include files
2 #include "IODataManager.h"
4 #include "GaudiKernel/Debugger.h"
6 #include "GaudiKernel/Incident.h"
8 #include "GaudiKernel/SmartIF.h"
11 
12 #include <set>
13 
namespace
{

  /// Function object that projects a std::pair onto its second member.
  struct select2nd_t {
    template <typename First, typename Second>
    const Second& operator()( const std::pair<First, Second>& entry ) const
    {
      return entry.second;
    }
  };
  constexpr select2nd_t select2nd{};

  /// Apply @p op to every element of [first, last) and write the transformed
  /// value to @p result only when @p pred accepts it (the predicate sees the
  /// transformed value, not the input element).
  /// @return the output iterator positioned after the last value written.
  template <typename InIt, typename OutIt, typename Transform, typename Filter>
  OutIt transform_copy_if( InIt first, InIt last, OutIt result, Transform op, Filter pred )
  {
    for ( ; first != last; ++first ) {
      auto candidate = op( *first );
      if ( pred( candidate ) ) {
        *result = std::move( candidate );
        ++result;
      }
    }
    return result;
  }
}
37 
38 using namespace Gaudi;
39 
41 
namespace
{
  /// Names of datasets for which a connection attempt failed. connectDataIO()
  /// adds entries here and refuses any dataset name it finds in this set.
  /// File-local, hence shared by every IODataManager in this translation unit.
  std::set<std::string> s_badFiles;
}
43 
46 {
47  // Initialize base class
49  MsgStream log( msgSvc(), name() );
50  if ( !status.isSuccess() ) {
51  log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
52  return status;
53  }
54  // Retrieve conversion service handling event iteration
55  m_catalog = serviceLocator()->service( m_catalogSvcName );
56  if ( !m_catalog ) {
57  log << MSG::ERROR << "Unable to localize interface IFileCatalog from service:" << m_catalogSvcName << endmsg;
58  return StatusCode::FAILURE;
59  }
60  m_incSvc = serviceLocator()->service( "IncidentSvc" );
61  if ( !m_incSvc ) {
62  log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
63  return status;
64  }
65  return status;
66 }
67 
69 StatusCode IODataManager::finalize()
70 {
71  m_catalog = nullptr; // release
72  return Service::finalize();
73 }
74 
76 StatusCode IODataManager::error( CSTR msg, bool rethrow )
77 {
78  MsgStream log( msgSvc(), name() );
79  log << MSG::ERROR << "Error: " << msg << endmsg;
80  if ( rethrow ) System::breakExecution();
81  return StatusCode::FAILURE;
82 }
83 
85 IODataManager::Connections IODataManager::connections( const IInterface* owner ) const
86 {
87  Connections conns;
88  transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( conns ),
89  []( ConnectionMap::const_reference i ) { return i.second->connection; },
90  [&]( const IDataConnection* c ) { return !owner || c->owner() == owner; } );
91  return conns;
92 }
93 
95 StatusCode IODataManager::connectRead( bool keep_open, Connection* con )
96 {
97  if ( !establishConnection( con ) ) {
98  return connectDataIO( UNKNOWN, Connection::READ, con->name(), "UNKNOWN", keep_open, con );
99  }
100  std::string dsn = con ? con->name() : std::string( "Unknown" );
101  return error( "Failed to connect to data:" + dsn, false );
102 }
103 
105 StatusCode IODataManager::connectWrite( Connection* con, IoType mode, CSTR doctype )
106 {
107  if ( !establishConnection( con ) ) {
108  return connectDataIO( UNKNOWN, mode, con->name(), doctype, true, con );
109  }
110  std::string dsn = con ? con->name() : std::string( "Unknown" );
111  return error( "Failed to connect to data:" + dsn, false );
112 }
113 
115 StatusCode IODataManager::read( Connection* con, void* const data, size_t len )
116 {
117  return establishConnection( con ).isSuccess() ? con->read( data, len ) : StatusCode::FAILURE;
118 }
119 
121 StatusCode IODataManager::write( Connection* con, const void* data, int len )
122 {
123  return establishConnection( con ).isSuccess() ? con->write( data, len ) : StatusCode::FAILURE;
124 }
125 
127 long long int IODataManager::seek( Connection* con, long long int where, int origin )
128 {
129  return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
130 }
131 
132 StatusCode IODataManager::disconnect( Connection* con )
133 {
134  if ( con ) {
135  std::string dataset = con->name();
136  std::string dsn = dataset;
137  StatusCode sc = con->disconnect();
138  if (::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
139  dsn = dataset.substr( 4 );
140  else if (::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
141  dsn = dataset.substr( 4 );
142  else if (::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
143  dsn = dataset.substr( 4 );
144 
145  auto j = m_fidMap.find( dataset );
146  if ( j != m_fidMap.end() ) {
147  std::string fid = j->second;
148  std::string gfal_name = "gfal:guid:" + fid;
149  auto i = m_connectionMap.find( fid );
150  m_fidMap.erase( j );
151  if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
152  if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
153  if ( i != m_connectionMap.end() && i->second ) {
154  IDataConnection* c = i->second->connection;
155  if ( ( j = m_fidMap.find( c->pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
156  if ( c->isConnected() ) {
157  MsgStream log( msgSvc(), name() );
158  c->disconnect();
159  log << MSG::INFO << "Disconnect from dataset " << dsn << " [" << fid << "]" << endmsg;
160  }
161  delete i->second;
162  m_connectionMap.erase( i );
163  }
164  }
165  return sc;
166  }
167  return StatusCode::FAILURE;
168 }
169 
170 StatusCode IODataManager::reconnect( Entry* e )
171 {
173  if ( e && e->connection ) {
174  switch ( e->ioType ) {
175  case Connection::READ:
176  sc = e->connection->connectRead();
177  break;
178  case Connection::UPDATE:
179  case Connection::CREATE:
180  case Connection::RECREATE:
181  sc = e->connection->connectWrite( e->ioType );
182  break;
183  default:
184  return StatusCode::FAILURE;
185  }
186  if ( sc.isSuccess() && e->ioType == Connection::READ ) {
187  std::vector<Entry*> to_retire;
188  e->connection->resetAge();
189  transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( to_retire ),
190  select2nd, [&]( Entry* i ) {
192  return e->connection != c && c->isConnected() && !i->keepOpen && c->ageFile() > m_ageLimit;
193  } );
194  if ( !to_retire.empty() ) {
195  MsgStream log( msgSvc(), name() );
196  std::for_each( std::begin( to_retire ), std::end( to_retire ), [&]( Entry* j ) {
198  c->disconnect();
199  log << MSG::INFO << "Disconnect from dataset " << c->pfn() << " [" << c->fid() << "]" << endmsg;
200  } );
201  }
202  }
203  }
204  return sc;
205 }
206 
208 IIODataManager::Connection* IODataManager::connection( CSTR dataset ) const
209 {
210  auto j = m_fidMap.find( dataset );
211  if ( j == m_fidMap.end() ) return nullptr;
212  auto i = m_connectionMap.find( j->second );
213  return ( i != m_connectionMap.end() ) ? i->second->connection : nullptr;
214 }
215 
216 StatusCode IODataManager::establishConnection( Connection* con )
217 {
218  if ( !con ) return error( "Severe logic bug: No connection object avalible.", true );
219 
220  if ( con->isConnected() ) {
221  con->resetAge();
222  return StatusCode::SUCCESS;
223  }
224  auto i = m_connectionMap.find( con->name() );
225  if ( i != m_connectionMap.end() ) {
226  Connection* c = i->second->connection;
227  if ( c != con ) {
228  m_incSvc->fireIncident( Incident( con->name(), IncidentType::FailInputFile ) );
229  return error( "Severe logic bug: Twice identical connection object for DSN:" + con->name(), true );
230  }
231  if ( reconnect( i->second ).isSuccess() ) return StatusCode::SUCCESS;
232  }
233  return StatusCode::FAILURE;
234 }
235 
236 StatusCode IODataManager::connectDataIO( int typ, IoType rw, CSTR dataset, CSTR technology, bool keep_open,
237  Connection* connection )
238 {
239  MsgStream log( msgSvc(), name() );
240  std::string dsn = dataset;
241  try {
242  StatusCode sc( StatusCode::SUCCESS, true );
243  if (::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
244  dsn = dataset.substr( 4 ), typ = FID;
245  else if (::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
246  dsn = dataset.substr( 4 ), typ = LFN;
247  else if (::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
248  dsn = dataset.substr( 4 ), typ = PFN;
249  else if ( typ == UNKNOWN )
250  return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
251 
252  if ( std::find( s_badFiles.begin(), s_badFiles.end(), dsn ) != s_badFiles.end() ) {
253  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
254  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
255  }
256  if ( typ == FID ) {
257  auto fi = m_connectionMap.find( dsn );
258  if ( fi == m_connectionMap.end() ) {
259  IFileCatalog::Files files;
260  m_catalog->getPFN( dsn, files );
261  if ( files.empty() ) {
262  if ( !m_useGFAL ) {
263  if ( m_quarantine ) s_badFiles.insert( dsn );
264  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
265  error( "connectDataIO> failed to resolve FID:" + dsn, false ).ignore();
266  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
267  } else if ( dsn.length() == 36 && dsn[8] == '-' && dsn[13] == '-' ) {
268  std::string gfal_name = "gfal:guid:" + dsn;
269  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
270  sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
271  if ( sc.isSuccess() ) return sc;
272  if ( m_quarantine ) s_badFiles.insert( dsn );
273  }
274  if ( m_quarantine ) s_badFiles.insert( dsn );
275  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
276  error( "connectDataIO> Failed to resolve FID:" + dsn, false ).ignore();
277  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
278  }
279  // keep track of the current return code before we start iterating over
280  // replicas
281  auto appmgr = serviceLocator()->as<IProperty>();
282  int origReturnCode = Gaudi::getAppReturnCode( appmgr );
283  for ( auto i = files.cbegin(); i != files.cend(); ++i ) {
284  std::string pfn = i->first;
285  if ( i != files.cbegin() ) {
286  log << MSG::WARNING << "Attempt to connect dsn:" << dsn << " with next entry in data federation:" << pfn
287  << "." << endmsg;
288  }
289  sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
290  if ( !sc.isSuccess() ) {
291  if ( m_quarantine ) s_badFiles.insert( pfn );
292  m_incSvc->fireIncident( Incident( pfn, IncidentType::FailInputFile ) );
293  } else {
294  m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
295  // we found a working replica, let's reset the return code to the old value
296  Gaudi::setAppReturnCode( appmgr, origReturnCode, true ).ignore();
297  return sc;
298  }
299  }
300  log << MSG::ERROR << "Failed to open dsn:" << dsn << " Federated file could not be resolved from "
301  << files.size() << " entries." << endmsg;
302  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
303  }
304  return StatusCode::FAILURE;
305  }
306  std::string fid;
307  auto j = m_fidMap.find( dsn );
308  if ( j == m_fidMap.end() ) {
309  IFileCatalog::Files files;
310  switch ( typ ) {
311  case LFN:
312  fid = m_catalog->lookupLFN( dsn );
313  if ( fid.empty() ) {
314  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
315  log << MSG::ERROR << "Failed to resolve LFN:" << dsn << " Cannot access this dataset." << endmsg;
316  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
317  }
318  break;
319  case PFN:
320  fid = m_catalog->lookupPFN( dsn );
321  if ( !fid.empty() ) m_catalog->getPFN( fid, files );
322  if ( files.empty() ) {
323  if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
324  if ( fid.empty() ) fid = m_catalog->createFID();
325  m_catalog->registerPFN( fid, dsn, technology );
326  log << MSG::INFO << "Referring to dataset " << dsn << " by its file ID:" << fid << endmsg;
327  } else {
328  fid = dsn;
329  }
330  }
331  break;
332  }
333  } else {
334  fid = j->second;
335  }
336  if ( typ == PFN ) {
337  // Open PFN
338  auto fi = m_connectionMap.find( fid );
339  if ( fi == m_connectionMap.end() ) {
340  connection->setFID( fid );
341  connection->setPFN( dsn );
342  auto e = new Entry( technology, keep_open, rw, connection );
343  // Here we open the file!
344  if ( !reconnect( e ).isSuccess() ) {
345  delete e;
346  if ( m_quarantine ) s_badFiles.insert( dsn );
347  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
348  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
349  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
350  }
351  fid = connection->fid();
352  m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
353  if ( !( rw == Connection::CREATE || rw == Connection::RECREATE ) ) {
354  if ( !m_disablePFNWarning && strcasecmp( dsn.c_str(), fid.c_str() ) == 0 ) {
355  log << MSG::ERROR << "Referring to existing dataset " << dsn << " by its physical name." << endmsg;
356  log << "You may not be able to navigate back to the input file"
357  << " -- processing continues" << endmsg;
358  }
359  }
360  m_connectionMap.emplace( fid, e ); // note: only if we disconnect does e get deleted??
361  return StatusCode::SUCCESS;
362  }
363  // Here we open the file!
364  if ( !reconnect( ( *fi ).second ).isSuccess() ) {
365  if ( m_quarantine ) s_badFiles.insert( dsn );
366  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
367  error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid, false ).ignore();
368  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
369  }
370  return StatusCode::SUCCESS;
371  }
372  sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
373  if ( !sc.isSuccess() && m_quarantine ) {
374  s_badFiles.insert( fid );
375  } else if ( typ == LFN ) {
376  m_fidMap[dataset] = fid;
377  }
378  return sc;
379  } catch ( std::exception& e ) {
380  error( std::string( "connectDataIO> Caught exception:" ) + e.what(), false ).ignore();
381  } catch ( ... ) {
382  error( std::string( "connectDataIO> Caught unknown exception" ), false ).ignore();
383  }
384  m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
385  error( "connectDataIO> The dataset " + dsn + " cannot be opened.", false ).ignore();
386  s_badFiles.insert( dsn );
387  return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
388 }
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition: AppReturnCode.h:72
constexpr static const auto FAILURE
Definition: StatusCode.h:88
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:63
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
T empty(T...args)
const std::string & fid() const
Access file id.
StatusCode finalize() override
Definition: Service.cpp:173
bool isSuccess() const
Definition: StatusCode.h:287
STL namespace.
def read(f, regex='.*', skipevents=0)
Definition: hivetimeline.py:22
T end(T...args)
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
Definition: Debugger.cpp:48
const std::string & pfn() const
Access physical file name.
STL class.
#define DECLARE_COMPONENT(type)
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
T what(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
Definition of the basic interface.
Definition: IInterface.h:277
Definition: IODataManager.h:34
STL class.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Definition: AppReturnCode.h:51
T move(T...args)
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
T insert(T...args)
T find(T...args)
T length(T...args)
STL class.
ABC describing basic data connection.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
T begin(T...args)
T back_inserter(T...args)
IDataConnection * connection
Definition: IODataManager.h:37
void resetAge()
Reset age.
const StatusCode & ignore() const
Ignore/check StatusCode.
Definition: StatusCode.h:165
T c_str(T...args)
Base class for all Incidents (computing events).
Definition: Incident.h:17
T substr(T...args)
bool keepOpen
Definition: IODataManager.h:38
The IProperty is the basic interface for all components which have properties that can be set or get...
Definition: IProperty.h:20
T for_each(T...args)
virtual StatusCode connectRead()=0
Open data stream in read mode.
Helper functions to set/get the application return code.
Definition: __init__.py:1
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
IoType ioType
Definition: IODataManager.h:36