Gaudi Framework, version v20r4

Generated: 8 Jan 2009

GFALDataStreamTool Class Reference

#include <GaudiGridSvc/GFALDataStreamTool.h>

Inheritance diagram for GFALDataStreamTool:

Inheritance graph
[legend]
Collaboration diagram for GFALDataStreamTool:

Collaboration graph
[legend]

List of all members.


Detailed Description

Author:
Andres Osorio

Greig Cowan

Date:
2006-09-29
Implementation of DataStreamTool using GFAL

Definition at line 34 of file GFALDataStreamTool.h.


Public Types

enum  Status { UNKNOWN = 0, STGING, DONE }
 Return status. More...

Public Member Functions

 GFALDataStreamTool (const std::string &type, const std::string &name, const IInterface *parent)
 Standard constructor.
virtual ~GFALDataStreamTool ()
 Destructor.
virtual StatusCode initialize ()
 Initialization (from CONFIGURED to INITIALIZED).
virtual StatusCode finalize ()
 Finalize (from INITIALIZED to CONFIGURED).
virtual StatusCode addStreams (const StreamSpecs &)
virtual StatusCode getNextStream (const EventSelectorDataStream *&, size_type &)

Protected Member Functions

StatusCode updateStreams (const std::string &, const std::string &)

Private Member Functions

StatusCode setBlocks ()
StatusCode firstBlockStage ()
StatusCode nextBlockStage ()
StatusCode updateBlockStatus ()
StatusCode updateBlockStatus (GFALStageBlock *)
StatusCode updateStreamTurls ()
void addBlock (GFALStageBlock *)

Private Attributes

size_type m_turlCount
StreamSpecs m_streamTurls
std::vector< GFALStageBlock * > m_blocks
std::vector< GFALStageBlock * >
::iterator 
m_blockitr
long int m_pos
size_t m_blockindex
 index of the last staged block
size_t m_ntotblocks
 total number of blocks
size_t m_blocksize
 number of files in one stage block
size_t m_initstage
 number of blocks for the initial prestage
size_t m_readfile
 number of files (streams) read.
int m_atblock
Status m_task

Member Enumeration Documentation

Return status.

Enumerator:
UNKNOWN 
STGING 
DONE 

Reimplemented from IInterface.

Definition at line 37 of file GFALDataStreamTool.h.

00037 {UNKNOWN=0,STGING,DONE};


Constructor & Destructor Documentation

GFALDataStreamTool::GFALDataStreamTool ( const std::string &  type,
const std::string &  name,
const IInterface parent 
)

Standard constructor.

Definition at line 35 of file GFALDataStreamTool.cpp.

00038   : DataStreamTool ( type, name , parent )
00039 {
00040 
00041   declareProperty("BlockSize",m_blocksize = 1 );
00042   declareProperty("InitialStage",m_initstage = 1);
00043   
00044 }

GFALDataStreamTool::~GFALDataStreamTool (  )  [virtual]

Destructor.

Definition at line 49 of file GFALDataStreamTool.cpp.

00050 {
00051   
00052   std::vector<GFALStageBlock*>::iterator itr = m_blocks.begin() ;
00053   while( (itr != m_blocks.end()) ) {
00054     if ( (*itr) ) delete (*itr);
00055     ++itr;
00056   }
00057   
00058 } 


Member Function Documentation

StatusCode GFALDataStreamTool::initialize (  )  [virtual]

Initialization (from CONFIGURED to INITIALIZED).

Reimplemented from DataStreamTool.

Definition at line 60 of file GFALDataStreamTool.cpp.

00060                                           {
00061 
00062   MsgStream logger(msgSvc(), name());
00063 
00064   StatusCode status = DataStreamTool::initialize();
00065   if( !status.isSuccess() )  {
00066     logger << MSG::FATAL << "Error. Cannot initialize base class." << endreq;
00067     return status;
00068   }
00069   
00070   m_streamCount = 1;
00071   m_turlCount   = 0;
00072   
00073   m_atblock = 0;
00074   m_pos = 0;
00075   m_task = UNKNOWN;
00076   
00077   if (m_blocksize < 1) {
00078     logger << MSG::ERROR << "Wrong Block size" << endreq;
00079     logger << MSG::ERROR << "Prestaging disable" << endreq;
00080     return StatusCode::FAILURE;
00081   }
00082   
00083   return StatusCode::SUCCESS;
00084 }

StatusCode GFALDataStreamTool::finalize (  )  [virtual]

Finalize (from INITIALIZED to CONFIGURED).

Reimplemented from DataStreamTool.

Definition at line 132 of file GFALDataStreamTool.cpp.

00132                                         {
00133   
00134   StatusCode iret, status = StatusCode::SUCCESS;
00135   iret.ignore();
00136   
00137   MsgStream log(msgSvc(), name());
00138   
00139   log << MSG::DEBUG << "finalise() starts now!" << endmsg;
00140   
00141   for ( StreamSpecs::const_iterator i = m_streamTurls.begin(); i != m_streamTurls.end(); i++ )    {
00142     EventSelectorDataStream * s  = DataStreamTool::getStream(*i);
00143     if ( NULL == s )   {
00144       if ( s->isInitialized() )    {
00145         iret = finalizeStream(s);
00146         if ( !iret.isSuccess() ) {
00147           log << MSG::ERROR << "Failed to finalize stream " << *i << endmsg;
00148           status = iret;
00149         }
00150       }
00151     }
00152     
00153     iret =  eraseStream( *i );
00154     
00155     if ( !iret.isSuccess() )    {
00156       log << MSG::ERROR << "Failed to disconnect stream " << *i << endmsg;
00157       status = iret;
00158     }
00159     
00160   }
00161 
00162   m_streamSpecs.clear(); //The input specification are not needed anymore
00163   
00164   log << MSG::DEBUG << "finalise() total files processed " << m_streamID << endmsg;
00165 
00166   status = DataStreamTool::finalize();
00167   
00168   return status;
00169   
00170 }

StatusCode GFALDataStreamTool::addStreams ( const StreamSpecs inputs  )  [virtual]

initialize Stage

Reimplemented from DataStreamTool.

Definition at line 87 of file GFALDataStreamTool.cpp.

00087                                                                     {
00088   
00089   MsgStream logger(msgSvc(), name());
00090   
00091   StatusCode status = DataStreamTool::addStreams(inputs);
00092   
00095   
00096   m_ntotblocks = 0;
00097   status = setBlocks();
00098     
00099   if ( ! status.isSuccess() ){
00100     logger << MSG::ERROR << "Unable to set Blocks" << endreq;
00101     return StatusCode::FAILURE;
00102   }
00103   
00104   status = firstBlockStage();
00105     
00106   if ( ! status.isSuccess() ) {
00107     logger << MSG::ERROR << "Unable stage first Block" << endreq;
00108     return StatusCode::FAILURE;
00109   }
00110   
00111   //Stage first block of files until all files in the block are STAGED 
00112   m_task = STGING;
00113   m_blockitr=m_blocks.begin();
00114   
00115   while ( ((*m_blockitr)->getStatus() != GFALStageBlock::STGED) && ((*m_blockitr)->getStatus() != GFALStageBlock::FAILED) ) {
00116     status = updateBlockStatus();
00117   }
00118   
00120   
00121   m_streamTurls = m_streamSpecs;
00122 
00123   if( status.isFailure() ){
00124     return status;
00125   }
00126 
00127   m_streamCount = 1;
00128   return updateStreamTurls();
00129 
00130 }

StatusCode GFALDataStreamTool::getNextStream ( const EventSelectorDataStream *&  esds,
size_type dsid 
) [virtual]

Reimplemented from DataStreamTool.

Definition at line 172 of file GFALDataStreamTool.cpp.

00173 {
00174   
00175   EventSelectorDataStream * nextStream = getStream(dsid);
00176   if ( NULL == nextStream ) return StatusCode::FAILURE; //<-end of streams reached
00177 
00178   esds = nextStream;
00179 
00180   //Stage blocks request
00181   if ( m_task != DONE ) {
00182     
00183     if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED ) {
00184       nextBlockStage().ignore();
00185       updateStreamTurls().ignore();
00186     }
00187     updateBlockStatus().ignore();
00188   }
00189   //
00190 
00191   ++m_streamID;
00192   
00193   return StatusCode::SUCCESS;
00194   
00195 }

StatusCode GFALDataStreamTool::updateStreams ( const std::string &  oldinfo,
const std::string &  uptinfo 
) [protected]

Definition at line 424 of file GFALDataStreamTool.cpp.

00425 {
00426   
00427   Streams::iterator i = DataStreamTool::getStreamIterator(oldinfo);
00428   
00429   if ( i != m_streams.end() )   {
00430     (*i)->release();
00431     Streams::iterator p = m_streams.erase(i);
00432     char txt[32];
00433     std::string nam = name() + "_" + ::_itoa( ++m_streamCount, txt, 10);
00434     EventSelectorDataStream* s = 0;
00435     StatusCode status = DataStreamTool::createStream(nam, uptinfo, s);
00436     if ( status.isSuccess() )   {
00437       if ( 0 != s )   {
00438         s->addRef();
00439         m_streams.insert(p, s);
00440         return StatusCode::SUCCESS;
00441       }
00442     }
00443   }
00444   
00445   return StatusCode::FAILURE;
00446   
00447 }

StatusCode GFALDataStreamTool::setBlocks (  )  [private]

Definition at line 200 of file GFALDataStreamTool.cpp.

00201 {
00202   
00203   MsgStream log (msgSvc(),name());
00204   
00205   GFALStageBlock * btmp = new GFALStageBlock(m_blocksize,msgSvc());
00206   
00207   log << MSG::DEBUG << "Setting up the blocks of files to stage. Total files: " 
00208       << m_streamSpecs.size()
00209       << endreq;
00210   
00211   log << MSG::DEBUG << "Each block is set to have "
00212       << m_blocksize << " files."
00213       << endreq;
00214   
00215   StatusCode fstatus = StatusCode::SUCCESS;
00216   
00217   int addedfiles(0);
00218   
00219   for(StreamSpecs::const_iterator i = m_streamSpecs.begin() ;
00220       i != m_streamSpecs.end(); ++i) {
00221     
00222     GFALStageFile * ftmp = new GFALStageFile(*i,msgSvc());
00223 
00224     ftmp->initialize().ignore();
00225     
00226     if (ftmp->isValid()) {
00227       
00228       if ( addedfiles != (int) m_blocksize ) {
00229         fstatus = btmp->addFile(ftmp);
00230         ++addedfiles;
00231       }
00232       
00233       if ( addedfiles == (int) m_blocksize  && (&(*i)!= &m_streamSpecs.back()) ) {
00234         addBlock(btmp);
00235         log << MSG::DEBUG << "adding block" << endreq;
00236         btmp = new GFALStageBlock(m_blocksize,msgSvc());
00237         addedfiles = 0;
00238       }
00239       
00240       if (&(*i)==&m_streamSpecs.back()) {
00241         log << MSG::DEBUG << "adding last block" << endreq;        
00242         addBlock(btmp);
00243       }
00244       
00245     }
00246     else {
00247       
00248       log << MSG::DEBUG <<" The file: " 
00249           << ftmp->rawFileName()
00250           << "] does not exist."
00251           << endreq;
00252       fstatus = StatusCode::FAILURE;
00253       
00254     }
00255   }
00256   
00257   log << MSG::DEBUG << "setBlocks done." << endreq;
00258   
00259   return fstatus;
00260   
00261 }

StatusCode GFALDataStreamTool::firstBlockStage (  )  [private]

Definition at line 264 of file GFALDataStreamTool.cpp.

00264                                                {
00265   
00266   MsgStream log (msgSvc(),name());
00267   
00268   if (m_initstage==0) {
00269     log << MSG::DEBUG <<"No initial stage" << endreq;
00270     m_blockindex = 0 ;
00271   } else {
00272     log << MSG::DEBUG <<"Performing initial stage. Total number of blocks: " 
00273         << m_blocks.size() << endreq;
00274     m_blockitr = m_blocks.begin();
00275 
00276     log << MSG::DEBUG << "staging first block of files" << endreq;
00277     
00278     if ( m_blocks.size() > 0 ) {
00279       StatusCode status = (*m_blockitr)->Stage();
00280       if ( status.isFailure() ){
00281         log << MSG::ERROR << "firstBlockStage> unable to stage first block" << endreq;
00282         return status;
00283       }
00284     }
00285     else {
00286       log << MSG::DEBUG << "Block is empty: check input file format or grid certificate" << endreq;
00287       return StatusCode::FAILURE;
00288     }
00289     
00290   }
00291   
00292   log << MSG::DEBUG << "firstBlockStage> done." << endreq;
00293   
00294   return StatusCode::SUCCESS;
00295   
00296 }

StatusCode GFALDataStreamTool::nextBlockStage (  )  [private]

Definition at line 298 of file GFALDataStreamTool.cpp.

00298                                               {
00299   
00300   //Initial stage for each block.  
00301   
00302   MsgStream log (msgSvc(),name());
00303   
00304   if ( (*m_blockitr)->getStatus() != GFALStageBlock::STGED )
00305     return StatusCode::SUCCESS;
00306   
00307   ++m_blockitr;
00308   
00309   if (m_blockitr == m_blocks.end() ) {
00310 
00311     log << MSG::DEBUG << "nextBlockStage> Last block reached" << endreq;
00312     m_task = DONE;
00313     return StatusCode::SUCCESS;
00314     
00315   }
00316   
00317   StatusCode status = (*m_blockitr)->Stage();
00318   if ( status.isFailure() ){
00319     log << MSG::ERROR << "nextBlockStage> unable to stage block" << endreq;
00320     return status;
00321   }
00322   
00323   ++m_blockindex;
00324   
00325   log << MSG::DEBUG << "nextBlockStage> done." << endreq;
00326   
00327   return StatusCode::SUCCESS;
00328   
00329 }

StatusCode GFALDataStreamTool::updateBlockStatus (  )  [private]

Definition at line 332 of file GFALDataStreamTool.cpp.

00333 {
00334   
00335   MsgStream log (msgSvc(),name());
00336 
00337   if ( m_task == DONE ) return StatusCode::SUCCESS;
00338   
00339   log << MSG::DEBUG << "updateStatus> block" << endreq;
00340   
00341   (*m_blockitr)->updateStatus();
00342   
00343   return StatusCode::SUCCESS;
00344   
00345 }

StatusCode GFALDataStreamTool::updateBlockStatus ( GFALStageBlock ablock  )  [private]

Definition at line 347 of file GFALDataStreamTool.cpp.

00348 {
00349   
00350   MsgStream log (msgSvc(),name());
00351 
00352   if ( m_task == DONE ) return StatusCode::SUCCESS;
00353   
00354   log << MSG::DEBUG << "updateStatus> block" << endreq;
00355 
00356   ablock->updateStatus();
00357   
00358   return StatusCode::SUCCESS;
00359   
00360 }

StatusCode GFALDataStreamTool::updateStreamTurls (  )  [private]

Definition at line 372 of file GFALDataStreamTool.cpp.

00373 {
00374 
00375   MsgStream log (msgSvc(),name());
00376     
00377   StatusCode status;
00378   
00379   log << MSG::DEBUG << "updateStreamTurls>" << endreq;
00380 
00381   if ( m_task == DONE ) return StatusCode::SUCCESS;
00382     
00383   if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED) {
00384     
00385     std::vector<GFALStageFile*>::iterator stgfile = (*m_blockitr)->m_files.begin() ;
00386     
00387     while( (stgfile != (*m_blockitr)->m_files.end() ) ) {
00388       std::string turl = (*stgfile)->getTurl();
00389       if ( (*stgfile)->getStatus() == GFALStageFile::STGED  ) {
00390         int pos = m_streamTurls[m_turlCount].find("'",10);
00391         std::string tail = m_streamTurls[m_turlCount].erase(0,pos+1);
00392         std::string head;
00393         
00394         if ( ! (*stgfile)->isPFN() ) head = std::string("DATAFILE='gfal:") + turl + std::string("'");
00395         else {
00396           std::string pfnturl = turl.erase(0,8);
00397           head = std::string("DATAFILE='rfio:") + pfnturl + std::string("'");
00398         }
00399         
00400         m_streamTurls[m_turlCount] = head + tail;
00401         
00402         log << MSG::DEBUG << "Adding to m_streamTurl_" << m_turlCount << "\n"
00403             << m_streamTurls[m_turlCount] << endmsg;
00404         
00406         //Update the data stream list with the turl
00407         StatusCode fts = updateStreams( (*stgfile)->rawFileName(), m_streamTurls[m_turlCount] );
00408         
00409         if ( ! fts.isSuccess() ) log << MSG::DEBUG << "Failed to update Stream!" << endmsg;
00410         ++m_turlCount;
00411         
00412       }
00413       
00414       ++stgfile;
00415       
00416     }
00417     
00418   }
00419   
00420   return status;
00421   
00422 }

void GFALDataStreamTool::addBlock ( GFALStageBlock b  )  [private]

Definition at line 362 of file GFALDataStreamTool.cpp.

00363 {
00364   
00365   m_blocks.push_back(b);
00366   m_ntotblocks++;
00367   
00368   return ;
00369   
00370 }


Member Data Documentation

Definition at line 60 of file GFALDataStreamTool.h.

Definition at line 62 of file GFALDataStreamTool.h.

Definition at line 78 of file GFALDataStreamTool.h.

std::vector<GFALStageBlock*>::iterator GFALDataStreamTool::m_blockitr [private]

Definition at line 80 of file GFALDataStreamTool.h.

long int GFALDataStreamTool::m_pos [private]

Definition at line 82 of file GFALDataStreamTool.h.

index of the last staged block

Definition at line 85 of file GFALDataStreamTool.h.

total number of blocks

Definition at line 87 of file GFALDataStreamTool.h.

number of files in one stage block

Definition at line 89 of file GFALDataStreamTool.h.

number of blocks for the initial prestage

Definition at line 91 of file GFALDataStreamTool.h.

number of files (streams) read.

Definition at line 93 of file GFALDataStreamTool.h.

Definition at line 95 of file GFALDataStreamTool.h.

Definition at line 97 of file GFALDataStreamTool.h.


The documentation for this class was generated from the following files:

Generated at Thu Jan 8 17:51:17 2009 for Gaudi Framework, version v20r4 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004