The Gaudi Framework  master (37c0b60a)
pTools.py
Go to the documentation of this file.
1 
11 import pickle
12 import time
13 from multiprocessing import Event
14 
15 from GaudiPython import FAILURE, SUCCESS, gbl
16 
# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
# GaudiPython Parallel model
#
# Classes :
# - HistoAgent : In charge of extracting Histograms from their Transient Store
#                on a reader/worker, communicating them to the writer, and on
#                the writer receiving them and rebuilding a single store
#
# - FileRecordsAgent : Similar to HistoAgent, but for FileRecords Data
#
# - LumiFSR : FSR data from different workers needs to be carefully merged to
#             replicate the serial version; this class aids in that task by
#             representing an LHCb LumiFSR object as a python class
#
# - PackedCaloHypos : Pythonization of an LHCb class, used for inspecting some
#                     differences in serial and parallel output
#
# - Syncer : This class is responsible for syncing processes for a specified
#            section of execution. For example, one Syncer object might be
#            syncing Initialisation, one for Running, one for Finalisation.
#            Syncer uses multiprocessing.Event() objects as flags which are
#            visible across the N processes sharing them.
#            IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#            ensure that there is no hanging; in effect, they allow a timeout
#            for Initialisation, Run, Finalise on all processes
#
# - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
# - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                         ( AppMgr().evtsvc() ) to this to get the current
#                         Event Number as an integer (even from RawEvents!)
#
54 
55 # used to convert stored histos (in AIDA format) to ROOT format
56 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
57 
58 # =========================== Classes =========================================
59 
60 
class HistoAgent:
    """
    Receive histogram bundles from the other processes and merge them into
    the local histogram store.

    The agent is built around a GMP component, from which it takes the
    histogram store (``hvt``), the histogram queue (``hq``) and the logger
    (``log``).  Bundles are ``(worker-id, histoDict)`` tuples, where
    ``histoDict`` maps a store path to the object found at that path.
    """

    def __init__(self, gmpComponent):
        """
        Keep references to the store, queue and logger of *gmpComponent* and
        build the table of booking methods.
        """
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking Histogram Objects to Histo store
        # here they are collected in a dictionary, with key = a relevant name
        # (the class name of the object that has to be booked)
        self.bookingDict = {
            "DataObject": self.bookDataObject,
            "NTuple::Directory": self.bookDataObject,
            "NTuple::File": self.bookDataObject,
            "TH1D": self.bookTH1D,
            "TH2D": self.bookTH2D,
            "TH3D": self.bookTH3D,
            "TProfile": self.bookTProfile,
            "TProfile2D": self.bookTProfile2D,
        }

    @staticmethod
    def _axisArgs(axis):
        """Return the (number of bins, lower edge, upper edge) of a ROOT axis."""
        return axis.GetNbins(), axis.GetXmin(), axis.GetXmax()

    def register(self, tup):
        """Add a tuple of (worker-id, histoDict) to self.histos."""
        assert isinstance(tup, tuple)
        self.histos.append(tup)

    def Receive(self):
        """
        Drain the histogram queue until every sender has reported completion.

        Each "HISTOS_SENT" message counts one finished sender; anything else
        is registered as a histogram bundle.  When all senders are done the
        component's ``sEvent`` is set.  Returns SUCCESS.
        """
        hstatus = self._gmpc.nWorkers + 1  # +1 for the Reader!
        while True:
            tup = self.qin.get()
            if tup == "HISTOS_SENT":
                self.log.debug("received HISTOS_SENT message")
                hstatus -= 1
                if not hstatus:
                    break
            else:
                self.register(tup)
        self._gmpc.sEvent.set()
        self.log.info("Writer received all histo bundles and set sync event")
        return SUCCESS

    def RebuildHistoStore(self):
        """
        Rebuild the Histogram Store from the histos received by Receive()
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the Stored histo for compatibility.

        Failures are logged and counted, never raised.  Returns SUCCESS.
        """
        errors = 0
        for _workerID, histDict in self.histos:
            for n, o in histDict.items():
                obj = self.hvt.retrieve(n)

                if obj:
                    # already in the store : sum the incoming histo into it
                    try:
                        aida2root(obj).Add(o)
                    except Exception:
                        self.log.warning("FAILED TO ADD : %s" % (str(obj)))
                        errors += 1
                    continue

                # not in the store yet : book it by class name
                className = o.__class__.__name__
                booker = self.bookingDict.get(className)
                if booker is None:
                    self.log.warning(
                        "No booking method for: %s\t%s\t%s" % (n, type(o), className)
                    )
                    errors += 1
                    continue
                try:
                    booker(n, o)
                except Exception:
                    self.log.warning(
                        "FAILED TO REGISTER : %s\tto%s" % (className, n)
                    )
                    errors += 1

        hs = self.hvt.getHistoNames()
        self.log.info("Histo Store Rebuilt : ")
        self.log.info(" Contains %i objects." % (len(hs)))
        self.log.info(" Errors in Rebuilding : %i" % (errors))
        return SUCCESS

    def bookDataObject(self, n, o):
        """
        Register a DataObject to the Histo Store
        """
        self._gmpc.hvt.registerObject(n, o)

    def bookTH1D(self, n, o):
        """
        Register a ROOT 1D THisto to the Histo Store
        """
        obj = self.hvt._ihs.book(n, o.GetTitle(), *self._axisArgs(o.GetXaxis()))
        aida2root(obj).Add(o)

    def bookTH2D(self, n, o):
        """
        Register a ROOT 2D THisto to the Histo Store
        """
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            *self._axisArgs(o.GetXaxis()),
            *self._axisArgs(o.GetYaxis()),
        )
        aida2root(obj).Add(o)

    def bookTH3D(self, n, o):
        """
        Register a ROOT 3D THisto to the Histo Store
        """
        # The number of bins comes from GetNbins(); GetXbins() returns the
        # array of bin edges and is not a valid bin count for booking.
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            *self._axisArgs(o.GetXaxis()),
            *self._axisArgs(o.GetYaxis()),
            *self._axisArgs(o.GetZaxis()),
        )
        aida2root(obj).Add(o)

    def bookTProfile(self, n, o):
        """
        Register a ROOT TProfile to the Histo Store
        """
        obj = self.hvt._ihs.bookProf(
            n,
            o.GetTitle(),
            *self._axisArgs(o.GetXaxis()),
            o.GetOption(),
        )
        aida2root(obj).Add(o)

    def bookTProfile2D(self, n, o):
        """
        Register a ROOT TProfile2D to the Histo Store
        """
        obj = self.hvt._ihs.bookProf(
            n,
            o.GetTitle(),
            *self._axisArgs(o.GetXaxis()),
            *self._axisArgs(o.GetYaxis()),
        )
        aida2root(obj).Add(o)
233 
234 
235 # =============================================================================
236 
237 
class FileRecordsAgent:
    """
    Ship the contents of the FileRecords (FSR) store between processes and
    rebuild a single merged store on the receiving side.

    The agent is built around a GMP component, from which it takes the FSR
    store (``fsr``), the FSR queue (``fq``) and the logger (``log``).
    Records travel as ``(nodeID, path, pickledObject)`` tuples.
    """

    def __init__(self, gmpComponent):
        """Keep references to the FSR store, queue and logger of *gmpComponent*."""
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []  # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp(self, tupA, tupB):
        """
        Old-style comparison of two tuples by their first element.

        Returns -1, 0 or 1.  Kept for callers that still use it; Receive()
        sorts with a key function instead.
        """
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB:
            return -1
        elif valA > valB:
            return 1
        else:
            return 0

    def SendFileRecords(self):
        """
        Send the FileRecords data as part of finalisation.

        Puts one list of ``(nodeID, path, pickledObject)`` tuples on the
        queue (if there is anything to send), followed by "END_FSR".
        Returns SUCCESS.
        """
        # Take Care of FileRecords!
        # There are two main things to consider here
        # 1) The DataObjects in the FileRecords Transient Store
        # 2) The fact that they are Keyed Containers, containing other objects
        #
        # The Lead Worker, nodeID=0, sends everything in the FSR store, as
        # a completeness guarantee,
        #
        # send in form ( nodeID, path, object)
        self.log.info("Sending FileRecords...")
        lst = self.fsr.getHistoNames()

        # Check Validity
        if not lst:
            self.log.info("No FileRecords Data to send to Writer.")
            self.q.put("END_FSR")
            return SUCCESS

        # no need to send the root node
        if "/FileRecords" in lst:
            lst.remove("/FileRecords")

        for path in lst:
            o = self.fsr.retrieveObject(path)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()
            if self._gmpc.nodeID == 0:
                # lead worker sends everything, as completeness guarantee
                self.objectsOut.append((0, path, pickle.dumps(o)))
            elif path == "/FileRecords/EventCountFSR":
                # other workers: only add the Event Counter ...
                tup = (self._gmpc.nodeID, path, pickle.dumps(o))
                self.objectsOut.append(tup)
            elif "KeyedContainer" in o.__class__.__name__:
                # ... and non-Empty Keyed Containers (ignore empty ones)
                nObjects = o.numberOfObjects()
                if nObjects:
                    self.log.debug(
                        "Keyed Container %s with %i objects" % (path, nObjects)
                    )
                    tup = (self._gmpc.nodeID, path, pickle.dumps(o))
                    self.objectsOut.append(tup)
            else:
                self.log.info("Ignoring %s in FSR" % o.__class__.__name__)

        self.log.debug("Done with FSR store, just to send to Writer.")

        if self.objectsOut:
            self.log.debug("%i FSR objects to Writer" % (len(self.objectsOut)))
            for ob in self.objectsOut:
                self.log.debug("\t%s" % (ob[0]))
            self.q.put(self.objectsOut)
        else:
            self.log.info("Valid FSR Store, but no data to send to Writer")
        self.log.info("SendFSR complete")
        self.q.put("END_FSR")
        return SUCCESS

    def Receive(self):
        """
        Receive contents of all Workers FileRecords Transient Stores.

        Reads the queue until one "END_FSR" per worker has arrived, collects
        every record in ``self.objectsIn`` and orders the records by the node
        they came from.  Returns SUCCESS.
        """
        self.log.info("Receiving FSR store data...")
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == "END_FSR":
                nc -= 1
                continue
            # but if it's regular objects...
            self.objectsIn.extend(objects)
        # Now sort it by which worker it came from
        # an object is : (nodeID, path, pickledObject)
        # list.sort() has no cmp= argument on Python 3; a key on the node id
        # gives the same (stable) ordering as localCmp.
        self.objectsIn.sort(key=lambda record: record[0])
        self.log.info("All FSR data received")
        return SUCCESS

    def Rebuild(self):
        """
        Rebuild the FSR store from the records collected by Receive().

        Records from node 0 are registered as they are, records from other
        nodes are merged through MergeFSRobject().  Afterwards the output
        count of the EventCountFSR is corrected.  Returns SUCCESS.
        """
        # objectsIn is a list of (nodeID, path, serializedObject) tuples
        # NOTE(review): pickle.loads is applied to whatever arrived on the
        # queue; presumably only sibling processes of this job write to it.
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug("Working with %s" % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, "update"):
                ob.update()
            if hasattr(ob, "numberOfObjects"):
                nCont = ob.numberOfObjects()
                self.log.debug(
                    "\t %s has containedObjects : %i" % (type(ob).__name__, nCont)
                )
            if sourceNode == 0:
                self.log.debug("Registering Object to : %s" % (path))
                self.fsr.registerObject(path, ob)
            else:
                self.log.debug("Merging Object to : %s" % (path))
                self.MergeFSRobject(sourceNode, path, ob)
        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)

        self.log.info("FSR Store Rebuilt. Correcting EventCountFSR")
        if bool(self.fsr._idp):  # There might not be an FSR stream (Gauss)
            ecount = "/FileRecords/EventCountFSR"
            if self.fsr[ecount]:
                self.fsr[ecount].setOutput(self._gmpc.nIn)
                self.log.info(
                    "Event Counter Output set : %s : %i"
                    % (ecount, self.fsr[ecount].output())
                )
        # Do some reporting
        self.log.debug("FSR store reconstructed!")
        lst = self.fsr.getHistoNames()
        if lst:
            for path in lst:
                ob = self.fsr.retrieveObject(path)
                if hasattr(ob, "configureDirectAccess"):
                    ob.configureDirectAccess()
                if hasattr(ob, "containedObjects"):
                    self.log.debug(
                        "\t%s (cont. objects : %i)" % (path, ob.numberOfObjects())
                    )
                else:
                    self.log.debug("\t%s" % (path))
        self.log.info("FSR Store fully rebuilt.")
        return SUCCESS

    def MergeFSRobject(self, sourceNode, path, ob):
        """
        Merge an object received from a worker other than node 0 into the
        object already stored at *path*, dispatching on the path and on the
        class name of *ob*.  Unhandled objects are skipped with a log line.
        """
        # Merge Non-Empty Keyed Container from Worker>0
        if path == "/FileRecords/TimeSpanFSR":
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR(path, ob)
        elif path == "/FileRecords/EventCountFSR":
            # Event Counter is also easy
            self.ProcessEventCountFSR(path, ob)
        # now other cases may not be so easy...
        elif (
            "KeyedContainer" in ob.__class__.__name__
            and "LumiFSR" in ob.__class__.__name__
        ):
            # Keyed Container of LumiFSRs : extract and re-register
            self.MergeLumiFSR(path, ob)
        else:
            self.log.info("Skipping Merge of %s at %s" % (ob.__class__.__name__, path))

    def ProcessTimeSpanFSR(self, path, ob):
        """
        Widen the stored TimeSpanFSR at *path* so that it also covers every
        time span contained in *ob*.  Nothing is changed when *ob* is empty.
        """
        ob2 = self.fsr.retrieveObject(path)
        sz = ob.containedObjects().size()
        if not sz:
            # nothing to merge : leave the stored object untouched
            return
        # start from the span already in the store ...
        cob = ob2.containedObjects()[0]
        earliest = cob.earliest()
        latest = cob.latest()
        # ... and extend it with the incoming ones
        for j in range(sz):
            cob = ob.containedObjects()[j]
            self.log.debug("Adding TimeSpanFSR")
            if cob.earliest() < earliest:
                earliest = cob.earliest()
            if cob.latest() > latest:
                latest = cob.latest()
        # this is annoying: it has to be rebuilt, without a key & added
        tsfsr = gbl.LHCb.TimeSpanFSR()
        tsfsr.setEarliest(earliest)
        tsfsr.setLatest(latest)
        self.fsr[path].clear()
        self.fsr[path].add(tsfsr)

    def ProcessEventCountFSR(self, path, ob):
        """Add the input count of *ob* to the event counter stored at *path*."""
        self.log.debug("Event Count Input Addition")
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())

    def MergeLumiFSR(self, path, keyedC):
        """
        Merge every LumiFSR of the keyed container *keyedC* into the single
        LumiFSR stored at *path*, then replace the stored content by the
        merged record.  Returns SUCCESS.
        """
        from ROOT import string

        # Fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # The LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        baseLumi = LumiFSR(keyedContainer.containedObject(0))
        # Now deal with the argument Non-empty Keyed Container of LumiFSRs
        for i in range(keyedC.numberOfObjects()):
            baseLumi.merge(LumiFSR(keyedC.containedObject(i)))
        # Now Rebuild and ReRegister
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear existing Keyed Container
        self.fsr[path].clear()
        # Add newly merged lumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS
465 
466 
467 # =============================================================================
468 
469 
class LumiFSR:
    """
    Python-side copy of an LHCb LumiFSR: run numbers, file ids and the
    (increment, integral) pair recorded for each info key.
    """

    def __init__(self, lumi):
        """
        Copy the content of *lumi*, which must offer runNumbers(), fileIDs()
        and a string form like :
           { runs : 69857 69858
             files : root:/castor/cer.../069857_0000000006.raw
             info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }
        """
        # instance variables
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # run numbers and file ids can be read directly
        self.runs.extend(lumi.runNumbers())
        self.files.extend(lumi.fileIDs())

        # The info records are only reachable through the string form:
        # take what follows the marker, cut it at each "/" and drop the
        # last piece (what is left after the final separator).
        text = str(lumi)
        tail = text.split("info (key/incr/integral) : ")[-1]
        for record in tail.split("/")[:-1]:
            key, incr, integ = record.split()
            self.info[int(key)] = (int(incr), int(integ))
        self.keys = self.info.keys()

    def merge(self, otherLumi):
        """
        Fold *otherLumi* into this object: unknown runs and files are added
        (keeping the lists sorted), info records are summed key by key.
        """
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # runs not seen so far
        for run in otherLumi.runs:
            if run not in self.runs:
                self.runs.append(run)
                self.runs.sort()
        # file ids not seen so far
        for fileID in otherLumi.files:
            if fileID not in self.files:
                self.files.append(fileID)
                self.files.sort()
        # info records : sum with ours when the key is known, else copy
        for key in otherLumi.keys:
            increment, integral = otherLumi.info[key]
            if key not in self.keys:
                self.info[key] = (increment, integral)
                continue
            myIncrement, myIntegral = self.info[key]
            self.info[key] = (myIncrement + increment, myIntegral + integral)
        # don't forget to update keys
        self.keys = self.info.keys()

    def __repr__(self):
        """Multi-line listing of runs, files and info records."""
        parts = ["LumiFSR Python class\n", "\tRuns : \n"]
        parts.extend("\t\t%i\n" % (run) for run in self.runs)
        parts.append("\tFiles : \n")
        parts.extend("\t\t%s\n" % (fileID) for fileID in self.files)
        parts.append("\tInfo : \n")
        for key in self.keys:
            increment, integral = self.info[key]
            parts.append("\t\t%i\t%i\t%i\n" % (key, increment, integral))
        return "".join(parts)
542 
543 
544 # =============================================================================
545 
546 
class PackedCaloHypo:
    """
    Python-side copy of the data members of an LHCb::PackedCaloHypo, with a
    readable multi-line representation.
    """

    def __init__(self, o):
        """
        Copy the members of *o*, whose class must be named
        "LHCb::PackedCaloHypo".  The cerr*, cov* and pos* members are grouped
        into tuples.
        """
        cl = "LHCb::PackedCaloHypo"
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = (o.posE, o.posX, o.posY)
        self.z = o.z

    def __repr__(self):
        """One line per member, closed by a separator line."""
        lines = [
            "PackedCaloHypo : \n",
            "\tcentX : %s\n" % (str(self.centX)),
            "\tcentY : %s\n" % (str(self.centY)),
            "\tcerr : %s\n" % (str(self.cerr)),
            "\tcov : %s\n" % (str(self.cov)),
            "\tfirstCluster : %s\n" % (str(self.firstCluster)),
            "\tfirstDigit : %s\n" % (str(self.firstDigit)),
            "\tfirstHypo : %s\n" % (str(self.firstHypo)),
            "\thypothesis : %s\n" % (str(self.hypothesis)),
            "\tkey : %s\n" % (str(self.key)),
            "\tlastCluster : %s\n" % (str(self.lastCluster)),
            "\tlastDigit : %s\n" % (str(self.lastDigit)),
            "\tlastHypo : %s\n" % (str(self.lastHypo)),
            "\tlh : %s\n" % (str(self.lh)),
            "\tpos : %s\n" % (str(self.pos)),
            "\tz : %s\n" % (str(self.z)),
            "---------------------------------------\n",
        ]
        return "".join(lines)
586 
587 
588 # =============================================================================
589 
590 
class SyncMini(object):
    """
    Thin wrapper around an Event-like flag (anything with is_set(), set()
    and clear()), plus an optional second flag ``lastEvent`` and the time of
    the last reset().
    """

    def __init__(self, event, lastEvent=None):
        """Store *event*; *lastEvent* is optional and defaults to no flag."""
        self.event = event
        self.t = 0.0
        self.lastEvent = lastEvent if lastEvent else None

    def check(self):
        """Return True when the main flag is set."""
        return self.event.is_set()

    def checkLast(self):
        """
        Return True when the lastEvent flag is set.

        Without a lastEvent flag there is nothing that could be set, so the
        answer is False (instead of failing on None).
        """
        if self.lastEvent is None:
            return False
        return self.lastEvent.is_set()

    def reset(self):
        """Clear the main flag and remember when this happened."""
        self.event.clear()
        self.t = time.time()

    def getTime(self):
        """Return the time of the last reset() (0.0 if never reset)."""
        return self.t

    def set(self):
        """Set the main flag."""
        self.event.set()

    def __repr__(self):
        """Status of the flag(s) and time of the last reset."""
        s = "---------- SyncMini --------------\n"
        s += " Status : %s\n" % (self.event.is_set())
        s += " t : %5.2f\n" % (self.t)
        if self.lastEvent:
            s += "Last Event : %s\n" % (self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s
623 
624 
625 # =============================================================================
626 
627 
class Syncer(object):
    """
    Watch one SyncMini per process and decide whether all processes got
    through a section of execution within the allowed time.

    A SyncMini is created for every key in range(-2, nWorkers).
    NOTE(review): keys -2 and -1 are presumably the non-worker processes
    (reader / writer) - confirm against the caller.
    """

    def __init__(
        self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None
    ):
        """
        nWorkers   : number of workers; flags are made for range(-2, nWorkers)
        log        : logger offering info() and critical()
        manyEvents : if True, syncAll() uses the rolling per-event check
        limit      : time allowed before a hang is declared
        step       : sleep between two polls
        firstEvent : time allowed for the first event (rolling mode only)
        """
        # Class to help synchronise the sub-processes
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents

        for i in range(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        if self.manyEvents:
            self.limitFirst = firstEvent

        self.keys = list(self.d.keys())
        self.nWorkers = nWorkers
        self.log = log

    def syncAll(self, step="Not specified"):
        """
        Wait until every process has set its flag, polling every self.step
        for at most self.limit.  *step* is only a label for the log.

        In manyEvents mode the work is handed to syncAllRolling().
        Returns SUCCESS, or FAILURE when some flag is still unset at the end.
        """
        # is it this method, or is it the rolling version needed?
        if self.manyEvents:
            return self.syncAllRolling()

        # Regular version ----------------------------
        for i in range(0, self.limit, self.step):
            if self.checkAll():
                self.log.info("%s : All procs done @ %i s" % (step, i))
                break
            time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll():
            self.log.info("All processes : %s ok." % (step))
            return SUCCESS

        self.log.critical("Some process is hanging on : %s" % (step))
        for k in self.keys:
            hangString = "%s : Proc/Stat : %i/%s" % (step, k, self.d[k].check())
            self.log.critical(hangString)
        return FAILURE

    def syncAllRolling(self):
        """
        Keep track of the progress of Event processing.

        Each process syncs after each event, so the sync flag is cleared and
        re-checked over and over.  The time since the last reset is compared
        with self.limitFirst until the process has signalled once, and with
        self.limit afterwards; exceeding it is considered a hang.
        Returns SUCCESS when all lastEvent flags are set, FAILURE on a hang.
        """
        # set the initial time
        begin = time.time()
        firstEv = {}
        for k in self.keys:
            self.d[k].reset()
            firstEv[k] = False

        # work on a copy : finished nodes are dropped from 'active' only,
        # self.keys must stay complete for later reporting
        active = list(self.keys)
        while True:
            # check the status of each sync object; iterate over a snapshot
            # because finished nodes are removed from 'active' on the way
            for k in list(active):
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast():
                    if sMini.checkLast() and sMini.check():
                        # if last Event set,then event loop finished
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info("Audit : Node %i alive for %5.2f" % (k, alive))
                    else:
                        sMini.reset()
                        # the process has signalled : from now on the
                        # single-event limit applies to it
                        firstEv[k] = True
                else:
                    # the event still has not been checked, how long is that?
                    # is it the first Event?
                    wait = time.time() - sMini.getTime()
                    allowed = self.limit if firstEv[k] else self.limitFirst
                    if wait > allowed:
                        # It is hanging!
                        self.log.critical("Single event wait : %5.2f" % (wait))
                        self.processHang()
                        return FAILURE

            # Termination Criteria : if all procs have been removed, we're done
            if self.checkLastEvents():
                self.log.info("TC met for event loop")
                break
            # sleep, loop again
            time.sleep(self.step)

        self.log.info("All processes Completed all Events ok")
        return SUCCESS

    def processHang(self):
        """Log the flag status of every process after a hang was detected."""
        self.log.critical("Some proc is hanging during Event processing!")
        for k in self.keys:
            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))
        return

    def checkAll(self):
        """Return True when the flag of every process is set."""
        return all([mini.check() for mini in self.d.values()])

    def checkLastEvents(self):
        """Return True when the lastEvent flag of every process is set."""
        return all([sMini.checkLast() for sMini in self.d.values()])
745 
746 
747 # =========================== Methods =========================================
748 
749 
def getEventNumber(evt):
    """
    The class-independent version of the Event Number Retrieval method.

    *evt* is indexed by path.  The unpacked headers are tried first, then
    the raw event; the first event number found is returned, None if every
    attempt fails.
    """
    # First attempt : Unpacked Event Data
    for headerPath in ("/Event/Gen/Header", "/Event/Rec/Header"):
        try:
            return evt[headerPath].evtNumber()
        except Exception:
            # No evt number at this path, try the next one
            pass

    # Second attempt : try DAQ/RawEvent data
    # The Evt Number is in bank type 16, bank 0, data pt 4
    try:
        return evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
    except Exception:
        # Default Action : nothing found anywhere
        return None
774 
775 
776 # ================================= EOF =======================================
GaudiMP.pTools.HistoAgent.bookDataObject
def bookDataObject(self, n, o)
Definition: pTools.py:150
GaudiMP.pTools.PackedCaloHypo.__repr__
def __repr__(self)
Definition: pTools.py:567
GaudiMP.pTools.LumiFSR.files
files
Definition: pTools.py:479
GaudiMP.pTools.HistoAgent.log
log
Definition: pTools.py:67
GaudiMP.pTools.HistoAgent.Receive
def Receive(self)
Definition: pTools.py:86
GaudiMP.pTools.SyncMini.getTime
def getTime(self)
Definition: pTools.py:609
GaudiMP.pTools.Syncer.keys
keys
Definition: pTools.py:641
GaudiMP.pTools.FileRecordsAgent
Definition: pTools.py:238
GaudiMP.pTools.FileRecordsAgent.MergeFSRobject
def MergeFSRobject(self, sourceNode, path, ob)
Definition: pTools.py:392
GaudiMP.pTools.FileRecordsAgent.ProcessEventCountFSR
def ProcessEventCountFSR(self, path, ob)
Definition: pTools.py:432
GaudiMP.pTools.HistoAgent.bookTProfile
def bookTProfile(self, n, o)
Definition: pTools.py:204
details::size
constexpr auto size(const T &, Args &&...) noexcept
Definition: AnyDataWrapper.h:23
GaudiMP.pTools.SyncMini.__repr__
def __repr__(self)
Definition: pTools.py:615
GaudiMP.pTools.Syncer.limit
limit
Definition: pTools.py:631
GaudiMP.pTools.HistoAgent._gmpc
_gmpc
Definition: pTools.py:63
GaudiMP.pTools.PackedCaloHypo.lastCluster
lastCluster
Definition: pTools.py:560
GaudiMP.pTools.SyncMini.event
event
Definition: pTools.py:593
GaudiMP.pTools.FileRecordsAgent.ProcessTimeSpanFSR
def ProcessTimeSpanFSR(self, path, ob)
Definition: pTools.py:410
GaudiMP.pTools.HistoAgent.bookTH2D
def bookTH2D(self, n, o)
Definition: pTools.py:169
GaudiMP.pTools.HistoAgent.bookTProfile2D
def bookTProfile2D(self, n, o)
Definition: pTools.py:218
GaudiPartProp.decorators.get
get
decorate the vector of properties
Definition: decorators.py:283
GaudiMP.pTools.PackedCaloHypo.firstDigit
firstDigit
Definition: pTools.py:556
GaudiMP.pTools.Syncer.syncAll
def syncAll(self, step="Not specified")
Definition: pTools.py:647
GaudiMP.pTools.Syncer.step
step
Definition: pTools.py:632
gaudirun.output
output
Definition: gaudirun.py:521
GaudiMP.pTools.PackedCaloHypo.lastHypo
lastHypo
Definition: pTools.py:562
GaudiMP.pTools.HistoAgent.bookingDict
bookingDict
Definition: pTools.py:71
GaudiMP.pTools.SyncMini.checkLast
def checkLast(self)
Definition: pTools.py:602
GaudiMP.pTools.aida2root
aida2root
Definition: pTools.py:56
GaudiPartProp.decorators.all
all
decorate service
Definition: decorators.py:53
GaudiMP.pTools.FileRecordsAgent.fsr
fsr
Definition: pTools.py:241
GaudiMP.pTools.SyncMini.lastEvent
lastEvent
Definition: pTools.py:595
GaudiMP.pTools.FileRecordsAgent.MergeLumiFSR
def MergeLumiFSR(self, path, keyedC)
Definition: pTools.py:436
GaudiMP.pTools.FileRecordsAgent.__init__
def __init__(self, gmpComponent)
Definition: pTools.py:239
GaudiMP.pTools.PackedCaloHypo.lastDigit
lastDigit
Definition: pTools.py:561
GaudiMP.pTools.HistoAgent.__init__
def __init__(self, gmpComponent)
Definition: pTools.py:62
GaudiMP.pTools.getEventNumber
def getEventNumber(evt)
Definition: pTools.py:750
GaudiMP.pTools.SyncMini.__init__
def __init__(self, event, lastEvent=None)
Definition: pTools.py:592
GaudiMP.pTools.LumiFSR.keys
keys
Definition: pTools.py:481
GaudiMP.pTools.Syncer
Definition: pTools.py:628
GaudiMP.pTools.PackedCaloHypo.cerr
cerr
Definition: pTools.py:553
GaudiMP.pTools.Syncer.processHang
def processHang(self)
Definition: pTools.py:729
axes_labels.check
def check(causes, result)
Definition: axes_labels.py:47
GaudiMP.pTools.Syncer.__init__
def __init__(self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None)
Definition: pTools.py:629
GaudiMP.pTools.LumiFSR.runs
runs
Definition: pTools.py:478
GaudiMP.pTools.PackedCaloHypo.hypothesis
hypothesis
Definition: pTools.py:558
GaudiMP.pTools.PackedCaloHypo.key
key
Definition: pTools.py:559
GaudiMP.pTools.SyncMini
Definition: pTools.py:591
GaudiMP.pTools.Syncer.log
log
Definition: pTools.py:643
GaudiMP.pTools.HistoAgent
Definition: pTools.py:61
GaudiMP.pTools.FileRecordsAgent.Receive
def Receive(self)
Definition: pTools.py:323
GaudiMP.pTools.Syncer.syncAllRolling
def syncAllRolling(self)
Definition: pTools.py:674
GaudiMP.pTools.SyncMini.reset
def reset(self)
Definition: pTools.py:605
GaudiMP.pTools.FileRecordsAgent.localCmp
def localCmp(self, tupA, tupB)
Definition: pTools.py:247
GaudiMP.pTools.HistoAgent.histos
histos
Definition: pTools.py:65
GaudiMP.pTools.LumiFSR.merge
def merge(self, otherLumi)
Definition: pTools.py:502
GaudiMP.pTools.PackedCaloHypo.cov
cov
Definition: pTools.py:554
gaudirun.type
type
Definition: gaudirun.py:160
GaudiMP.pTools.PackedCaloHypo.centX
centX
Definition: pTools.py:551
GaudiMP.pTools.Syncer.checkAll
def checkAll(self)
Definition: pTools.py:735
StringKeyEx.keys
keys
Definition: StringKeyEx.py:64
GaudiMP.pTools.FileRecordsAgent.Rebuild
def Rebuild(self)
Definition: pTools.py:343
GaudiMP.pTools.Syncer.checkLastEvents
def checkLastEvents(self)
Definition: pTools.py:741
GaudiMP.pTools.PackedCaloHypo.z
z
Definition: pTools.py:565
GaudiMP.pTools.HistoAgent.bookTH1D
def bookTH1D(self, n, o)
Definition: pTools.py:156
GaudiMP.pTools.HistoAgent.qin
qin
Definition: pTools.py:66
GaudiMP.pTools.LumiFSR.__init__
def __init__(self, lumi)
Definition: pTools.py:471
GaudiMP.pTools.Syncer.limitFirst
limitFirst
Definition: pTools.py:639
GaudiMP.pTools.SyncMini.t
t
Definition: pTools.py:594
GaudiMP.pTools.PackedCaloHypo.__init__
def __init__(self, o)
Definition: pTools.py:548
GaudiMP.pTools.PackedCaloHypo.firstHypo
firstHypo
Definition: pTools.py:557
GaudiMP.pTools.HistoAgent.bookTH3D
def bookTH3D(self, n, o)
Definition: pTools.py:185
GaudiMP.pTools.LumiFSR.info
info
Definition: pTools.py:480
GaudiMP.pTools.Syncer.manyEvents
manyEvents
Definition: pTools.py:634
GaudiMP.pTools.PackedCaloHypo.lh
lh
Definition: pTools.py:563
GaudiMP.pTools.FileRecordsAgent.objectsIn
objectsIn
Definition: pTools.py:244
GaudiMP.pTools.LumiFSR.__repr__
def __repr__(self)
Definition: pTools.py:529
GaudiMP.pTools.FileRecordsAgent.q
q
Definition: pTools.py:242
GaudiMP.pTools.HistoAgent.hvt
hvt
Definition: pTools.py:64
GaudiMP.pTools.FileRecordsAgent._gmpc
_gmpc
Definition: pTools.py:240
GaudiMP.pTools.FileRecordsAgent.objectsOut
objectsOut
Definition: pTools.py:245
GaudiMP.pTools.PackedCaloHypo.pos
pos
Definition: pTools.py:564
GaudiMP.pTools.PackedCaloHypo.firstCluster
firstCluster
Definition: pTools.py:555
GaudiMP.pTools.FileRecordsAgent.SendFileRecords
def SendFileRecords(self)
Definition: pTools.py:260
GaudiMP.pTools.PackedCaloHypo
Definition: pTools.py:547
GaudiMP.pTools.SyncMini.set
def set(self)
Definition: pTools.py:612
GaudiMP.pTools.PackedCaloHypo.centY
centY
Definition: pTools.py:552
GaudiMP.pTools.Syncer.d
d
Definition: pTools.py:633
GaudiMP.pTools.LumiFSR
Definition: pTools.py:470
GaudiMP.pTools.FileRecordsAgent.log
log
Definition: pTools.py:243
GaudiMP.pTools.HistoAgent.register
def register(self, tup)
Definition: pTools.py:81
GaudiMP.pTools.HistoAgent.RebuildHistoStore
def RebuildHistoStore(self)
Definition: pTools.py:101
GaudiPython.Persistency.add
def add(instance)
Definition: Persistency.py:50
Gaudi::Functional::details::put
auto put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
Definition: details.h:168
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: details.h:97
GaudiMP.pTools.SyncMini.check
def check(self)
Definition: pTools.py:599
GaudiMP.pTools.Syncer.nWorkers
nWorkers
Definition: pTools.py:642