pTools.py
from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle, time

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
# GaudiPython Parallel model.
#
# Classes :
#  - HistoAgent : In charge of extracting histograms from the Transient Store
#                 on a reader/worker, communicating them to the writer, and,
#                 on the writer, receiving them and rebuilding a single store
#
#  - FileRecordsAgent : Similar to HistoAgent, but for FileRecords data
#
#  - LumiFSR : FSR data from different workers needs to be carefully merged to
#              replicate the serial version; this class aids in that task by
#              representing an LHCb LumiFSR object as a Python class
#
#  - PackedCaloHypo : Pythonization of an LHCb class, used for inspecting some
#                     differences between serial and parallel output
#
#  - Syncer : This class is responsible for syncing processes over a specified
#             section of execution. For example, one Syncer object might sync
#             Initialisation, one Running, and one Finalisation.
#             Syncer uses multiprocessing.Event() objects as flags, which are
#             visible across the N processes sharing them.
#             IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#             ensure that there is no hanging; in effect, they impose a timeout
#             on Initialisation, Run and Finalise across all processes.
#
#  - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
#  - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                          ( AppMgr().evtsvc() ) to this to get the current
#                          event number as an integer (even from RawEvents!)
#

# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# =========================== Classes =========================================

class HistoAgent( ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking histogram objects to the histo
        # store; here they are collected in a dictionary, keyed by class name
        self.bookingDict = {}
        self.bookingDict['DataObject'] = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File'] = self.bookDataObject
        self.bookingDict['TH1D'] = self.bookTH1D
        self.bookingDict['TH2D'] = self.bookTH2D
        self.bookingDict['TH3D'] = self.bookTH3D
        self.bookingDict['TProfile'] = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D

    def register( self, tup ) :
        # add a tuple of (worker-id, histoDict) to self.histos
        assert tup.__class__.__name__ == 'tuple'
        self.histos.append( tup )

    def Receive( self ) :
        hstatus = self._gmpc.nWorkers + 1    # +1 for the Reader!
        while True :
            tup = self.qin.get()
            if tup == 'HISTOS_SENT' :
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus : break
            else :
                self.register( tup )
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS

    def RebuildHistoStore( self ) :
        '''
        Rebuild the histogram store from the histos received by Receive().
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict.
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the stored histo for compatibility.
        '''
        errors = 0
        for tup in self.histos :
            workerID, histDict = tup
            added = 0 ; registered = 0 ; booked = 0

            for n in histDict.keys() :
                o = histDict[ n ]
                obj = self.hvt.retrieve( n )

                if obj :
                    try :
                        aida2root(obj).Add(o)
                    except :
                        self.log.warning( 'FAILED TO ADD : %s'%(str(obj)) )
                        errors += 1
                    added += 1
                else :
                    if o.__class__.__name__ in self.bookingDict.keys() :
                        try :
                            self.bookingDict[o.__class__.__name__](n, o)
                        except :
                            self.log.warning( 'FAILED TO REGISTER : %s\tto %s'
                                              %(o.__class__.__name__, n) )
                            errors += 1
                    else :
                        self.log.warning( 'No booking method for: %s\t%s\t%s'
                                          %(n, type(o), o.__class__.__name__) )
                        errors += 1
                    booked += 1
        hs = self.hvt.getHistoNames()
        self.log.info( 'Histo Store Rebuilt : ' )
        self.log.info( '  Contains %i objects.'%(len(hs)) )
        self.log.info( '  Errors in Rebuilding : %i'%(errors) )
        return SUCCESS

    def bookDataObject( self, n, o ) :
        '''
        Register a DataObject to the Histo Store
        '''
        self._gmpc.hvt.registerObject( n, o )

    def bookTH1D( self, n, o ) :
        '''
        Register a ROOT 1D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),
                                  o.GetXaxis().GetNbins(),
                                  o.GetXaxis().GetXmin(),
                                  o.GetXaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH2D( self, n, o ) :
        '''
        Register a ROOT 2D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),
                                  o.GetXaxis().GetNbins(),
                                  o.GetXaxis().GetXmin(),
                                  o.GetXaxis().GetXmax(),
                                  o.GetYaxis().GetNbins(),
                                  o.GetYaxis().GetXmin(),
                                  o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH3D( self, n, o ) :
        '''
        Register a ROOT 3D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),
                                  o.GetXaxis().GetNbins(),
                                  o.GetXaxis().GetXmin(),
                                  o.GetXaxis().GetXmax(),
                                  o.GetYaxis().GetNbins(),
                                  o.GetYaxis().GetXmin(),
                                  o.GetYaxis().GetXmax(),
                                  o.GetZaxis().GetNbins(),
                                  o.GetZaxis().GetXmin(),
                                  o.GetZaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTProfile( self, n, o ) :
        '''
        Register a ROOT TProfile to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),
                                      o.GetXaxis().GetNbins(),
                                      o.GetXaxis().GetXmin(),
                                      o.GetXaxis().GetXmax(),
                                      o.GetOption() )
        aida2root(obj).Add(o)

    def bookTProfile2D( self, n, o ) :
        '''
        Register a ROOT TProfile2D to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),
                                      o.GetXaxis().GetNbins(),
                                      o.GetXaxis().GetXmin(),
                                      o.GetXaxis().GetXmax(),
                                      o.GetYaxis().GetNbins(),
                                      o.GetYaxis().GetXmin(),
                                      o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)

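# --- Illustrative usage sketch (not part of the original file) ---------------
# A minimal sketch of the bundle format HistoAgent.register()/Receive()
# expects from a worker : a (worker-id, {path : histo}) tuple. Assumes ROOT
# is importable; the path '/stat/demo/h' and this helper are hypothetical.
def _exampleHistoBundle( agent ) :
    from ROOT import TH1D
    h = TH1D( 'h', 'demo', 10, 0., 1. )
    agent.register( (0, {'/stat/demo/h' : h}) )
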
# =============================================================================

class FileRecordsAgent( ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []    # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp( self, tupA, tupB ) :
        # sort tuples by a particular element, for the sort() method
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if   valA < valB : return -1
        elif valA > valB : return  1
        else             : return  0

    def SendFileRecords( self ) :
        # send the FileRecords data as part of finalisation

        # Take care of FileRecords!
        # There are two main things to consider here :
        # 1) the DataObjects in the FileRecords Transient Store
        # 2) the fact that they are Keyed Containers, containing other objects
        #
        # The lead worker, nodeID=0, sends everything in the FSR store, as
        # a completeness guarantee.
        #
        # Send in the form ( nodeID, path, object ).
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # check validity
        if not lst :
            self.log.info('No FileRecords Data to send to Writer.')
            self.q.put( 'END_FSR' )
            return SUCCESS

        # no need to send the root node
        if '/FileRecords' in lst : lst.remove('/FileRecords')

        for l in lst :
            o = self.fsr.retrieveObject( l )
            if hasattr(o, "configureDirectAccess") :
                o.configureDirectAccess()
            # lead worker sends everything, as completeness guarantee
            if self._gmpc.nodeID == 0 :
                self.objectsOut.append( (0, l, pickle.dumps(o)) )
            else :
                # only add the Event Counter
                # and non-empty Keyed Containers (ignore empty ones)
                if l == '/FileRecords/EventCountFSR' :
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append( tup )
                else :
                    # it's a Keyed Container
                    assert "KeyedContainer" in o.__class__.__name__
                    nObjects = o.numberOfObjects()
                    if nObjects :
                        self.log.debug( "Keyed Container %s with %i objects"
                                        %(l, nObjects) )
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append( tup )
        self.log.debug('Done with FSR store, ready to send to Writer.')

        if self.objectsOut :
            self.log.debug( '%i FSR objects to Writer'%(len(self.objectsOut)) )
            for ob in self.objectsOut :
                self.log.debug( '\t%s'%(ob[0]) )
            self.q.put( self.objectsOut )
        else :
            self.log.info('Valid FSR store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put( 'END_FSR' )
        return SUCCESS

    def Receive( self ) :
        # receive the contents of all Workers' FileRecords Transient Stores
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0 :
            objects = self.q.get( )
            if objects == 'END_FSR' :
                nc -= 1
                continue
            # but if it's regular objects...
            for o in objects :
                self.objectsIn.append(o)
        # now sort by which worker the objects came from;
        # an object is : (nodeID, path, pickledObject)
        self.objectsIn.sort(cmp=self.localCmp)
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild( self ) :
        # objectsIn is a list of (nodeID, path, pickledObject) tuples
        for sourceNode, path, serialob in self.objectsIn :
            self.log.debug( 'Working with %s'%(path) )
            ob = pickle.loads(serialob)
            if hasattr( ob, 'update' ) :
                ob.update()
            if hasattr( ob, 'numberOfObjects' ) :
                nCont = ob.numberOfObjects()
                self.log.debug( '\t %s has containedObjects : %i'
                                %(type(ob).__name__, nCont) )
            if sourceNode == 0 :
                self.log.debug( 'Registering Object to : %s'%(path) )
                self.fsr.registerObject( path, ob )
            else :
                self.log.debug( 'Merging Object to : %s'%(path) )
                self.MergeFSRobject( sourceNode, path, ob )

        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)
        self.log.info('FSR Store Rebuilt. Correcting EventCountFSR')
        if bool( self.fsr._idp ) :    # there might not be an FSR stream (Gauss)
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount] :
                self.fsr[ecount].setOutput( self._gmpc.nIn )
                self.log.info( 'Event Counter Output set : %s : %i'
                               %(ecount, self.fsr[ecount].output()) )
        # do some reporting
        self.log.debug('FSR store reconstructed!')
        lst = self.fsr.getHistoNames()
        if lst :
            for l in lst :
                ob = self.fsr.retrieveObject(l)
                if hasattr( ob, 'configureDirectAccess' ) :
                    ob.configureDirectAccess()
                if hasattr( ob, 'containedObjects' ) :
                    self.log.debug( '\t%s (cont. objects : %i)'
                                    %(l, ob.numberOfObjects()) )
                else :
                    self.log.debug( '\t%s'%(l) )
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject( self, sourceNode, path, ob ) :
        # merge a non-empty Keyed Container from a Worker > 0
        if path == '/FileRecords/TimeSpanFSR' :
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR( path, ob )
        elif path == '/FileRecords/EventCountFSR' :
            # the Event Counter is also easy
            self.ProcessEventCountFSR( path, ob )
        # now other cases may not be so easy...
        elif "KeyedContainer" in ob.__class__.__name__ :
            # a Keyed Container of LumiFSRs : extract and re-register
            if "LumiFSR" in ob.__class__.__name__ :
                self.MergeLumiFSR( path, ob )
            else :
                self.log.info( "Skipping merge of Keyed Container %s for %s"
                               %(ob.__class__.__name__, path) )

    def ProcessTimeSpanFSR( self, path, ob ) :
        ob2 = self.fsr.retrieveObject( path )
        if ob.containedObjects().size() :
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]
            min = cob.earliest()
            max = cob.latest()
            for j in xrange( sz ) :
                cob = ob.containedObjects()[j]
                self.log.debug( 'Adding TimeSpanFSR' )
                if cob.earliest() < min :
                    min = cob.earliest()
                if cob.latest() > max :
                    max = cob.latest()
            # this is annoying : the TimeSpanFSR has to be rebuilt,
            # without a key, and re-added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest( min )
            tsfsr.setLatest( max )
            self.fsr[path].clear()
            self.fsr[path].add( tsfsr )

    def ProcessEventCountFSR( self, path, ob ) :
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput( self.fsr[path].input() + ob.input() )

    def MergeLumiFSR( self, path, keyedC ) :
        from ROOT import string
        # fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # the LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR( l )
        # now deal with the argument : a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in xrange(nCont) :
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR( obj )
            baseLumi.merge( nextLumi )
        # now rebuild and re-register
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs :
            newLumi.addRunNumber( r )
        for f in baseLumi.files :
            newLumi.addFileID( string(f) )
        for k in baseLumi.keys :
            increment, integral = baseLumi.info[k]
            newLumi.addInfo( k, increment, integral )
        # clear the existing Keyed Container
        self.fsr[path].clear()
        # add the newly merged LumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS

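# --- Illustrative sketch (not part of the original file) ---------------------
# A pure-python sketch of the (nodeID, path, pickledObject) ordering enforced
# in FileRecordsAgent.Receive() : node 0 (the lead worker) sorts first, so
# Rebuild() registers its complete snapshot before merging the other workers'
# records. The blob strings are hypothetical placeholders for pickled objects.
def _exampleFSROrdering() :
    objs = [ (2, '/FileRecords/EventCountFSR', 'blob-from-node-2'),
             (0, '/FileRecords/EventCountFSR', 'blob-from-node-0') ]
    objs.sort()    # tuples compare element-wise, so nodeID dominates
    return objs
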
# =============================================================================

class LumiFSR( ) :
    def __init__( self, lumi ) :
        # lumi looks like :
        # { runs : 69857 69858
        #   files : root:/castor/cer.../069857_0000000006.raw
        #   info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # class variables
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # get run numbers
        for r in lumi.runNumbers() :
            self.runs.append(r)
        # get file IDs
        for f in lumi.fileIDs() :
            self.files.append(f)
        # now the tricky bit : the info is not accessible via Python
        # except as a string
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa :
            k, i, t = rec.split()
            self.info[int(k)] = ( int(i), int(t) )
        self.keys = self.info.keys()

    def merge( self, otherLumi ) :
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs :
            if r not in self.runs :
                self.runs.append( r )
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files :
            if f not in self.files :
                self.files.append( f )
        self.files.sort()
        # now add any extra records
        for k in otherLumi.keys :
            increment, integral = otherLumi.info[k]
            if k in self.keys :
                myIncrement, myIntegral = self.info[k]
                self.info[k] = ( myIncrement+increment, myIntegral+integral )
            else :
                self.info[k] = ( increment, integral )
        # don't forget to update the keys
        self.keys = self.info.keys()

    def __repr__( self ) :
        s = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs :
            s += "\t\t%i\n"%(r)
        s += "\tFiles : \n"
        for f in self.files :
            s += "\t\t%s\n"%(f)
        s += "\tInfo : \n"
        for k in self.keys :
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n"%(k, increment, integral)
        return s

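# --- Illustrative sketch (not part of the original file) ---------------------
# A minimal sketch of LumiFSR merging, using a hypothetical pure-python stub
# that mimics just the interface LumiFSR.__init__ reads (runNumbers, fileIDs
# and the printed info string); no LHCb object is needed for this demo.
def _exampleLumiMerge() :
    class _StubLumi( object ) :
        def runNumbers( self ) : return [69857]
        def fileIDs( self )    : return ['069857_0000000006.raw']
        def __str__( self )    : return 'info (key/incr/integral) : 0 8 10 /'
    a = LumiFSR( _StubLumi() )
    b = LumiFSR( _StubLumi() )
    a.merge( b )    # info[0] becomes (16, 20) : counters are summed per key
    return a
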
# =============================================================================

class PackedCaloHypo( ) :
    def __init__( self, o ) :
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = ( o.cerr00, o.cerr10, o.cerr11 )
        self.cov = ( o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22 )
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = ( o.posE, o.posX, o.posY )
        self.z = o.z

    def __repr__( self ) :
        s = "PackedCaloHypo : \n"
        s += "\tcentX        : %s\n"%( str(self.centX) )
        s += "\tcentY        : %s\n"%( str(self.centY) )
        s += "\tcerr         : %s\n"%( str(self.cerr) )
        s += "\tcov          : %s\n"%( str(self.cov) )
        s += "\tfirstCluster : %s\n"%( str(self.firstCluster) )
        s += "\tfirstDigit   : %s\n"%( str(self.firstDigit) )
        s += "\tfirstHypo    : %s\n"%( str(self.firstHypo) )
        s += "\thypothesis   : %s\n"%( str(self.hypothesis) )
        s += "\tkey          : %s\n"%( str(self.key) )
        s += "\tlastCluster  : %s\n"%( str(self.lastCluster) )
        s += "\tlastDigit    : %s\n"%( str(self.lastDigit) )
        s += "\tlastHypo     : %s\n"%( str(self.lastHypo) )
        s += "\tlh           : %s\n"%( str(self.lh) )
        s += "\tpos          : %s\n"%( str(self.pos) )
        s += "\tz            : %s\n"%( str(self.z) )
        s += "---------------------------------------\n"
        return s

# =============================================================================

class SyncMini( object ) :
    def __init__( self, event, lastEvent=None ) :
        self.event = event
        self.t = 0.0
        self.lastEvent = None
        if lastEvent :
            self.lastEvent = lastEvent

    def check( self ) :
        return self.event.is_set()

    def checkLast( self ) :
        return self.lastEvent.is_set()

    def reset( self ) :
        self.event.clear()
        self.t = time.time()

    def getTime( self ) :
        return self.t

    def set( self ) :
        self.event.set()

    def __repr__( self ) :
        s  = "---------- SyncMini --------------\n"
        s += "    Status : %s\n"%(self.event.is_set())
        s += "         t : %5.2f\n"%(self.t)
        if self.lastEvent :
            s += "Last Event : %s\n"%(self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s

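# --- Illustrative sketch (not part of the original file) ---------------------
# A minimal sketch of the SyncMini flag protocol : a process set()s its Event
# when a phase completes, a supervisor check()s it, then reset() clears the
# flag and timestamps the start of the next interval.
def _exampleSyncMini() :
    sm = SyncMini( Event(), lastEvent=Event() )
    sm.set()                # a process flags completion
    done = sm.check()       # the supervisor sees True
    sm.reset()              # clear and timestamp for the next round
    return done
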
# =============================================================================

class Syncer( object ) :
    def __init__( self, nWorkers, log, manyEvents=False,
                  limit=None, step=None, firstEvent=None ) :
        # class to help synchronise the sub-processes
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents

        for i in xrange(-2, nWorkers) :
            self.d[ i ] = SyncMini( Event(), lastEvent=Event() )
        if self.manyEvents :
            self.limitFirst = firstEvent

        self.keys = self.d.keys()
        self.nWorkers = nWorkers
        self.log = log

    def syncAll( self, step="Not specified" ) :
        # is it this method, or the rolling version, that is needed?
        # if the latter, drop through...
        if self.manyEvents :
            sc = self.syncAllRolling( )
            return sc

        # Regular version ----------------------------
        for i in xrange( 0, self.limit, self.step ) :
            if self.checkAll( ) :
                self.log.info( '%s : All procs done @ %i s'%(step, i) )
                break
            else :
                time.sleep(self.step)

        # now the time limit is up... check the status one final time
        if self.checkAll() :
            self.log.info( "All processes : %s ok."%(step) )
            return SUCCESS
        else :
            self.log.critical( 'Some process is hanging on : %s'%(step) )
            for k in self.keys :
                hangString = "%s : Proc/Stat : %i/%s"%(step, k, self.d[k].check())
                self.log.critical( hangString )
            return FAILURE

    def syncAllRolling( self ) :
        # Keep track of the progress of event processing.
        # Each process syncs after each event, so keep clearing
        # the sync Event and re-checking.
        # Note the time between True checks too : if the time
        # between events exceeds the single-event limit, this is
        # considered a hang.

        # set the initial time
        begin = time.time()
        firstEv = {}
        timers = {}
        for k in self.keys :
            self.d[k].reset()
            firstEv[k] = False
            timers[k] = 0.0

        active = list( self.keys )
        while True :
            # check the status of each sync object; iterate over a copy,
            # as finished nodes are removed from the active list
            for k in list( active ) :
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast() :
                    if sMini.checkLast() and sMini.check() :
                        # if lastEvent is set, the event loop has finished
                        active.remove( k )
                        alive = time.time() - begin
                        self.log.info( "Audit : Node %i alive for %5.2f"
                                       %(k, alive) )
                    else :
                        sMini.reset()
                else :
                    # the event has still not been checked; how long is that?
                    # is it the first event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k] :
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond :
                        # it is hanging!
                        self.log.critical( 'Single event wait : %5.2f'%(wait) )
                        self.processHang()
                        return FAILURE

            # termination criterion : if all procs have finished, we're done
            if self.checkLastEvents() :
                self.log.info('TC met for event loop')
                break
            else :
                # sleep, then loop again
                time.sleep(self.step)

        self.log.info("All processes completed all events ok")
        return SUCCESS

    def processHang( self ) :
        self.log.critical('Some proc is hanging during Event processing!')
        for k in self.keys :
            self.log.critical( "Proc/Stat : %i / %s"%(k, self.d[k].check()) )
        return

    def checkAll( self ) :
        # check the status of each Sync object; return True or False
        currentStatus = [ mini.check() for mini in self.d.values() ]
        return all( currentStatus )

    def checkLastEvents( self ) :
        # check if the lastEvent flag of every SyncMini in self.d is set
        stat = [ sMini.checkLast() for sMini in self.d.values() ]
        return all(stat)

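# --- Illustrative usage sketch (not part of the original file) ---------------
# A minimal sketch of the Syncer handshake, assuming a logging.Logger-like
# 'log'; in the real model the set() calls happen in the sub-processes that
# share the Events. The 10 s limit and 1 s step are arbitrary demo values.
def _exampleSyncer( log ) :
    s = Syncer( 1, log, limit=10, step=1 )
    for k in s.keys :       # keys are -2, -1, 0 : writer, reader, one worker
        s.d[k].set()        # each process flags that its phase is complete
    return s.syncAll( 'Initialise' )    # returns SUCCESS immediately
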
# =========================== Methods =========================================

def getEventNumber( evt ) :
    # the class-independent version of the event number retrieval method
    n = None
    # first attempt : unpacked event data
    lst = [ '/Event/Gen/Header',
            '/Event/Rec/Header' ]
    for l in lst :
        try :
            n = evt[l].evtNumber()
            return n
        except :
            # no event number at this path
            continue

    # second attempt : try DAQ/RawEvent data
    # the event number is in bank type 16, bank 0, data point 4
    try :
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except :
        pass

    # default action
    return n

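# --- Illustrative usage sketch (not part of the original file) ---------------
# A minimal sketch of calling getEventNumber() from a GaudiPython session,
# assuming a fully configured job; AppMgr and evtsvc() are the standard
# GaudiPython entry points named in the header comment above.
def _exampleGetEventNumber() :
    from GaudiPython import AppMgr
    appMgr = AppMgr()
    appMgr.run(1)              # process one event so the TES is populated
    evt = appMgr.evtsvc()      # the TES handle expected by getEventNumber
    return getEventNumber( evt )
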
# ================================= EOF =======================================