The Gaudi Framework  v30r3 (a5ef0a68)
compareOutputFiles.py
Go to the documentation of this file.
1 
2 from Gaudi.Configuration import *
3 from GaudiPython import AppMgr, gbl
4 from ROOT import TFile, TBufferFile, TBuffer
5 from multiprocessing import Process, Queue
6 from Configurables import LHCbApp
7 import sys
8 
9 #
10 # compareOutputFiles.py
11 # ---------------------
12 # Compare the output file of a parallel job with that of a serial job
13 #
14 
15 
def checkKeys(name):
    """Open the ROOT file behind *name* as a first step to checking its keys.

    name : file name carrying a four-character "PFN:" prefix; the prefix is
           stripped because TFile doesn't need it.

    Returns None.

    NOTE(review): the key check itself is not implemented; the function only
    opens the file and closes it again.
    """
    fname = name[4:]  # TFile doesn't need the "PFN:" prefix
    # 'READ' is spelled out explicitly; the previous option string 'REC' is
    # not a documented TFile mode and is presumably treated as READ by ROOT
    # -- TODO confirm against the TFile constructor documentation
    tf = TFile(fname, 'READ')
    # release the file handle instead of leaving it to garbage collection
    tf.Close()
20 
21 
# Job configuration, executed at import time.  The statements are kept in
# their original order because each one modifies shared Gaudi configuration.
importOptions('$STDOPTS/LHCbApplication.opts')
#importOptions( '$GAUDIPOOLDBROOT/options/GaudiPoolDbRoot.opts' )
importOptions('$GAUDICNVROOT/options/Setup.opts')


# Blank the output destinations of the DST writer and of the histogram
# persistency service, and restrict messages to ERROR level
OutputStream("DstWriter").Output = ''
HistogramPersistencySvc().OutputFile = ''
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(OutputLevel=ERROR,
               AppName='File Check - Serial vs Parallel')
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Tags passed to Reader() as readerType to tell the two reader processes apart
PAR = 'PARALLEL'
SER = 'SERIAL'
38 
39 
def _splitTreeKeys(keyNames, eventPrefix='_Event'):
    """Split key names into (meta, event, other) lists, keeping input order.

    Names starting with *eventPrefix* are event data, names starting with
    '##' are meta data, everything else is 'other'.
    """
    meta, event, other = [], [], []
    for k in keyNames:
        if k.startswith(eventPrefix):
            event.append(k)
        elif k.startswith('##'):
            meta.append(k)
        else:
            other.append(k)
    return meta, event, other


def CompareTrees(pname, sname):
    """Compare the top-level key names of the parallel and the serial file.

    pname : name of the parallel output file (no "PFN:" prefix)
    sname : name of the serial output file (no "PFN:" prefix)

    The sorted key names of each file are grouped into meta data, event data
    and other keys; a message is printed for every group that differs.
    Returns None.
    """
    # 'READ' is spelled out explicitly; the previous option string 'REC' is
    # presumably treated as READ by ROOT -- TODO confirm
    pf = TFile(pname, 'READ')
    sf = TFile(sname, 'READ')
    try:
        pMeta, pEvent, pOther = _splitTreeKeys(
            sorted(k.GetName() for k in pf.GetListOfKeys()))
        sMeta, sEvent, sOther = _splitTreeKeys(
            sorted(k.GetName() for k in sf.GetListOfKeys()))

        if pMeta != sMeta:
            print('Meta Data differs')

        if pEvent != sEvent:
            print('Event data differs')

        if pOther != sOther:
            pExtra = set(pOther) - set(sOther)
            sExtra = set(sOther) - set(pOther)
            # single pre-formatted strings keep the output identical whether
            # print is the Python 2 statement or the Python 3 function
            if pExtra:
                print('Extra Data in parallel file :  %s' % (pExtra,))
            if sExtra:
                print('Extra Data in serial file :  %s' % (sExtra,))
            if sExtra or pExtra:
                print('Files will have different sizes')
    finally:
        # close both files even when reading the keys fails
        pf.Close()
        sf.Close()
94 
95 
def switchDict(d):
    """Return a new dictionary with the keys and values of *d* exchanged.

    d : dictionary whose values are hashable

    Only works if all values are unique: when a value occurs more than once
    a message is printed and None is returned.  Otherwise the key/value
    counts are printed and the inverted dictionary is returned.
    """
    vals = list(d.values())
    # a set collapses duplicates, so a size mismatch means a value repeats
    # (one pass instead of calling count() for every value)
    if len(set(vals)) != len(vals):
        print('Dictionary cannot be switched, values not unique')
        return None
    print('Dict has keys/values : %i/%i' % (len(d), len(vals)))
    return {entry: k for k, entry in d.items()}
112 
113 
def printDict(d, name='unspecified'):
    """Print a dictionary in the form

        Dictionary Name :
            key     value
            key     value
            ...

    framed by two lines of 80 dashes.

    d    : dictionary to print
    name : label shown in the heading
    """
    rule = '-' * 80
    print(rule)
    print('Dictionary %s : ' % (name))
    for k in d:
        # one pre-formatted string per line: same text as the former
        # "print '\t', k, '\t', d[k]" statement, and also valid as a
        # Python 3 function call
        print('\t%s \t%s' % (k, d[k]))
    print(rule)
127 
128 
def Reader(readerType, filename, qacross, qToEngine):
    """Process body for reading one file (serial or parallel).

    readerType : SER or PAR
    filename   : file to read, with a four-character "PFN:" prefix
    qacross    : queue carrying the serial event ordering from the SER
                 reader to the PAR reader
    qToEngine  : queue on which this reader sends its events to the
                 comparison process

    First the order of events in the file is determined (parallel != serial,
    usually).  Then the events are run *in the serial order* with
    AppMgr().runSelectedEvents(pfn, evtNumber).  For every event a dictionary
    path -> (class name, string repr) of everything in the TES is put on
    qToEngine; a final None marks the end of the stream.
    """
    appMgr = AppMgr()
    selector = appMgr.evtsel()
    tes = appMgr.evtsvc()

    headerPath = '/Event/Rec/Header'
    selector.open(filename)
    # runSelectedEvents doesn't need the "PFN:" prefix
    bareName = filename[4:]

    # determine the ordering: event number -> position in this file
    order = {}
    position = 0
    while True:
        appMgr.run(1)
        if not tes[headerPath]:
            break
        order[int(tes[headerPath].evtNumber())] = position
        position += 1

    if readerType == SER:
        # send the ordering details (position -> event number) to the
        # parallel reader, followed by the end marker
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serOrder = order
    elif readerType == PAR:
        # receive the serial ordering from the queue; the loop keeps the
        # last item that arrived before the None end marker
        for serOrder in iter(qacross.get, None):
            pass
        # NOTE(review): the listing had lost its indentation; this printout
        # is assumed to belong to the PAR branch -- confirm against upstream
        nSerial = len(serOrder.keys())
        nParallel = len(order.keys())
        print('Events in Files (serial/parallel) : %i / %i' %
              (nSerial, nParallel))

    # now run the file in the order specified by the serial ordering and
    # send the events one by one to the comparison engine
    for idx in serOrder.keys():
        if readerType == PAR:
            # translate the serial position into this file's own position
            idx = order[serOrder[idx]]

        appMgr.runSelectedEvents(bareName, idx)
        paths = tes.getList()

        paths.sort()
        snapshot = {}
        for path in paths:
            snapshot[path] = (tes[path].__class__.__name__,
                              tes[path].__repr__())
        qToEngine.put(snapshot)
    qToEngine.put(None)
    print('%s Reader Finished' % (readerType))
192 
193 
def ComparisonEngine(pQueue, sQueue):
    """Compare the events sent by the parallel and the serial reader.

    pQueue : queue fed by the parallel file reader
    sQueue : queue fed by the serial file reader

    Events arrive in pairs, one from each queue, in dictionary format
    d[path] = (className, string_repr) and are compared with compareEvents.
    A None item marks the end of a stream.  Results are stored in a list of
    bools (PerfectMatch=True, Diff=False) and a summary is printed at the
    end.  Returns None.
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print('Termination Signals received ok')
            break
        if pitem is None or sitem is None:
            # one stream ended before the other
            print('pitem != sitem :  %s %s' % (pitem, sitem))
            break
        # compareEvents is declared as compareEvents(s, p): the serial event
        # goes first, the parallel event (which may carry extra DataObject
        # paths) second.  The arguments used to be passed the other way
        # round, which defeated the extra-key tolerance.
        results.append(compareEvents(sitem, pitem))
    nMatches = sum(results)
    print('=' * 80)
    print('Comparison Engine Finished')
    print('-' * 80)
    print('Total Events Checked : %i' % (len(results)))
    print('Perfect Matches : %i' % (nMatches))
    print('Errors : %i' % (len(results) - nMatches))
    print('=' * 80)
224 
225 
def checkForAddressDifference(a, b):
    """Tell whether two string representations are both generic DataObjects.

    The __repr__() method for Event Data Objects will return a generic
    string "DataObject at 0xADDRESS" for non-Pythonised objects.  If these
    objects have the same path, they are equal, but this cannot be tested
    with "==" in Python, as the memory address will be different for the two
    different DataObjects, so this method checks whether both strings are of
    that generic form.

    args : a, b two string representations
    Returns True when both start with 'DataObject at 0x', False otherwise.
    """
    ref = 'DataObject at 0x'
    return a.startswith(ref) and b.startswith(ref)
240 
241 
def compareEvents(s, p):
    """Compare one serial event with one parallel event.

    s : serial event, p : parallel event; both are dictionaries of the form
        d[ path ] = tuple( className, string_repr )

    Returns True for a perfect match, False otherwise.
    """
    # check 1 : number of keys (paths)
    sks = sorted(s.keys())
    pks = sorted(p.keys())
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        #          but in TESSerializer, *all* DataObjects will be sent
        #          including /Event/Prev and /Event/Prev/MC

        # extra keys in the parallel event which are just plain DataObjects
        # are ignored; anything else is a genuine difference
        for e in set(pks) - set(sks):
            if p[e][0] == 'DataObject':
                pks.remove(e)
            else:
                print('Extra Other thing found! %s %s' % (e, p[e][0]))
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content; the first difference decides the result
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; two generic "DataObject at 0x..."
        # strings differ only in the memory address and count as equal
        if s[key][1] != p[key][1] and \
                not checkForAddressDifference(p[key][1], s[key][1]):
            return False

    return True
299 
300 
def CheckFileRecords(par, ser):
    """Read the file records of both files and print how many entries differ.

    par : parallel file, in the form GetFSRdict accepts as its filename
    ser : serial file, in the form GetFSRdict accepts as its filename

    Returns None.
    """

    def asSet(value):
        # GetFSRdict leaves the scalar placeholder 0 in place when a record
        # is absent from the file; set(0) would raise TypeError
        try:
            return set(value)
        except TypeError:
            return set([value])

    print("Checking File Records")

    # The records are obtained through GetFSRdict(filename, queue), one child
    # process per file, mirroring what the __main__ section of this script
    # does.  (The function used to call GetFSRdicts, a name that is not
    # defined.)
    pout = Queue()
    sout = Queue()
    pp = Process(target=GetFSRdict, args=(par, pout))
    sp = Process(target=GetFSRdict, args=(ser, sout))
    pp.start()
    sp.start()
    # fetch the results before joining, so that a child is never left
    # waiting for its queue to be emptied
    parFSR = pout.get()
    serFSR = sout.get()
    pp.join()
    sp.join()

    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" +
          str(len(diff1)) + "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): "LumiFSRBeam1" is not part of this list -- confirm
    # whether that is intended
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        # symmetric difference: values present in only one of the two files,
        # whichever file that is
        diff3 = asSet(parFSR[k]["key"]) ^ asSet(serFSR[k]["key"])
        diff4 = asSet(parFSR[k]["incr"]) ^ asSet(serFSR[k]["incr"])
        diff5 = asSet(parFSR[k]["integral"]) ^ asSet(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " + str(
            len(diff3)) + " increment: " + str(len(diff4)) + " integral: " + str(len(diff5)))
322 
323 
def LumiFSR(lumi):
    """Extract the content of one LumiFSR object into plain Python lists.

    lumi : object providing runNumbers() and fileIDs(), whose str() ends with
           "info (key/incr/integral) : " followed by '/'-terminated records
           of three whitespace-separated integers

    Returns the tuple (runs, files, key, incr, integral), each a list.
    Raises ValueError when a record does not consist of three integers.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    # keep only the text after the "info" label, then cut it into records;
    # the piece after the last '/' is not a record and is dropped
    text = str(lumi).split("info (key/incr/integral) : ")[-1]
    records = text.split('/')[:-1]

    key = []
    incr = []
    integral = []
    for rec in records:
        k, i, t = rec.split()
        key.append(int(k))
        incr.append(int(i))
        integral.append(int(t))

    return (runs, files, key, incr, integral)
350 
351 
def GetFSRdict(filename, queue):
    """Read the file summary records of *filename* and put them on *queue*.

    filename : input file handed to LumiFsrReader().inputFiles
    queue    : queue receiving the resulting dictionary (exactly one put)

    The dictionary has one entry per record type ("TimeSpanFSR",
    "LumiFSRBeamCrossing", "LumiFSRBeam1", "LumiFSRBeam2", "LumiFSRNoBeam",
    "EventCountFSR").  Values that are not found in the file keep their
    placeholder 0.  Returns None.
    """
    # imported here because only this function needs them
    from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader
    from Configurables import LHCbApp, CondDB, DDDBConf

    FSR = {"TimeSpanFSR": {'earliest': 0, 'latest': 0},
           "LumiFSRBeamCrossing": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRBeam1": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRBeam2": {'key': 0, 'incr': 0, 'integral': 0},
           "LumiFSRNoBeam": {'key': 0, 'incr': 0, 'integral': 0},
           "EventCountFSR": {'input': 0, 'output': 0, 'statusFlag': 0}}

    # Same settings, in the same order, as the option string that used to be
    # assembled and executed with "exec"; configuring directly keeps the file
    # name out of generated source code.
    reader = LumiFsrReader()
    reader.OutputLevel = INFO
    reader.inputFiles = [filename]
    reader.Persistency = 'ROOT'
    reader.EvtMax = 1
    LHCbApp().Persistency = 'ROOT'
    CondDB().UseLatestTags = ['2011']
    DDDBConf(DataType='2011')

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    lst = fsr.getHistoNames()

    if lst:
        for path in lst:

            ob = fsr.retrieveObject(path)

            if "LumiFSR" in path:

                assert ob.numberOfObjects() == 1
                runs, files, keys, increment, integral = LumiFSR(
                    ob.containedObject(0))

                # the last path component selects the entry to fill
                record = FSR[path[path.rfind('/') + 1:]]
                record['runs'] = runs
                record['files'] = files
                record['key'] = keys
                record['incr'] = increment
                record['integral'] = integral

            if "TimeSpanFSR" in path:

                FSR["TimeSpanFSR"]['earliest'] = ob.containedObject(
                    0).earliest()
                FSR["TimeSpanFSR"]['latest'] = ob.containedObject(0).latest()

            if "EventCountFSR" in path:

                FSR["EventCountFSR"]['input'] = ob.input()
                FSR["EventCountFSR"]['output'] = ob.output()
                FSR["EventCountFSR"]['statusFlag'] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
405 
406 
def CompareFSR(pout, sout):
    """Compare the file-record dictionaries delivered on two queues.

    pout : queue delivering the dictionary of the parallel file
    sout : queue delivering the dictionary of the serial file

    One item is taken from each queue (blocking until it is available).  The
    number of differing entries per record type is printed, followed by both
    dictionaries.  Returns None.
    """

    def asSet(value):
        # an absent record still holds the scalar placeholder 0 instead of a
        # list; set(0) would raise TypeError
        try:
            return set(value)
        except TypeError:
            return set([value])

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" +
          str(len(diff1)) + "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): "LumiFSRBeam1" is not part of this list -- confirm
    # whether that is intended
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        # symmetric difference: values present in only one of the two files,
        # whichever file that is
        diff3 = asSet(parFSR[k]['key']) ^ asSet(serFSR[k]['key'])
        diff4 = asSet(parFSR[k]['incr']) ^ asSet(serFSR[k]['incr'])
        diff5 = asSet(parFSR[k]['integral']) ^ asSet(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " + str(
            len(diff3)) + " increment: " + str(len(diff4)) + " integral: " + str(len(diff5)))

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
431 
432 
if __name__ == '__main__':
    # Script entry point: expects <parallelFile> <serialFile> on the command
    # line and compares the file records of the two files.

    # work on a copy; sys.argv itself is left untouched
    args = sys.argv[1:]
    if len(args) != 2:
        print('Please supply two arguments : > python compareOutputFiles.py <parallelFile> <serialFile>')
        # wrong usage is reported with a non-zero exit status
        sys.exit(1)

    # plain file names, as TFile and GetFSRdict take them (no "PFN:" prefix)
    pname, sname = args
    print('Parallel File to be analysed : %s' % ('PFN:' + pname))
    print('Serial File to be analysed : %s' % ('PFN:' + sname))

    # Event-by-event comparison and tree comparison are switched off.
    # To re-enable them:
    #   qacross = Queue() ; pout = Queue() ; sout = Queue()
    #   par = Process(target=Reader, args=(PAR, 'PFN:' + pname, qacross, pout))
    #   ser = Process(target=Reader, args=(SER, 'PFN:' + sname, qacross, sout))
    #   com = Process(target=ComparisonEngine, args=(pout, sout))
    #   com.start() ; par.start() ; ser.start()
    #   ser.join() ; par.join() ; com.join()
    #   CompareTrees( pname, sname )

    print("Check File Records")

    # The file names used to be re-read from sys.argv after the script name
    # had been popped off it, which exchanged the serial and the parallel
    # file; the names parsed above are used instead.
    pout = Queue()
    sout = Queue()

    sp = Process(target=GetFSRdict, args=(sname, sout))
    pp = Process(target=GetFSRdict, args=(pname, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
def printDict(d, name='unspecified')
def CheckFileRecords(par, ser)
def CompareFSR(pout, sout)
def ComparisonEngine(pQueue, sQueue)
double sum(double x, double y, double z)
A small class to stream Data I/O.
Definition: OutputStream.h:29
def CompareTrees(pname, sname)
HistogramPersistencySvc class implementation definition.
def checkForAddressDifference(a, b)
def GetFSRdict(filename, queue)
The Application Manager class.
Definition of class EventSelector.
Definition: EventSelector.h:53
def Reader(readerType, filename, qacross, qToEngine)