Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v31r0 (aeb156f0)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
pTools.py
Go to the documentation of this file.
1 from GaudiPython import gbl, SUCCESS, FAILURE
2 from multiprocessing import Event
3 import pickle
4 import time
5 
6 # Eoin Smith
7 # 3 Aug 2010
8 
9 #
10 # This script contains the ancillary classes and functions used in the
11 # GaudiPython Parallel model
12 #
13 # Classes :
14 # - HistoAgent : In charge of extracting Histograms from their Transient Store
15 # on a reader/worker, communicating them to the writer, and on
16 # the writer receiving them and rebuilding a single store
17 #
18 # - FileRecordsAgent : Similar to HistoAgent, but for FileRecords Data
19 #
20 # - LumiFSR : FSR data from different workers needs to be carefully merged to
21 # replicate the serial version; this class aids in that task by
22 # representing an LHCb LumiFSR object as a python class
23 #
24 # - PackedCaloHypos : Pythonization of an LHCb class, used for inspecting some
25 # differences in serial and parallel output
26 #
27 # - Syncer : This class is responsible for syncing processes for a specified
28 # section of execution. For example, one Syncer object might be
29 # syncing Initialisation, one for Running, one for Finalisation.
30 # Syncer uses multiprocessing.Event() objects as flags which are
31 # visible across the N processes sharing them.
32 # IMPORTANT : The Syncer objects in the GaudiPython Parallel model
33 # ensure that there is no hanging; they in effect, allow a timeout
34 # for Initialisation, Run, Finalise on all processes
35 #
36 # - SyncMini : A class wrapper for a multiprocessing.Event() object
37 #
38 # Methods :
39 # - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
40 # ( AppMgr().evtsvc() ) to this to get the current
41 # Event Number as an integer (even from RawEvents!)
42 #
43 
# Convert a stored histogram (AIDA interface) to its ROOT counterpart;
# bound once at module level from the Gaudi C++ utility.
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
46 
47 # =========================== Classes =========================================
48 
49 
class HistoAgent():
    """
    Extract histograms from a reader/worker's transient store, ship them
    to the writer, and on the writer rebuild a single histogram store.

    Histogram bundles arrive on the shared queue as
    (workerID, {path: histogram}) tuples; each sending process terminates
    its stream with a 'HISTOS_SENT' sentinel string.
    """

    def __init__(self, gmpComponent):
        # gmpComponent : the owning parallel-model component; supplies the
        # histogram transient store (hvt), the histo queue (hq) and logger.
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []          # received (workerID, histoDict) tuples
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking Histogram Objects to the Histo
        # store; they are collected in a dispatch dictionary with
        # key = the histogram's class name.
        self.bookingDict = {}
        self.bookingDict['DataObject'] = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File'] = self.bookDataObject
        self.bookingDict['TH1D'] = self.bookTH1D
        self.bookingDict['TH2D'] = self.bookTH2D
        self.bookingDict['TH3D'] = self.bookTH3D
        self.bookingDict['TProfile'] = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D

    def register(self, tup):
        # add a tuple of (worker-id, histoDict) to self.histos
        assert tup.__class__.__name__ == 'tuple'
        self.histos.append(tup)

    def Receive(self):
        """
        Drain the histogram queue until every sender (nWorkers workers,
        plus the Reader) has delivered its 'HISTOS_SENT' end marker.
        """
        hstatus = self._gmpc.nWorkers + 1  # +1 for the Reader!
        while True:
            tup = self.qin.get()
            if tup == 'HISTOS_SENT':
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus:
                    break
            else:
                self.register(tup)
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS

    def RebuildHistoStore(self):
        '''
        Rebuild the Histogram Store from the histos received by Receive()
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the Stored histo for compatibility.
        '''
        errors = 0
        for workerID, histDict in self.histos:
            for n, o in histDict.items():
                obj = self.hvt.retrieve(n)
                if obj:
                    # already present in the store : add the incoming histo
                    try:
                        aida2root(obj).Add(o)
                    except Exception:
                        self.log.warning('FAILED TO ADD : %s' % (str(obj)))
                        errors += 1
                else:
                    # not in the store yet : book via the dispatch table
                    if o.__class__.__name__ in self.bookingDict:
                        try:
                            self.bookingDict[o.__class__.__name__](n, o)
                        except Exception:
                            self.log.warning('FAILED TO REGISTER : %s\tto%s' %
                                             (o.__class__.__name__, n))
                            errors += 1
                    else:
                        self.log.warning('No booking method for: %s\t%s\t%s' %
                                         (n, type(o), o.__class__.__name__))
                        errors += 1
        hs = self.hvt.getHistoNames()
        self.log.info('Histo Store Rebuilt : ')
        self.log.info(' Contains %i objects.' % (len(hs)))
        self.log.info(' Errors in Rebuilding : %i' % (errors))
        return SUCCESS

    def bookDataObject(self, n, o):
        '''
        Register a DataObject to the Histo Store
        '''
        self._gmpc.hvt.registerObject(n, o)

    def bookTH1D(self, n, o):
        '''
        Register a ROOT 1D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH2D(self, n, o):
        '''
        Register a ROOT 2D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH3D(self, n, o):
        '''
        Register a ROOT 3D THisto to the Histo Store
        '''
        # NOTE: GetNbins() (the bin count) is required here; the previous
        # code called GetXbins(), which returns the bin-edge array object,
        # not a count, and so could never book correctly.
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax(),
                                 o.GetZaxis().GetNbins(),
                                 o.GetZaxis().GetXmin(),
                                 o.GetZaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTProfile(self, n, o):
        '''
        Register a ROOT TProfile to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(), o.GetOption())
        aida2root(obj).Add(o)

    def bookTProfile2D(self, n, o):
        '''
        Register a ROOT TProfile2D to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(),
                                     o.GetYaxis().GetNbins(),
                                     o.GetYaxis().GetXmin(),
                                     o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)
204 
205 
206 # =============================================================================
207 
class FileRecordsAgent():
    """
    Collect FileRecords (FSR) data from the workers and rebuild a single,
    merged FileRecords store on the writer.

    Workers call SendFileRecords(); the writer calls Receive() and then
    Rebuild(), which registers the lead worker's (nodeID 0) objects
    verbatim and merges everybody else's via MergeFSRobject().

    NOTE(review): the ``class`` line was lost in the extracted listing;
    the name is reconstructed from the module header comment — confirm
    against the original source.
    """

    def __init__(self, gmpComponent):
        # gmpComponent : the owning parallel-model component; supplies the
        # FSR transient store (fsr), the FSR queue (fq) and the logger.
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []  # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp(self, tupA, tupB):
        # Three-way (cmp-style) tuple comparison on the first element,
        # i.e. the originating worker's nodeID; kept for compatibility.
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB:
            return -1
        elif valA > valB:
            return 1
        else:
            return 0

    def SendFileRecords(self):
        """Send this process's FileRecords data as part of finalisation.

        Puts a list of (nodeID, path, pickledObject) tuples on the queue,
        followed by an 'END_FSR' sentinel.  The lead worker (nodeID 0)
        sends everything in the FSR store as a completeness guarantee;
        the others send only the event counter and non-empty keyed
        containers.
        """
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # Check Validity
        if not lst:
            self.log.info('No FileRecords Data to send to Writer.')
            self.q.put('END_FSR')
            return SUCCESS

        # no need to send the root node
        if '/FileRecords' in lst:
            lst.remove('/FileRecords')

        for l in lst:
            o = self.fsr.retrieveObject(l)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()
            if self._gmpc.nodeID == 0:
                # lead worker sends everything, as completeness guarantee
                self.objectsOut.append((0, l, pickle.dumps(o)))
            else:
                # only add the Event Counter
                # and non-Empty Keyed Containers (ignore empty ones)
                if l == '/FileRecords/EventCountFSR':
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append(tup)
                else:
                    # It's a Keyed Container
                    assert "KeyedContainer" in o.__class__.__name__
                    nObjects = o.numberOfObjects()
                    if nObjects:
                        self.log.debug("Keyed Container %s with %i objects" %
                                       (l, nObjects))
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append(tup)
        self.log.debug('Done with FSR store, just to send to Writer.')

        if self.objectsOut:
            self.log.debug('%i FSR objects to Writer' % (len(self.objectsOut)))
            for ob in self.objectsOut:
                self.log.debug('\t%s' % (ob[0]))
            self.q.put(self.objectsOut)
        else:
            self.log.info('Valid FSR Store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put('END_FSR')
        return SUCCESS

    def Receive(self):
        """Receive the contents of all workers' FileRecords stores.

        Loops until one 'END_FSR' sentinel has arrived per worker; every
        other queue item is a list of (nodeID, path, pickledObject)
        tuples accumulated into self.objectsIn, finally sorted by nodeID.
        """
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == 'END_FSR':
                nc -= 1
                continue
            # a regular bundle of objects
            for o in objects:
                self.objectsIn.append(o)
        # Sort by originating worker (nodeID).  key-based sort is
        # equivalent to the old sort(cmp=self.localCmp) and, unlike the
        # cmp keyword, also works on Python 3.
        self.objectsIn.sort(key=lambda tup: tup[0])
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild(self):
        """Rebuild the writer's FileRecords store from received objects.

        self.objectsIn holds (sourceNode, path, pickledObject) tuples,
        sorted so the lead worker (node 0) is processed first: its
        objects are registered directly, later nodes are merged in.
        """
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug('Working with %s' % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, 'update'):
                ob.update()
            if hasattr(ob, 'numberOfObjects'):
                nCont = ob.numberOfObjects()
                self.log.debug('\t %s has containedObjects : %i' %
                               (type(ob).__name__, nCont))
            if sourceNode == 0:
                self.log.debug('Registering Object to : %s' % (path))
                self.fsr.registerObject(path, ob)
            else:
                self.log.debug('Merging Object to : %s' % (path))
                self.MergeFSRobject(sourceNode, path, ob)

        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)
        self.log.info('FSR Store Rebuilt. Correcting EventCountFSR')
        if bool(self.fsr._idp):  # There might not be an FSR stream (Gauss)
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount]:
                self.fsr[ecount].setOutput(self._gmpc.nIn)
                self.log.info('Event Counter Output set : %s : %i' %
                              (ecount, self.fsr[ecount].output()))
        # Do some reporting
        self.log.debug('FSR store reconstructed!')
        lst = self.fsr.getHistoNames()
        if lst:
            for l in lst:
                ob = self.fsr.retrieveObject(l)
                if hasattr(ob, 'configureDirectAccess'):
                    ob.configureDirectAccess()
                if hasattr(ob, 'containedObjects'):
                    self.log.debug('\t%s (cont. objects : %i)' %
                                   (l, ob.numberOfObjects()))
                else:
                    self.log.debug('\t%s' % (l))
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject(self, sourceNode, path, ob):
        # Merge a non-empty object received from a worker > 0 into the
        # already-registered one; dispatch on the FSR path / class name.
        if path == '/FileRecords/TimeSpanFSR':
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR(path, ob)
        elif path == '/FileRecords/EventCountFSR':
            # Event Counter is also easy
            self.ProcessEventCountFSR(path, ob)
        # now other cases may not be so easy...
        elif "KeyedContainer" in ob.__class__.__name__:
            # Keyed Container of LumiFSRs : extract and re-register
            if "LumiFSR" in ob.__class__.__name__:
                self.MergeLumiFSR(path, ob)
            else:
                self.log.info("Skipping Merge of Keyed Container %s for %s" %
                              (ob.__class__.__name__, path))

    def ProcessTimeSpanFSR(self, path, ob):
        """Widen the stored TimeSpanFSR to cover the incoming one.

        The merged FSR has to be rebuilt (without a key) and re-added.
        """
        ob2 = self.fsr.retrieveObject(path)
        sz = ob.containedObjects().size()
        if sz:
            cob = ob2.containedObjects()[0]
            # track the overall earliest/latest; local names chosen so the
            # min/max builtins are not shadowed (the old code shadowed them
            # and, for an empty container, would have passed the builtin
            # functions themselves to setEarliest/setLatest)
            tMin = cob.earliest()
            tMax = cob.latest()
            for j in range(sz):
                cob = ob.containedObjects()[j]
                self.log.debug('Adding TimeSpanFSR')
                if cob.earliest() < tMin:
                    tMin = cob.earliest()
                if cob.latest() > tMax:
                    tMax = cob.latest()
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest(tMin)
            tsfsr.setLatest(tMax)
            self.fsr[path].clear()
            self.fsr[path].add(tsfsr)

    def ProcessEventCountFSR(self, path, ob):
        # Input counts are additive across workers
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())

    def MergeLumiFSR(self, path, keyedC):
        """Merge an incoming keyed container of LumiFSRs into the stored one.

        Both containers hold exactly one LumiFSR; they are merged via the
        python-side LumiFSR helper, then a fresh LHCb::LumiFSR is rebuilt
        and re-registered.
        """
        from ROOT import string
        # Fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # The LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR(l)
        # Now deal with the argument Non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in range(nCont):
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR(obj)
            baseLumi.merge(nextLumi)
        # Now Rebuild and ReRegister
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear existing Keyed Container
        self.fsr[path].clear()
        # Add newly merged lumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS
428 
429 
430 # =============================================================================
431 
432 
class LumiFSR():
    """
    Python-side representation of an LHCb LumiFSR object.

    FSR data from different workers must be carefully merged to replicate
    the serial output; this class holds the run numbers, file IDs and the
    key/increment/integral info records, and can merge another LumiFSR
    into itself.

    str(lumi) looks like :
    { runs : 69857 69858
      files : root:/castor/cer.../069857_0000000006.raw
      info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }
    """

    def __init__(self, lumi):
        self.runs = []   # run numbers covered by this FSR
        self.files = []  # input file IDs
        self.info = {}   # key -> (increment, integral)
        self.keys = []

        # get run numbers
        for r in lumi.runNumbers():
            self.runs.append(r)
        # get file ids
        for f in lumi.fileIDs():
            self.files.append(f)
        # The info records are not accessible via Python except through
        # the string representation, so parse that.
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa:
            k, i, t = rec.split()
            self.info[int(k)] = (int(i), int(t))
        # list() keeps this a real list on Python 3 too (where dict.keys()
        # returns a view that would alias self.info)
        self.keys = list(self.info.keys())

    def merge(self, otherLumi):
        """Merge another LumiFSR into this one: union of runs and files,
        per-key sums of the (increment, integral) info records."""
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs:
            if r not in self.runs:
                self.runs.append(r)
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files:
            if f not in self.files:
                self.files.append(f)
        self.files.sort()
        # Now add any extra records; matching keys have their counters summed
        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]
            if k in self.keys:
                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment, myIntegral + integral)
            else:
                self.info[k] = (increment, integral)
        # don't forget to update keys
        self.keys = list(self.info.keys())

    def __repr__(self):
        s = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs:
            s += "\t\t%i\n" % (r)
        s += "\tFiles : \n"
        for f in self.files:
            s += "\t\t%s\n" % (f)
        s += "\tInfo : \n"
        for k in self.keys:
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n" % (k, increment, integral)
        return s
505 
506 
507 # =============================================================================
508 
class PackedCaloHypo():
    """
    Pythonization of LHCb::PackedCaloHypo, used for inspecting some
    differences in serial and parallel output.

    Copies the data members of the packed C++ object into plain python
    attributes (position/covariance tuples are grouped).

    NOTE(review): the ``class`` line was lost in the extracted listing;
    the name is reconstructed from the module header comment and the
    'LHCb::PackedCaloHypo' check below — confirm against the original.
    """

    def __init__(self, o):
        # o : an LHCb::PackedCaloHypo instance (enforced by class name)
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = (o.posE, o.posX, o.posY)
        self.z = o.z

    def __repr__(self):
        s = "PackedCaloHypo : \n"
        s += "\tcentX : %s\n" % (str(self.centX))
        s += "\tcentY : %s\n" % (str(self.centY))
        s += "\tcerr : %s\n" % (str(self.cerr))
        s += "\tcov : %s\n" % (str(self.cov))
        s += "\tfirstCluster : %s\n" % (str(self.firstCluster))
        s += "\tfirstDigit : %s\n" % (str(self.firstDigit))
        s += "\tfirstHypo : %s\n" % (str(self.firstHypo))
        s += "\thypothesis : %s\n" % (str(self.hypothesis))
        s += "\tkey : %s\n" % (str(self.key))
        s += "\tlastCluster : %s\n" % (str(self.lastCluster))
        s += "\tlastDigit : %s\n" % (str(self.lastDigit))
        s += "\tlastHypo : %s\n" % (str(self.lastHypo))
        s += "\tlh : %s\n" % (str(self.lh))
        s += "\tpos : %s\n" % (str(self.pos))
        s += "\tz : %s\n" % (str(self.z))
        s += "---------------------------------------\n"
        return s
549 
550 
551 # =============================================================================
552 
553 
class SyncMini(object):
    """
    Thin wrapper around a multiprocessing.Event(), pairing it with a
    timestamp of the last reset and an optional second 'lastEvent' flag
    used to signal the end of the event loop.
    """

    def __init__(self, event, lastEvent=None):
        # event     : the per-section sync flag
        # lastEvent : optional end-of-event-loop flag
        self.event = event
        self.t = 0.0
        self.lastEvent = lastEvent if lastEvent else None

    def check(self):
        # True once the sync flag has been raised
        return self.event.is_set()

    def checkLast(self):
        # True once the end-of-loop flag has been raised
        return self.lastEvent.is_set()

    def reset(self):
        # Lower the sync flag and remember when we did so
        self.event.clear()
        self.t = time.time()

    def getTime(self):
        # Timestamp of the most recent reset (0.0 before the first one)
        return self.t

    def set(self):
        # Raise the sync flag
        self.event.set()

    def __repr__(self):
        parts = ["---------- SyncMini --------------\n",
                 " Status : %s\n" % (self.event.is_set()),
                 " t : %5.2f\n" % (self.t)]
        if self.lastEvent:
            parts.append("Last Event : %s\n" % (self.lastEvent.is_set()))
        parts.append("----------------------------------\n")
        return "".join(parts)
586 
587 
588 # =============================================================================
589 
590 
class Syncer(object):
    """
    Synchronise the sub-processes over one section of execution
    (Initialisation, Run or Finalisation).

    Each process (ids -2 and -1 for the non-workers, 0..nWorkers-1 for
    the workers) gets a SyncMini holding multiprocessing Events visible
    across processes.  The master polls them every self.step seconds, up
    to self.limit seconds, so a hanging process is detected (FAILURE)
    rather than blocking forever.
    """

    def __init__(self,
                 nWorkers,
                 log,
                 manyEvents=False,
                 limit=None,
                 step=None,
                 firstEvent=None):
        # Class to help synchronise the sub-processes
        self.limit = limit            # seconds to wait (per event if rolling)
        self.step = step              # polling interval in seconds
        self.d = {}
        self.manyEvents = manyEvents  # rolling (per-event) mode?

        for i in range(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        if self.manyEvents:
            self.limitFirst = firstEvent  # extra allowance for first event

        # Materialise as a list : on Python 3, dict.keys() is a view, and
        # syncAllRolling() needs a mutable, copyable sequence
        self.keys = list(self.d.keys())
        self.nWorkers = nWorkers
        self.log = log

    def syncAll(self, step="Not specified"):
        # is it this method, or is it the rolling version needed?
        # if so, drop through...
        if self.manyEvents:
            sc = self.syncAllRolling()
            return sc

        # Regular version : poll until all procs are done or time runs out
        for i in range(0, self.limit, self.step):
            if self.checkAll():
                self.log.info('%s : All procs done @ %i s' % (step, i))
                break
            else:
                time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll():
            self.log.info("All processes : %s ok." % (step))
            return SUCCESS
        else:
            self.log.critical('Some process is hanging on : %s' % (step))
            for k in self.keys:
                hangString = "%s : Proc/Stat : %i/%s" % (step, k,
                                                         self.d[k].check())
                self.log.critical(hangString)
            return FAILURE

    def syncAllRolling(self):
        """
        Keep track of the progress of event processing.

        Each process syncs after each event, so keep clearing the sync
        Event and re-checking.  If the time between two successful checks
        exceeds self.limit (self.limitFirst for the first event), the
        process is considered hung: report and return FAILURE.  Returns
        SUCCESS once every process has raised its lastEvent flag.
        """
        # set the initial time
        begin = time.time()
        firstEv = {}
        for k in self.keys:
            self.d[k].reset()
            firstEv[k] = False

        # Work on a copy of the key list.  (The previous code aliased
        # self.keys and removed from it while iterating it, which both
        # skipped elements and corrupted the master key list.)
        active = list(self.keys)
        while True:
            # check the status of each sync object; iterate a snapshot,
            # since finished ids are removed from 'active' on the fly
            for k in list(active):
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast():
                    if sMini.checkLast() and sMini.check():
                        # if last Event set, then event loop finished
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info(
                            "Audit : Node %i alive for %5.2f" % (k, alive))
                    else:
                        sMini.reset()
                else:
                    # the event still has not been checked, how long is that?
                    # is it the first Event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k]:
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond:
                        # It is hanging!
                        self.log.critical('Single event wait : %5.2f' % (wait))
                        self.processHang()
                        return FAILURE

            # Termination Criteria : if all procs have finished, we're done
            if self.checkLastEvents():
                self.log.info('TC met for event loop')
                break
            else:
                # sleep, loop again
                time.sleep(self.step)

        self.log.info("All processes Completed all Events ok")
        return SUCCESS

    def processHang(self):
        # Report the status of every process once a hang is detected
        self.log.critical('Some proc is hanging during Event processing!')
        for k in self.keys:
            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))
        return

    def checkAll(self):
        # True if every process has raised its sync Event
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

    def checkLastEvents(self):
        # True if every process has raised its lastEvent flag
        stat = [sMini.checkLast() for sMini in self.d.values()]
        return all(stat)
714 
715 
716 # =========================== Methods =========================================
717 
718 
def getEventNumber(evt):
    """
    Class-independent event number retrieval.

    evt : a valid GaudiPython TES instance ( AppMgr().evtsvc() ).
    Returns the current event number as an integer (even from RawEvents),
    or None when no event header or raw event can be found.
    """
    n = None
    # First Attempt : Unpacked Event Data
    lst = ['/Event/Gen/Header', '/Event/Rec/Header']
    for l in lst:
        try:
            n = evt[l].evtNumber()
            return n
        except Exception:
            # No evt number at this path; try the next one
            continue

    # second attempt : try DAQ/RawEvent data
    # The Evt Number is in bank type 16, bank 0, data pt 4
    try:
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except Exception:
        pass

    # Default Action
    return n
743 
744 
745 # ================================= EOF =======================================
def checkLastEvents(self)
Definition: pTools.py:710
def checkAll(self)
Definition: pTools.py:704
def __init__(self, lumi)
Definition: pTools.py:434
def getEventNumber(evt)
Definition: pTools.py:719
def syncAllRolling(self)
Definition: pTools.py:642
def bookTH2D(self, n, o)
Definition: pTools.py:153
def __repr__(self)
Definition: pTools.py:578
def __init__(self, gmpComponent)
Definition: pTools.py:210
def __init__(self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None)
Definition: pTools.py:598
def RebuildHistoStore(self)
Definition: pTools.py:90
def localCmp(self, tupA, tupB)
Definition: pTools.py:218
constexpr auto size(const C &c) noexcept(noexcept(c.size())) -> decltype(c.size())
def bookTH1D(self, n, o)
Definition: pTools.py:143
def processHang(self)
Definition: pTools.py:698
def ProcessEventCountFSR(self, path, ob)
Definition: pTools.py:396
def checkLast(self)
Definition: pTools.py:565
def register(self, tup)
Definition: pTools.py:70
def __repr__(self)
Definition: pTools.py:492
def __init__(self, gmpComponent)
Definition: pTools.py:51
def bookTH3D(self, n, o)
Definition: pTools.py:166
def MergeLumiFSR(self, path, keyedC)
Definition: pTools.py:400
def __init__(self, event, lastEvent=None)
Definition: pTools.py:555
def syncAll(self, step="Not specified")
Definition: pTools.py:614
def ProcessTimeSpanFSR(self, path, ob)
Definition: pTools.py:374
def bookTProfile(self, n, o)
Definition: pTools.py:182
def bookDataObject(self, n, o)
Definition: pTools.py:137
def merge(self, otherLumi)
Definition: pTools.py:465
def MergeFSRobject(self, sourceNode, path, ob)
Definition: pTools.py:356
def bookTProfile2D(self, n, o)
Definition: pTools.py:192