00001
00002 from Gaudi.Configuration import *
00003 from GaudiPython import AppMgr, gbl
00004 from ROOT import TFile, TBufferFile, TBuffer
00005 from multiprocessing import Process, Queue
00006 from Configurables import LHCbApp
00007 import sys
00008
00009
00010
00011
00012
00013
00014
00015 def checkKeys( name ) :
00016
00017 fname = name[4:]
00018 tf = TFile( fname, 'REC' )
00019
00020
00021
00022
00023
00024 importOptions( '$STDOPTS/LHCbApplication.opts' )
00025
00026 importOptions( '$GAUDICNVROOT/options/Setup.opts' )
00027
00028
00029 OutputStream( "DstWriter" ).Output = ''
00030 HistogramPersistencySvc().OutputFile = ''
00031 MessageSvc( OutputLevel = ERROR )
00032 EventSelector().PrintFreq = 100
00033
00034 ApplicationMgr( OutputLevel = ERROR,
00035 AppName = 'File Check - Serial vs Parallel' )
00036
00037
00038 PAR = 'PARALLEL'
00039 SER = 'SERIAL'
00040
00041 def CompareTrees( pname, sname ) :
00042 pf = TFile( pname, 'REC' )
00043 sf = TFile( sname, 'REC' )
00044 event = '_Event'
00045 pfks = pf.GetListOfKeys()
00046 sfks = sf.GetListOfKeys()
00047 pfkeys = list( [pfk.GetName() for pfk in pfks] ) ; pfkeys.sort()
00048 sfkeys = list( [sfk.GetName() for sfk in sfks] ) ; sfkeys.sort()
00049 pMeta = [] ; pEvent = [] ; pOther = []
00050 for k in pfkeys :
00051 if k.startswith( event ) : pEvent.append( k )
00052 elif k.startswith( '##' ) : pMeta.append( k )
00053 else : pOther.append( k )
00054 sMeta = [] ; sEvent = [] ; sOther = []
00055 for k in sfkeys :
00056 if k.startswith( event ) : sEvent.append( k )
00057 elif k.startswith( '##' ) : sMeta.append( k )
00058 else : sOther.append( k )
00059
00060 if pMeta == sMeta : pass
00061 else : print 'Meta Data differs'
00062
00063 if pEvent == sEvent : pass
00064 else : print 'Event data differs'
00065
00066 if pOther != sOther :
00067 pset = set( pOther )
00068 sset = set( sOther )
00069 pExtra = pset - sset
00070 sExtra = sset - pset
00071 if pExtra : print 'Extra Data in parallel file : ', pExtra
00072 if sExtra : print 'Extra Data in serial file : ', sExtra
00073 if sExtra or pExtra : print 'Files will have different sizes'
00074 pf.Close()
00075 sf.Close()
00076
00077 def switchDict( d ) :
00078
00079
00080 nkeys = len( d.keys() )
00081 vals = d.values()
00082 nvals = len( vals )
00083 for v in vals :
00084 if vals.count( v ) > 1 :
00085 print 'Dictionary cannot be switched, values not unique'
00086 return None
00087 print 'Dict has keys/values : %i/%i' % ( nkeys, nvals )
00088 pairs = d.items()
00089 newd = {}
00090 for k, entry in pairs : newd[entry] = k
00091 return newd
00092
00093
00094 def printDict( d, name = 'unspecified' ) :
00095
00096
00097
00098
00099
00100
00101
00102 print '-' * 80
00103 print 'Dictionary %s : ' % ( name )
00104 for k in iter( d.keys() ) :
00105 print '\t', k, '\t', d[k]
00106 print '-' * 80
00107
00108
00109 def Reader( readerType, filename, qacross, qToEngine ) :
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122 a = AppMgr()
00123 sel = a.evtsel()
00124 evt = a.evtsvc()
00125
00126 header = '/Event/Rec/Header'
00127 sel.open( filename )
00128 ct = 0
00129 order = {}
00130 fname = filename[4:]
00131
00132
00133 while True :
00134 a.run( 1 )
00135 if evt[header] :
00136 eNumber = int( evt[header].evtNumber() )
00137 order[eNumber] = ct
00138 ct += 1
00139 else : break
00140
00141 if readerType == SER :
00142
00143 order = switchDict( order )
00144 qacross.put( order )
00145 qacross.put( None )
00146
00147 serOrder = order
00148 elif readerType == PAR :
00149
00150 for serOrder in iter( qacross.get, None ) : pass
00151 lsks = len( serOrder.keys() )
00152 lpks = len( order.keys() )
00153 print 'Events in Files (serial/parallel) : %i / %i' % ( lsks, lpks )
00154
00155
00156
00157 for i in iter( serOrder.keys() ) :
00158 if readerType == PAR : i = order[serOrder[i]]
00159
00160 a.runSelectedEvents( fname, i )
00161 lst = evt.getList()
00162
00163 lst.sort()
00164 ascii = dict( [ ( l, ( evt[l].__class__.__name__, evt[l].__repr__() ) ) for l in lst ] )
00165 qToEngine.put( ascii )
00166 qToEngine.put( None )
00167 print '%s Reader Finished' % ( readerType )
00168
00169 def ComparisonEngine( pQueue, sQueue ) :
00170
00171
00172
00173
00174
00175
00176
00177
00178 results = []
00179 while True :
00180 pitem = pQueue.get()
00181 sitem = sQueue.get()
00182 if pitem == sitem == None : print 'Termination Signals received ok' ; break
00183 elif pitem == None : print 'pitem != sitem : ', pitem, sitem ; break
00184 elif sitem == None : print 'pitem != sitem : ', pitem, sitem ; break
00185 results.append( compareEvents( pitem, sitem ) )
00186 print '=' * 80
00187 print 'Comparison Engine Finished'
00188 print '-' * 80
00189 print 'Total Events Checked : %i' % ( len( results ) )
00190 print 'Perfect Matches : %i' % ( sum( results ) )
00191 print 'Errors : %i' % ( len( results ) - sum( results ) )
00192 print '=' * 80
00193
00194 def checkForAddressDifference( a, b ) :
00195
00196
00197
00198
00199
00200
00201
00202
00203 ref = 'DataObject at 0x'
00204 if a[:16] == b[:16] == ref : return True
00205 else : return False
00206
00207
00208 def compareEvents( s, p ) :
00209
00210
00211
00212
00213 sks = s.keys() ; pks = p.keys()
00214 sks.sort() ; pks.sort()
00215 if len( sks ) == len( pks ) : pass
00216 else :
00217
00218
00219
00220
00221
00222
00223
00224
00225 extras = list( set( pks ) - set( sks ) )
00226 for e in extras :
00227 if p[e][0] == 'DataObject' : pks.remove( e )
00228 else : print 'Extra Other thing found!', e, p[e][0] ; return False
00229
00230
00231 if sks == pks : pass
00232 else : return False
00233
00234
00235 l = len( sks )
00236 diffs = []
00237 for i in xrange( l ) :
00238 key = sks[i]
00239
00240 if s[key][0] == p[key][0] : pass
00241 else : diffs.append( key )
00242
00243 if s[key][1] == p[key][1] : pass
00244 elif checkForAddressDifference( p[key][1], s[key][1] ) : pass
00245 else : diffs.append( key )
00246
00247
00248 if diffs : return False
00249 else : return True
00250
00251 def CheckFileRecords( par, ser ):
00252
00253 print "Checking File Records"
00254
00255 parFSR = GetFSRdicts( par )
00256 serFSR = GetFSRdicts( ser )
00257
00258
00259
00260 diff1 = set( parFSR["TimeSpanFSR"].iteritems() ) - set( serFSR["TimeSpanFSR"].iteritems() )
00261 diff2 = set( parFSR["EventCountFSR"].iteritems() ) - set( serFSR["EventCountFSR"].iteritems() )
00262
00263 print "\nDifferent entries in TimeSpanFSR: \t" + str( len( diff1 ) ) + "\nDifferent entries in EventCountFSR:\t" + str( len( diff2 ) )
00264
00265 for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
00266 diff3 = set( parFSR[k]["key"] ) - set( serFSR[k]["key"] )
00267 diff4 = set( parFSR[k]["incr"] ) - set( serFSR[k]["incr"] )
00268 diff5 = set( parFSR[k]["integral"] ) - set( serFSR[k]["integral"] )
00269 print "Different entries in " + str( k ) + ": \tkey: " + str( len( diff3 ) ) + " increment: " + str( len( diff4 ) ) + " integral: " + str( len( diff5 ) )
00270
00271
00272 def LumiFSR( lumi ):
00273
00274 runs = []
00275 files = []
00276 info = {}
00277 keys = []
00278
00279 for r in lumi.runNumbers() :
00280 runs.append( r )
00281
00282 for f in lumi.fileIDs() :
00283 files.append( f )
00284 s = str( lumi )
00285 sa = s.split( "info (key/incr/integral) : " )[-1]
00286 sa = sa.split( '/' )[:-1]
00287
00288 key = []
00289 incr = []
00290 integral = []
00291 for rec in sa :
00292 k, i, t = rec.split()
00293 key.append( int( k ) )
00294 incr.append( int( i ) )
00295 integral.append( int( t ) )
00296
00297 return ( runs, files, key, incr, integral )
00298
00299
00300 def GetFSRdict( filename, queue ):
00301
00302
00303 FSR = { "TimeSpanFSR" : {'earliest': 0, 'latest': 0},
00304 "LumiFSRBeamCrossing" : {'key': 0, 'incr': 0, 'integral':0},
00305 "LumiFSRBeam1" : {'key': 0, 'incr': 0, 'integral':0},
00306 "LumiFSRBeam2" : {'key': 0, 'incr': 0, 'integral':0},
00307 "LumiFSRNoBeam" : {'key': 0, 'incr': 0, 'integral':0},
00308 "EventCountFSR" : {'input': 0, 'output': 0, 'statusFlag': 0}}
00309
00310 options = "from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader; LumiFsrReader().OutputLevel = INFO; LumiFsrReader().inputFiles = ['%s'] ;" %filename
00311 options += "LumiFsrReader().Persistency='ROOT'; LumiFsrReader().EvtMax = 1; from Configurables import LHCbApp; LHCbApp().Persistency='ROOT'; from Configurables import CondDB, DDDBConf;"
00312 options += " CondDB().UseLatestTags=['%s']; DDDBConf(DataType='%s');"%( 2011, 2011 )
00313 exec options
00314 app = AppMgr()
00315 app.run( 1 )
00316 fsr = app.filerecordsvc()
00317
00318 lst = fsr.getHistoNames()
00319
00320 if lst :
00321 for l in lst :
00322
00323 ob = fsr.retrieveObject( l )
00324
00325
00326 if "LumiFSR" in l:
00327
00328 assert ob.numberOfObjects() == 1
00329 k = ob.containedObject( 0 )
00330 runs, files, keys, increment, integral = LumiFSR( k )
00331
00332 FSR[l[l.rfind( '/' ) + 1:]]['runs'] = runs
00333 FSR[l[l.rfind( '/' ) + 1:]]['files'] = files
00334 FSR[l[l.rfind( '/' ) + 1:]]['key'] = keys
00335 FSR[l[l.rfind( '/' ) + 1:]]['incr'] = increment
00336 FSR[l[l.rfind( '/' ) + 1:]]['integral'] = integral
00337
00338
00339
00340 if "TimeSpanFSR" in l:
00341
00342 FSR["TimeSpanFSR"]['earliest'] = ob.containedObject( 0 ).earliest()
00343 FSR["TimeSpanFSR"]['latest'] = ob.containedObject( 0 ).latest()
00344
00345 if "EventCountFSR" in l:
00346
00347 FSR["EventCountFSR"]['input'] = ob.input()
00348 FSR["EventCountFSR"]['output'] = ob.output()
00349 FSR["EventCountFSR"]['statusFlag'] = ob.statusFlag()
00350
00351 app.stop()
00352 app.finalize()
00353
00354 queue.put( FSR )
00355
00356 def CompareFSR( pout, sout ):
00357
00358 parFSR = pout.get()
00359 serFSR = sout.get()
00360
00361 print "Comparing File Records"
00362
00363 diff1 = set( parFSR["TimeSpanFSR"].iteritems() ) - set( serFSR["TimeSpanFSR"].iteritems() )
00364 diff2 = set( parFSR["EventCountFSR"].iteritems() ) - set( serFSR["EventCountFSR"].iteritems() )
00365
00366 print "\nDifferent entries in TimeSpanFSR: \t" + str( len( diff1 ) ) + "\nDifferent entries in EventCountFSR:\t" + str( len( diff2 ) )
00367
00368 for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
00369 diff3 = set( parFSR[k]['key'] ) - set( serFSR[k]['key'] )
00370 diff4 = set( parFSR[k]['incr'] ) - set( serFSR[k]['incr'] )
00371 diff5 = set( parFSR[k]['integral'] ) - set( serFSR[k]["integral"] )
00372 print "Different entries in " + str( k ) + ": \tkey: " + str( len( diff3 ) ) + " increment: " + str( len( diff4 ) ) + " integral: " + str( len( diff5 ) )
00373
00374
00375 print "\nParallel: \n" + str( parFSR )
00376 print "\nSerial: \n" + str( serFSR )
00377
00378 if __name__ == '__main__' :
00379
00380 args = sys.argv
00381 args.pop( 0 )
00382 if len( args ) != 2 :
00383 print 'Please supply two arguments : > python loadFile <parallelFile> <serialFile>'
00384 sys.exit( 0 )
00385 else :
00386 par = 'PFN:' + args[0]
00387 ser = 'PFN:' + args[1]
00388 print 'Parallel File to be analysed : %s' % ( par )
00389 print 'Serial File to be analysed : %s' % ( ser )
00390
00391
00392 pname = par[4:]
00393 sname = ser[4:]
00394
00395 qacross = Queue()
00396 pout = Queue()
00397 sout = Queue()
00398
00399 par = Process( target = Reader, args = ( PAR, par, qacross, pout ) )
00400 ser = Process( target = Reader, args = ( SER, ser, qacross, sout ) )
00401 com = Process( target = ComparisonEngine, args = ( pout, sout ) )
00402
00403
00404
00405
00406
00407
00408 print "Check File Records"
00409
00410 ser = sys.argv[0]
00411 par = sys.argv[1]
00412
00413 pout = Queue()
00414 sout = Queue()
00415
00416 sp = Process( target = GetFSRdict, args = ( ser, sout ) )
00417 pp = Process( target = GetFSRdict, args = ( par, pout ) )
00418 cp = Process( target = CompareFSR, args = ( pout, sout ) )
00419
00420 sp.start(); pp.start(); cp.start()
00421 sp.join();pp.join(); cp.join()
00422