The Gaudi Framework  v32r2 (46d42edc)
compareOutputFiles.py
Go to the documentation of this file.
1 from __future__ import print_function
2 from Gaudi.Configuration import *
3 from GaudiPython import AppMgr, gbl
4 from ROOT import TFile, TBufferFile, TBuffer
5 from multiprocessing import Process, Queue
6 from Configurables import LHCbApp
7 import sys
8 
9 #
10 # compareOutputFiles.py
11 # ---------------------
12 # Compare the output file of a parallel job with that of a serial job
13 #
14 
15 
def checkKeys(name):
    """Open the ROOT file behind a prefixed file name.

    The first four characters of ``name`` (the "PFN:" prefix) are dropped
    and the remainder is handed to ``TFile``.  Nothing is returned and no
    keys are inspected yet.
    """
    # TFile doesn't need the "PFN:" prefix
    root_path = name[4:]
    # NOTE(review): 'REC' is the option string used throughout this file;
    # presumably ROOT falls back to read mode for it - confirm.
    root_file = TFile(root_path, 'REC')
20 
21 
# Job options are loaded first; the settings below are applied afterwards,
# so the statement order is kept exactly as it was.
importOptions('$STDOPTS/LHCbApplication.opts')
#importOptions( '$GAUDIPOOLDBROOT/options/GaudiPoolDbRoot.opts' )
importOptions('$GAUDICNVROOT/options/Setup.opts')

# Empty output names for the DST writer and the histogram persistency
OutputStream("DstWriter").Output = ''
HistogramPersistencySvc().OutputFile = ''
# Only errors are reported; the event selector prints every 100 events
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(OutputLevel=ERROR, AppName='File Check - Serial vs Parallel')
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Tags that tell Reader() which of the two files it is handling
PAR = 'PARALLEL'
SER = 'SERIAL'
36 
37 
def CompareTrees(pname, sname):
    """Compare the key names stored in the parallel and the serial ROOT file.

    The key names of each file are sorted and split into three groups:
    names starting with '_Event', names starting with '##' (treated as
    meta data) and everything else.  A message is printed for every group
    that differs between the two files; for the last group the names that
    are present in only one of the files are printed as well.

    pname : path of the file written by the parallel job
    sname : path of the file written by the serial job
    """
    event = '_Event'

    def _grouped_key_names(root_file):
        # Sorted key names of one file, split into (meta, event, other)
        names = [key.GetName() for key in root_file.GetListOfKeys()]
        names.sort()
        meta, events, other = [], [], []
        for key_name in names:
            if key_name.startswith(event):
                events.append(key_name)
            elif key_name.startswith('##'):
                meta.append(key_name)
            else:
                other.append(key_name)
        return meta, events, other

    pf = TFile(pname, 'REC')
    sf = TFile(sname, 'REC')
    pMeta, pEvent, pOther = _grouped_key_names(pf)
    sMeta, sEvent, sOther = _grouped_key_names(sf)

    if pMeta != sMeta:
        print('Meta Data differs')

    if pEvent != sEvent:
        print('Event data differs')

    if pOther != sOther:
        only_parallel = set(pOther) - set(sOther)
        only_serial = set(sOther) - set(pOther)
        if only_parallel:
            print('Extra Data in parallel file : ', only_parallel)
        if only_serial:
            print('Extra Data in serial file : ', only_serial)
        if only_serial or only_parallel:
            print('Files will have different sizes')

    pf.Close()
    sf.Close()
92 
93 
def switchDict(d):
    """Return a copy of ``d`` with keys and values swapped.

    Only works if all values are unique (and hashable, since they become
    keys).  When a value occurs more than once a message is printed and
    None is returned.

    d : dictionary to invert
    returns : the inverted dictionary, or None if the values are not unique
    """
    # Materialise the values: on Python 3 d.values() is a view, not a list
    vals = list(d.values())
    # Duplicates collapse in a set, so a size mismatch means "not unique"
    if len(set(vals)) != len(vals):
        print('Dictionary cannot be switched, values not unique')
        return None
    print('Dict has keys/values : %i/%i' % (len(d), len(vals)))
    return {entry: k for k, entry in d.items()}
110 
111 
def printDict(d, name='unspecified'):
    """Print a dictionary between two 80-character rulers.

    The output has the form

        Dictionary <name> :
            key    value
            key    value
            ...

    d    : dictionary to print
    name : label shown in the title line
    """
    ruler = '-' * 80
    print(ruler)
    print('Dictionary %s : ' % name)
    for key, value in d.items():
        print('\t', key, '\t', value)
    print(ruler)
125 
126 
def Reader(readerType, filename, qacross, qToEngine):
    """Read one file and stream its events to the comparison engine.

    One process runs this for the serial file, another for the parallel
    file.  The order of events in the file is determined first (parallel
    != serial, usually).  The serial reader publishes its ordering on
    ``qacross``; the parallel reader picks it up from there.  Both then run
    their events following the serial ordering with
    AppMgr().runSelectedEvents(pfn, evtNumber), and for every event a
    dictionary  path -> (class name, string repr)  of everything in the TES
    is put on ``qToEngine``.  A final None marks the end of the stream.

    readerType : SER or PAR
    filename   : file name including the "PFN:" prefix
    qacross    : queue carrying the serial ordering to the parallel reader
    qToEngine  : queue towards the comparison engine
    """
    app = AppMgr()
    selector = app.evtsel()
    tes = app.evtsvc()

    header = '/Event/Rec/Header'
    selector.open(filename)
    # runSelectedEvents doesn't need the "PFN:" prefix
    fname = filename[4:]

    # determine the ordering : event number -> position in this file
    order = {}
    position = 0
    while True:
        app.run(1)
        if not tes[header]:
            # no header any more, the end of the file has been reached
            break
        order[int(tes[header].evtNumber())] = position
        position += 1

    if readerType == SER:
        # send the ordering details to the parallel-reader
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serOrder = order
    elif readerType == PAR:
        # drain the queue up to the None marker; the last item received
        # is kept as the serial ordering
        for serOrder in iter(qacross.get, None):
            pass
        nSerial = len(serOrder.keys())
        nParallel = len(order.keys())
        print('Events in Files (serial/parallel) : %i / %i' % (nSerial,
                                                                 nParallel))

    # now run files in the order specified by the serial ordering
    # and send them one by one to the comparison engine
    for index in serOrder:
        if readerType == PAR:
            # translate the serial position into the position in this file
            index = order[serOrder[index]]

        app.runSelectedEvents(fname, index)
        paths = tes.getList()
        paths.sort()

        snapshot = {}
        for path in paths:
            snapshot[path] = (tes[path].__class__.__name__,
                              tes[path].__repr__())
        qToEngine.put(snapshot)
    qToEngine.put(None)
    print('%s Reader Finished' % readerType)
190 
191 
def ComparisonEngine(pQueue, sQueue):
    """Compare the events delivered by the parallel and the serial reader.

    The Comparison Engine runs on a separate forked process and receives
    events in pairs, one each from the Parallel FileReader (``pQueue``) and
    the Serial FileReader (``sQueue``).  The events arrive in dictionary
    format, d[path] = (className, string_repr), and are compared using
    compareEvents.  Each result is stored as a bool (PerfectMatch=True,
    Diff=False) and a summary is printed at the end.

    A None item on a queue is the termination signal of that reader.  If
    only one of the readers has terminated, the loop stops with a message.

    pQueue : queue fed by the parallel reader
    sQueue : queue fed by the serial reader
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print('Termination Signals received ok')
            break
        if pitem is None or sitem is None:
            # one reader finished before the other one
            print('pitem != sitem : ', pitem, sitem)
            break
        # compareEvents expects (serial, parallel): its tolerance for extra
        # plain DataObject paths applies to the second argument only
        results.append(compareEvents(sitem, pitem))
    matches = sum(results)
    print('=' * 80)
    print('Comparison Engine Finished')
    print('-' * 80)
    print('Total Events Checked : %i' % (len(results)))
    print('Perfect Matches : %i' % (matches))
    print('Errors : %i' % (len(results) - matches))
    print('=' * 80)
222 
223 
def checkForAddressDifference(a, b):
    """Tell whether two string representations are both generic ones.

    The __repr__() method for Event Data Objects will return a generic
    string "DataObject at 0xADDRESS" for non-Pythonised objects.  If these
    objects have the same path, they are equal, but this cannot be tested
    with "==" in Python, as the memory address will be different for the
    two different DataObjects, so this method checks whether both strings
    are of that generic form, in which case only the address can differ.

    a, b : two string representations
    returns : True if both start with 'DataObject at 0x', False otherwise
    """
    ref = 'DataObject at 0x'
    return a.startswith(ref) and b.startswith(ref)
238 
239 
def compareEvents(s, p):
    """Compare one event of the serial file with one of the parallel file.

    Both events are dictionaries of the form
        d[ path ] = tuple( className, string_repr )

    s : event read from the serial file
    p : event read from the parallel file
    returns : True for a perfect match, False otherwise
    """
    # sorted() works for the key views of Python 3 as well as for the key
    # lists of Python 2, unlike keys().sort()
    sks = sorted(s)
    pks = sorted(p)

    # check 1 : number of keys (paths)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        #          but in TESSerializer, *all* DataObjects will be sent
        #          including /Event/Prev and /Event/Prev/MC

        # extra keys in the parallel file which are just plain DataObjects
        # are removed; anything else is a real difference
        for e in set(pks) - set(sks):
            if p[e][0] == 'DataObject':
                pks.remove(e)
            else:
                print('Extra Other thing found!', e, p[e][0])
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; two generic representations that
        # differ are accepted, as only the memory address can differ
        if s[key][1] != p[key][1] and \
                not checkForAddressDifference(p[key][1], s[key][1]):
            return False

    return True
297 
298 
def CheckFileRecords(par, ser):
    """Print how many file-record entries of ``par`` are missing in ``ser``.

    The TimeSpanFSR and EventCountFSR records are compared as sets of
    (field, value) pairs; for the LumiFSRBeamCrossing, LumiFSRBeam2 and
    LumiFSRNoBeam records the 'key', 'incr' and 'integral' values are
    compared.  Every difference is one-directional (parallel minus serial).

    par : parallel file, handed to GetFSRdicts
    ser : serial file, handed to GetFSRdicts
    """

    def _as_set(values):
        # GetFSRdict initialises these fields with the integer 0; such a
        # bare number is treated as a single entry instead of crashing
        try:
            return set(values)
        except TypeError:
            return {values}

    print("Checking File Records")

    # NOTE(review): GetFSRdicts is not defined in the visible part of this
    # file (only GetFSRdict(filename, queue) is) - confirm where it lives.
    parFSR = GetFSRdicts(par)
    serFSR = GetFSRdicts(ser)

    # items() exists on Python 2 and 3, iteritems() only on Python 2
    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" +
          str(len(diff1)) + "\nDifferent entries in EventCountFSR:\t" +
          str(len(diff2)))

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = _as_set(parFSR[k]["key"]) - _as_set(serFSR[k]["key"])
        diff4 = _as_set(parFSR[k]["incr"]) - _as_set(serFSR[k]["incr"])
        diff5 = _as_set(parFSR[k]["integral"]) - \
            _as_set(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))
321 
322 
def LumiFSR(lumi):
    """Extract the content of one LumiFSR object.

    The run numbers and file IDs are read through the accessors of the
    object.  The key/increment/integral triplets are parsed from its string
    representation: the text after "info (key/incr/integral) : " is split
    at '/', the last piece is dropped, and every remaining piece must hold
    three whitespace-separated integers.

    lumi : object providing runNumbers(), fileIDs() and str()
    returns : tuple (runs, files, key, incr, integral) of lists
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    text = str(lumi)
    payload = text.split("info (key/incr/integral) : ")[-1]
    # whatever follows the final '/' is not a record
    records = payload.split('/')[:-1]

    key = []
    incr = []
    integral = []
    for record in records:
        k, i, t = record.split()
        key.append(int(k))
        incr.append(int(i))
        integral.append(int(t))

    return (runs, files, key, incr, integral)
349 
350 
def GetFSRdict(filename, queue):
    """Read the file records of one file and put them on ``queue``.

    A LumiFsrReader job is configured for ``filename`` and run for one
    event.  Every object listed by the file record service is then
    inspected and copied into a dictionary, which is finally put on
    ``queue``.  Fields for which no record is found keep the initial
    value 0.

    filename : file to read (without "PFN:" prefix)
    queue    : queue that receives the resulting dictionary
    """

    FSR = {
        "TimeSpanFSR": {
            'earliest': 0,
            'latest': 0
        },
        "LumiFSRBeamCrossing": {
            'key': 0,
            'incr': 0,
            'integral': 0
        },
        "LumiFSRBeam1": {
            'key': 0,
            'incr': 0,
            'integral': 0
        },
        "LumiFSRBeam2": {
            'key': 0,
            'incr': 0,
            'integral': 0
        },
        "LumiFSRNoBeam": {
            'key': 0,
            'incr': 0,
            'integral': 0
        },
        "EventCountFSR": {
            'input': 0,
            'output': 0,
            'statusFlag': 0
        }
    }

    # The job is configured with plain statements rather than by exec() of
    # a string built from the file name, so a quote in the path can neither
    # break the configuration nor inject code.
    from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader
    LumiFsrReader().OutputLevel = INFO
    LumiFsrReader().inputFiles = ['%s' % filename]
    LumiFsrReader().Persistency = 'ROOT'
    LumiFsrReader().EvtMax = 1
    from Configurables import LHCbApp
    LHCbApp().Persistency = 'ROOT'
    from Configurables import CondDB, DDDBConf
    CondDB().UseLatestTags = ['%s' % 2011]
    DDDBConf(DataType='%s' % 2011)

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    lst = fsr.getHistoNames()

    if lst:
        for l in lst:

            ob = fsr.retrieveObject(l)

            if "LumiFSR" in l:

                assert ob.numberOfObjects() == 1
                k = ob.containedObject(0)
                runs, files, keys, increment, integral = LumiFSR(k)

                # the record is stored under the last component of its path
                record = FSR[l[l.rfind('/') + 1:]]
                record['runs'] = runs
                record['files'] = files
                record['key'] = keys
                record['incr'] = increment
                record['integral'] = integral

            if "TimeSpanFSR" in l:

                FSR["TimeSpanFSR"]['earliest'] = ob.containedObject(
                    0).earliest()
                FSR["TimeSpanFSR"]['latest'] = ob.containedObject(0).latest()

            if "EventCountFSR" in l:

                FSR["EventCountFSR"]['input'] = ob.input()
                FSR["EventCountFSR"]['output'] = ob.output()
                FSR["EventCountFSR"]['statusFlag'] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
429 
430 
def CompareFSR(pout, sout):
    """Compare the file records of the parallel and the serial file.

    One dictionary is taken from each queue (as produced by GetFSRdict).
    The TimeSpanFSR and EventCountFSR records are compared as sets of
    (field, value) pairs; for the LumiFSRBeamCrossing, LumiFSRBeam2 and
    LumiFSRNoBeam records the 'key', 'incr' and 'integral' values are
    compared.  Every difference is one-directional (parallel minus serial).
    The number of differing entries is printed, followed by both
    dictionaries.

    pout : queue delivering the file records of the parallel file
    sout : queue delivering the file records of the serial file
    """

    def _as_set(values):
        # GetFSRdict initialises these fields with the integer 0; such a
        # bare number is treated as a single entry instead of crashing
        try:
            return set(values)
        except TypeError:
            return {values}

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    # items() exists on Python 2 and 3, iteritems() only on Python 2
    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" +
          str(len(diff1)) + "\nDifferent entries in EventCountFSR:\t" +
          str(len(diff2)))

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = _as_set(parFSR[k]['key']) - _as_set(serFSR[k]['key'])
        diff4 = _as_set(parFSR[k]['incr']) - _as_set(serFSR[k]['incr'])
        diff5 = _as_set(parFSR[k]['integral']) - \
            _as_set(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
456 
457 
if __name__ == '__main__':

    # Usage : <script> <parallelFile> <serialFile>
    # A slice is used so that sys.argv itself is left untouched.
    args = sys.argv[1:]
    if len(args) != 2:
        print(
            'Please supply two arguments : > python loadFile <parallelFile> <serialFile>'
        )
        # wrong usage is reported as a failure to the calling shell
        sys.exit(1)

    # plain file names, as needed by TFile and by GetFSRdict
    pname, sname = args
    print('Parallel File to be analysed : %s' % ('PFN:' + pname))
    print('Serial File to be analysed : %s' % ('PFN:' + sname))

    # The event-by-event comparison and the tree comparison are disabled.
    # To re-enable them:
    #
    # qacross = Queue() ; pout = Queue() ; sout = Queue()
    # par = Process(target=Reader, args=(PAR, 'PFN:' + pname, qacross, pout))
    # ser = Process(target=Reader, args=(SER, 'PFN:' + sname, qacross, sout))
    # com = Process(target=ComparisonEngine, args=(pout, sout))
    # com.start() ; par.start() ; ser.start()
    # ser.join() ; par.join() ; com.join()
    #
    # CompareTrees( pname, sname )

    print("Check File Records")

    pout = Queue()
    sout = Queue()

    # Each file is read in a process of its own; the first argument is the
    # parallel file and the second one the serial file.
    sp = Process(target=GetFSRdict, args=(sname, sout))
    pp = Process(target=GetFSRdict, args=(pname, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
def printDict(d, name='unspecified')
def CheckFileRecords(par, ser)
def CompareFSR(pout, sout)
def ComparisonEngine(pQueue, sQueue)
A small class to stream Data I/O.
Definition: OutputStream.h:28
def CompareTrees(pname, sname)
HistogramPersistencySvc class implementation definition.
def checkForAddressDifference(a, b)
def GetFSRdict(filename, queue)
The Application Manager class.
Definition of class EventSelector.
Definition: EventSelector.h:53
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
def Reader(readerType, filename, qacross, qToEngine)