The Gaudi Framework  v36r7 (7f57a304)
pTools.py
import pickle
import time
from functools import cmp_to_key  # list.sort() lost its cmp= argument in Python 3
from multiprocessing import Event

from GaudiPython import FAILURE, SUCCESS, gbl

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
# GaudiPython Parallel model
#
# Classes :
# - HistoAgent : In charge of extracting Histograms from their Transient Store
#                on a reader/worker, communicating them to the writer, and on
#                the writer receiving them and rebuilding a single store
#
# - FileRecordsAgent : Similar to HistoAgent, but for FileRecords Data
#
# - LumiFSR : FSR data from different workers needs to be carefully merged to
#             replicate the serial version; this class aids in that task by
#             representing an LHCb LumiFSR object as a python class
#
# - PackedCaloHypos : Pythonization of an LHCb class, used for inspecting some
#                     differences in serial and parallel output
#
# - Syncer : This class is responsible for syncing processes for a specified
#            section of execution. For example, one Syncer object might be
#            syncing Initialisation, one Running, one Finalisation.
#            Syncer uses multiprocessing.Event() objects as flags, which are
#            visible across the N processes sharing them.
#            IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#            ensure that there is no hanging; in effect they allow a timeout
#            for Initialisation, Run, Finalise on all processes
#
# - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
# - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                         ( AppMgr().evtsvc() ) to this to get the current
#                         Event Number as an integer (even from RawEvents!)
#

# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

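# Illustrative usage sketch (not part of the original file); the AppMgr
# instance and the histogram path "MyAlg/1" are hypothetical:
#
#   from GaudiPython import AppMgr
#   hvt = AppMgr().histsvc()               # histogram transient store
#   rootHisto = aida2root(hvt["MyAlg/1"])  # ROOT histogram, supports .Add()
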
# =========================== Classes =========================================


class HistoAgent:
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking Histogram Objects to the Histo
        # store; they are collected here in a dictionary, keyed by type name
        self.bookingDict = {}
        self.bookingDict["DataObject"] = self.bookDataObject
        self.bookingDict["NTuple::Directory"] = self.bookDataObject
        self.bookingDict["NTuple::File"] = self.bookDataObject
        self.bookingDict["TH1D"] = self.bookTH1D
        self.bookingDict["TH2D"] = self.bookTH2D
        self.bookingDict["TH3D"] = self.bookTH3D
        self.bookingDict["TProfile"] = self.bookTProfile
        self.bookingDict["TProfile2D"] = self.bookTProfile2D

    def register(self, tup):
        # add a tuple of (worker-id, histoDict) to self.histos
        assert isinstance(tup, tuple)
        self.histos.append(tup)

    def Receive(self):
        hstatus = self._gmpc.nWorkers + 1  # +1 for the Reader!
        while True:
            tup = self.qin.get()
            if tup == "HISTOS_SENT":
                self.log.debug("received HISTOS_SENT message")
                hstatus -= 1
                if not hstatus:
                    break
            else:
                self.register(tup)
        self._gmpc.sEvent.set()
        self.log.info("Writer received all histo bundles and set sync event")
        return SUCCESS

    def RebuildHistoStore(self):
        """
        Rebuild the Histogram Store from the histos received by Receive().
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict.
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the Stored histo for compatibility.
        """
        errors = 0
        for tup in self.histos:
            workerID, histDict = tup
            added = 0
            registered = 0
            booked = 0

            for n in histDict.keys():
                o = histDict[n]
                obj = self.hvt.retrieve(n)

                if obj:
                    try:
                        aida2root(obj).Add(o)
                    except Exception:
                        self.log.warning("FAILED TO ADD : %s" % (str(obj)))
                        errors += 1
                    added += 1
                else:
                    if o.__class__.__name__ in self.bookingDict.keys():
                        try:
                            self.bookingDict[o.__class__.__name__](n, o)
                        except Exception:
                            self.log.warning(
                                "FAILED TO REGISTER : %s\tto %s"
                                % (o.__class__.__name__, n)
                            )
                            errors += 1
                    else:
                        self.log.warning(
                            "No booking method for: %s\t%s\t%s"
                            % (n, type(o), o.__class__.__name__)
                        )
                        errors += 1
                    booked += 1
        hs = self.hvt.getHistoNames()
        self.log.info("Histo Store Rebuilt : ")
        self.log.info("  Contains %i objects." % (len(hs)))
        self.log.info("  Errors in Rebuilding : %i" % (errors))
        return SUCCESS

    def bookDataObject(self, n, o):
        """
        Register a DataObject to the Histo Store
        """
        self._gmpc.hvt.registerObject(n, o)

    def bookTH1D(self, n, o):
        """
        Register a ROOT 1D THisto to the Histo Store
        """
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

    def bookTH2D(self, n, o):
        """
        Register a ROOT 2D THisto to the Histo Store
        """
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

    def bookTH3D(self, n, o):
        """
        Register a ROOT 3D THisto to the Histo Store
        """
        # Note: TAxis.GetNbins() gives the bin count; GetXbins() (as in the
        # original) returns the bin-edge array, which is not what book() wants
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
            o.GetZaxis().GetNbins(),
            o.GetZaxis().GetXmin(),
            o.GetZaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

    def bookTProfile(self, n, o):
        """
        Register a ROOT TProfile to the Histo Store
        """
        obj = self.hvt._ihs.bookProf(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetOption(),
        )
        aida2root(obj).Add(o)

    def bookTProfile2D(self, n, o):
        """
        Register a ROOT TProfile2D to the Histo Store
        """
        obj = self.hvt._ihs.bookProf(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

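# A minimal stand-in sketch (not part of the original file) of the sentinel
# protocol used by HistoAgent.Receive() above: each sender pushes its histo
# bundles, then a "HISTOS_SENT" sentinel, and the receiver counts sentinels
# down. The queue contents and sender count here are hypothetical.
def _receiveSentinelSketch():
    import queue

    q = queue.Queue()
    nSenders = 3  # e.g. 2 workers + 1 reader
    q.put((0, {"/stat/MyAlg/1": "histo-payload"}))  # a (workerID, histDict) bundle
    for _ in range(nSenders):
        q.put("HISTOS_SENT")

    bundles, remaining = [], nSenders
    while remaining:
        item = q.get()
        if item == "HISTOS_SENT":
            remaining -= 1
        else:
            bundles.append(item)
    return bundles
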

# =============================================================================


class FileRecordsAgent:
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []  # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp(self, tupA, tupB):
        # old-style comparator: sort (nodeID, path, blob) tuples by nodeID;
        # used below via functools.cmp_to_key
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB:
            return -1
        elif valA > valB:
            return 1
        else:
            return 0

    def SendFileRecords(self):
        # send the FileRecords data as part of finalisation

        # Take Care of FileRecords!
        # There are two main things to consider here
        # 1) The DataObjects in the FileRecords Transient Store
        # 2) The fact that they are Keyed Containers, containing other objects
        #
        # The Lead Worker, nodeID=0, sends everything in the FSR store, as
        # a completeness guarantee
        #
        # sent in the form (nodeID, path, object)
        self.log.info("Sending FileRecords...")
        lst = self.fsr.getHistoNames()

        # Check Validity
        if not lst:
            self.log.info("No FileRecords Data to send to Writer.")
            self.q.put("END_FSR")
            return SUCCESS

        # no need to send the root node
        if "/FileRecords" in lst:
            lst.remove("/FileRecords")

        for l in lst:
            o = self.fsr.retrieveObject(l)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()
            # lead worker sends everything, as completeness guarantee
            if self._gmpc.nodeID == 0:
                self.objectsOut.append((0, l, pickle.dumps(o)))
            else:
                # only add the Event Counter
                # and non-Empty Keyed Containers (ignore empty ones)
                if l == "/FileRecords/EventCountFSR":
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append(tup)
                elif "KeyedContainer" in o.__class__.__name__:
                    # It's a Keyed Container
                    nObjects = o.numberOfObjects()
                    if nObjects:
                        self.log.debug(
                            "Keyed Container %s with %i objects" % (l, nObjects)
                        )
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append(tup)
                else:
                    self.log.info("Ignoring %s in FSR" % o.__class__.__name__)

        self.log.debug("Done with FSR store; sending to Writer.")

        if self.objectsOut:
            self.log.debug("%i FSR objects to Writer" % (len(self.objectsOut)))
            for ob in self.objectsOut:
                self.log.debug("\t%s" % (ob[0]))
            self.q.put(self.objectsOut)
        else:
            self.log.info("Valid FSR Store, but no data to send to Writer")
        self.log.info("SendFSR complete")
        self.q.put("END_FSR")
        return SUCCESS

    def Receive(self):
        # Receive contents of all Workers FileRecords Transient Stores
        self.log.info("Receiving FSR store data...")
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == "END_FSR":
                nc -= 1
                continue
            # otherwise it's a list of regular objects...
            for o in objects:
                self.objectsIn.append(o)
        # Now sort by which worker it came from
        # an object is : (nodeID, path, pickledObject)
        # (Python 3: list.sort() takes no cmp=, so wrap the comparator)
        self.objectsIn.sort(key=cmp_to_key(self.localCmp))
        self.log.info("All FSR data received")
        return SUCCESS

    def Rebuild(self):
        # objectsIn is a list of (nodeID, path, serializedObject) tuples
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug("Working with %s" % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, "update"):
                ob.update()
            if hasattr(ob, "numberOfObjects"):
                nCont = ob.numberOfObjects()
                self.log.debug(
                    "\t %s has containedObjects : %i" % (type(ob).__name__, nCont)
                )
            if sourceNode == 0:
                self.log.debug("Registering Object to : %s" % (path))
                self.fsr.registerObject(path, ob)
            else:
                self.log.debug("Merging Object to : %s" % (path))
                self.MergeFSRobject(sourceNode, path, ob)
        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)

        self.log.info("FSR Store Rebuilt. Correcting EventCountFSR")
        if bool(self.fsr._idp):  # There might not be an FSR stream (Gauss)
            ecount = "/FileRecords/EventCountFSR"
            if self.fsr[ecount]:
                self.fsr[ecount].setOutput(self._gmpc.nIn)
                self.log.info(
                    "Event Counter Output set : %s : %i"
                    % (ecount, self.fsr[ecount].output())
                )
        # Do some reporting
        self.log.debug("FSR store reconstructed!")
        lst = self.fsr.getHistoNames()
        if lst:
            for l in lst:
                ob = self.fsr.retrieveObject(l)
                if hasattr(ob, "configureDirectAccess"):
                    ob.configureDirectAccess()
                if hasattr(ob, "containedObjects"):
                    # if ob.numberOfObjects() :
                    self.log.debug(
                        "\t%s (cont. objects : %i)" % (l, ob.numberOfObjects())
                    )
                else:
                    self.log.debug("\t%s" % (l))
        self.log.info("FSR Store fully rebuilt.")
        return SUCCESS

    def MergeFSRobject(self, sourceNode, path, ob):
        # Merge Non-Empty Keyed Container from Worker>0
        if path == "/FileRecords/TimeSpanFSR":
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR(path, ob)
        elif path == "/FileRecords/EventCountFSR":
            # Event Counter is also easy
            self.ProcessEventCountFSR(path, ob)
        # now other cases may not be so easy...
        elif (
            "KeyedContainer" in ob.__class__.__name__
            and "LumiFSR" in ob.__class__.__name__
        ):
            # Keyed Container of LumiFSRs : extract and re-register
            self.MergeLumiFSR(path, ob)
        else:
            self.log.info("Skipping Merge of %s at %s" % (ob.__class__.__name__, path))

    def ProcessTimeSpanFSR(self, path, ob):
        # guard against an empty container, which would otherwise leave the
        # merged time span undefined below
        sz = ob.containedObjects().size()
        if not sz:
            return
        ob2 = self.fsr.retrieveObject(path)
        cob = ob2.containedObjects()[0]
        tMin = cob.earliest()
        tMax = cob.latest()
        for j in range(sz):
            cob = ob.containedObjects()[j]
            self.log.debug("Adding TimeSpanFSR")
            if cob.earliest() < tMin:
                tMin = cob.earliest()
            if cob.latest() > tMax:
                tMax = cob.latest()
        # this is annoying: it has to be rebuilt, without a key, and added
        tsfsr = gbl.LHCb.TimeSpanFSR()
        tsfsr.setEarliest(tMin)
        tsfsr.setLatest(tMax)
        self.fsr[path].clear()
        self.fsr[path].add(tsfsr)

    def ProcessEventCountFSR(self, path, ob):
        self.log.debug("Event Count Input Addition")
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())

    def MergeLumiFSR(self, path, keyedC):
        from ROOT import string

        # Fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # The LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR(l)
        # Now deal with the argument : a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in range(nCont):
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR(obj)
            baseLumi.merge(nextLumi)
        # Now Rebuild and ReRegister
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear existing Keyed Container
        self.fsr[path].clear()
        # Add newly merged lumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS


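# A minimal sketch (not part of the original file) of the wire format used
# above: the Writer receives (nodeID, path, pickledObject) tuples and unpickles
# each payload before registering or merging it. The dict here is a plain
# Python stand-in for an LHCb FSR object; its contents are hypothetical.
def _fsrTupleRoundTripSketch():
    payload = {"input": 42, "output": 42}  # hypothetical FSR-like content
    nodeID, path, blob = (1, "/FileRecords/EventCountFSR", pickle.dumps(payload))
    restored = pickle.loads(blob)
    assert restored == payload
    return nodeID, path, restored
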
# =============================================================================


class LumiFSR:
    def __init__(self, lumi):
        # lumi looks like :
        # { runs : 69857 69858
        #   files : root:/castor/cer.../069857_0000000006.raw
        #   info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # class variables
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # get run numbers
        for r in lumi.runNumbers():
            self.runs.append(r)
        # get file ids
        for f in lumi.fileIDs():
            self.files.append(f)
        # Now the tricky bit: the info is not accessible via Python
        # except as a string
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split("/")[:-1]
        for rec in sa:
            k, i, t = rec.split()
            k = int(k)
            i = int(i)
            t = int(t)
            self.info[k] = (i, t)
        self.keys = self.info.keys()

    def merge(self, otherLumi):
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs:
            if r not in self.runs:
                self.runs.append(r)
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files:
            if f not in self.files:
                self.files.append(f)
        self.files.sort()
        # Now add any extra records
        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]
            if k in self.keys:
                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment, myIntegral + integral)
            else:
                self.info[k] = (increment, integral)
        # don't forget to update keys
        self.keys = self.info.keys()

530 
531  def __repr__(self):
532  s = "LumiFSR Python class\n"
533  s += "\tRuns : \n"
534  for r in self.runs:
535  s += "\t\t%i\n" % (r)
536  s += "\tFiles : \n"
537  for f in self.files:
538  s += "\t\t%s\n" % (f)
539  s += "\tInfo : \n"
540  for k in self.keys:
541  increment, integral = self.info[k]
542  s += "\t\t%i\t%i\t%i\n" % (k, increment, integral)
543  return s
544 
545 
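# A minimal sketch (not part of the original file) of the arithmetic at the
# heart of LumiFSR.merge(): per-key addition of (increment, integral) pairs,
# with unknown keys copied over. The numbers here are hypothetical.
def _lumiMergeSketch():
    mine = {0: (8, 0), 1: (8, 259)}
    theirs = {1: (8, 100), 2: (8, 76)}
    for k, (increment, integral) in theirs.items():
        if k in mine:
            myIncrement, myIntegral = mine[k]
            mine[k] = (myIncrement + increment, myIntegral + integral)
        else:
            mine[k] = (increment, integral)
    return mine  # {0: (8, 0), 1: (16, 359), 2: (8, 76)}
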
# =============================================================================


class PackedCaloHypo:
    def __init__(self, o):
        cl = "LHCb::PackedCaloHypo"
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = (o.posE, o.posX, o.posY)
        self.z = o.z

    def __repr__(self):
        s = "PackedCaloHypo : \n"
        s += "\tcentX : %s\n" % (str(self.centX))
        s += "\tcentY : %s\n" % (str(self.centY))
        s += "\tcerr : %s\n" % (str(self.cerr))
        s += "\tcov : %s\n" % (str(self.cov))
        s += "\tfirstCluster : %s\n" % (str(self.firstCluster))
        s += "\tfirstDigit : %s\n" % (str(self.firstDigit))
        s += "\tfirstHypo : %s\n" % (str(self.firstHypo))
        s += "\thypothesis : %s\n" % (str(self.hypothesis))
        s += "\tkey : %s\n" % (str(self.key))
        s += "\tlastCluster : %s\n" % (str(self.lastCluster))
        s += "\tlastDigit : %s\n" % (str(self.lastDigit))
        s += "\tlastHypo : %s\n" % (str(self.lastHypo))
        s += "\tlh : %s\n" % (str(self.lh))
        s += "\tpos : %s\n" % (str(self.pos))
        s += "\tz : %s\n" % (str(self.z))
        s += "---------------------------------------\n"
        return s


# =============================================================================


class SyncMini(object):
    def __init__(self, event, lastEvent=None):
        self.event = event
        self.t = 0.0
        self.lastEvent = None
        if lastEvent:
            self.lastEvent = lastEvent

    def check(self):
        return self.event.is_set()

    def checkLast(self):
        return self.lastEvent.is_set()

    def reset(self):
        self.event.clear()
        self.t = time.time()

    def getTime(self):
        return self.t

    def set(self):
        self.event.set()

    def __repr__(self):
        s = "---------- SyncMini --------------\n"
        s += "    Status : %s\n" % (self.event.is_set())
        s += "         t : %5.2f\n" % (self.t)
        if self.lastEvent:
            s += "Last Event : %s\n" % (self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s

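# A minimal single-process sketch (not part of the original file) of the
# SyncMini lifecycle; in the parallel model the two Events would be shared
# with a child process rather than set locally as done here.
def _syncMiniSketch():
    sm = SyncMini(Event(), lastEvent=Event())
    sm.reset()  # clear the flag and timestamp the start of the wait
    assert not sm.check()
    sm.set()  # a worker calls this when its step completes
    assert sm.check() and not sm.checkLast()
    return sm
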

# =============================================================================


class Syncer(object):
    def __init__(
        self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None
    ):
        # Class to help synchronise the sub-processes
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents

        for i in range(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        if self.manyEvents:
            self.limitFirst = firstEvent

        # a plain list (a Python 3 dict view would break the removal
        # logic in syncAllRolling)
        self.keys = list(self.d.keys())
        self.nWorkers = nWorkers
        self.log = log

    def syncAll(self, step="Not specified"):
        # decide whether this one-shot sync or the rolling (per-event)
        # version is needed; if the latter, delegate and return

        if self.manyEvents:
            sc = self.syncAllRolling()
            return sc

        # Regular version ----------------------------
        for i in range(0, self.limit, self.step):
            if self.checkAll():
                self.log.info("%s : All procs done @ %i s" % (step, i))
                break
            else:
                time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll():
            self.log.info("All processes : %s ok." % (step))
            return SUCCESS
        else:
            self.log.critical("Some process is hanging on : %s" % (step))
            for k in self.keys:
                hangString = "%s : Proc/Stat : %i/%s" % (step, k, self.d[k].check())
                self.log.critical(hangString)
            return FAILURE

    def syncAllRolling(self):
        # Keep track of the progress of Event processing.
        # Each process syncs after each event, so keep clearing
        # the sync Event, and re-checking.
        # Note the time between True checks too; if the time
        # between events exceeds the limit, this is considered a hang

        # set the initial time
        begin = time.time()
        firstEv = {}
        timers = {}
        for k in self.keys:
            self.d[k].reset()
            firstEv[k] = False
            timers[k] = 0.0

        active = list(self.keys)  # a copy, so finished nodes can be removed
        while True:
            # check the status of each sync object
            # (iterate over a snapshot, as finished nodes are removed inside)
            for k in list(active):
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast():
                    if sMini.checkLast() and sMini.check():
                        # if last Event set, then the event loop has finished
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info("Audit : Node %i alive for %5.2f" % (k, alive))
                    else:
                        sMini.reset()
                else:
                    # the event still has not been checked; how long is that?
                    # is it the first Event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k]:
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond:
                        # It is hanging!
                        self.log.critical("Single event wait : %5.2f" % (wait))
                        self.processHang()
                        return FAILURE

            # Termination Criteria : if all procs have finished, we're done
            if self.checkLastEvents():
                self.log.info("TC met for event loop")
                break
            else:
                # sleep, loop again
                time.sleep(self.step)

        self.log.info("All processes Completed all Events ok")
        return SUCCESS

    def processHang(self):
        self.log.critical("Some proc is hanging during Event processing!")
        for k in self.keys:
            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))
        return

    def checkAll(self):
        # Check the status of each Sync object
        # return True or False
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

    def checkLastEvents(self):
        # check if the lastEvent flag of every SyncMini in self.d is set
        stat = [sMini.checkLast() for sMini in self.d.values()]
        return all(stat)

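# Illustrative usage sketch (not part of the original file); the worker count,
# logger, and timeouts are hypothetical. Two of the keys (-2 and -1) are
# reserved for the Reader and Writer processes, 0..nWorkers-1 for the Workers:
#
#   import logging
#   syn = Syncer(2, logging.getLogger(), limit=60, step=5)
#   # each child process sets its flag when the phase completes:
#   #     syn.d[nodeID].set()
#   # the parent blocks (with timeout) until all flags are set:
#   #     syn.syncAll(step="Initialise")
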

# =========================== Methods =========================================


def getEventNumber(evt):
    # The class-independent version of the Event Number Retrieval method
    #
    n = None
    # First attempt : Unpacked Event Data
    lst = ["/Event/Gen/Header", "/Event/Rec/Header"]
    for l in lst:
        try:
            n = evt[l].evtNumber()
            return n
        except Exception:
            # No evt number at this path
            continue

    # Second attempt : try DAQ/RawEvent data
    # The Evt Number is in bank type 16, bank 0, data pt 4
    try:
        n = evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
        return n
    except Exception:
        pass

    # Default Action
    return n


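# Illustrative usage sketch (not part of the original file), following the
# AppMgr().evtsvc() hint in the header comment above; it assumes a configured
# application with an input stream:
#
#   from GaudiPython import AppMgr
#   appMgr = AppMgr()
#   appMgr.run(1)
#   print(getEventNumber(appMgr.evtsvc()))
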
# ================================= EOF =======================================
nWorkers
Definition: pTools.py:644