The Gaudi Framework  v31r0 (aeb156f0)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
compareOutputFiles.py
Go to the documentation of this file.
1 from Gaudi.Configuration import *
2 from GaudiPython import AppMgr, gbl
3 from ROOT import TFile, TBufferFile, TBuffer
4 from multiprocessing import Process, Queue
5 from Configurables import LHCbApp
6 import sys
7 
8 #
9 # loadFile.py
10 # -----------
11 # Open a dst file for inspection
12 #
13 
14 
def checkKeys(name):
    """Open the ROOT file behind a "PFN:"-prefixed file name.

    The TTree-key check announced by the function name is not implemented
    here: the file is only opened and the handle is dropped on return.

    Args:
        name: file name carrying a four-character prefix (e.g. "PFN:").
    """
    prefix_length = 4  # TFile doesn't need the "PFN:" prefix
    tf = TFile(name[prefix_length:], 'REC')
19 
20 
# Job configuration, executed at import time: load the option files first,
# then override individual properties.
importOptions('$STDOPTS/LHCbApplication.opts')
#importOptions( '$GAUDIPOOLDBROOT/options/GaudiPoolDbRoot.opts' )
importOptions('$GAUDICNVROOT/options/Setup.opts')

# Output targets are set to the empty string
# (presumably so that this checker writes no files itself - TODO confirm).
OutputStream("DstWriter").Output = ''
HistogramPersistencySvc().OutputFile = ''
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(OutputLevel=ERROR, AppName='File Check - Serial vs Parallel')
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Tags passed as `readerType` to Reader() to select the serial or the
# parallel behaviour.
PAR = 'PARALLEL'
SER = 'SERIAL'
35 
36 
def _partitionKeys(tfile, event='_Event'):
    """Return the sorted key names of `tfile` split into (meta, event, other)."""
    meta = []
    events = []
    other = []
    for name in sorted(key.GetName() for key in tfile.GetListOfKeys()):
        if name.startswith(event):
            events.append(name)
        elif name.startswith('##'):
            meta.append(name)
        else:
            other.append(name)
    return meta, events, other


def CompareTrees(pname, sname):
    """Compare the names of the top-level keys of two ROOT files.

    The key names of each file are sorted and split into three groups:
    names starting with '_Event', names starting with '##', and the rest.
    A message is printed for every group that differs between the files;
    for the "rest" group the names present in only one file are listed.

    Args:
        pname: path of the file written by the parallel job.
        sname: path of the file written by the serial job.

    Returns:
        None; the result is reported on standard output only.
    """
    # NOTE(review): 'REC' is not one of the documented TFile open options;
    # presumably ROOT falls back to read-only mode - confirm.
    pf = TFile(pname, 'REC')
    sf = TFile(sname, 'REC')
    try:
        pMeta, pEvent, pOther = _partitionKeys(pf)
        sMeta, sEvent, sOther = _partitionKeys(sf)

        if pMeta != sMeta:
            print('Meta Data differs')

        if pEvent != sEvent:
            print('Event data differs')

        if pOther != sOther:
            pset = set(pOther)
            sset = set(sOther)
            pExtra = pset - sset
            sExtra = sset - pset
            # The doubled space reproduces what the Python 2 print statement
            # emitted for "print 'text : ', value".
            if pExtra:
                print('Extra Data in parallel file :  %s' % (pExtra,))
            if sExtra:
                print('Extra Data in serial file :  %s' % (sExtra,))
            if sExtra or pExtra:
                print('Files will have different sizes')
    finally:
        # Close both files even if reading the keys failed.
        pf.Close()
        sf.Close()
91 
92 
def switchDict(d):
    """Return a copy of `d` with keys and values swapped.

    Only works if all values are unique.

    Args:
        d: dictionary to invert.

    Returns:
        A new dictionary mapping each value of `d` to its key, or None
        (after printing a message) when the values are not unique.
    """
    vals = list(d.values())
    try:
        # Linear-time uniqueness test for the usual hashable values.
        unique = len(set(vals)) == len(vals)
    except TypeError:
        # Unhashable values cannot go into a set; compare them pairwise.
        unique = all(vals.count(v) == 1 for v in vals)
    if not unique:
        print('Dictionary cannot be switched, values not unique')
        return None
    print('Dict has keys/values : %i/%i' % (len(d), len(vals)))
    return {value: key for key, value in d.items()}
109 
110 
def printDict(d, name='unspecified'):
    """Print a dictionary between two 80-character rules.

    Output layout:

        Dictionary <name> :
            key value
            key value
            ...

    Args:
        d: dictionary to print.
        name: label shown in the heading line.
    """
    rule = '-' * 80
    print(rule)
    print('Dictionary %s : ' % (name))
    for k in d:
        # Same text as the former "print '\t', k, '\t', d[k]" statement.
        print('\t%s \t%s' % (k, d[k]))
    print(rule)
124 
125 
def Reader(readerType, filename, qacross, qToEngine):
    """Process body that reads one file and feeds its events to the engine.

    One process runs this for the serial file, another for the parallel file.

    First the order of events is determined (parallel != serial, usually).
    Then the events are run *in the serial order* with
    AppMgr().runSelectedEvents(pfn, evtNumber) in both reader processes.
    For every event a dictionary path -> (class name, string repr) of
    everything in the TES is put on `qToEngine`, followed by a final None.

    Args:
        readerType: SER or PAR.
        filename: file name carrying the "PFN:" prefix.
        qacross: queue on which the serial reader sends its ordering
            (followed by None) and from which the parallel reader reads it.
        qToEngine: queue towards the comparison process.

    Raises:
        ValueError: if `readerType` is neither SER nor PAR.
    """
    a = AppMgr()
    sel = a.evtsel()
    evt = a.evtsvc()

    header = '/Event/Rec/Header'
    sel.open(filename)
    fname = filename[4:]  # runSelectedEvents doesn't need the "PFN:" prefix

    # determine the ordering: event number -> position in this file
    order = {}
    ct = 0
    while True:
        a.run(1)
        if not evt[header]:
            break
        order[int(evt[header].evtNumber())] = ct
        ct += 1

    if readerType == SER:
        # send the ordering details (position -> event number) to the
        # parallel reader
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serOrder = order
    elif readerType == PAR:
        # receive the serial ordering; the last item before the None
        # terminator is kept
        for serOrder in iter(qacross.get, None):
            pass
    else:
        raise ValueError('Unknown reader type : %s' % (readerType,))
    lsks = len(serOrder.keys())
    lpks = len(order.keys())
    print('Events in Files (serial/parallel) : %i / %i' % (lsks, lpks))

    # now run files in the order specified by the serial ordering
    # and send them one by one to the comparison engine
    for i in iter(serOrder.keys()):
        if readerType == PAR:
            # translate serial position -> event number -> parallel position
            i = order[serOrder[i]]

        a.runSelectedEvents(fname, i)
        paths = evt.getList()
        paths.sort()
        snapshot = {}
        for path in paths:
            obj = evt[path]
            snapshot[path] = (obj.__class__.__name__, repr(obj))
        qToEngine.put(snapshot)
    qToEngine.put(None)
    print('%s Reader Finished' % (readerType))
189 
190 
def ComparisonEngine(pQueue, sQueue):
    """Compare events arriving pairwise from the two reader processes.

    The Comparison Engine runs on a separate forked process and receives
    events in pairs, one each from the serial and the parallel file reader.
    The events arrive as dictionaries d[path] = (className, string_repr)
    and are compared with compareEvents. Results are collected as bools
    (perfect match = True, difference = False) and summarised on stdout.

    Args:
        pQueue: queue fed by the parallel reader, terminated by None.
        sQueue: queue fed by the serial reader, terminated by None.
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        if pitem is None and sitem is None:
            print('Termination Signals received ok')
            break
        if pitem is None or sitem is None:
            # one stream ended before the other
            print('pitem != sitem :  %s %s' % (pitem, sitem))
            break
        # compareEvents expects (serial, parallel): only extra keys of the
        # parallel event are tolerated there.
        results.append(compareEvents(sitem, pitem))
    matches = sum(results)
    print('=' * 80)
    print('Comparison Engine Finished')
    print('-' * 80)
    print('Total Events Checked : %i' % (len(results)))
    print('Perfect Matches : %i' % (matches))
    print('Errors : %i' % (len(results) - matches))
    print('=' * 80)
221 
222 
def checkForAddressDifference(a, b):
    """Tell whether two string representations are both generic DataObjects.

    The __repr__() method for Event Data Objects will return a generic
    string "DataObject at 0xADDRESS" for non-Pythonised objects.
    If these objects have the same path, they are equal, but this
    cannot be tested with "==" in Python, as the memory address will
    be different for the two different DataObjects, so this method
    checks whether both strings are of that generic form.

    Args:
        a, b: two string representations.

    Returns:
        True if both strings start with "DataObject at 0x", else False.
    """
    ref = 'DataObject at 0x'
    return a.startswith(ref) and b.startswith(ref)
237 
238 
def compareEvents(s, p):
    """Compare one serial event with the matching parallel event.

    Both events are dictionaries of the form
    d[path] = (className, string_repr).

    Args:
        s: event read from the serial file.
        p: event read from the parallel file.

    Returns:
        True if both events hold the same paths with the same class names
        and equivalent string representations, False otherwise.
    """
    sks = sorted(s)
    pks = sorted(p)

    # check 1 : number of keys (paths)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        # but in TESSerializer, *all* DataObjects will be sent
        # including /Event/Prev and /Event/Prev/MC
        #
        # Extra parallel keys that are plain DataObjects are ignored;
        # anything else is a real difference.
        for e in set(pks) - set(sks):
            if p[e][0] == 'DataObject':
                pks.remove(e)
            else:
                print('Extra Other thing found! %s %s' % (e, p[e][0]))
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; two generic "DataObject at 0x..."
        # strings count as equal although their addresses differ
        if s[key][1] != p[key][1] and not checkForAddressDifference(
                p[key][1], s[key][1]):
            return False
    return True
296 
297 
def CheckFileRecords(par, ser):
    """Collect the file records of both files and print how they differ.

    The original body called `GetFSRdicts`, which is not defined in this
    file. The records are now collected with GetFSRdict, run in a child
    process with a result queue in the same way the __main__ block runs it.

    Args:
        par: name of the parallel file, as accepted by GetFSRdict.
        ser: name of the serial file, as accepted by GetFSRdict.
    """

    def _collect(filename):
        # Run GetFSRdict in its own process and fetch the dictionary it
        # puts on the queue. The queue is read before join() so the child
        # is not kept waiting on undelivered data.
        queue = Queue()
        worker = Process(target=GetFSRdict, args=(filename, queue))
        worker.start()
        records = queue.get()
        worker.join()
        return records

    def _entries(value):
        # GetFSRdict leaves the scalar placeholder 0 for records that were
        # not found in the file; treat such a scalar as a one-element set.
        try:
            return set(value)
        except TypeError:
            return set([value])

    print("Checking File Records")

    parFSR = _collect(par)
    serFSR = _collect(ser)

    # Differences are one-directional: entries of the parallel file that
    # are missing from the serial file.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + str(len(diff1)) +
          "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): "LumiFSRBeam1" is not part of this list although
    # GetFSRdict provides it - confirm whether that is intended.
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = _entries(parFSR[k]["key"]) - _entries(serFSR[k]["key"])
        diff4 = _entries(parFSR[k]["incr"]) - _entries(serFSR[k]["incr"])
        diff5 = _entries(parFSR[k]["integral"]) - \
            _entries(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))
320 
321 
def LumiFSR(lumi):
    """Extract run numbers, file IDs and counters from a LumiFSR object.

    The counters are parsed from str(lumi): everything after the marker
    "info (key/incr/integral) : " is split at '/', the last piece is
    dropped, and every remaining piece must consist of three
    whitespace-separated integers.

    Args:
        lumi: object providing runNumbers(), fileIDs() and a string
            representation in the format described above.

    Returns:
        Tuple (runs, files, key, incr, integral) of five lists.

    Raises:
        ValueError: if a record does not hold exactly three integers.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    payload = str(lumi).split("info (key/incr/integral) : ")[-1]

    key = []
    incr = []
    integral = []
    # the piece after the last '/' is not a record
    for rec in payload.split('/')[:-1]:
        k, i, t = rec.split()
        key.append(int(k))
        incr.append(int(i))
        integral.append(int(t))

    return (runs, files, key, incr, integral)
348 
349 
def GetFSRdict(filename, queue):
    """Read the file records of `filename` and put them on `queue`.

    Configures a LumiFsrReader for the file, runs one event, and copies
    the contents of the file-record service into a dictionary. Entries
    that are not found in the file keep the placeholder value 0.

    The configuration used to be assembled as a string (with the file name
    interpolated into it) and run through `exec`; it is now written as the
    same statements directly, so the file name is never parsed as code.

    Args:
        filename: name of the input file.
        queue: queue that receives the resulting dictionary.
    """
    FSR = {
        "TimeSpanFSR": {
            'earliest': 0,
            'latest': 0
        },
        "LumiFSRBeamCrossing": {
            'key': 0,
            'incr': 0,
            'integral': 0
        },
        "LumiFSRBeam1": {
            'key': 0,
            'incr': 0,
            'integral': 0
        },
        "LumiFSRBeam2": {
            'key': 0,
            'incr': 0,
            'integral': 0
        },
        "LumiFSRNoBeam": {
            'key': 0,
            'incr': 0,
            'integral': 0
        },
        "EventCountFSR": {
            'input': 0,
            'output': 0,
            'statusFlag': 0
        }
    }

    # Same statements, in the same order, as the former option string.
    from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader
    LumiFsrReader().OutputLevel = INFO
    LumiFsrReader().inputFiles = ['%s' % filename]
    LumiFsrReader().Persistency = 'ROOT'
    LumiFsrReader().EvtMax = 1
    from Configurables import LHCbApp
    LHCbApp().Persistency = 'ROOT'
    from Configurables import CondDB, DDDBConf
    CondDB().UseLatestTags = ['2011']
    DDDBConf(DataType='2011')

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    paths = fsr.getHistoNames()

    if paths:
        for path in paths:

            ob = fsr.retrieveObject(path)

            if "LumiFSR" in path:

                assert ob.numberOfObjects() == 1
                runs, files, keys, increment, integral = LumiFSR(
                    ob.containedObject(0))

                # the record is filed under the last component of its path
                record = FSR[path[path.rfind('/') + 1:]]
                record['runs'] = runs
                record['files'] = files
                record['key'] = keys
                record['incr'] = increment
                record['integral'] = integral

            if "TimeSpanFSR" in path:

                FSR["TimeSpanFSR"]['earliest'] = ob.containedObject(
                    0).earliest()
                FSR["TimeSpanFSR"]['latest'] = ob.containedObject(0).latest()

            if "EventCountFSR" in path:

                FSR["EventCountFSR"]['input'] = ob.input()
                FSR["EventCountFSR"]['output'] = ob.output()
                FSR["EventCountFSR"]['statusFlag'] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
428 
429 
def CompareFSR(pout, sout):
    """Take one file-record dictionary from each queue and print the diff.

    For "TimeSpanFSR" and "EventCountFSR" the (key, value) pairs of the
    parallel dictionary that are missing from the serial one are counted.
    For the Lumi records the same one-directional count is made for the
    'key', 'incr' and 'integral' entries. Both dictionaries are printed
    in full at the end.

    Args:
        pout: queue delivering the dictionary of the parallel file.
        sout: queue delivering the dictionary of the serial file.
    """

    def _entries(value):
        # GetFSRdict leaves the scalar placeholder 0 for records that were
        # not found in the file; treat such a scalar as a one-element set
        # instead of failing in set().
        try:
            return set(value)
        except TypeError:
            return set([value])

    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    diff1 = set(parFSR["TimeSpanFSR"].items()) - \
        set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - \
        set(serFSR["EventCountFSR"].items())

    print("\nDifferent entries in TimeSpanFSR: \t" + str(len(diff1)) +
          "\nDifferent entries in EventCountFSR:\t" + str(len(diff2)))

    # NOTE(review): "LumiFSRBeam1" is not part of this list although
    # GetFSRdict provides it - confirm whether that is intended.
    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = _entries(parFSR[k]['key']) - _entries(serFSR[k]['key'])
        diff4 = _entries(parFSR[k]['incr']) - _entries(serFSR[k]['incr'])
        diff5 = _entries(parFSR[k]['integral']) - \
            _entries(serFSR[k]["integral"])
        print("Different entries in " + str(k) + ": \tkey: " +
              str(len(diff3)) + " increment: " + str(len(diff4)) +
              " integral: " + str(len(diff5)))

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
455 
456 
if __name__ == '__main__':

    # Work on a copy: the original popped the script name off sys.argv
    # itself and later read sys.argv[0] / sys.argv[1] again, which handed
    # the parallel file to the serial reader and vice versa.
    args = sys.argv[1:]
    if len(args) != 2:
        print('Please supply two arguments : > python loadFile <parallelFile> <serialFile>')
        sys.exit(0)

    parFile, serFile = args
    par = 'PFN:' + parFile
    ser = 'PFN:' + serFile
    print('Parallel File to be analysed : %s' % (par))
    print('Serial File to be analysed : %s' % (ser))

    pname = par[4:]  # TFile doesn't need the "PFN:" prefix
    sname = ser[4:]

    # Event-by-event comparison: the processes are set up, but starting
    # them is currently disabled (see the commented-out lines below).
    qacross = Queue()
    pout = Queue()
    sout = Queue()

    parReader = Process(target=Reader, args=(PAR, par, qacross, pout))
    serReader = Process(target=Reader, args=(SER, ser, qacross, sout))
    com = Process(target=ComparisonEngine, args=(pout, sout))

    #com.start() ; parReader.start() ; serReader.start()
    #serReader.join() ; parReader.join() ; com.join()

    #CompareTrees( pname, sname )

    print("Check File Records")

    # Fresh queues for the file-record comparison; GetFSRdict is given the
    # plain file names, without the "PFN:" prefix.
    pout = Queue()
    sout = Queue()

    sp = Process(target=GetFSRdict, args=(serFile, sout))
    pp = Process(target=GetFSRdict, args=(parFile, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
def printDict(d, name='unspecified')
def CheckFileRecords(par, ser)
def CompareFSR(pout, sout)
def ComparisonEngine(pQueue, sQueue)
double sum(double x, double y, double z)
A small to stream Data I/O.
Definition: OutputStream.h:28
def CompareTrees(pname, sname)
HistogramPersistencySvc class implementation definition.
def checkForAddressDifference(a, b)
def GetFSRdict(filename, queue)
The Application Manager class.
Definition of class EventSelector.
Definition: EventSelector.h:53
def Reader(readerType, filename, qacross, qToEngine)