The Gaudi Framework  v29r0 (ff2e7097)
pTools.py
from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle
import time

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
# GaudiPython Parallel model
#
# Classes :
# - HistoAgent : In charge of extracting Histograms from their Transient Store
#                on a reader/worker, communicating them to the writer, and on
#                the writer receiving them and rebuilding a single store
#
# - FileRecordsAgent : Similar to HistoAgent, but for FileRecords data
#
# - LumiFSR : FSR data from different workers needs to be carefully merged to
#             replicate the serial version; this class aids in that task by
#             representing an LHCb LumiFSR object as a Python class
#
# - PackedCaloHypos : Pythonization of an LHCb class, used for inspecting some
#                     differences in serial and parallel output
#
# - Syncer : This class is responsible for syncing processes for a specified
#            section of execution. For example, one Syncer object might be
#            syncing Initialisation, one Running, one Finalisation.
#            Syncer uses multiprocessing.Event() objects as flags, which are
#            visible across the N processes sharing them.
#            IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#            ensure that there is no hanging; in effect, they allow a timeout
#            for Initialisation, Run and Finalise on all processes
#
# - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
# - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                         ( AppMgr().evtsvc() ) to this to get the current
#                         Event Number as an integer (even from RawEvents!)
#

# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

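
# The sketch below is not part of the original module; it shows the typical
# use of aida2root: retrieve an AIDA histogram from a histogram service and
# convert it so that ROOT operations such as Add() become available. The
# 'hvt' and 'path' arguments are assumed to come from a configured GaudiPython
# job (e.g. AppMgr().histsvc() and a real histogram path).
def _example_aida2root(hvt, path):
    aidaHisto = hvt.retrieve(path)    # an AIDA histogram from the store
    rootHisto = aida2root(aidaHisto)  # now a ROOT TH1/TH2/TProfile/...
    return rootHisto
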
# =========================== Classes =========================================


class HistoAgent():
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking Histogram Objects to the Histo
        # store; here they are collected in a dictionary, keyed by the class
        # name of the object to be booked
        self.bookingDict = {}
        self.bookingDict['DataObject'] = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File'] = self.bookDataObject
        self.bookingDict['TH1D'] = self.bookTH1D
        self.bookingDict['TH2D'] = self.bookTH2D
        self.bookingDict['TH3D'] = self.bookTH3D
        self.bookingDict['TProfile'] = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D

    def register(self, tup):
        # add a tuple of (worker-id, histoDict) to self.histos
        assert tup.__class__.__name__ == 'tuple'
        self.histos.append(tup)

    def Receive(self):
        # Drain the queue until one 'HISTOS_SENT' sentinel has arrived from
        # each sender (see the standalone sketch after this class)
        hstatus = self._gmpc.nWorkers + 1  # +1 for the Reader!
        while True:
            tup = self.qin.get()
            if tup == 'HISTOS_SENT':
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus:
                    break
            else:
                self.register(tup)
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS

    def RebuildHistoStore(self):
        '''
        Rebuild the Histogram Store from the histos received by Receive().
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict.
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the stored histo for compatibility.
        '''
        errors = 0
        for tup in self.histos:
            workerID, histDict = tup
            added = 0
            registered = 0
            booked = 0

            for n in histDict.keys():
                o = histDict[n]
                obj = self.hvt.retrieve(n)

                if obj:
                    try:
                        aida2root(obj).Add(o)
                    except:
                        self.log.warning('FAILED TO ADD : %s' % (str(obj)))
                        errors += 1
                    added += 1
                else:
                    if o.__class__.__name__ in self.bookingDict.keys():
                        try:
                            self.bookingDict[o.__class__.__name__](n, o)
                        except:
                            self.log.warning('FAILED TO REGISTER : %s\tto %s'
                                             % (o.__class__.__name__, n))
                            errors += 1
                    else:
                        self.log.warning('No booking method for: %s\t%s\t%s'
                                         % (n, type(o), o.__class__.__name__))
                        errors += 1
                    booked += 1
        hs = self.hvt.getHistoNames()
        self.log.info('Histo Store Rebuilt : ')
        self.log.info(' Contains %i objects.' % (len(hs)))
        self.log.info(' Errors in Rebuilding : %i' % (errors))
        return SUCCESS

    def bookDataObject(self, n, o):
        '''
        Register a DataObject to the Histo Store
        '''
        self._gmpc.hvt.registerObject(n, o)

    def bookTH1D(self, n, o):
        '''
        Register a ROOT 1D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH2D(self, n, o):
        '''
        Register a ROOT 2D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH3D(self, n, o):
        '''
        Register a ROOT 3D THisto to the Histo Store
        '''
        # note: GetNbins(), not GetXbins(); the booking interface
        # expects the number of bins on each axis
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax(),
                                 o.GetZaxis().GetNbins(),
                                 o.GetZaxis().GetXmin(),
                                 o.GetZaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTProfile(self, n, o):
        '''
        Register a ROOT TProfile to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(),
                                     o.GetOption())
        aida2root(obj).Add(o)

    def bookTProfile2D(self, n, o):
        '''
        Register a ROOT TProfile2D to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(),
                                     o.GetYaxis().GetNbins(),
                                     o.GetYaxis().GetXmin(),
                                     o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)

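
# Standalone sketch (simplified stand-ins, not part of the original module) of
# the sentinel counting used by HistoAgent.Receive() above: each sender posts
# its payloads followed by a 'HISTOS_SENT' marker, and the receiver drains the
# queue until it has seen one marker per sender.
def _example_sentinel_drain(queue, nSenders):
    received = []
    pending = nSenders
    while pending:
        item = queue.get()
        if item == 'HISTOS_SENT':
            pending -= 1          # one sender has finished
        else:
            received.append(item)
    return received
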
# =============================================================================


class FileRecordsAgent():
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []  # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp(self, tupA, tupB):
        # sort tuples by their first element (the nodeID),
        # for use with the Python 2 sort() cmp argument
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB:
            return -1
        elif valA > valB:
            return 1
        else:
            return 0

    def SendFileRecords(self):
        # send the FileRecords data as part of finalisation

        # Take care of FileRecords!
        # There are two main things to consider here
        # 1) The DataObjects in the FileRecords Transient Store
        # 2) The fact that they are Keyed Containers, containing other objects
        #
        # The Lead Worker, nodeID=0, sends everything in the FSR store, as
        # a completeness guarantee
        #
        # send in the form (nodeID, path, object)
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # Check Validity
        if not lst:
            self.log.info('No FileRecords Data to send to Writer.')
            self.q.put('END_FSR')
            return SUCCESS

        # no need to send the root node
        if '/FileRecords' in lst:
            lst.remove('/FileRecords')

        for l in lst:
            o = self.fsr.retrieveObject(l)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()
            # lead worker sends everything, as completeness guarantee
            if self._gmpc.nodeID == 0:
                self.objectsOut.append((0, l, pickle.dumps(o)))
            else:
                # only add the Event Counter
                # and non-empty Keyed Containers (ignore empty ones)
                if l == '/FileRecords/EventCountFSR':
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append(tup)
                else:
                    # it's a Keyed Container
                    assert "KeyedContainer" in o.__class__.__name__
                    nObjects = o.numberOfObjects()
                    if nObjects:
                        self.log.debug("Keyed Container %s with %i objects"
                                       % (l, nObjects))
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append(tup)
        self.log.debug('Done with the FSR store; ready to send to Writer.')

        if self.objectsOut:
            self.log.debug('%i FSR objects to Writer' % (len(self.objectsOut)))
            for ob in self.objectsOut:
                self.log.debug('\t%s' % (ob[0]))
            self.q.put(self.objectsOut)
        else:
            self.log.info('Valid FSR Store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put('END_FSR')
        return SUCCESS

    def Receive(self):
        # Receive the contents of all Workers' FileRecords Transient Stores
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == 'END_FSR':
                nc -= 1
                continue
            # otherwise it's a list of regular objects...
            for o in objects:
                self.objectsIn.append(o)
        # Now sort by which worker the objects came from;
        # an object is (nodeID, path, pickledObject)
        self.objectsIn.sort(cmp=self.localCmp)
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild(self):
        # objectsIn is a list of (nodeID, path, serializedObject) tuples
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug('Working with %s' % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, 'update'):
                ob.update()
            if hasattr(ob, 'numberOfObjects'):
                nCont = ob.numberOfObjects()
                self.log.debug('\t %s has containedObjects : %i' %
                               (type(ob).__name__, nCont))
            if sourceNode == 0:
                self.log.debug('Registering Object to : %s' % (path))
                self.fsr.registerObject(path, ob)
            else:
                self.log.debug('Merging Object to : %s' % (path))
                self.MergeFSRobject(sourceNode, path, ob)
        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)

        self.log.info('FSR Store Rebuilt. Correcting EventCountFSR')
        if bool(self.fsr._idp):  # There might not be an FSR stream (Gauss)
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount]:
                self.fsr[ecount].setOutput(self._gmpc.nIn)
                self.log.info('Event Counter Output set : %s : %i'
                              % (ecount, self.fsr[ecount].output()))
        # Do some reporting
        self.log.debug('FSR store reconstructed!')
        lst = self.fsr.getHistoNames()
        if lst:
            for l in lst:
                ob = self.fsr.retrieveObject(l)
                if hasattr(ob, 'configureDirectAccess'):
                    ob.configureDirectAccess()
                if hasattr(ob, 'containedObjects'):
                    self.log.debug('\t%s (cont. objects : %i)'
                                   % (l, ob.numberOfObjects()))
                else:
                    self.log.debug('\t%s' % (l))
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject(self, sourceNode, path, ob):
        # Merge a non-empty Keyed Container from a Worker with nodeID > 0
        if path == '/FileRecords/TimeSpanFSR':
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR(path, ob)
        elif path == '/FileRecords/EventCountFSR':
            # the Event Counter is also easy
            self.ProcessEventCountFSR(path, ob)
        # now other cases may not be so easy...
        elif "KeyedContainer" in ob.__class__.__name__:
            # Keyed Container of LumiFSRs : extract and re-register
            if "LumiFSR" in ob.__class__.__name__:
                self.MergeLumiFSR(path, ob)
            else:
                self.log.info("Skipping Merge of Keyed Container %s for %s"
                              % (ob.__class__.__name__, path))

    def ProcessTimeSpanFSR(self, path, ob):
        # widen the stored TimeSpanFSR to cover the union of all intervals
        ob2 = self.fsr.retrieveObject(path)
        if ob.containedObjects().size():
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]
            min = cob.earliest()
            max = cob.latest()
            for j in xrange(sz):
                cob = ob.containedObjects()[j]
                self.log.debug('Adding TimeSpanFSR')
                if cob.earliest() < min:
                    min = cob.earliest()
                if cob.latest() > max:
                    max = cob.latest()
            # this is annoying: the TimeSpanFSR has to be rebuilt,
            # without a key, and added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest(min)
            tsfsr.setLatest(max)
            self.fsr[path].clear()
            self.fsr[path].add(tsfsr)

    def ProcessEventCountFSR(self, path, ob):
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())

    def MergeLumiFSR(self, path, keyedC):
        from ROOT import string
        # fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # the LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR(l)
        # now deal with the argument: a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in xrange(nCont):
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR(obj)
            baseLumi.merge(nextLumi)
        # now rebuild and re-register
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear the existing Keyed Container
        self.fsr[path].clear()
        # add the newly merged LumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS

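
# Standalone sketch (not part of the original module; simplified, no Gaudi
# types) of the transport format used by SendFileRecords()/Receive() above:
# each record travels as a (nodeID, path, pickledObject) tuple, followed by
# an 'END_FSR' sentinel, and the receiver sorts by nodeID so that the lead
# worker's complete store is handled first.
def _example_fsr_transport():
    from multiprocessing import Queue
    q = Queue()
    # a worker would put a list of its (nodeID, path, pickledObject) tuples...
    q.put([(1, '/FileRecords/EventCountFSR', pickle.dumps({'input': 10}))])
    # ...followed by the end-of-stream sentinel
    q.put('END_FSR')
    received = []
    item = q.get()
    while item != 'END_FSR':
        received.extend(item)
        item = q.get()
    received.sort()  # tuples compare element-wise, so nodeID sorts first
    nodeID, path, blob = received[0]
    return nodeID, path, pickle.loads(blob)
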
# =============================================================================


class LumiFSR():
    def __init__(self, lumi):
        # lumi looks like :
        # { runs : 69857 69858
        #   files : root:/castor/cer.../069857_0000000006.raw
        #   info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # class variables
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # get run numbers
        for r in lumi.runNumbers():
            self.runs.append(r)
        # get file IDs
        for f in lumi.fileIDs():
            self.files.append(f)
        # Now the tricky bit: the info is not accessible via Python
        # except as a string
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa:
            k, i, t = rec.split()
            k = int(k)
            i = int(i)
            t = int(t)
            self.info[k] = (i, t)
        self.keys = self.info.keys()

    def merge(self, otherLumi):
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs:
            if r not in self.runs:
                self.runs.append(r)
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files:
            if f not in self.files:
                self.files.append(f)
        self.files.sort()
        # now add any extra records
        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]
            if k in self.keys:
                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment, myIntegral + integral)
            else:
                self.info[k] = (increment, integral)
        # don't forget to update keys
        self.keys = self.info.keys()

    def __repr__(self):
        s = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs:
            s += "\t\t%i\n" % (r)
        s += "\tFiles : \n"
        for f in self.files:
            s += "\t\t%s\n" % (f)
        s += "\tInfo : \n"
        for k in self.keys:
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n" % (k, increment, integral)
        return s

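
# Minimal standalone sketch (plain Python data, no LHCb objects; not part of
# the original module) of the merge rule implemented by LumiFSR.merge() above:
# run and file lists are unioned, while the per-key (increment, integral)
# counters are summed.
def _example_lumi_merge(infoA, infoB):
    merged = dict(infoA)
    for k, (inc, integ) in infoB.items():
        if k in merged:
            mInc, mInt = merged[k]
            merged[k] = (mInc + inc, mInt + integ)
        else:
            merged[k] = (inc, integ)
    return merged
# e.g. _example_lumi_merge({0: (8, 0)}, {0: (8, 259), 2: (8, 76)})
# -> {0: (16, 259), 2: (8, 76)}
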
# =============================================================================


class PackedCaloHypos(object):
    def __init__(self, o):
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = (o.posE, o.posX, o.posY)
        self.z = o.z

    def __repr__(self):
        s = "PackedCaloHypo : \n"
        s += "\tcentX : %s\n" % (str(self.centX))
        s += "\tcentY : %s\n" % (str(self.centY))
        s += "\tcerr : %s\n" % (str(self.cerr))
        s += "\tcov : %s\n" % (str(self.cov))
        s += "\tfirstCluster : %s\n" % (str(self.firstCluster))
        s += "\tfirstDigit : %s\n" % (str(self.firstDigit))
        s += "\tfirstHypo : %s\n" % (str(self.firstHypo))
        s += "\thypothesis : %s\n" % (str(self.hypothesis))
        s += "\tkey : %s\n" % (str(self.key))
        s += "\tlastCluster : %s\n" % (str(self.lastCluster))
        s += "\tlastDigit : %s\n" % (str(self.lastDigit))
        s += "\tlastHypo : %s\n" % (str(self.lastHypo))
        s += "\tlh : %s\n" % (str(self.lh))
        s += "\tpos : %s\n" % (str(self.pos))
        s += "\tz : %s\n" % (str(self.z))
        s += "---------------------------------------\n"
        return s

# =============================================================================


class SyncMini(object):
    def __init__(self, event, lastEvent=None):
        self.event = event
        self.t = 0.0
        self.lastEvent = None
        if lastEvent:
            self.lastEvent = lastEvent

    def check(self):
        return self.event.is_set()

    def checkLast(self):
        return self.lastEvent.is_set()

    def reset(self):
        self.event.clear()
        self.t = time.time()

    def getTime(self):
        return self.t

    def set(self):
        self.event.set()

    def __repr__(self):
        s = "---------- SyncMini --------------\n"
        s += " Status : %s\n" % (self.event.is_set())
        s += " t : %5.2f\n" % (self.t)
        if self.lastEvent:
            s += "Last Event : %s\n" % (self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s

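
# Standalone sketch (not part of the original module) of a SyncMini flag
# crossing a process boundary; assumes a fork-based multiprocessing start
# method, so that the wrapped Event is shared with the child process.
def _example_syncmini_roundtrip():
    from multiprocessing import Process
    sm = SyncMini(Event())
    p = Process(target=sm.set)  # the child just sets the shared flag
    p.start()
    p.join()
    return sm.check()           # True: the worker has reported in
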
# =============================================================================


class Syncer(object):
    def __init__(self, nWorkers, log, manyEvents=False,
                 limit=None, step=None, firstEvent=None):
        # Class to help synchronise the sub-processes
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents

        # one SyncMini per process (the two extra slots, -2 and -1,
        # are for the non-worker processes)
        for i in xrange(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        if self.manyEvents:
            self.limitFirst = firstEvent

        self.keys = self.d.keys()
        self.nWorkers = nWorkers
        self.log = log

    def syncAll(self, step="Not specified"):
        # is it this method, or the rolling version, that is needed?
        # if the latter, drop through...

        if self.manyEvents:
            sc = self.syncAllRolling()
            return sc

        # Regular version ----------------------------
        for i in xrange(0, self.limit, self.step):
            if self.checkAll():
                self.log.info('%s : All procs done @ %i s' % (step, i))
                break
            else:
                time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll():
            self.log.info("All processes : %s ok." % (step))
            return SUCCESS
        else:
            self.log.critical('Some process is hanging on : %s' % (step))
            for k in self.keys:
                hangString = "%s : Proc/Stat : %i/%s" % (
                    step, k, self.d[k].check())
                self.log.critical(hangString)
            return FAILURE

    def syncAllRolling(self):
        # Keep track of the progress of Event processing.
        # Each process syncs after each event, so keep clearing
        # the sync Event, and re-checking.
        # Note the time between True checks too; if the time between
        # events exceeds the single-event limit, this is considered a hang.

        # set the initial time
        begin = time.time()
        firstEv = {}
        timers = {}
        for k in self.keys:
            self.d[k].reset()
            firstEv[k] = False
            timers[k] = 0.0

        # work on a copy of the keys, since finished nodes are removed
        active = list(self.keys)
        while True:
            # check the status of each sync object
            # (iterate over a snapshot: removal during iteration skips items)
            for k in active[:]:
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast():
                    if sMini.checkLast() and sMini.check():
                        # if lastEvent is set, the event loop has finished
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info("Audit : Node %i alive for %5.2f"
                                      % (k, alive))
                    else:
                        sMini.reset()
                else:
                    # the event still has not been checked; how long is that?
                    # is it the first Event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k]:
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond:
                        # it is hanging!
                        self.log.critical('Single event wait : %5.2f' % (wait))
                        self.processHang()
                        return FAILURE

            # Termination Criterion : if all procs have finished, we're done
            if self.checkLastEvents():
                self.log.info('TC met for event loop')
                break
            else:
                # sleep, loop again
                time.sleep(self.step)

        self.log.info("All processes Completed all Events ok")
        return SUCCESS

    def processHang(self):
        self.log.critical('Some proc is hanging during Event processing!')
        for k in self.keys:
            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))
        return

    def checkAll(self):
        # Check the status of each Sync object;
        # return True only if all are set
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

    def checkLastEvents(self):
        # check that all of the lastEvent flags in self.d are set
        stat = [sMini.checkLast() for sMini in self.d.values()]
        return all(stat)

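
# Usage sketch (not part of the original module; the logger and the timing
# parameters are illustrative). In the simple, non-rolling mode, syncAll()
# polls every 'step' seconds, up to 'limit' seconds, for all flags to be set.
def _example_syncer_usage():
    import logging
    log = logging.getLogger('syncer-example')
    s = Syncer(2, log, limit=10, step=1)
    for k in s.keys:
        s.d[k].set()                # pretend every process reported in
    return s.syncAll('Initialise')  # returns SUCCESS immediately
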
# =========================== Methods =========================================


def getEventNumber(evt):
    # The class-independent version of the Event Number Retrieval method
    #
    n = None
    # First Attempt : Unpacked Event Data
    lst = ['/Event/Gen/Header',
           '/Event/Rec/Header']
    for l in lst:
        try:
            n = evt[l].evtNumber()
            return n
        except:
            # No event number at this path
            continue

    # Second Attempt : try the DAQ/RawEvent data
    # The Event Number is in bank type 16, bank 0, data point 4
    try:
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except:
        pass

    # Default Action
    return n

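
# Usage sketch (not part of the original module; requires a configured
# GaudiPython job with input data specified elsewhere), following the module
# header: pass the TES, AppMgr().evtsvc(), to getEventNumber().
def _example_getEventNumber():
    from GaudiPython import AppMgr
    appMgr = AppMgr()
    appMgr.run(1)  # process one event
    return getEventNumber(appMgr.evtsvc())
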
# ================================= EOF =======================================