The Gaudi Framework  v33r0 (d5ea422b)
compareOutputFiles.py
Go to the documentation of this file.
1 
11 from __future__ import print_function
12 from Gaudi.Configuration import *
13 from GaudiPython import AppMgr, gbl
14 from ROOT import TFile, TBufferFile, TBuffer
15 from multiprocessing import Process, Queue
16 from Configurables import LHCbApp
17 import sys
18 
19 #
20 # loadFile.py
21 # -----------
22 # Open a dst file for inspection
23 #
24 
25 
def checkKeys(name):
    """Open the ROOT file behind a "PFN:"-prefixed file name.

    The first four characters of *name* (the "PFN:" prefix) are dropped
    because TFile does not need them.  The file handle is only bound to a
    local; nothing is inspected or returned by this function.
    """
    rootPath = name[4:]
    handle = TFile(rootPath, 'REC')
30 
31 
# Job options applied at import time, i.e. before any AppMgr is created.
importOptions('$STDOPTS/LHCbApplication.opts')
#importOptions( '$GAUDIPOOLDBROOT/options/GaudiPoolDbRoot.opts' )
importOptions('$GAUDICNVROOT/options/Setup.opts')

# Empty output targets for the DST writer and the histogram service,
# error-level messages only, and an event-selector print frequency of 100.
OutputStream("DstWriter", Output='')
HistogramPersistencySvc(OutputFile='')
MessageSvc().OutputLevel = ERROR
EventSelector(PrintFreq=100)

ApplicationMgr(AppName='File Check - Serial vs Parallel', OutputLevel=ERROR)
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Reader-type tags handed to Reader() as its first argument.
PAR, SER = 'PARALLEL', 'SERIAL'
46 
47 
def CompareTrees(pname, sname):
    """Compare the top-level key names of the parallel and serial ROOT files.

    Key names are sorted and split into three groups: names starting with
    '_Event' (event data), names starting with '##' (meta data) and all the
    rest.  A message is printed for every group that differs; for the
    "other" group the names present in only one of the files are listed.

    Args:
        pname: path of the file written by the parallel job.
        sname: path of the file written by the serial job.

    Returns:
        None; the result is reported on stdout only.
    """

    def keyGroups(rootFile):
        # Sorted key names of one file as (meta, event, other) lists.
        meta, event, other = [], [], []
        names = sorted(key.GetName() for key in rootFile.GetListOfKeys())
        for keyName in names:
            if keyName.startswith('_Event'):
                event.append(keyName)
            elif keyName.startswith('##'):
                meta.append(keyName)
            else:
                other.append(keyName)
        return meta, event, other

    pf = sf = None
    try:
        pf = TFile(pname, 'REC')
        sf = TFile(sname, 'REC')
        pMeta, pEvent, pOther = keyGroups(pf)
        sMeta, sEvent, sOther = keyGroups(sf)

        if pMeta != sMeta:
            print('Meta Data differs')

        if pEvent != sEvent:
            print('Event data differs')

        if pOther != sOther:
            pExtra = set(pOther) - set(sOther)
            sExtra = set(sOther) - set(pOther)
            if pExtra:
                print('Extra Data in parallel file : ', pExtra)
            if sExtra:
                print('Extra Data in serial file : ', sExtra)
            if sExtra or pExtra:
                print('Files will have different sizes')
    finally:
        # close whatever was opened, even if reading the keys failed
        for rootFile in (pf, sf):
            if rootFile is not None:
                rootFile.Close()
102 
103 
def switchDict(d):
    """Return a new dictionary with the keys and values of *d* exchanged.

    This only works if all values are unique.  When they are not, a message
    is printed and None is returned.

    Args:
        d: dictionary to invert; it is not modified.

    Returns:
        The inverted dictionary, or None if some value occurs more than once.
    """
    # list(): on Python 3 d.values() is a view without .count()
    vals = list(d.values())
    try:
        unique = len(set(vals)) == len(vals)
    except TypeError:
        # unhashable values cannot go into a set; compare them pairwise
        unique = all(vals.count(v) == 1 for v in vals)
    if not unique:
        print('Dictionary cannot be switched, values not unique')
        return None
    print('Dict has keys/values : %i/%i' % (len(d), len(vals)))
    return {value: key for key, value in d.items()}
120 
121 
def printDict(d, name='unspecified'):
    """Print a dictionary between two 80-character rules.

    The layout is

        Dictionary Name :
            key     value
            key     value
            ...

    Args:
        d: dictionary to print.
        name: label shown in the header line.
    """
    rule = '-' * 80
    print(rule)
    print('Dictionary %s : ' % (name))
    for key, value in d.items():
        print('\t', key, '\t', value)
    print(rule)
135 
136 
def Reader(readerType, filename, qacross, qToEngine):
    """Read one file and stream its events to the comparison engine.

    One Reader process handles the serial file, another the parallel file.
    First the order of events in the file is determined (parallel != serial,
    usually).  Then the events are run *in the serial order* with
    AppMgr().runSelectedEvents(pfn, evtNumber) on both readers.  For every
    event a dictionary path -> (class name, string repr) of everything in
    the TES is put on *qToEngine*; a final None marks the end.

    Args:
        readerType: SER or PAR.
        filename: "PFN:"-prefixed file name.
        qacross: queue carrying the serial ordering from the serial reader
            to the parallel reader, terminated by None.
        qToEngine: queue towards the comparison engine.
    """
    appMgr = AppMgr()
    selector = appMgr.evtsel()
    store = appMgr.evtsvc()

    headerPath = '/Event/Rec/Header'
    selector.open(filename)
    # runSelectedEvents doesn't need the "PFN:" prefix
    barePath = filename[4:]

    # determine the ordering : event number -> position in this file
    order = {}
    position = 0
    while True:
        appMgr.run(1)
        if not store[headerPath]:
            break
        order[int(store[headerPath].evtNumber())] = position
        position += 1

    if readerType == SER:
        # send the ordering details (position -> event number) to the
        # parallel-reader
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serOrder = order
    elif readerType == PAR:
        # receive the serial ordering from the queue; the last item seen
        # before the None terminator is kept
        for serOrder in iter(qacross.get, None):
            pass
        # NOTE(review): placed inside the PAR branch, where the two counts
        # can actually differ - confirm against the upstream indentation
        print('Events in Files (serial/parallel) : %i / %i' %
              (len(serOrder.keys()), len(order.keys())))

    # now run files in the order specified by the serial ordering
    # and send them one by one to the comparison engine
    for slot in serOrder.keys():
        if readerType == PAR:
            # translate serial position -> event number -> parallel position
            target = order[serOrder[slot]]
        else:
            target = slot

        appMgr.runSelectedEvents(barePath, target)
        paths = store.getList()
        paths.sort()
        snapshot = {
            path: (store[path].__class__.__name__, store[path].__repr__())
            for path in paths
        }
        qToEngine.put(snapshot)
    qToEngine.put(None)
    print('%s Reader Finished' % (readerType))
200 
201 
def ComparisonEngine(pQueue, sQueue):
    """Compare events arriving pairwise from the parallel and serial readers.

    The Comparison Engine runs on a separate forked process.  Each queue
    delivers one dictionary per event, d[path] = (className, string_repr),
    followed by None as termination signal.  Every pair is handed to
    compareEvents and the outcome is stored in a list of bools
    (PerfectMatch=True, Diff=False); a summary is printed at the end.

    Args:
        pQueue: queue fed by the parallel-file reader.
        sQueue: queue fed by the serial-file reader.
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print('Termination Signals received ok')
            break
        if pitem is None or sitem is None:
            # one reader finished before the other
            print('pitem != sitem : ', pitem, sitem)
            break
        # compareEvents is declared as (s, p): serial first, parallel second.
        # Only in that order are extra plain DataObject paths of the
        # *parallel* event tolerated.
        results.append(compareEvents(sitem, pitem))
    matches = sum(results)
    print('=' * 80)
    print('Comparison Engine Finished')
    print('-' * 80)
    print('Total Events Checked : %i' % (len(results)))
    print('Perfect Matches : %i' % (matches))
    print('Errors : %i' % (len(results) - matches))
    print('=' * 80)
232 
233 
def checkForAddressDifference(a, b):
    """Tell whether two string representations are both generic DataObject reprs.

    The __repr__() method for Event Data Objects will return a generic
    string "DataObject at 0xADDRESS" for non-Pythonised objects.  If these
    objects have the same path, they are equal, but this cannot be tested
    with "==" in Python, as the memory address will be different for the
    two different DataObjects, so this method checks whether the difference
    is only in the address.

    Args:
        a, b: two string representations.

    Returns:
        True if both strings start with 'DataObject at 0x', else False.
    """
    ref = 'DataObject at 0x'
    return a.startswith(ref) and b.startswith(ref)
248 
249 
def compareEvents(s, p):
    """Compare one event of the serial file with the same event of the parallel file.

    Both events are dictionaries of the form
    d[ path ] = tuple( className, string_repr ).

    Args:
        s: event from the serial file.
        p: event from the parallel file.

    Returns:
        True if both events have the same paths (after dropping extra plain
        'DataObject' paths of the parallel event) and every path has the same
        class name and an equivalent string representation; False otherwise.
    """
    # sorted() works on Python 2 lists and Python 3 key views alike
    sks = sorted(s)
    pks = sorted(p)

    # check 1 : number of keys (paths)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        # but in TESSerializer, *all* DataObjects will be sent
        # including /Event/Prev and /Event/Prev/MC
        #
        # extra keys in the parallel file which are just containing
        # DataObjects are removed; anything else is a real difference
        for e in set(pks) - set(sks):
            if p[e][0] == 'DataObject':
                pks.remove(e)
            else:
                print('Extra Other thing found!', e, p[e][0])
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; reprs that differ only in the
        # memory address of a generic DataObject count as equal
        if s[key][1] != p[key][1] and \
                not checkForAddressDifference(p[key][1], s[key][1]):
            return False
    return True
307 
308 
def CheckFileRecords(par, ser):
    """Print how many File Summary Record entries differ between two files.

    For "TimeSpanFSR" and "EventCountFSR" the (name, value) pairs present in
    the parallel record but not in the serial one are counted.  For the
    Lumi records listed below the same is done for the 'key', 'incr' and
    'integral' fields.

    Args:
        par: identifies the parallel file (passed on to GetFSRdicts).
        ser: identifies the serial file (passed on to GetFSRdicts).
    """

    def asSet(value):
        # a field that is a single scalar instead of a list is treated as
        # a one-element set rather than crashing set()
        try:
            return set(value)
        except TypeError:
            return {value}

    print("Checking File Records")

    # NOTE(review): GetFSRdicts is not defined in the visible part of this
    # file (only GetFSRdict(filename, queue) is) - confirm where it comes from
    parFSR = GetFSRdicts(par)
    serFSR = GetFSRdicts(ser)

    # .items() exists on both Python 2 and 3 (.iteritems() is Python 2 only)
    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + \
          str(len(diff1)) + "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): "LumiFSRBeam1" is not in this list - confirm intended
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = asSet(parFSR[k]["key"]) - asSet(serFSR[k]["key"])
        diff4 = asSet(parFSR[k]["incr"]) - asSet(serFSR[k]["incr"])
        diff5 = asSet(parFSR[k]["integral"]) - asSet(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))
331 
332 
def LumiFSR(lumi):
    """Unpack a Lumi file summary record into plain Python lists.

    Run numbers and file IDs are read through the object's accessors.  The
    key / increment / integral triples are parsed from the text that follows
    "info (key/incr/integral) : " in str(lumi): records are separated by
    '/', each holding three whitespace-separated integers; whatever follows
    the last '/' is ignored.

    Args:
        lumi: object providing runNumbers(), fileIDs() and a string form
            containing the marker described above.

    Returns:
        Tuple (runs, files, key, incr, integral) of lists.
    """
    runs = [run for run in lumi.runNumbers()]
    files = [fileID for fileID in lumi.fileIDs()]

    text = str(lumi)
    payload = text.split("info (key/incr/integral) : ")[-1]
    records = payload.split('/')[:-1]

    key, incr, integral = [], [], []
    for record in records:
        first, second, third = record.split()
        key.append(int(first))
        incr.append(int(second))
        integral.append(int(third))

    return (runs, files, key, incr, integral)
359 
360 
def GetFSRdict(filename, queue):
    """Read the File Summary Records of one file and put them on *queue*.

    A LumiFsrReader job is configured for *filename*, one event is run and
    every object listed by the file record service is inspected:

    * paths containing "LumiFSR" fill 'runs', 'files', 'key', 'incr' and
      'integral' of the record named after the last path component,
    * paths containing "TimeSpanFSR" fill 'earliest' and 'latest',
    * paths containing "EventCountFSR" fill 'input', 'output', 'statusFlag'.

    Records not found in the file keep the placeholder value 0.

    Args:
        filename: file to read.
        queue: queue receiving the resulting dictionary (exactly one put).
    """
    FSR = {
        "TimeSpanFSR": {'earliest': 0, 'latest': 0},
        "LumiFSRBeamCrossing": {'key': 0, 'incr': 0, 'integral': 0},
        "LumiFSRBeam1": {'key': 0, 'incr': 0, 'integral': 0},
        "LumiFSRBeam2": {'key': 0, 'incr': 0, 'integral': 0},
        "LumiFSRNoBeam": {'key': 0, 'incr': 0, 'integral': 0},
        "EventCountFSR": {'input': 0, 'output': 0, 'statusFlag': 0},
    }

    # Configure the job with plain statements instead of exec() on a string
    # built from the file name: a quote in the path can no longer break or
    # alter the configuration code.
    from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader
    from Configurables import LHCbApp, CondDB, DDDBConf

    LumiFsrReader().OutputLevel = INFO
    LumiFsrReader().inputFiles = ['%s' % filename]
    LumiFsrReader().Persistency = 'ROOT'
    LumiFsrReader().EvtMax = 1
    LHCbApp().Persistency = 'ROOT'
    CondDB().UseLatestTags = ['2011']
    DDDBConf(DataType='2011')

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    for path in fsr.getHistoNames() or ():

        ob = fsr.retrieveObject(path)

        if "LumiFSR" in path:

            assert ob.numberOfObjects() == 1
            runs, files, keys, increment, integral = LumiFSR(
                ob.containedObject(0))

            # record name = last path component; setdefault so that a name
            # missing from the table above is stored instead of raising
            record = FSR.setdefault(path.rsplit('/', 1)[-1], {})
            record['runs'] = runs
            record['files'] = files
            record['key'] = keys
            record['incr'] = increment
            record['integral'] = integral

        if "TimeSpanFSR" in path:

            span = ob.containedObject(0)
            FSR["TimeSpanFSR"]['earliest'] = span.earliest()
            FSR["TimeSpanFSR"]['latest'] = span.latest()

        if "EventCountFSR" in path:

            FSR["EventCountFSR"]['input'] = ob.input()
            FSR["EventCountFSR"]['output'] = ob.output()
            FSR["EventCountFSR"]['statusFlag'] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
439 
440 
def CompareFSR(pout, sout):
    """Compare the File Summary Records delivered on two queues.

    One dictionary is taken from each queue.  For "TimeSpanFSR" and
    "EventCountFSR" the (name, value) pairs present in the parallel record
    but not in the serial one are counted; for the Lumi records listed
    below the same is done for the 'key', 'incr' and 'integral' fields.
    The counts and both complete dictionaries are printed.

    Args:
        pout: queue delivering the parallel file's FSR dictionary.
        sout: queue delivering the serial file's FSR dictionary.
    """

    def asSet(value):
        # Lumi fields keep the scalar placeholder 0 when the record was not
        # found in the file; treat a scalar as a one-element set instead of
        # letting set() raise TypeError
        try:
            return set(value)
        except TypeError:
            return {value}

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    # .items() exists on both Python 2 and 3 (.iteritems() is Python 2 only)
    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + \
          str(len(diff1)) + "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): "LumiFSRBeam1" is not in this list - confirm intended
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = asSet(parFSR[k]['key']) - asSet(serFSR[k]['key'])
        diff4 = asSet(parFSR[k]['incr']) - asSet(serFSR[k]['incr'])
        diff5 = asSet(parFSR[k]['integral']) - asSet(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
466 
467 
if __name__ == '__main__':
    # Usage: <script> <parallelFile> <serialFile>
    #
    # Only the File Summary Record comparison is run; the event-by-event
    # comparison and the tree comparison are prepared but left disabled.

    # copy instead of popping from sys.argv, so the interpreter's argument
    # list is left untouched
    args = sys.argv[1:]
    if len(args) != 2:
        print(
            'Please supply two arguments : > python loadFile <parallelFile> <serialFile>'
        )
        sys.exit(0)

    # first argument is the parallel file, second the serial file
    pname, sname = args  # TFile doesn't need the "PFN:" prefix
    par = 'PFN:' + pname
    ser = 'PFN:' + sname
    print('Parallel File to be analysed : %s' % (par))
    print('Serial File to be analysed : %s' % (ser))

    qacross = Queue()
    pout = Queue()
    sout = Queue()

    # separate names for the processes, so that the file-name strings above
    # are not overwritten
    parReader = Process(target=Reader, args=(PAR, par, qacross, pout))
    serReader = Process(target=Reader, args=(SER, ser, qacross, sout))
    comparer = Process(target=ComparisonEngine, args=(pout, sout))

    #comparer.start() ; parReader.start() ; serReader.start()
    #serReader.join() ; parReader.join() ; comparer.join()

    #CompareTrees( pname, sname )

    print("Check File Records")

    # fresh queues for the FSR dictionaries
    pout = Queue()
    sout = Queue()

    # each file is read in its own process; a third process compares the
    # two dictionaries once both have arrived
    sp = Process(target=GetFSRdict, args=(sname, sout))
    pp = Process(target=GetFSRdict, args=(pname, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
def printDict(d, name='unspecified')
def CheckFileRecords(par, ser)
def CompareFSR(pout, sout)
def ComparisonEngine(pQueue, sQueue)
A small class to stream Data I/O.
Definition: OutputStream.h:38
def CompareTrees(pname, sname)
HistogramPersistencySvc class implementation definition.
def checkForAddressDifference(a, b)
def GetFSRdict(filename, queue)
The Application Manager class.
Definition of class EventSelector.
Definition: EventSelector.h:63
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
def Reader(readerType, filename, qacross, qToEngine)