Gaudi Framework, version v23r5

pTools.py
from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle, time

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
# GaudiPython Parallel model
#
# Classes :
#  - HistoAgent : in charge of extracting Histograms from their Transient
#                 Store on a reader/worker, communicating them to the writer,
#                 and, on the writer, receiving them and rebuilding a single
#                 store
#
#  - FileRecordsAgent : similar to HistoAgent, but for FileRecords data
#
#  - LumiFSR : FSR data from different workers needs to be carefully merged
#              to replicate the serial version; this class aids in that task
#              by representing an LHCb LumiFSR object as a Python class
#
#  - PackedCaloHypo : Pythonization of an LHCb class, used for inspecting
#                     some differences in serial and parallel output
#
#  - Syncer : responsible for syncing processes over a specified section of
#             execution; for example, one Syncer object might sync
#             Initialisation, another Running, another Finalisation.
#             Syncer uses multiprocessing.Event() objects as flags which are
#             visible across the N processes sharing them.
#             IMPORTANT : the Syncer objects in the GaudiPython Parallel
#             model ensure that there is no hanging; in effect they allow a
#             timeout for Initialise, Run and Finalise on all processes
#
#  - SyncMini : a class wrapper for a multiprocessing.Event() object
#
# Methods :
#  - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                          ( AppMgr().evtsvc() ) to this to get the current
#                          Event Number as an integer (even from RawEvents!)
#

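# Illustrative flow on the writer process (a sketch, not executed here;
# 'gmpc' stands for the GMP component that owns these helper objects) :
#
#   histoAgent = HistoAgent( gmpc )
#   histoAgent.Receive()                # block until all histo bundles arrive
#   histoAgent.RebuildHistoStore()      # merge the bundles into one store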

# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# =========================== Classes =========================================

class HistoAgent( ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking Histogram Objects to the Histo
        # store; they are collected here in a dictionary keyed by class name
        self.bookingDict = {}
        self.bookingDict['DataObject'] = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File'] = self.bookDataObject
        self.bookingDict['TH1D'] = self.bookTH1D
        self.bookingDict['TH2D'] = self.bookTH2D
        self.bookingDict['TH3D'] = self.bookTH3D
        self.bookingDict['TProfile'] = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D
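        # e.g. a TH1D received from a worker would be booked (illustrative
        # path and object) via :
        #   self.bookingDict['TH1D']( '/stat/Hlt/myHisto', myTH1D )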

    def register( self, tup ) :
        # add a tuple of (worker-id, histoDict) to self.histos
        assert tup.__class__.__name__ == 'tuple'
        self.histos.append( tup )

    def Receive( self ) :
        hstatus = self._gmpc.nWorkers + 1    # +1 for the Reader!
        while True :
            tup = self.qin.get()
            if tup == 'HISTOS_SENT' :
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus :
                    break
            else :
                self.register( tup )
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS

    def RebuildHistoStore( self ) :
        '''
        Rebuild the Histogram Store from the histos received by Receive().
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict.
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the stored histo for compatibility.
        '''
        errors = 0
        for tup in self.histos :
            workerID, histDict = tup
            added = 0 ; registered = 0 ; booked = 0
            for n in histDict.keys() :
                o = histDict[ n ]
                obj = self.hvt.retrieve( n )
                if obj :
                    try :
                        aida2root(obj).Add(o)
                    except :
                        self.log.warning( 'FAILED TO ADD : %s'%(str(obj)) )
                        errors += 1
                    added += 1
                else :
                    if o.__class__.__name__ in self.bookingDict.keys() :
                        try :
                            self.bookingDict[o.__class__.__name__](n, o)
                        except :
                            self.log.warning( 'FAILED TO REGISTER : %s\tto %s'
                                              %(o.__class__.__name__, n) )
                            errors += 1
                    else :
                        self.log.warning( 'No booking method for: %s\t%s\t%s'
                                          %(n, type(o), o.__class__.__name__) )
                        errors += 1
                    booked += 1
        hs = self.hvt.getHistoNames()
        self.log.info( 'Histo Store Rebuilt : ' )
        self.log.info( '  Contains %i objects.'%(len(hs)) )
        self.log.info( '  Errors in Rebuilding : %i'%(errors) )
        return SUCCESS

    def bookDataObject( self, n, o ) :
        '''
        Register a DataObject to the Histo Store
        '''
        self._gmpc.hvt.registerObject( n, o )

    def bookTH1D( self, n, o ) :
        '''
        Register a ROOT 1D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),
                                  o.GetXaxis().GetNbins(),
                                  o.GetXaxis().GetXmin(),
                                  o.GetXaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH2D( self, n, o ) :
        '''
        Register a ROOT 2D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),
                                  o.GetXaxis().GetNbins(),
                                  o.GetXaxis().GetXmin(),
                                  o.GetXaxis().GetXmax(),
                                  o.GetYaxis().GetNbins(),
                                  o.GetYaxis().GetXmin(),
                                  o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH3D( self, n, o ) :
        '''
        Register a ROOT 3D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),
                                  o.GetXaxis().GetNbins(),
                                  o.GetXaxis().GetXmin(),
                                  o.GetXaxis().GetXmax(),
                                  o.GetYaxis().GetNbins(),
                                  o.GetYaxis().GetXmin(),
                                  o.GetYaxis().GetXmax(),
                                  o.GetZaxis().GetNbins(),
                                  o.GetZaxis().GetXmin(),
                                  o.GetZaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTProfile( self, n, o ) :
        '''
        Register a ROOT TProfile to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),
                                      o.GetXaxis().GetNbins(),
                                      o.GetXaxis().GetXmin(),
                                      o.GetXaxis().GetXmax(),
                                      o.GetOption() )
        aida2root(obj).Add(o)

    def bookTProfile2D( self, n, o ) :
        '''
        Register a ROOT TProfile2D to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),
                                      o.GetXaxis().GetNbins(),
                                      o.GetXaxis().GetXmin(),
                                      o.GetXaxis().GetXmax(),
                                      o.GetYaxis().GetNbins(),
                                      o.GetYaxis().GetXmin(),
                                      o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)

# =============================================================================

class FileRecordsAgent( ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []     # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp( self, tupA, tupB ) :
        # sort tuples by a particular element
        # for the sort() method
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB : return -1
        elif valA > valB : return 1
        else : return 0
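        # e.g. localCmp( (1, pathA, obA), (0, pathB, obB) ) returns 1, so a
        # sort with this comparator puts the lead worker's (nodeID 0)
        # objects first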

    def SendFileRecords( self ) :
        # send the FileRecords data as part of finalisation

        # Take care of FileRecords!
        # There are two main things to consider here :
        #  1) the DataObjects in the FileRecords Transient Store
        #  2) the fact that they are Keyed Containers, containing other
        #     objects
        #
        # The Lead Worker, nodeID=0, sends everything in the FSR store as a
        # completeness guarantee.
        #
        # Objects are sent in the form ( nodeID, path, pickledObject )
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # check validity
        if not lst :
            self.log.info('No FileRecords Data to send to Writer.')
            self.q.put( 'END_FSR' )
            return SUCCESS

        # no need to send the root node
        if '/FileRecords' in lst :
            lst.remove('/FileRecords')

        for l in lst :
            o = self.fsr.retrieveObject( l )
            if hasattr( o, 'configureDirectAccess' ) :
                o.configureDirectAccess()
            # the lead worker sends everything, as a completeness guarantee
            if self._gmpc.nodeID == 0 :
                self.objectsOut.append( (0, l, pickle.dumps(o)) )
            else :
                # only add the Event Counter
                # and non-empty Keyed Containers (ignore empty ones)
                if l == '/FileRecords/EventCountFSR' :
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append( tup )
                else :
                    # it's a Keyed Container
                    assert "KeyedContainer" in o.__class__.__name__
                    nObjects = o.numberOfObjects()
                    if nObjects :
                        self.log.debug( "Keyed Container %s with %i objects"
                                        %(l, nObjects) )
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append( tup )
        self.log.debug('Done with FSR store; now sending to Writer.')

        if self.objectsOut :
            self.log.debug( '%i FSR objects to Writer'%(len(self.objectsOut)) )
            for ob in self.objectsOut :
                self.log.debug( '\t%s'%(ob[0]) )
            self.q.put( self.objectsOut )
        else :
            self.log.info('Valid FSR Store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put( 'END_FSR' )
        return SUCCESS

    def Receive( self ) :
        # receive the contents of all Workers' FileRecords Transient Stores
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0 :
            objects = self.q.get( )
            if objects == 'END_FSR' :
                nc -= 1
                continue
            if nc == 0 :
                break
            # but if it's regular objects...
            for o in objects :
                self.objectsIn.append(o)
        # now sort by which worker the objects came from;
        # an object is ( nodeID, path, pickledObject )
        self.objectsIn.sort(cmp=self.localCmp)
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild( self ) :
        # objectsIn is a list of ( nodeID, path, serializedObject ) tuples
        for sourceNode, path, serialob in self.objectsIn :
            self.log.debug( 'Working with %s'%(path) )
            ob = pickle.loads(serialob)
            if hasattr( ob, 'update' ) :
                ob.update()
            if hasattr( ob, 'numberOfObjects' ) :
                nCont = ob.numberOfObjects()
                self.log.debug( '\t %s has containedObjects : %i'
                                %(type(ob).__name__, nCont) )
            if sourceNode == 0 :
                self.log.debug( 'Registering Object to : %s'%(path) )
                self.fsr.registerObject( path, ob )
            else :
                self.log.debug( 'Merging Object to : %s'%(path) )
                self.MergeFSRobject( sourceNode, path, ob )

        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)
        self.log.info('FSR Store Rebuilt. Correcting EventCountFSR')
        if bool( self.fsr._idp ) :    # there might not be an FSR stream (Gauss)
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount] :
                self.fsr[ecount].setOutput( self._gmpc.nIn )
                self.log.info( 'Event Counter Output set : %s : %i'
                               %(ecount, self.fsr[ecount].output()) )
        # do some reporting
        self.log.debug('FSR store reconstructed!')
        lst = self.fsr.getHistoNames()
        if lst :
            for l in lst :
                ob = self.fsr.retrieveObject(l)
                if hasattr( ob, 'configureDirectAccess' ) :
                    ob.configureDirectAccess()
                if hasattr( ob, 'containedObjects' ) :
                    self.log.debug( '\t%s (cont. objects : %i)'
                                    %(l, ob.numberOfObjects()) )
                else :
                    self.log.debug( '\t%s'%(l) )
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject( self, sourceNode, path, ob ) :
        # merge a non-empty Keyed Container from a Worker > 0
        if path == '/FileRecords/TimeSpanFSR' :
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR( path, ob )
        elif path == '/FileRecords/EventCountFSR' :
            # the Event Counter is also easy
            self.ProcessEventCountFSR( path, ob )
        # now other cases may not be so easy...
        elif "KeyedContainer" in ob.__class__.__name__ :
            # Keyed Container of LumiFSRs : extract and re-register
            if "LumiFSR" in ob.__class__.__name__ :
                self.MergeLumiFSR( path, ob )
            else :
                self.log.info( "Skipping Merge of Keyed Container %s for %s"
                               %(ob.__class__.__name__, path) )

    def ProcessTimeSpanFSR( self, path, ob ) :
        ob2 = self.fsr.retrieveObject( path )
        if ob.containedObjects().size() :
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]
            min = cob.earliest()
            max = cob.latest()
            for j in xrange( sz ) :
                cob = ob.containedObjects()[j]
                self.log.debug( 'Adding TimeSpanFSR' )
                if cob.earliest() < min :
                    min = cob.earliest()
                if cob.latest() > max :
                    max = cob.latest()
            # the TimeSpanFSR has to be rebuilt, without a key, and added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest( min )
            tsfsr.setLatest( max )
            self.fsr[path].clear()
            self.fsr[path].add( tsfsr )

    def ProcessEventCountFSR( self, path, ob ) :
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput( self.fsr[path].input() + ob.input() )

    def MergeLumiFSR( self, path, keyedC ) :
        from ROOT import string
        # fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # the LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR( l )
        # now deal with the argument : a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in xrange(nCont) :
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR( obj )
            baseLumi.merge( nextLumi )
        # now rebuild and re-register
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs :
            newLumi.addRunNumber( r )
        for f in baseLumi.files :
            newLumi.addFileID( string(f) )
        for k in baseLumi.keys :
            increment, integral = baseLumi.info[k]
            newLumi.addInfo( k, increment, integral )
        # clear the existing Keyed Container
        self.fsr[path].clear()
        # add the newly merged LumiFSR
        self.fsr[path].add( newLumi )
        return SUCCESS

# =============================================================================

class LumiFSR( ) :
    def __init__( self, lumi ) :
        # lumi looks like :
        # { runs : 69857 69858
        #   files : root:/castor/cer.../069857_0000000006.raw
        #   info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # class variables
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # get run numbers
        for r in lumi.runNumbers() :
            self.runs.append(r)
        # get file IDs
        for f in lumi.fileIDs() :
            self.files.append(f)
        # Now the tricky bit : the info is not accessible via Python
        # except as a string
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa :
            k, i, t = rec.split()
            k = int(k)
            i = int(i)
            t = int(t)
            self.info[k] = (i, t)
        self.keys = self.info.keys()
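        # e.g. the substring "0 8 0 / 1 8 259 /" parses to
        #   self.info == { 0 : (8, 0), 1 : (8, 259) }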

    def merge( self, otherLumi ) :
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs :
            if r not in self.runs :
                self.runs.append( r )
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files :
            if f not in self.files :
                self.files.append( f )
        self.files.sort()
        # now add any extra records
        for k in otherLumi.keys :
            increment, integral = otherLumi.info[k]
            if k in self.keys :
                myIncrement, myIntegral = self.info[k]
                self.info[k] = ( myIncrement + increment,
                                 myIntegral + integral )
            else :
                self.info[k] = ( increment, integral )
        # don't forget to update the keys
        self.keys = self.info.keys()

    def __repr__( self ) :
        s = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs :
            s += "\t\t%i\n"%(r)
        s += "\tFiles : \n"
        for f in self.files :
            s += "\t\t%s\n"%(f)
        s += "\tInfo : \n"
        for k in self.keys :
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n"%(k, increment, integral)
        return s

# =============================================================================

class PackedCaloHypo( ) :
    def __init__( self, o ) :
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = ( o.cerr00, o.cerr10, o.cerr11 )
        self.cov = ( o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22 )
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = ( o.posE, o.posX, o.posY )
        self.z = o.z

    def __repr__( self ) :
        s = "PackedCaloHypo : \n"
        s += "\tcentX        : %s\n"%( str(self.centX) )
        s += "\tcentY        : %s\n"%( str(self.centY) )
        s += "\tcerr         : %s\n"%( str(self.cerr) )
        s += "\tcov          : %s\n"%( str(self.cov) )
        s += "\tfirstCluster : %s\n"%( str(self.firstCluster) )
        s += "\tfirstDigit   : %s\n"%( str(self.firstDigit) )
        s += "\tfirstHypo    : %s\n"%( str(self.firstHypo) )
        s += "\thypothesis   : %s\n"%( str(self.hypothesis) )
        s += "\tkey          : %s\n"%( str(self.key) )
        s += "\tlastCluster  : %s\n"%( str(self.lastCluster) )
        s += "\tlastDigit    : %s\n"%( str(self.lastDigit) )
        s += "\tlastHypo     : %s\n"%( str(self.lastHypo) )
        s += "\tlh           : %s\n"%( str(self.lh) )
        s += "\tpos          : %s\n"%( str(self.pos) )
        s += "\tz            : %s\n"%( str(self.z) )
        s += "---------------------------------------\n"
        return s

# =============================================================================

class SyncMini( object ) :
    def __init__( self, event, lastEvent=None ) :
        self.event = event
        self.t = 0.0
        self.lastEvent = None
        if lastEvent :
            self.lastEvent = lastEvent

    def check( self ) :
        return self.event.is_set()

    def checkLast( self ) :
        return self.lastEvent.is_set()

    def reset( self ) :
        self.event.clear()
        self.t = time.time()

    def getTime( self ) :
        return self.t

    def set( self ) :
        self.event.set()

    def __repr__( self ) :
        s = "---------- SyncMini --------------\n"
        s += "    Status : %s\n"%(self.event.is_set())
        s += "         t : %5.2f\n"%(self.t)
        if self.lastEvent :
            s += "Last Event : %s\n"%(self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s
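
# Typical handshake (illustrative) : a sub-process calls set() after each
# event; the supervisor sees check() return True, calls reset() to re-arm
# the flag, and uses checkLast() to distinguish a finished event loop from
# a still-running one.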

# =============================================================================

class Syncer( object ) :
    def __init__( self, nWorkers, log, manyEvents=False,
                  limit=None, step=None, firstEvent=None ) :
        # class to help synchronise the sub-processes
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents

        for i in xrange(-2, nWorkers) :
            self.d[ i ] = SyncMini( Event(), lastEvent=Event() )
            if self.manyEvents :
                self.limitFirst = firstEvent

        self.keys = self.d.keys()
        self.nWorkers = nWorkers
        self.log = log

    def syncAll( self, step="Not specified" ) :
        # is it this method, or the rolling version, that is needed?
        # if the latter, drop through...

        if self.manyEvents :
            sc = self.syncAllRolling( )
            return sc

        # Regular version ----------------------------
        for i in xrange( 0, self.limit, self.step ) :
            if self.checkAll( ) :
                self.log.info( '%s : All procs done @ %i s'%(step, i) )
                break
            else :
                time.sleep(self.step)

        # now the time limit is up... check the status one final time
        if self.checkAll() :
            self.log.info( "All processes : %s ok."%(step) )
            return SUCCESS
        else :
            self.log.critical( 'Some process is hanging on : %s'%(step) )
            for k in self.keys :
                hangString = "%s : Proc/Stat : %i/%s"%(step, k, self.d[k].check())
                self.log.critical( hangString )
            return FAILURE

    def syncAllRolling( self ) :
        # Keep track of the progress of Event processing.
        # Each process syncs after each event, so keep clearing
        # the sync Event and re-checking.
        # Note the time between True checks too : if the time between
        # events exceeds the single-event limit, this is considered a hang.

        # set the initial time
        begin = time.time()
        firstEv = {}
        timers = {}
        for k in self.keys :
            self.d[k].reset()
            firstEv[k] = False
            timers[k] = 0.0

        # work on a copy, so removing finished nodes leaves self.keys intact
        active = list( self.keys )
        while True :
            # check the status of each sync object
            # (iterate over a snapshot, as finished nodes are removed)
            for k in active[:] :
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast() :
                    if sMini.checkLast() and sMini.check() :
                        # if lastEvent is set, the event loop has finished
                        active.remove( k )
                        alive = time.time() - begin
                        self.log.info( "Audit : Node %i alive for %5.2f"
                                       %(k, alive) )
                    else :
                        sMini.reset()
                else :
                    # the event has still not been checked; how long is that?
                    # is it the first Event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k] :
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond :
                        # it is hanging!
                        self.log.critical( 'Single event wait : %5.2f'%(wait) )
                        self.processHang()
                        return FAILURE

            # termination criteria : if all procs have finished, we're done
            if self.checkLastEvents() :
                self.log.info('TC met for event loop')
                break
            else :
                # sleep, loop again
                time.sleep(self.step)

        self.log.info("All processes completed all Events ok")
        return SUCCESS

    def processHang( self ) :
        self.log.critical('Some proc is hanging during Event processing!')
        for k in self.keys :
            self.log.critical( "Proc/Stat : %i / %s"%(k, self.d[k].check()) )
        return

    def checkAll( self ) :
        # check the status of each Sync object;
        # return True only if all are set
        currentStatus = [ mini.check() for mini in self.d.values() ]
        return all( currentStatus )

    def checkLastEvents( self ) :
        # check if all of the lastEvent flags in self.d are set
        stat = [ sMini.checkLast() for sMini in self.d.values() ]
        return all(stat)
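
# A minimal usage sketch (illustrative; 'nWorkers' and 'log' are assumed to
# be supplied by the owning GMP component) :
#
#   syn = Syncer( nWorkers, log, limit=60, step=5 )
#   # each sub-process i signals via syn.d[i].set() when its phase is done
#   if syn.syncAll( step='Initialise' ) == FAILURE :
#       log.critical( 'Initialisation timed out' )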

# =========================== Methods =========================================

def getEventNumber( evt ) :
    # the class-independent version of the Event Number retrieval method
    #
    n = None
    # first attempt : unpacked event data
    lst = [ '/Event/Gen/Header',
            '/Event/Rec/Header' ]
    for l in lst :
        try :
            n = evt[l].evtNumber()
            return n
        except :
            # no event number at this path
            continue

    # second attempt : try the DAQ/RawEvent data
    # the event number is in bank type 16, bank 0, data point 4
    try :
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except :
        pass

    # default action
    return n
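
# Example (illustrative; assumes a configured GaudiPython AppMgr) :
#   from GaudiPython import AppMgr
#   evt = AppMgr().evtsvc()
#   print getEventNumber( evt )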

# ================================= EOF =======================================