The Gaudi Framework  master (37c0b60a)
compareOutputFiles.py
Go to the documentation of this file.
1 
11 
12 # We do not lint job options
13 # ruff: noqa
14 
15 
16 import sys
17 from multiprocessing import Process, Queue
18 
19 from Configurables import LHCbApp
20 from Gaudi.Configuration import *
21 from ROOT import TBuffer, TBufferFile, TFile
22 
23 from GaudiPython import AppMgr, gbl
24 
#
# compareOutputFiles.py
# ---------------------
# Open the dst files written by a serial and a parallel job and compare them
#
30 
31 
def checkKeys(name):
    """Open the ROOT file behind a "PFN:"-prefixed name and list its keys.

    Args:
        name: File name carrying a 4-character "PFN:" prefix; the prefix is
            stripped before the path is handed to TFile.

    Returns:
        Sorted list with the names of the keys stored in the file. (The
        previous version returned nothing and left the file open.)
    """
    fname = name[4:]  # TFile doesn't need the "PFN:" prefix
    # ROOT falls back to read-only access for an option string it does not
    # recognise (such as the former "REC"); spell the mode out so it cannot
    # be mistaken for a truncated "RECREATE".
    tf = TFile(fname, "READ")
    try:
        return sorted(key.GetName() for key in tf.GetListOfKeys())
    finally:
        # Always release the file handle, even if key inspection fails.
        tf.Close()
36 
37 
# Standard LHCb application options plus the RootCnv persistency setup.
importOptions("$STDOPTS/LHCbApplication.opts")
importOptions("$ENV_PROJECT_SOURCE_DIR/RootCnv/options/Setup.opts")

# This job only reads: blank out every output destination and keep the
# services quiet apart from errors.
OutputStream("DstWriter", Output="")
HistogramPersistencySvc(OutputFile="")
MessageSvc().OutputLevel = ERROR
EventSelector(PrintFreq=100)

ApplicationMgr(AppName="File Check - Serial vs Parallel", OutputLevel=ERROR)
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Reader-type tags handed to Reader() to tell the two reader processes apart.
SER = "SERIAL"
PAR = "PARALLEL"
51 
52 
def _splitKeyNames(names, eventPrefix="_Event"):
    """Split key names into (meta, event, other) lists, keeping their order.

    Names starting with *eventPrefix* are event data, names starting with
    "##" are meta data, everything else is "other".
    """
    meta, event, other = [], [], []
    for name in names:
        if name.startswith(eventPrefix):
            event.append(name)
        elif name.startswith("##"):
            meta.append(name)
        else:
            other.append(name)
    return meta, event, other


def CompareTrees(pname, sname):
    """Compare the top-level key names of the parallel and serial ROOT files.

    The key names of each file are sorted and split into meta data ("##"
    prefix), event data ("_Event" prefix) and everything else. A message is
    printed for every category that differs; for the "other" category the
    names present in only one of the files are listed as well.

    Args:
        pname: Path of the file written by the parallel job.
        sname: Path of the file written by the serial job.
    """
    # ROOT falls back to read-only access for an option string it does not
    # recognise (such as the former "REC"); the mode is spelled out here.
    pf = TFile(pname, "READ")
    try:
        sf = TFile(sname, "READ")
        try:
            pfkeys = sorted(key.GetName() for key in pf.GetListOfKeys())
            sfkeys = sorted(key.GetName() for key in sf.GetListOfKeys())
        finally:
            sf.Close()
    finally:
        # Both handles are released even if reading the key lists fails.
        pf.Close()

    pMeta, pEvent, pOther = _splitKeyNames(pfkeys)
    sMeta, sEvent, sOther = _splitKeyNames(sfkeys)

    if pMeta != sMeta:
        print("Meta Data differs")

    if pEvent != sEvent:
        print("Event data differs")

    if pOther != sOther:
        pset = set(pOther)
        sset = set(sOther)
        pExtra = pset - sset
        sExtra = sset - pset
        if pExtra:
            print("Extra Data in parallel file : ", pExtra)
        if sExtra:
            print("Extra Data in serial file : ", sExtra)
        if sExtra or pExtra:
            print("Files will have different sizes")
107 
108 
def switchDict(d):
    """Return a new dictionary with the keys and values of *d* swapped.

    Swapping is only possible when every value is unique (and hashable,
    since the values become keys).

    Args:
        d: Dictionary to invert.

    Returns:
        The inverted dictionary, or None (after printing a message) when
        *d* contains duplicate values.
    """
    nkeys = len(d)
    # dict.values() is a view on Python 3 and has no count(); materialise it
    # and detect duplicates in one pass through a set instead.
    vals = list(d.values())
    nvals = len(vals)
    if len(set(vals)) != nvals:
        print("Dictionary cannot be switched, values not unique")
        return None
    print("Dict has keys/values : %i/%i" % (nkeys, nvals))
    return {value: key for key, value in d.items()}
125 
126 
def printDict(d, name="unspecified"):
    """Print *d* between two 80-character rules, one entry per line.

    Layout:

        Dictionary <name> :
            key    value
            key    value
            ...

    Args:
        d: Dictionary to print.
        name: Label shown in the heading line.
    """
    rule = "-" * 80
    print(rule)
    print("Dictionary %s : " % (name))
    for key, value in d.items():
        print("\t", key, "\t", value)
    print(rule)
140 
141 
def Reader(readerType, filename, qacross, qToEngine):
    """Read one file and stream its events to the comparison engine.

    One Reader process handles the serial file and another the parallel
    file. Each first scans its file to learn the order of the events
    (parallel != serial, usually). The serial reader then publishes its
    ordering on *qacross*; the parallel reader picks it up. Both readers
    finally replay the events in the serial order with
    AppMgr().runSelectedEvents() and, for every event, push a dictionary
    {path: (class name, repr string)} of everything in the TES onto
    *qToEngine*, followed by a terminating None.

    Args:
        readerType: SER or PAR.
        filename: File to read, carrying a "PFN:" prefix.
        qacross: Queue carrying the serial ordering from the serial reader
            to the parallel reader.
        qToEngine: Queue feeding this reader's events to the comparison
            engine.
    """
    app = AppMgr()
    selector = app.evtsel()
    tes = app.evtsvc()

    headerPath = "/Event/Rec/Header"
    selector.open(filename)
    fname = filename[4:]  # runSelectedEvents doesn't need the "PFN:" prefix

    # determine the ordering: event number -> position in this file
    order = {}
    position = 0
    while True:
        app.run(1)
        if not tes[headerPath]:
            break
        order[int(tes[headerPath].evtNumber())] = position
        position += 1

    if readerType == SER:
        # send the ordering details (position -> event number) to the
        # parallel-reader
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serOrder = order
    elif readerType == PAR:
        # receive the serial ordering from the queue; the last item seen
        # before the None terminator is the one that is kept
        for serOrder in iter(qacross.get, None):
            pass
        nSerial = len(serOrder)
        nParallel = len(order)
        print("Events in Files (serial/parallel) : %i / %i" % (nSerial, nParallel))

    # now run files in the order specified by the serial ordering
    # and send them one by one to the comparison engine
    for slot in serOrder:
        # the parallel reader translates serial position -> event number ->
        # position of that event in the parallel file
        target = order[serOrder[slot]] if readerType == PAR else slot

        app.runSelectedEvents(fname, target)
        paths = tes.getList()

        paths.sort()
        snapshot = {
            path: (tes[path].__class__.__name__, tes[path].__repr__())
            for path in paths
        }
        qToEngine.put(snapshot)
    qToEngine.put(None)
    print("%s Reader Finished" % (readerType))
204 
205 
def ComparisonEngine(pQueue, sQueue):
    """Compare the events delivered by the parallel and serial readers.

    The engine runs in a separate process and receives events in pairs, one
    from each queue. Every event is a dictionary d[path] = (className,
    string_repr); pairs are compared with compareEvents and the outcome is
    collected as a bool (perfect match = True, difference = False). A None
    on a queue marks the end of that reader's stream. A summary is printed
    once the loop ends.

    Args:
        pQueue: Queue fed by the parallel-file reader.
        sQueue: Queue fed by the serial-file reader.
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print("Termination Signals received ok")
            break
        if pitem is None or sitem is None:
            # one reader finished before the other: event counts differ
            print("pitem != sitem : ", pitem, sitem)
            break
        # compareEvents is declared as compareEvents(s, p) and tolerates
        # extra plain DataObject entries in its second (parallel) argument
        # only, so the serial event has to be passed first.
        results.append(compareEvents(sitem, pitem))

    matches = sum(results)
    print("=" * 80)
    print("Comparison Engine Finished")
    print("-" * 80)
    print("Total Events Checked : %i" % (len(results)))
    print("Perfect Matches : %i" % (matches))
    print("Errors : %i" % (len(results) - matches))
    print("=" * 80)
236 
237 
def checkForAddressDifference(a, b):
    """Tell whether two repr strings differ only by a memory address.

    The __repr__() method for Event Data Objects returns a generic string
    "DataObject at 0xADDRESS" for non-Pythonised objects. If such objects
    have the same path they are equal, but this cannot be tested with "=="
    because the memory address differs between the two DataObjects, so this
    function checks whether both strings are of that generic form.

    Args:
        a: First string representation.
        b: Second string representation.

    Returns:
        True when both strings start with "DataObject at 0x", else False.
    """
    ref = "DataObject at 0x"
    return a.startswith(ref) and b.startswith(ref)
252 
253 
def compareEvents(s, p):
    """Compare the serial and the parallel version of one event.

    Both events are dictionaries of the form
    d[path] = (className, string_repr).

    Args:
        s: Event read from the serial file.
        p: Event read from the parallel file.

    Returns:
        True when both events hold the same paths with matching class names
        and string representations (representations that differ only in
        the address of a generic DataObject count as matching), else False.
    """
    # dict.keys() is a view on Python 3 and cannot be sorted in place, so
    # build sorted lists of the paths instead.
    sks = sorted(s)
    pks = sorted(p)

    # check 1 : number of keys (paths)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        # but in TESSerializer, *all* DataObjects will be sent
        # including /Event/Prev and /Event/Prev/MC
        #
        # extra keys in the parallel file which are just containing
        # DataObjects are removed; anything else is a real difference
        for extra in set(pks) - set(sks):
            if p[extra][0] == "DataObject":
                pks.remove(extra)
            else:
                print("Extra Other thing found!", extra, p[extra][0])
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation, tolerating address-only changes
        if s[key][1] != p[key][1] and not checkForAddressDifference(
            p[key][1], s[key][1]
        ):
            return False

    return True
311 
312 
def CheckFileRecords(par, ser):
    """Collect the file records of both files and print how they differ.

    For TimeSpanFSR and EventCountFSR the number of (name, value) entries
    found in the parallel record but not in the serial one is printed. For
    the LumiFSR categories the same one-way count is printed separately for
    the key, increment and integral values.

    Args:
        par: Name of the file written by the parallel job.
        ser: Name of the file written by the serial job.
    """

    def fetchRecords(filename):
        # The previous code called an undefined GetFSRdicts(); the records
        # are produced by GetFSRdict, which reports through a queue and is
        # run in its own process, as the script's entry point does.
        # NOTE(review): confirm this matches the intended GetFSRdicts.
        queue = Queue()
        worker = Process(target=GetFSRdict, args=(filename, queue))
        worker.start()
        records = queue.get()  # fetch before join so the child can exit
        worker.join()
        return records

    def entries(value):
        # GetFSRdict leaves the scalar 0 in place when a file holds no such
        # record; set(0) would raise, so wrap scalars in a one-element set.
        if isinstance(value, (list, tuple, set, frozenset)):
            return set(value)
        return {value}

    print("Checking File Records")

    parFSR = fetchRecords(par)
    serFSR = fetchRecords(ser)

    # dict.iteritems() only exists on Python 2; items() works on both.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    # NOTE(review): "LumiFSRBeam1" is not part of this list although
    # GetFSRdict fills it; confirm whether that is intended.
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = entries(parFSR[k]["key"]) - entries(serFSR[k]["key"])
        diff4 = entries(parFSR[k]["incr"]) - entries(serFSR[k]["incr"])
        diff5 = entries(parFSR[k]["integral"]) - entries(serFSR[k]["integral"])
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )
347 
348 
def LumiFSR(lumi):
    """Extract run numbers, file IDs and the info triples of a lumi record.

    The triples are parsed from str(lumi): everything after the last
    "info (key/incr/integral) : " marker is split on "/", the final piece
    is dropped, and each remaining piece must hold exactly three
    whitespace-separated integers.

    Args:
        lumi: Object providing runNumbers(), fileIDs() and a string form
            containing the marker described above.

    Returns:
        Tuple (runs, files, key, incr, integral) of lists.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    tail = str(lumi).split("info (key/incr/integral) : ")[-1]
    records = tail.split("/")[:-1]

    key, incr, integral = [], [], []
    for record in records:
        k, i, t = record.split()
        key.append(int(k))
        incr.append(int(i))
        integral.append(int(t))

    return (runs, files, key, incr, integral)
374 
375 
def GetFSRdict(filename, queue):
    """Read the file records of *filename* and put them on *queue*.

    The LumiFsrReader configuration is set up for the file, one event is
    run through AppMgr, and every object listed by the file record service
    is inspected. The values found are stored in a dictionary keyed by
    record type, which is put on *queue* after the application has been
    stopped and finalized.

    Args:
        filename: Name of the file to read.
        queue: Queue receiving the resulting dictionary.
    """
    FSR = {
        "TimeSpanFSR": {"earliest": 0, "latest": 0},
        "LumiFSRBeamCrossing": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRBeam1": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRBeam2": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRNoBeam": {"key": 0, "incr": 0, "integral": 0},
        "EventCountFSR": {"input": 0, "output": 0, "statusFlag": 0},
    }

    # These options used to be assembled into a string (with the file name
    # interpolated into it) and run through exec(); a file name containing
    # a quote would break or alter that code. The same statements are now
    # executed directly, in the same order.
    from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader

    LumiFsrReader().OutputLevel = INFO
    LumiFsrReader().inputFiles = [filename]
    LumiFsrReader().Persistency = "ROOT"
    LumiFsrReader().EvtMax = 1

    from Configurables import LHCbApp

    LHCbApp().Persistency = "ROOT"

    from Configurables import CondDB, DDDBConf

    CondDB().UseLatestTags = ["2011"]
    DDDBConf(DataType="2011")

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    lst = fsr.getHistoNames()

    if lst:
        for path in lst:
            ob = fsr.retrieveObject(path)

            if "LumiFSR" in path:
                assert ob.numberOfObjects() == 1
                runs, files, keys, increment, integral = LumiFSR(
                    ob.containedObject(0)
                )

                # the last path component names the record category
                record = FSR[path[path.rfind("/") + 1 :]]
                record["runs"] = runs
                record["files"] = files
                record["key"] = keys
                record["incr"] = increment
                record["integral"] = integral

            if "TimeSpanFSR" in path:
                FSR["TimeSpanFSR"]["earliest"] = ob.containedObject(0).earliest()
                FSR["TimeSpanFSR"]["latest"] = ob.containedObject(0).latest()

            if "EventCountFSR" in path:
                FSR["EventCountFSR"]["input"] = ob.input()
                FSR["EventCountFSR"]["output"] = ob.output()
                FSR["EventCountFSR"]["statusFlag"] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
427 
428 
def _fsrEntrySet(value):
    """Return *value* as a set, wrapping a scalar in a one-element set."""
    if isinstance(value, (list, tuple, set, frozenset)):
        return set(value)
    return {value}


def CompareFSR(pout, sout):
    """Take one file-record dictionary from each queue and print the diffs.

    For TimeSpanFSR and EventCountFSR the number of (name, value) entries
    found in the parallel record but not in the serial one is printed. For
    the LumiFSR categories the same one-way count is printed separately for
    the key, increment and integral values. Both dictionaries are dumped at
    the end.

    Args:
        pout: Queue delivering the records of the parallel file.
        sout: Queue delivering the records of the serial file.
    """
    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    # dict.iteritems() only exists on Python 2; items() works on both.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    # NOTE(review): "LumiFSRBeam1" is not part of this list although the
    # record dictionaries carry it; confirm whether that is intended.
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        # A category without a record in the file may still hold the scalar
        # placeholder 0; set(0) would raise, hence _fsrEntrySet.
        diff3 = _fsrEntrySet(parFSR[k]["key"]) - _fsrEntrySet(serFSR[k]["key"])
        diff4 = _fsrEntrySet(parFSR[k]["incr"]) - _fsrEntrySet(serFSR[k]["incr"])
        diff5 = _fsrEntrySet(parFSR[k]["integral"]) - _fsrEntrySet(
            serFSR[k]["integral"]
        )
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
466 
467 
if __name__ == "__main__":
    # Script entry point: expects the parallel file followed by the serial
    # file on the command line and compares their file records.
    args = sys.argv[1:]  # drop the script name without mutating sys.argv
    if len(args) != 2:
        print(
            "Please supply two arguments : > python compareOutputFiles.py <parallelFile> <serialFile>"
        )
        sys.exit(0)

    # plain names, as needed by TFile and GetFSRdict
    pname, sname = args
    # "PFN:"-prefixed names, as expected by the event selector
    par = "PFN:" + pname
    ser = "PFN:" + sname
    print("Parallel File to be analysed : %s" % (par))
    print("Serial File to be analysed : %s" % (ser))

    # The event-by-event and TTree-key comparisons are currently disabled:
    #
    # qacross = Queue() ; pout = Queue() ; sout = Queue()
    # parReader = Process(target=Reader, args=(PAR, par, qacross, pout))
    # serReader = Process(target=Reader, args=(SER, ser, qacross, sout))
    # com = Process(target=ComparisonEngine, args=(pout, sout))
    # com.start() ; parReader.start() ; serReader.start()
    # serReader.join() ; parReader.join() ; com.join()
    #
    # CompareTrees( pname, sname )

    print("Check File Records")

    pout = Queue()
    sout = Queue()

    # The former code re-read sys.argv after popping the script name from
    # it, which handed the parallel file to the serial worker and vice
    # versa; the names parsed above are used instead.
    sp = Process(target=GetFSRdict, args=(sname, sout))
    pp = Process(target=GetFSRdict, args=(pname, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    workers = (sp, pp, cp)
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
OutputStream
A small class to stream Data I/O.
Definition: OutputStream.h:38
compareOutputFiles.compareEvents
def compareEvents(s, p)
Definition: compareOutputFiles.py:254
GaudiPartProp.decorators.__repr__
__repr__
decorate the vector of properties
Definition: decorators.py:173
compareOutputFiles.ComparisonEngine
ComparisonEngine
Definition: compareOutputFiles.py:491
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:887
compareOutputFiles.LumiFSR
def LumiFSR(lumi)
Definition: compareOutputFiles.py:349
compareOutputFiles.CompareFSR
CompareFSR
Definition: compareOutputFiles.py:508
compareOutputFiles.CompareTrees
def CompareTrees(pname, sname)
Definition: compareOutputFiles.py:53
Gaudi.Configuration
Definition: Configuration.py:1
compareOutputFiles.GetFSRdict
GetFSRdict
Definition: compareOutputFiles.py:506
compareOutputFiles.checkKeys
def checkKeys(name)
Definition: compareOutputFiles.py:32
compareOutputFiles.printDict
def printDict(d, name="unspecified")
Definition: compareOutputFiles.py:127
compareOutputFiles.CheckFileRecords
def CheckFileRecords(par, ser)
Definition: compareOutputFiles.py:313
compareOutputFiles.switchDict
def switchDict(d)
Definition: compareOutputFiles.py:109
GaudiKernel.ProcessJobOptions.importOptions
def importOptions(optsfile)
Definition: ProcessJobOptions.py:541
MessageSvc
Definition: MessageSvc.h:40
HistogramPersistencySvc
HistogramPersistencySvc class implementation definition.
Definition: HistogramPersistencySvc.h:57
ApplicationMgr
Definition: ApplicationMgr.h:57
EventSelector
Definition of class EventSelector.
Definition: EventSelector.h:63
compareOutputFiles.Reader
Reader
Definition: compareOutputFiles.py:489
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: details.h:97
compareOutputFiles.checkForAddressDifference
def checkForAddressDifference(a, b)
Definition: compareOutputFiles.py:238