![]() |
|
|
Generated: 8 Jan 2009 |
00001 // $Id: GFALDataStreamTool.cpp,v 1.3 2007/07/11 08:23:42 marcocle Exp $ 00002 // Include files 00003 00004 // from Gaudi 00005 #include "GaudiKernel/DeclareFactoryEntries.h" 00006 #include "GaudiKernel/MsgStream.h" 00007 #include "GaudiKernel/xtoa.h" 00008 #include "GaudiKernel/SmartIF.h" 00009 #include "GaudiKernel/Incident.h" 00010 #include "GaudiKernel/SvcFactory.h" 00011 #include "GaudiKernel/Tokenizer.h" 00012 #include "GaudiKernel/MsgStream.h" 00013 #include "GaudiKernel/IIncidentSvc.h" 00014 #include "GaudiKernel/ISvcLocator.h" 00015 #include "GaudiKernel/ISvcManager.h" 00016 #include "GaudiKernel/IAddressCreator.h" 00017 00018 // local 00019 #include "GFALDataStreamTool.h" 00020 00021 //----------------------------------------------------------------------------- 00022 // Implementation file for class : GFALDataStreamTool 00023 // 00024 // 2006-09-29 : Andres Osorio 00025 // : Greig Cowan 00026 //----------------------------------------------------------------------------- 00027 00028 // Declaration of the Tool Factory 00029 DECLARE_TOOL_FACTORY( GFALDataStreamTool ) 00030 00031 00032 //============================================================================= 00033 // Standard constructor, initializes variables 00034 //============================================================================= 00035 GFALDataStreamTool::GFALDataStreamTool( const std::string& type, 00036 const std::string& name, 00037 const IInterface* parent ) 00038 : DataStreamTool ( type, name , parent ) 00039 { 00040 00041 declareProperty("BlockSize",m_blocksize = 1 ); 00042 declareProperty("InitialStage",m_initstage = 1); 00043 00044 } 00045 00046 //============================================================================= 00047 // Destructor 00048 //============================================================================= 00049 GFALDataStreamTool::~GFALDataStreamTool() 00050 { 00051 00052 std::vector<GFALStageBlock*>::iterator itr = m_blocks.begin() ; 00053 while( (itr != m_blocks.end()) ) { 00054 if ( (*itr) ) delete (*itr); 00055 ++itr; 00056 } 00057 00058 } 00059 00060 StatusCode GFALDataStreamTool::initialize() { 00061 00062 MsgStream logger(msgSvc(), name()); 00063 00064 StatusCode status = DataStreamTool::initialize(); 00065 if( !status.isSuccess() ) { 00066 logger << MSG::FATAL << "Error. Cannot initialize base class." << endreq; 00067 return status; 00068 } 00069 00070 m_streamCount = 1; 00071 m_turlCount = 0; 00072 00073 m_atblock = 0; 00074 m_pos = 0; 00075 m_task = UNKNOWN; 00076 00077 if (m_blocksize < 1) { 00078 logger << MSG::ERROR << "Wrong Block size" << endreq; 00079 logger << MSG::ERROR << "Prestaging disable" << endreq; 00080 return StatusCode::FAILURE; 00081 } 00082 00083 return StatusCode::SUCCESS; 00084 } 00085 00086 //============================================================================= 00087 StatusCode GFALDataStreamTool::addStreams(const StreamSpecs & inputs) { 00088 00089 MsgStream logger(msgSvc(), name()); 00090 00091 StatusCode status = DataStreamTool::addStreams(inputs); 00092 00095 00096 m_ntotblocks = 0; 00097 status = setBlocks(); 00098 00099 if ( ! status.isSuccess() ){ 00100 logger << MSG::ERROR << "Unable to set Blocks" << endreq; 00101 return StatusCode::FAILURE; 00102 } 00103 00104 status = firstBlockStage(); 00105 00106 if ( ! status.isSuccess() ) { 00107 logger << MSG::ERROR << "Unable stage first Block" << endreq; 00108 return StatusCode::FAILURE; 00109 } 00110 00111 //Stage first block of files until all files in the block are STAGED 00112 m_task = STGING; 00113 m_blockitr=m_blocks.begin(); 00114 00115 while ( ((*m_blockitr)->getStatus() != GFALStageBlock::STGED) && ((*m_blockitr)->getStatus() != GFALStageBlock::FAILED) ) { 00116 status = updateBlockStatus(); 00117 } 00118 00120 00121 m_streamTurls = m_streamSpecs; 00122 00123 if( status.isFailure() ){ 00124 return status; 00125 } 00126 00127 m_streamCount = 1; 00128 return updateStreamTurls(); 00129 00130 } 00131 00132 StatusCode GFALDataStreamTool::finalize() { 00133 00134 StatusCode iret, status = StatusCode::SUCCESS; 00135 iret.ignore(); 00136 00137 MsgStream log(msgSvc(), name()); 00138 00139 log << MSG::DEBUG << "finalise() starts now!" << endmsg; 00140 00141 for ( StreamSpecs::const_iterator i = m_streamTurls.begin(); i != m_streamTurls.end(); i++ ) { 00142 EventSelectorDataStream * s = DataStreamTool::getStream(*i); 00143 if ( NULL == s ) { 00144 if ( s->isInitialized() ) { 00145 iret = finalizeStream(s); 00146 if ( !iret.isSuccess() ) { 00147 log << MSG::ERROR << "Failed to finalize stream " << *i << endmsg; 00148 status = iret; 00149 } 00150 } 00151 } 00152 00153 iret = eraseStream( *i ); 00154 00155 if ( !iret.isSuccess() ) { 00156 log << MSG::ERROR << "Failed to disconnect stream " << *i << endmsg; 00157 status = iret; 00158 } 00159 00160 } 00161 00162 m_streamSpecs.clear(); //The input specification are not needed anymore 00163 00164 log << MSG::DEBUG << "finalise() total files processed " << m_streamID << endmsg; 00165 00166 status = DataStreamTool::finalize(); 00167 00168 return status; 00169 00170 } 00171 00172 StatusCode GFALDataStreamTool::getNextStream( const EventSelectorDataStream * & esds, size_type & dsid ) 00173 { 00174 00175 EventSelectorDataStream * nextStream = getStream(dsid); 00176 if ( NULL == nextStream ) return StatusCode::FAILURE; //<-end of streams reached 00177 00178 esds = nextStream; 00179 00180 //Stage blocks request 00181 if ( m_task != DONE ) { 00182 00183 if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED ) { 00184 nextBlockStage().ignore(); 00185 updateStreamTurls().ignore(); 00186 } 00187 updateBlockStatus().ignore(); 00188 } 00189 // 00190 00191 ++m_streamID; 00192 00193 return StatusCode::SUCCESS; 00194 00195 } 00196 00198 // Stage files functionality 00199 00200 StatusCode GFALDataStreamTool::setBlocks() 00201 { 00202 00203 MsgStream log (msgSvc(),name()); 00204 00205 GFALStageBlock * btmp = new GFALStageBlock(m_blocksize,msgSvc()); 00206 00207 log << MSG::DEBUG << "Setting up the blocks of files to stage. Total files: " 00208 << m_streamSpecs.size() 00209 << endreq; 00210 00211 log << MSG::DEBUG << "Each block is set to have " 00212 << m_blocksize << " files." 00213 << endreq; 00214 00215 StatusCode fstatus = StatusCode::SUCCESS; 00216 00217 int addedfiles(0); 00218 00219 for(StreamSpecs::const_iterator i = m_streamSpecs.begin() ; 00220 i != m_streamSpecs.end(); ++i) { 00221 00222 GFALStageFile * ftmp = new GFALStageFile(*i,msgSvc()); 00223 00224 ftmp->initialize().ignore(); 00225 00226 if (ftmp->isValid()) { 00227 00228 if ( addedfiles != (int) m_blocksize ) { 00229 fstatus = btmp->addFile(ftmp); 00230 ++addedfiles; 00231 } 00232 00233 if ( addedfiles == (int) m_blocksize && (&(*i)!= &m_streamSpecs.back()) ) { 00234 addBlock(btmp); 00235 log << MSG::DEBUG << "adding block" << endreq; 00236 btmp = new GFALStageBlock(m_blocksize,msgSvc()); 00237 addedfiles = 0; 00238 } 00239 00240 if (&(*i)==&m_streamSpecs.back()) { 00241 log << MSG::DEBUG << "adding last block" << endreq; 00242 addBlock(btmp); 00243 } 00244 00245 } 00246 else { 00247 00248 log << MSG::DEBUG <<" The file: " 00249 << ftmp->rawFileName() 00250 << "] does not exist." 00251 << endreq; 00252 fstatus = StatusCode::FAILURE; 00253 00254 } 00255 } 00256 00257 log << MSG::DEBUG << "setBlocks done." << endreq; 00258 00259 return fstatus; 00260 00261 } 00262 00263 00264 StatusCode GFALDataStreamTool::firstBlockStage() { 00265 00266 MsgStream log (msgSvc(),name()); 00267 00268 if (m_initstage==0) { 00269 log << MSG::DEBUG <<"No initial stage" << endreq; 00270 m_blockindex = 0 ; 00271 } else { 00272 log << MSG::DEBUG <<"Performing initial stage. Total number of blocks: " 00273 << m_blocks.size() << endreq; 00274 m_blockitr = m_blocks.begin(); 00275 00276 log << MSG::DEBUG << "staging first block of files" << endreq; 00277 00278 if ( m_blocks.size() > 0 ) { 00279 StatusCode status = (*m_blockitr)->Stage(); 00280 if ( status.isFailure() ){ 00281 log << MSG::ERROR << "firstBlockStage> unable to stage first block" << endreq; 00282 return status; 00283 } 00284 } 00285 else { 00286 log << MSG::DEBUG << "Block is empty: check input file format or grid certificate" << endreq; 00287 return StatusCode::FAILURE; 00288 } 00289 00290 } 00291 00292 log << MSG::DEBUG << "firstBlockStage> done." << endreq; 00293 00294 return StatusCode::SUCCESS; 00295 00296 } 00297 00298 StatusCode GFALDataStreamTool::nextBlockStage() { 00299 00300 //Initial stage for each block. 00301 00302 MsgStream log (msgSvc(),name()); 00303 00304 if ( (*m_blockitr)->getStatus() != GFALStageBlock::STGED ) 00305 return StatusCode::SUCCESS; 00306 00307 ++m_blockitr; 00308 00309 if (m_blockitr == m_blocks.end() ) { 00310 00311 log << MSG::DEBUG << "nextBlockStage> Last block reached" << endreq; 00312 m_task = DONE; 00313 return StatusCode::SUCCESS; 00314 00315 } 00316 00317 StatusCode status = (*m_blockitr)->Stage(); 00318 if ( status.isFailure() ){ 00319 log << MSG::ERROR << "nextBlockStage> unable to stage block" << endreq; 00320 return status; 00321 } 00322 00323 ++m_blockindex; 00324 00325 log << MSG::DEBUG << "nextBlockStage> done." << endreq; 00326 00327 return StatusCode::SUCCESS; 00328 00329 } 00330 00331 00332 StatusCode GFALDataStreamTool::updateBlockStatus( ) 00333 { 00334 00335 MsgStream log (msgSvc(),name()); 00336 00337 if ( m_task == DONE ) return StatusCode::SUCCESS; 00338 00339 log << MSG::DEBUG << "updateStatus> block" << endreq; 00340 00341 (*m_blockitr)->updateStatus(); 00342 00343 return StatusCode::SUCCESS; 00344 00345 } 00346 00347 StatusCode GFALDataStreamTool::updateBlockStatus( GFALStageBlock * ablock ) 00348 { 00349 00350 MsgStream log (msgSvc(),name()); 00351 00352 if ( m_task == DONE ) return StatusCode::SUCCESS; 00353 00354 log << MSG::DEBUG << "updateStatus> block" << endreq; 00355 00356 ablock->updateStatus(); 00357 00358 return StatusCode::SUCCESS; 00359 00360 } 00361 00362 void GFALDataStreamTool::addBlock(GFALStageBlock * b) 00363 { 00364 00365 m_blocks.push_back(b); 00366 m_ntotblocks++; 00367 00368 return ; 00369 00370 } 00371 00372 StatusCode GFALDataStreamTool::updateStreamTurls() 00373 { 00374 00375 MsgStream log (msgSvc(),name()); 00376 00377 StatusCode status; 00378 00379 log << MSG::DEBUG << "updateStreamTurls>" << endreq; 00380 00381 if ( m_task == DONE ) return StatusCode::SUCCESS; 00382 00383 if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED) { 00384 00385 std::vector<GFALStageFile*>::iterator stgfile = (*m_blockitr)->m_files.begin() ; 00386 00387 while( (stgfile != (*m_blockitr)->m_files.end() ) ) { 00388 std::string turl = (*stgfile)->getTurl(); 00389 if ( (*stgfile)->getStatus() == GFALStageFile::STGED ) { 00390 int pos = m_streamTurls[m_turlCount].find("'",10); 00391 std::string tail = m_streamTurls[m_turlCount].erase(0,pos+1); 00392 std::string head; 00393 00394 if ( ! (*stgfile)->isPFN() ) head = std::string("DATAFILE='gfal:") + turl + std::string("'"); 00395 else { 00396 std::string pfnturl = turl.erase(0,8); 00397 head = std::string("DATAFILE='rfio:") + pfnturl + std::string("'"); 00398 } 00399 00400 m_streamTurls[m_turlCount] = head + tail; 00401 00402 log << MSG::DEBUG << "Adding to m_streamTurl_" << m_turlCount << "\n" 00403 << m_streamTurls[m_turlCount] << endmsg; 00404 00406 //Update the data stream list with the turl 00407 StatusCode fts = updateStreams( (*stgfile)->rawFileName(), m_streamTurls[m_turlCount] ); 00408 00409 if ( ! fts.isSuccess() ) log << MSG::DEBUG << "Failed to update Stream!" << endmsg; 00410 ++m_turlCount; 00411 00412 } 00413 00414 ++stgfile; 00415 00416 } 00417 00418 } 00419 00420 return status; 00421 00422 } 00423 00424 StatusCode GFALDataStreamTool::updateStreams( const std::string & oldinfo, const std::string & uptinfo ) 00425 { 00426 00427 Streams::iterator i = DataStreamTool::getStreamIterator(oldinfo); 00428 00429 if ( i != m_streams.end() ) { 00430 (*i)->release(); 00431 Streams::iterator p = m_streams.erase(i); 00432 char txt[32]; 00433 std::string nam = name() + "_" + ::_itoa( ++m_streamCount, txt, 10); 00434 EventSelectorDataStream* s = 0; 00435 StatusCode status = DataStreamTool::createStream(nam, uptinfo, s); 00436 if ( status.isSuccess() ) { 00437 if ( 0 != s ) { 00438 s->addRef(); 00439 m_streams.insert(p, s); 00440 return StatusCode::SUCCESS; 00441 } 00442 } 00443 } 00444 00445 return StatusCode::FAILURE; 00446 00447 }