16 constexpr
struct select2nd_t {
17 template <
typename S,
typename T>
23 template <
typename InputIterator,
typename OutputIterator,
typename UnaryOperation,
typename UnaryPredicate>
24 OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
25 UnaryPredicate pred ) {
26 while ( first != last ) {
27 auto val = op( *first );
28 if ( pred( val ) ) *result++ =
std::move( val );
35 using namespace Gaudi;
39 static
std::set<
std::
string> s_badFiles;
51 m_catalog = serviceLocator()->service( m_catalogSvcName );
53 log <<
MSG::ERROR <<
"Unable to localize interface IFileCatalog from service:" << m_catalogSvcName <<
endmsg;
56 m_incSvc = serviceLocator()->service(
"IncidentSvc" );
79 IODataManager::Connections IODataManager::connections(
const IInterface* owner )
const {
83 []( ConnectionMap::const_reference i ) {
return i.second->connection; },
89 StatusCode IODataManager::connectRead(
bool keep_open, Connection* con ) {
90 if ( !establishConnection( con ) ) {
94 return error(
"Failed to connect to data:" + dsn,
false );
98 StatusCode IODataManager::connectWrite( Connection* con, IoType mode,
CSTR doctype ) {
99 if ( !establishConnection( con ) ) {
return connectDataIO(
UNKNOWN, mode, con->name(), doctype,
true, con ); }
101 return error(
"Failed to connect to data:" + dsn,
false );
106 return establishConnection( con ).isSuccess() ? con->read( data, len ) :
StatusCode::FAILURE;
110 StatusCode IODataManager::write( Connection* con,
const void* data,
int len ) {
115 long long int IODataManager::seek( Connection* con,
long long int where,
int origin ) {
116 return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
124 if ( ::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
125 dsn = dataset.
substr( 4 );
126 else if ( ::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
127 dsn = dataset.
substr( 4 );
128 else if ( ::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
129 dsn = dataset.
substr( 4 );
131 auto j = m_fidMap.find( dataset );
132 if ( j != m_fidMap.end() ) {
135 auto i = m_connectionMap.
find( fid );
137 if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
138 if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
139 if ( i != m_connectionMap.end() && i->second ) {
141 if ( ( j = m_fidMap.find( c->
pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
145 log <<
MSG::INFO <<
"Disconnect from dataset " << dsn <<
" [" << fid <<
"]" <<
endmsg;
148 m_connectionMap.erase( i );
163 case Connection::UPDATE:
165 case Connection::RECREATE:
175 select2nd, [&](
Entry* i ) {
179 if ( !to_retire.
empty() ) {
194 auto j = m_fidMap.find( dataset );
195 if ( j == m_fidMap.end() )
return nullptr;
196 auto i = m_connectionMap.find( j->second );
197 return ( i != m_connectionMap.end() ) ? i->second->connection :
nullptr;
200 StatusCode IODataManager::establishConnection( Connection* con ) {
201 if ( !con )
return error(
"Severe logic bug: No connection object avalible.",
true );
203 if ( con->isConnected() ) {
207 auto i = m_connectionMap.find( con->name() );
208 if ( i != m_connectionMap.end() ) {
209 Connection*
c = i->second->connection;
211 m_incSvc->fireIncident(
Incident( con->name(), IncidentType::FailInputFile ) );
212 return error(
"Severe logic bug: Twice identical connection object for DSN:" + con->name(),
true );
219 StatusCode IODataManager::connectDataIO(
int typ, IoType rw,
CSTR dataset,
CSTR technology,
bool keep_open,
220 Connection* connection ) {
225 if ( ::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
226 dsn = dataset.
substr( 4 ), typ = FID;
227 else if ( ::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
228 dsn = dataset.
substr( 4 ), typ = LFN;
229 else if ( ::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
230 dsn = dataset.
substr( 4 ), typ = PFN;
232 return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
235 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
236 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
239 auto fi = m_connectionMap.find( dsn );
240 if ( fi == m_connectionMap.end() ) {
242 m_catalog->getPFN( dsn, files );
243 if ( files.
empty() ) {
245 if ( m_quarantine ) s_badFiles.
insert( dsn );
246 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
247 error(
"connectDataIO> failed to resolve FID:" + dsn,
false ).ignore();
248 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
249 }
else if ( dsn.
length() == 36 && dsn[8] ==
'-' && dsn[13] ==
'-' ) {
251 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
252 sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
254 if ( m_quarantine ) s_badFiles.
insert( dsn );
256 if ( m_quarantine ) s_badFiles.
insert( dsn );
257 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
258 error(
"connectDataIO> Failed to resolve FID:" + dsn,
false ).ignore();
259 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
263 auto appmgr = serviceLocator()->as<
IProperty>();
265 for (
auto i = files.
cbegin(); i != files.
cend(); ++i ) {
267 if ( i != files.
cbegin() ) {
268 log <<
MSG::WARNING <<
"Attempt to connect dsn:" << dsn <<
" with next entry in data federation:" << pfn
271 sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
273 if ( m_quarantine ) s_badFiles.
insert( pfn );
274 m_incSvc->fireIncident(
Incident( pfn, IncidentType::FailInputFile ) );
276 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
282 log <<
MSG::ERROR <<
"Failed to open dsn:" << dsn <<
" Federated file could not be resolved from " 284 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
289 auto j = m_fidMap.
find( dsn );
290 if ( j == m_fidMap.end() ) {
294 fid = m_catalog->lookupLFN( dsn );
296 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
297 log <<
MSG::ERROR <<
"Failed to resolve LFN:" << dsn <<
" Cannot access this dataset." <<
endmsg;
298 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
302 fid = m_catalog->lookupPFN( dsn );
303 if ( !fid.
empty() ) m_catalog->getPFN( fid, files );
304 if ( files.
empty() ) {
306 if ( fid.
empty() ) fid = m_catalog->createFID();
307 m_catalog->registerPFN( fid, dsn, technology );
308 log <<
MSG::INFO <<
"Referring to dataset " << dsn <<
" by its file ID:" << fid <<
endmsg;
320 auto fi = m_connectionMap.
find( fid );
321 if ( fi == m_connectionMap.end() ) {
322 connection->setFID( fid );
323 connection->setPFN( dsn );
324 auto e =
new Entry( technology, keep_open, rw, connection );
326 if ( !reconnect( e ).isSuccess() ) {
328 if ( m_quarantine ) s_badFiles.
insert( dsn );
329 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
330 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
331 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
333 fid = connection->fid();
334 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
336 if ( !m_disablePFNWarning && strcasecmp( dsn.
c_str(), fid.
c_str() ) == 0 ) {
337 log <<
MSG::ERROR <<
"Referring to existing dataset " << dsn <<
" by its physical name." <<
endmsg;
338 log <<
"You may not be able to navigate back to the input file" 339 <<
" -- processing continues" <<
endmsg;
342 m_connectionMap.emplace( fid, e );
346 if ( !reconnect( ( *fi ).second ).isSuccess() ) {
347 if ( m_quarantine ) s_badFiles.
insert( dsn );
348 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
349 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
350 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
354 sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
357 }
else if ( typ == LFN ) {
358 m_fidMap[dataset] = fid;
363 }
catch ( ... ) { error(
std::string(
"connectDataIO> Caught unknown exception" ),
false ).ignore(); }
364 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
365 error(
"connectDataIO> The dataset " + dsn +
" cannot be opened.",
false ).ignore();
367 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
class MergingTransformer< Out(const vector_of_const_< In > true
Definition of the MsgStream class used to transmit messages.
StatusCode initialize() override
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
const std::string & fid() const
Access file id.
StatusCode finalize() override
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
constexpr static const auto SUCCESS
def read(f, regex='.*', skipevents=0)
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
const std::string & pfn() const
Access physical file name.
#define DECLARE_COMPONENT(type)
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
This class is used for returning status codes from appropriate routines.
Definition of the basic interface.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
ABC describing basic data connection.
T back_inserter(T...args)
IDataConnection * connection
void resetAge()
Reset age.
const StatusCode & ignore() const
Ignore/check StatusCode.
Base class for all Incidents (computing events).
constexpr static const auto FAILURE
class MergingTransformer< Out(const vector_of_const_< In > false
The IProperty is the basic interface for all components which have properties that can be set or retrieved...
virtual StatusCode connectRead()=0
Open data stream in read mode.
Header file for std::chrono::duration-based Counters.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.