Gaudi Framework, version v22r0

pTools.py
from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle
import time

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
#  GaudiPython Parallel model
#
# Classes :
# - HistoAgent : In charge of extracting histograms from the Transient Store
#                 on a reader/worker, communicating them to the writer, and,
#                 on the writer, receiving them and rebuilding a single store
#
# - FileRecordsAgent : Similar to HistoAgent, but for FileRecords data
#
# - LumiFSR : FSR data from different workers needs to be carefully merged to
#              replicate the serial version; this class aids in that task by
#              representing an LHCb LumiFSR object as a Python class
#
# - PackedCaloHypo : Pythonization of an LHCb class, used for inspecting some
#                     differences in serial and parallel output
#
# - Syncer : This class is responsible for syncing processes for a specified
#             section of execution.  For example, one Syncer object might be
#             syncing Initialisation, one Running, one Finalisation.
#             Syncer uses multiprocessing.Event() objects as flags which are
#             visible across the N processes sharing them.
#             IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#              ensure that there is no hanging; in effect, they allow a
#              timeout for Initialise, Run and Finalise on all processes
#
# - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
# - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                          ( AppMgr().evtsvc() ) to this to get the current
#                          event number as an integer (even from RawEvents!)
#

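# A minimal, illustrative sketch (not used by this module) of the Event-based
# handshake that Syncer/SyncMini build on : each child sets a shared
# multiprocessing.Event() when it reaches a sync point, and the parent polls
# with a timeout instead of blocking forever, which is the anti-hang
# guarantee described above.
def _syncPatternSketch():
    from multiprocessing import Process, Event

    def child(flag):
        # ... do some work, then announce the sync point ...
        flag.set()

    flag = Event()
    p = Process(target=child, args=(flag,))
    p.start()
    ok = flag.wait(timeout=10.0)   # False here would mean a hang
    p.join()
    return ok
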

# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# =========================== Classes =========================================

class HistoAgent(object):
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking histogram objects to the Histo
        # store; here they are collected in a dictionary, keyed by type name
        self.bookingDict = {}
        self.bookingDict['DataObject']        = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File']      = self.bookDataObject
        self.bookingDict['TH1D']       = self.bookTH1D
        self.bookingDict['TH2D']       = self.bookTH2D
        self.bookingDict['TH3D']       = self.bookTH3D
        self.bookingDict['TProfile']   = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D

    def register(self, tup):
        # add a tuple of (worker-id, histoDict) to self.histos
        assert tup.__class__.__name__ == 'tuple'
        self.histos.append(tup)

    def Receive(self):
        hstatus = self._gmpc.nWorkers + 1    # +1 for the Reader!
        while True:
            tup = self.qin.get()
            if tup == 'HISTOS_SENT':
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus:
                    break
            else:
                self.register(tup)
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS

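    # Illustrative sketch, in comments only, of the sentinel-counting
    # shutdown used by Receive() above : every producer ends its stream with
    # one 'HISTOS_SENT' marker, and the consumer stops once it has counted a
    # marker from every producer :
    #
    #     remaining = nProducers
    #     while remaining :
    #         item = queue.get()
    #         if item == 'HISTOS_SENT' :
    #             remaining -= 1
    #         else :
    #             bundles.append(item)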

    def RebuildHistoStore(self):
        '''
        Rebuild the histogram store from the histos received by Receive().
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict.
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the stored histo for compatibility.
        '''
        errors = 0
        for tup in self.histos:
            workerID, histDict = tup
            added = 0
            booked = 0
            for n in histDict.keys():
                o = histDict[n]
                obj = self.hvt.retrieve(n)
                if obj:
                    try:
                        aida2root(obj).Add(o)
                    except Exception:
                        self.log.warning('FAILED TO ADD : %s' % (str(obj)))
                        errors += 1
                    added += 1
                else:
                    if o.__class__.__name__ in self.bookingDict.keys():
                        try:
                            self.bookingDict[o.__class__.__name__](n, o)
                        except Exception:
                            self.log.warning('FAILED TO REGISTER : %s\tto %s'
                                             % (o.__class__.__name__, n))
                            errors += 1
                    else:
                        self.log.warning('No booking method for: %s\t%s\t%s'
                                         % (n, type(o), o.__class__.__name__))
                        errors += 1
                    booked += 1
            self.log.debug('Worker %s : %i histos added, %i booked'
                           % (str(workerID), added, booked))
        hs = self.hvt.getHistoNames()
        self.log.info('Histo Store Rebuilt : ')
        self.log.info('  Contains %i objects.' % (len(hs)))
        self.log.info('  Errors in Rebuilding : %i' % (errors))
        return SUCCESS


    def bookDataObject(self, n, o):
        '''
        Register a DataObject to the Histo Store
        '''
        self._gmpc.hvt.registerObject(n, o)

    def bookTH1D(self, n, o):
        '''
        Register a ROOT 1D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH2D(self, n, o):
        '''
        Register a ROOT 2D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH3D(self, n, o):
        '''
        Register a ROOT 3D THisto to the Histo Store
        '''
        # note : GetNbins(), not GetXbins(); GetXbins() returns the array of
        # bin edges rather than the bin count
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax(),
                                 o.GetZaxis().GetNbins(),
                                 o.GetZaxis().GetXmin(),
                                 o.GetZaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTProfile(self, n, o):
        '''
        Register a ROOT TProfile to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(),
                                     o.GetOption())
        aida2root(obj).Add(o)

    def bookTProfile2D(self, n, o):
        '''
        Register a ROOT TProfile2D to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(),
                                     o.GetYaxis().GetNbins(),
                                     o.GetYaxis().GetXmin(),
                                     o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)

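
# A pure-Python sketch (illustrative only, not used by this module) of the
# merge-or-book pattern in HistoAgent.RebuildHistoStore above, with a plain
# dict standing in for the transient store and '+' for ROOT's TH1::Add :
def _mergeOrBookSketch(store, received):
    for path, histo in received.items():
        if path in store:
            store[path] += histo    # a matching histo exists : add them
        else:
            store[path] = histo     # unknown path : 'book' it first
    return store

# e.g. _mergeOrBookSketch({'/stat/h1': 3}, {'/stat/h1': 2, '/stat/h2': 5})
#      returns {'/stat/h1': 5, '/stat/h2': 5}
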
# =============================================================================

class FileRecordsAgent(object):
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q   = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn  = []   # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp(self, tupA, tupB):
        # sort tuples by their first element (the worker nodeID),
        # for use as a cmp function with the Python 2 sort() method
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB:
            return -1
        elif valA > valB:
            return 1
        else:
            return 0

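    # Note (illustrative) : the same ordering with a key function instead of
    # a cmp function would be self.objectsIn.sort(key=lambda tup: tup[0]);
    # cmp= works here because this is Python 2 (it was removed in Python 3).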

    def SendFileRecords(self):
        # send the FileRecords data as part of finalisation

        # Take care of FileRecords!
        # There are two main things to consider here :
        # 1) the DataObjects in the FileRecords Transient Store
        # 2) the fact that they are Keyed Containers, containing other objects
        #
        # The lead worker, nodeID=0, sends everything in the FSR store as
        # a completeness guarantee.
        #
        # Objects are sent in the form (nodeID, path, object).
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # Check validity
        if not lst:
            self.log.info('No FileRecords data to send to Writer.')
            self.q.put('END_FSR')
            return SUCCESS

        # no need to send the root node
        if '/FileRecords' in lst:
            lst.remove('/FileRecords')

        for l in lst:
            o = self.fsr.retrieveObject(l)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()
            # the lead worker sends everything, as a completeness guarantee
            if self._gmpc.nodeID == 0:
                self.objectsOut.append((0, l, pickle.dumps(o)))
            else:
                # only add the Event Counter
                # and non-empty Keyed Containers (ignore empty ones)
                if l == '/FileRecords/EventCountFSR':
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append(tup)
                else:
                    # It's a Keyed Container
                    assert "KeyedContainer" in o.__class__.__name__
                    nObjects = o.numberOfObjects()
                    if nObjects:
                        self.log.debug("Keyed Container %s with %i objects"
                                       % (l, nObjects))
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append(tup)
        self.log.debug('Done with the FSR store; sending to the Writer.')

        if self.objectsOut:
            self.log.debug('%i FSR objects to Writer' % (len(self.objectsOut)))
            for ob in self.objectsOut:
                self.log.debug('\t%s' % (ob[0]))
            self.q.put(self.objectsOut)
        else:
            self.log.info('Valid FSR store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put('END_FSR')
        return SUCCESS

    def Receive(self):
        # Receive the contents of all workers' FileRecords Transient Stores.
        # Each sender finishes with an 'END_FSR' sentinel; stop once one has
        # been counted from every worker.
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == 'END_FSR':
                nc -= 1
                continue
            # but if it's regular objects...
            for o in objects:
                self.objectsIn.append(o)
        # Now sort by which worker it came from;
        # an object is : (nodeID, path, pickledObject)
        self.objectsIn.sort(cmp=self.localCmp)
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild(self):
        # objectsIn is a list of (nodeID, path, pickledObject) tuples
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug('Working with %s' % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, "configureDirectAccess"):
                ob.configureDirectAccess()
                nCont = ob.numberOfObjects()
                self.log.debug('\tcontainedObjects : %i' % (nCont))
            if sourceNode == 0:
                self.log.debug('Registering Object to : %s' % (path))
                self.fsr.registerObject(path, ob)
            else:
                self.log.debug('Merging Object to : %s' % (path))
                self.MergeFSRobject(sourceNode, path, ob)
        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the Writer is written (validation testing occurs on the worker)

        self.log.info('FSR Store Rebuilt.  Correcting EventCountFSR')
        if bool(self.fsr._idp):   # There might not be an FSR stream (Gauss)
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount]:
                self.fsr[ecount].setOutput(self._gmpc.nIn)
                self.log.info('Event Counter Output set : %s : %i'
                              % (ecount, self.fsr[ecount].output()))
            # Do some reporting
            self.log.debug('FSR store reconstructed!')
            lst = self.fsr.getHistoNames()
            if lst:
                for l in lst:
                    ob = self.fsr.retrieveObject(l)
                    if hasattr(ob, 'configureDirectAccess'):
                        ob.configureDirectAccess()
                    if hasattr(ob, 'containedObjects'):
                        self.log.debug('\t%s (cont. objects : %i)'
                                       % (l, ob.numberOfObjects()))
                    else:
                        self.log.debug('\t%s' % (l))
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject(self, sourceNode, path, ob):
        # Merge a non-empty Keyed Container from a worker with nodeID > 0
        if path == '/FileRecords/TimeSpanFSR':
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR(path, ob)
        elif path == '/FileRecords/EventCountFSR':
            # the Event Counter is also easy
            self.ProcessEventCountFSR(path, ob)
        # other cases may not be so easy...
        elif "KeyedContainer" in ob.__class__.__name__:
            # Keyed Container of LumiFSRs : extract and re-register
            if "LumiFSR" in ob.__class__.__name__:
                self.MergeLumiFSR(path, ob)
            else:
                self.log.info("Skipping merge of Keyed Container %s for %s"
                              % (ob.__class__.__name__, path))

    def ProcessTimeSpanFSR(self, path, ob):
        sz = ob.containedObjects().size()
        for j in xrange(sz):
            cob = ob.containedObjects()[j]
            self.log.debug('Adding TimeSpanFSR')
            # this is annoying : it has to be rebuilt without a key, then added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest(cob.earliest())
            tsfsr.setLatest(cob.latest())
            self.fsr[path].add(tsfsr)

    def ProcessEventCountFSR(self, path, ob):
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())

    def MergeLumiFSR(self, path, keyedC):
        from ROOT import string
        # Fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # The LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR(l)
        # Now deal with the argument : a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in xrange(nCont):
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR(obj)
            baseLumi.merge(nextLumi)
        # Now rebuild and re-register
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear the existing Keyed Container
        self.fsr[path].clear()
        # add the newly merged LumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS

# =============================================================================

class LumiFSR(object):
    def __init__(self, lumi):
        # lumi looks like :
        # {  runs : 69857 69858
        #    files : root:/castor/cer.../069857_0000000006.raw
        #    info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # class variables
        self.runs  = []
        self.files = []
        self.info  = {}
        self.keys  = []

        # get the run numbers
        for r in lumi.runNumbers():
            self.runs.append(r)
        # get the file IDs
        for f in lumi.fileIDs():
            self.files.append(f)
        # Now the tricky bit : the info is not accessible via Python
        # except as a string, so parse it out of the string representation
        # (see the standalone parsing sketch after this class)
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa:
            k, i, t = rec.split()
            k = int(k)
            i = int(i)
            t = int(t)
            self.info[k] = (i, t)
        self.keys = self.info.keys()

    def merge(self, otherLumi):
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs:
            if r not in self.runs:
                self.runs.append(r)
        self.runs.sort()
        # add any extra file IDs
        for f in otherLumi.files:
            if f not in self.files:
                self.files.append(f)
        self.files.sort()
        # Now add any extra records
        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]
            if k in self.keys:
                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment, myIntegral + integral)
            else:
                self.info[k] = (increment, integral)
        # don't forget to update the keys
        self.keys = self.info.keys()

    def __repr__(self):
        s = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs:
            s += "\t\t%i\n" % (r)
        s += "\tFiles : \n"
        for f in self.files:
            s += "\t\t%s\n" % (f)
        s += "\tInfo : \n"
        for k in self.keys:
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n" % (k, increment, integral)
        return s

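# A standalone, pure-Python sketch of the string parsing done in
# LumiFSR.__init__ above, using the sample layout from its comment
# (illustrative only; a real LumiFSR comes from the LHCb software stack) :
def _lumiInfoParseSketch():
    s = ("{ runs : 69857 69858 "
         "info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 / }")
    info = {}
    # everything after the marker, split on '/', dropping the trailing '}'
    for rec in s.split("info (key/incr/integral) : ")[-1].split('/')[:-1]:
        k, i, t = rec.split()
        info[int(k)] = (int(i), int(t))
    return info    # -> {0: (8, 0), 1: (8, 259), 2: (8, 76)}
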
# =============================================================================

class PackedCaloHypo(object):
    def __init__(self, o):
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr  = (o.cerr00, o.cerr10, o.cerr11)
        self.cov   = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit   = o.firstDigit
        self.firstHypo    = o.firstHypo
        self.hypothesis   = o.hypothesis
        self.key          = o.key
        self.lastCluster  = o.lastCluster
        self.lastDigit    = o.lastDigit
        self.lastHypo     = o.lastHypo
        self.lh           = o.lh
        self.pos          = (o.posE, o.posX, o.posY)
        self.z            = o.z

    def __repr__(self):
        s = "PackedCaloHypo : \n"
        s += "\tcentX        : %s\n" % (str(self.centX))
        s += "\tcentY        : %s\n" % (str(self.centY))
        s += "\tcerr         : %s\n" % (str(self.cerr))
        s += "\tcov          : %s\n" % (str(self.cov))
        s += "\tfirstCluster : %s\n" % (str(self.firstCluster))
        s += "\tfirstDigit   : %s\n" % (str(self.firstDigit))
        s += "\tfirstHypo    : %s\n" % (str(self.firstHypo))
        s += "\thypothesis   : %s\n" % (str(self.hypothesis))
        s += "\tkey          : %s\n" % (str(self.key))
        s += "\tlastCluster  : %s\n" % (str(self.lastCluster))
        s += "\tlastDigit    : %s\n" % (str(self.lastDigit))
        s += "\tlastHypo     : %s\n" % (str(self.lastHypo))
        s += "\tlh           : %s\n" % (str(self.lh))
        s += "\tpos          : %s\n" % (str(self.pos))
        s += "\tz            : %s\n" % (str(self.z))
        s += "---------------------------------------\n"
        return s

# =============================================================================

class SyncMini(object):
    def __init__(self, event, lastEvent=None):
        self.event = event
        self.t = 0.0
        self.lastEvent = lastEvent

    def check(self):
        return self.event.is_set()

    def checkLast(self):
        return self.lastEvent.is_set()

    def reset(self):
        self.event.clear()
        self.t = time.time()

    def getTime(self):
        return self.t

    def set(self):
        self.event.set()

    def __repr__(self):
        s = "---------- SyncMini --------------\n"
        s += "    Status : %s\n" % (self.event.is_set())
        s += "         t : %5.2f\n" % (self.t)
        if self.lastEvent:
            s += "Last Event : %s\n" % (self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s

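# Brief, illustrative usage sketch of SyncMini (not called from this module;
# in real use the Event flags are shared with child processes) :
def _syncMiniSketch():
    sm = SyncMini(Event(), lastEvent=Event())
    sm.reset()           # clear the flag and timestamp the start of the wait
    sm.set()             # a worker does this when it reaches its sync point
    return sm.check()    # the supervisor sees the flag raised -> True
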
# =============================================================================

class Syncer(object):
    def __init__(self, nWorkers, log, manyEvents=False,
                 limit=None, step=None, firstEvent=None):
        # Class to help synchronise the sub-processes
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents
        self.limitFirst = firstEvent   # longer timeout allowed for 1st event
        for i in xrange(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        self.keys = self.d.keys()
        self.nWorkers = nWorkers
        self.log = log

    def syncAll(self, step="Not specified"):
        # if we are synchronising the event loop, the rolling version
        # is needed instead; drop through to it
        if self.manyEvents:
            sc = self.syncAllRolling()
            return sc

        # Regular version ----------------------------
        for i in xrange(0, self.limit, self.step):
            if self.checkAll():
                self.log.info('%s : All procs done @ %i s' % (step, i))
                break
            else:
                time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll():
            self.log.info("All processes : %s ok." % (step))
            return SUCCESS
        else:
            self.log.critical('Some process is hanging on : %s' % (step))
            for k in self.keys:
                hangString = "%s : Proc/Stat : %i/%s" % (step, k, self.d[k].check())
                self.log.critical(hangString)
            return FAILURE

    def syncAllRolling(self):
        # Keep track of the progress of event processing.
        # Each process syncs after each event, so keep clearing
        # the sync Event and re-checking.
        # Note the time between True checks too; if the time between events
        # exceeds the limit (limitFirst for the first event), this is
        # considered a hang.

        # set the initial time
        begin = time.time()
        firstEv = {}
        for k in self.keys:
            self.d[k].reset()
            firstEv[k] = False

        # work on a copy of the key list, and iterate over a further copy :
        # removing from the list being iterated over would skip entries
        active = list(self.keys)
        while True:
            # check the status of each sync object
            for k in list(active):
                sMini = self.d[k]
                if sMini.check():
                    if sMini.checkLast():
                        # if lastEvent is set, the event loop has finished
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info("Audit : Node %i alive for %5.2f"
                                      % (k, alive))
                    else:
                        sMini.reset()
                else:
                    # the event has still not been set; how long has it been?
                    # the first event is allowed the longer limitFirst timeout
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k]:
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond:
                        # It is hanging!
                        self.log.critical('Single event wait : %5.2f' % (wait))
                        self.processHang()
                        return FAILURE

            # Termination criterion : all processes have set their last event
            if self.checkLastEvents():
                self.log.info('TC met for event loop')
                break
            else:
                # sleep and loop again
                time.sleep(self.step)

        self.log.info("All processes completed all events ok")
        return SUCCESS

    def processHang(self):
        self.log.critical('Some proc is hanging during event processing!')
        for k in self.keys:
            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))
        return

    def checkAll(self):
        # return True if every SyncMini's event flag is set
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

    def checkLastEvents(self):
        # return True if every SyncMini's lastEvent flag is set
        stat = [sMini.checkLast() for sMini in self.d.values()]
        return all(stat)

# =========================== Methods =========================================

def getEventNumber(evt):
    # The class-independent version of the event number retrieval method
    #
    n = None
    # First attempt : unpacked event data
    lst = ['/Event/Gen/Header',
           '/Event/Rec/Header']
    for l in lst:
        try:
            n = evt[l].evtNumber()
            return n
        except Exception:
            # no event number at this path
            continue

    # Second attempt : try the DAQ/RawEvent data
    # The event number is in bank type 16, bank 0, element 4 of the data
    try:
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except Exception:
        pass

    # Default action : return None
    return n
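
# Brief usage sketch of getEventNumber() (illustrative; it assumes a fully
# configured GaudiPython job, so it is never called from here) :
def _getEventNumberSketch():
    from GaudiPython import AppMgr
    appMgr = AppMgr()
    evt = appMgr.evtsvc()    # the Transient Event Store
    appMgr.run(1)            # process one event
    return getEventNumber(evt)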

# ================================= EOF =======================================