Gaudi Framework, version v20r4

Generated: 8 Jan 2009

GFALDataStreamTool.cpp

Go to the documentation of this file.
00001 // $Id: GFALDataStreamTool.cpp,v 1.3 2007/07/11 08:23:42 marcocle Exp $
00002 // Include files 
00003 
00004 // from Gaudi
00005 #include "GaudiKernel/DeclareFactoryEntries.h" 
00006 #include "GaudiKernel/MsgStream.h"
00007 #include "GaudiKernel/xtoa.h"
00008 #include "GaudiKernel/SmartIF.h"
00009 #include "GaudiKernel/Incident.h"
00010 #include "GaudiKernel/SvcFactory.h"
00011 #include "GaudiKernel/Tokenizer.h"
00012 #include "GaudiKernel/MsgStream.h"
00013 #include "GaudiKernel/IIncidentSvc.h"
00014 #include "GaudiKernel/ISvcLocator.h"
00015 #include "GaudiKernel/ISvcManager.h"
00016 #include "GaudiKernel/IAddressCreator.h"
00017 
00018 // local
00019 #include "GFALDataStreamTool.h"
00020 
00021 //-----------------------------------------------------------------------------
00022 // Implementation file for class : GFALDataStreamTool
00023 //
00024 // 2006-09-29 : Andres Osorio
00025 //            : Greig Cowan
00026 //-----------------------------------------------------------------------------
00027 
00028 // Declaration of the Tool Factory
00029 DECLARE_TOOL_FACTORY( GFALDataStreamTool )
00030 
00031 
00032 //=============================================================================
00033 // Standard constructor, initializes variables
00034 //=============================================================================
00035 GFALDataStreamTool::GFALDataStreamTool( const std::string& type,
00036                                         const std::string& name,
00037                                         const IInterface* parent )
00038   : DataStreamTool ( type, name , parent )
00039 {
00040 
00041   declareProperty("BlockSize",m_blocksize = 1 );
00042   declareProperty("InitialStage",m_initstage = 1);
00043   
00044 }
00045 
00046 //=============================================================================
00047 // Destructor
00048 //=============================================================================
00049 GFALDataStreamTool::~GFALDataStreamTool() 
00050 {
00051   
00052   std::vector<GFALStageBlock*>::iterator itr = m_blocks.begin() ;
00053   while( (itr != m_blocks.end()) ) {
00054     if ( (*itr) ) delete (*itr);
00055     ++itr;
00056   }
00057   
00058 } 
00059 
00060 StatusCode GFALDataStreamTool::initialize() {
00061 
00062   MsgStream logger(msgSvc(), name());
00063 
00064   StatusCode status = DataStreamTool::initialize();
00065   if( !status.isSuccess() )  {
00066     logger << MSG::FATAL << "Error. Cannot initialize base class." << endreq;
00067     return status;
00068   }
00069   
00070   m_streamCount = 1;
00071   m_turlCount   = 0;
00072   
00073   m_atblock = 0;
00074   m_pos = 0;
00075   m_task = UNKNOWN;
00076   
00077   if (m_blocksize < 1) {
00078     logger << MSG::ERROR << "Wrong Block size" << endreq;
00079     logger << MSG::ERROR << "Prestaging disable" << endreq;
00080     return StatusCode::FAILURE;
00081   }
00082   
00083   return StatusCode::SUCCESS;
00084 }
00085 
00086 //=============================================================================
00087 StatusCode GFALDataStreamTool::addStreams(const StreamSpecs & inputs) {
00088   
00089   MsgStream logger(msgSvc(), name());
00090   
00091   StatusCode status = DataStreamTool::addStreams(inputs);
00092   
00095   
00096   m_ntotblocks = 0;
00097   status = setBlocks();
00098     
00099   if ( ! status.isSuccess() ){
00100     logger << MSG::ERROR << "Unable to set Blocks" << endreq;
00101     return StatusCode::FAILURE;
00102   }
00103   
00104   status = firstBlockStage();
00105     
00106   if ( ! status.isSuccess() ) {
00107     logger << MSG::ERROR << "Unable stage first Block" << endreq;
00108     return StatusCode::FAILURE;
00109   }
00110   
00111   //Stage first block of files until all files in the block are STAGED 
00112   m_task = STGING;
00113   m_blockitr=m_blocks.begin();
00114   
00115   while ( ((*m_blockitr)->getStatus() != GFALStageBlock::STGED) && ((*m_blockitr)->getStatus() != GFALStageBlock::FAILED) ) {
00116     status = updateBlockStatus();
00117   }
00118   
00120   
00121   m_streamTurls = m_streamSpecs;
00122 
00123   if( status.isFailure() ){
00124     return status;
00125   }
00126 
00127   m_streamCount = 1;
00128   return updateStreamTurls();
00129 
00130 }
00131 
00132 StatusCode GFALDataStreamTool::finalize() {
00133   
00134   StatusCode iret, status = StatusCode::SUCCESS;
00135   iret.ignore();
00136   
00137   MsgStream log(msgSvc(), name());
00138   
00139   log << MSG::DEBUG << "finalise() starts now!" << endmsg;
00140   
00141   for ( StreamSpecs::const_iterator i = m_streamTurls.begin(); i != m_streamTurls.end(); i++ )    {
00142     EventSelectorDataStream * s  = DataStreamTool::getStream(*i);
00143     if ( NULL == s )   {
00144       if ( s->isInitialized() )    {
00145         iret = finalizeStream(s);
00146         if ( !iret.isSuccess() ) {
00147           log << MSG::ERROR << "Failed to finalize stream " << *i << endmsg;
00148           status = iret;
00149         }
00150       }
00151     }
00152     
00153     iret =  eraseStream( *i );
00154     
00155     if ( !iret.isSuccess() )    {
00156       log << MSG::ERROR << "Failed to disconnect stream " << *i << endmsg;
00157       status = iret;
00158     }
00159     
00160   }
00161 
00162   m_streamSpecs.clear(); //The input specification are not needed anymore
00163   
00164   log << MSG::DEBUG << "finalise() total files processed " << m_streamID << endmsg;
00165 
00166   status = DataStreamTool::finalize();
00167   
00168   return status;
00169   
00170 }
00171 
00172 StatusCode GFALDataStreamTool::getNextStream( const EventSelectorDataStream * & esds, size_type & dsid )
00173 {
00174   
00175   EventSelectorDataStream * nextStream = getStream(dsid);
00176   if ( NULL == nextStream ) return StatusCode::FAILURE; //<-end of streams reached
00177 
00178   esds = nextStream;
00179 
00180   //Stage blocks request
00181   if ( m_task != DONE ) {
00182     
00183     if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED ) {
00184       nextBlockStage().ignore();
00185       updateStreamTurls().ignore();
00186     }
00187     updateBlockStatus().ignore();
00188   }
00189   //
00190 
00191   ++m_streamID;
00192   
00193   return StatusCode::SUCCESS;
00194   
00195 }
00196 
00198 // Stage files functionality
00199 
00200 StatusCode GFALDataStreamTool::setBlocks()
00201 {
00202   
00203   MsgStream log (msgSvc(),name());
00204   
00205   GFALStageBlock * btmp = new GFALStageBlock(m_blocksize,msgSvc());
00206   
00207   log << MSG::DEBUG << "Setting up the blocks of files to stage. Total files: " 
00208       << m_streamSpecs.size()
00209       << endreq;
00210   
00211   log << MSG::DEBUG << "Each block is set to have "
00212       << m_blocksize << " files."
00213       << endreq;
00214   
00215   StatusCode fstatus = StatusCode::SUCCESS;
00216   
00217   int addedfiles(0);
00218   
00219   for(StreamSpecs::const_iterator i = m_streamSpecs.begin() ;
00220       i != m_streamSpecs.end(); ++i) {
00221     
00222     GFALStageFile * ftmp = new GFALStageFile(*i,msgSvc());
00223 
00224     ftmp->initialize().ignore();
00225     
00226     if (ftmp->isValid()) {
00227       
00228       if ( addedfiles != (int) m_blocksize ) {
00229         fstatus = btmp->addFile(ftmp);
00230         ++addedfiles;
00231       }
00232       
00233       if ( addedfiles == (int) m_blocksize  && (&(*i)!= &m_streamSpecs.back()) ) {
00234         addBlock(btmp);
00235         log << MSG::DEBUG << "adding block" << endreq;
00236         btmp = new GFALStageBlock(m_blocksize,msgSvc());
00237         addedfiles = 0;
00238       }
00239       
00240       if (&(*i)==&m_streamSpecs.back()) {
00241         log << MSG::DEBUG << "adding last block" << endreq;        
00242         addBlock(btmp);
00243       }
00244       
00245     }
00246     else {
00247       
00248       log << MSG::DEBUG <<" The file: " 
00249           << ftmp->rawFileName()
00250           << "] does not exist."
00251           << endreq;
00252       fstatus = StatusCode::FAILURE;
00253       
00254     }
00255   }
00256   
00257   log << MSG::DEBUG << "setBlocks done." << endreq;
00258   
00259   return fstatus;
00260   
00261 }
00262 
00263 
00264 StatusCode GFALDataStreamTool::firstBlockStage() {
00265   
00266   MsgStream log (msgSvc(),name());
00267   
00268   if (m_initstage==0) {
00269     log << MSG::DEBUG <<"No initial stage" << endreq;
00270     m_blockindex = 0 ;
00271   } else {
00272     log << MSG::DEBUG <<"Performing initial stage. Total number of blocks: " 
00273         << m_blocks.size() << endreq;
00274     m_blockitr = m_blocks.begin();
00275 
00276     log << MSG::DEBUG << "staging first block of files" << endreq;
00277     
00278     if ( m_blocks.size() > 0 ) {
00279       StatusCode status = (*m_blockitr)->Stage();
00280       if ( status.isFailure() ){
00281         log << MSG::ERROR << "firstBlockStage> unable to stage first block" << endreq;
00282         return status;
00283       }
00284     }
00285     else {
00286       log << MSG::DEBUG << "Block is empty: check input file format or grid certificate" << endreq;
00287       return StatusCode::FAILURE;
00288     }
00289     
00290   }
00291   
00292   log << MSG::DEBUG << "firstBlockStage> done." << endreq;
00293   
00294   return StatusCode::SUCCESS;
00295   
00296 }
00297 
00298 StatusCode GFALDataStreamTool::nextBlockStage() {
00299   
00300   //Initial stage for each block.  
00301   
00302   MsgStream log (msgSvc(),name());
00303   
00304   if ( (*m_blockitr)->getStatus() != GFALStageBlock::STGED )
00305     return StatusCode::SUCCESS;
00306   
00307   ++m_blockitr;
00308   
00309   if (m_blockitr == m_blocks.end() ) {
00310 
00311     log << MSG::DEBUG << "nextBlockStage> Last block reached" << endreq;
00312     m_task = DONE;
00313     return StatusCode::SUCCESS;
00314     
00315   }
00316   
00317   StatusCode status = (*m_blockitr)->Stage();
00318   if ( status.isFailure() ){
00319     log << MSG::ERROR << "nextBlockStage> unable to stage block" << endreq;
00320     return status;
00321   }
00322   
00323   ++m_blockindex;
00324   
00325   log << MSG::DEBUG << "nextBlockStage> done." << endreq;
00326   
00327   return StatusCode::SUCCESS;
00328   
00329 }
00330 
00331 
00332 StatusCode GFALDataStreamTool::updateBlockStatus( )
00333 {
00334   
00335   MsgStream log (msgSvc(),name());
00336 
00337   if ( m_task == DONE ) return StatusCode::SUCCESS;
00338   
00339   log << MSG::DEBUG << "updateStatus> block" << endreq;
00340   
00341   (*m_blockitr)->updateStatus();
00342   
00343   return StatusCode::SUCCESS;
00344   
00345 }
00346 
00347 StatusCode GFALDataStreamTool::updateBlockStatus( GFALStageBlock * ablock )
00348 {
00349   
00350   MsgStream log (msgSvc(),name());
00351 
00352   if ( m_task == DONE ) return StatusCode::SUCCESS;
00353   
00354   log << MSG::DEBUG << "updateStatus> block" << endreq;
00355 
00356   ablock->updateStatus();
00357   
00358   return StatusCode::SUCCESS;
00359   
00360 }
00361 
00362 void GFALDataStreamTool::addBlock(GFALStageBlock * b)
00363 {
00364   
00365   m_blocks.push_back(b);
00366   m_ntotblocks++;
00367   
00368   return ;
00369   
00370 }
00371 
00372 StatusCode GFALDataStreamTool::updateStreamTurls() 
00373 {
00374 
00375   MsgStream log (msgSvc(),name());
00376     
00377   StatusCode status;
00378   
00379   log << MSG::DEBUG << "updateStreamTurls>" << endreq;
00380 
00381   if ( m_task == DONE ) return StatusCode::SUCCESS;
00382     
00383   if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED) {
00384     
00385     std::vector<GFALStageFile*>::iterator stgfile = (*m_blockitr)->m_files.begin() ;
00386     
00387     while( (stgfile != (*m_blockitr)->m_files.end() ) ) {
00388       std::string turl = (*stgfile)->getTurl();
00389       if ( (*stgfile)->getStatus() == GFALStageFile::STGED  ) {
00390         int pos = m_streamTurls[m_turlCount].find("'",10);
00391         std::string tail = m_streamTurls[m_turlCount].erase(0,pos+1);
00392         std::string head;
00393         
00394         if ( ! (*stgfile)->isPFN() ) head = std::string("DATAFILE='gfal:") + turl + std::string("'");
00395         else {
00396           std::string pfnturl = turl.erase(0,8);
00397           head = std::string("DATAFILE='rfio:") + pfnturl + std::string("'");
00398         }
00399         
00400         m_streamTurls[m_turlCount] = head + tail;
00401         
00402         log << MSG::DEBUG << "Adding to m_streamTurl_" << m_turlCount << "\n"
00403             << m_streamTurls[m_turlCount] << endmsg;
00404         
00406         //Update the data stream list with the turl
00407         StatusCode fts = updateStreams( (*stgfile)->rawFileName(), m_streamTurls[m_turlCount] );
00408         
00409         if ( ! fts.isSuccess() ) log << MSG::DEBUG << "Failed to update Stream!" << endmsg;
00410         ++m_turlCount;
00411         
00412       }
00413       
00414       ++stgfile;
00415       
00416     }
00417     
00418   }
00419   
00420   return status;
00421   
00422 }
00423 
00424 StatusCode GFALDataStreamTool::updateStreams( const std::string & oldinfo, const std::string & uptinfo )
00425 {
00426   
00427   Streams::iterator i = DataStreamTool::getStreamIterator(oldinfo);
00428   
00429   if ( i != m_streams.end() )   {
00430     (*i)->release();
00431     Streams::iterator p = m_streams.erase(i);
00432     char txt[32];
00433     std::string nam = name() + "_" + ::_itoa( ++m_streamCount, txt, 10);
00434     EventSelectorDataStream* s = 0;
00435     StatusCode status = DataStreamTool::createStream(nam, uptinfo, s);
00436     if ( status.isSuccess() )   {
00437       if ( 0 != s )   {
00438         s->addRef();
00439         m_streams.insert(p, s);
00440         return StatusCode::SUCCESS;
00441       }
00442     }
00443   }
00444   
00445   return StatusCode::FAILURE;
00446   
00447 }

Generated at Thu Jan 8 17:44:19 2009 for Gaudi Framework, version v20r4 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004