17 constexpr
struct select2nd_t {
18 template <
typename S,
typename T>
25 template <
typename InputIterator,
typename OutputIterator,
typename UnaryOperation,
typename UnaryPredicate>
26 OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
29 while ( first != last ) {
30 auto val = op( *first );
31 if ( pred( val ) ) *result++ =
std::move( val );
38 using namespace Gaudi;
42 static
std::set<
std::
string> s_badFiles;
55 m_catalog = serviceLocator()->service( m_catalogSvcName );
57 log <<
MSG::ERROR <<
"Unable to localize interface IFileCatalog from service:" << m_catalogSvcName <<
endmsg;
60 m_incSvc = serviceLocator()->service(
"IncidentSvc" );
85 IODataManager::Connections IODataManager::connections(
const IInterface* owner )
const 89 []( ConnectionMap::const_reference i ) {
return i.second->connection; },
95 StatusCode IODataManager::connectRead(
bool keep_open, Connection* con )
97 if ( !establishConnection( con ) ) {
101 return error(
"Failed to connect to data:" + dsn,
false );
105 StatusCode IODataManager::connectWrite( Connection* con, IoType mode,
CSTR doctype )
107 if ( !establishConnection( con ) ) {
108 return connectDataIO(
UNKNOWN, mode, con->name(), doctype,
true, con );
111 return error(
"Failed to connect to data:" + dsn,
false );
117 return establishConnection( con ).isSuccess() ? con->read( data, len ) :
StatusCode::FAILURE;
123 return establishConnection( con ).isSuccess() ? con->write( data, len ) :
StatusCode::FAILURE;
127 long long int IODataManager::seek( Connection* con,
long long int where,
int origin )
129 return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
138 if (::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
139 dsn = dataset.
substr( 4 );
140 else if (::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
141 dsn = dataset.
substr( 4 );
142 else if (::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
143 dsn = dataset.
substr( 4 );
145 auto j = m_fidMap.find( dataset );
146 if ( j != m_fidMap.end() ) {
149 auto i = m_connectionMap.
find( fid );
151 if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
152 if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
153 if ( i != m_connectionMap.end() && i->second ) {
155 if ( ( j = m_fidMap.find( c->
pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
159 log <<
MSG::INFO <<
"Disconnect from dataset " << dsn <<
" [" << fid <<
"]" <<
endmsg;
162 m_connectionMap.erase( i );
178 case Connection::UPDATE:
180 case Connection::RECREATE:
190 select2nd, [&](
Entry* i ) {
194 if ( !to_retire.
empty() ) {
210 auto j = m_fidMap.find( dataset );
211 if ( j == m_fidMap.end() )
return nullptr;
212 auto i = m_connectionMap.find( j->second );
213 return ( i != m_connectionMap.end() ) ? i->second->connection :
nullptr;
216 StatusCode IODataManager::establishConnection( Connection* con )
218 if ( !con )
return error(
"Severe logic bug: No connection object avalible.",
true );
220 if ( con->isConnected() ) {
224 auto i = m_connectionMap.find( con->name() );
225 if ( i != m_connectionMap.end() ) {
226 Connection*
c = i->second->connection;
228 m_incSvc->fireIncident(
Incident( con->name(), IncidentType::FailInputFile ) );
229 return error(
"Severe logic bug: Twice identical connection object for DSN:" + con->name(), true );
236 StatusCode IODataManager::connectDataIO(
int typ, IoType rw,
CSTR dataset,
CSTR technology,
bool keep_open,
237 Connection* connection )
243 if (::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
244 dsn = dataset.
substr( 4 ), typ = FID;
245 else if (::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
246 dsn = dataset.
substr( 4 ), typ = LFN;
247 else if (::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
248 dsn = dataset.
substr( 4 ), typ = PFN;
250 return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
253 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
254 return IDataConnection::BAD_DATA_CONNECTION;
257 auto fi = m_connectionMap.find( dsn );
258 if ( fi == m_connectionMap.end() ) {
260 m_catalog->getPFN( dsn, files );
261 if ( files.
empty() ) {
263 if ( m_quarantine ) s_badFiles.
insert( dsn );
264 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
265 error(
"connectDataIO> failed to resolve FID:" + dsn,
false ).ignore();
266 return IDataConnection::BAD_DATA_CONNECTION;
267 }
else if ( dsn.
length() == 36 && dsn[8] ==
'-' && dsn[13] ==
'-' ) {
269 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
270 sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
272 if ( m_quarantine ) s_badFiles.
insert( dsn );
274 if ( m_quarantine ) s_badFiles.
insert( dsn );
275 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
276 error(
"connectDataIO> Failed to resolve FID:" + dsn,
false ).ignore();
277 return IDataConnection::BAD_DATA_CONNECTION;
281 auto appmgr = serviceLocator()->as<
IProperty>();
283 for (
auto i = files.
cbegin(); i != files.
cend(); ++i ) {
285 if ( i != files.
cbegin() ) {
286 log <<
MSG::WARNING <<
"Attempt to connect dsn:" << dsn <<
" with next entry in data federation:" << pfn
289 sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
291 if ( m_quarantine ) s_badFiles.
insert( pfn );
292 m_incSvc->fireIncident(
Incident( pfn, IncidentType::FailInputFile ) );
294 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
300 log <<
MSG::ERROR <<
"Failed to open dsn:" << dsn <<
" Federated file could not be resolved from " 302 return IDataConnection::BAD_DATA_CONNECTION;
307 auto j = m_fidMap.
find( dsn );
308 if ( j == m_fidMap.end() ) {
312 fid = m_catalog->lookupLFN( dsn );
314 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
315 log <<
MSG::ERROR <<
"Failed to resolve LFN:" << dsn <<
" Cannot access this dataset." <<
endmsg;
316 return IDataConnection::BAD_DATA_CONNECTION;
320 fid = m_catalog->lookupPFN( dsn );
321 if ( !fid.
empty() ) m_catalog->getPFN( fid, files );
322 if ( files.
empty() ) {
324 if ( fid.
empty() ) fid = m_catalog->createFID();
325 m_catalog->registerPFN( fid, dsn, technology );
326 log <<
MSG::INFO <<
"Referring to dataset " << dsn <<
" by its file ID:" << fid <<
endmsg;
338 auto fi = m_connectionMap.
find( fid );
339 if ( fi == m_connectionMap.end() ) {
340 connection->setFID( fid );
341 connection->setPFN( dsn );
342 auto e =
new Entry( technology, keep_open, rw, connection );
344 if ( !reconnect( e ).isSuccess() ) {
346 if ( m_quarantine ) s_badFiles.
insert( dsn );
347 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
348 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
349 return IDataConnection::BAD_DATA_CONNECTION;
351 fid = connection->fid();
352 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
354 if ( !m_disablePFNWarning && strcasecmp( dsn.
c_str(), fid.
c_str() ) == 0 ) {
355 log <<
MSG::ERROR <<
"Referring to existing dataset " << dsn <<
" by its physical name." <<
endmsg;
356 log <<
"You may not be able to navigate back to the input file" 357 <<
" -- processing continues" <<
endmsg;
360 m_connectionMap.emplace( fid, e );
364 if ( !reconnect( ( *fi ).second ).isSuccess() ) {
365 if ( m_quarantine ) s_badFiles.
insert( dsn );
366 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
367 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
368 return IDataConnection::BAD_DATA_CONNECTION;
372 sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
375 }
else if ( typ == LFN ) {
376 m_fidMap[dataset] = fid;
380 error(
std::string(
"connectDataIO> Caught exception:" ) + e.
what(), false ).ignore();
382 error(
std::string(
"connectDataIO> Caught unknown exception" ),
false ).ignore();
384 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
385 error(
"connectDataIO> The dataset " + dsn +
" cannot be opened.",
false ).ignore();
387 return IDataConnection::BAD_DATA_CONNECTION;
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition of the MsgStream class used to transmit messages.
StatusCode initialize() override
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
const std::string & fid() const
Access file id.
StatusCode finalize() override
bool isSuccess() const
Test for a status code of SUCCESS.
def read(f, regex='.*', skipevents=0)
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
const std::string & pfn() const
Access physical file name.
#define DECLARE_COMPONENT(type)
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
This class is used for returning status codes from appropriate routines.
Definition of the basic interface.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
ABC describing basic data connection.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
T back_inserter(T...args)
IDataConnection * connection
void resetAge()
Reset age.
Base class for all Incidents (computing events).
The IProperty is the basic interface for all components which have properties that can be set or retrieved...
virtual StatusCode connectRead()=0
Open data stream in read mode.
Helper functions to set/get the application return code.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.