16 constexpr
struct select2nd_t {
17 template <
typename S,
typename T>
23 template <
typename InputIterator,
typename OutputIterator,
typename UnaryOperation,
typename UnaryPredicate>
24 OutputIterator transform_copy_if( InputIterator first, InputIterator last, OutputIterator result, UnaryOperation op,
25 UnaryPredicate pred ) {
26 while ( first != last ) {
27 auto val = op( *first );
28 if ( pred( val ) ) *result++ =
std::move( val );
35 using namespace Gaudi;
39 static
std::set<
std::
string> s_badFiles;
51 m_catalog = serviceLocator()->service( m_catalogSvcName );
53 log <<
MSG::ERROR <<
"Unable to localize interface IFileCatalog from service:" << m_catalogSvcName <<
endmsg;
56 m_incSvc = serviceLocator()->service(
"IncidentSvc" );
79 IODataManager::Connections IODataManager::connections(
const IInterface* owner )
const {
82 []( ConnectionMap::const_reference i ) {
return i.second->connection; },
88 StatusCode IODataManager::connectRead(
bool keep_open, Connection* con ) {
89 if ( !establishConnection( con ) ) {
93 return error(
"Failed to connect to data:" + dsn,
false );
97 StatusCode IODataManager::connectWrite( Connection* con, IoType mode,
CSTR doctype ) {
98 if ( !establishConnection( con ) ) {
return connectDataIO(
UNKNOWN, mode, con->name(), doctype,
true, con ); }
100 return error(
"Failed to connect to data:" + dsn,
false );
105 return establishConnection( con ).isSuccess() ? con->read( data, len ) :
StatusCode::FAILURE;
109 StatusCode IODataManager::write( Connection* con,
const void* data,
int len ) {
114 long long int IODataManager::seek( Connection* con,
long long int where,
int origin ) {
115 return establishConnection( con ).isSuccess() ? con->seek( where, origin ) : -1;
123 if ( ::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
124 dsn = dataset.
substr( 4 );
125 else if ( ::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
126 dsn = dataset.
substr( 4 );
127 else if ( ::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
128 dsn = dataset.
substr( 4 );
130 auto j = m_fidMap.find( dataset );
131 if ( j != m_fidMap.end() ) {
134 auto i = m_connectionMap.
find( fid );
136 if ( ( j = m_fidMap.find( fid ) ) != m_fidMap.end() ) m_fidMap.erase( j );
137 if ( ( j = m_fidMap.find( gfal_name ) ) != m_fidMap.end() ) m_fidMap.erase( j );
138 if ( i != m_connectionMap.end() && i->second ) {
140 if ( ( j = m_fidMap.find( c->
pfn() ) ) != m_fidMap.end() ) m_fidMap.erase( j );
144 log <<
MSG::INFO <<
"Disconnect from dataset " << dsn <<
" [" << fid <<
"]" <<
endmsg;
147 m_connectionMap.erase( i );
162 case Connection::UPDATE:
164 case Connection::RECREATE:
174 select2nd, [&](
Entry* i ) {
178 if ( !to_retire.
empty() ) {
193 auto j = m_fidMap.find( dataset );
194 if ( j == m_fidMap.end() )
return nullptr;
195 auto i = m_connectionMap.find( j->second );
196 return ( i != m_connectionMap.end() ) ? i->second->connection :
nullptr;
199 StatusCode IODataManager::establishConnection( Connection* con ) {
200 if ( !con )
return error(
"Severe logic bug: No connection object avalible.",
true );
202 if ( con->isConnected() ) {
206 auto i = m_connectionMap.find( con->name() );
207 if ( i != m_connectionMap.end() ) {
208 Connection*
c = i->second->connection;
210 m_incSvc->fireIncident(
Incident( con->name(), IncidentType::FailInputFile ) );
211 return error(
"Severe logic bug: Twice identical connection object for DSN:" + con->name(), true );
218 StatusCode IODataManager::connectDataIO(
int typ, IoType rw,
CSTR dataset,
CSTR technology,
bool keep_open,
219 Connection* connection ) {
224 if ( ::strncasecmp( dsn.
c_str(),
"FID:", 4 ) == 0 )
225 dsn = dataset.
substr( 4 ), typ = FID;
226 else if ( ::strncasecmp( dsn.
c_str(),
"LFN:", 4 ) == 0 )
227 dsn = dataset.
substr( 4 ), typ = LFN;
228 else if ( ::strncasecmp( dsn.
c_str(),
"PFN:", 4 ) == 0 )
229 dsn = dataset.
substr( 4 ), typ = PFN;
231 return connectDataIO( PFN, rw, dsn, technology, keep_open, connection );
234 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
235 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
238 auto fi = m_connectionMap.find( dsn );
239 if ( fi == m_connectionMap.end() ) {
241 m_catalog->getPFN( dsn, files );
242 if ( files.
empty() ) {
244 if ( m_quarantine ) s_badFiles.
insert( dsn );
245 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
246 error(
"connectDataIO> failed to resolve FID:" + dsn,
false ).ignore();
247 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
248 }
else if ( dsn.
length() == 36 && dsn[8] ==
'-' && dsn[13] ==
'-' ) {
250 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
251 sc = connectDataIO( PFN, rw, gfal_name, technology, keep_open, connection );
253 if ( m_quarantine ) s_badFiles.
insert( dsn );
255 if ( m_quarantine ) s_badFiles.
insert( dsn );
256 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
257 error(
"connectDataIO> Failed to resolve FID:" + dsn,
false ).ignore();
258 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
262 auto appmgr = serviceLocator()->as<
IProperty>();
264 for (
auto i = files.
cbegin(); i != files.
cend(); ++i ) {
266 if ( i != files.
cbegin() ) {
267 log <<
MSG::WARNING <<
"Attempt to connect dsn:" << dsn <<
" with next entry in data federation:" << pfn
270 sc = connectDataIO( PFN, rw, pfn, technology, keep_open, connection );
272 if ( m_quarantine ) s_badFiles.
insert( pfn );
273 m_incSvc->fireIncident(
Incident( pfn, IncidentType::FailInputFile ) );
275 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
281 log <<
MSG::ERROR <<
"Failed to open dsn:" << dsn <<
" Federated file could not be resolved from " 283 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
288 auto j = m_fidMap.
find( dsn );
289 if ( j == m_fidMap.end() ) {
293 fid = m_catalog->lookupLFN( dsn );
295 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
296 log <<
MSG::ERROR <<
"Failed to resolve LFN:" << dsn <<
" Cannot access this dataset." <<
endmsg;
297 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
301 fid = m_catalog->lookupPFN( dsn );
302 if ( !fid.
empty() ) m_catalog->getPFN( fid, files );
303 if ( files.
empty() ) {
305 if ( fid.
empty() ) fid = m_catalog->createFID();
306 m_catalog->registerPFN( fid, dsn, technology );
307 log <<
MSG::INFO <<
"Referring to dataset " << dsn <<
" by its file ID:" << fid <<
endmsg;
319 auto fi = m_connectionMap.
find( fid );
320 if ( fi == m_connectionMap.end() ) {
321 connection->setFID( fid );
322 connection->setPFN( dsn );
323 auto e =
new Entry( technology, keep_open, rw, connection );
325 if ( !reconnect( e ).isSuccess() ) {
327 if ( m_quarantine ) s_badFiles.
insert( dsn );
328 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
329 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
330 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
332 fid = connection->fid();
333 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
335 if ( !m_disablePFNWarning && strcasecmp( dsn.
c_str(), fid.
c_str() ) == 0 ) {
336 log <<
MSG::ERROR <<
"Referring to existing dataset " << dsn <<
" by its physical name." <<
endmsg;
337 log <<
"You may not be able to navigate back to the input file" 338 <<
" -- processing continues" <<
endmsg;
341 m_connectionMap.emplace( fid, e );
345 if ( !reconnect( ( *fi ).second ).isSuccess() ) {
346 if ( m_quarantine ) s_badFiles.
insert( dsn );
347 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
348 error(
"connectDataIO> Cannot connect to database: PFN=" + dsn +
" FID=" + fid,
false ).ignore();
349 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
353 sc = connectDataIO( FID, rw, fid, technology, keep_open, connection );
356 }
else if ( typ == LFN ) {
357 m_fidMap[dataset] = fid;
361 error(
std::string(
"connectDataIO> Caught exception:" ) + e.
what(), false ).ignore();
362 }
catch ( ... ) { error(
std::string(
"connectDataIO> Caught unknown exception" ),
false ).ignore(); }
363 m_incSvc->fireIncident(
Incident( dsn, IncidentType::FailInputFile ) );
364 error(
"connectDataIO> The dataset " + dsn +
" cannot be opened.",
false ).ignore();
366 return StatusCode( IDataConnection::BAD_DATA_CONNECTION );
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application's current return code.
Definition of the MsgStream class used to transmit messages.
StatusCode initialize() override
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
const std::string & fid() const
Access file id.
StatusCode finalize() override
constexpr static const auto SUCCESS
def read(f, regex='.*', skipevents=0)
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
const std::string & pfn() const
Access physical file name.
#define DECLARE_COMPONENT(type)
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
This class is used for returning status codes from appropriate routines.
Definition of the basic interface.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Abstract base class (ABC) describing a basic data connection.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
T back_inserter(T...args)
IDataConnection * connection
void resetAge()
Reset age.
const StatusCode & ignore() const
Ignore/check StatusCode.
Base class for all Incidents (computing events).
constexpr static const auto FAILURE
The IProperty is the basic interface for all components which have properties that can be set or retrieved.
virtual StatusCode connectRead()=0
Open data stream in read mode.
Helper functions to set/get the application return code.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.