17 constexpr
struct select2nd_t {
18 template <
typename S,
typename T>
25 template <
typename InputIterator,
typename OutputIterator,
typename UnaryOperation,
typename UnaryPredicate>
26 OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
29 while ( first != last ) {
30 auto val = op( *first );
31 if ( pred( val ) ) *result++ =
std::move( val );
38 using namespace Gaudi;
57 m_catalog = serviceLocator()->service( m_catalogSvcName );
59 log <<
MSG::ERROR <<
"Unable to localize interface IFileCatalog from service:" << m_catalogSvcName <<
endmsg;
62 m_incSvc = serviceLocator()->service(
"IncidentSvc" );
87 IODataManager::Connections IODataManager::connections(
const IInterface* owner )
const 91 []( ConnectionMap::const_reference i ) {
return i.second->connection; },
97 StatusCode IODataManager::connectRead(
bool keep_open, Connection* con )
99 if ( !establishConnection( con ) ) {
103 return error(
"Failed to connect to data:" + dsn,
false );
107 StatusCode IODataManager::connectWrite( Connection* con, IoType mode,
CSTR doctype )
109 if ( !establishConnection( con ) ) {
110 return connectDataIO(
UNKNOWN, mode, con->name(), doctype,
true, con );
113 return error(
"Failed to connect to data:" + dsn,
false );
119 return establishConnection( con ).isSuccess() ? con->read( data, len ) :
S_ERROR;
125 return establishConnection( con ).isSuccess() ? con->write( data, len ) :
S_ERROR;
129 long long int IODataManager::seek( Connection* con,
long long int where,
int origin )
131 return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
140 if (::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
141 dsn = dataset.
substr( 4 );
142 else if (::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
143 dsn = dataset.
substr( 4 );
144 else if (::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
145 dsn = dataset.
substr( 4 );
147 auto j = m_fidMap.find( dataset );
148 if ( j != m_fidMap.end() ) {
151 auto i = m_connectionMap.
find( fid );
153 if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
154 if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
155 if ( i != m_connectionMap.end() && i->second ) {
157 if ( ( j = m_fidMap.find( c->
pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
161 log <<
MSG::INFO <<
"Disconnect from dataset " << dsn <<
" [" << fid <<
"]" <<
endmsg;
164 m_connectionMap.erase( i );
180 case Connection::UPDATE:
182 case Connection::RECREATE:
192 select2nd, [&](
Entry* i ) {
196 if ( !to_retire.
empty() ) {
212 auto j = m_fidMap.find( dataset );
213 if ( j == m_fidMap.end() )
return nullptr;
214 auto i = m_connectionMap.find( j->second );
215 return ( i != m_connectionMap.end() ) ? i->second->connection :
nullptr;
218 StatusCode IODataManager::establishConnection( Connection* con )
220 if ( !con )
return error(
"Severe logic bug: No connection object avalible.",
true );
222 if ( con->isConnected() ) {
226 auto i = m_connectionMap.find( con->name() );
227 if ( i != m_connectionMap.end() ) {
228 Connection*
c = i->second->connection;
230 m_incSvc->fireIncident(
Incident( con->name(), IncidentType::FailInputFile ) );
231 return error(
"Severe logic bug: Twice identical connection object for DSN:" + con->name(), true );
233 if ( reconnect( i->second ).isSuccess() )
return S_OK;
238 StatusCode IODataManager::connectDataIO(
int typ, IoType rw,
CSTR dataset,
CSTR technology,
bool keep_open,
239 Connection* connection )
245 if (::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
246 dsn = dataset.
substr( 4 ), typ = FID;
247 else if (::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
248 dsn = dataset.
substr( 4 ), typ = LFN;
249 else if (::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
250 dsn = dataset.
substr( 4 ), typ = PFN;
252 return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
255 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
256 return IDataConnection::BAD_DATA_CONNECTION;
259 auto fi = m_connectionMap.find( dsn );
260 if ( fi == m_connectionMap.end() ) {
262 m_catalog->getPFN( dsn, files );
263 if ( files.
empty() ) {
265 if ( m_quarantine ) s_badFiles.
insert( dsn );
266 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
267 error(
"connectDataIO> failed to resolve FID:" + dsn,
false ).ignore();
268 return IDataConnection::BAD_DATA_CONNECTION;
269 }
else if ( dsn.
length() == 36 && dsn[8] ==
'-' && dsn[13] ==
'-' ) {
271 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
272 sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
274 if ( m_quarantine ) s_badFiles.
insert( dsn );
276 if ( m_quarantine ) s_badFiles.
insert( dsn );
277 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
278 error(
"connectDataIO> Failed to resolve FID:" + dsn,
false ).ignore();
279 return IDataConnection::BAD_DATA_CONNECTION;
283 auto appmgr = serviceLocator()->as<
IProperty>();
285 for (
auto i = files.
cbegin(); i != files.
cend(); ++i ) {
287 if ( i != files.
cbegin() ) {
288 log <<
MSG::WARNING <<
"Attempt to connect dsn:" << dsn <<
" with next entry in data federation:" << pfn
291 sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
293 if ( m_quarantine ) s_badFiles.
insert( pfn );
294 m_incSvc->fireIncident(
Incident( pfn, IncidentType::FailInputFile ) );
296 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
302 log <<
MSG::ERROR <<
"Failed to open dsn:" << dsn <<
" Federated file could not be resolved from " 304 return IDataConnection::BAD_DATA_CONNECTION;
309 auto j = m_fidMap.
find( dsn );
310 if ( j == m_fidMap.end() ) {
314 fid = m_catalog->lookupLFN( dsn );
316 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
317 log <<
MSG::ERROR <<
"Failed to resolve LFN:" << dsn <<
" Cannot access this dataset." <<
endmsg;
318 return IDataConnection::BAD_DATA_CONNECTION;
322 fid = m_catalog->lookupPFN( dsn );
323 if ( !fid.
empty() ) m_catalog->getPFN( fid, files );
324 if ( files.
empty() ) {
326 if ( fid.
empty() ) fid = m_catalog->createFID();
327 m_catalog->registerPFN( fid, dsn, technology );
328 log <<
MSG::INFO <<
"Referring to dataset " << dsn <<
" by its file ID:" << fid <<
endmsg;
340 auto fi = m_connectionMap.
find( fid );
341 if ( fi == m_connectionMap.end() ) {
342 connection->setFID( fid );
343 connection->setPFN( dsn );
344 auto e =
new Entry( technology, keep_open, rw, connection );
346 if ( !reconnect( e ).isSuccess() ) {
348 if ( m_quarantine ) s_badFiles.
insert( dsn );
349 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
350 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
351 return IDataConnection::BAD_DATA_CONNECTION;
353 fid = connection->fid();
354 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
356 if ( !m_disablePFNWarning && strcasecmp( dsn.
c_str(), fid.
c_str() ) == 0 ) {
357 log <<
MSG::ERROR <<
"Referring to existing dataset " << dsn <<
" by its physical name." <<
endmsg;
358 log <<
"You may not be able to navigate back to the input file" 359 <<
" -- processing continues" <<
endmsg;
362 m_connectionMap.emplace( fid, e );
366 if ( !reconnect( ( *fi ).second ).isSuccess() ) {
367 if ( m_quarantine ) s_badFiles.
insert( dsn );
368 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
369 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
370 return IDataConnection::BAD_DATA_CONNECTION;
374 sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
377 }
else if ( typ == LFN ) {
378 m_fidMap[dataset] = fid;
382 error(
std::string(
"connectDataIO> Caught exception:" ) + e.
what(), false ).ignore();
384 error(
std::string(
"connectDataIO> Caught unknown exception" ),
false ).ignore();
386 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
387 error(
"connectDataIO> The dataset " + dsn +
" cannot be opened.",
false ).ignore();
389 return IDataConnection::BAD_DATA_CONNECTION;
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition of the MsgStream class used to transmit messages.
StatusCode initialize() override
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
const std::string & fid() const
Access file id.
StatusCode finalize() override
bool isSuccess() const
Test for a status code of SUCCESS.
def read(f, regex='.*', skipevents=0)
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
const std::string & pfn() const
Access physical file name.
#define DECLARE_COMPONENT(type)
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
This class is used for returning status codes from appropriate routines.
Definition of the basic interface.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Abstract base class (ABC) describing a basic data connection.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
T back_inserter(T...args)
IDataConnection * connection
void resetAge()
Reset age.
Base class for all Incidents (computing events).
IProperty is the basic interface for all components that have properties which can be set or retrieved.
virtual StatusCode connectRead()=0
Open data stream in read mode.
Helper functions to set/get the application return code.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.