The Gaudi Framework  v36r9p1 (5c15b2bb)
compareOutputFiles.py
Go to the documentation of this file.
1 
11 from __future__ import print_function
12 
13 import sys
14 from multiprocessing import Process, Queue
15 
16 from Configurables import LHCbApp
17 from Gaudi.Configuration import *
18 from ROOT import TBuffer, TBufferFile, TFile
19 
20 from GaudiPython import AppMgr, gbl
21 
22 #
23 # loadFile.py
24 # -----------
25 # Open a dst file for inspection
26 #
27 
28 
def checkKeys(name):
    """Open the ROOT file named by a "PFN:"-prefixed string.

    The first four characters of ``name`` are dropped before the path is
    handed to ``TFile``.  The resulting handle is not inspected, closed or
    returned, so the function currently has no result beyond opening the file.
    """
    # TFile wants the bare path, i.e. without the leading "PFN:"
    rootPath = name[4:]
    # NOTE(review): "REC" is not one of the usual TFile option strings;
    # presumably ROOT falls back to read-only mode -- confirm.
    handle = TFile(rootPath, "REC")
33 
34 
# Job options pulled in at import time, in this order.
for _optsFile in (
    "$STDOPTS/LHCbApplication.opts",
    "$ENV_PROJECT_SOURCE_DIR/RootCnv/options/Setup.opts",
):
    importOptions(_optsFile)

# Blank out the DstWriter and histogram output destinations.
OutputStream("DstWriter").Output = ""
HistogramPersistencySvc().OutputFile = ""
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(OutputLevel=ERROR, AppName="File Check - Serial vs Parallel")
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Tags telling Reader() which of the two files it is handling.
PAR, SER = "PARALLEL", "SERIAL"
48 
49 
def _splitTreeKeys(keyNames, eventPrefix="_Event"):
    """Sort key names and split them into (meta, event, other) lists.

    Names starting with ``eventPrefix`` go to the event list, names starting
    with "##" to the meta list, everything else to the third list.
    """
    meta, event, other = [], [], []
    for key in sorted(keyNames):
        if key.startswith(eventPrefix):
            event.append(key)
        elif key.startswith("##"):
            meta.append(key)
        else:
            other.append(key)
    return meta, event, other


def CompareTrees(pname, sname):
    """Compare the top-level key names of the parallel and serial ROOT files.

    Args:
        pname: path of the file written by the parallel job.
        sname: path of the file written by the serial job.

    Prints a message for every category of key names (meta data, event
    data, anything else) that differs between the two files.  Nothing is
    returned.  Both files are closed even if reading the keys fails.
    """
    # NOTE(review): "REC" is not one of the usual TFile option strings;
    # presumably ROOT falls back to read-only mode -- confirm.
    pf = TFile(pname, "REC")
    try:
        sf = TFile(sname, "REC")
        try:
            pMeta, pEvent, pOther = _splitTreeKeys(
                [key.GetName() for key in pf.GetListOfKeys()]
            )
            sMeta, sEvent, sOther = _splitTreeKeys(
                [key.GetName() for key in sf.GetListOfKeys()]
            )
        finally:
            sf.Close()
    finally:
        pf.Close()

    if pMeta != sMeta:
        print("Meta Data differs")

    if pEvent != sEvent:
        print("Event data differs")

    if pOther != sOther:
        pExtra = set(pOther) - set(sOther)
        sExtra = set(sOther) - set(pOther)
        if pExtra:
            print("Extra Data in parallel file : ", pExtra)
        if sExtra:
            print("Extra Data in serial file : ", sExtra)
        if sExtra or pExtra:
            print("Files will have different sizes")
104 
105 
def switchDict(d):
    """Return a new dictionary with the keys and values of ``d`` swapped.

    Args:
        d: dictionary whose values are hashable.

    Returns:
        The inverted dictionary, or ``None`` (after printing a message) when
        two keys share the same value and the mapping cannot be inverted.
    """
    switched = {}
    for key, value in d.items():
        # A value seen before means the inversion would lose an entry.
        if value in switched:
            print("Dictionary cannot be switched, values not unique")
            return None
        switched[value] = key
    print("Dict has keys/values : %i/%i" % (len(d), len(switched)))
    return switched
122 
123 
def printDict(d, name="unspecified"):
    """Print a dictionary between two 80-character rules.

    Output layout:

        Dictionary <name> :
            key     value
            key     value
            ...
    """
    rule = "-" * 80
    print(rule)
    print("Dictionary %s : " % (name))
    for key in d.keys():
        print("\t", key, "\t", d[key])
    print(rule)
137 
138 
def Reader(readerType, filename, qacross, qToEngine):
    """Read one file and stream its events to the comparison engine.

    One Reader runs for the serial file (``readerType == SER``) and one for
    the parallel file (``readerType == PAR``).

    Steps:
      1. Run over the file once, recording event number -> position.
      2. The serial reader inverts that map and sends it over ``qacross``;
         the parallel reader receives it from ``qacross``.
      3. Events are re-run with ``runSelectedEvents`` following the serial
         ordering.  For each one, a dictionary
         ``path -> (class name, repr string)`` is put on ``qToEngine``.
      4. ``None`` is put on ``qToEngine`` as the end marker.

    Args:
        readerType: ``SER`` or ``PAR``.
        filename: "PFN:"-prefixed file name.
        qacross: queue shared by the two readers.
        qToEngine: queue read by the comparison engine.
    """
    appMgr = AppMgr()
    selector = appMgr.evtsel()
    tes = appMgr.evtsvc()

    headerPath = "/Event/Rec/Header"
    selector.open(filename)
    # runSelectedEvents doesn't need the "PFN:" prefix
    fname = filename[4:]

    # Pass 1: determine the ordering (event number -> position in this file)
    order = {}
    position = 0
    while True:
        appMgr.run(1)
        if not tes[headerPath]:
            break
        order[int(tes[headerPath].evtNumber())] = position
        position += 1

    if readerType == SER:
        # Publish the inverted map to the parallel reader, then the
        # end marker.
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serOrder = order
    elif readerType == PAR:
        # Drain the queue up to the end marker; the last item received is
        # the serial ordering.
        for serOrder in iter(qacross.get, None):
            pass
        nSerial = len(serOrder.keys())
        nParallel = len(order.keys())
        print("Events in Files (serial/parallel) : %i / %i" % (nSerial, nParallel))

    # Pass 2: replay the events in the serial ordering and ship them one by
    # one to the comparison engine
    for index in serOrder.keys():
        if readerType == PAR:
            # Translate the serial position into this file's position via
            # the event number.
            index = order[serOrder[index]]

        appMgr.runSelectedEvents(fname, index)
        paths = tes.getList()
        paths.sort()
        snapshot = {
            path: (tes[path].__class__.__name__, tes[path].__repr__())
            for path in paths
        }
        qToEngine.put(snapshot)
    qToEngine.put(None)
    print("%s Reader Finished" % (readerType))
201 
202 
def ComparisonEngine(pQueue, sQueue):
    """Compare events arriving pairwise from the parallel and serial readers.

    Each queue delivers dictionaries of the form
    ``d[path] = (className, string_repr)``, followed by ``None`` as the end
    marker.  Every pair is passed to ``compareEvents`` and the boolean
    outcome is recorded (perfect match = True, difference = False).  A
    summary is printed once either queue delivers ``None``.

    Args:
        pQueue: queue fed by the parallel-file reader.
        sQueue: queue fed by the serial-file reader.
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print("Termination Signals received ok")
            break
        if pitem is None or sitem is None:
            # One reader finished before the other.
            print("pitem != sitem : ", pitem, sitem)
            break
        # compareEvents takes (serial, parallel): it tolerates extra
        # DataObject paths only in its second argument.
        results.append(compareEvents(sitem, pitem))
    print("=" * 80)
    print("Comparison Engine Finished")
    print("-" * 80)
    print("Total Events Checked : %i" % (len(results)))
    print("Perfect Matches : %i" % (sum(results)))
    print("Errors : %i" % (len(results) - sum(results)))
    print("=" * 80)
233 
234 
def checkForAddressDifference(a, b):
    """Tell whether two string representations are both generic DataObjects.

    The __repr__() method for Event Data Objects returns a generic string
    "DataObject at 0xADDRESS" for non-Pythonised objects.  Two such objects
    at the same path are considered equal, but their strings cannot be
    compared with "==" because the memory address differs.  This function
    therefore only checks that both strings start with the generic prefix.

    Args:
        a, b: the two string representations.

    Returns:
        True when both strings start with "DataObject at 0x", else False.
    """
    ref = "DataObject at 0x"
    return a.startswith(ref) and b.startswith(ref)
249 
250 
def compareEvents(s, p):
    """Compare one event from the serial file with one from the parallel file.

    Both events are dictionaries of the form
    ``d[path] = (className, string_repr)``.

    Args:
        s: event read from the serial file.
        p: event read from the parallel file.

    Returns:
        True when both events hold the same paths with matching class names
        and string representations, False otherwise.
    """
    # check 1 : number of keys (paths)
    sks = sorted(s)
    pks = sorted(p)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        #   but in TESSerializer, *all* DataObjects will be sent
        #   including /Event/Prev and /Event/Prev/MC
        # Extra paths that are plain DataObjects are ignored; anything else
        # is a real difference.
        for e in set(pks) - set(sks):
            if p[e][0] == "DataObject":
                pks.remove(e)
            else:
                print("Extra Other thing found!", e, p[e][0])
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; generic "DataObject at 0x..."
        # strings may differ only in the address
        if s[key][1] != p[key][1] and not checkForAddressDifference(
            p[key][1], s[key][1]
        ):
            return False

    return True
308 
309 
def CheckFileRecords(par, ser):
    """Print how many file-record (FSR) entries of ``par`` are absent in ``ser``.

    Args:
        par: identifier of the parallel file, passed on to ``GetFSRdicts``.
        ser: identifier of the serial file, passed on to ``GetFSRdicts``.

    The counts are one-directional: entries present for ``par`` but not for
    ``ser``.
    """
    print("Checking File Records")

    # NOTE(review): GetFSRdicts is not defined in the visible part of this
    # file (only GetFSRdict(filename, queue) is) -- confirm where it lives.
    parFSR = GetFSRdicts(par)
    serFSR = GetFSRdicts(ser)

    # dict.items() works on Python 2 and 3; iteritems() exists only on 2.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    # NOTE(review): "LumiFSRBeam1" is not in this list -- confirm whether the
    # omission is intentional.
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = set(parFSR[k]["key"]) - set(serFSR[k]["key"])
        diff4 = set(parFSR[k]["incr"]) - set(serFSR[k]["incr"])
        diff5 = set(parFSR[k]["integral"]) - set(serFSR[k]["integral"])
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )
345 
346 
def LumiFSR(lumi):
    """Unpack a lumi record into plain Python lists.

    Run numbers and file IDs come from ``lumi.runNumbers()`` and
    ``lumi.fileIDs()``.  The key/incr/integral triplets are parsed from
    ``str(lumi)``: the text after "info (key/incr/integral) : " is split on
    "/" (the piece after the last "/" is discarded) and every remaining
    piece must hold exactly three whitespace-separated integers.

    Returns:
        Tuple ``(runs, files, key, incr, integral)`` of lists.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    text = str(lumi)
    tail = text.split("info (key/incr/integral) : ")[-1]
    records = tail.split("/")[:-1]

    key, incr, integral = [], [], []
    for record in records:
        rawKey, rawIncr, rawIntegral = record.split()
        key.append(int(rawKey))
        incr.append(int(rawIncr))
        integral.append(int(rawIntegral))

    return (runs, files, key, incr, integral)
373 
374 
def GetFSRdict(filename, queue):
    """Read the file records of ``filename`` and put them on ``queue``.

    A dictionary with placeholder values (all 0) is filled from the objects
    returned by the file record service and then put on ``queue`` as one
    item.

    Args:
        filename: input file, interpolated into the LumiFsrReader options.
        queue: queue that receives the resulting dictionary.
    """
    FSR = {
        "TimeSpanFSR": {"earliest": 0, "latest": 0},
        "LumiFSRBeamCrossing": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRBeam1": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRBeam2": {"key": 0, "incr": 0, "integral": 0},
        "LumiFSRNoBeam": {"key": 0, "incr": 0, "integral": 0},
        "EventCountFSR": {"input": 0, "output": 0, "statusFlag": 0},
    }

    # NOTE(review): the options are assembled as source text and run through
    # exec(); a filename containing a quote would break (or alter) that code.
    options = (
        "from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader; LumiFsrReader().OutputLevel = INFO; LumiFsrReader().inputFiles = ['%s'] ;"
        % filename
    )
    options += "LumiFsrReader().Persistency='ROOT'; LumiFsrReader().EvtMax = 1; from Configurables import LHCbApp; LHCbApp().Persistency='ROOT'; from Configurables import CondDB, DDDBConf;"
    options += " CondDB().UseLatestTags=['%s']; DDDBConf(DataType='%s');" % (2011, 2011)
    exec(options)

    app = AppMgr()
    app.run(1)
    recordSvc = app.filerecordsvc()

    paths = recordSvc.getHistoNames()

    if paths:
        for path in paths:
            ob = recordSvc.retrieveObject(path)

            if "LumiFSR" in path:
                assert ob.numberOfObjects() == 1
                runs, files, lumiKeys, increment, integral = LumiFSR(
                    ob.containedObject(0)
                )

                # The last path component selects the entry to fill.
                leaf = path[path.rfind("/") + 1 :]
                FSR[leaf]["runs"] = runs
                FSR[leaf]["files"] = files
                FSR[leaf]["key"] = lumiKeys
                FSR[leaf]["incr"] = increment
                FSR[leaf]["integral"] = integral

            if "TimeSpanFSR" in path:
                FSR["TimeSpanFSR"]["earliest"] = ob.containedObject(0).earliest()
                FSR["TimeSpanFSR"]["latest"] = ob.containedObject(0).latest()

            if "EventCountFSR" in path:
                FSR["EventCountFSR"]["input"] = ob.input()
                FSR["EventCountFSR"]["output"] = ob.output()
                FSR["EventCountFSR"]["statusFlag"] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
431 
432 
def CompareFSR(pout, sout):
    """Compare the file-record dictionaries delivered on two queues.

    One dictionary is taken from each queue.  The function prints how many
    entries of the parallel record are absent from the serial one, then
    prints both dictionaries in full.

    Args:
        pout: queue delivering the parallel file's record dictionary.
        sout: queue delivering the serial file's record dictionary.
    """

    def asSet(value):
        # GetFSRdict initialises the lumi entries with the scalar 0, which
        # set() cannot iterate; treat such a value as a single element.
        try:
            return set(value)
        except TypeError:
            return {value}

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    # dict.items() works on Python 2 and 3; iteritems() exists only on 2.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    # NOTE(review): "LumiFSRBeam1" is not in this list -- confirm whether the
    # omission is intentional.
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = asSet(parFSR[k]["key"]) - asSet(serFSR[k]["key"])
        diff4 = asSet(parFSR[k]["incr"]) - asSet(serFSR[k]["incr"])
        diff5 = asSet(parFSR[k]["integral"]) - asSet(serFSR[k]["integral"])
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
471 
472 
if __name__ == "__main__":
    # Usage: <script> <parallelFile> <serialFile>
    # Work on a copy so that sys.argv keeps its usual layout; the file names
    # are then taken from one place only and cannot get swapped.
    args = sys.argv[1:]
    if len(args) != 2:
        print(
            "Please supply two arguments : > python loadFile <parallelFile> <serialFile>"
        )
        sys.exit(0)

    parFile, serFile = args
    parPFN = "PFN:" + parFile
    serPFN = "PFN:" + serFile
    print("Parallel File to be analysed : %s" % (parPFN))
    print("Serial File to be analysed : %s" % (serPFN))

    # Event-by-event comparison: the processes are prepared but never
    # started (the start/join lines below are commented out).
    qacross = Queue()
    evtPout = Queue()
    evtSout = Queue()

    parReader = Process(target=Reader, args=(PAR, parPFN, qacross, evtPout))
    serReader = Process(target=Reader, args=(SER, serPFN, qacross, evtSout))
    com = Process(target=ComparisonEngine, args=(evtPout, evtSout))

    # com.start() ; parReader.start() ; serReader.start()
    # serReader.join() ; parReader.join() ; com.join()

    # CompareTrees( parFile, serFile )  # TFile doesn't need the "PFN:" prefix

    print("Check File Records")

    pout = Queue()
    sout = Queue()

    # One process per file reads the records, a third one compares them.
    sp = Process(target=GetFSRdict, args=(serFile, sout))
    pp = Process(target=GetFSRdict, args=(parFile, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
OutputStream
A small class to stream Data I/O.
Definition: OutputStream.h:38
compareOutputFiles.compareEvents
def compareEvents(s, p)
Definition: compareOutputFiles.py:251
compareOutputFiles.ComparisonEngine
def ComparisonEngine(pQueue, sQueue)
Definition: compareOutputFiles.py:203
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:873
compareOutputFiles.LumiFSR
def LumiFSR(lumi)
Definition: compareOutputFiles.py:347
compareOutputFiles.CompareTrees
def CompareTrees(pname, sname)
Definition: compareOutputFiles.py:50
Gaudi.Configuration
Definition: Configuration.py:1
GaudiPython.HistoUtils.__repr__
__repr__
Definition: HistoUtils.py:536
compareOutputFiles.checkKeys
def checkKeys(name)
Definition: compareOutputFiles.py:29
compareOutputFiles.printDict
def printDict(d, name="unspecified")
Definition: compareOutputFiles.py:124
compareOutputFiles.CheckFileRecords
def CheckFileRecords(par, ser)
Definition: compareOutputFiles.py:310
compareOutputFiles.switchDict
def switchDict(d)
Definition: compareOutputFiles.py:106
GaudiKernel.ProcessJobOptions.importOptions
def importOptions(optsfile)
Definition: ProcessJobOptions.py:541
GaudiPython.Pythonizations.iteritems
iteritems
Definition: Pythonizations.py:545
MessageSvc
Definition: MessageSvc.h:40
compareOutputFiles.GetFSRdict
def GetFSRdict(filename, queue)
Definition: compareOutputFiles.py:375
HistogramPersistencySvc
HistogramPersistencySvc class implementation definition.
Definition: HistogramPersistencySvc.h:57
ApplicationMgr
Definition: ApplicationMgr.h:57
EventSelector
Definition of class EventSelector.
Definition: EventSelector.h:63
compareOutputFiles.Reader
def Reader(readerType, filename, qacross, qToEngine)
Definition: compareOutputFiles.py:139
compareOutputFiles.CompareFSR
def CompareFSR(pout, sout)
Definition: compareOutputFiles.py:433
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:102
compareOutputFiles.checkForAddressDifference
def checkForAddressDifference(a, b)
Definition: compareOutputFiles.py:235