The Gaudi Framework  v29r2 (7a580596)
compareOutputFiles.py
Go to the documentation of this file.
1 
2 from Gaudi.Configuration import *
3 from GaudiPython import AppMgr, gbl
4 from ROOT import TFile, TBufferFile, TBuffer
5 from multiprocessing import Process, Queue
6 from Configurables import LHCbApp
7 import sys
8 
#
# compareOutputFiles.py
# ---------------------
# Compare the output file of a parallel job with that of a serial job
#
14 
15 
def checkKeys(name):
    # Open the ROOT file named by a "PFN:"-prefixed string.
    #
    # NOTE(review): the function only opens the file; no keys are read and
    # nothing is returned.
    #
    # args : name  file name carrying a four character prefix ("PFN:")
    path = name[4:]  # TFile doesn't need the "PFN:" prefix
    rootFile = TFile(path, 'REC')
20 
21 
# Options files imported before anything else is configured
importOptions('$STDOPTS/LHCbApplication.opts')
#importOptions( '$GAUDIPOOLDBROOT/options/GaudiPoolDbRoot.opts' )
importOptions('$GAUDICNVROOT/options/Setup.opts')

# Empty output names for the DST writer and the histogram file,
# messages at ERROR level, event selector print frequency of 100
OutputStream("DstWriter").Output = ''
HistogramPersistencySvc().OutputFile = ''
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(
    OutputLevel=ERROR,
    AppName='File Check - Serial vs Parallel',
)
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Reader type tags, compared against Reader()'s readerType argument
PAR = 'PARALLEL'
SER = 'SERIAL'
38 
39 
def _splitKeyNames(keys, event='_Event'):
    # Sort the names of the given ROOT keys and split them into three lists:
    # (meta, event, other) = (names starting with '##',
    #                         names starting with `event`,
    #                         everything else)
    meta = []
    evt = []
    other = []
    for name in sorted(key.GetName() for key in keys):
        if name.startswith(event):
            evt.append(name)
        elif name.startswith('##'):
            meta.append(name)
        else:
            other.append(name)
    return meta, evt, other


def CompareTrees(pname, sname):
    # Compare the top-level key names of the parallel and the serial file
    #
    # The key names of each file are sorted and split into meta data
    # ('##' prefix), event data ('_Event' prefix) and everything else.
    # A message is printed for each category that differs; for the "other"
    # category the names present in only one of the files are listed.
    #
    # args : pname, sname  paths of the parallel / serial file (no "PFN:")
    pf = TFile(pname, 'REC')
    sf = TFile(sname, 'REC')
    try:
        pMeta, pEvent, pOther = _splitKeyNames(pf.GetListOfKeys())
        sMeta, sEvent, sOther = _splitKeyNames(sf.GetListOfKeys())

        if pMeta != sMeta:
            print('Meta Data differs')

        if pEvent != sEvent:
            print('Event data differs')

        if pOther != sOther:
            pset = set(pOther)
            sset = set(sOther)
            pExtra = pset - sset
            sExtra = sset - pset
            # two blanks after the colon : the Python 2 print statement
            # wrote a separating blank after the label
            if pExtra:
                print('Extra Data in parallel file :  %s' % (pExtra,))
            if sExtra:
                print('Extra Data in serial file :  %s' % (sExtra,))
            if sExtra or pExtra:
                print('Files will have different sizes')
    finally:
        # close both files even if reading the keys failed
        pf.Close()
        sf.Close()
94 
95 
def switchDict(d):
    # switch a dictionary around ; make the values the keys, and vice versa
    # only works if all values are unique (and hashable, as they become keys)
    #
    # args    : d  dictionary to invert
    # returns : the inverted dictionary, or None (after printing a message)
    #           when two keys share the same value
    vals = list(d.values())  # list() : dict views have no len-stable copy
    # a set drops duplicates, so a shorter set means a repeated value
    if len(set(vals)) != len(vals):
        print('Dictionary cannot be switched, values not unique')
        return None
    print('Dict has keys/values : %i/%i' % (len(d), len(vals)))
    return dict((entry, k) for k, entry in d.items())
112 
113 
def printDict(d, name='unspecified'):
    # Print out a dictionary in the form
    #
    # Dictionary Name :
    #    key     value
    #    key     value
    #    ...
    #
    # args : d     dictionary to print
    #        name  label shown in the heading line
    print('-' * 80)
    print('Dictionary %s : ' % (name))
    for k in d:
        # tab, key, blank, tab, value : the byte layout the Python 2
        # statement "print '\t', k, '\t', d[k]" produced
        print('\t%s \t%s' % (k, d[k]))
    print('-' * 80)
127 
128 
def Reader(readerType, filename, qacross, qToEngine):
    #
    # Process for reading a file
    # One process for reading Serial File, another for Parallel File
    #
    # First the order of events is determined, (parallel != serial, usually)
    #
    # Then the events are run *in order* using AppMgr().runSelectedEvents(pfn, evtNumber)
    # on both Serial-Reader and Parallel-Reader processes.
    #
    # The string repr of everything in the TES is placed in a dictionary and
    # sent to the comparison Process, which compares the two dictionaries
    #
    # args : readerType  SER or PAR
    #        filename    file to read, with the "PFN:" prefix
    #        qacross     queue carrying the serial ordering from the
    #                    serial reader to the parallel reader
    #        qToEngine   queue on which the per-event dictionaries are put,
    #                    followed by None as termination signal
    #
    a = AppMgr()
    sel = a.evtsel()
    evt = a.evtsvc()

    header = '/Event/Rec/Header'
    sel.open(filename)
    ct = 0
    order = {}
    fname = filename[4:]  # runSelectedEvents doesn't need the "PFN:" prefix

    # determine the ordering : event number -> position in this file
    while True:
        a.run(1)
        if not evt[header]:
            break
        eNumber = int(evt[header].evtNumber())
        order[eNumber] = ct
        ct += 1

    if readerType == SER:
        # send the ordering details to the parallel-reader
        # (switched around : position in serial file -> event number)
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serOrder = order
    elif readerType == PAR:
        # receive the serial ordering from the queue; the loop keeps the
        # last item that arrived before the None terminator
        for serOrder in iter(qacross.get, None):
            pass
        lsks = len(serOrder.keys())
        lpks = len(order.keys())
        print('Events in Files (serial/parallel) : %i / %i' % (lsks, lpks))
    else:
        # without this, serOrder would be unbound further down
        raise ValueError('unknown readerType : %r' % (readerType,))

    # now run files in the order specified by the serial ordering
    # and send them one by one to the comparison engine
    #
    # sorted() : both reader processes must emit the events in the same
    # sequence for the engine to pair them up, so the sequence must not
    # depend on the iteration order of each process's own dictionary copy
    for i in sorted(serOrder):
        if readerType == PAR:
            # serial position -> event number -> position in parallel file
            i = order[serOrder[i]]

        a.runSelectedEvents(fname, i)
        lst = evt.getList()

        lst.sort()
        snapshot = dict(
            (l, (evt[l].__class__.__name__, evt[l].__repr__())) for l in lst)
        qToEngine.put(snapshot)
    qToEngine.put(None)
    print('%s Reader Finished' % (readerType))
192 
193 
def ComparisonEngine(pQueue, sQueue):
    # The Comparison Engine runs on a separate forked process and receives
    # events in pairs, one each from Serial FileReader and Parallel FileReader
    #
    # The events arrive in Dictionary Format, d[path]=(className, string_repr)
    # and are compared using the compareEvents method
    #
    # Results are stored in an array of bools (PerfectMatch=True, Diff=False)
    #
    # args : pQueue  queue fed by the parallel reader
    #        sQueue  queue fed by the serial reader
    #        (None on a queue is that reader's termination signal)
    #
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print('Termination Signals received ok')
            break
        if pitem is None or sitem is None:
            # one reader finished before the other
            print('pitem != sitem :  %s %s' % (pitem, sitem))
            break
        # compareEvents(s, p) takes the serial event first; it only
        # tolerates extra DataObject paths in its second (parallel) argument
        results.append(compareEvents(sitem, pitem))
    print('=' * 80)
    print('Comparison Engine Finished')
    print('-' * 80)
    print('Total Events Checked : %i' % (len(results)))
    print('Perfect Matches : %i' % (sum(results)))
    print('Errors : %i' % (len(results) - sum(results)))
    print('=' * 80)
224 
225 
def checkForAddressDifference(a, b):
    # the __repr__() method for Event Data Objects will return a generic
    # string "DataObject at 0xADDRESS" for non-Pythonised objects
    # If these objects have the same path, they are equal, but this
    # cannot be tested with "==" in Python, as the memory address will
    # be different for the two different DataObjects, so this method
    # will check if the difference is in the address
    #
    # args    : a, b two string representations
    # returns : True when both strings start with "DataObject at 0x"
    ref = 'DataObject at 0x'
    return a.startswith(ref) and b.startswith(ref)
240 
241 
def compareEvents(s, p):
    # events in form of dictionary, with form
    # d[ path ] = tuple( className, string_repr )
    #
    # args    : s  event read from the serial file
    #           p  event read from the parallel file
    # returns : True for a perfect match, False otherwise

    # check 1 : number of keys (paths)
    sks = sorted(s.keys())
    pks = sorted(p.keys())
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        # but in TESSerializer, *all* DataObjects will be sent
        # including /Event/Prev and /Event/Prev/MC

        # check for extra keys in the parallel file which are just containing DataObjects
        # if found, remove them
        for e in set(pks) - set(sks):
            if p[e][0] == 'DataObject':
                pks.remove(e)
            else:
                print('Extra Other thing found! %s %s' % (e, p[e][0]))
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content, path by path
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; two generic "DataObject at 0x..."
        # strings differing only in the address still count as equal
        if s[key][1] != p[key][1] and \
                not checkForAddressDifference(p[key][1], s[key][1]):
            return False

    return True
299 
300 
def _collectFSR(filename):
    # Run GetFSRdict(filename, queue) in a child process, the same way the
    # __main__ section does, and return the dictionary it puts on the queue
    queue = Queue()
    worker = Process(target=GetFSRdict, args=(filename, queue))
    worker.start()
    # read before joining : a child that has put data on a queue may not
    # terminate until that data has been consumed
    fsr = queue.get()
    worker.join()
    return fsr


def CheckFileRecords(par, ser):
    # Read the file records of both files and print, per record, how many
    # entries of the parallel file are not found in the serial file
    #
    # args : par, ser  names of the parallel / serial file, passed on
    #                  unchanged to GetFSRdict

    print("Checking File Records")

    parFSR = _collectFSR(par)
    serFSR = _collectFSR(ser)

    def entries(value):
        # GetFSRdict leaves the integer 0 in place when a Lumi record was
        # not found; treat such a placeholder as "no entries"
        try:
            return set(value)
        except TypeError:
            return set()

    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + str(len(diff1)) +
          "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = entries(parFSR[k]["key"]) - entries(serFSR[k]["key"])
        diff4 = entries(parFSR[k]["incr"]) - entries(serFSR[k]["incr"])
        diff5 = entries(parFSR[k]["integral"]) - \
            entries(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))
320 
321 
def LumiFSR(lumi):
    # Unpack a lumi record into plain Python lists
    #
    # The run numbers and file IDs are taken from the object's accessors;
    # the key / increment / integral triplets are parsed from its string
    # form, which is expected to end with
    #   "info (key/incr/integral) : k i t / k i t / ... / <tail>"
    #
    # args    : lumi  object providing runNumbers(), fileIDs() and str()
    # returns : tuple (runs, files, key, incr, integral), five lists
    # raises  : ValueError if a record is not made of three integers

    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    # keep what follows the label, then drop the piece after the last '/'
    text = str(lumi).split("info (key/incr/integral) : ")[-1]
    records = text.split('/')[:-1]

    key = []
    incr = []
    integral = []
    for rec in records:
        k, i, t = rec.split()
        key.append(int(k))
        incr.append(int(i))
        integral.append(int(t))

    return (runs, files, key, incr, integral)
348 
349 
def GetFSRdict(filename, queue):
    # Read the file records of one file and put them, as a dictionary of
    # dictionaries, on the given queue
    #
    # args : filename  file handed to LumiFsrReader().inputFiles
    #        queue     queue receiving the resulting dictionary (always
    #                  exactly one put, also when no records are found)

    # template : the values stay 0 for every record that is not found
    FSR = {"TimeSpanFSR": {'earliest': 0, 'latest': 0},
           "LumiFSRBeamCrossing": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRBeam1": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRBeam2": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRNoBeam": {'key': 0, 'incr': 0, 'integral': 0},
           "EventCountFSR": {'input': 0, 'output': 0, 'statusFlag': 0}}

    # The configuration is written out as statements instead of being pasted
    # into a string and exec'd : a file name containing a quote can no
    # longer break (or alter) the configuration code
    from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader
    from Configurables import LHCbApp
    from Configurables import CondDB, DDDBConf
    LumiFsrReader().OutputLevel = INFO
    LumiFsrReader().inputFiles = [filename]
    LumiFsrReader().Persistency = 'ROOT'
    LumiFsrReader().EvtMax = 1
    LHCbApp().Persistency = 'ROOT'
    CondDB().UseLatestTags = ['2011']
    DDDBConf(DataType='2011')

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    lst = fsr.getHistoNames()

    if lst:
        for l in lst:

            ob = fsr.retrieveObject(l)

            if "LumiFSR" in l:

                assert ob.numberOfObjects() == 1
                k = ob.containedObject(0)
                runs, files, keys, increment, integral = LumiFSR(k)

                # last path component names the record; setdefault keeps a
                # name missing from the template from raising KeyError
                record = FSR.setdefault(l[l.rfind('/') + 1:], {})
                record['runs'] = runs
                record['files'] = files
                record['key'] = keys
                record['incr'] = increment
                record['integral'] = integral

            if "TimeSpanFSR" in l:

                FSR["TimeSpanFSR"]['earliest'] = ob.containedObject(
                    0).earliest()
                FSR["TimeSpanFSR"]['latest'] = ob.containedObject(0).latest()

            if "EventCountFSR" in l:

                FSR["EventCountFSR"]['input'] = ob.input()
                FSR["EventCountFSR"]['output'] = ob.output()
                FSR["EventCountFSR"]['statusFlag'] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
403 
404 
def CompareFSR(pout, sout):
    # Take one file-record dictionary from each queue and print, per record,
    # how many entries of the parallel one are not found in the serial one,
    # followed by both dictionaries
    #
    # args : pout  queue delivering the dictionary of the parallel file
    #        sout  queue delivering the dictionary of the serial file

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    def entries(value):
        # a record that was not found carries the integer 0 instead of a
        # list; treat such a placeholder as "no entries"
        try:
            return set(value)
        except TypeError:
            return set()

    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + str(len(diff1)) +
          "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = entries(parFSR[k]['key']) - entries(serFSR[k]['key'])
        diff4 = entries(parFSR[k]['incr']) - entries(serFSR[k]['incr'])
        diff5 = entries(parFSR[k]['integral']) - \
            entries(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
427 
428 
if __name__ == '__main__':

    # Work on a copy of the arguments. Popping the script name off sys.argv
    # itself shifted the indices, so that the later "ser = sys.argv[0]" /
    # "par = sys.argv[1]" picked up the parallel file as the serial one and
    # vice versa.
    args = sys.argv[1:]
    if len(args) != 2:
        print('Please supply two arguments : > python loadFile <parallelFile> <serialFile>')
        # NOTE(review): exits with status 0 although the call was invalid
        sys.exit(0)

    par = 'PFN:' + args[0]
    ser = 'PFN:' + args[1]
    print('Parallel File to be analysed : %s' % (par))
    print('Serial File to be analysed : %s' % (ser))

    pname = par[4:]  # TFile doesn't need the "PFN:" prefix
    sname = ser[4:]

    # Event-by-event comparison : the processes are set up but not started
    qacross = Queue()
    evtParOut = Queue()
    evtSerOut = Queue()

    parReader = Process(target=Reader, args=(PAR, par, qacross, evtParOut))
    serReader = Process(target=Reader, args=(SER, ser, qacross, evtSerOut))
    com = Process(target=ComparisonEngine, args=(evtParOut, evtSerOut))

    #com.start() ; parReader.start() ; serReader.start()
    #serReader.join() ; parReader.join() ; com.join()

    #CompareTrees( pname, sname )

    print("Check File Records")

    # File record comparison : one process per file to read the records,
    # a third one to compare what the two put on their queues
    pout = Queue()
    sout = Queue()

    sp = Process(target=GetFSRdict, args=(sname, sout))
    pp = Process(target=GetFSRdict, args=(pname, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
def printDict(d, name='unspecified')
def CheckFileRecords(par, ser)
def CompareFSR(pout, sout)
def ComparisonEngine(pQueue, sQueue)
double sum(double x, double y, double z)
A small to stream Data I/O.
Definition: OutputStream.h:29
def CompareTrees(pname, sname)
HistogramPersistencySvc class implementation definition.
def checkForAddressDifference(a, b)
def GetFSRdict(filename, queue)
The Application Manager class.
Definition of class EventSelector.
Definition: EventSelector.h:53
def Reader(readerType, filename, qacross, qToEngine)