The Gaudi Framework  v38r1p1 (ae26267b)
compareOutputFiles.py
Go to the documentation of this file.
1 
11 
12 # flake8: noqa (job options)
13 
14 
15 import sys
16 from multiprocessing import Process, Queue
17 
18 from Configurables import LHCbApp
19 from Gaudi.Configuration import *
20 from ROOT import TBuffer, TBufferFile, TFile
21 
22 from GaudiPython import AppMgr, gbl
23 
#
# compareOutputFiles.py
# ---------------------
# Open two dst files (one written by a parallel job, one by a serial job)
# for inspection and compare their contents
#
29 
30 
def checkKeys(name):
    """Open the ROOT file behind a "PFN:"-prefixed file name.

    The function only opens the file; nothing is read from it and nothing
    is returned.
    """
    # TFile wants the bare path, so drop the four-character "PFN:" prefix.
    prefixLength = len("PFN:")
    rootPath = name[prefixLength:]
    # NOTE(review): "REC" is not one of the documented TFile open options;
    # confirm which mode is actually intended here.
    rootFile = TFile(rootPath, "REC")
35 
36 
# Job configuration, executed at import time.
# Pull in the LHCb application options and the RootCnv setup options.
importOptions("$STDOPTS/LHCbApplication.opts")
importOptions("$ENV_PROJECT_SOURCE_DIR/RootCnv/options/Setup.opts")

# Blank the output names -- presumably so that this checking job does not
# write a DST or histogram file of its own (TODO confirm).
OutputStream("DstWriter").Output = ""
HistogramPersistencySvc().OutputFile = ""
# Only report errors, and set the event selector print frequency to 100.
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(OutputLevel=ERROR, AppName="File Check - Serial vs Parallel")
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Tags passed to Reader() as readerType to say which kind of file it handles.
PAR = "PARALLEL"
SER = "SERIAL"
50 
51 
def _classifyKeys(names, eventPrefix="_Event", metaPrefix="##"):
    """Sort key names and split them into (meta, event, other) lists."""
    meta, event, other = [], [], []
    for name in sorted(names):
        if name.startswith(eventPrefix):
            event.append(name)
        elif name.startswith(metaPrefix):
            meta.append(name)
        else:
            other.append(name)
    return meta, event, other


def CompareTrees(pname, sname):
    """Compare the top-level key names of the parallel and serial ROOT files.

    Keys are grouped by name: those starting with "_Event" (event data),
    those starting with "##" (meta data) and everything else.  A message is
    printed for every group that differs; for the "other" group the names
    present in only one of the two files are listed.

    Args:
        pname: path of the file written by the parallel job.
        sname: path of the file written by the serial job.
    """
    # NOTE(review): "REC" is not one of the documented TFile open options;
    # confirm which mode is actually intended here.
    pf = TFile(pname, "REC")
    sf = TFile(sname, "REC")
    try:
        pMeta, pEvent, pOther = _classifyKeys(
            [pfk.GetName() for pfk in pf.GetListOfKeys()]
        )
        sMeta, sEvent, sOther = _classifyKeys(
            [sfk.GetName() for sfk in sf.GetListOfKeys()]
        )

        if pMeta != sMeta:
            print("Meta Data differs")

        if pEvent != sEvent:
            print("Event data differs")

        if pOther != sOther:
            pset = set(pOther)
            sset = set(sOther)
            pExtra = pset - sset
            sExtra = sset - pset
            if pExtra:
                print("Extra Data in parallel file : ", pExtra)
            if sExtra:
                print("Extra Data in serial file : ", sExtra)
            if sExtra or pExtra:
                print("Files will have different sizes")
    finally:
        # Close both files even if reading or comparing the keys fails.
        pf.Close()
        sf.Close()
106 
107 
def switchDict(d):
    """Return a copy of *d* with keys and values swapped.

    This only works when all values are unique (and hashable, since they
    become keys).  If a value occurs more than once a message is printed
    and None is returned.

    Args:
        d: the dictionary to invert.

    Returns:
        A new dict mapping each value of *d* to its key, or None when the
        values are not unique.
    """
    # dict.values() is a view in Python 3 (no .count()), so take a list copy.
    vals = list(d.values())
    # A set drops duplicates, so a size mismatch means a repeated value.
    if len(set(vals)) != len(vals):
        print("Dictionary cannot be switched, values not unique")
        return None
    print("Dict has keys/values : %i/%i" % (len(d), len(vals)))
    return {entry: k for k, entry in d.items()}
124 
125 
def printDict(d, name="unspecified"):
    """Print a dictionary between two 80-character rules.

    Layout:

        Dictionary Name :
            key     value
            key     value
            ...
    """
    rule = "-" * 80
    print(rule)
    print("Dictionary %s : " % (name))
    for key in d.keys():
        # Same text that print("\t", key, "\t", value) would produce.
        print(" ".join(("\t", str(key), "\t", str(d[key]))))
    print(rule)
139 
140 
def Reader(readerType, filename, qacross, qToEngine):
    #
    # Process for reading a file
    # One process for reading Serial File, another for Parallel File
    #
    # First the order of events is determined, (parallel != serial, usually)
    #
    # Then the events are run *in order* using AppMgr().runSelectedEvents(pfn, evtNumber)
    # on both Serial-Reader and Parallel-Reader processes.
    #
    # The string repr of everything in the TES is placed in a dictionary and
    # sent to the comparison Process, which compares the two dictionaries
    #
    # Arguments:
    #   readerType : SER or PAR, selects which side of the exchange this is
    #   filename   : file name including the "PFN:" prefix
    #   qacross    : queue on which the serial reader publishes its ordering
    #                and from which the parallel reader receives it
    #   qToEngine  : queue on which the per-event dictionaries are sent,
    #                followed by a final None
    #
    a = AppMgr()
    sel = a.evtsel()
    evt = a.evtsvc()

    header = "/Event/Rec/Header"
    sel.open(filename)
    ct = 0
    order = {}
    fname = filename[4:]  # runSelectedEvents doesn't need the "PFN:" prefix

    # determine the ordering
    # order maps event number -> position in this file; the loop stops at
    # the first event for which the header is missing
    while True:
        a.run(1)
        if evt[header]:
            eNumber = int(evt[header].evtNumber())
            order[eNumber] = ct
            ct += 1
        else:
            break

    if readerType == SER:
        # send the ordering details to the parallel-reader
        # after switchDict, order maps position -> event number
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)  # sentinel ending the receiver's loop
        # changeName
        serOrder = order
    elif readerType == PAR:
        # receive the serial ordering from queue, and send ordering to SerialReader
        # the loop drains the queue up to the None sentinel, leaving serOrder
        # bound to the last item received
        for serOrder in iter(qacross.get, None):
            pass
        lsks = len(serOrder.keys())
        lpks = len(order.keys())
        print("Events in Files (serial/parallel) : %i / %i" % (lsks, lpks))

    # now run files in the order specified by the serial ordering
    # and send them one by one to the comparison engine
    for i in iter(serOrder.keys()):
        if readerType == PAR:
            # serOrder[i] is the event number at serial position i; order
            # (not switched on this side) gives that event's position in
            # the parallel file
            i = order[serOrder[i]]

        a.runSelectedEvents(fname, i)
        lst = evt.getList()

        lst.sort()
        # path -> (class name, repr) for every path returned by getList()
        ascii = dict([(l, (evt[l].__class__.__name__, evt[l].__repr__())) for l in lst])
        qToEngine.put(ascii)
    # None tells the comparison engine that this reader has finished
    qToEngine.put(None)
    print("%s Reader Finished" % (readerType))
203 
204 
def ComparisonEngine(pQueue, sQueue):
    """Compare events received pairwise from the two reader processes.

    The Comparison Engine runs on a separate forked process and receives
    events in pairs, one each from the Parallel FileReader and the Serial
    FileReader.  Events arrive as dictionaries, d[path] = (className,
    string_repr), and are compared with compareEvents.  One bool per event
    is recorded (perfect match = True, difference = False) and a summary is
    printed at the end.

    Args:
        pQueue: queue fed by the parallel reader; None marks the end.
        sQueue: queue fed by the serial reader; None marks the end.
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print("Termination Signals received ok")
            break
        if pitem is None or sitem is None:
            # Only one reader has finished: the files differ in length.
            print("pitem != sitem : ", pitem, sitem)
            break
        # compareEvents takes (serial, parallel): its tolerance for extra
        # plain DataObject paths applies to the parallel event only.
        results.append(compareEvents(sitem, pitem))
    matches = sum(results)
    print("=" * 80)
    print("Comparison Engine Finished")
    print("-" * 80)
    print("Total Events Checked : %i" % (len(results)))
    print("Perfect Matches : %i" % (matches))
    print("Errors : %i" % (len(results) - matches))
    print("=" * 80)
235 
236 
def checkForAddressDifference(a, b):
    """Tell whether two string representations differ only by an address.

    The __repr__() method for Event Data Objects will return a generic
    string "DataObject at 0xADDRESS" for non-Pythonised objects.  If these
    objects have the same path they are equal, but this cannot be tested
    with "==" in Python, as the memory address will be different for the
    two different DataObjects, so this method checks whether both strings
    are such generic representations.

    Args:
        a, b: two string representations.

    Returns:
        True if both strings start with "DataObject at 0x", else False.
    """
    ref = "DataObject at 0x"
    return a.startswith(ref) and b.startswith(ref)
251 
252 
def compareEvents(s, p):
    """Compare a serial event with a parallel event.

    Both events are dictionaries of the form
    d[path] = (className, string_repr).

    Args:
        s: event read from the serial file.
        p: event read from the parallel file.

    Returns:
        True if both events hold the same paths with matching class names
        and representations, False otherwise.
    """
    # dict.keys() is a view in Python 3 (no .sort()/.remove()), so build
    # sorted lists of the paths instead.
    sks = sorted(s)
    pks = sorted(p)

    # check 1 : number of keys (paths)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        # but in TESSerializer, *all* DataObjects will be sent
        # including /Event/Prev and /Event/Prev/MC
        #
        # Extra parallel keys that are plain DataObjects are ignored;
        # anything else is a real difference.
        for e in set(pks) - set(sks):
            if p[e][0] == "DataObject":
                pks.remove(e)
            else:
                print("Extra Other thing found!", e, p[e][0])
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; a mismatch is tolerated when both
        # are generic "DataObject at 0x..." strings
        if s[key][1] != p[key][1] and not checkForAddressDifference(
            p[key][1], s[key][1]
        ):
            return False
    return True
310 
311 
def CheckFileRecords(par, ser):
    """Print how many file-record entries of the parallel file differ.

    For TimeSpanFSR and EventCountFSR the (key, value) pairs are compared;
    for the listed LumiFSR records the "key", "incr" and "integral" entries
    are compared.  Each count is the number of entries found in the
    parallel records but not in the serial ones.

    Args:
        par: name of the parallel file.
        ser: name of the serial file.
    """
    print("Checking File Records")

    # NOTE(review): GetFSRdicts is not defined in the visible part of this
    # file (only GetFSRdict(filename, queue) is); confirm where it comes
    # from before relying on this function.
    parFSR = GetFSRdicts(par)
    serFSR = GetFSRdicts(ser)

    # dict.iteritems() does not exist in Python 3; items() is the equivalent.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = set(parFSR[k]["key"]) - set(serFSR[k]["key"])
        diff4 = set(parFSR[k]["incr"]) - set(serFSR[k]["incr"])
        diff5 = set(parFSR[k]["integral"]) - set(serFSR[k]["integral"])
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )
346 
347 
def LumiFSR(lumi):
    """Unpack a LumiFSR object into plain Python lists.

    The run numbers and file IDs are read through the object's accessors;
    the key/incr/integral triplets are parsed from its string form: the
    text after "info (key/incr/integral) : " is split on "/" and every
    piece but the last must hold three whitespace-separated integers.

    Returns:
        The tuple (runs, files, key, incr, integral), each a list.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    payload = str(lumi).split("info (key/incr/integral) : ")[-1]
    # Whatever follows the final "/" is not a record, so it is dropped.
    records = payload.split("/")[:-1]

    key, incr, integral = [], [], []
    for record in records:
        keyText, incrText, integralText = record.split()
        key.append(int(keyText))
        incr.append(int(incrText))
        integral.append(int(integralText))

    return (runs, files, key, incr, integral)
373 
374 
def GetFSRdict(filename, queue):
    # Read the file records of one file and put them, as a dictionary of
    # plain Python values, on the given queue.
    #
    #   filename : name of the file to read (inserted into the job options)
    #   queue    : queue on which the resulting FSR dictionary is put
    #
    # Placeholder values; an entry is only overwritten when the matching
    # record is found in the file.
    FSR = {
        "TimeSpanFSR": {"earliest": 0, "latest": 0},
        "LumiFSRBeamCrossing": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRBeam1": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRBeam2": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRNoBeam": {"key": 0, "incr": 0, "integral": 0},
        "EventCountFSR": {"input": 0, "output": 0, "statusFlag": 0},
    }

    # The job options are assembled as Python source text and executed below.
    options = (
        "from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader; LumiFsrReader().OutputLevel = INFO; LumiFsrReader().inputFiles = ['%s'] ;"
        % filename
    )
    options += "LumiFsrReader().Persistency='ROOT'; LumiFsrReader().EvtMax = 1; from Configurables import LHCbApp; LHCbApp().Persistency='ROOT'; from Configurables import CondDB, DDDBConf;"
    options += " CondDB().UseLatestTags=['%s']; DDDBConf(DataType='%s');" % (2011, 2011)
    # NOTE(review): filename is interpolated into code passed to exec();
    # only call this with trusted file names.
    exec(options)
    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    lst = fsr.getHistoNames()

    if lst:
        for l in lst:
            ob = fsr.retrieveObject(l)

            if "LumiFSR" in l:
                assert ob.numberOfObjects() == 1
                k = ob.containedObject(0)
                runs, files, keys, increment, integral = LumiFSR(k)

                # The last path component selects the FSR entry; it has to
                # be one of the keys declared above.
                FSR[l[l.rfind("/") + 1 :]]["runs"] = runs
                FSR[l[l.rfind("/") + 1 :]]["files"] = files
                FSR[l[l.rfind("/") + 1 :]]["key"] = keys
                FSR[l[l.rfind("/") + 1 :]]["incr"] = increment
                FSR[l[l.rfind("/") + 1 :]]["integral"] = integral

            if "TimeSpanFSR" in l:
                FSR["TimeSpanFSR"]["earliest"] = ob.containedObject(0).earliest()
                FSR["TimeSpanFSR"]["latest"] = ob.containedObject(0).latest()

            if "EventCountFSR" in l:
                FSR["EventCountFSR"]["input"] = ob.input()
                FSR["EventCountFSR"]["output"] = ob.output()
                FSR["EventCountFSR"]["statusFlag"] = ob.statusFlag()

    app.stop()
    app.finalize()

    # Hand the result to whoever reads the other end of the queue.
    queue.put(FSR)
426 
427 
def CompareFSR(pout, sout):
    """Compare the file records of the parallel and the serial file.

    One FSR dictionary (as produced by GetFSRdict) is taken from each
    queue.  For TimeSpanFSR and EventCountFSR the (key, value) pairs are
    compared; for the listed LumiFSR records the "key", "incr" and
    "integral" entries are compared.  Each printed count is the number of
    entries found in the parallel records but not in the serial ones.
    Both dictionaries are printed in full at the end.

    Args:
        pout: queue delivering the FSR dictionary of the parallel file.
        sout: queue delivering the FSR dictionary of the serial file.
    """

    def _entries(value):
        # GetFSRdict leaves a scalar placeholder (0) when a record is
        # missing from the file; treat it as a single entry rather than
        # letting set() raise TypeError.
        if isinstance(value, (list, tuple, set, frozenset)):
            return set(value)
        return {value}

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    # dict.iteritems() does not exist in Python 3; items() is the equivalent.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = _entries(parFSR[k]["key"]) - _entries(serFSR[k]["key"])
        diff4 = _entries(parFSR[k]["incr"]) - _entries(serFSR[k]["incr"])
        diff5 = _entries(parFSR[k]["integral"]) - _entries(serFSR[k]["integral"])
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
465 
466 
if __name__ == "__main__":
    # Usage: <script> <parallelFile> <serialFile>
    #
    # Compares the file records of the two files: each file is read in its
    # own process and a third process compares the two results.
    args = sys.argv
    # get rid of script name
    # NOTE: args is an alias of sys.argv, so this also shortens sys.argv
    # itself; kept as in the original in case other code relies on it.
    args.pop(0)
    if len(args) != 2:
        print(
            "Please supply two arguments : > python loadFile <parallelFile> <serialFile>"
        )
        sys.exit(0)

    # First argument is the parallel file, second the serial file.
    pname, sname = args
    print("Parallel File to be analysed : %s" % ("PFN:" + pname))
    print("Serial File to be analysed : %s" % ("PFN:" + sname))

    # The event-by-event comparison (Reader / ComparisonEngine processes
    # exchanging data over queues) and CompareTr(pname, sname)-style key
    # checks are currently disabled; only the file records are compared.

    print("Check File Records")

    pout = Queue()
    sout = Queue()

    # Each file goes to the process feeding the matching queue, so that
    # CompareFSR labels the parallel and serial records correctly.
    sp = Process(target=GetFSRdict, args=(sname, sout))
    pp = Process(target=GetFSRdict, args=(pname, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
OutputStream
A small to stream Data I/O.
Definition: OutputStream.h:38
compareOutputFiles.compareEvents
def compareEvents(s, p)
Definition: compareOutputFiles.py:253
compareOutputFiles.ComparisonEngine
ComparisonEngine
Definition: compareOutputFiles.py:490
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:887
compareOutputFiles.LumiFSR
def LumiFSR(lumi)
Definition: compareOutputFiles.py:348
compareOutputFiles.CompareFSR
CompareFSR
Definition: compareOutputFiles.py:507
compareOutputFiles.CompareTrees
def CompareTrees(pname, sname)
Definition: compareOutputFiles.py:52
Gaudi.Configuration
Definition: Configuration.py:1
compareOutputFiles.GetFSRdict
GetFSRdict
Definition: compareOutputFiles.py:505
compareOutputFiles.checkKeys
def checkKeys(name)
Definition: compareOutputFiles.py:31
compareOutputFiles.printDict
def printDict(d, name="unspecified")
Definition: compareOutputFiles.py:126
compareOutputFiles.CheckFileRecords
def CheckFileRecords(par, ser)
Definition: compareOutputFiles.py:312
compareOutputFiles.switchDict
def switchDict(d)
Definition: compareOutputFiles.py:108
GaudiKernel.ProcessJobOptions.importOptions
def importOptions(optsfile)
Definition: ProcessJobOptions.py:541
MessageSvc
Definition: MessageSvc.h:40
HistogramPersistencySvc
HistogramPersistencySvc class implementation definition.
Definition: HistogramPersistencySvc.h:57
ApplicationMgr
Definition: ApplicationMgr.h:57
EventSelector
Definition of class EventSelector.
Definition: EventSelector.h:63
compareOutputFiles.Reader
Reader
Definition: compareOutputFiles.py:488
GaudiAlg.HistoUtils.__repr__
__repr__
Definition: HistoUtils.py:535
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: details.h:98
compareOutputFiles.checkForAddressDifference
def checkForAddressDifference(a, b)
Definition: compareOutputFiles.py:237