The Gaudi Framework  v29r0 (ff2e7097)
compareOutputFiles.py
Go to the documentation of this file.
1 
2 from Gaudi.Configuration import *
3 from GaudiPython import AppMgr, gbl
4 from ROOT import TFile, TBufferFile, TBuffer
5 from multiprocessing import Process, Queue
6 from Configurables import LHCbApp
7 import sys
8 
9 #
10 # loadFile.py
11 # -----------
12 # Open a dst file for inspection
13 #
14 
15 
def checkKeys(name):
    """Return the sorted names of the top-level keys of a ROOT file.

    name : file name; a leading "PFN:" prefix, which TFile does not need,
           is stripped when present.

    The file is closed again before returning.
    """
    fname = name[4:] if name.startswith('PFN:') else name
    # NOTE(review): 'REC' is not one of the documented TFile open modes;
    # ROOT is expected to treat it as read-only - confirm.
    tf = TFile(fname, 'REC')
    try:
        return sorted(key.GetName() for key in tf.GetListOfKeys())
    finally:
        tf.Close()
20 
21 
# Options files loaded once, when this module is imported.
importOptions('$STDOPTS/LHCbApplication.opts')
# importOptions('$GAUDIPOOLDBROOT/options/GaudiPoolDbRoot.opts')
importOptions('$GAUDICNVROOT/options/Setup.opts')


# Empty output names for the DST writer and the histogram persistency
# service; message level ERROR for the message service.
OutputStream("DstWriter").Output = ''
HistogramPersistencySvc().OutputFile = ''
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(AppName='File Check - Serial vs Parallel',
               OutputLevel=ERROR)
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Labels passed to Reader() to say which of the two files it handles.
SER = 'SERIAL'
PAR = 'PARALLEL'
38 
39 
def _splitTreeKeys(keyNames, eventPrefix='_Event'):
    """Split key names into (meta, event, other) lists, keeping their order.

    Names starting with ``eventPrefix`` are event keys, names starting
    with '##' are meta keys, everything else goes to the third list.
    """
    meta, event, other = [], [], []
    for k in keyNames:
        if k.startswith(eventPrefix):
            event.append(k)
        elif k.startswith('##'):
            meta.append(k)
        else:
            other.append(k)
    return meta, event, other


def CompareTrees(pname, sname):
    """Compare the top-level key names of the parallel and serial files.

    pname : name of the parallel output file (no "PFN:" prefix)
    sname : name of the serial output file (no "PFN:" prefix)

    The sorted key names of each file are split into meta, event and other
    keys.  A message is printed for every category that differs; for the
    "other" keys the names present in only one of the files are listed.
    Nothing is returned.
    """
    # NOTE(review): 'REC' is not one of the documented TFile open modes;
    # ROOT is expected to treat it as read-only - confirm.
    pf = TFile(pname, 'REC')
    try:
        sf = TFile(sname, 'REC')
        try:
            pfkeys = sorted(k.GetName() for k in pf.GetListOfKeys())
            sfkeys = sorted(k.GetName() for k in sf.GetListOfKeys())
        finally:
            sf.Close()
    finally:
        # both files are closed even if reading the keys fails
        pf.Close()

    pMeta, pEvent, pOther = _splitTreeKeys(pfkeys)
    sMeta, sEvent, sOther = _splitTreeKeys(sfkeys)

    if pMeta != sMeta:
        print('Meta Data differs')

    if pEvent != sEvent:
        print('Event data differs')

    if pOther != sOther:
        pset = set(pOther)
        sset = set(sOther)
        pExtra = pset - sset
        sExtra = sset - pset
        if pExtra:
            print('Extra Data in parallel file :  %s' % (pExtra,))
        if sExtra:
            print('Extra Data in serial file :  %s' % (sExtra,))
        if sExtra or pExtra:
            print('Files will have different sizes')
94 
95 
def switchDict(d):
    """Return a new dictionary with the keys and values of ``d`` swapped.

    This only works when all values are unique.  If they are not, a
    message is printed and None is returned.  Otherwise the number of
    keys and values is printed and the switched dictionary is returned.
    """
    vals = list(d.values())
    # a set drops duplicates, so a shorter set means repeated values
    if len(set(vals)) != len(vals):
        print('Dictionary cannot be switched, values not unique')
        return None
    print('Dict has keys/values : %i/%i' % (len(d), len(vals)))
    return dict((entry, k) for k, entry in d.items())
112 
113 
def printDict(d, name='unspecified'):
    """Print a dictionary in the form

        Dictionary Name :
            key     value
            key     value
            ...

    between two rules of 80 dashes.

    d    : dictionary to print
    name : label shown in the header line
    """
    rule = '-' * 80
    print(rule)
    # (name,) keeps the formatting valid when name is itself a tuple
    print('Dictionary %s : ' % (name,))
    for k in d:
        # Same text as the statement form "print '\t', k, '\t', d[k]",
        # which writes no separating space after an item ending in a tab.
        print('\t%s \t%s' % (k, d[k]))
    print(rule)
127 
128 
def Reader(readerType, filename, qacross, qToEngine):
    """Process body that reads one file and feeds the comparison engine.

    One process reads the serial file, another the parallel file.

    First the order of events in the file is determined (parallel != serial,
    usually).  The serial reader then sends its ordering to the parallel
    reader through ``qacross``.  Both readers run their events *in the
    serial order* using AppMgr().runSelectedEvents(fname, position), and for
    every event put a dictionary d[path] = (className, string_repr) of
    everything in the TES on ``qToEngine``.  A final None marks the end.

    readerType : SER or PAR
    filename   : input file name, including the "PFN:" prefix
    qacross    : queue carrying the serial ordering from SER to PAR
    qToEngine  : queue read by the comparison process
    """
    if readerType not in (SER, PAR):
        # fail before any file is read rather than after the first pass
        raise ValueError('unknown reader type : %r' % (readerType,))

    a = AppMgr()
    sel = a.evtsel()
    evt = a.evtsvc()

    header = '/Event/Rec/Header'
    sel.open(filename)
    ct = 0
    order = {}
    fname = filename[4:]  # runSelectedEvents doesn't need the "PFN:" prefix

    # determine the ordering : event number -> position in this file
    while True:
        a.run(1)
        hdr = evt[header]
        if not hdr:
            break
        order[int(hdr.evtNumber())] = ct
        ct += 1

    if readerType == SER:
        # send the ordering details (position -> event number) to the
        # parallel-reader
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serOrder = order
    else:
        # receive the serial ordering from the queue : keep the last item
        # that arrives before the None terminator
        for serOrder in iter(qacross.get, None):
            pass
        lsks = len(serOrder)
        lpks = len(order)
        print('Events in Files (serial/parallel) : %i / %i' % (lsks, lpks))

    # now run files in the order specified by the serial ordering
    # and send them one by one to the comparison engine.
    # sorted() makes both readers walk the positions in the same sequence,
    # independent of dictionary iteration order.
    for i in sorted(serOrder):
        if readerType == PAR:
            # serial position -> event number -> position in parallel file
            i = order[serOrder[i]]

        a.runSelectedEvents(fname, i)

        snapshot = {}
        for path in sorted(evt.getList()):
            obj = evt[path]
            snapshot[path] = (obj.__class__.__name__, obj.__repr__())
        qToEngine.put(snapshot)
    qToEngine.put(None)
    print('%s Reader Finished' % (readerType,))
192 
193 
def ComparisonEngine(pQueue, sQueue):
    """Compare the events delivered by the parallel and serial readers.

    The Comparison Engine runs on a separate forked process and receives
    events in pairs, one each from the parallel reader (``pQueue``) and the
    serial reader (``sQueue``).

    The events arrive in dictionary format, d[path] = (className,
    string_repr), and are compared using compareEvents.  A None on both
    queues ends the loop normally; a None on only one of them is reported
    and also ends the loop.

    Results are stored in a list of bools (PerfectMatch=True, Diff=False)
    and a summary is printed at the end.
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print('Termination Signals received ok')
            break
        if pitem is None or sitem is None:
            # one reader finished before the other
            print('pitem != sitem :  %s %s' % (pitem, sitem))
            break
        # compareEvents takes (serial, parallel) : only extra nodes in the
        # parallel event are tolerated, so the order matters
        results.append(compareEvents(sitem, pitem))

    nChecked = len(results)
    nMatches = sum(results)
    print('=' * 80)
    print('Comparison Engine Finished')
    print('-' * 80)
    print('Total Events Checked : %i' % (nChecked))
    print('Perfect Matches : %i' % (nMatches))
    print('Errors : %i' % (nChecked - nMatches))
    print('=' * 80)
222 
223 
def checkForAddressDifference(a, b):
    """Tell whether two string representations differ only by an address.

    The __repr__() method for Event Data Objects will return a generic
    string "DataObject at 0xADDRESS" for non-Pythonised objects.  If these
    objects have the same path, they are equal, but this cannot be tested
    with "==" in Python, as the memory address will be different for the
    two different DataObjects, so this method checks whether both strings
    are of that generic form.

    args : a, b two string representations
    Returns True when both start with 'DataObject at 0x', else False.
    """
    ref = 'DataObject at 0x'
    return a.startswith(ref) and b.startswith(ref)
238 
239 
def compareEvents(s, p):
    """Compare a serial event ``s`` with a parallel event ``p``.

    Both events are dictionaries of the form
    d[ path ] = tuple( className, string_repr ).

    Returns True when the events match and False otherwise.
    """
    # check 1 : number of keys (paths)
    sks = sorted(s.keys())
    pks = sorted(p.keys())
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        #          but in TESSerializer, *all* DataObjects will be sent
        #          including /Event/Prev and /Event/Prev/MC

        # extra keys in the parallel file which are just containing
        # DataObjects are removed; anything else is a difference
        for e in set(pks) - set(sks):
            if p[e][0] == 'DataObject':
                pks.remove(e)
            else:
                print('Extra Other thing found! %s %s' % (e, p[e][0]))
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; a difference that is only the
        # memory address of a generic DataObject does not count
        sRepr = s[key][1]
        pRepr = p[key][1]
        if sRepr != pRepr and not checkForAddressDifference(pRepr, sRepr):
            return False

    return True
297 
298 
def _fetchFSRdict(filename):
    """Run GetFSRdict(filename) in a child process and return its result.

    A separate process per file mirrors what the __main__ section of this
    script does.
    """
    resultQueue = Queue()
    worker = Process(target=GetFSRdict, args=(filename, resultQueue))
    worker.start()
    # take the item off the queue before joining the process that put it
    fsr = resultQueue.get()
    worker.join()
    return fsr


def CheckFileRecords(par, ser):
    """Print how many file-record entries differ between two files.

    par : name of the parallel output file
    ser : name of the serial output file

    For TimeSpanFSR and EventCountFSR the (name, value) pairs found in the
    parallel record but not in the serial one are counted.  For the
    LumiFSRBeamCrossing, LumiFSRBeam2 and LumiFSRNoBeam records the same is
    done for the key, incr and integral values.
    """
    def entries(value):
        # GetFSRdict leaves the integer 0 where no record was found;
        # treat such a scalar as a one-element set
        try:
            return set(value)
        except TypeError:
            return set([value])

    print("Checking File Records")

    parFSR = _fetchFSRdict(par)
    serFSR = _fetchFSRdict(ser)

    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + str(len(diff1)) +
          "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): LumiFSRBeam1 is collected by GetFSRdict but not
    # compared here - confirm whether that is intended.
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = entries(parFSR[k]["key"]) - entries(serFSR[k]["key"])
        diff4 = entries(parFSR[k]["incr"]) - entries(serFSR[k]["incr"])
        diff5 = entries(parFSR[k]["integral"]) - \
            entries(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))
318 
319 
def LumiFSR(lumi):
    """Extract the content of a lumi file-record object.

    lumi : object providing runNumbers(), fileIDs() and a string form that
           contains "info (key/incr/integral) : " followed by
           '/'-terminated records of three whitespace-separated integers.

    Returns the tuple (runs, files, key, incr, integral), each a list.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    # only the text after the (last) info marker holds the records; the
    # piece after the final '/' is not a record and is dropped
    text = str(lumi).split("info (key/incr/integral) : ")[-1]
    records = text.split('/')[:-1]

    key = []
    incr = []
    integral = []
    for rec in records:
        k, i, t = rec.split()
        key.append(int(k))
        incr.append(int(i))
        integral.append(int(t))

    return (runs, files, key, incr, integral)
346 
347 
def GetFSRdict(filename, queue):
    """Read the file records of ``filename`` and put them on ``queue``.

    filename : input file handed to the LumiFsrReader configuration
    queue    : queue that receives the resulting dictionary

    The dictionary has one entry per record type (TimeSpanFSR, the four
    LumiFSR* records and EventCountFSR).  Values stay at the integer 0 for
    records that are not found in the file.
    """
    from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader
    from Configurables import LHCbApp, CondDB, DDDBConf

    FSR = {"TimeSpanFSR": {'earliest': 0, 'latest': 0},
           "LumiFSRBeamCrossing": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRBeam1": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRBeam2": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRNoBeam": {'key': 0, 'incr': 0, 'integral': 0},
           "EventCountFSR": {'input': 0, 'output': 0, 'statusFlag': 0}}

    # Configure directly instead of exec-ing a generated options string, so
    # the file name is used verbatim whatever characters it contains.
    LumiFsrReader().OutputLevel = INFO
    LumiFsrReader().inputFiles = [filename]
    LumiFsrReader().Persistency = 'ROOT'
    LumiFsrReader().EvtMax = 1
    LHCbApp().Persistency = 'ROOT'
    CondDB().UseLatestTags = ['2011']
    DDDBConf(DataType='2011')

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    lst = fsr.getHistoNames()

    if lst:
        for l in lst:

            ob = fsr.retrieveObject(l)

            if "LumiFSR" in l:

                assert ob.numberOfObjects() == 1
                k = ob.containedObject(0)
                runs, files, keys, increment, integral = LumiFSR(k)

                # the record is filed under the last component of its path;
                # setdefault avoids a KeyError for a name not listed above
                record = FSR.setdefault(l[l.rfind('/') + 1:], {})
                record['runs'] = runs
                record['files'] = files
                record['key'] = keys
                record['incr'] = increment
                record['integral'] = integral

            if "TimeSpanFSR" in l:

                span = ob.containedObject(0)
                FSR["TimeSpanFSR"]['earliest'] = span.earliest()
                FSR["TimeSpanFSR"]['latest'] = span.latest()

            if "EventCountFSR" in l:

                FSR["EventCountFSR"]['input'] = ob.input()
                FSR["EventCountFSR"]['output'] = ob.output()
                FSR["EventCountFSR"]['statusFlag'] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
401 
402 
def CompareFSR(pout, sout):
    """Compare the file-record dictionaries of the two files.

    pout : queue delivering the dictionary of the parallel file
    sout : queue delivering the dictionary of the serial file

    One dictionary is taken from each queue.  For TimeSpanFSR and
    EventCountFSR the (name, value) pairs found in the parallel record but
    not in the serial one are counted; for LumiFSRBeamCrossing,
    LumiFSRBeam2 and LumiFSRNoBeam the same is done for the key, incr and
    integral values.  The counts and both dictionaries are printed.
    """
    def entries(value):
        # records that were not found hold the integer 0 instead of a
        # list; treat such a scalar as a one-element set
        try:
            return set(value)
        except TypeError:
            return set([value])

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + str(len(diff1)) +
          "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): LumiFSRBeam1 is not compared here - confirm whether
    # that is intended.
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = entries(parFSR[k]['key']) - entries(serFSR[k]['key'])
        diff4 = entries(parFSR[k]['incr']) - entries(serFSR[k]['incr'])
        diff5 = entries(parFSR[k]['integral']) - \
            entries(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
425 
426 
if __name__ == '__main__':

    # Usage : <script> <parallelFile> <serialFile>
    args = sys.argv[1:]  # drop the script name
    if len(args) != 2:
        print('Please supply two arguments : > python loadFile <parallelFile> <serialFile>')
        # a usage error is reported with a non-zero exit status
        sys.exit(1)

    # file names as given on the command line, usable by TFile directly
    pname, sname = args
    # the same names with the "PFN:" prefix expected by Reader()
    par = 'PFN:' + pname
    ser = 'PFN:' + sname
    print('Parallel File to be analysed : %s' % (par,))
    print('Serial File to be analysed : %s' % (ser,))

    # The event-by-event comparison and the tree comparison are disabled.
    # To enable them :
    #   qacross = Queue() ; pout = Queue() ; sout = Queue()
    #   parReader = Process(target=Reader, args=(PAR, par, qacross, pout))
    #   serReader = Process(target=Reader, args=(SER, ser, qacross, sout))
    #   com = Process(target=ComparisonEngine, args=(pout, sout))
    #   com.start() ; parReader.start() ; serReader.start()
    #   serReader.join() ; parReader.join() ; com.join()
    #   CompareTrees( pname, sname )

    print("Check File Records")

    pout = Queue()
    sout = Queue()

    # The first argument is the parallel file and the second the serial
    # file, as stated in the usage message; each one must feed the queue
    # that CompareFSR reads under the same label.
    sp = Process(target=GetFSRdict, args=(sname, sout))
    pp = Process(target=GetFSRdict, args=(pname, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
def printDict(d, name='unspecified')
def CheckFileRecords(par, ser)
def CompareFSR(pout, sout)
def ComparisonEngine(pQueue, sQueue)
double sum(double x, double y, double z)
A small to stream Data I/O.
Definition: OutputStream.h:29
def CompareTrees(pname, sname)
HistogramPersistencySvc class implementation definition.
def checkForAddressDifference(a, b)
def GetFSRdict(filename, queue)
The Application Manager class.
Definition of class EventSelector.
Definition: EventSelector.h:53
def Reader(readerType, filename, qacross, qToEngine)