The Gaudi Framework  v36r13 (995e4364)
compareOutputFiles.py
Go to the documentation of this file.
1 
11 
12 # flake8: noqa (job options)
13 
14 from __future__ import print_function
15 
16 import sys
17 from multiprocessing import Process, Queue
18 
19 from Configurables import LHCbApp
20 from Gaudi.Configuration import *
21 from ROOT import TBuffer, TBufferFile, TFile
22 
23 from GaudiPython import AppMgr, gbl
24 
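#
# compareOutputFiles.py (formerly loadFile.py)
# --------------------------------------------
# Open a pair of dst files (parallel and serial job output) for inspection
# and comparison
#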
30 
31 
def checkKeys(name):
    """Open the ROOT file behind a "PFN:"-prefixed file name.

    name : file name carrying the four-character "PFN:" prefix, which is
           stripped because TFile does not need it

    Only the file is opened; no key is inspected and nothing is returned.
    """
    # NOTE(review): "REC" is not one of the documented TFile modes; ROOT
    # presumably falls back to read-only for it -- confirm.
    rootFile = TFile(name[len("PFN:") :], "REC")
36 
37 
# Pull in the LHCb application options and the RootCnv setup options.
importOptions("$STDOPTS/LHCbApplication.opts")
importOptions("$ENV_PROJECT_SOURCE_DIR/RootCnv/options/Setup.opts")

# Blank the DST writer and histogram output targets (presumably so that this
# checking job writes no files of its own -- confirm).
OutputStream("DstWriter").Output = ""
HistogramPersistencySvc().OutputFile = ""
# Message service and application manager both run at ERROR output level.
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(OutputLevel=ERROR, AppName="File Check - Serial vs Parallel")
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Reader type tags: Reader() compares its readerType argument against these.
PAR = "PARALLEL"
SER = "SERIAL"
51 
52 
def CompareTrees(pname, sname):
    """Compare the top-level key names of the parallel and the serial file.

    pname : path of the file written by the parallel job
    sname : path of the file written by the serial job

    The key names of each file are sorted and split into three groups:
    names starting with "_Event", names starting with "##" (reported as
    meta data) and everything else.  A message is printed for every group
    that differs between the two files; nothing is returned.
    """

    def _splitKeys(rootFile):
        # Return the sorted key names of rootFile as (meta, event, other).
        meta, event, other = [], [], []
        for keyName in sorted(k.GetName() for k in rootFile.GetListOfKeys()):
            if keyName.startswith("_Event"):
                event.append(keyName)
            elif keyName.startswith("##"):
                meta.append(keyName)
            else:
                other.append(keyName)
        return meta, event, other

    # "READ" is spelled out: the former "REC" is not a TFile mode and relied
    # on ROOT treating unrecognised options as read-only.
    pf = TFile(pname, "READ")
    try:
        sf = TFile(sname, "READ")
        try:
            pMeta, pEvent, pOther = _splitKeys(pf)
            sMeta, sEvent, sOther = _splitKeys(sf)

            if pMeta != sMeta:
                print("Meta Data differs")

            if pEvent != sEvent:
                print("Event data differs")

            if pOther != sOther:
                pset = set(pOther)
                sset = set(sOther)
                pExtra = pset - sset
                sExtra = sset - pset
                if pExtra:
                    print("Extra Data in parallel file : ", pExtra)
                if sExtra:
                    print("Extra Data in serial file : ", sExtra)
                if sExtra or pExtra:
                    print("Files will have different sizes")
        finally:
            # close the files even when reading the keys failed
            sf.Close()
    finally:
        pf.Close()
107 
108 
def switchDict(d):
    """Return a new dictionary with the keys and values of d swapped.

    d : dictionary whose values are hashable

    Returns the inverted dictionary, or None (after printing a message)
    when the values are not unique and cannot all become keys.
    """
    # A list copy is required: Python 3 dict views have no count() method,
    # so the former vals.count(v) raised AttributeError.  Comparing with the
    # set size also avoids the quadratic scan.
    vals = list(d.values())
    if len(set(vals)) != len(vals):
        print("Dictionary cannot be switched, values not unique")
        return None
    print("Dict has keys/values : %i/%i" % (len(d), len(vals)))
    return {value: key for key, value in d.items()}
125 
126 
def printDict(d, name="unspecified"):
    """Print a dictionary between two 80-character rules.

    d    : dictionary to print, one "key value" line per entry
    name : label shown in the heading line

    Layout:

        Dictionary Name :
            key     value
            key     value
            ...
    """
    rule = "-" * 80
    print(rule)
    print("Dictionary %s : " % (name))
    for key, value in d.items():
        print("\t", key, "\t", value)
    print(rule)
140 
141 
def Reader(readerType, filename, qacross, qToEngine):
    """Read one file and send its events, one dictionary each, to qToEngine.

    readerType : SER or PAR, selects which side of the ordering exchange
                 this reader plays
    filename   : file name with "PFN:" prefix
    qacross    : queue carrying the serial event ordering from the SER
                 reader to the PAR reader
    qToEngine  : queue receiving one dictionary per event, then None
    """
    #
    # Process for reading a file
    # One process for reading Serial File, another for Parallel File
    #
    # First the order of events is determined, (parallel != serial, usually)
    #
    # Then the events are run *in order* using AppMgr().runSelectedEvents(pfn, evtNumber)
    # on both Serial-Reader and Parallel-Reader processes.
    #
    # The string repr of everything in the TES is placed in a dictionary and
    # sent to the comparison Process, which compares the two dictionaries
    #
    a = AppMgr()
    sel = a.evtsel()
    evt = a.evtsvc()

    header = "/Event/Rec/Header"
    sel.open(filename)
    ct = 0
    order = {}  # event number -> position of that event in this file
    fname = filename[4:]  # runSelectedEvents doesn't need the "PFN:" prefix

    # determine the ordering: run event by event until no header is found
    while True:
        a.run(1)
        if evt[header]:
            eNumber = int(evt[header].evtNumber())
            order[eNumber] = ct
            ct += 1
        else:
            break

    if readerType == SER:
        # send the ordering details to the parallel-reader;
        # after switchDict the mapping is position -> event number
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)  # None ends the iter(qacross.get, None) loop below
        # changeName
        serOrder = order
    elif readerType == PAR:
        # receive the serial ordering from the queue; the loop keeps the
        # last item that arrived before the None terminator
        for serOrder in iter(qacross.get, None):
            pass
        lsks = len(serOrder.keys())
        lpks = len(order.keys())
        print("Events in Files (serial/parallel) : %i / %i" % (lsks, lpks))
    # NOTE(review): any other readerType leaves serOrder unbound and the loop
    # below raises NameError.

    # now run files in the order specified by the serial ordering
    # and send them one by one to the comparison engine
    for i in iter(serOrder.keys()):
        if readerType == PAR:
            # translate the serial position into this file's position for
            # the same event number
            i = order[serOrder[i]]

        a.runSelectedEvents(fname, i)
        lst = evt.getList()

        lst.sort()
        # path -> (class name, string representation); note that the local
        # name shadows the ascii() builtin
        ascii = dict([(l, (evt[l].__class__.__name__, evt[l].__repr__())) for l in lst])
        qToEngine.put(ascii)
    qToEngine.put(None)  # termination signal for the comparison engine
    print("%s Reader Finished" % (readerType))
204 
205 
def ComparisonEngine(pQueue, sQueue):
    """Compare the events arriving pairwise from the two file readers.

    pQueue : queue fed by the parallel-file reader
    sQueue : queue fed by the serial-file reader

    The Comparison Engine runs on a separate forked process.  Each queue
    delivers one dictionary per event, d[path] = (className, string_repr),
    followed by None as termination signal.  Every pair is compared with
    compareEvents and the outcome is stored in a list of bools
    (PerfectMatch=True, Diff=False); a summary is printed at the end.
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print("Termination Signals received ok")
            break
        if pitem is None or sitem is None:
            # one reader stopped before the other one
            print("pitem != sitem : ", pitem, sitem)
            break
        # compareEvents is declared as (serial, parallel); the items used to
        # be passed the other way round, which applied its tolerance for
        # extra parallel-only keys to the serial event instead.
        results.append(compareEvents(sitem, pitem))
    nMatches = sum(results)
    print("=" * 80)
    print("Comparison Engine Finished")
    print("-" * 80)
    print("Total Events Checked : %i" % (len(results)))
    print("Perfect Matches : %i" % (nMatches))
    print("Errors : %i" % (len(results) - nMatches))
    print("=" * 80)
236 
237 
def checkForAddressDifference(a, b):
    """Tell whether two string representations differ only by an address.

    The __repr__() method for Event Data Objects will return a generic
    string "DataObject at 0xADDRESS" for non-Pythonised objects.  If these
    objects have the same path, they are equal, but this cannot be tested
    with "==" in Python, as the memory address will be different for the
    two different DataObjects, so this method checks whether both strings
    are of that generic form.

    a, b : two string representations

    Returns True when both start with "DataObject at 0x", else False.
    """
    ref = "DataObject at 0x"
    return a.startswith(ref) and b.startswith(ref)
252 
253 
def compareEvents(s, p):
    """Compare the serial and the parallel version of one event.

    s : event read from the serial file
    p : event read from the parallel file

    Both events are dictionaries of the form
    d[ path ] = tuple( className, string_repr ).

    Returns True when both events hold the same paths with matching class
    names and string representations, otherwise False.
    """
    # check 1 : number of keys (paths)
    # sorted() is used because Python 3 key views have no sort() method.
    sks = sorted(s)
    pks = sorted(p)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        # but in TESSerializer, *all* DataObjects will be sent
        # including /Event/Prev and /Event/Prev/MC

        # check for extra keys in the parallel file which are just
        # containing DataObjects; if found, remove them
        for e in set(pks) - set(sks):
            if p[e][0] == "DataObject":
                pks.remove(e)
            else:
                print("Extra Other thing found!", e, p[e][0])
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content
    for key in sks:
        sClass, sRepr = s[key][0], s[key][1]
        pClass, pRepr = p[key][0], p[key][1]
        # compare class name
        if sClass != pClass:
            return False
        # compare string representation; two generic "DataObject at 0x..."
        # strings count as equal although their addresses differ
        if sRepr != pRepr and not checkForAddressDifference(pRepr, sRepr):
            return False

    return True
311 
312 
def CheckFileRecords(par, ser):
    """Print how many file record entries differ between two files.

    par : name of the parallel file
    ser : name of the serial file

    Each count is the number of entries present for the parallel file but
    not for the serial one.  Nothing is returned.
    """
    print("Checking File Records")

    # NOTE(review): GetFSRdicts is not defined in the visible part of this
    # file (only GetFSRdict(filename, queue) is) -- confirm where it comes
    # from before relying on this function.
    parFSR = GetFSRdicts(par)
    serFSR = GetFSRdicts(ser)

    # items() replaces iteritems(), which only exists on Python 2 dicts.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = set(parFSR[k]["key"]) - set(serFSR[k]["key"])
        diff4 = set(parFSR[k]["incr"]) - set(serFSR[k]["incr"])
        diff5 = set(parFSR[k]["integral"]) - set(serFSR[k]["integral"])
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )
348 
349 
def LumiFSR(lumi):
    """Unpack a lumi file record into plain Python lists.

    lumi : object offering runNumbers(), fileIDs() and a string form that
           ends in "info (key/incr/integral) : " followed by
           "/"-terminated records of three whitespace-separated integers

    Returns the tuple (runs, files, key, incr, integral), each a list.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    # Keep the text after the last marker; the piece after the final "/"
    # is not a record and is dropped.
    text = str(lumi)
    tail = text.split("info (key/incr/integral) : ")[-1]
    records = tail.split("/")[:-1]

    key, incr, integral = [], [], []
    for record in records:
        rawKey, rawIncr, rawIntegral = record.split()
        key.append(int(rawKey))
        incr.append(int(rawIncr))
        integral.append(int(rawIntegral))

    return (runs, files, key, incr, integral)
376 
377 
def GetFSRdict(filename, queue):
    """Read the file records of one file and put them on queue as a dict.

    filename : name of the file handed to LumiFsrReader().inputFiles
    queue    : queue receiving the filled FSR dictionary

    Nothing is returned; the result travels through the queue only.
    """

    # Result skeleton; entries not found in the file keep the placeholder 0.
    FSR = {
        "TimeSpanFSR": {"earliest": 0, "latest": 0},
        "LumiFSRBeamCrossing": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRBeam1": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRBeam2": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRNoBeam": {"key": 0, "incr": 0, "integral": 0},
        "EventCountFSR": {"input": 0, "output": 0, "statusFlag": 0},
    }

    # The job configuration is assembled as Python source text and executed.
    # NOTE(review): filename is pasted into that source unescaped, so a quote
    # in the name breaks (or alters) the executed code -- confirm that only
    # trusted file names reach this function.
    options = (
        "from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader; LumiFsrReader().OutputLevel = INFO; LumiFsrReader().inputFiles = ['%s'] ;"
        % filename
    )
    options += "LumiFsrReader().Persistency='ROOT'; LumiFsrReader().EvtMax = 1; from Configurables import LHCbApp; LHCbApp().Persistency='ROOT'; from Configurables import CondDB, DDDBConf;"
    options += " CondDB().UseLatestTags=['%s']; DDDBConf(DataType='%s');" % (2011, 2011)
    exec(options)
    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    lst = fsr.getHistoNames()

    if lst:
        for l in lst:

            ob = fsr.retrieveObject(l)

            if "LumiFSR" in l:

                # NOTE(review): assert statements are skipped under python -O
                assert ob.numberOfObjects() == 1
                k = ob.containedObject(0)
                runs, files, keys, increment, integral = LumiFSR(k)

                # the part of the path after the last "/" selects the FSR entry
                FSR[l[l.rfind("/") + 1 :]]["runs"] = runs
                FSR[l[l.rfind("/") + 1 :]]["files"] = files
                FSR[l[l.rfind("/") + 1 :]]["key"] = keys
                FSR[l[l.rfind("/") + 1 :]]["incr"] = increment
                FSR[l[l.rfind("/") + 1 :]]["integral"] = integral

            if "TimeSpanFSR" in l:

                FSR["TimeSpanFSR"]["earliest"] = ob.containedObject(0).earliest()
                FSR["TimeSpanFSR"]["latest"] = ob.containedObject(0).latest()

            if "EventCountFSR" in l:

                FSR["EventCountFSR"]["input"] = ob.input()
                FSR["EventCountFSR"]["output"] = ob.output()
                FSR["EventCountFSR"]["statusFlag"] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
434 
435 
def CompareFSR(pout, sout):
    """Print how the file records of the parallel and serial file differ.

    pout : queue delivering the FSR dictionary of the parallel file
    sout : queue delivering the FSR dictionary of the serial file

    One dictionary is taken from each queue.  Each printed count is the
    number of entries present for the parallel file but not for the serial
    one.  Both dictionaries are printed in full at the end; nothing is
    returned.
    """

    def _asSet(value):
        # GetFSRdict leaves the placeholder 0 for records it did not find;
        # wrap such non-iterable values instead of failing with TypeError.
        try:
            return set(value)
        except TypeError:
            return {value}

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    # items() replaces iteritems(), which only exists on Python 2 dicts.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    # NOTE(review): "LumiFSRBeam1" is filled by GetFSRdict but not compared
    # here -- confirm whether that omission is intended.
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = _asSet(parFSR[k]["key"]) - _asSet(serFSR[k]["key"])
        diff4 = _asSet(parFSR[k]["incr"]) - _asSet(serFSR[k]["incr"])
        diff5 = _asSet(parFSR[k]["integral"]) - _asSet(serFSR[k]["integral"])
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
474 
475 
if __name__ == "__main__":

    # Usage: <parallelFile> <serialFile>.  A copy of the arguments is used
    # so that sys.argv itself is left untouched.
    args = sys.argv[1:]
    if len(args) != 2:
        print(
            "Please supply two arguments : > python loadFile <parallelFile> <serialFile>"
        )
        sys.exit(0)

    # plain names, as TFile and GetFSRdict take them (no "PFN:" prefix)
    pname, sname = args
    par = "PFN:" + pname
    ser = "PFN:" + sname
    print("Parallel File to be analysed : %s" % (par))
    print("Serial File to be analysed : %s" % (ser))

    # Event-by-event comparison: set up here, but the processes are not
    # started (the start/join calls below are commented out).
    qacross = Queue()
    pout = Queue()
    sout = Queue()

    parReader = Process(target=Reader, args=(PAR, par, qacross, pout))
    serReader = Process(target=Reader, args=(SER, ser, qacross, sout))
    com = Process(target=ComparisonEngine, args=(pout, sout))

    # com.start() ; parReader.start() ; serReader.start()
    # serReader.join() ; parReader.join() ; com.join()

    # CompareTrees( pname, sname )

    print("Check File Records")

    pout = Queue()
    sout = Queue()

    # The former code took ser from sys.argv[0] and par from sys.argv[1]
    # after the script name had been popped, which handed the parallel file
    # to the serial reader and vice versa.
    sp = Process(target=GetFSRdict, args=(sname, sout))
    pp = Process(target=GetFSRdict, args=(pname, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
OutputStream
A small class to stream Data I/O.
Definition: OutputStream.h:38
compareOutputFiles.compareEvents
def compareEvents(s, p)
Definition: compareOutputFiles.py:254
compareOutputFiles.ComparisonEngine
ComparisonEngine
Definition: compareOutputFiles.py:500
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:869
compareOutputFiles.LumiFSR
def LumiFSR(lumi)
Definition: compareOutputFiles.py:350
compareOutputFiles.CompareFSR
CompareFSR
Definition: compareOutputFiles.py:517
compareOutputFiles.CompareTrees
def CompareTrees(pname, sname)
Definition: compareOutputFiles.py:53
Gaudi.Configuration
Definition: Configuration.py:1
GaudiPython.HistoUtils.__repr__
__repr__
Definition: HistoUtils.py:536
compareOutputFiles.GetFSRdict
GetFSRdict
Definition: compareOutputFiles.py:515
compareOutputFiles.checkKeys
def checkKeys(name)
Definition: compareOutputFiles.py:32
compareOutputFiles.printDict
def printDict(d, name="unspecified")
Definition: compareOutputFiles.py:127
compareOutputFiles.CheckFileRecords
def CheckFileRecords(par, ser)
Definition: compareOutputFiles.py:313
compareOutputFiles.switchDict
def switchDict(d)
Definition: compareOutputFiles.py:109
GaudiKernel.ProcessJobOptions.importOptions
def importOptions(optsfile)
Definition: ProcessJobOptions.py:541
GaudiPython.Pythonizations.iteritems
iteritems
Definition: Pythonizations.py:545
MessageSvc
Definition: MessageSvc.h:40
HistogramPersistencySvc
HistogramPersistencySvc class implementation definition.
Definition: HistogramPersistencySvc.h:57
ApplicationMgr
Definition: ApplicationMgr.h:57
EventSelector
Definition of class EventSelector.
Definition: EventSelector.h:63
compareOutputFiles.Reader
Reader
Definition: compareOutputFiles.py:498
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:102
compareOutputFiles.checkForAddressDifference
def checkForAddressDifference(a, b)
Definition: compareOutputFiles.py:238