All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
compareOutputFiles.py
Go to the documentation of this file.
1 
2 from Gaudi.Configuration import *
3 from GaudiPython import AppMgr, gbl
4 from ROOT import TFile, TBufferFile, TBuffer
5 from multiprocessing import Process, Queue
6 from Configurables import LHCbApp
7 import sys
8 
#
# compareOutputFiles.py
# ---------------------
# Open a parallel-written and a serial-written dst file for inspection
# and compare their contents
#
14 
def checkKeys(name):
    """Open the ROOT file behind *name* read-only and close it again.

    name -- file name, optionally carrying the "PFN:" prefix, which TFile
            does not need and which is stripped when present.

    Returns None.
    NOTE(review): the original comment says "Check the TTree keys in each
    file", but no key check is implemented in the visible code.
    """
    prefix = 'PFN:'
    fname = name[len(prefix):] if name.startswith(prefix) else name
    # 'READ' spelled out: the previous 'REC' is not a TFile mode and only
    # worked because unrecognised options fall back to READ.
    tf = TFile(fname, 'READ')
    tf.Close()
19 
20 
21 
22 
23 
# --- Job configuration (executed at import time, order preserved) ----------

# Option files pulled in for the application set-up.
importOptions('$STDOPTS/LHCbApplication.opts')
#importOptions( '$GAUDIPOOLDBROOT/options/GaudiPoolDbRoot.opts' )
importOptions('$GAUDICNVROOT/options/Setup.opts')

# Empty output names for the DST writer and the histogram persistency.
OutputStream("DstWriter").Output = ''
HistogramPersistencySvc().OutputFile = ''

# Message level ERROR; event selector prints every 100 events.
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(
    OutputLevel=ERROR,
    AppName='File Check - Serial vs Parallel',
)
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Tags telling Reader() which of the two files it is handling.
PAR = 'PARALLEL'
SER = 'SERIAL'
40 
def _groupedKeyNames(path):
    # Return the sorted top-level key names of one ROOT file, split into
    # (meta, event, other) lists by name prefix.
    tf = TFile(path, 'READ')
    try:
        names = sorted(key.GetName() for key in tf.GetListOfKeys())
    finally:
        tf.Close()
    meta, event, other = [], [], []
    for name in names:
        if name.startswith('_Event'):
            event.append(name)
        elif name.startswith('##'):
            meta.append(name)
        else:
            other.append(name)
    return meta, event, other


def CompareTrees(pname, sname):
    """Compare the top-level key names of the parallel and serial files.

    Key names are grouped into event data (prefix '_Event'), meta data
    (prefix '##') and everything else.  One line is printed for each group
    that differs; for the "other" group the names present in only one of
    the files are listed.

    pname -- path of the parallel file (without "PFN:" prefix)
    sname -- path of the serial file (without "PFN:" prefix)

    Returns None; the result is reported on stdout only.
    """
    pMeta, pEvent, pOther = _groupedKeyNames(pname)
    sMeta, sEvent, sOther = _groupedKeyNames(sname)

    if pMeta != sMeta:
        print('Meta Data differs')

    if pEvent != sEvent:
        print('Event data differs')

    # Equal lists give equal sets, so no separate list comparison is needed.
    pExtra = set(pOther) - set(sOther)
    sExtra = set(sOther) - set(pOther)
    if pExtra:
        print('Extra Data in parallel file :  %s' % (pExtra,))
    if sExtra:
        print('Extra Data in serial file :  %s' % (sExtra,))
    if sExtra or pExtra:
        print('Files will have different sizes')
76 
def switchDict(d):
    """Return a new dictionary with the keys and values of *d* exchanged.

    This only works when all values are unique.  If they are not, a
    message is printed and None is returned.  The values must be hashable,
    since they become the keys of the result.

    d -- dictionary to invert; it is not modified.
    """
    vals = list(d.values())
    # A set drops duplicates, so a shorter set means repeated values.
    if len(set(vals)) != len(vals):
        print('Dictionary cannot be switched, values not unique')
        return None
    print('Dict has keys/values : %i/%i' % (len(d), len(vals)))
    return dict((value, key) for key, value in d.items())
92 
93 
def printDict(d, name='unspecified'):
    """Print *d* between two 80-character rules.

    Layout:

        Dictionary <name> :
            key     value
            key     value
            ...

    d    -- dictionary to print
    name -- label shown in the heading line
    """
    rule = '-' * 80
    print(rule)
    print('Dictionary %s : ' % (name))
    for key in d:
        # tab, key, space, tab, value: the layout the comma-separated
        # print statement produced
        print('\t%s \t%s' % (key, d[key]))
    print(rule)
107 
108 
def Reader(readerType, filename, qacross, qToEngine):
    """Process body that reads one file and streams its events out.

    One such process handles the serial file (readerType == SER) and
    another the parallel file (readerType == PAR).

    Step 1: the file is run through once, one event at a time, to record
            at which position each event number occurs.
    Step 2: the serial reader sends its ordering over *qacross*; the
            parallel reader receives it.
    Step 3: the events are re-run with runSelectedEvents in the serial
            ordering.  For each event a dictionary
            path -> ( class name, repr string ) of the objects listed by
            the event service is put on *qToEngine*, followed by a final
            None.

    readerType -- SER or PAR
    filename   -- file name including the "PFN:" prefix
    qacross    -- queue carrying the serial ordering to the parallel reader
    qToEngine  -- queue carrying the per-event dictionaries
    """
    app = AppMgr()
    selector = app.evtsel()
    tes = app.evtsvc()

    headerPath = '/Event/Rec/Header'
    selector.open(filename)
    # runSelectedEvents doesn't need the "PFN:" prefix
    plainName = filename[4:]

    # determine the ordering : event number -> position in this file
    order = {}
    position = 0
    while True:
        app.run(1)
        if not tes[headerPath]:
            break
        order[int(tes[headerPath].evtNumber())] = position
        position += 1

    if readerType == SER:
        # invert to position -> event number and pass it to the parallel reader
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serOrder = order
    elif readerType == PAR:
        # drain the queue up to the None sentinel; the last item received
        # is kept as the serial ordering
        for serOrder in iter(qacross.get, None):
            pass
        print('Events in Files (serial/parallel) : %i / %i'
              % (len(serOrder), len(order)))

    # now run the events in the order given by the serial ordering
    # and send them one by one to the comparison engine
    for i in list(serOrder.keys()):
        if readerType == PAR:
            # serial position -> event number -> position in the parallel file
            i = order[serOrder[i]]

        app.runSelectedEvents(plainName, i)
        paths = tes.getList()
        paths.sort()

        snapshot = {}
        for path in paths:
            snapshot[path] = (tes[path].__class__.__name__,
                              tes[path].__repr__())
        qToEngine.put(snapshot)

    qToEngine.put(None)
    print('%s Reader Finished' % (readerType))
168 
def ComparisonEngine(pQueue, sQueue):
    """Compare events arriving in pairs from the two reader processes.

    Each queue delivers one dictionary per event,
    d[path] = ( className, string_repr ), and finally None.  Every pair is
    passed to compareEvents and the boolean outcome is collected
    (perfect match = True, difference = False).  A summary is printed once
    a None has been received on either queue.

    pQueue -- queue fed by the parallel-file reader
    sQueue -- queue fed by the serial-file reader
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print('Termination Signals received ok')
            break
        if pitem is None or sitem is None:
            # one queue finished before the other
            print('pitem != sitem :  %s %s' % (pitem, sitem))
            break
        # compareEvents takes ( serial, parallel ): it tolerates extra bare
        # DataObject paths in its second argument only, so the parallel
        # event must be passed second.
        results.append(compareEvents(sitem, pitem))

    matches = sum(results)
    print('=' * 80)
    print('Comparison Engine Finished')
    print('-' * 80)
    print('Total Events Checked : %i' % (len(results)))
    print('Perfect Matches : %i' % (matches))
    print('Errors : %i' % (len(results) - matches))
    print('=' * 80)
193 
def checkForAddressDifference(a, b):
    """Tell whether two string representations are both generic DataObject reprs.

    The __repr__() method for Event Data Objects returns a generic string
    "DataObject at 0xADDRESS" for non-Pythonised objects.  If such objects
    have the same path they are equal, but this cannot be tested with "=="
    because the memory address differs between the two DataObjects.  This
    function therefore checks whether both strings start with the generic
    prefix, i.e. whether they can differ in the address only.

    a, b -- the two string representations

    Returns True when both start with 'DataObject at 0x', False otherwise.
    """
    ref = 'DataObject at 0x'
    return a.startswith(ref) and b.startswith(ref)
206 
207 
def compareEvents(s, p):
    """Compare one event from the serial file with one from the parallel file.

    Both events are dictionaries of the form
    d[ path ] = ( className, string_repr ).

    s -- event read from the serial file
    p -- event read from the parallel file

    Returns True for a perfect match, False otherwise.
    """
    sks = sorted(s.keys())
    pks = sorted(p.keys())

    # check 1 : number of keys (paths)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        #          but in TESSerializer, *all* DataObjects will be sent
        #          including /Event/Prev and /Event/Prev/MC
        # Extra paths holding a plain DataObject are ignored; any other
        # extra object makes the events different.
        for extra in set(pks) - set(sks):
            if p[extra][0] == 'DataObject':
                pks.remove(extra)
            else:
                print('Extra Other thing found! %s %s' % (extra, p[extra][0]))
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; a mismatch is accepted only when
        # both are generic "DataObject at 0x..." strings
        if s[key][1] != p[key][1]:
            if not checkForAddressDifference(p[key][1], s[key][1]):
                return False

    return True
250 
def CheckFileRecords(par, ser):
    """Print how many file-record entries of *par* are missing from *ser*.

    par -- parallel file, handed to GetFSRdicts
    ser -- serial file, handed to GetFSRdicts

    NOTE(review): GetFSRdicts is not defined in the visible part of this
    file (only GetFSRdict( filename, queue ) is) -- confirm where it comes
    from before relying on this function.
    """
    print("Checking File Records")

    parFSR = GetFSRdicts(par)
    serFSR = GetFSRdicts(ser)

    # one-way differences : ( field, value ) pairs of the parallel record
    # that do not occur in the serial record
    timeSpanDiff = (set(parFSR["TimeSpanFSR"].items())
                    - set(serFSR["TimeSpanFSR"].items()))
    eventCountDiff = (set(parFSR["EventCountFSR"].items())
                      - set(serFSR["EventCountFSR"].items()))

    print("\nDifferent entries in TimeSpanFSR: \t" + str(len(timeSpanDiff))
          + "\nDifferent entries in EventCountFSR:\t" + str(len(eventCountDiff)))

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        parRec = parFSR[k]
        serRec = serFSR[k]
        keyDiff = set(parRec["key"]) - set(serRec["key"])
        incrDiff = set(parRec["incr"]) - set(serRec["incr"])
        integralDiff = set(parRec["integral"]) - set(serRec["integral"])
        print("Different entries in " + str(k)
              + ": \tkey: " + str(len(keyDiff))
              + " increment: " + str(len(incrDiff))
              + " integral: " + str(len(integralDiff)))
270 
271 
def LumiFSR(lumi):
    """Extract the contents of one LumiFSR object into plain Python lists.

    The run numbers and file IDs are read through lumi.runNumbers() and
    lumi.fileIDs().  The counters are parsed from str( lumi ): the text
    after the last "info (key/incr/integral) : " marker is split on '/',
    the piece after the final '/' is dropped, and each remaining piece
    must consist of three whitespace-separated integers.

    lumi -- object providing runNumbers(), fileIDs() and a string form
            containing the marker described above

    Returns the tuple ( runs, files, key, incr, integral ).
    Raises ValueError if a record does not hold exactly three integers.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    infoText = str(lumi).split("info (key/incr/integral) : ")[-1]
    records = infoText.split('/')[:-1]

    key = []
    incr = []
    integral = []
    for rec in records:
        k, i, t = rec.split()
        key.append(int(k))
        incr.append(int(i))
        integral.append(int(t))

    return (runs, files, key, incr, integral)
298 
299 
def GetFSRdict(filename, queue):
    """Read the file records of *filename* and put them on *queue* as a dict.

    A LumiFsrReader job is configured for the file, one event is run, and
    every object listed by the file record service is inspected:

      * paths containing "LumiFSR"       -> runs, files, key, incr, integral
      * paths containing "TimeSpanFSR"   -> earliest, latest
      * paths containing "EventCountFSR" -> input, output, statusFlag

    Records that are not found keep the integer 0 placeholders of the
    initial dictionary.

    filename -- file to read (passed to LumiFsrReader().inputFiles)
    queue    -- queue on which the resulting dictionary is put

    Raises AssertionError if a LumiFSR container does not hold exactly one
    object, and KeyError if a LumiFSR path ends in a name that is not one
    of the keys of the initial dictionary.
    """
    # Imported locally, as the previously exec'd option string did.
    from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader
    from Configurables import LHCbApp, CondDB, DDDBConf

    FSR = {"TimeSpanFSR": {'earliest': 0, 'latest': 0},
           "LumiFSRBeamCrossing": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRBeam1": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRBeam2": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRNoBeam": {'key': 0, 'incr': 0, 'integral': 0},
           "EventCountFSR": {'input': 0, 'output': 0, 'statusFlag': 0}}

    # Same settings, in the same order, as the former option string; set
    # directly so the file name is never interpreted as Python source.
    LumiFsrReader().OutputLevel = INFO
    LumiFsrReader().inputFiles = [filename]
    LumiFsrReader().Persistency = 'ROOT'
    LumiFsrReader().EvtMax = 1
    LHCbApp().Persistency = 'ROOT'
    CondDB().UseLatestTags = ['2011']
    DDDBConf(DataType='2011')

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    for path in fsr.getHistoNames() or []:
        ob = fsr.retrieveObject(path)

        if "LumiFSR" in path:
            assert ob.numberOfObjects() == 1
            runs, files, keys, increment, integral = LumiFSR(ob.containedObject(0))

            # the last path component selects the record to fill
            record = FSR[path[path.rfind('/') + 1:]]
            record['runs'] = runs
            record['files'] = files
            record['key'] = keys
            record['incr'] = increment
            record['integral'] = integral

        if "TimeSpanFSR" in path:
            FSR["TimeSpanFSR"]['earliest'] = ob.containedObject(0).earliest()
            FSR["TimeSpanFSR"]['latest'] = ob.containedObject(0).latest()

        if "EventCountFSR" in path:
            FSR["EventCountFSR"]['input'] = ob.input()
            FSR["EventCountFSR"]['output'] = ob.output()
            FSR["EventCountFSR"]['statusFlag'] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
355 
def _fsrValueSet(value):
    # Return *value* as a set.  A scalar (such as the integer 0 placeholder
    # GetFSRdict leaves in place when a record was not found) becomes a
    # one-element set instead of raising TypeError.
    if isinstance(value, (list, tuple, set, frozenset)):
        return set(value)
    return set([value])


def CompareFSR(pout, sout):
    """Compare the file-record dictionaries of the parallel and serial files.

    One dictionary is taken from each queue.  For TimeSpanFSR and
    EventCountFSR the number of ( field, value ) pairs of the parallel
    record that do not occur in the serial record is printed; for the Lumi
    records the same one-way count is printed for the key, incr and
    integral lists.  Both dictionaries are printed in full at the end.

    pout -- queue delivering the parallel file's dictionary
    sout -- queue delivering the serial file's dictionary

    NOTE(review): "LumiFSRBeam1" is filled by GetFSRdict but is not among
    the records compared here -- confirm whether that is intentional.
    """
    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + str(len(diff1))
          + "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = _fsrValueSet(parFSR[k]['key']) - _fsrValueSet(serFSR[k]['key'])
        diff4 = _fsrValueSet(parFSR[k]['incr']) - _fsrValueSet(serFSR[k]['incr'])
        diff5 = _fsrValueSet(parFSR[k]['integral']) - _fsrValueSet(serFSR[k]['integral'])
        print("Different entries in " + str(k)
              + ": \tkey: " + str(len(diff3))
              + " increment: " + str(len(diff4))
              + " integral: " + str(len(diff5)))

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
377 
if __name__ == '__main__':
    # Usage : python loadFile <parallelFile> <serialFile>
    #
    # Only the file-record comparison is currently run; the event-by-event
    # comparison and the TTree key comparison are prepared but left
    # disabled, as before.

    args = sys.argv
    args.pop(0)  # get rid of script name (sys.argv itself is shortened, as before)
    if len(args) != 2:
        print('Please supply two arguments : > python loadFile <parallelFile> <serialFile>')
        # NOTE(review): exit status 0 on a usage error is kept unchanged
        sys.exit(0)

    # The parallel file is the FIRST argument, the serial file the second.
    parFile, serFile = args
    par = 'PFN:' + parFile
    ser = 'PFN:' + serFile
    print('Parallel File to be analysed : %s' % (par))
    print('Serial File to be analysed : %s' % (ser))

    pname = par[4:]  # TFile doesn't need the "PFN:" prefix
    sname = ser[4:]

    qacross = Queue()
    pout = Queue()
    sout = Queue()

    # Distinct names so the file-name strings above are not overwritten.
    parReader = Process(target=Reader, args=(PAR, par, qacross, pout))
    serReader = Process(target=Reader, args=(SER, ser, qacross, sout))
    engine = Process(target=ComparisonEngine, args=(pout, sout))

    #engine.start() ; parReader.start() ; serReader.start()
    #serReader.join() ; parReader.join() ; engine.join()

    #CompareTrees( pname, sname )

    print("Check File Records")

    fsrPout = Queue()
    fsrSout = Queue()

    # Each file is read in its own process; a third process compares the
    # two dictionaries.  The file names are passed without "PFN:" prefix.
    sp = Process(target=GetFSRdict, args=(serFile, fsrSout))
    pp = Process(target=GetFSRdict, args=(parFile, fsrPout))
    cp = Process(target=CompareFSR, args=(fsrPout, fsrSout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
422 
double sum(double x, double y, double z)
A small to stream Data I/O.
Definition: OutputStream.h:27
HistogramPersistencySvc class implementation definition.
The Application Manager class.
Definition of class EventSelector.
Definition: EventSelector.h:53