16 constexpr
struct select2nd_t {
17 template<
typename S,
typename T>
18 const T& operator()(
const std::pair<S,T>& p)
const {
return p.second; }
21 template <
typename InputIterator,
typename OutputIterator,
22 typename UnaryOperation,
typename UnaryPredicate>
23 OutputIterator transform_copy_if( InputIterator first, InputIterator last,
24 OutputIterator result,
26 UnaryPredicate pred) {
27 while (first != last) {
28 auto val = op(*first);
29 if (pred(val)) *result++ =
std::move(val);
36 using namespace Gaudi;
54 m_catalog = serviceLocator()->service(m_catalogSvcName);
57 <<
"Unable to localize interface IFileCatalog from service:" 58 << m_catalogSvcName <<
endmsg;
61 m_incSvc = serviceLocator()->service(
"IncidentSvc");
84 IODataManager::Connections IODataManager::connections(
const IInterface* owner)
const {
88 [](ConnectionMap::const_reference i ) {
return i.second->connection; },
94 StatusCode IODataManager::connectRead(
bool keep_open, Connection* con) {
95 if ( !establishConnection(con) ) {
99 return error(
"Failed to connect to data:"+dsn,
false);
103 StatusCode IODataManager::connectWrite(Connection* con,IoType mode,
CSTR doctype) {
104 if ( !establishConnection(con) ) {
105 return connectDataIO(
UNKNOWN,mode,con->name(),doctype,
true,con);
108 return error(
"Failed to connect to data:"+dsn,
false);
112 StatusCode IODataManager::read(Connection* con,
void*
const data,
size_t len) {
113 return establishConnection(con).
isSuccess() ? con->read(data,len) :
S_ERROR;
118 return establishConnection(con).isSuccess() ? con->write(data,len) :
S_ERROR;
122 long long int IODataManager::seek(Connection* con,
long long int where,
int origin) {
123 return establishConnection(con).isSuccess() ? con->seek(where,origin) : -1;
131 if ( ::strncasecmp(dsn.
c_str(),
"FID:",4)==0 )
133 else if ( ::strncasecmp(dsn.
c_str(),
"LFN:",4)==0 )
135 else if ( ::strncasecmp(dsn.
c_str(),
"PFN:",4)==0 )
138 auto j = m_fidMap.find(dataset);
139 if ( j != m_fidMap.end() ) {
142 auto i=m_connectionMap.
find(fid);
144 if ( (j=m_fidMap.find(fid)) != m_fidMap.end() )
146 if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() )
148 if ( i != m_connectionMap.end() && i->second ) {
150 if ( (j=m_fidMap.find(c->
pfn())) != m_fidMap.end() )
155 log <<
MSG::INFO <<
"Disconnect from dataset " << dsn
156 <<
" [" << fid <<
"]" <<
endmsg;
159 m_connectionMap.erase(i);
174 case Connection::UPDATE:
176 case Connection::RECREATE:
193 if ( !to_retire.
empty() ) {
199 log <<
MSG::INFO <<
"Disconnect from dataset " << c->
pfn()
210 auto j = m_fidMap.find(dataset);
211 if ( j == m_fidMap.end() )
return nullptr;
212 auto i=m_connectionMap.find(j->second);
213 return (i != m_connectionMap.end()) ? i->second->connection :
nullptr;
216 StatusCode IODataManager::establishConnection(Connection* con) {
217 if ( !con )
return error(
"Severe logic bug: No connection object avalible.",
true);
219 if ( con->isConnected() ) {
223 auto i=m_connectionMap.find(con->name());
224 if ( i != m_connectionMap.end() ) {
225 Connection*
c = i->second->connection;
227 m_incSvc->fireIncident(
Incident(con->name(),IncidentType::FailInputFile));
228 return error(
"Severe logic bug: Twice identical connection object for DSN:"+con->name(),
true);
230 if ( reconnect(i->second).isSuccess() )
return S_OK;
236 IODataManager::connectDataIO(
int typ, IoType rw,
CSTR dataset,
CSTR technology,
bool keep_open,Connection* connection) {
241 if ( ::strncasecmp(dsn.
c_str(),
"FID:",4)==0 )
242 dsn = dataset.
substr(4), typ = FID;
243 else if ( ::strncasecmp(dsn.
c_str(),
"LFN:",4)==0 )
244 dsn = dataset.
substr(4), typ = LFN;
245 else if ( ::strncasecmp(dsn.
c_str(),
"PFN:",4)==0 )
246 dsn = dataset.
substr(4), typ = PFN;
248 return connectDataIO(PFN, rw, dsn, technology, keep_open, connection);
251 m_incSvc->fireIncident(
Incident(dsn,IncidentType::FailInputFile));
252 return IDataConnection::BAD_DATA_CONNECTION;
255 auto fi = m_connectionMap.find(dsn);
256 if ( fi == m_connectionMap.end() ) {
258 m_catalog->getPFN(dsn,files);
259 if ( files.
empty() ) {
261 if ( m_quarantine ) s_badFiles.
insert(dsn);
262 m_incSvc->fireIncident(
Incident(dsn,IncidentType::FailInputFile));
263 error(
"connectDataIO> failed to resolve FID:"+dsn,
false).ignore();
264 return IDataConnection::BAD_DATA_CONNECTION;
266 else if ( dsn.
length() == 36 && dsn[8]==
'-' && dsn[13]==
'-' ) {
268 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
269 sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection);
271 if ( m_quarantine ) s_badFiles.
insert(dsn);
273 if ( m_quarantine ) s_badFiles.
insert(dsn);
274 m_incSvc->fireIncident(
Incident(dsn,IncidentType::FailInputFile));
275 error(
"connectDataIO> Failed to resolve FID:"+dsn,
false).ignore();
276 return IDataConnection::BAD_DATA_CONNECTION;
280 auto appmgr = serviceLocator()->as<
IProperty>();
282 for(
auto i=files.
cbegin(); i!=files.
cend(); ++i) {
284 if ( i != files.
cbegin() ) {
286 <<
" with next entry in data federation:" << pfn <<
"." <<
endmsg;
288 sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection);
290 if ( m_quarantine ) s_badFiles.
insert(pfn);
291 m_incSvc->fireIncident(
Incident(pfn,IncidentType::FailInputFile));
294 m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
300 log <<
MSG::ERROR <<
"Failed to open dsn:" << dsn
301 <<
" Federated file could not be resolved from " 303 return IDataConnection::BAD_DATA_CONNECTION;
308 auto j = m_fidMap.
find(dsn);
309 if ( j == m_fidMap.end() ) {
313 fid = m_catalog->lookupLFN(dsn);
315 m_incSvc->fireIncident(
Incident(dsn,IncidentType::FailInputFile));
316 log <<
MSG::ERROR <<
"Failed to resolve LFN:" << dsn
317 <<
" Cannot access this dataset." <<
endmsg;
318 return IDataConnection::BAD_DATA_CONNECTION;
322 fid = m_catalog->lookupPFN(dsn);
323 if ( !fid.
empty() ) m_catalog->getPFN(fid, files);
324 if ( files.
empty() ) {
326 if ( fid.
empty() ) fid = m_catalog->createFID();
327 m_catalog->registerPFN(fid,dsn,technology);
328 log <<
MSG::INFO <<
"Referring to dataset " << dsn
329 <<
" by its file ID:" << fid <<
endmsg;
343 auto fi = m_connectionMap.
find(fid);
344 if ( fi == m_connectionMap.end() ) {
345 connection->setFID(fid);
346 connection->setPFN(dsn);
347 auto e =
new Entry(technology, keep_open, rw, connection);
349 if ( !reconnect(e).isSuccess() ) {
351 if ( m_quarantine ) s_badFiles.
insert(dsn);
352 m_incSvc->fireIncident(
Incident(dsn,IncidentType::FailInputFile));
353 error(
"connectDataIO> Cannot connect to database: PFN="+dsn+
" FID="+fid,
false).ignore();
354 return IDataConnection::BAD_DATA_CONNECTION;
356 fid = connection->fid();
357 m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
359 if ( ! m_disablePFNWarning && strcasecmp(dsn.
c_str(),fid.
c_str()) == 0 ) {
360 log <<
MSG::ERROR <<
"Referring to existing dataset " << dsn
361 <<
" by its physical name." <<
endmsg;
362 log <<
"You may not be able to navigate back to the input file" 363 <<
" -- processing continues" <<
endmsg;
366 m_connectionMap.emplace( fid, e );
370 if ( !reconnect((*fi).second).isSuccess() ) {
371 if ( m_quarantine ) s_badFiles.
insert(dsn);
372 m_incSvc->fireIncident(
Incident(dsn,IncidentType::FailInputFile));
373 error(
"connectDataIO> Cannot connect to database: PFN="+dsn+
" FID="+fid,
false).ignore();
374 return IDataConnection::BAD_DATA_CONNECTION;
378 sc = connectDataIO(FID, rw, fid, technology, keep_open, connection);
382 else if ( typ == LFN ) {
383 m_fidMap[dataset] = fid;
388 error(
std::string(
"connectDataIO> Caught exception:")+e.
what(),
false).ignore();
391 error(
std::string(
"connectDataIO> Caught unknown exception"),
false).ignore();
393 m_incSvc->fireIncident(
Incident(dsn,IncidentType::FailInputFile));
394 error(
"connectDataIO> The dataset "+dsn+
" cannot be opened.",
false).ignore();
396 return IDataConnection::BAD_DATA_CONNECTION;
int getAppReturnCode(const SmartIF< IProperty > &appmgr)
Get the application (current) return code.
Definition of the MsgStream class used to transmit messages.
StatusCode initialize() override
virtual StatusCode connectWrite(IoType type)=0
Open data stream in write mode.
virtual StatusCode disconnect()=0
Release data stream.
const std::string & fid() const
Access file id.
StatusCode finalize() override
bool isSuccess() const
Test for a status code of SUCCESS.
GAUDI_API long breakExecution()
Break the execution of the application and invoke the debugger.
const std::string & pfn() const
Access physical file name.
#define DECLARE_COMPONENT(type)
virtual bool isConnected() const =0
Check if connected to data source.
int ageFile()
Increase age of I/O source.
This class is used for returning status codes from appropriate routines.
Definition of the basic interface.
StatusCode setAppReturnCode(SmartIF< IProperty > &appmgr, int value, bool force=false)
Set the application return code.
Abstract base class (ABC) describing a basic data connection.
T back_inserter(T...args)
IDataConnection * connection
void resetAge()
Reset age.
Base class for all Incidents (computing events).
IProperty is the basic interface for all components that have properties which can be set or retrieved...
virtual StatusCode connectRead()=0
Open data stream in read mode.
Helper functions to set/get the application return code.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.