pTools.py

import pickle
import time
from functools import cmp_to_key  # for the old-style comparator in FileRecordsAgent
from multiprocessing import Event

from GaudiPython import FAILURE, SUCCESS, gbl

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
# GaudiPython Parallel model
#
# Classes :
# - HistoAgent : In charge of extracting Histograms from their Transient Store
#                on a reader/worker, communicating them to the writer, and on
#                the writer receiving them and rebuilding a single store
#
# - FileRecordsAgent : Similar to HistoAgent, but for FileRecords Data
#
# - LumiFSR : FSR data from different workers needs to be carefully merged to
#             replicate the serial version; this class aids in that task by
#             representing an LHCb LumiFSR object as a python class
#
# - PackedCaloHypos : Pythonization of an LHCb class, used for inspecting some
#                     differences in serial and parallel output
#
# - Syncer : This class is responsible for syncing processes for a specified
#            section of execution. For example, one Syncer object might be
#            syncing Initialisation, one Running, one Finalisation.
#            Syncer uses multiprocessing.Event() objects as flags which are
#            visible across the N processes sharing them.
#            IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#            ensure that there is no hanging; they, in effect, allow a timeout
#            for Initialisation, Run, Finalise on all processes
#
# - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
# - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                         ( AppMgr().evtsvc() ) to this to get the current
#                         Event Number as an integer (even from RawEvents!)
#

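# A minimal usage sketch for getEventNumber (editor's illustration; assumes a
# configured GaudiPython job, so the setup here is hypothetical):
#
#     from GaudiPython import AppMgr
#     appMgr = AppMgr()
#     evt = appMgr.evtsvc()        # the TES handle expected by getEventNumber
#     appMgr.run(1)                # process one event
#     print(getEventNumber(evt))   # -> current event number as an integer
#
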
# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
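
# Editor's sketch of its use (see HistoAgent below): `aidaHisto` stands for a
# hypothetical AIDA histogram retrieved from the transient store, `rootDelta`
# for a ROOT histogram received from a worker:
#
#     rootHisto = aida2root(aidaHisto)  # ROOT view of the stored histogram
#     rootHisto.Add(rootDelta)          # so ROOT's TH1::Add can merge them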

# =========================== Classes =========================================


class HistoAgent:
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking Histogram Objects to the Histo
        # store; here they are collected in a dictionary, keyed by type name
        self.bookingDict = {}
        self.bookingDict["DataObject"] = self.bookDataObject
        self.bookingDict["NTuple::Directory"] = self.bookDataObject
        self.bookingDict["NTuple::File"] = self.bookDataObject
        self.bookingDict["TH1D"] = self.bookTH1D
        self.bookingDict["TH2D"] = self.bookTH2D
        self.bookingDict["TH3D"] = self.bookTH3D
        self.bookingDict["TProfile"] = self.bookTProfile
        self.bookingDict["TProfile2D"] = self.bookTProfile2D

    def register(self, tup):
        # add a tuple of (worker-id, histoDict) to self.histos
        assert isinstance(tup, tuple)
        self.histos.append(tup)

    def Receive(self):
        hstatus = self._gmpc.nWorkers + 1  # +1 for the Reader!
        while True:
            tup = self.qin.get()
            if tup == "HISTOS_SENT":
                self.log.debug("received HISTOS_SENT message")
                hstatus -= 1
                if not hstatus:
                    break
            else:
                self.register(tup)
        self._gmpc.sEvent.set()
        self.log.info("Writer received all histo bundles and set sync event")
        return SUCCESS
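
    # Receive() above counts one "HISTOS_SENT" sentinel per producer
    # (nWorkers workers plus the Reader). The sending side is sketched here
    # for illustration only (`queue` and `histDict` stand in for the
    # GMPComponent's real histogram queue and collected histograms):
    #
    #     queue.put((workerID, histDict))  # one (id, {path: histo}) bundle
    #     queue.put("HISTOS_SENT")         # sentinel: this producer is done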

    def RebuildHistoStore(self):
        """
        Rebuild the Histogram Store from the histos received by Receive()
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the Stored histo for compatibility.
        """
        errors = 0
        for tup in self.histos:
            workerID, histDict = tup
            added = 0
            booked = 0

            for n, o in histDict.items():
                obj = self.hvt.retrieve(n)

                if obj:
                    try:
                        aida2root(obj).Add(o)
                    except Exception:
                        self.log.warning("FAILED TO ADD : %s" % (str(obj)))
                        errors += 1
                    added += 1
                else:
                    if o.__class__.__name__ in self.bookingDict:
                        try:
                            self.bookingDict[o.__class__.__name__](n, o)
                        except Exception:
                            self.log.warning(
                                "FAILED TO REGISTER : %s\tto %s"
                                % (o.__class__.__name__, n)
                            )
                            errors += 1
                    else:
                        self.log.warning(
                            "No booking method for: %s\t%s\t%s"
                            % (n, type(o), o.__class__.__name__)
                        )
                        errors += 1
                    booked += 1
        hs = self.hvt.getHistoNames()
        self.log.info("Histo Store Rebuilt : ")
        self.log.info("  Contains %i objects." % (len(hs)))
        self.log.info("  Errors in Rebuilding : %i" % (errors))
        return SUCCESS

    def bookDataObject(self, n, o):
        """
        Register a DataObject to the Histo Store
        """
        self._gmpc.hvt.registerObject(n, o)

    def bookTH1D(self, n, o):
        """
        Register a ROOT 1D histogram (TH1D) to the Histo Store
        """
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

    def bookTH2D(self, n, o):
        """
        Register a ROOT 2D histogram (TH2D) to the Histo Store
        """
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

    def bookTH3D(self, n, o):
        """
        Register a ROOT 3D histogram (TH3D) to the Histo Store
        """
        # note: GetNbins() gives the bin count; GetXbins() (used here
        # previously) returns the bin-edge array, which is not what book() needs
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
            o.GetZaxis().GetNbins(),
            o.GetZaxis().GetXmin(),
            o.GetZaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

    def bookTProfile(self, n, o):
        """
        Register a ROOT TProfile to the Histo Store
        """
        obj = self.hvt._ihs.bookProf(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetOption(),
        )
        aida2root(obj).Add(o)

    def bookTProfile2D(self, n, o):
        """
        Register a ROOT TProfile2D to the Histo Store
        """
        obj = self.hvt._ihs.bookProf(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
        )
        aida2root(obj).Add(o)


# =============================================================================


class FileRecordsAgent:
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []  # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp(self, tupA, tupB):
        # old-style comparator: sort tuples by their first element (nodeID);
        # used via functools.cmp_to_key in Receive()
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB:
            return -1
        elif valA > valB:
            return 1
        else:
            return 0
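
    # Python 3 removed list.sort(cmp=...), so Receive() wraps this comparator
    # with functools.cmp_to_key. A self-contained illustration of the same
    # sort on toy data (editor's sketch):
    #
    #     from functools import cmp_to_key
    #     recs = [(2, "/FileRecords/B", b""), (0, "/FileRecords/A", b"")]
    #     recs.sort(key=cmp_to_key(lambda a, b: (a[0] > b[0]) - (a[0] < b[0])))
    #     # -> ordered by nodeID: the lead worker's objects come first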

    def SendFileRecords(self):
        # send the FileRecords data as part of finalisation

        # Take Care of FileRecords!
        # There are two main things to consider here
        # 1) The DataObjects in the FileRecords Transient Store
        # 2) The fact that they are Keyed Containers, containing other objects
        #
        # The Lead Worker, nodeID=0, sends everything in the FSR store, as
        # a completeness guarantee.
        #
        # send in form ( nodeID, path, object )
        self.log.info("Sending FileRecords...")
        lst = self.fsr.getHistoNames()

        # Check Validity
        if not lst:
            self.log.info("No FileRecords Data to send to Writer.")
            self.q.put("END_FSR")
            return SUCCESS

        # no need to send the root node
        if "/FileRecords" in lst:
            lst.remove("/FileRecords")

        for l in lst:
            o = self.fsr.retrieveObject(l)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()
            # lead worker sends everything, as completeness guarantee
            if self._gmpc.nodeID == 0:
                self.objectsOut.append((0, l, pickle.dumps(o)))
            else:
                # only add the Event Counter
                # and non-Empty Keyed Containers (ignore empty ones)
                if l == "/FileRecords/EventCountFSR":
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append(tup)
                elif "KeyedContainer" in o.__class__.__name__:
                    # It's a Keyed Container
                    nObjects = o.numberOfObjects()
                    if nObjects:
                        self.log.debug(
                            "Keyed Container %s with %i objects" % (l, nObjects)
                        )
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append(tup)
                else:
                    self.log.info("Ignoring %s in FSR" % o.__class__.__name__)

        self.log.debug("Done with FSR store; ready to send to Writer.")

        if self.objectsOut:
            self.log.debug("%i FSR objects to Writer" % (len(self.objectsOut)))
            for ob in self.objectsOut:
                self.log.debug("\t%s" % (ob[0]))
            self.q.put(self.objectsOut)
        else:
            self.log.info("Valid FSR Store, but no data to send to Writer")
        self.log.info("SendFSR complete")
        self.q.put("END_FSR")
        return SUCCESS
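
    # The resulting queue traffic from one worker, sketched for illustration
    # (paths shortened, pickled payloads abbreviated):
    #
    #     [(1, "/FileRecords/EventCountFSR", b"..."),
    #      (1, "/FileRecords/.../LumiFSR", b"...")]   # one list per worker
    #     "END_FSR"                                   # then the sentinel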

    def Receive(self):
        # Receive contents of all Workers FileRecords Transient Stores
        self.log.info("Receiving FSR store data...")
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == "END_FSR":
                nc -= 1
                continue
            # otherwise it's a list of regular objects...
            for o in objects:
                self.objectsIn.append(o)
        # Now sort by which worker each object came from;
        # an object is : (nodeID, path, pickledObject)
        # (cmp_to_key adapts the old-style comparator for Python 3)
        self.objectsIn.sort(key=cmp_to_key(self.localCmp))
        self.log.info("All FSR data received")
        return SUCCESS

    def Rebuild(self):
        # objectsIn is a list of (nodeID, path, serializedObject) tuples
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug("Working with %s" % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, "update"):
                ob.update()
            if hasattr(ob, "numberOfObjects"):
                nCont = ob.numberOfObjects()
                self.log.debug(
                    "\t %s has containedObjects : %i" % (type(ob).__name__, nCont)
                )
            if sourceNode == 0:
                self.log.debug("Registering Object to : %s" % (path))
                self.fsr.registerObject(path, ob)
            else:
                self.log.debug("Merging Object to : %s" % (path))
                self.MergeFSRobject(sourceNode, path, ob)
        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)

        self.log.info("FSR Store Rebuilt. Correcting EventCountFSR")
        if bool(self.fsr._idp):  # There might not be an FSR stream (Gauss)
            ecount = "/FileRecords/EventCountFSR"
            if self.fsr[ecount]:
                self.fsr[ecount].setOutput(self._gmpc.nIn)
                self.log.info(
                    "Event Counter Output set : %s : %i"
                    % (ecount, self.fsr[ecount].output())
                )
        # Do some reporting
        self.log.debug("FSR store reconstructed!")
        lst = self.fsr.getHistoNames()
        if lst:
            for l in lst:
                ob = self.fsr.retrieveObject(l)
                if hasattr(ob, "configureDirectAccess"):
                    ob.configureDirectAccess()
                if hasattr(ob, "containedObjects"):
                    # if ob.numberOfObjects() :
                    self.log.debug(
                        "\t%s (cont. objects : %i)" % (l, ob.numberOfObjects())
                    )
                else:
                    self.log.debug("\t%s" % (l))
        self.log.info("FSR Store fully rebuilt.")
        return SUCCESS

    def MergeFSRobject(self, sourceNode, path, ob):
        # Merge Non-Empty Keyed Container from Worker>0
        if path == "/FileRecords/TimeSpanFSR":
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR(path, ob)
        elif path == "/FileRecords/EventCountFSR":
            # Event Counter is also easy
            self.ProcessEventCountFSR(path, ob)
        # now other cases may not be so easy...
        elif (
            "KeyedContainer" in ob.__class__.__name__
            and "LumiFSR" in ob.__class__.__name__
        ):
            # Keyed Container of LumiFSRs : extract and re-register
            self.MergeLumiFSR(path, ob)
        else:
            self.log.info("Skipping Merge of %s at %s" % (ob.__class__.__name__, path))

    def ProcessTimeSpanFSR(self, path, ob):
        ob2 = self.fsr.retrieveObject(path)
        if ob.containedObjects().size():
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]
            earliest = cob.earliest()
            latest = cob.latest()
            for j in range(sz):
                cob = ob.containedObjects()[j]
                self.log.debug("Adding TimeSpanFSR")
                if cob.earliest() < earliest:
                    earliest = cob.earliest()
                if cob.latest() > latest:
                    latest = cob.latest()
            # this is annoying: it has to be rebuilt, without a key, and added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest(earliest)
            tsfsr.setLatest(latest)
            self.fsr[path].clear()
            self.fsr[path].add(tsfsr)

    def ProcessEventCountFSR(self, path, ob):
        self.log.debug("Event Count Input Addition")
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())

    def MergeLumiFSR(self, path, keyedC):
        from ROOT import string

        # Fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # The LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR(l)
        # Now deal with the argument: a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in range(nCont):
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR(obj)
            baseLumi.merge(nextLumi)
        # Now rebuild and re-register
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear existing Keyed Container
        self.fsr[path].clear()
        # Add newly merged lumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS


# =============================================================================


class LumiFSR:
    def __init__(self, lumi):
        # lumi looks like :
        # { runs : 69857 69858
        #   files : root:/castor/cer.../069857_0000000006.raw
        #   info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # class variables
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # get run numbers
        for r in lumi.runNumbers():
            self.runs.append(r)
        # get file ids
        for f in lumi.fileIDs():
            self.files.append(f)
        # Now the tricky bit: the info is not accessible via Python
        # except as a string
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split("/")[:-1]
        for rec in sa:
            k, i, t = rec.split()
            k = int(k)
            i = int(i)
            t = int(t)
            self.info[k] = (i, t)
        self.keys = list(self.info.keys())
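
    # Worked example of the parse above, using the sample string from the
    # comment (editor's illustration):
    #
    #     "... info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 /"
    #     # -> info = {0: (8, 0), 1: (8, 259), 2: (8, 76)}
    #     # -> keys = [0, 1, 2]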

    def merge(self, otherLumi):
        assert isinstance(otherLumi, LumiFSR)
        # add any extra runs
        for r in otherLumi.runs:
            if r not in self.runs:
                self.runs.append(r)
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files:
            if f not in self.files:
                self.files.append(f)
        self.files.sort()
        # Now add any extra records
        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]
            if k in self.keys:
                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment, myIntegral + integral)
            else:
                self.info[k] = (increment, integral)
        # don't forget to update keys
        self.keys = list(self.info.keys())
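
    # The record merge is pairwise addition of (increment, integral) per key,
    # e.g. (editor's illustration):
    #
    #     self.info  = {0: (8, 259)}
    #     other.info = {0: (8, 100), 1: (8, 76)}
    #     # after merge: self.info == {0: (16, 359), 1: (8, 76)}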

    def __repr__(self):
        s = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs:
            s += "\t\t%i\n" % (r)
        s += "\tFiles : \n"
        for f in self.files:
            s += "\t\t%s\n" % (f)
        s += "\tInfo : \n"
        for k in self.keys:
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n" % (k, increment, integral)
        return s


# =============================================================================


class PackedCaloHypos:
    def __init__(self, o):
        cl = "LHCb::PackedCaloHypo"
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = (o.posE, o.posX, o.posY)
        self.z = o.z

    def __repr__(self):
        s = "PackedCaloHypo : \n"
        s += "\tcentX : %s\n" % (str(self.centX))
        s += "\tcentY : %s\n" % (str(self.centY))
        s += "\tcerr : %s\n" % (str(self.cerr))
        s += "\tcov : %s\n" % (str(self.cov))
        s += "\tfirstCluster : %s\n" % (str(self.firstCluster))
        s += "\tfirstDigit : %s\n" % (str(self.firstDigit))
        s += "\tfirstHypo : %s\n" % (str(self.firstHypo))
        s += "\thypothesis : %s\n" % (str(self.hypothesis))
        s += "\tkey : %s\n" % (str(self.key))
        s += "\tlastCluster : %s\n" % (str(self.lastCluster))
        s += "\tlastDigit : %s\n" % (str(self.lastDigit))
        s += "\tlastHypo : %s\n" % (str(self.lastHypo))
        s += "\tlh : %s\n" % (str(self.lh))
        s += "\tpos : %s\n" % (str(self.pos))
        s += "\tz : %s\n" % (str(self.z))
        s += "---------------------------------------\n"
        return s


# =============================================================================


class SyncMini(object):
    def __init__(self, event, lastEvent=None):
        self.event = event
        self.t = 0.0
        self.lastEvent = lastEvent

    def check(self):
        return self.event.is_set()

    def checkLast(self):
        return self.lastEvent.is_set()

    def reset(self):
        self.event.clear()
        self.t = time.time()

    def getTime(self):
        return self.t

    def set(self):
        self.event.set()

    def __repr__(self):
        s = "---------- SyncMini --------------\n"
        s += "    Status : %s\n" % (self.event.is_set())
        s += "         t : %5.2f\n" % (self.t)
        if self.lastEvent:
            s += "Last Event : %s\n" % (self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s

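# The SyncMini handshake, as driven by Syncer.syncAllRolling() below
# (editor's sketch): a worker process signals each event, the supervisor
# clears the flag and re-arms its wait clock.
#
#     sm = SyncMini(Event(), lastEvent=Event())
#     sm.set()                # worker: one event processed
#     if sm.check():          # supervisor: progress seen...
#         sm.reset()          # ...clear the flag, restart the wait clock
#     sm.lastEvent.set()      # worker: event loop finished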

# =============================================================================


class Syncer(object):
    def __init__(
        self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None
    ):
        # Class to help synchronise the sub-processes
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents

        for i in range(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        if self.manyEvents:
            self.limitFirst = firstEvent

        self.keys = list(self.d.keys())
        self.nWorkers = nWorkers
        self.log = log

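    # Hypothetical construction for an event-loop section (editor's sketch;
    # the timeout values are illustrative only):
    #
    #     syncer = Syncer(nWorkers=4, log=log, manyEvents=True,
    #                     limit=30, step=2, firstEvent=120)
    #     syncer.syncAll(step="Run")   # delegates to syncAllRolling()
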
    def syncAll(self, step="Not specified"):
        # do we need this method, or the rolling version?
        # if the rolling version, drop through...

        if self.manyEvents:
            sc = self.syncAllRolling()
            return sc

        # Regular version ----------------------------
        for i in range(0, self.limit, self.step):
            if self.checkAll():
                self.log.info("%s : All procs done @ %i s" % (step, i))
                break
            else:
                time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll():
            self.log.info("All processes : %s ok." % (step))
            return SUCCESS
        else:
            self.log.critical("Some process is hanging on : %s" % (step))
            for k in self.keys:
                hangString = "%s : Proc/Stat : %i/%s" % (step, k, self.d[k].check())
                self.log.critical(hangString)
            return FAILURE

    def syncAllRolling(self):
        # Keep track of the progress of Event processing.
        # Each process syncs after each event, so keep clearing
        # the sync Event, and re-checking.
        # Note the time between True checks too: if the time between
        # events exceeds the single-event limit, this is considered a hang.

        # set the initial time
        begin = time.time()
        firstEv = {}
        timers = {}
        for k in self.keys:
            self.d[k].reset()
            firstEv[k] = False
            timers[k] = 0.0

        # work on a copy: entries are removed as processes finish,
        # and self.keys must not be mutated
        active = list(self.keys)
        while True:
            # check the status of each sync object
            # (iterate over a snapshot, as entries may be removed)
            for k in list(active):
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast():
                    if sMini.checkLast() and sMini.check():
                        # if lastEvent is set, the event loop has finished
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info("Audit : Node %i alive for %5.2f" % (k, alive))
                    else:
                        sMini.reset()
                else:
                    # the event still has not been checked; how long is that?
                    # is it the first Event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k]:
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond:
                        # It is hanging!
                        self.log.critical("Single event wait : %5.2f" % (wait))
                        self.processHang()
                        return FAILURE

            # Termination Criteria : if all procs have set their last event, we're done
            if self.checkLastEvents():
                self.log.info("TC met for event loop")
                break
            else:
                # sleep, loop again
                time.sleep(self.step)

        self.log.info("All processes Completed all Events ok")
        return SUCCESS

    def processHang(self):
        self.log.critical("Some proc is hanging during Event processing!")
        for k in self.keys:
            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))
        return

    def checkAll(self):
        # Check the status of each Sync object
        # return True or False
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

    def checkLastEvents(self):
        # check if all of the lastEvent flags in self.d are set
        stat = [sMini.checkLast() for sMini in self.d.values()]
        return all(stat)


# =========================== Methods =========================================


def getEventNumber(evt):
    # The class-independent version of the Event Number Retrieval method
    #
    n = None
    # First attempt : Unpacked Event Data
    lst = ["/Event/Gen/Header", "/Event/Rec/Header"]
    for l in lst:
        try:
            n = evt[l].evtNumber()
            return n
        except Exception:
            # No evt number at this path
            continue

    # Second attempt : try DAQ/RawEvent data
    # The Evt Number is in bank type 16, bank 0, data point 4
    try:
        n = evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
        return n
    except Exception:
        pass

    # Default action
    return n


# ================================= EOF =======================================