The Gaudi Framework  v36r1 (3e2fb5a8)
pTools.py
from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle
import time

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
# GaudiPython Parallel model
#
# Classes :
# - HistoAgent : In charge of extracting Histograms from their Transient Store
#                on a reader/worker, communicating them to the writer, and on
#                the writer receiving them and rebuilding a single store
#
# - FileRecordsAgent : Similar to HistoAgent, but for FileRecords Data
#
# - LumiFSR : FSR data from different workers needs to be carefully merged to
#             replicate the serial version; this class aids in that task by
#             representing an LHCb LumiFSR object as a python class
#
# - PackedCaloHypos : Pythonization of an LHCb class, used for inspecting some
#                     differences in serial and parallel output
#
# - Syncer : This class is responsible for syncing processes for a specified
#            section of execution. For example, one Syncer object might be
#            syncing Initialisation, one Running, one Finalisation.
#            Syncer uses multiprocessing.Event() objects as flags which are
#            visible across the N processes sharing them.
#            IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#            ensure that there is no hanging; they, in effect, allow a timeout
#            for Initialisation, Run and Finalise on all processes
#
# - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
# - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                         ( AppMgr().evtsvc() ) to this to get the current
#                         Event Number as an integer (even from RawEvents!)
#

# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

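# --- Illustrative sketch, not part of the original module -------------------
# aida2root unwraps an AIDA histogram proxy from the Gaudi Histogram Service
# into its underlying ROOT object, so plain ROOT arithmetic (Add, Scale, ...)
# can be applied, as HistoAgent.RebuildHistoStore does below. Assumes a
# configured GaudiPython AppMgr; the histogram path 'MyAlg/h1' is hypothetical.
def _example_aida2root():
    from GaudiPython import AppMgr
    hvt = AppMgr().histsvc()              # histogram transient store
    aidaHisto = hvt.retrieve('MyAlg/h1')  # AIDA-side proxy (hypothetical path)
    rootHisto = aida2root(aidaHisto)      # same histogram, as a ROOT TH1
    rootHisto.Scale(2.0)                  # ROOT methods now available
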
# =========================== Classes =========================================


class HistoAgent():
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking Histogram Objects to Histo store;
        # here they are collected in a dictionary, with key = a relevant name
        self.bookingDict = {}
        self.bookingDict['DataObject'] = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File'] = self.bookDataObject
        self.bookingDict['TH1D'] = self.bookTH1D
        self.bookingDict['TH2D'] = self.bookTH2D
        self.bookingDict['TH3D'] = self.bookTH3D
        self.bookingDict['TProfile'] = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D

    def register(self, tup):
        # add a tuple of (worker-id, histoDict) to self.histos
        assert tup.__class__.__name__ == 'tuple'
        self.histos.append(tup)

    def Receive(self):
        hstatus = self._gmpc.nWorkers + 1  # +1 for the Reader!
        while True:
            tup = self.qin.get()
            if tup == 'HISTOS_SENT':
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus:
                    break
            else:
                self.register(tup)
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS

    def RebuildHistoStore(self):
        '''
        Rebuild the Histogram Store from the histos received by Receive().
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict.
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the stored histo for compatibility.
        '''
        errors = 0
        for tup in self.histos:
            workerID, histDict = tup
            added = 0
            registered = 0
            booked = 0

            for n in histDict.keys():
                o = histDict[n]
                obj = self.hvt.retrieve(n)

                if obj:
                    try:
                        aida2root(obj).Add(o)
                    except Exception:
                        self.log.warning('FAILED TO ADD : %s' % (str(obj)))
                        errors += 1
                    added += 1
                else:
                    if o.__class__.__name__ in self.bookingDict.keys():
                        try:
                            self.bookingDict[o.__class__.__name__](n, o)
                        except Exception:
                            self.log.warning('FAILED TO REGISTER : %s\tto %s' %
                                             (o.__class__.__name__, n))
                            errors += 1
                    else:
                        self.log.warning('No booking method for: %s\t%s\t%s' %
                                         (n, type(o), o.__class__.__name__))
                        errors += 1
                    booked += 1
        hs = self.hvt.getHistoNames()
        self.log.info('Histo Store Rebuilt : ')
        self.log.info('  Contains %i objects.' % (len(hs)))
        self.log.info('  Errors in Rebuilding : %i' % (errors))
        return SUCCESS

    def bookDataObject(self, n, o):
        '''
        Register a DataObject to the Histo Store
        '''
        self._gmpc.hvt.registerObject(n, o)

    def bookTH1D(self, n, o):
        '''
        Register a ROOT 1D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH2D(self, n, o):
        '''
        Register a ROOT 2D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH3D(self, n, o):
        '''
        Register a ROOT 3D THisto to the Histo Store
        '''
        # GetNbins() (not GetXbins(), which returns the bin-edge array)
        # gives the bin count, matching the 1D/2D bookings above
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax(),
                                 o.GetZaxis().GetNbins(),
                                 o.GetZaxis().GetXmin(),
                                 o.GetZaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTProfile(self, n, o):
        '''
        Register a ROOT TProfile to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(), o.GetOption())
        aida2root(obj).Add(o)

    def bookTProfile2D(self, n, o):
        '''
        Register a ROOT TProfile2D to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(),
                                     o.GetYaxis().GetNbins(),
                                     o.GetYaxis().GetXmin(),
                                     o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)

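# --- Illustrative sketch, not part of the original module -------------------
# The sender side of the protocol that HistoAgent.Receive() implements above:
# each reader/worker puts one (workerID, histoDict) tuple on the shared
# histogram queue, then the 'HISTOS_SENT' sentinel that Receive() counts
# down. The queue and histoDict arguments here are stand-ins.
def _example_send_histos(hq, workerID, histoDict):
    hq.put((workerID, histoDict))  # one bundle : {path: histogram, ...}
    hq.put('HISTOS_SENT')          # sentinel : this sender is finished
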
# =============================================================================


class FileRecordsAgent():
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []  # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp(self, tupA, tupB):
        # sort tuples by a particular element
        # (a Python 2 cmp-style comparator, kept for reference;
        # Receive() below sorts with an equivalent key function)
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB:
            return -1
        elif valA > valB:
            return 1
        else:
            return 0

    def SendFileRecords(self):
        # send the FileRecords data as part of finalisation

        # Take Care of FileRecords!
        # There are two main things to consider here
        # 1) The DataObjects in the FileRecords Transient Store
        # 2) The fact that they are Keyed Containers, containing other objects
        #
        # The Lead Worker, nodeID=0, sends everything in the FSR store, as
        # a completeness guarantee.
        #
        # sent in the form (nodeID, path, pickledObject)
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # Check Validity
        if not lst:
            self.log.info('No FileRecords Data to send to Writer.')
            self.q.put('END_FSR')
            return SUCCESS

        # no need to send the root node
        if '/FileRecords' in lst:
            lst.remove('/FileRecords')

        for l in lst:
            o = self.fsr.retrieveObject(l)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()
            # lead worker sends everything, as completeness guarantee
            if self._gmpc.nodeID == 0:
                self.objectsOut.append((0, l, pickle.dumps(o)))
            else:
                # only add the Event Counter
                # and non-Empty Keyed Containers (ignore empty ones)
                if l == '/FileRecords/EventCountFSR':
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append(tup)
                elif "KeyedContainer" in o.__class__.__name__:
                    # It's a Keyed Container
                    nObjects = o.numberOfObjects()
                    if nObjects:
                        self.log.debug("Keyed Container %s with %i objects" %
                                       (l, nObjects))
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append(tup)
                else:
                    self.log.info('Ignoring %s in FSR' % o.__class__.__name__)

        self.log.debug('Done with FSR store; sending to Writer.')

        if self.objectsOut:
            self.log.debug('%i FSR objects to Writer' % (len(self.objectsOut)))
            for ob in self.objectsOut:
                self.log.debug('\t%s' % (ob[0]))
            self.q.put(self.objectsOut)
        else:
            self.log.info('Valid FSR Store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put('END_FSR')
        return SUCCESS

    def Receive(self):
        # Receive contents of all Workers' FileRecords Transient Stores
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == 'END_FSR':
                nc -= 1
                continue
            # but if it's regular objects...
            for o in objects:
                self.objectsIn.append(o)
        # Now sort by which worker it came from
        # an object is : (nodeID, path, pickledObject)
        self.objectsIn.sort(key=lambda x: x[0])  # Python 3 sort() has no cmp=
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild(self):
        # objectsIn is a list of (nodeID, path, pickledObject) tuples
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug('Working with %s' % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, 'update'):
                ob.update()
            if hasattr(ob, 'numberOfObjects'):
                nCont = ob.numberOfObjects()
                self.log.debug('\t %s has containedObjects : %i' %
                               (type(ob).__name__, nCont))
            if sourceNode == 0:
                self.log.debug('Registering Object to : %s' % (path))
                self.fsr.registerObject(path, ob)
            else:
                self.log.debug('Merging Object to : %s' % (path))
                self.MergeFSRobject(sourceNode, path, ob)
        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)

        self.log.info('FSR Store Rebuilt.  Correcting EventCountFSR')
        if bool(self.fsr._idp):  # There might not be an FSR stream (Gauss)
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount]:
                self.fsr[ecount].setOutput(self._gmpc.nIn)
                self.log.info('Event Counter Output set : %s : %i' %
                              (ecount, self.fsr[ecount].output()))
        # Do some reporting
        self.log.debug('FSR store reconstructed!')
        lst = self.fsr.getHistoNames()
        if lst:
            for l in lst:
                ob = self.fsr.retrieveObject(l)
                if hasattr(ob, 'configureDirectAccess'):
                    ob.configureDirectAccess()
                if hasattr(ob, 'containedObjects'):
                    # if ob.numberOfObjects() :
                    self.log.debug('\t%s (cont. objects : %i)' %
                                   (l, ob.numberOfObjects()))
                else:
                    self.log.debug('\t%s' % (l))
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject(self, sourceNode, path, ob):
        # Merge Non-Empty Keyed Container from Worker > 0
        if path == '/FileRecords/TimeSpanFSR':
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR(path, ob)
        elif path == '/FileRecords/EventCountFSR':
            # Event Counter is also easy
            self.ProcessEventCountFSR(path, ob)
        # now other cases may not be so easy...
        elif "KeyedContainer" in ob.__class__.__name__ and \
                "LumiFSR" in ob.__class__.__name__:
            # Keyed Container of LumiFSRs : extract and re-register
            self.MergeLumiFSR(path, ob)
        else:
            self.log.info(
                "Skipping Merge of %s at %s" % (ob.__class__.__name__, path))

    def ProcessTimeSpanFSR(self, path, ob):
        ob2 = self.fsr.retrieveObject(path)
        if ob.containedObjects().size():
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]
            # tMin/tMax rather than min/max, to avoid shadowing builtins
            tMin = cob.earliest()
            tMax = cob.latest()
            for j in range(sz):
                cob = ob.containedObjects()[j]
                self.log.debug('Adding TimeSpanFSR')
                if cob.earliest() < tMin:
                    tMin = cob.earliest()
                if cob.latest() > tMax:
                    tMax = cob.latest()
            # this is annoying: it has to be rebuilt, without a key, and added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest(tMin)
            tsfsr.setLatest(tMax)
            self.fsr[path].clear()
            self.fsr[path].add(tsfsr)

    def ProcessEventCountFSR(self, path, ob):
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())

    def MergeLumiFSR(self, path, keyedC):
        from ROOT import string
        # Fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # The LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR(l)
        # Now deal with the argument : a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in range(nCont):
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR(obj)
            baseLumi.merge(nextLumi)
        # Now rebuild and re-register
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear existing Keyed Container
        self.fsr[path].clear()
        # Add newly merged lumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS

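# --- Illustrative sketch, not part of the original module -------------------
# The wire format used between SendFileRecords() and Receive()/Rebuild()
# above: each sender puts a list of (nodeID, path, pickledObject) tuples on
# the FSR queue, followed by one 'END_FSR' sentinel that Receive() counts
# once per worker. The queue and object arguments here are stand-ins.
def _example_send_fsr(fq, nodeID, path, obj):
    fq.put([(nodeID, path, pickle.dumps(obj))])  # regular FSR payload
    fq.put('END_FSR')                            # sentinel counted by Receive()
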

# =============================================================================


class LumiFSR():
    def __init__(self, lumi):
        # lumi looks like :
        # { runs : 69857 69858
        #   files : root:/castor/cer.../069857_0000000006.raw
        #   info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # class variables
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # get run numbers
        for r in lumi.runNumbers():
            self.runs.append(r)
        # get file ids
        for f in lumi.fileIDs():
            self.files.append(f)
        # Now the tricky bit: the info is not accessible via Python
        # except as a string
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa:
            k, i, t = rec.split()
            k = int(k)
            i = int(i)
            t = int(t)
            self.info[k] = (i, t)
        self.keys = list(self.info.keys())  # list() : stays valid in Python 3

    def merge(self, otherLumi):
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs:
            if r not in self.runs:
                self.runs.append(r)
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files:
            if f not in self.files:
                self.files.append(f)
        self.files.sort()
        # Now add any extra records
        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]
            if k in self.keys:
                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment, myIntegral + integral)
            else:
                self.info[k] = (increment, integral)
        # don't forget to update keys
        self.keys = list(self.info.keys())

    def __repr__(self):
        s = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs:
            s += "\t\t%i\n" % (r)
        s += "\tFiles : \n"
        for f in self.files:
            s += "\t\t%s\n" % (f)
        s += "\tInfo : \n"
        for k in self.keys:
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n" % (k, increment, integral)
        return s

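# --- Illustrative sketch, not part of the original module -------------------
# LumiFSR recovers the info records from the *string form* of the C++ object
# (see __init__ above). A minimal stand-in with the same layout demonstrates
# the parsing and a merge; all numbers here are invented.
class _StubLumiFSR(object):
    def runNumbers(self):
        return [69857, 69858]

    def fileIDs(self):
        return ['069857_0000000006.raw']

    def __str__(self):
        return 'info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 / '


def _example_lumi_merge():
    a = LumiFSR(_StubLumiFSR())
    b = LumiFSR(_StubLumiFSR())
    a.merge(b)                     # same keys : increments and integrals add
    assert a.info[1] == (16, 518)  # (8 + 8, 259 + 259)
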

# =============================================================================


class PackedCaloHypo():
    def __init__(self, o):
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = (o.posE, o.posX, o.posY)
        self.z = o.z

    def __repr__(self):
        s = "PackedCaloHypo : \n"
        s += "\tcentX : %s\n" % (str(self.centX))
        s += "\tcentY : %s\n" % (str(self.centY))
        s += "\tcerr : %s\n" % (str(self.cerr))
        s += "\tcov : %s\n" % (str(self.cov))
        s += "\tfirstCluster : %s\n" % (str(self.firstCluster))
        s += "\tfirstDigit : %s\n" % (str(self.firstDigit))
        s += "\tfirstHypo : %s\n" % (str(self.firstHypo))
        s += "\thypothesis : %s\n" % (str(self.hypothesis))
        s += "\tkey : %s\n" % (str(self.key))
        s += "\tlastCluster : %s\n" % (str(self.lastCluster))
        s += "\tlastDigit : %s\n" % (str(self.lastDigit))
        s += "\tlastHypo : %s\n" % (str(self.lastHypo))
        s += "\tlh : %s\n" % (str(self.lh))
        s += "\tpos : %s\n" % (str(self.pos))
        s += "\tz : %s\n" % (str(self.z))
        s += "---------------------------------------\n"
        return s


# =============================================================================


class SyncMini(object):
    def __init__(self, event, lastEvent=None):
        self.event = event
        self.t = 0.0
        self.lastEvent = None
        if lastEvent:
            self.lastEvent = lastEvent

    def check(self):
        return self.event.is_set()

    def checkLast(self):
        return self.lastEvent.is_set()

    def reset(self):
        self.event.clear()
        self.t = time.time()

    def getTime(self):
        return self.t

    def set(self):
        self.event.set()

    def __repr__(self):
        s = "---------- SyncMini --------------\n"
        s += "    Status : %s\n" % (self.event.is_set())
        s += "         t : %5.2f\n" % (self.t)
        if self.lastEvent:
            s += "Last Event : %s\n" % (self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s

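# --- Illustrative sketch, not part of the original module -------------------
# The SyncMini life cycle: a sub-process set()s the wrapped Event when a step
# completes; the parent check()s it, then reset()s it (clearing the flag and
# stamping the time) before the next step, as Syncer.syncAllRolling() does
# below.
def _example_syncmini_cycle():
    sm = SyncMini(Event(), lastEvent=Event())
    sm.set()               # worker side : step done
    assert sm.check()      # parent side : observed
    sm.reset()             # clear the flag, remember the time of this check
    assert not sm.check()
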

# =============================================================================


class Syncer(object):
    def __init__(self,
                 nWorkers,
                 log,
                 manyEvents=False,
                 limit=None,
                 step=None,
                 firstEvent=None):
        # Class to help synchronise the sub-processes
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents

        for i in range(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        if self.manyEvents:
            self.limitFirst = firstEvent

        self.keys = list(self.d.keys())  # list() : copyable under Python 3
        self.nWorkers = nWorkers
        self.log = log

    def syncAll(self, step="Not specified"):
        # decide whether the rolling (many-events) version is needed;
        # if so, delegate to it

        if self.manyEvents:
            sc = self.syncAllRolling()
            return sc

        # Regular version ----------------------------
        for i in range(0, self.limit, self.step):
            if self.checkAll():
                self.log.info('%s : All procs done @ %i s' % (step, i))
                break
            else:
                time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll():
            self.log.info("All processes : %s ok." % (step))
            return SUCCESS
        else:
            self.log.critical('Some process is hanging on : %s' % (step))
            for k in self.keys:
                hangString = "%s : Proc/Stat : %i/%s" % (step, k,
                                                         self.d[k].check())
                self.log.critical(hangString)
            return FAILURE

    def syncAllRolling(self):
        # Keep track of the progress of Event processing.
        # Each process syncs after each event, so keep clearing
        # the sync Event, and re-checking.
        # Note the time between True checks too; if the time
        # between events exceeds the single-event limit, this is
        # considered a hang.

        # set the initial time
        begin = time.time()
        firstEv = {}
        timers = {}
        for k in self.keys:
            self.d[k].reset()
            firstEv[k] = False
            timers[k] = 0.0

        active = list(self.keys)
        while True:
            # check the status of each sync object
            for k in list(active):  # iterate a copy: we may remove below
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast():
                    if sMini.checkLast() and sMini.check():
                        # if lastEvent is set, the event loop has finished
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info(
                            "Audit : Node %i alive for %5.2f" % (k, alive))
                    else:
                        sMini.reset()
                else:
                    # the event still has not been checked; how long is that?
                    # is it the first Event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k]:
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond:
                        # It is hanging!
                        self.log.critical('Single event wait : %5.2f' % (wait))
                        self.processHang()
                        return FAILURE

            # Termination Criterion : if all procs have finished, we're done
            if self.checkLastEvents():
                self.log.info('TC met for event loop')
                break
            else:
                # sleep, loop again
                time.sleep(self.step)

        self.log.info("All processes completed all events ok")
        return SUCCESS

    def processHang(self):
        self.log.critical('Some proc is hanging during Event processing!')
        for k in self.keys:
            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))
        return

    def checkAll(self):
        # Check the status of each Sync object
        # return True or False
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

    def checkLastEvents(self):
        # check if all of the lastEvent flags in self.d are set
        stat = [sMini.checkLast() for sMini in self.d.values()]
        return all(stat)

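# --- Illustrative sketch, not part of the original module -------------------
# How a Syncer is typically driven: the parent builds one Syncer per phase
# and shares its SyncMini events with the sub-processes (ids run from -2 to
# nWorkers-1, covering Reader, Workers and Writer); each process set()s its
# event when the phase completes, while the parent blocks in syncAll() with
# a timeout. The logger and timings here are stand-ins.
def _example_sync_phase(nWorkers, log):
    syncer = Syncer(nWorkers, log, limit=60, step=5)
    # in sub-process i :   syncer.d[i].set()   # "my Initialise is done"
    return syncer.syncAll(step="Initialise")   # SUCCESS, or FAILURE on timeout
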

# =========================== Methods =========================================


def getEventNumber(evt):
    # The class-independent version of the Event Number Retrieval method
    #
    n = None
    # First Attempt : Unpacked Event Data
    lst = ['/Event/Gen/Header', '/Event/Rec/Header']
    for l in lst:
        try:
            n = evt[l].evtNumber()
            return n
        except Exception:
            # No evt number at this path
            continue

    # Second Attempt : try DAQ/RawEvent data
    # The Evt Number is in bank type 16, bank 0, data pt 4
    try:
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except Exception:
        pass

    # Default Action
    return n

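# --- Illustrative sketch, not part of the original module -------------------
# Typical use of getEventNumber(), per the header comment : hand it the
# GaudiPython TES. Assumes a configured, running AppMgr.
def _example_current_event_number():
    from GaudiPython import AppMgr
    evt = AppMgr().evtsvc()     # the Transient Event Store
    return getEventNumber(evt)  # int event number, or None if not found
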

# ================================= EOF =======================================