The Gaudi Framework  master (181af51f)
Loading...
Searching...
No Matches
compareOutputFiles.py
Go to the documentation of this file.
11
12# We do not lint job options
13# ruff: noqa
14
15
16import sys
17from multiprocessing import Process, Queue
18
19from Configurables import LHCbApp
20from Gaudi.Configuration import *
21from ROOT import TBuffer, TBufferFile, TFile
22
23from GaudiPython import AppMgr, gbl
24
#
# compareOutputFiles.py
# ---------------------
# Open the dst files written by a serial and a parallel job for inspection
# and compare them
#
30
31
def checkKeys(name):
    """Open the ROOT file behind *name* in order to check its TTree keys.

    Args:
        name: File name, normally carrying a leading "PFN:" prefix.

    NOTE(review): no key inspection is implemented here; the function only
    opens the file and closes it again.
    NOTE(review): "REC" is not one of the documented TFile open modes;
    presumably ROOT falls back to read-only access - confirm.
    """
    # TFile doesn't need the "PFN:" prefix. Strip it only when it is present
    # so that a bare file name does not silently lose its first four
    # characters.
    fname = name[4:] if name.startswith("PFN:") else name
    tf = TFile(fname, "REC")
    # Release the file handle explicitly, as CompareTrees does.
    tf.Close()
36
37
# Job options: load the standard LHCb application options and the RootCnv
# setup options before applying the overrides below.
importOptions("$STDOPTS/LHCbApplication.opts")
importOptions("$ENV_PROJECT_SOURCE_DIR/RootCnv/options/Setup.opts")

# Blank the output destinations of the DST writer and of the histogram
# persistency service (presumably so that this checker writes nothing itself
# - confirm).
OutputStream("DstWriter").Output = ""
HistogramPersistencySvc().OutputFile = ""
# Only report messages at ERROR level.
MessageSvc(OutputLevel=ERROR)
EventSelector().PrintFreq = 100

ApplicationMgr(OutputLevel=ERROR, AppName="File Check - Serial vs Parallel")
# TopAlg = ['UnpackMCParticle', 'UnpackMCVertex'] )

# Tags passed to Reader() as readerType to tell the two reader roles apart.
PAR = "PARALLEL"
SER = "SERIAL"
51
52
def CompareTrees(pname, sname):
    """Compare the top-level key names of the parallel and the serial file.

    The key names of each file are sorted and split into three groups: names
    starting with "_Event", names starting with "##", and everything else.
    A message is printed for every group that differs between the two files;
    for the third group the names found in only one file are listed.

    Args:
        pname: Name of the file written by the parallel job.
        sname: Name of the file written by the serial job.
    """
    event_prefix = "_Event"
    meta_prefix = "##"

    def classify(root_file):
        # Return the sorted key names of root_file as (meta, event, other).
        meta, event, other = [], [], []
        names = sorted(key.GetName() for key in root_file.GetListOfKeys())
        for key_name in names:
            if key_name.startswith(event_prefix):
                event.append(key_name)
            elif key_name.startswith(meta_prefix):
                meta.append(key_name)
            else:
                other.append(key_name)
        return meta, event, other

    pf = TFile(pname, "REC")
    sf = TFile(sname, "REC")

    p_meta, p_event, p_other = classify(pf)
    s_meta, s_event, s_other = classify(sf)

    if p_meta != s_meta:
        print("Meta Data differs")

    if p_event != s_event:
        print("Event data differs")

    if p_other != s_other:
        only_parallel = set(p_other) - set(s_other)
        only_serial = set(s_other) - set(p_other)
        if only_parallel:
            print("Extra Data in parallel file : ", only_parallel)
        if only_serial:
            print("Extra Data in serial file : ", only_serial)
        if only_serial or only_parallel:
            print("Files will have different sizes")

    pf.Close()
    sf.Close()
107
108
def switchDict(d):
    """Return a copy of *d* with keys and values swapped.

    This only works if all values are unique (and hashable, since they
    become keys).

    Args:
        d: The dictionary to invert.

    Returns:
        The inverted dictionary, or None (after printing a message) when two
        keys share the same value.
    """
    # Building the inverse directly detects duplicates in a single pass:
    # equal values collapse onto one key, so the inverse comes out shorter.
    # (dict.values() is a view on Python 3 and has no count() method.)
    inverted = {value: key for key, value in d.items()}
    if len(inverted) != len(d):
        print("Dictionary cannot be switched, values not unique")
        return None
    # A dict always holds as many values as keys.
    print("Dict has keys/values : %i/%i" % (len(d), len(d)))
    return inverted
125
126
def printDict(d, name="unspecified"):
    """Print *d* between two 80-character rules.

    Layout:

        Dictionary <name> :
            key     value
            key     value
            ...

    Args:
        d: The dictionary to print.
        name: Label shown in the heading line.
    """
    rule = "-" * 80
    print(rule)
    print("Dictionary %s : " % (name))
    for key, value in d.items():
        print("\t", key, "\t", value)
    print(rule)
140
141
def Reader(readerType, filename, qacross, qToEngine):
    """Read one file and stream its events to the comparison engine.

    One process runs this for the serial file and another for the parallel
    file. The order of the events in the file is determined first (parallel
    != serial, usually). The events are then run *in the serial order* with
    AppMgr().runSelectedEvents(pfn, evtNumber) in both readers, and for each
    event a dictionary holding the class name and string repr of everything
    in the TES is sent to the comparison process.

    Args:
        readerType: SER or PAR, selecting the role of this reader.
        filename: File to read, with its leading "PFN:" prefix.
        qacross: Queue carrying the serial ordering from the serial reader
            to the parallel reader.
        qToEngine: Queue receiving one dictionary per event, followed by
            None once all events have been sent.
    """
    app = AppMgr()
    selector = app.evtsel()
    tes = app.evtsvc()

    header_path = "/Event/Rec/Header"
    selector.open(filename)
    fname = filename[4:]  # runSelectedEvents doesn't need the "PFN:" prefix

    # Pass 1: record the position of every event number in this file.
    order = {}
    position = 0
    while True:
        app.run(1)
        if not tes[header_path]:
            break
        order[int(tes[header_path].evtNumber())] = position
        position += 1

    if readerType == SER:
        # The serial reader publishes position -> event number, then None
        # as the end marker, and follows that same ordering itself.
        order = switchDict(order)
        qacross.put(order)
        qacross.put(None)
        serial_order = order
    elif readerType == PAR:
        # The parallel reader drains the queue up to the None marker and
        # keeps the last ordering it received.
        for serial_order in iter(qacross.get, None):
            pass
        n_serial = len(serial_order.keys())
        n_parallel = len(order.keys())
        print("Events in Files (serial/parallel) : %i / %i" % (n_serial, n_parallel))

    # Pass 2: replay the events in the serial ordering and send them one by
    # one to the comparison engine.
    for index in serial_order:
        if readerType == PAR:
            # Translate serial position -> event number -> position in the
            # parallel file.
            index = order[serial_order[index]]

        app.runSelectedEvents(fname, index)
        paths = tes.getList()
        paths.sort()

        snapshot = {
            path: (tes[path].__class__.__name__, tes[path].__repr__())
            for path in paths
        }
        qToEngine.put(snapshot)
    qToEngine.put(None)
    print("%s Reader Finished" % (readerType))
204
205
def ComparisonEngine(pQueue, sQueue):
    """Compare events arriving in pairs from the two reader processes.

    The Comparison Engine runs on a separate forked process and receives
    events in pairs, one each from the Parallel FileReader and the Serial
    FileReader. The events arrive in dictionary format,
    d[path] = (className, string_repr), and are compared with compareEvents.
    One bool per pair is collected (PerfectMatch=True, Diff=False) and a
    summary is printed once a queue delivers its None terminator.

    Args:
        pQueue: Queue fed by the parallel reader.
        sQueue: Queue fed by the serial reader.

    NOTE(review): compareEvents is called with the parallel item first;
    confirm that this matches its parameter order.
    """
    results = []
    while True:
        pitem = pQueue.get()
        sitem = sQueue.get()
        # None is the termination sentinel, so test identity rather than
        # equality.
        if pitem is None and sitem is None:
            print("Termination Signals received ok")
            break
        if pitem is None or sitem is None:
            # One reader finished before the other one.
            print("pitem != sitem : ", pitem, sitem)
            break
        results.append(compareEvents(pitem, sitem))

    total = len(results)
    matches = sum(results)
    print("=" * 80)
    print("Comparison Engine Finished")
    print("-" * 80)
    print("Total Events Checked : %i" % (total))
    print("Perfect Matches : %i" % (matches))
    print("Errors : %i" % (total - matches))
    print("=" * 80)
236
237
def checkForAddressDifference(a, b):
    """Tell whether two string representations are both generic DataObject reprs.

    The __repr__() method for Event Data Objects will return a generic
    string "DataObject at 0xADDRESS" for non-Pythonised objects. If these
    objects have the same path, they are equal, but this cannot be tested
    with "==" in Python, as the memory address will be different for the two
    different DataObjects, so this method checks whether both strings are of
    that generic form.

    Args:
        a: First string representation.
        b: Second string representation.

    Returns:
        True when both strings start with "DataObject at 0x", else False.
    """
    ref = "DataObject at 0x"
    # startswith() keeps the test tied to the reference text instead of to a
    # hard-coded slice length.
    return a.startswith(ref) and b.startswith(ref)
252
253
def compareEvents(s, p):
    """Compare two events given as TES snapshots.

    Both events are dictionaries of the form
    d[path] = (className, string_repr).

    Args:
        s: First event; its paths are the reference set.
        p: Second event; it may carry extra paths whose class name is
            "DataObject", and those are ignored.

    Returns:
        True when both events hold the same paths (after dropping the
        tolerated extras) and every path has the same class name and an
        equivalent string representation; False otherwise.

    NOTE(review): the `def` line was missing from the reviewed listing. The
    name comes from the call in ComparisonEngine and the parameter names
    from the body; the order (s, p) is an assumption to be confirmed against
    the upstream file. ComparisonEngine calls compareEvents(pitem, sitem),
    so with this order the parallel event is bound to `s`.
    """
    # check 1 : number of keys (paths)
    # sorted() returns real lists; dict.keys() is a view on Python 3 and has
    # neither sort() nor remove().
    sks = sorted(s)
    pks = sorted(p)
    if len(sks) != len(pks):
        # There may be extra keys in the parallel file
        # example: DstWriter may ask for /Event/Prev/MC/Header#1
        # but in TESSerializer, *all* DataObjects will be sent
        # including /Event/Prev and /Event/Prev/MC
        #
        # Extra keys which are just containing DataObjects are removed;
        # anything else is a genuine difference.
        for extra in set(pks) - set(sks):
            if p[extra][0] == "DataObject":
                pks.remove(extra)
            else:
                print("Extra Other thing found!", extra, p[extra][0])
                return False

    # check 2 : same paths?
    if sks != pks:
        return False

    # check 3 : check the content; the first difference settles the result
    for key in sks:
        # compare class name
        if s[key][0] != p[key][0]:
            return False
        # compare string representation; two generic "DataObject at 0x..."
        # strings are accepted even though their addresses differ
        if s[key][1] != p[key][1] and not checkForAddressDifference(
            p[key][1], s[key][1]
        ):
            return False

    return True
311
312
def CheckFileRecords(par, ser):
    """Read the file records of both files and print how many entries differ.

    Each file is read by GetFSRdict in its own process, in the same way the
    script's main section does it, and the resulting dictionaries are
    compared group by group.

    Args:
        par: Name of the file written by the parallel job.
        ser: Name of the file written by the serial job.

    NOTE(review): every count is a one-way difference (parallel minus
    serial), and "LumiFSRBeam1" is not among the compared groups - confirm
    that both are intended.
    """
    print("Checking File Records")

    # The previous version called GetFSRdicts(), which is not defined in
    # this file. GetFSRdict() reports through a queue, so run it once per
    # file in a child process and collect the two results.
    pout = Queue()
    sout = Queue()
    workers = [
        Process(target=GetFSRdict, args=(par, pout)),
        Process(target=GetFSRdict, args=(ser, sout)),
    ]
    for worker in workers:
        worker.start()
    # Fetch the results before joining: a child that has put data on a queue
    # may not exit until that data has been consumed.
    parFSR = pout.get()
    serFSR = sout.get()
    for worker in workers:
        worker.join()

    # dict.iteritems() no longer exists on Python 3; items() is equivalent
    # here.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = set(parFSR[k]["key"]) - set(serFSR[k]["key"])
        diff4 = set(parFSR[k]["incr"]) - set(serFSR[k]["incr"])
        diff5 = set(parFSR[k]["integral"]) - set(serFSR[k]["integral"])
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )
347
348
def LumiFSR(lumi):
    """Extract the contents of one lumi file record as plain lists.

    The run numbers and file IDs are read through the object's accessors.
    The key / increment / integral triples are parsed from the text that
    follows "info (key/incr/integral) : " in str(lumi): the text is split on
    "/", the last piece is discarded, and every remaining piece must hold
    three whitespace-separated integers.

    Args:
        lumi: Object providing runNumbers(), fileIDs() and a string
            representation containing the info section described above.

    Returns:
        A tuple (runs, files, key, incr, integral) of lists.
    """
    runs = list(lumi.runNumbers())
    files = list(lumi.fileIDs())

    text = str(lumi)
    payload = text.split("info (key/incr/integral) : ")[-1]
    records = payload.split("/")[:-1]

    key, incr, integral = [], [], []
    for record in records:
        k, i, t = record.split()
        key.append(int(k))
        incr.append(int(i))
        integral.append(int(t))

    return (runs, files, key, incr, integral)
374
375
def GetFSRdict(filename, queue):
    """Read the file records of *filename* and put them on *queue* as a dict.

    The LumiFsrReader configuration is set up for the given file, one event
    is run, and every object listed by the file record service is inspected:
    "LumiFSR" entries fill the runs/files/key/incr/integral lists of the
    matching group, "TimeSpanFSR" fills earliest/latest and "EventCountFSR"
    fills input/output/statusFlag.

    Args:
        filename: Name of the file to read.
        queue: Queue on which the resulting dictionary is put.
    """
    # key/incr/integral default to empty lists (LumiFSR() returns lists) so
    # that a group missing from the file can still be fed to set() by the
    # comparison code; an integer default would raise TypeError there.
    FSR = {
        "TimeSpanFSR": {"earliest": 0, "latest": 0},
        "LumiFSRBeamCrossing": {"key": [], "incr": [], "integral": []},
        "LumiFSRBeam1": {"key": [], "incr": [], "integral": []},
        "LumiFSRBeam2": {"key": [], "incr": [], "integral": []},
        "LumiFSRNoBeam": {"key": [], "incr": [], "integral": []},
        "EventCountFSR": {"input": 0, "output": 0, "statusFlag": 0},
    }

    # Configure directly instead of exec()-ing a code string: the file name
    # comes from the command line, and interpolating it into source text
    # breaks on quotes or backslashes and would execute whatever it contains.
    from Configurables import CondDB, DDDBConf, LHCbApp
    from LumiAlgs.LumiFsrReaderConf import LumiFsrReaderConf as LumiFsrReader

    LumiFsrReader().OutputLevel = INFO
    LumiFsrReader().inputFiles = [filename]
    LumiFsrReader().Persistency = "ROOT"
    LumiFsrReader().EvtMax = 1
    LHCbApp().Persistency = "ROOT"
    CondDB().UseLatestTags = ["2011"]
    DDDBConf(DataType="2011")

    app = AppMgr()
    app.run(1)
    fsr = app.filerecordsvc()

    for path in fsr.getHistoNames() or []:
        ob = fsr.retrieveObject(path)

        if "LumiFSR" in path:
            assert ob.numberOfObjects() == 1
            runs, files, keys, increment, integral = LumiFSR(ob.containedObject(0))

            # The group is named after the last component of the path; a
            # group not listed in the defaults gets a fresh entry.
            record = FSR.setdefault(path[path.rfind("/") + 1 :], {})
            record["runs"] = runs
            record["files"] = files
            record["key"] = keys
            record["incr"] = increment
            record["integral"] = integral

        if "TimeSpanFSR" in path:
            FSR["TimeSpanFSR"]["earliest"] = ob.containedObject(0).earliest()
            FSR["TimeSpanFSR"]["latest"] = ob.containedObject(0).latest()

        if "EventCountFSR" in path:
            FSR["EventCountFSR"]["input"] = ob.input()
            FSR["EventCountFSR"]["output"] = ob.output()
            FSR["EventCountFSR"]["statusFlag"] = ob.statusFlag()

    app.stop()
    app.finalize()

    queue.put(FSR)
427
428
def CompareFSR(pout, sout):
    """Compare the file-record dictionaries of the parallel and serial file.

    One dictionary is taken from each queue. The number of entries present
    in the parallel record but not in the serial one is printed for every
    compared group, followed by both dictionaries in full.

    Args:
        pout: Queue delivering the dictionary of the parallel file.
        sout: Queue delivering the dictionary of the serial file.

    NOTE(review): every count is a one-way difference (parallel minus
    serial), and "LumiFSRBeam1" is not among the compared groups - confirm
    that both are intended.
    """
    parFSR = pout.get()
    serFSR = sout.get()

    print("Comparing File Records")

    # dict.iteritems() no longer exists on Python 3; items() is equivalent
    # here.
    diff1 = set(parFSR["TimeSpanFSR"].items()) - set(serFSR["TimeSpanFSR"].items())
    diff2 = set(parFSR["EventCountFSR"].items()) - set(
        serFSR["EventCountFSR"].items()
    )

    print(
        "\nDifferent entries in TimeSpanFSR: \t"
        + str(len(diff1))
        + "\nDifferent entries in EventCountFSR:\t"
        + str(len(diff2))
    )

    for k in ["LumiFSRBeamCrossing", "LumiFSRBeam2", "LumiFSRNoBeam"]:
        diff3 = set(parFSR[k]["key"]) - set(serFSR[k]["key"])
        diff4 = set(parFSR[k]["incr"]) - set(serFSR[k]["incr"])
        diff5 = set(parFSR[k]["integral"]) - set(serFSR[k]["integral"])
        print(
            "Different entries in "
            + str(k)
            + ": \tkey: "
            + str(len(diff3))
            + " increment: "
            + str(len(diff4))
            + " integral: "
            + str(len(diff5))
        )

    print("\nParallel: \n" + str(parFSR))
    print("\nSerial: \n" + str(serFSR))
466
467
if __name__ == "__main__":
    # Usage: <script> <parallelFile> <serialFile>
    # Work on a copy of the arguments: popping from sys.argv itself shifted
    # the indices, so the later sys.argv[0] / sys.argv[1] lookups handed the
    # parallel file to the serial reader and vice versa.
    args = sys.argv[1:]
    if len(args) != 2:
        print(
            "Please supply two arguments : > python loadFile <parallelFile> <serialFile>"
        )
        sys.exit(0)

    par, ser = args
    print("Parallel File to be analysed : %s" % ("PFN:" + par))
    print("Serial File to be analysed : %s" % ("PFN:" + ser))

    # The event-by-event comparison (Reader / ComparisonEngine) and
    # CompareTrees are not run; only the file records are compared.

    print("Check File Records")

    pout = Queue()
    sout = Queue()

    # One process per file reads the file records; a third one compares
    # them. GetFSRdict receives the plain file names, without "PFN:".
    sp = Process(target=GetFSRdict, args=(ser, sout))
    pp = Process(target=GetFSRdict, args=(par, pout))
    cp = Process(target=CompareFSR, args=(pout, sout))

    sp.start()
    pp.start()
    cp.start()
    sp.join()
    pp.join()
    cp.join()
The Application Manager class.
Definition of class EventSelector.
HistogramPersistencySvc class implementation definition.
A small to stream Data I/O.
GetFSRdict(filename, queue)
printDict(d, name="unspecified")
ComparisonEngine(pQueue, sQueue)
Reader(readerType, filename, qacross, qToEngine)
CompareTrees(pname, sname)