The Gaudi Framework  v36r1 (3e2fb5a8)
compareOutputFiles.py
Go to the documentation of this file.
1 
11 from __future__ import print_function
12 from Gaudi.Configuration import *
13 from GaudiPython import AppMgr, gbl
14 from ROOT import TFile, TBufferFile, TBuffer
15 from multiprocessing import Process, Queue
16 from Configurables import LHCbApp
17 import sys
18 
#
# compareOutputFiles.py
# ---------------------
# Compare the output file of a parallel job with that of a serial job
#
24 
25 
def checkKeys(name):
    """Open the ROOT file behind a "PFN:"-prefixed file name.

    The first four characters of ``name`` are dropped and the remainder is
    passed to ``TFile``. Nothing is read from the file and nothing is
    returned.
    """
    # Check the TTree keys in each file
    # NOTE(review): no keys are actually inspected and the TFile is never
    # closed explicitly here -- this looks like an unfinished stub; confirm.
    fname = name[4:]  # TFile doesn't need the "PFN:" prefix
    tf = TFile(fname, 'REC')
30 
31 
# Job configuration, executed at import time: load two option files ...
importOptions('$STDOPTS/LHCbApplication.opts')
importOptions('$ENV_CMAKE_SOURCE_DIR/RootCnv/options/Setup.opts')

# ... then set the output destinations of the "DstWriter" stream and of the
# histogram persistency service to the empty string
# (presumably so that this checking job writes no files itself -- confirm)
OutputStream("DstWriter").Output = ''
HistogramPersistencySvc().OutputFile = ''
# Message service and application manager both run at ERROR output level
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(OutputLevel=ERROR, AppName='File Check - Serial vs Parallel')
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Reader-type tags: passed to Reader() as readerType and compared there to
# decide which side of the event-ordering exchange a process plays
PAR = 'PARALLEL'
SER = 'SERIAL'
45 
46 
def CompareTrees(pname, sname):
    """Compare the names of the top-level keys of two ROOT files.

    pname : path of the parallel-job file
    sname : path of the serial-job file

    The key names of each file are sorted and split into three groups:
    names starting with '_Event', names starting with '##', and the rest.
    A message is printed for every group that differs between the files;
    for the last group the names present in only one file are listed.
    Both files are closed before returning. Nothing is returned.
    """
    parFile = TFile(pname, 'REC')
    serFile = TFile(sname, 'REC')
    eventPrefix = '_Event'

    def classify(names):
        # Split key names into (meta, event, other), preserving input order
        meta, events, rest = [], [], []
        for keyName in names:
            if keyName.startswith(eventPrefix):
                events.append(keyName)
            elif keyName.startswith('##'):
                meta.append(keyName)
            else:
                rest.append(keyName)
        return meta, events, rest

    parKeys = parFile.GetListOfKeys()
    serKeys = serFile.GetListOfKeys()
    pMeta, pEvent, pOther = classify(sorted(key.GetName() for key in parKeys))
    sMeta, sEvent, sOther = classify(sorted(key.GetName() for key in serKeys))

    if pMeta != sMeta:
        print('Meta Data differs')

    if pEvent != sEvent:
        print('Event data differs')

    if pOther != sOther:
        onlyInParallel = set(pOther) - set(sOther)
        onlyInSerial = set(sOther) - set(pOther)
        if onlyInParallel:
            print('Extra Data in parallel file : ', onlyInParallel)
        if onlyInSerial:
            print('Extra Data in serial file : ', onlyInSerial)
        if onlyInSerial or onlyInParallel:
            print('Files will have different sizes')

    parFile.Close()
    serFile.Close()
101 
102 
def switchDict(d):
    """Return a new dictionary with the keys and values of ``d`` swapped.

    Only works if all values are unique (and hashable, since they become
    keys). If a value occurs more than once a message is printed and None
    is returned; otherwise the key/value counts are printed and the
    inverted dictionary is returned.
    """
    newd = {}
    for k, entry in d.items():
        # A value already used as a key means the values are not unique.
        # Checking while building keeps this a single linear pass and avoids
        # list-only methods that dict views do not provide on Python 3.
        if entry in newd:
            print('Dictionary cannot be switched, values not unique')
            return None
        newd[entry] = k
    print('Dict has keys/values : %i/%i' % (len(d), len(newd)))
    return newd
119 
120 
def printDict(d, name='unspecified'):
    """Print a dictionary between two 80-character rules.

    Output layout:

        Dictionary <name> :
            key    value
            key    value
            ...

    d    : the dictionary to print
    name : label shown in the heading line
    """
    rule = '-' * 80
    print(rule)
    print('Dictionary %s : ' % (name))
    for key in d.keys():
        print('\t', key, '\t', d[key])
    print(rule)
134 
135 
def Reader(readerType, filename, qacross, qToEngine):
    #
    # Process for reading a file
    # One process for reading Serial File, another for Parallel File
    #
    # First the order of events is determined, (parallel != serial, usually)
    #
    # Then the events are run *in order* using AppMgr().runSelectedEvents(pfn, evtNumber)
    # on both Serial-Reader and Parallel-Reader processes.
    #
    # The string repr of everything in the TES is placed in a dictionary and
    # sent to the comparison Process, which compares the two dictionaries
    #
    # Arguments (as used below):
    #   readerType : SER or PAR; selects the role in the ordering exchange
    #   filename   : "PFN:"-prefixed file name, handed to the event selector
    #   qacross    : queue carrying the serial ordering from the SER reader
    #                to the PAR reader, terminated by None
    #   qToEngine  : queue receiving one dictionary per event, then None
    #
    # NOTE(review): readerType values other than SER/PAR leave serOrder
    # unbound and fail at the event loop below.
    #
    a = AppMgr()
    sel = a.evtsel()
    evt = a.evtsvc()

    header = '/Event/Rec/Header'
    sel.open(filename)
    ct = 0
    order = {}  # event number -> position of that event in this file
    fname = filename[4:]  # runSelectedEvents doesn't need the "PFN:" prefix

    # determine the ordering
    # (one event per iteration; stop as soon as no header object is found)
    while True:
        a.run(1)
        if evt[header]:
            eNumber = int(evt[header].evtNumber())
            order[eNumber] = ct
            ct += 1
        else:
            break

    if readerType == SER:
        # send the ordering details to the parallel-reader
        # (after switchDict: position in serial file -> event number)
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        # changeName
        serOrder = order
    elif readerType == PAR:
        # receive the serial ordering from queue, and send ordering to SerialReader
        # (the loop drains the queue up to the None sentinel; serOrder keeps
        # the last item received)
        for serOrder in iter(qacross.get, None):
            pass
        # NOTE(review): the nesting of the next three lines under the PAR
        # branch was reconstructed from a listing without indentation -- confirm.
        lsks = len(serOrder.keys())
        lpks = len(order.keys())
        print('Events in Files (serial/parallel) : %i / %i' % (lsks, lpks))

    # now run files in the order specified by the serial ordering
    # and send them one by one to the comparison engine
    for i in iter(serOrder.keys()):
        if readerType == PAR:
            # serial position -> event number -> position in the parallel file
            i = order[serOrder[i]]

        a.runSelectedEvents(fname, i)
        lst = evt.getList()

        lst.sort()
        # path -> (class name, repr string) for every path returned by getList()
        ascii = dict(
            [(l, (evt[l].__class__.__name__, evt[l].__repr__())) for l in lst])
        qToEngine.put(ascii)
    # None tells the comparison engine that this reader is done
    qToEngine.put(None)
    print('%s Reader Finished' % (readerType))
199 
200 
def ComparisonEngine(pQueue, sQueue):
    # The Comparison Engine runs on a separate forked process and receives
    # events in pairs, one each from Serial FileReader and Parallel FileReader
    #
    # The events arrive in Dictionary Format, d[path]=(className, string_repr)
    # and are compared using the compareEvents method
    #
    # Results are stored in an array of bools (PerfectMatch=True, Diff=False)
    #
    # Arguments:
    #   pQueue : queue fed by the parallel reader, terminated by None
    #   sQueue : queue fed by the serial reader, terminated by None
    #
    # A summary (events checked / perfect matches / errors) is printed once
    # either queue delivers its None terminator. Nothing is returned.
    #
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print('Termination Signals received ok')
            break
        if pitem is None or sitem is None:
            # one reader finished before the other: the streams are out of step
            print('pitem != sitem : ', pitem, sitem)
            break
        # compareEvents takes (serial, parallel): its tolerance for extra
        # plain-DataObject paths applies to the parallel event only
        results.append(compareEvents(sitem, pitem))
    matches = sum(results)
    print('=' * 80)
    print('Comparison Engine Finished')
    print('-' * 80)
    print('Total Events Checked : %i' % (len(results)))
    print('Perfect Matches : %i' % (matches))
    print('Errors : %i' % (len(results) - matches))
    print('=' * 80)
231 
232 
def checkForAddressDifference(a, b):
    # the __repr__() method for Event Data Objects will return a generic
    # string "DataObject at 0xADDRESS" for non-Pythonised objects
    # If these objects have the same path, they are equal, but this
    # cannot be tested with "==" in Python, as the memory address will
    # be different for the two different DataObjects, so this method
    # will check if the difference is in the address
    #
    # args : a, b two string representations
    # returns : True if both strings start with the generic prefix,
    #           False otherwise
    ref = 'DataObject at 0x'
    return a.startswith(ref) and b.startswith(ref)
247 
248 
def compareEvents(s, p):
    """Compare one serial event with the matching parallel event.

    Both events are dictionaries of the form
        d[ path ] = tuple( className, string_repr )

    s : event read from the serial file
    p : event read from the parallel file

    Returns True if both events hold the same paths with matching class
    names and matching string representations (representations that differ
    only by a generic DataObject address count as matching), else False.
    Paths found only in ``p`` are ignored when their class name is
    'DataObject'.
    """
    # check 1 : number of keys (paths)
    # sorted() yields real lists; dict views cannot be sorted in place
    sks = sorted(s)
    pks = sorted(p)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        # but in TESSerializer, *all* DataObjects will be sent
        # including /Event/Prev and /Event/Prev/MC

        # check for extra keys in the parallel file which are just containing
        # DataObjects; if found, remove them
        extras = list(set(pks) - set(sks))
        for e in extras:
            if p[e][0] == 'DataObject':
                pks.remove(e)
            else:
                print('Extra Other thing found!', e, p[e][0])
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content; the first mismatch decides the result
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation, tolerating address-only differences
        if s[key][1] != p[key][1] and not checkForAddressDifference(
                p[key][1], s[key][1]):
            return False

    return True
306 
307 
def CheckFileRecords(par, ser):
    """Print how many file-record entries of ``par`` are absent from ``ser``.

    par : identifies the parallel file, passed to GetFSRdicts
    ser : identifies the serial file, passed to GetFSRdicts

    Counts are printed for TimeSpanFSR, EventCountFSR and for the
    key/incr/integral lists of three LumiFSR records. Nothing is returned.
    """
    print("Checking File Records")

    # NOTE(review): GetFSRdicts is not defined in the visible part of this
    # file (only GetFSRdict(filename, queue) is) -- confirm where it comes from.
    parFSR = GetFSRdicts(par)
    serFSR = GetFSRdicts(ser)

    # dict.items() exists on both Python 2 and 3; iteritems() only on 2
    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + \
        str(len(diff1)) + "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): "LumiFSRBeam1" is not part of this list -- confirm intent
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = set(parFSR[k]["key"]) - set(serFSR[k]["key"])
        diff4 = set(parFSR[k]["incr"]) - set(serFSR[k]["incr"])
        diff5 = set(parFSR[k]["integral"]) - set(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))
330 
331 
def LumiFSR(lumi):
    """Unpack a lumi record into plain Python lists.

    lumi : object providing runNumbers(), fileIDs() and a string form that
           contains "info (key/incr/integral) : " followed by '/'-separated
           records of three whitespace-separated integers

    Returns the tuple (runs, files, key, incr, integral), where the last
    three lists hold the first, second and third integer of every record.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    # Everything after the marker is the record list; the piece after the
    # final '/' is not a record and is dropped
    text = str(lumi)
    tail = text.split("info (key/incr/integral) : ")[-1]
    records = tail.split('/')[:-1]

    key, incr, integral = [], [], []
    for record in records:
        first, second, third = record.split()
        key.append(int(first))
        incr.append(int(second))
        integral.append(int(third))

    return (runs, files, key, incr, integral)
358 
359 
def GetFSRdict(filename, queue):
    """Collect the file records of ``filename`` and put them on ``queue``.

    filename : file name inserted into the reader options
    queue    : receives one dictionary with the entries TimeSpanFSR,
               LumiFSRBeamCrossing, LumiFSRBeam1, LumiFSRBeam2,
               LumiFSRNoBeam and EventCountFSR

    All values start at 0 and are overwritten for every matching record
    name reported by the file record service. Nothing is returned.
    """
    FSR = {
        "TimeSpanFSR": dict(earliest=0, latest=0),
        "LumiFSRBeamCrossing": dict(key=0, incr=0, integral=0),
        "LumiFSRBeam1": dict(key=0, incr=0, integral=0),
        "LumiFSRBeam2": dict(key=0, incr=0, integral=0),
        "LumiFSRNoBeam": dict(key=0, incr=0, integral=0),
        "EventCountFSR": dict(input=0, output=0, statusFlag=0),
    }

    # NOTE(review): the configuration is built as source text and run with
    # exec(); filename is interpolated unescaped, so a quote character in it
    # changes the executed code.
    readerOpts = "from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader; LumiFsrReader().OutputLevel = INFO; LumiFsrReader().inputFiles = ['%s'] ;" % filename
    appOpts = "LumiFsrReader().Persistency='ROOT'; LumiFsrReader().EvtMax = 1; from Configurables import LHCbApp; LHCbApp().Persistency='ROOT'; from Configurables import CondDB, DDDBConf;"
    dbOpts = " CondDB().UseLatestTags=['%s']; DDDBConf(DataType='%s');" % (
        2011, 2011)
    options = readerOpts + appOpts + dbOpts
    exec(options)

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    # getHistoNames() may give back nothing; then there is nothing to loop over
    for path in fsr.getHistoNames() or ():
        ob = fsr.retrieveObject(path)

        if "LumiFSR" in path:
            assert ob.numberOfObjects() == 1
            runs, files, keys, increment, integral = LumiFSR(
                ob.containedObject(0))

            # the last path component selects the FSR entry to fill
            record = FSR[path[path.rfind('/') + 1:]]
            record['runs'] = runs
            record['files'] = files
            record['key'] = keys
            record['incr'] = increment
            record['integral'] = integral

        if "TimeSpanFSR" in path:
            span = FSR["TimeSpanFSR"]
            span['earliest'] = ob.containedObject(0).earliest()
            span['latest'] = ob.containedObject(0).latest()

        if "EventCountFSR" in path:
            counts = FSR["EventCountFSR"]
            counts['input'] = ob.input()
            counts['output'] = ob.output()
            counts['statusFlag'] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
438 
439 
def CompareFSR(pout, sout):
    """Compare the file-record dictionaries of the parallel and serial file.

    pout : queue delivering the dictionary for the parallel file
    sout : queue delivering the dictionary for the serial file

    One dictionary is taken from each queue. The number of entries present
    in the parallel records but not in the serial ones is printed for
    TimeSpanFSR, EventCountFSR and for the key/incr/integral values of
    three LumiFSR records, followed by both dictionaries in full.
    Nothing is returned.
    """

    def _as_set(value):
        # A Lumi entry is a list once filled, but keeps a bare 0 when the
        # record was never found; treat such a scalar as a one-element set
        try:
            return set(value)
        except TypeError:
            return set([value])

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    # dict.items() exists on both Python 2 and 3; iteritems() only on 2
    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + \
        str(len(diff1)) + "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): "LumiFSRBeam1" is not part of this list -- confirm intent
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = _as_set(parFSR[k]['key']) - _as_set(serFSR[k]['key'])
        diff4 = _as_set(parFSR[k]['incr']) - _as_set(serFSR[k]['incr'])
        diff5 = _as_set(parFSR[k]['integral']) - _as_set(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
465 
466 
if __name__ == '__main__':
    # Usage: <script> <parallelFile> <serialFile>
    #
    # Runs GetFSRdict on each file in its own process and CompareFSR in a
    # third one, then waits for all three.

    # Work on a copy so that sys.argv itself is left untouched
    args = sys.argv[1:]
    if len(args) != 2:
        print(
            'Please supply two arguments : > python loadFile <parallelFile> <serialFile>'
        )
        sys.exit(0)

    par = 'PFN:' + args[0]
    ser = 'PFN:' + args[1]
    print('Parallel File to be analysed : %s' % (par))
    print('Serial File to be analysed : %s' % (ser))

    pname = par[4:]  # TFile doesn't need the "PFN:" prefix
    sname = ser[4:]

    qacross = Queue()
    pout = Queue()
    sout = Queue()

    # Event-by-event comparison: the processes are created but deliberately
    # not started (kept for reference). They get their own names so the
    # file names in par/ser are not overwritten.
    parReader = Process(target=Reader, args=(PAR, par, qacross, pout))
    serReader = Process(target=Reader, args=(SER, ser, qacross, sout))
    com = Process(target=ComparisonEngine, args=(pout, sout))

    # disabled: com.start() ; parReader.start() ; serReader.start()
    # disabled: serReader.join() ; parReader.join() ; com.join()

    # disabled: CompareTrees( pname, sname )

    print("Check File Records")

    # Fresh queues for the file-record comparison
    pout = Queue()
    sout = Queue()

    # First argument is the parallel file, second the serial one; each goes
    # to the process that feeds the queue of the same kind
    sp = Process(target=GetFSRdict, args=(sname, sout))
    pp = Process(target=GetFSRdict, args=(pname, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
OutputStream
A small class to stream Data I/O.
Definition: OutputStream.h:38
compareOutputFiles.compareEvents
def compareEvents(s, p)
Definition: compareOutputFiles.py:249
compareOutputFiles.ComparisonEngine
def ComparisonEngine(pQueue, sQueue)
Definition: compareOutputFiles.py:201
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:842
compareOutputFiles.LumiFSR
def LumiFSR(lumi)
Definition: compareOutputFiles.py:332
compareOutputFiles.CompareTrees
def CompareTrees(pname, sname)
Definition: compareOutputFiles.py:47
Gaudi.Configuration
Definition: Configuration.py:1
GaudiPython.HistoUtils.__repr__
__repr__
Definition: HistoUtils.py:519
compareOutputFiles.checkKeys
def checkKeys(name)
Definition: compareOutputFiles.py:26
compareOutputFiles.CheckFileRecords
def CheckFileRecords(par, ser)
Definition: compareOutputFiles.py:308
compareOutputFiles.switchDict
def switchDict(d)
Definition: compareOutputFiles.py:103
GaudiKernel.ProcessJobOptions.importOptions
def importOptions(optsfile)
Definition: ProcessJobOptions.py:491
GaudiPython.Pythonizations.iteritems
iteritems
Definition: Pythonizations.py:525
MessageSvc
Definition: MessageSvc.h:40
compareOutputFiles.GetFSRdict
def GetFSRdict(filename, queue)
Definition: compareOutputFiles.py:360
HistogramPersistencySvc
HistogramPersistencySvc class implementation definition.
Definition: HistogramPersistencySvc.h:57
ApplicationMgr
Definition: ApplicationMgr.h:57
EventSelector
Definition of class EventSelector.
Definition: EventSelector.h:63
compareOutputFiles.Reader
def Reader(readerType, filename, qacross, qToEngine)
Definition: compareOutputFiles.py:136
compareOutputFiles.CompareFSR
def CompareFSR(pout, sout)
Definition: compareOutputFiles.py:440
compareOutputFiles.printDict
def printDict(d, name='unspecified')
Definition: compareOutputFiles.py:121
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:97
compareOutputFiles.checkForAddressDifference
def checkForAddressDifference(a, b)
Definition: compareOutputFiles.py:233