The Gaudi Framework  master (181af51f)
Loading...
Searching...
No Matches
IODataManager.cpp
Go to the documentation of this file.
1/***********************************************************************************\
2* (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3* *
4* This software is distributed under the terms of the Apache version 2 licence, *
5* copied verbatim in the file "LICENSE". *
6* *
7* In applying this licence, CERN does not waive the privileges and immunities *
8* granted to it by virtue of its status as an Intergovernmental Organization *
9* or submit itself to any jurisdiction. *
10\***********************************************************************************/
11// Framework include files
12#include "IODataManager.h"
17#include <GaudiKernel/SmartIF.h>
19
20#include <set>
21#include <strings.h>
22
namespace {

  /// Function object yielding the mapped value (the `second` member) of a std::pair,
  /// e.g. the Entry* of a ConnectionMap element.
  constexpr struct select2nd_t {
    template <typename S, typename T>
    const T& operator()( const std::pair<S, T>& entry ) const {
      return entry.second;
    }
  } select2nd{};

  /// Fused transform + copy_if: applies `op` to each element of [first, last) and writes
  /// the transformed value to `result` only when `pred` accepts it.
  /// @return the output iterator positioned one past the last element written.
  template <typename InputIterator, typename OutputIterator, typename UnaryOperation, typename UnaryPredicate>
  OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
                                    UnaryPredicate pred ) {
    for ( ; first != last; ++first ) {
      auto transformed = op( *first );
      if ( !pred( transformed ) ) continue;
      *result = std::move( transformed );
      ++result;
    }
    return result;
  }
} // namespace
43
44using namespace Gaudi;
45
47
48static std::set<std::string> s_badFiles;
49
52 // Initialize base class
54 MsgStream log( msgSvc(), name() );
55 if ( !status.isSuccess() ) {
56 log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
57 return status;
58 }
59 // Retrieve conversion service handling event iteration
61 if ( !m_catalog ) {
62 log << MSG::ERROR << "Unable to localize interface IFileCatalog from service:" << m_catalogSvcName << endmsg;
64 }
65 m_incSvc = serviceLocator()->service( "IncidentSvc" );
66 if ( !m_incSvc ) {
67 log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
68 return status;
69 }
70 return status;
71}
72
75 m_catalog = nullptr; // release
76 return Service::finalize();
77}
78
81 MsgStream log( msgSvc(), name() );
82 log << MSG::ERROR << "Error: " << msg << endmsg;
84}
85
87IODataManager::Connections IODataManager::connections( const IInterface* owner ) const {
88 Connections conns;
89 transform_copy_if(
90 std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( conns ),
91 []( ConnectionMap::const_reference i ) { return i.second->connection; },
92 [&]( const IDataConnection* c ) { return !owner || c->owner() == owner; } );
93 return conns;
94}
95
97StatusCode IODataManager::connectRead( bool keep_open, Connection* con ) {
98 if ( !establishConnection( con ) ) {
99 return connectDataIO( UNKNOWN, Connection::READ, con->name(), "UNKNOWN", keep_open, con );
100 }
101 std::string dsn = con ? con->name() : std::string( "Unknown" );
102 return error( "Failed to connect to data:" + dsn );
103}
104
106StatusCode IODataManager::connectWrite( Connection* con, IoType mode, CSTR doctype ) {
107 if ( !establishConnection( con ) ) { return connectDataIO( UNKNOWN, mode, con->name(), doctype, true, con ); }
108 std::string dsn = con ? con->name() : std::string( "Unknown" );
109 return error( "Failed to connect to data:" + dsn );
110}
111
113StatusCode IODataManager::read( Connection* con, void* const data, size_t len ) {
114 return establishConnection( con ).isSuccess() ? con->read( data, len ) : StatusCode::FAILURE;
115}
116
118StatusCode IODataManager::write( Connection* con, const void* data, int len ) {
119 return establishConnection( con ).isSuccess() ? con->write( data, len ) : StatusCode::FAILURE;
120}
121
123long long int IODataManager::seek( Connection* con, long long int where, int origin ) {
124 return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
125}
126
128 if ( con ) {
129 std::string dataset = con->name();
130 std::string dsn = dataset;
131 StatusCode sc = con->disconnect();
132 if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
133 dsn = dataset.substr( 4 );
134 else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
135 dsn = dataset.substr( 4 );
136 else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
137 dsn = dataset.substr( 4 );
138
139 auto j = m_fidMap.find( dataset );
140 if ( j != m_fidMap.end() ) {
141 std::string fid = j->second;
142 std::string gfal_name = "gfal:guid:" + fid;
143 auto i = m_connectionMap.find( fid );
144 m_fidMap.erase( j );
145 if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
146 if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
147 if ( i != m_connectionMap.end() && i->second ) {
148 IDataConnection* c = i->second->connection;
149 if ( ( j = m_fidMap.find( c->pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
150 if ( c->isConnected() ) {
151 MsgStream log( msgSvc(), name() );
152 c->disconnect().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
153 log << MSG::INFO << "Disconnect from dataset " << dsn << " [" << fid << "]" << endmsg;
154 }
155 delete i->second;
156 m_connectionMap.erase( i );
157 }
158 }
159 return sc;
160 }
161 return StatusCode::FAILURE;
162}
163
166 if ( e && e->connection ) {
167 switch ( e->ioType ) {
168 case Connection::READ:
169 sc = e->connection->connectRead();
170 break;
171 case Connection::UPDATE:
172 case Connection::CREATE:
173 case Connection::RECREATE:
174 sc = e->connection->connectWrite( e->ioType );
175 break;
176 default:
177 return StatusCode::FAILURE;
178 }
179 if ( sc.isSuccess() && e->ioType == Connection::READ ) {
180 std::vector<Entry*> to_retire;
181 e->connection->resetAge();
182 transform_copy_if( std::begin( m_connectionMap ), std::end( m_connectionMap ), std::back_inserter( to_retire ),
183 select2nd, [&]( Entry* i ) {
185 return e->connection != c && c->isConnected() && !i->keepOpen && c->ageFile() > m_ageLimit;
186 } );
187 if ( !to_retire.empty() ) {
188 MsgStream log( msgSvc(), name() );
189 std::for_each( std::begin( to_retire ), std::end( to_retire ), [&]( Entry* j ) {
191 c->disconnect().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
192 log << MSG::INFO << "Disconnect from dataset " << c->pfn() << " [" << c->fid() << "]" << endmsg;
193 } );
194 }
195 }
196 }
197 return sc;
198}
199
202 auto j = m_fidMap.find( dataset );
203 if ( j == m_fidMap.end() ) return nullptr;
204 auto i = m_connectionMap.find( j->second );
205 return ( i != m_connectionMap.end() ) ? i->second->connection : nullptr;
206}
207
209 if ( !con ) return error( "Severe logic bug: No connection object avalible." );
210
211 if ( con->isConnected() ) {
212 con->resetAge();
213 return StatusCode::SUCCESS;
214 }
215 auto i = m_connectionMap.find( con->name() );
216 if ( i != m_connectionMap.end() ) {
217 Connection* c = i->second->connection;
218 if ( c != con ) {
219 m_incSvc->fireIncident( Incident( con->name(), IncidentType::FailInputFile ) );
220 return error( "Severe logic bug: Twice identical connection object for DSN:" + con->name() );
221 }
222 if ( reconnect( i->second ).isSuccess() ) return StatusCode::SUCCESS;
223 }
224 return StatusCode::FAILURE;
225}
226
227StatusCode IODataManager::connectDataIO( int typ, IoType rw, CSTR dataset, CSTR technology, bool keep_open,
228 Connection* connection ) {
229 MsgStream log( msgSvc(), name() );
230 std::string dsn = dataset;
231 try {
233 if ( ::strncasecmp( dsn.c_str(), "FID:", 4 ) == 0 )
234 dsn = dataset.substr( 4 ), typ = FID;
235 else if ( ::strncasecmp( dsn.c_str(), "LFN:", 4 ) == 0 )
236 dsn = dataset.substr( 4 ), typ = LFN;
237 else if ( ::strncasecmp( dsn.c_str(), "PFN:", 4 ) == 0 )
238 dsn = dataset.substr( 4 ), typ = PFN;
239 else if ( typ == UNKNOWN )
240 return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
241
242 if ( std::find( s_badFiles.begin(), s_badFiles.end(), dsn ) != s_badFiles.end() ) {
243 m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
245 }
246 if ( typ == FID ) {
247 auto fi = m_connectionMap.find( dsn );
248 if ( fi == m_connectionMap.end() ) {
250 m_catalog->getPFN( dsn, files );
251 if ( files.empty() ) {
252 if ( !m_useGFAL ) {
253 if ( m_quarantine ) s_badFiles.insert( dsn );
254 m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
255 error( "connectDataIO> failed to resolve FID:" + dsn ).ignore();
257 } else if ( dsn.length() == 36 && dsn[8] == '-' && dsn[13] == '-' ) {
258 std::string gfal_name = "gfal:guid:" + dsn;
259 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
260 sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
261 if ( sc.isSuccess() ) return sc;
262 if ( m_quarantine ) s_badFiles.insert( dsn );
263 }
264 if ( m_quarantine ) s_badFiles.insert( dsn );
265 m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
266 error( "connectDataIO> Failed to resolve FID:" + dsn ).ignore();
268 }
269 // keep track of the current return code before we start iterating over
270 // replicas
271 auto appmgr = serviceLocator()->as<IProperty>();
272 int origReturnCode = Gaudi::getAppReturnCode( appmgr );
273 for ( auto i = files.cbegin(); i != files.cend(); ++i ) {
274 std::string pfn = i->first;
275 if ( i != files.cbegin() ) {
276 log << MSG::WARNING << "Attempt to connect dsn:" << dsn << " with next entry in data federation:" << pfn
277 << "." << endmsg;
278 }
279 sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
280 if ( !sc.isSuccess() ) {
281 if ( m_quarantine ) s_badFiles.insert( pfn );
282 m_incSvc->fireIncident( Incident( pfn, IncidentType::FailInputFile ) );
283 } else {
284 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
285 // we found a working replica, let's reset the return code to the old value
286 Gaudi::setAppReturnCode( appmgr, origReturnCode, true ).ignore();
287 return sc;
288 }
289 }
290 log << MSG::ERROR << "Failed to open dsn:" << dsn << " Federated file could not be resolved from "
291 << files.size() << " entries." << endmsg;
293 }
294 return StatusCode::FAILURE;
295 }
296 std::string fid;
297 auto j = m_fidMap.find( dsn );
298 if ( j == m_fidMap.end() ) {
300 switch ( typ ) {
301 case LFN:
302 fid = m_catalog->lookupLFN( dsn );
303 if ( fid.empty() ) {
304 m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
305 log << MSG::ERROR << "Failed to resolve LFN:" << dsn << " Cannot access this dataset." << endmsg;
307 }
308 break;
309 case PFN:
310 fid = m_catalog->lookupPFN( dsn );
311 if ( !fid.empty() ) m_catalog->getPFN( fid, files );
312 if ( files.empty() ) {
313 if ( rw == Connection::CREATE || rw == Connection::RECREATE ) {
314 if ( fid.empty() ) fid = m_catalog->createFID();
315 m_catalog->registerPFN( fid, dsn, technology );
316 log << MSG::INFO << "Referring to dataset " << dsn << " by its file ID:" << fid << endmsg;
317 } else {
318 fid = dsn;
319 }
320 }
321 break;
322 }
323 } else {
324 fid = j->second;
325 }
326 if ( typ == PFN ) {
327 // Open PFN
328 auto fi = m_connectionMap.find( fid );
329 if ( fi == m_connectionMap.end() ) {
330 connection->setFID( fid );
331 connection->setPFN( dsn );
332 auto e = new Entry( technology, keep_open, rw, connection );
333 // Here we open the file!
334 if ( !reconnect( e ).isSuccess() ) {
335 delete e;
336 if ( m_quarantine ) s_badFiles.insert( dsn );
337 m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
338 error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid ).ignore();
340 }
341 fid = connection->fid();
342 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
343 if ( !( rw == Connection::CREATE || rw == Connection::RECREATE ) ) {
344 if ( !m_disablePFNWarning && strcasecmp( dsn.c_str(), fid.c_str() ) == 0 ) {
345 log << MSG::ERROR << "Referring to existing dataset " << dsn << " by its physical name." << endmsg;
346 log << "You may not be able to navigate back to the input file"
347 << " -- processing continues" << endmsg;
348 }
349 }
350 m_connectionMap.emplace( fid, e ); // note: only if we disconnect does e get deleted??
351 return StatusCode::SUCCESS;
352 }
353 // Here we open the file!
354 if ( !reconnect( ( *fi ).second ).isSuccess() ) {
355 if ( m_quarantine ) s_badFiles.insert( dsn );
356 m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
357 error( "connectDataIO> Cannot connect to database: PFN=" + dsn + " FID=" + fid ).ignore();
359 }
360 return StatusCode::SUCCESS;
361 }
362 sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
363 if ( !sc.isSuccess() && m_quarantine ) {
364 s_badFiles.insert( fid );
365 } else if ( typ == LFN ) {
366 m_fidMap[dataset] = fid;
367 }
368 return sc;
369 } catch ( std::exception& e ) {
370 error( std::string( "connectDataIO> Caught exception:" ) + e.what() ).ignore();
371 } catch ( ... ) { error( std::string( "connectDataIO> Caught unknown exception" ) ).ignore(); }
372 m_incSvc->fireIncident( Incident( dsn, IncidentType::FailInputFile ) );
373 error( "connectDataIO> The dataset " + dsn + " cannot be opened." ).ignore();
374 s_badFiles.insert( dsn );
376}
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition MsgStream.h:198
#define DECLARE_COMPONENT(type)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
const SmartIF< IMessageSvc > & msgSvc() const
The standard message service.
MsgStream & msg() const
shortcut for the method msgStream(MSG::INFO)
ABC describing basic data connection.
void resetAge()
Reset age.
virtual StatusCode connectRead()=0
Open data stream in read mode.
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
std::vector< NamedItem > Files
IDataConnection Connection
Connection type definition.
ConnectionMap m_connectionMap
Map with I/O descriptors.
StatusCode connectRead(bool keep_open, Connection *ioDesc) override
Open data stream in read mode.
FidMap m_fidMap
Map from dataset name (FID, LFN, PFN or GFAL name) to file ID (FID).
const std::string & CSTR
Gaudi::Property< bool > m_quarantine
StatusCode write(Connection *con, const void *data, int len) override
Write raw byte buffer to output stream.
long long int seek(Connection *ioDesc, long long int where, int origin) override
Seek on the file described by ioDesc. Arguments as in seek()
StatusCode finalize() override
IService implementation: finalize the service.
StatusCode read(Connection *ioDesc, void *const data, size_t len) override
Read raw byte buffer from input stream.
SmartIF< IFileCatalog > m_catalog
Reference to file catalog.
StatusCode disconnect(Connection *ioDesc) override
Release data stream.
Gaudi::Property< std::string > m_catalogSvcName
StatusCode establishConnection(Connection *con)
StatusCode connectWrite(Connection *con, IoType mode=Connection::CREATE, CSTR doctype="UNKNOWN") override
Open data stream in write mode.
Gaudi::Property< bool > m_useGFAL
Connection * connection(const std::string &dsn) const override
Retrieve known connection.
StatusCode reconnect(Entry *e)
Gaudi::Property< bool > m_disablePFNWarning
Gaudi::Property< int > m_ageLimit
StatusCode initialize() override
IService implementation: initialize the service.
Connections connections(const IInterface *owner) const override
Get connection by owner instance (0=ALL)
SmartIF< IIncidentSvc > m_incSvc
StatusCode connectDataIO(int typ, IoType rw, CSTR fn, CSTR technology, bool keep, Connection *con)
Definition of the basic interface.
Definition IInterface.h:225
The IProperty is the basic interface for all components which have properties that can be set or get.
Definition IProperty.h:32
virtual SmartIF< IService > & service(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to a service.
SmartIF< IFace > as()
Definition ISvcLocator.h:64
Base class for all Incidents (computing events).
Definition Incident.h:24
Definition of the MsgStream class used to transmit messages.
Definition MsgStream.h:29
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition Service.cpp:336
StatusCode finalize() override
Definition Service.cpp:223
const std::string & name() const override
Retrieve name of the service.
Definition Service.cpp:333
StatusCode initialize() override
Definition Service.cpp:118
This class is used for returning status codes from appropriate routines.
Definition StatusCode.h:64
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition StatusCode.h:139
bool isSuccess() const
Definition StatusCode.h:314
constexpr static const auto SUCCESS
Definition StatusCode.h:99
constexpr static const auto FAILURE
Definition StatusCode.h:100
This file provides a Grammar for the type Gaudi::Accumulators::Axis It allows to use that type from p...
Definition __init__.py:1
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
@ WARNING
Definition IMessageSvc.h:22
@ ERROR
Definition IMessageSvc.h:22
@ INFO
Definition IMessageSvc.h:22
IoType ioType
IDataConnection * connection
bool keepOpen