![]() |
|
|
Generated: 8 Jan 2009 |
#include <GaudiGridSvc/GFALDataStreamTool.h>


Definition at line 34 of file GFALDataStreamTool.h.
Public Types | |
| enum | Status { UNKNOWN = 0, STGING, DONE } |
| Return status. More... | |
Public Member Functions | |
| GFALDataStreamTool (const std::string &type, const std::string &name, const IInterface *parent) | |
| Standard constructor. | |
| virtual | ~GFALDataStreamTool () |
| Destructor. | |
| virtual StatusCode | initialize () |
| Initialization (from CONFIGURED to INITIALIZED). | |
| virtual StatusCode | finalize () |
| Finalize (from INITIALIZED to CONFIGURED). | |
| virtual StatusCode | addStreams (const StreamSpecs &) |
| virtual StatusCode | getNextStream (const EventSelectorDataStream *&, size_type &) |
Protected Member Functions | |
| StatusCode | updateStreams (const std::string &, const std::string &) |
Private Member Functions | |
| StatusCode | setBlocks () |
| StatusCode | firstBlockStage () |
| StatusCode | nextBlockStage () |
| StatusCode | updateBlockStatus () |
| StatusCode | updateBlockStatus (GFALStageBlock *) |
| StatusCode | updateStreamTurls () |
| void | addBlock (GFALStageBlock *) |
Private Attributes | |
| size_type | m_turlCount |
| StreamSpecs | m_streamTurls |
| std::vector< GFALStageBlock * > | m_blocks |
| std::vector< GFALStageBlock * > ::iterator | m_blockitr |
| long int | m_pos |
| size_t | m_blockindex |
| index of the last staged block | |
| size_t | m_ntotblocks |
| total number of blocks | |
| size_t | m_blocksize |
| number of files in one stage block | |
| size_t | m_initstage |
| number of blocks for the initial prestage | |
| size_t | m_readfile |
| number of files (streams) read. | |
| int | m_atblock |
| Status | m_task |
| GFALDataStreamTool::GFALDataStreamTool | ( | const std::string & | type, | |
| const std::string & | name, | |||
| const IInterface * | parent | |||
| ) |
Standard constructor.
Definition at line 35 of file GFALDataStreamTool.cpp.
00038 : DataStreamTool ( type, name , parent ) 00039 { 00040 00041 declareProperty("BlockSize",m_blocksize = 1 ); 00042 declareProperty("InitialStage",m_initstage = 1); 00043 00044 }
| GFALDataStreamTool::~GFALDataStreamTool | ( | ) | [virtual] |
Destructor.
Definition at line 49 of file GFALDataStreamTool.cpp.
00050 { 00051 00052 std::vector<GFALStageBlock*>::iterator itr = m_blocks.begin() ; 00053 while( (itr != m_blocks.end()) ) { 00054 if ( (*itr) ) delete (*itr); 00055 ++itr; 00056 } 00057 00058 }
| StatusCode GFALDataStreamTool::initialize | ( | ) | [virtual] |
Initialization (from CONFIGURED to INITIALIZED).
Reimplemented from DataStreamTool.
Definition at line 60 of file GFALDataStreamTool.cpp.
00060 { 00061 00062 MsgStream logger(msgSvc(), name()); 00063 00064 StatusCode status = DataStreamTool::initialize(); 00065 if( !status.isSuccess() ) { 00066 logger << MSG::FATAL << "Error. Cannot initialize base class." << endreq; 00067 return status; 00068 } 00069 00070 m_streamCount = 1; 00071 m_turlCount = 0; 00072 00073 m_atblock = 0; 00074 m_pos = 0; 00075 m_task = UNKNOWN; 00076 00077 if (m_blocksize < 1) { 00078 logger << MSG::ERROR << "Wrong Block size" << endreq; 00079 logger << MSG::ERROR << "Prestaging disable" << endreq; 00080 return StatusCode::FAILURE; 00081 } 00082 00083 return StatusCode::SUCCESS; 00084 }
| StatusCode GFALDataStreamTool::finalize | ( | ) | [virtual] |
Finalize (from INITIALIZED to CONFIGURED).
Reimplemented from DataStreamTool.
Definition at line 132 of file GFALDataStreamTool.cpp.
00132 { 00133 00134 StatusCode iret, status = StatusCode::SUCCESS; 00135 iret.ignore(); 00136 00137 MsgStream log(msgSvc(), name()); 00138 00139 log << MSG::DEBUG << "finalise() starts now!" << endmsg; 00140 00141 for ( StreamSpecs::const_iterator i = m_streamTurls.begin(); i != m_streamTurls.end(); i++ ) { 00142 EventSelectorDataStream * s = DataStreamTool::getStream(*i); 00143 if ( NULL == s ) { 00144 if ( s->isInitialized() ) { 00145 iret = finalizeStream(s); 00146 if ( !iret.isSuccess() ) { 00147 log << MSG::ERROR << "Failed to finalize stream " << *i << endmsg; 00148 status = iret; 00149 } 00150 } 00151 } 00152 00153 iret = eraseStream( *i ); 00154 00155 if ( !iret.isSuccess() ) { 00156 log << MSG::ERROR << "Failed to disconnect stream " << *i << endmsg; 00157 status = iret; 00158 } 00159 00160 } 00161 00162 m_streamSpecs.clear(); //The input specification are not needed anymore 00163 00164 log << MSG::DEBUG << "finalise() total files processed " << m_streamID << endmsg; 00165 00166 status = DataStreamTool::finalize(); 00167 00168 return status; 00169 00170 }
| StatusCode GFALDataStreamTool::addStreams | ( | const StreamSpecs & | inputs | ) | [virtual] |
initialize Stage
Reimplemented from DataStreamTool.
Definition at line 87 of file GFALDataStreamTool.cpp.
00087 { 00088 00089 MsgStream logger(msgSvc(), name()); 00090 00091 StatusCode status = DataStreamTool::addStreams(inputs); 00092 00095 00096 m_ntotblocks = 0; 00097 status = setBlocks(); 00098 00099 if ( ! status.isSuccess() ){ 00100 logger << MSG::ERROR << "Unable to set Blocks" << endreq; 00101 return StatusCode::FAILURE; 00102 } 00103 00104 status = firstBlockStage(); 00105 00106 if ( ! status.isSuccess() ) { 00107 logger << MSG::ERROR << "Unable stage first Block" << endreq; 00108 return StatusCode::FAILURE; 00109 } 00110 00111 //Stage first block of files until all files in the block are STAGED 00112 m_task = STGING; 00113 m_blockitr=m_blocks.begin(); 00114 00115 while ( ((*m_blockitr)->getStatus() != GFALStageBlock::STGED) && ((*m_blockitr)->getStatus() != GFALStageBlock::FAILED) ) { 00116 status = updateBlockStatus(); 00117 } 00118 00120 00121 m_streamTurls = m_streamSpecs; 00122 00123 if( status.isFailure() ){ 00124 return status; 00125 } 00126 00127 m_streamCount = 1; 00128 return updateStreamTurls(); 00129 00130 }
| StatusCode GFALDataStreamTool::getNextStream | ( | const EventSelectorDataStream *& | esds, | |
| size_type & | dsid | |||
| ) | [virtual] |
Reimplemented from DataStreamTool.
Definition at line 172 of file GFALDataStreamTool.cpp.
00173 { 00174 00175 EventSelectorDataStream * nextStream = getStream(dsid); 00176 if ( NULL == nextStream ) return StatusCode::FAILURE; //<-end of streams reached 00177 00178 esds = nextStream; 00179 00180 //Stage blocks request 00181 if ( m_task != DONE ) { 00182 00183 if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED ) { 00184 nextBlockStage().ignore(); 00185 updateStreamTurls().ignore(); 00186 } 00187 updateBlockStatus().ignore(); 00188 } 00189 // 00190 00191 ++m_streamID; 00192 00193 return StatusCode::SUCCESS; 00194 00195 }
| StatusCode GFALDataStreamTool::updateStreams | ( | const std::string & | oldinfo, | |
| const std::string & | uptinfo | |||
| ) | [protected] |
Definition at line 424 of file GFALDataStreamTool.cpp.
00425 { 00426 00427 Streams::iterator i = DataStreamTool::getStreamIterator(oldinfo); 00428 00429 if ( i != m_streams.end() ) { 00430 (*i)->release(); 00431 Streams::iterator p = m_streams.erase(i); 00432 char txt[32]; 00433 std::string nam = name() + "_" + ::_itoa( ++m_streamCount, txt, 10); 00434 EventSelectorDataStream* s = 0; 00435 StatusCode status = DataStreamTool::createStream(nam, uptinfo, s); 00436 if ( status.isSuccess() ) { 00437 if ( 0 != s ) { 00438 s->addRef(); 00439 m_streams.insert(p, s); 00440 return StatusCode::SUCCESS; 00441 } 00442 } 00443 } 00444 00445 return StatusCode::FAILURE; 00446 00447 }
| StatusCode GFALDataStreamTool::setBlocks | ( | ) | [private] |
Definition at line 200 of file GFALDataStreamTool.cpp.
00201 { 00202 00203 MsgStream log (msgSvc(),name()); 00204 00205 GFALStageBlock * btmp = new GFALStageBlock(m_blocksize,msgSvc()); 00206 00207 log << MSG::DEBUG << "Setting up the blocks of files to stage. Total files: " 00208 << m_streamSpecs.size() 00209 << endreq; 00210 00211 log << MSG::DEBUG << "Each block is set to have " 00212 << m_blocksize << " files." 00213 << endreq; 00214 00215 StatusCode fstatus = StatusCode::SUCCESS; 00216 00217 int addedfiles(0); 00218 00219 for(StreamSpecs::const_iterator i = m_streamSpecs.begin() ; 00220 i != m_streamSpecs.end(); ++i) { 00221 00222 GFALStageFile * ftmp = new GFALStageFile(*i,msgSvc()); 00223 00224 ftmp->initialize().ignore(); 00225 00226 if (ftmp->isValid()) { 00227 00228 if ( addedfiles != (int) m_blocksize ) { 00229 fstatus = btmp->addFile(ftmp); 00230 ++addedfiles; 00231 } 00232 00233 if ( addedfiles == (int) m_blocksize && (&(*i)!= &m_streamSpecs.back()) ) { 00234 addBlock(btmp); 00235 log << MSG::DEBUG << "adding block" << endreq; 00236 btmp = new GFALStageBlock(m_blocksize,msgSvc()); 00237 addedfiles = 0; 00238 } 00239 00240 if (&(*i)==&m_streamSpecs.back()) { 00241 log << MSG::DEBUG << "adding last block" << endreq; 00242 addBlock(btmp); 00243 } 00244 00245 } 00246 else { 00247 00248 log << MSG::DEBUG <<" The file: " 00249 << ftmp->rawFileName() 00250 << "] does not exist." 00251 << endreq; 00252 fstatus = StatusCode::FAILURE; 00253 00254 } 00255 } 00256 00257 log << MSG::DEBUG << "setBlocks done." << endreq; 00258 00259 return fstatus; 00260 00261 }
| StatusCode GFALDataStreamTool::firstBlockStage | ( | ) | [private] |
Definition at line 264 of file GFALDataStreamTool.cpp.
00264 { 00265 00266 MsgStream log (msgSvc(),name()); 00267 00268 if (m_initstage==0) { 00269 log << MSG::DEBUG <<"No initial stage" << endreq; 00270 m_blockindex = 0 ; 00271 } else { 00272 log << MSG::DEBUG <<"Performing initial stage. Total number of blocks: " 00273 << m_blocks.size() << endreq; 00274 m_blockitr = m_blocks.begin(); 00275 00276 log << MSG::DEBUG << "staging first block of files" << endreq; 00277 00278 if ( m_blocks.size() > 0 ) { 00279 StatusCode status = (*m_blockitr)->Stage(); 00280 if ( status.isFailure() ){ 00281 log << MSG::ERROR << "firstBlockStage> unable to stage first block" << endreq; 00282 return status; 00283 } 00284 } 00285 else { 00286 log << MSG::DEBUG << "Block is empty: check input file format or grid certificate" << endreq; 00287 return StatusCode::FAILURE; 00288 } 00289 00290 } 00291 00292 log << MSG::DEBUG << "firstBlockStage> done." << endreq; 00293 00294 return StatusCode::SUCCESS; 00295 00296 }
| StatusCode GFALDataStreamTool::nextBlockStage | ( | ) | [private] |
Definition at line 298 of file GFALDataStreamTool.cpp.
00298 { 00299 00300 //Initial stage for each block. 00301 00302 MsgStream log (msgSvc(),name()); 00303 00304 if ( (*m_blockitr)->getStatus() != GFALStageBlock::STGED ) 00305 return StatusCode::SUCCESS; 00306 00307 ++m_blockitr; 00308 00309 if (m_blockitr == m_blocks.end() ) { 00310 00311 log << MSG::DEBUG << "nextBlockStage> Last block reached" << endreq; 00312 m_task = DONE; 00313 return StatusCode::SUCCESS; 00314 00315 } 00316 00317 StatusCode status = (*m_blockitr)->Stage(); 00318 if ( status.isFailure() ){ 00319 log << MSG::ERROR << "nextBlockStage> unable to stage block" << endreq; 00320 return status; 00321 } 00322 00323 ++m_blockindex; 00324 00325 log << MSG::DEBUG << "nextBlockStage> done." << endreq; 00326 00327 return StatusCode::SUCCESS; 00328 00329 }
| StatusCode GFALDataStreamTool::updateBlockStatus | ( | ) | [private] |
Definition at line 332 of file GFALDataStreamTool.cpp.
00333 { 00334 00335 MsgStream log (msgSvc(),name()); 00336 00337 if ( m_task == DONE ) return StatusCode::SUCCESS; 00338 00339 log << MSG::DEBUG << "updateStatus> block" << endreq; 00340 00341 (*m_blockitr)->updateStatus(); 00342 00343 return StatusCode::SUCCESS; 00344 00345 }
| StatusCode GFALDataStreamTool::updateBlockStatus | ( | GFALStageBlock * | ablock | ) | [private] |
Definition at line 347 of file GFALDataStreamTool.cpp.
00348 { 00349 00350 MsgStream log (msgSvc(),name()); 00351 00352 if ( m_task == DONE ) return StatusCode::SUCCESS; 00353 00354 log << MSG::DEBUG << "updateStatus> block" << endreq; 00355 00356 ablock->updateStatus(); 00357 00358 return StatusCode::SUCCESS; 00359 00360 }
| StatusCode GFALDataStreamTool::updateStreamTurls | ( | ) | [private] |
Definition at line 372 of file GFALDataStreamTool.cpp.
00373 { 00374 00375 MsgStream log (msgSvc(),name()); 00376 00377 StatusCode status; 00378 00379 log << MSG::DEBUG << "updateStreamTurls>" << endreq; 00380 00381 if ( m_task == DONE ) return StatusCode::SUCCESS; 00382 00383 if ( (*m_blockitr)->getStatus() == GFALStageBlock::STGED) { 00384 00385 std::vector<GFALStageFile*>::iterator stgfile = (*m_blockitr)->m_files.begin() ; 00386 00387 while( (stgfile != (*m_blockitr)->m_files.end() ) ) { 00388 std::string turl = (*stgfile)->getTurl(); 00389 if ( (*stgfile)->getStatus() == GFALStageFile::STGED ) { 00390 int pos = m_streamTurls[m_turlCount].find("'",10); 00391 std::string tail = m_streamTurls[m_turlCount].erase(0,pos+1); 00392 std::string head; 00393 00394 if ( ! (*stgfile)->isPFN() ) head = std::string("DATAFILE='gfal:") + turl + std::string("'"); 00395 else { 00396 std::string pfnturl = turl.erase(0,8); 00397 head = std::string("DATAFILE='rfio:") + pfnturl + std::string("'"); 00398 } 00399 00400 m_streamTurls[m_turlCount] = head + tail; 00401 00402 log << MSG::DEBUG << "Adding to m_streamTurl_" << m_turlCount << "\n" 00403 << m_streamTurls[m_turlCount] << endmsg; 00404 00406 //Update the data stream list with the turl 00407 StatusCode fts = updateStreams( (*stgfile)->rawFileName(), m_streamTurls[m_turlCount] ); 00408 00409 if ( ! fts.isSuccess() ) log << MSG::DEBUG << "Failed to update Stream!" << endmsg; 00410 ++m_turlCount; 00411 00412 } 00413 00414 ++stgfile; 00415 00416 } 00417 00418 } 00419 00420 return status; 00421 00422 }
| void GFALDataStreamTool::addBlock | ( | GFALStageBlock * | b | ) | [private] |
Definition at line 362 of file GFALDataStreamTool.cpp.
00363 { 00364 00365 m_blocks.push_back(b); 00366 m_ntotblocks++; 00367 00368 return ; 00369 00370 }
size_type GFALDataStreamTool::m_turlCount [private] |
Definition at line 60 of file GFALDataStreamTool.h.
StreamSpecs GFALDataStreamTool::m_streamTurls [private] |
Definition at line 62 of file GFALDataStreamTool.h.
std::vector<GFALStageBlock*> GFALDataStreamTool::m_blocks [private] |
Definition at line 78 of file GFALDataStreamTool.h.
std::vector<GFALStageBlock*>::iterator GFALDataStreamTool::m_blockitr [private] |
Definition at line 80 of file GFALDataStreamTool.h.
long int GFALDataStreamTool::m_pos [private] |
Definition at line 82 of file GFALDataStreamTool.h.
size_t GFALDataStreamTool::m_blockindex [private] |
size_t GFALDataStreamTool::m_ntotblocks [private] |
size_t GFALDataStreamTool::m_blocksize [private] |
size_t GFALDataStreamTool::m_initstage [private] |
size_t GFALDataStreamTool::m_readfile [private] |
int GFALDataStreamTool::m_atblock [private] |
Definition at line 95 of file GFALDataStreamTool.h.
Status GFALDataStreamTool::m_task [private] |
Definition at line 97 of file GFALDataStreamTool.h.