Gaudi Framework, version v20r2

Generated: 18 Jul 2008

GFALDataStreamTool Class Reference

#include <GaudiGridSvc/GFALDataStreamTool.h>

Inheritance diagram for GFALDataStreamTool:

Inheritance graph
[legend]
Collaboration diagram for GFALDataStreamTool:

Collaboration graph
[legend]
List of all members.

Detailed Description

Author:
Andres Osorio

Author:
Greig Cowan
Date:
2006-09-29
Implementation of DataStreamTool using GFAL

Definition at line 34 of file GFALDataStreamTool.h.

Public Types

enum  Status { UNKNOWN = 0, STGING, DONE }
 Return status. More...

Public Member Functions

 GFALDataStreamTool (const std::string &type, const std::string &name, const IInterface *parent)
 Standard constructor.
virtual ~GFALDataStreamTool ()
 Destructor.
virtual StatusCode initialize ()
 Initialization (from CONFIGURED to INITIALIZED).
virtual StatusCode finalize ()
 Finalize (from INITIALIZED to CONFIGURED).
virtual StatusCode addStreams (const StreamSpecs &)
virtual StatusCode getNextStream (const EventSelectorDataStream *&, size_type &)

Protected Member Functions

StatusCode updateStreams (const std::string &, const std::string &)

Private Member Functions

StatusCode setBlocks ()
StatusCode firstBlockStage ()
StatusCode nextBlockStage ()
StatusCode updateBlockStatus ()
StatusCode updateBlockStatus (GFALStageBlock *)
StatusCode updateStreamTurls ()
void addBlock (GFALStageBlock *)

Private Attributes

size_type m_turlCount
StreamSpecs m_streamTurls
std::vector< GFALStageBlock * > m_blocks
std::vector< GFALStageBlock * >::iterator m_blockitr
long int m_pos
size_t m_blockindex
 index of the last staged block
size_t m_ntotblocks
 total number of blocks
size_t m_blocksize
 number of files in one stage block
size_t m_initstage
 number of blocks for the initial prestage
size_t m_readfile
 number of files (streams) read.
int m_atblock
Status m_task


Member Enumeration Documentation

enum GFALDataStreamTool::Status

Return status.

Enumerator:
UNKNOWN 
STGING 
DONE 

Reimplemented from IInterface.

Definition at line 37 of file GFALDataStreamTool.h.

00037 {UNKNOWN=0,STGING,DONE};


Constructor & Destructor Documentation

GFALDataStreamTool::GFALDataStreamTool ( const std::string & type,
const std::string & name,
const IInterface * parent
)

Standard constructor.

Definition at line 35 of file GFALDataStreamTool.cpp.

00038   : DataStreamTool ( type, name , parent )
00039 {
00040 
00041   declareProperty("BlockSize",m_blocksize = 1 );
00042   declareProperty("InitialStage",m_initstage = 1);
00043   
00044 }

GFALDataStreamTool::~GFALDataStreamTool (  )  [virtual]

Destructor.

Definition at line 49 of file GFALDataStreamTool.cpp.

References std::vector< _Tp, _Alloc >::begin(), std::vector< _Tp, _Alloc >::end(), and m_blocks.

00050 {
00051   
00052   std::vector<GFALStageBlock*>::iterator itr = m_blocks.begin() ;
00053   while( (itr != m_blocks.end()) ) {
00054     if ( (*itr) ) delete (*itr);
00055     ++itr;
00056   }
00057   
00058 } 


Member Function Documentation

StatusCode GFALDataStreamTool::initialize (  )  [virtual]

Initialization (from CONFIGURED to INITIALIZED).

Reimplemented from DataStreamTool.

Definition at line 60 of file GFALDataStreamTool.cpp.

References endreq(), MSG::ERROR, StatusCode::FAILURE, MSG::FATAL, DataStreamTool::initialize(), StatusCode::isSuccess(), m_atblock, m_blocksize, m_pos, DataStreamTool::m_streamCount, m_task, m_turlCount, AlgTool::msgSvc(), AlgTool::name(), StatusCode::SUCCESS, and UNKNOWN.

00060                                           {
00061 
00062   MsgStream logger(msgSvc(), name());
00063 
00064   StatusCode status = DataStreamTool::initialize();
00065   if( !status.isSuccess() )  {
00066     logger << MSG::FATAL << "Error. Cannot initialize base class." << endreq;
00067     return status;
00068   }
00069   
00070   m_streamCount = 1;
00071   m_turlCount   = 0;
00072   
00073   m_atblock = 0;
00074   m_pos = 0;
00075   m_task = UNKNOWN;
00076   
00077   if (m_blocksize < 1) {
00078     logger << MSG::ERROR << "Wrong Block size" << endreq;
00079     logger << MSG::ERROR << "Prestaging disable" << endreq;
00080     return StatusCode::FAILURE;
00081   }
00082   
00083   return StatusCode::SUCCESS;
00084 }

StatusCode GFALDataStreamTool::finalize (  )  [virtual]

Finalize (from INITIALIZED to CONFIGURED).

Reimplemented from DataStreamTool.

Definition at line 132 of file GFALDataStreamTool.cpp.

References MSG::DEBUG, endmsg(), DataStreamTool::eraseStream(), MSG::ERROR, DataStreamTool::finalize(), DataStreamTool::finalizeStream(), DataStreamTool::getStream(), StatusCode::ignore(), StatusCode::isSuccess(), DataStreamTool::m_streamID, DataStreamTool::m_streamSpecs, m_streamTurls, AlgTool::msgSvc(), AlgTool::name(), and StatusCode::SUCCESS.

00132                                         {
00133   
00134   StatusCode iret, status = StatusCode::SUCCESS;
00135   iret.ignore();
00136   
00137   MsgStream log(msgSvc(), name());
00138   
00139   log << MSG::DEBUG << "finalise() starts now!" << endmsg;
00140   
00141   for ( StreamSpecs::const_iterator i = m_streamTurls.begin(); i != m_streamTurls.end(); i++ )    {
00142     EventSelectorDataStream * s  = DataStreamTool::getStream(*i);
00143     if ( NULL == s )   {
00144       if ( s->isInitialized() )    {
00145         iret = finalizeStream(s);
00146         if ( !iret.isSuccess() ) {
00147           log << MSG::ERROR << "Failed to finalize stream " << *i << endmsg;
00148           status = iret;
00149         }
00150       }
00151     }
00152     
00153     iret =  eraseStream( *i );
00154     
00155     if ( !iret.isSuccess() )    {
00156       log << MSG::ERROR << "Failed to disconnect stream " << *i << endmsg;
00157       status = iret;
00158     }
00159     
00160   }
00161 
00162   m_streamSpecs.clear(); //The input specification are not needed anymore
00163   
00164   log << MSG::DEBUG << "finalise() total files processed " << m_streamID << endmsg;
00165 
00166   status = DataStreamTool::finalize();
00167   
00168   return status;
00169   
00170 }

StatusCode GFALDataStreamTool::addStreams ( const StreamSpecs & inputs )  [virtual]

Reimplemented from DataStreamTool.

Definition at line 87 of file GFALDataStreamTool.cpp.

References DataStreamTool::addStreams(), std::vector< _Tp, _Alloc >::begin(), endreq(), MSG::ERROR, GFALStageBlock::FAILED, StatusCode::FAILURE, firstBlockStage(), StatusCode::isFailure(), StatusCode::isSuccess(), m_blockitr, m_blocks, m_ntotblocks, DataStreamTool::m_streamCount, DataStreamTool::m_streamSpecs, m_streamTurls, m_task, AlgTool::msgSvc(), AlgTool::name(), setBlocks(), GFALStageBlock::STGED, STGING, updateBlockStatus(), and updateStreamTurls().

00087                                                                     {
00088   
00089   MsgStream logger(msgSvc(), name());
00090   
00091   StatusCode status = DataStreamTool::addStreams(inputs);
00092   
00095   
00096   m_ntotblocks = 0;
00097   status = setBlocks();
00098     
00099   if ( ! status.isSuccess() ){
00100     logger << MSG::ERROR << "Unable to set Blocks" << endreq;
00101     return StatusCode::FAILURE;
00102   }
00103   
00104   status = firstBlockStage();
00105     
00106   if ( ! status.isSuccess() ) {
00107     logger << MSG::ERROR << "Unable stage first Block" << endreq;
00108     return StatusCode::FAILURE;
00109   }
00110   
00111   //Stage first block of files until all files in the block are STAGED 
00112   m_task = STGING;
00113   m_blockitr=m_blocks.begin();
00114   
00115   while ( ((*m_blockitr)->getStatus() != GFALStageBlock::STGED) && ((*m_blockitr)->getStatus() != GFALStageBlock::FAILED) ) {
00116     status = updateBlockStatus();
00117   }
00118   
00120   
00121   m_streamTurls = m_streamSpecs;
00122 
00123   if( status.isFailure() ){
00124     return status;
00125   }
00126 
00127   m_streamCount = 1;
00128   return updateStreamTurls();
00129 
00130 }

StatusCode GFALDataStreamTool::getNextStream ( const EventSelectorDataStream *& esds,
size_type & dsid
) [virtual]

Reimplemented from DataStreamTool.

Definition at line 172 of file GFALDataStreamTool.cpp.

References DONE, StatusCode::FAILURE, DataStreamTool::getStream(), StatusCode::ignore(), DataStreamTool::m_streamID, m_task, nextBlockStage(), GFALStageBlock::STGED, StatusCode::SUCCESS, updateBlockStatus(), and updateStreamTurls().

00173 {
00174   
00175   EventSelectorDataStream * nextStream = getStream(dsid);
00176   if ( NULL == nextStream ) return StatusCode::FAILURE; //<-end of streams reached
00177 
00178   esds = nextStream;
00179 
00180   //Stage blocks request
00181   if ( m_task != DONE ) {
00182     
00183     if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED ) {
00184       nextBlockStage().ignore();
00185       updateStreamTurls().ignore();
00186     }
00187     updateBlockStatus().ignore();
00188   }
00189   //
00190 
00191   ++m_streamID;
00192   
00193   return StatusCode::SUCCESS;
00194   
00195 }

StatusCode GFALDataStreamTool::updateStreams ( const std::string & oldinfo,
const std::string & uptinfo
) [protected]

Definition at line 424 of file GFALDataStreamTool.cpp.

References _itoa(), DataStreamTool::createStream(), std::vector< _Tp, _Alloc >::end(), std::vector< _Tp, _Alloc >::erase(), StatusCode::FAILURE, DataStreamTool::getStreamIterator(), std::vector< _Tp, _Alloc >::insert(), StatusCode::isSuccess(), DataStreamTool::m_streamCount, DataStreamTool::m_streams, AlgTool::name(), and StatusCode::SUCCESS.

Referenced by updateStreamTurls().

00425 {
00426   
00427   Streams::iterator i = DataStreamTool::getStreamIterator(oldinfo);
00428   
00429   if ( i != m_streams.end() )   {
00430     (*i)->release();
00431     Streams::iterator p = m_streams.erase(i);
00432     char txt[32];
00433     std::string nam = name() + "_" + ::_itoa( ++m_streamCount, txt, 10);
00434     EventSelectorDataStream* s = 0;
00435     StatusCode status = DataStreamTool::createStream(nam, uptinfo, s);
00436     if ( status.isSuccess() )   {
00437       if ( 0 != s )   {
00438         s->addRef();
00439         m_streams.insert(p, s);
00440         return StatusCode::SUCCESS;
00441       }
00442     }
00443   }
00444   
00445   return StatusCode::FAILURE;
00446   
00447 }

StatusCode GFALDataStreamTool::setBlocks (  )  [private]

Definition at line 200 of file GFALDataStreamTool.cpp.

References addBlock(), GFALStageBlock::addFile(), MSG::DEBUG, endreq(), StatusCode::FAILURE, m_blocksize, DataStreamTool::m_streamSpecs, AlgTool::msgSvc(), AlgTool::name(), and StatusCode::SUCCESS.

Referenced by addStreams().

00201 {
00202   
00203   MsgStream log (msgSvc(),name());
00204   
00205   GFALStageBlock * btmp = new GFALStageBlock(m_blocksize,msgSvc());
00206   
00207   log << MSG::DEBUG << "Setting up the blocks of files to stage. Total files: " 
00208       << m_streamSpecs.size()
00209       << endreq;
00210   
00211   log << MSG::DEBUG << "Each block is set to have "
00212       << m_blocksize << " files."
00213       << endreq;
00214   
00215   StatusCode fstatus = StatusCode::SUCCESS;
00216   
00217   int addedfiles(0);
00218   
00219   for(StreamSpecs::const_iterator i = m_streamSpecs.begin() ;
00220       i != m_streamSpecs.end(); ++i) {
00221     
00222     GFALStageFile * ftmp = new GFALStageFile(*i,msgSvc());
00223 
00224     ftmp->initialize().ignore();
00225     
00226     if (ftmp->isValid()) {
00227       
00228       if ( addedfiles != (int) m_blocksize ) {
00229         fstatus = btmp->addFile(ftmp);
00230         ++addedfiles;
00231       }
00232       
00233       if ( addedfiles == (int) m_blocksize  && (&(*i)!= &m_streamSpecs.back()) ) {
00234         addBlock(btmp);
00235         log << MSG::DEBUG << "adding block" << endreq;
00236         btmp = new GFALStageBlock(m_blocksize,msgSvc());
00237         addedfiles = 0;
00238       }
00239       
00240       if (&(*i)==&m_streamSpecs.back()) {
00241         log << MSG::DEBUG << "adding last block" << endreq;        
00242         addBlock(btmp);
00243       }
00244       
00245     }
00246     else {
00247       
00248       log << MSG::DEBUG <<" The file: " 
00249           << ftmp->rawFileName()
00250           << "] does not exist."
00251           << endreq;
00252       fstatus = StatusCode::FAILURE;
00253       
00254     }
00255   }
00256   
00257   log << MSG::DEBUG << "setBlocks done." << endreq;
00258   
00259   return fstatus;
00260   
00261 }

StatusCode GFALDataStreamTool::firstBlockStage (  )  [private]

Definition at line 264 of file GFALDataStreamTool.cpp.

References std::vector< _Tp, _Alloc >::begin(), MSG::DEBUG, endreq(), MSG::ERROR, StatusCode::FAILURE, m_blockindex, m_blockitr, m_blocks, m_initstage, AlgTool::msgSvc(), AlgTool::name(), std::vector< _Tp, _Alloc >::size(), and StatusCode::SUCCESS.

Referenced by addStreams().

00264                                                {
00265   
00266   MsgStream log (msgSvc(),name());
00267   
00268   if (m_initstage==0) {
00269     log << MSG::DEBUG <<"No initial stage" << endreq;
00270     m_blockindex = 0 ;
00271   } else {
00272     log << MSG::DEBUG <<"Performing initial stage. Total number of blocks: " 
00273         << m_blocks.size() << endreq;
00274     m_blockitr = m_blocks.begin();
00275 
00276     log << MSG::DEBUG << "staging first block of files" << endreq;
00277     
00278     if ( m_blocks.size() > 0 ) {
00279       StatusCode status = (*m_blockitr)->Stage();
00280       if ( status.isFailure() ){
00281         log << MSG::ERROR << "firstBlockStage> unable to stage first block" << endreq;
00282         return status;
00283       }
00284     }
00285     else {
00286       log << MSG::DEBUG << "Block is empty: check input file format or grid certificate" << endreq;
00287       return StatusCode::FAILURE;
00288     }
00289     
00290   }
00291   
00292   log << MSG::DEBUG << "firstBlockStage> done." << endreq;
00293   
00294   return StatusCode::SUCCESS;
00295   
00296 }

StatusCode GFALDataStreamTool::nextBlockStage (  )  [private]

Definition at line 298 of file GFALDataStreamTool.cpp.

References MSG::DEBUG, DONE, std::vector< _Tp, _Alloc >::end(), endreq(), MSG::ERROR, StatusCode::isFailure(), m_blockindex, m_blockitr, m_blocks, m_task, AlgTool::msgSvc(), AlgTool::name(), GFALStageBlock::STGED, and StatusCode::SUCCESS.

Referenced by getNextStream().

00298                                               {
00299   
00300   //Initial stage for each block.  
00301   
00302   MsgStream log (msgSvc(),name());
00303   
00304   if ( (*m_blockitr)->getStatus() != GFALStageBlock::STGED )
00305     return StatusCode::SUCCESS;
00306   
00307   ++m_blockitr;
00308   
00309   if (m_blockitr == m_blocks.end() ) {
00310 
00311     log << MSG::DEBUG << "nextBlockStage> Last block reached" << endreq;
00312     m_task = DONE;
00313     return StatusCode::SUCCESS;
00314     
00315   }
00316   
00317   StatusCode status = (*m_blockitr)->Stage();
00318   if ( status.isFailure() ){
00319     log << MSG::ERROR << "nextBlockStage> unable to stage block" << endreq;
00320     return status;
00321   }
00322   
00323   ++m_blockindex;
00324   
00325   log << MSG::DEBUG << "nextBlockStage> done." << endreq;
00326   
00327   return StatusCode::SUCCESS;
00328   
00329 }

StatusCode GFALDataStreamTool::updateBlockStatus (  )  [private]

Definition at line 332 of file GFALDataStreamTool.cpp.

References MSG::DEBUG, DONE, endreq(), m_task, AlgTool::msgSvc(), AlgTool::name(), and StatusCode::SUCCESS.

Referenced by addStreams(), and getNextStream().

00333 {
00334   
00335   MsgStream log (msgSvc(),name());
00336 
00337   if ( m_task == DONE ) return StatusCode::SUCCESS;
00338   
00339   log << MSG::DEBUG << "updateStatus> block" << endreq;
00340   
00341   (*m_blockitr)->updateStatus();
00342   
00343   return StatusCode::SUCCESS;
00344   
00345 }

StatusCode GFALDataStreamTool::updateBlockStatus ( GFALStageBlock * ablock )  [private]

Definition at line 347 of file GFALDataStreamTool.cpp.

References MSG::DEBUG, DONE, endreq(), m_task, AlgTool::msgSvc(), AlgTool::name(), StatusCode::SUCCESS, and GFALStageBlock::updateStatus().

00348 {
00349   
00350   MsgStream log (msgSvc(),name());
00351 
00352   if ( m_task == DONE ) return StatusCode::SUCCESS;
00353   
00354   log << MSG::DEBUG << "updateStatus> block" << endreq;
00355 
00356   ablock->updateStatus();
00357   
00358   return StatusCode::SUCCESS;
00359   
00360 }

StatusCode GFALDataStreamTool::updateStreamTurls (  )  [private]

Definition at line 372 of file GFALDataStreamTool.cpp.

References MSG::DEBUG, DONE, endmsg(), endreq(), std::basic_string< _CharT, _Traits, _Alloc >::erase(), find(), StatusCode::isSuccess(), m_streamTurls, m_task, m_turlCount, AlgTool::msgSvc(), AlgTool::name(), GFALStageFile::STGED, GFALStageBlock::STGED, StatusCode::SUCCESS, and updateStreams().

Referenced by addStreams(), and getNextStream().

00373 {
00374 
00375   MsgStream log (msgSvc(),name());
00376     
00377   StatusCode status;
00378   
00379   log << MSG::DEBUG << "updateStreamTurls>" << endreq;
00380 
00381   if ( m_task == DONE ) return StatusCode::SUCCESS;
00382     
00383   if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED) {
00384     
00385     std::vector<GFALStageFile*>::iterator stgfile = (*m_blockitr)->m_files.begin() ;
00386     
00387     while( (stgfile != (*m_blockitr)->m_files.end() ) ) {
00388       std::string turl = (*stgfile)->getTurl();
00389       if ( (*stgfile)->getStatus() == GFALStageFile::STGED  ) {
00390         int pos = m_streamTurls[m_turlCount].find("'",10);
00391         std::string tail = m_streamTurls[m_turlCount].erase(0,pos+1);
00392         std::string head;
00393         
00394         if ( ! (*stgfile)->isPFN() ) head = std::string("DATAFILE='gfal:") + turl + std::string("'");
00395         else {
00396           std::string pfnturl = turl.erase(0,8);
00397           head = std::string("DATAFILE='rfio:") + pfnturl + std::string("'");
00398         }
00399         
00400         m_streamTurls[m_turlCount] = head + tail;
00401         
00402         log << MSG::DEBUG << "Adding to m_streamTurl_" << m_turlCount << "\n"
00403             << m_streamTurls[m_turlCount] << endmsg;
00404         
00406         //Update the data stream list with the turl
00407         StatusCode fts = updateStreams( (*stgfile)->rawFileName(), m_streamTurls[m_turlCount] );
00408         
00409         if ( ! fts.isSuccess() ) log << MSG::DEBUG << "Failed to update Stream!" << endmsg;
00410         ++m_turlCount;
00411         
00412       }
00413       
00414       ++stgfile;
00415       
00416     }
00417     
00418   }
00419   
00420   return status;
00421   
00422 }

void GFALDataStreamTool::addBlock ( GFALStageBlock * b )  [private]

Definition at line 362 of file GFALDataStreamTool.cpp.

References m_blocks, m_ntotblocks, and std::vector< _Tp, _Alloc >::push_back().

Referenced by setBlocks().

00363 {
00364   
00365   m_blocks.push_back(b);
00366   m_ntotblocks++;
00367   
00368   return ;
00369   
00370 }


Member Data Documentation

size_type GFALDataStreamTool::m_turlCount [private]

Definition at line 60 of file GFALDataStreamTool.h.

Referenced by initialize(), and updateStreamTurls().

StreamSpecs GFALDataStreamTool::m_streamTurls [private]

Definition at line 62 of file GFALDataStreamTool.h.

Referenced by addStreams(), finalize(), and updateStreamTurls().

std::vector<GFALStageBlock*> GFALDataStreamTool::m_blocks [private]

Definition at line 78 of file GFALDataStreamTool.h.

Referenced by addBlock(), addStreams(), firstBlockStage(), nextBlockStage(), and ~GFALDataStreamTool().

std::vector<GFALStageBlock*>::iterator GFALDataStreamTool::m_blockitr [private]

Definition at line 80 of file GFALDataStreamTool.h.

Referenced by addStreams(), firstBlockStage(), getNextStream(), nextBlockStage(), updateBlockStatus(), and updateStreamTurls().

long int GFALDataStreamTool::m_pos [private]

Definition at line 82 of file GFALDataStreamTool.h.

Referenced by initialize().

size_t GFALDataStreamTool::m_blockindex [private]

index of the last staged block

Definition at line 85 of file GFALDataStreamTool.h.

Referenced by firstBlockStage(), and nextBlockStage().

size_t GFALDataStreamTool::m_ntotblocks [private]

total number of blocks

Definition at line 87 of file GFALDataStreamTool.h.

Referenced by addBlock(), and addStreams().

size_t GFALDataStreamTool::m_blocksize [private]

number of files in one stage block

Definition at line 89 of file GFALDataStreamTool.h.

Referenced by initialize(), and setBlocks().

size_t GFALDataStreamTool::m_initstage [private]

number of blocks for the initial prestage

Definition at line 91 of file GFALDataStreamTool.h.

Referenced by firstBlockStage().

size_t GFALDataStreamTool::m_readfile [private]

number of files (streams) read.

Definition at line 93 of file GFALDataStreamTool.h.

int GFALDataStreamTool::m_atblock [private]

Definition at line 95 of file GFALDataStreamTool.h.

Referenced by initialize().

Status GFALDataStreamTool::m_task [private]

Definition at line 97 of file GFALDataStreamTool.h.

Referenced by addStreams(), getNextStream(), initialize(), nextBlockStage(), updateBlockStatus(), and updateStreamTurls().


The documentation for this class was generated from the following files: GFALDataStreamTool.h and GFALDataStreamTool.cpp.
Generated at Fri Jul 18 12:07:41 2008 for Gaudi Framework, version v20r2 by Doxygen version 1.5.1 written by Dimitri van Heesch, © 1997-2004