# pTools.py (The Gaudi Framework v36r16, ea80daf8)

import pickle
import time
from functools import cmp_to_key
from multiprocessing import Event

from GaudiPython import FAILURE, SUCCESS, gbl

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
# GaudiPython Parallel model
#
# Classes :
#  - HistoAgent : In charge of extracting histograms from the Transient Store
#                 on a reader/worker, communicating them to the writer, and,
#                 on the writer, receiving them and rebuilding a single store
#
#  - FileRecordsAgent : Similar to HistoAgent, but for FileRecords data
#
#  - LumiFSR : FSR data from different workers needs to be carefully merged to
#              replicate the serial version; this class aids in that task by
#              representing an LHCb LumiFSR object as a Python class
#
#  - PackedCaloHypos : Pythonization of an LHCb class, used for inspecting
#                      differences between serial and parallel output
#
#  - Syncer : This class is responsible for syncing processes over a specified
#             section of execution; for example, one Syncer object might sync
#             Initialisation, one Running, and one Finalisation.
#             Syncer uses multiprocessing.Event() objects as flags which are
#             visible across the N processes sharing them.
#             IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#             ensure that there is no hanging; in effect, they enforce a
#             timeout for Initialise, Run and Finalise on all processes
#
#  - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
#  - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                          ( AppMgr().evtsvc() ) to this to get the current
#                          event number as an integer (even from RawEvents!)
#

# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# =========================== Classes =========================================


class HistoAgent:
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking histogram objects to the histo
        # store; they are collected here in a dictionary, keyed by type name
        self.bookingDict = {}
        self.bookingDict["DataObject"] = self.bookDataObject
        self.bookingDict["NTuple::Directory"] = self.bookDataObject
        self.bookingDict["NTuple::File"] = self.bookDataObject
        self.bookingDict["TH1D"] = self.bookTH1D
        self.bookingDict["TH2D"] = self.bookTH2D
        self.bookingDict["TH3D"] = self.bookTH3D
        self.bookingDict["TProfile"] = self.bookTProfile
        self.bookingDict["TProfile2D"] = self.bookTProfile2D

    def register(self, tup):
        # add a tuple of (worker-id, histoDict) to self.histos
        assert isinstance(tup, tuple)
        self.histos.append(tup)

    def Receive(self):
        hstatus = self._gmpc.nWorkers + 1  # +1 for the Reader!
        while True:
            tup = self.qin.get()
            if tup == "HISTOS_SENT":
                self.log.debug("received HISTOS_SENT message")
                hstatus -= 1
                if not hstatus:
                    break
            else:
                self.register(tup)
        self._gmpc.sEvent.set()
        self.log.info("Writer received all histo bundles and set sync event")
        return SUCCESS

    def RebuildHistoStore(self):
        """
        Rebuild the histogram store from the histos received by Receive().
        If a histo is not yet in the store, book and fill it according to
        self.bookingDict; if it matches a histo already in the store, add the
        two, remembering that aida2root must be used on the stored histo for
        compatibility.
        """
        errors = 0
        for tup in self.histos:
            workerID, histDict = tup
            added = 0
            booked = 0

            for n in histDict.keys():
                o = histDict[n]
                obj = self.hvt.retrieve(n)

                if obj:
                    try:
                        aida2root(obj).Add(o)
                    except Exception:
                        self.log.warning("FAILED TO ADD : %s" % (str(obj)))
                        errors += 1
                    added += 1
                else:
                    if o.__class__.__name__ in self.bookingDict.keys():
                        try:
                            self.bookingDict[o.__class__.__name__](n, o)
                        except Exception:
                            self.log.warning(
                                "FAILED TO REGISTER : %s\tto %s"
                                % (o.__class__.__name__, n)
                            )
                            errors += 1
                    else:
                        self.log.warning(
                            "No booking method for: %s\t%s\t%s"
                            % (n, type(o), o.__class__.__name__)
                        )
                        errors += 1
                    booked += 1
        hs = self.hvt.getHistoNames()
        self.log.info("Histo Store Rebuilt : ")
        self.log.info("  Contains %i objects." % (len(hs)))
        self.log.info("  Errors in Rebuilding : %i" % (errors))
        return SUCCESS

    def bookDataObject(self, n, o):
        """
        Register a DataObject to the Histo Store
        """
        self._gmpc.hvt.registerObject(n, o)

    def bookTH1D(self, n, o):
        """
        Register a ROOT 1D THisto to the Histo Store
        """
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

    def bookTH2D(self, n, o):
        """
        Register a ROOT 2D THisto to the Histo Store
        """
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

    def bookTH3D(self, n, o):
        """
        Register a ROOT 3D THisto to the Histo Store
        """
        obj = self.hvt._ihs.book(
            n,
            o.GetTitle(),
            # GetNbins() gives the bin count; GetXbins() would return the
            # bin-edge array, which is not what book() expects here
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
            o.GetZaxis().GetNbins(),
            o.GetZaxis().GetXmin(),
            o.GetZaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

    def bookTProfile(self, n, o):
        """
        Register a ROOT TProfile to the Histo Store
        """
        obj = self.hvt._ihs.bookProf(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetOption(),
        )
        aida2root(obj).Add(o)

    def bookTProfile2D(self, n, o):
        """
        Register a ROOT TProfile2D to the Histo Store
        """
        obj = self.hvt._ihs.bookProf(
            n,
            o.GetTitle(),
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
        )
        aida2root(obj).Add(o)

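
# A minimal, self-contained sketch (not part of the Gaudi API) of the
# add-or-book pattern HistoAgent implements above : dispatch on the incoming
# object's class name via a booking dictionary, and add to an existing entry
# when one is already present. The _demoBookingPattern name is hypothetical;
# plain floats stand in for the ROOT histograms and the histogram service.
def _demoBookingPattern():
    store = {}  # stands in for the transient histogram store

    def bookFloat(name, obj):
        store[name] = obj

    bookingDict = {"float": bookFloat}  # keyed by type name, as in HistoAgent

    for name, obj in [("/stat/h1", 1.0), ("/stat/h1", 2.0)]:
        if name in store:
            store[name] += obj  # merge with the existing entry ("Add")
        elif obj.__class__.__name__ in bookingDict:
            bookingDict[obj.__class__.__name__](name, obj)  # book a new one
    return store  # {'/stat/h1': 3.0}
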

# =============================================================================


class FileRecordsAgent:
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []  # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp(self, tupA, tupB):
        # sort tuples by a particular element,
        # for use with functools.cmp_to_key in sort()
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB:
            return -1
        elif valA > valB:
            return 1
        else:
            return 0

    def SendFileRecords(self):
        # send the FileRecords data as part of finalisation

        # Take care of FileRecords!
        # There are two main things to consider here
        # 1) The DataObjects in the FileRecords Transient Store
        # 2) The fact that they are Keyed Containers, containing other objects
        #
        # The Lead Worker, nodeID=0, sends everything in the FSR store as
        # a completeness guarantee
        #
        # records are sent in the form ( nodeID, path, object )
        self.log.info("Sending FileRecords...")
        lst = self.fsr.getHistoNames()

        # Check validity
        if not lst:
            self.log.info("No FileRecords Data to send to Writer.")
            self.q.put("END_FSR")
            return SUCCESS

        # no need to send the root node
        if "/FileRecords" in lst:
            lst.remove("/FileRecords")

        for l in lst:
            o = self.fsr.retrieveObject(l)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()
            # the lead worker sends everything, as a completeness guarantee
            if self._gmpc.nodeID == 0:
                self.objectsOut.append((0, l, pickle.dumps(o)))
            else:
                # only add the Event Counter
                # and non-empty Keyed Containers (ignore empty ones)
                if l == "/FileRecords/EventCountFSR":
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append(tup)
                elif "KeyedContainer" in o.__class__.__name__:
                    # It's a Keyed Container
                    nObjects = o.numberOfObjects()
                    if nObjects:
                        self.log.debug(
                            "Keyed Container %s with %i objects" % (l, nObjects)
                        )
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append(tup)
                else:
                    self.log.info("Ignoring %s in FSR" % o.__class__.__name__)

        self.log.debug("Done with the FSR store; sending to the Writer.")

        if self.objectsOut:
            self.log.debug("%i FSR objects to Writer" % (len(self.objectsOut)))
            for ob in self.objectsOut:
                self.log.debug("\t%s" % (ob[0]))
            self.q.put(self.objectsOut)
        else:
            self.log.info("Valid FSR Store, but no data to send to Writer")
        self.log.info("SendFSR complete")
        self.q.put("END_FSR")
        return SUCCESS

    def Receive(self):
        # Receive the contents of all the Workers' FileRecords Transient Stores
        self.log.info("Receiving FSR store data...")
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == "END_FSR":
                nc -= 1
                continue
            # otherwise it is a list of regular objects...
            for o in objects:
                self.objectsIn.append(o)
        # Now sort by the worker the objects came from;
        # an object is : (nodeID, path, pickledObject)
        self.objectsIn.sort(key=cmp_to_key(self.localCmp))
        self.log.info("All FSR data received")
        return SUCCESS

    def Rebuild(self):
        # objectsIn is a list of (sourceNode, path, serializedObject) tuples
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug("Working with %s" % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, "update"):
                ob.update()
            if hasattr(ob, "numberOfObjects"):
                nCont = ob.numberOfObjects()
                self.log.debug(
                    "\t %s has containedObjects : %i" % (type(ob).__name__, nCont)
                )
            if sourceNode == 0:
                self.log.debug("Registering Object to : %s" % (path))
                self.fsr.registerObject(path, ob)
            else:
                self.log.debug("Merging Object to : %s" % (path))
                self.MergeFSRobject(sourceNode, path, ob)
        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)

        self.log.info("FSR Store Rebuilt. Correcting EventCountFSR")
        if bool(self.fsr._idp):  # There might not be an FSR stream (Gauss)
            ecount = "/FileRecords/EventCountFSR"
            if self.fsr[ecount]:
                self.fsr[ecount].setOutput(self._gmpc.nIn)
                self.log.info(
                    "Event Counter Output set : %s : %i"
                    % (ecount, self.fsr[ecount].output())
                )
        # Do some reporting
        self.log.debug("FSR store reconstructed!")
        lst = self.fsr.getHistoNames()
        if lst:
            for l in lst:
                ob = self.fsr.retrieveObject(l)
                if hasattr(ob, "configureDirectAccess"):
                    ob.configureDirectAccess()
                if hasattr(ob, "containedObjects"):
                    self.log.debug(
                        "\t%s (cont. objects : %i)" % (l, ob.numberOfObjects())
                    )
                else:
                    self.log.debug("\t%s" % (l))
        self.log.info("FSR Store fully rebuilt.")
        return SUCCESS

    def MergeFSRobject(self, sourceNode, path, ob):
        # Merge Non-Empty Keyed Container from Worker>0
        if path == "/FileRecords/TimeSpanFSR":
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR(path, ob)
        elif path == "/FileRecords/EventCountFSR":
            # Event Counter is also easy
            self.ProcessEventCountFSR(path, ob)
        # now other cases may not be so easy...
        elif (
            "KeyedContainer" in ob.__class__.__name__
            and "LumiFSR" in ob.__class__.__name__
        ):
            # Keyed Container of LumiFSRs : extract and re-register
            self.MergeLumiFSR(path, ob)
        else:
            self.log.info("Skipping Merge of %s at %s" % (ob.__class__.__name__, path))

    def ProcessTimeSpanFSR(self, path, ob):
        ob2 = self.fsr.retrieveObject(path)
        if ob.containedObjects().size():
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]
            min = cob.earliest()
            max = cob.latest()
            for j in range(sz):
                cob = ob.containedObjects()[j]
                self.log.debug("Adding TimeSpanFSR")
                if cob.earliest() < min:
                    min = cob.earliest()
                if cob.latest() > max:
                    max = cob.latest()
            # this is annoying: it has to be rebuilt, without a key, and added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest(min)
            tsfsr.setLatest(max)
            self.fsr[path].clear()
            self.fsr[path].add(tsfsr)

    def ProcessEventCountFSR(self, path, ob):
        self.log.debug("Event Count Input Addition")
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())

    def MergeLumiFSR(self, path, keyedC):
        from ROOT import string

        # Fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # The LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR(l)
        # Now deal with the argument : a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in range(nCont):
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR(obj)
            baseLumi.merge(nextLumi)
        # Now rebuild and re-register
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear the existing Keyed Container
        self.fsr[path].clear()
        # add the newly merged LumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS

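
# A minimal sketch of the FSR wire format used above : each record sent to the
# Writer is (nodeID, path, pickledObject), and the Writer sorts by nodeID so
# that the lead worker's complete store (nodeID 0) is registered before the
# other workers' records are merged in. Pure Python; _demoFSRWireFormat is a
# hypothetical name, and a dict stands in for a real FileRecords DataObject.
def _demoFSRWireFormat():
    recordsFromWorkers = [
        (2, "/FileRecords/EventCountFSR", pickle.dumps({"input": 7})),
        (0, "/FileRecords/EventCountFSR", pickle.dumps({"input": 5})),
    ]
    recordsFromWorkers.sort(key=lambda rec: rec[0])  # nodeID 0 comes first
    merged = None
    for nodeID, path, blob in recordsFromWorkers:
        ob = pickle.loads(blob)
        if nodeID == 0:
            merged = ob  # "register" the lead worker's object
        else:
            merged["input"] += ob["input"]  # merge the other counters in
    return merged  # {'input': 12}
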

# =============================================================================


class LumiFSR:
    def __init__(self, lumi):
        # lumi looks like :
        # { runs : 69857 69858
        #   files : root:/castor/cer.../069857_0000000006.raw
        #   info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # instance variables
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # get the run numbers
        for r in lumi.runNumbers():
            self.runs.append(r)
        # get the file IDs
        for f in lumi.fileIDs():
            self.files.append(f)
        # Now the tricky bit : the info is not accessible via Python
        # except as a string
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split("/")[:-1]
        for rec in sa:
            k, i, t = rec.split()
            k = int(k)
            i = int(i)
            t = int(t)
            self.info[k] = (i, t)
        self.keys = list(self.info.keys())

    def merge(self, otherLumi):
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs:
            if r not in self.runs:
                self.runs.append(r)
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files:
            if f not in self.files:
                self.files.append(f)
        self.files.sort()
        # Now add any extra records
        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]
            if k in self.keys:
                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment, myIntegral + integral)
            else:
                self.info[k] = (increment, integral)
        # don't forget to update the keys
        self.keys = list(self.info.keys())

    def __repr__(self):
        s = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs:
            s += "\t\t%i\n" % (r)
        s += "\tFiles : \n"
        for f in self.files:
            s += "\t\t%s\n" % (f)
        s += "\tInfo : \n"
        for k in self.keys:
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n" % (k, increment, integral)
        return s

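
# LumiFSR recovers its (key, increment, integral) records by parsing the
# object's string form, since they are not otherwise reachable from Python.
# A small runnable sketch of that parsing step on a hand-written sample
# string (the string below is illustrative, modelled on the format quoted
# in __init__; _demoLumiInfoParsing is a hypothetical name):
def _demoLumiInfoParsing():
    s = "{ runs : 69857 69858 info (key/incr/integral) : 0 8 0 / 1 8 259 / "
    info = {}
    sa = s.split("info (key/incr/integral) : ")[-1]
    for rec in sa.split("/")[:-1]:  # the last piece is a trailing remainder
        k, i, t = rec.split()
        info[int(k)] = (int(i), int(t))
    return info  # {0: (8, 0), 1: (8, 259)}
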

# =============================================================================


class PackedCaloHypo:
    def __init__(self, o):
        cl = "LHCb::PackedCaloHypo"
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = (o.posE, o.posX, o.posY)
        self.z = o.z

    def __repr__(self):
        s = "PackedCaloHypo : \n"
        s += "\tcentX : %s\n" % (str(self.centX))
        s += "\tcentY : %s\n" % (str(self.centY))
        s += "\tcerr : %s\n" % (str(self.cerr))
        s += "\tcov : %s\n" % (str(self.cov))
        s += "\tfirstCluster : %s\n" % (str(self.firstCluster))
        s += "\tfirstDigit : %s\n" % (str(self.firstDigit))
        s += "\tfirstHypo : %s\n" % (str(self.firstHypo))
        s += "\thypothesis : %s\n" % (str(self.hypothesis))
        s += "\tkey : %s\n" % (str(self.key))
        s += "\tlastCluster : %s\n" % (str(self.lastCluster))
        s += "\tlastDigit : %s\n" % (str(self.lastDigit))
        s += "\tlastHypo : %s\n" % (str(self.lastHypo))
        s += "\tlh : %s\n" % (str(self.lh))
        s += "\tpos : %s\n" % (str(self.pos))
        s += "\tz : %s\n" % (str(self.z))
        s += "---------------------------------------\n"
        return s


# =============================================================================


class SyncMini(object):
    def __init__(self, event, lastEvent=None):
        self.event = event
        self.t = 0.0
        self.lastEvent = lastEvent

    def check(self):
        return self.event.is_set()

    def checkLast(self):
        return self.lastEvent.is_set()

    def reset(self):
        self.event.clear()
        self.t = time.time()

    def getTime(self):
        return self.t

    def set(self):
        self.event.set()

    def __repr__(self):
        s = "---------- SyncMini --------------\n"
        s += "    Status : %s\n" % (self.event.is_set())
        s += "         t : %5.2f\n" % (self.t)
        if self.lastEvent:
            s += "Last Event : %s\n" % (self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s

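
# A minimal sketch of how a SyncMini flag is shared across processes : the
# child sets the multiprocessing.Event, and the parent sees it through
# check(). The _demoSetFlag/_demoSyncMini names are hypothetical; the target
# is module-level so it stays picklable under the spawn start method too.
def _demoSetFlag(event):
    event.set()  # signal completion from the child process


def _demoSyncMini():
    from multiprocessing import Process

    sm = SyncMini(Event())
    p = Process(target=_demoSetFlag, args=(sm.event,))
    p.start()
    p.join()
    return sm.check()  # True : the flag set in the child is visible here
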

# =============================================================================


class Syncer(object):
    def __init__(
        self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None
    ):
        # Class to help synchronise the sub-processes
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents

        for i in range(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        if self.manyEvents:
            self.limitFirst = firstEvent

        self.keys = list(self.d.keys())  # a list, so it can be copied safely
        self.nWorkers = nWorkers
        self.log = log

    def syncAll(self, step="Not specified"):
        # decide whether this regular method or the rolling version
        # is needed; if rolling, delegate...

        if self.manyEvents:
            sc = self.syncAllRolling()
            return sc

        # Regular version ----------------------------
        for i in range(0, self.limit, self.step):
            if self.checkAll():
                self.log.info("%s : All procs done @ %i s" % (step, i))
                break
            else:
                time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll():
            self.log.info("All processes : %s ok." % (step))
            return SUCCESS
        else:
            self.log.critical("Some process is hanging on : %s" % (step))
            for k in self.keys:
                hangString = "%s : Proc/Stat : %i/%s" % (step, k, self.d[k].check())
                self.log.critical(hangString)
            return FAILURE

    def syncAllRolling(self):
        # Keep track of the progress of event processing.
        # Each process syncs after each event, so keep clearing
        # the sync Event and re-checking.
        # Note the time between True checks too : if the time between
        # events exceeds the single-event limit, this is considered a hang.

        # set the initial time
        begin = time.time()
        firstEv = {}
        timers = {}
        for k in self.keys:
            self.d[k].reset()
            firstEv[k] = False
            timers[k] = 0.0

        active = list(self.keys)  # a copy; finished nodes are removed below
        while True:
            # check the status of each sync object
            for k in list(active):  # iterate over a copy, as active mutates
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast():
                    if sMini.checkLast() and sMini.check():
                        # if the last event is set, the event loop is finished
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info("Audit : Node %i alive for %5.2f" % (k, alive))
                    else:
                        sMini.reset()
                else:
                    # the event has still not been checked; how long has it
                    # been waiting, and is it the first event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k]:
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond:
                        # It is hanging!
                        self.log.critical("Single event wait : %5.2f" % (wait))
                        self.processHang()
                        return FAILURE

            # Termination Criterion : if all procs have set their last-event
            # flags, we're done
            if self.checkLastEvents():
                self.log.info("TC met for event loop")
                break
            else:
                # sleep, then loop again
                time.sleep(self.step)

        self.log.info("All processes completed all events ok")
        return SUCCESS

    def processHang(self):
        self.log.critical("Some proc is hanging during Event processing!")
        for k in self.keys:
            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))
        return

    def checkAll(self):
        # Check the status of each sync object;
        # return True only if all are set
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

    def checkLastEvents(self):
        # check if all of the lastEvent flags in self.d are set
        stat = [sMini.checkLast() for sMini in self.d.values()]
        return all(stat)

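
# A minimal sketch of the simple (non-rolling) synchronisation above : each
# process sets its SyncMini flag when it reaches the barrier, and syncAll()
# polls until all flags are set or the time limit expires. The demo names are
# hypothetical, and Python's logging module stands in for the Gaudi logger
# (only info/critical methods are needed here).
def _demoSetAll(events):
    for e in events:
        e.set()  # every process reports "section done"


def _demoSyncer():
    import logging
    from multiprocessing import Process

    log = logging.getLogger("SyncerDemo")
    s = Syncer(1, log, limit=10, step=1)  # creates minis for keys -2, -1, 0
    p = Process(target=_demoSetAll, args=([s.d[k].event for k in s.keys],))
    p.start()
    p.join()
    return s.syncAll(step="Initialise")  # SUCCESS : every flag was set
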

# =========================== Methods =========================================


def getEventNumber(evt):
    # The class-independent version of the Event Number Retrieval method
    #
    n = None
    # First attempt : unpacked event data
    lst = ["/Event/Gen/Header", "/Event/Rec/Header"]
    for l in lst:
        try:
            n = evt[l].evtNumber()
            return n
        except Exception:
            # No event number at this path
            continue

    # Second attempt : try the DAQ/RawEvent data
    # The event number is in bank type 16, bank 0, data word 4
    try:
        n = evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
        return n
    except Exception:
        pass

    # Default action
    return n

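
# Hedged usage sketch for getEventNumber : it expects the GaudiPython TES
# wrapper, so it only runs inside a configured Gaudi job; shown as comments
# for that reason. The options file name below is hypothetical.
#
#     from GaudiPython import AppMgr
#
#     gaudi = AppMgr(outputlevel=3, joboptions="myJob.opts")  # hypothetical opts
#     gaudi.run(1)                # process one event
#     evt = gaudi.evtsvc()        # the TES, as noted in the header above
#     print(getEventNumber(evt))  # the current event number, as an integer
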

# ================================= EOF =======================================