17 constexpr
struct select2nd_t {
18 template <
typename S,
typename T>
25 template <
typename InputIterator,
typename OutputIterator,
typename UnaryOperation,
typename UnaryPredicate>
26 OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
29 while ( first != last ) {
30 auto val = op( *first );
31 if ( pred( val ) ) *result++ =
std::move( val );
38 using namespace Gaudi;
42 static
std::set<
std::
string> s_badFiles;
55 m_catalog = serviceLocator()->service( m_catalogSvcName );
57 log <<
MSG::ERROR <<
"Unable to localize interface IFileCatalog from service:" << m_catalogSvcName <<
endmsg;
60 m_incSvc = serviceLocator()->service(
"IncidentSvc" );
85 IODataManager::Connections IODataManager::connections(
const IInterface* owner )
const 89 []( ConnectionMap::const_reference i ) {
return i.second->connection; },
95 StatusCode IODataManager::connectRead(
bool keep_open, Connection* con )
97 if ( !establishConnection( con ) ) {
101 return error(
"Failed to connect to data:" + dsn,
false );
105 StatusCode IODataManager::connectWrite( Connection* con, IoType mode,
CSTR doctype )
107 if ( !establishConnection( con ) ) {
108 return connectDataIO(
UNKNOWN, mode, con->name(), doctype,
true, con );
111 return error(
"Failed to connect to data:" + dsn,
false );
117 return establishConnection( con ).isSuccess() ? con->read( data, len ) :
StatusCode::FAILURE;
123 return establishConnection( con ).isSuccess() ? con->write( data, len ) :
StatusCode::FAILURE;
127 long long int IODataManager::seek( Connection* con,
long long int where,
int origin )
129 return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
138 if (::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
139 dsn = dataset.
substr( 4 );
140 else if (::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
141 dsn = dataset.
substr( 4 );
142 else if (::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
143 dsn = dataset.
substr( 4 );
145 auto j = m_fidMap.find( dataset );
146 if ( j != m_fidMap.end() ) {
149 auto i = m_connectionMap.
find( fid );
151 if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
152 if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
153 if ( i != m_connectionMap.end() && i->second ) {
155 if ( ( j = m_fidMap.find( c->
pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
159 log <<
MSG::INFO <<
"Disconnect from dataset " << dsn <<
" [" << fid <<
"]" <<
endmsg;
162 m_connectionMap.erase( i );
178 case Connection::UPDATE:
180 case Connection::RECREATE:
190 select2nd, [&](
Entry* i ) {
194 if ( !to_retire.
empty() ) {
210 auto j = m_fidMap.find( dataset );
211 if ( j == m_fidMap.end() )
return nullptr;
212 auto i = m_connectionMap.find( j->second );
213 return ( i != m_connectionMap.end() ) ? i->second->connection :
nullptr;
216 StatusCode IODataManager::establishConnection( Connection* con )
218 if ( !con )
return error(
"Severe logic bug: No connection object avalible.",
true );
220 if ( con->isConnected() ) {
224 auto i = m_connectionMap.find( con->name() );
225 if ( i != m_connectionMap.end() ) {
226 Connection*
c = i->second->connection;
228 m_incSvc->fireIncident(
Incident( con->name(), IncidentType::FailInputFile ) );
229 return error(
"Severe logic bug: Twice identical connection object for DSN:" + con->name(), true );
236 StatusCode IODataManager::connectDataIO(
int typ, IoType rw,
CSTR dataset,
CSTR technology,
bool keep_open,
237 Connection* connection )
243 if (::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
244 dsn = dataset.
substr( 4 ), typ = FID;
245 else if (::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
246 dsn = dataset.
substr( 4 ), typ = LFN;
247 else if (::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
248 dsn = dataset.
substr( 4 ), typ = PFN;
250 return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
253 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
254 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
257 auto fi = m_connectionMap.find( dsn );
258 if ( fi == m_connectionMap.end() ) {
260 m_catalog->getPFN( dsn, files );
261 if ( files.
empty() ) {
263 if ( m_quarantine ) s_badFiles.
insert( dsn );
264 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
265 error(
"connectDataIO> failed to resolve FID:" + dsn,
false ).ignore();
266 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
267 }
else if ( dsn.
length() == 36 && dsn[8] ==
'-' && dsn[13] ==
'-' ) {
269 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
270 sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
272 if ( m_quarantine ) s_badFiles.
insert( dsn );
274 if ( m_quarantine ) s_badFiles.
insert( dsn );
275 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
276 error(
"connectDataIO> Failed to resolve FID:" + dsn,
false ).ignore();
277 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
281 auto appmgr = serviceLocator()->as<
IProperty>();
283 for (
auto i = files.
cbegin(); i != files.
cend(); ++i ) {
285 if ( i != files.
cbegin() ) {
286 log <<
MSG::WARNING <<
"Attempt to connect dsn:" << dsn <<
" with next entry in data federation:" << pfn
289 sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
291 if ( m_quarantine ) s_badFiles.
insert( pfn );
292 m_incSvc->fireIncident(
Incident( pfn, IncidentType::FailInputFile ) );
294 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
300 log <<
MSG::ERROR <<
"Failed to open dsn:" << dsn <<
" Federated file could not be resolved from " 302 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
307 auto j = m_fidMap.
find( dsn );
308 if ( j == m_fidMap.end() ) {
312 fid = m_catalog->lookupLFN( dsn );
314 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
315 log <<
MSG::ERROR <<
"Failed to resolve LFN:" << dsn <<
" Cannot access this dataset." <<
endmsg;
316 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
320 fid = m_catalog->lookupPFN( dsn );
321 if ( !fid.
empty() ) m_catalog->getPFN( fid, files );
322 if ( files.
empty() ) {
324 if ( fid.
empty() ) fid = m_catalog->createFID();
325 m_catalog->registerPFN( fid, dsn, technology );
326 log <<
MSG::INFO <<
"Referring to dataset " << dsn <<
" by its file ID:" << fid <<
endmsg;
338 auto fi = m_connectionMap.
find( fid );
339 if ( fi == m_connectionMap.end() ) {
340 connection->setFID( fid );
341 connection->setPFN( dsn );
342 auto e =
new Entry( technology, keep_open, rw, connection );
344 if ( !reconnect( e ).isSuccess() ) {
346 if ( m_quarantine ) s_badFiles.
insert( dsn );
347 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
348 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
349 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
351 fid = connection->fid();
352 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
354 if ( !m_disablePFNWarning && strcasecmp( dsn.
c_str(), fid.
c_str() ) == 0 ) {
355 log <<
MSG::ERROR <<
"Referring to existing dataset " << dsn <<
" by its physical name." <<
endmsg;
356 log <<
"You may not be able to navigate back to the input file" 357 <<
" -- processing continues" <<
endmsg;
360 m_connectionMap.emplace( fid, e );
364 if ( !reconnect( ( *fi ).second ).isSuccess() ) {
365 if ( m_quarantine ) s_badFiles.
insert( dsn );
366 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
367 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
368 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
372 sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
375 }
else if ( typ == LFN ) {
376 m_fidMap[dataset] = fid;
380 error(
std::string(
"connectDataIO> Caught exception:" ) + e.
what(), false ).ignore();
382 error(
std::string(
"connectDataIO> Caught unknown exception" ),
false ).ignore();
384 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
385 error(
"connectDataIO> The dataset " + dsn +
" cannot be opened.",
false ).ignore();
387 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
constexpr static const auto FAILURE
Definition of the MsgStream class used to transmit messages.
StatusCode initialize() override
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
const std::string & fid() const
Access file id.
StatusCode finalize() override
def read(f, regex='.*', skipevents=0)
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
const std::string & pfn() const
Access physical file name.
#define DECLARE_COMPONENT(type)
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
This class is used for returning status codes from appropriate routines.
Definition of the basic interface.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
constexpr static const auto SUCCESS
ABC describing basic data connection.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
T back_inserter(T...args)
IDataConnection * connection
void resetAge()
Reset age.
const StatusCode & ignore() const
Ignore/check StatusCode.
Base class for all Incidents (computing events).
The IProperty is the basic interface for all components which have properties that can be set or get...
virtual StatusCode connectRead()=0
Open data stream in read mode.
Helper functions to set/get the application return code.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.