pTools.py (Gaudi Framework, version v22r4)

from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle, time

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
#  GaudiPython Parallel model
#
# Classes :
# - HistoAgent : In charge of extracting Histograms from their Transient Store
#                 on a reader/worker, communicating them to the writer, and on
#                 the writer receiving them and rebuilding a single store
#
# - FileRecordsAgent : Similar to HistoAgent, but for FileRecords Data
#
# - LumiFSR : FSR data from different workers needs to be carefully merged to
#              replicate the serial version; this class aids in that task by
#              representing an LHCb LumiFSR object as a python class
#
# - PackedCaloHypo : Pythonization of an LHCb class, used for inspecting some
#                     differences in serial and parallel output
#
# - Syncer : This class is responsible for syncing processes for a specified
#             section of execution.  For example, one Syncer object might be
#             syncing Initialisation, one for Running, one for Finalisation.
#             Syncer uses multiprocessing.Event() objects as flags which are
#             visible across the N processes sharing them.
#             IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#              ensure that there is no hanging; they, in effect, allow a
#              timeout for Initialisation, Run, Finalise on all processes
#
# - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
# - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                          ( AppMgr().evtsvc() ) to this to get the current
#                          Event Number as an integer (even from RawEvents!)
#
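
# For illustration, a minimal sketch of the intended usage of getEventNumber
# (defined at the end of this file), assuming a fully configured GaudiPython
# application :
#
#   from GaudiPython import AppMgr
#   evt = AppMgr().evtsvc()
#   n = getEventNumber( evt )   # the current Event Number, or None
#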


# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
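# (used below as aida2root(storedHisto).Add(receivedHisto), converting the
#  AIDA histogram already in the store so that a received ROOT histogram
#  can be added to it)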

# =========================== Classes =========================================

class HistoAgent( ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking Histogram Objects to Histo store
        # here they are collected in a dictionary, with key = a relevant name
        self.bookingDict = {}
        self.bookingDict['DataObject']        = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File']      = self.bookDataObject
        self.bookingDict['TH1D']       = self.bookTH1D
        self.bookingDict['TH2D']       = self.bookTH2D
        self.bookingDict['TH3D']       = self.bookTH3D
        self.bookingDict['TProfile']   = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D

    def register( self, tup ) :
        # add a tuple of (worker-id, histoDict) to self.histos
        assert tup.__class__.__name__ == 'tuple'
        self.histos.append( tup )

    def Receive( self ) :
        hstatus = self._gmpc.nWorkers+1    # +1 for the Reader!
        while True :
            tup = self.qin.get()
            if tup == 'HISTOS_SENT' :
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus : break
            else :
                self.register( tup )
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS


    def RebuildHistoStore( self ) :
        '''
        Rebuild the Histogram Store from the histos received by Receive()
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the Stored histo for compatibility.
        '''
        errors = 0
        for tup in self.histos :
            workerID, histDict = tup
            added = 0; registered = 0; booked = 0
            for n in histDict.keys() :
                o = histDict[ n ]
                obj = self.hvt.retrieve( n )
                if obj :
                    try    :
                        aida2root(obj).Add(o)
                    except :
                        self.log.warning('FAILED TO ADD : %s'%(str(obj)))
                        errors += 1
                    added += 1
                else :
                    if o.__class__.__name__ in self.bookingDict :
                        try    :
                            self.bookingDict[o.__class__.__name__](n, o)
                        except :
                            self.log.warning('FAILED TO REGISTER : %s\tto %s'\
                                             %(o.__class__.__name__, n))
                            errors += 1
                    else :
                        self.log.warning( 'No booking method for: %s\t%s\t%s'\
                                          %(n,type(o),o.__class__.__name__) )
                        errors += 1
                    booked += 1
        hs = self.hvt.getHistoNames()
        self.log.info( 'Histo Store Rebuilt : ' )
        self.log.info( '  Contains %i objects.'%(len(hs)) )
        self.log.info( '  Errors in Rebuilding : %i'%(errors) )
        return SUCCESS


    def bookDataObject( self, n, o ):
        '''
        Register a DataObject to the Histo Store
        '''
        self._gmpc.hvt.registerObject( n, o )

    def bookTH1D( self, n, o ) :
        '''
        Register a ROOT 1D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),\
                                     o.GetXaxis().GetNbins(),\
                                     o.GetXaxis().GetXmin(),\
                                     o.GetXaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH2D( self, n, o ) :
        '''
        Register a ROOT 2D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),\
                                     o.GetXaxis().GetNbins(),\
                                     o.GetXaxis().GetXmin(),\
                                     o.GetXaxis().GetXmax(),\
                                     o.GetYaxis().GetNbins(),\
                                     o.GetYaxis().GetXmin(),\
                                     o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH3D( self, n, o ) :
        '''
        Register a ROOT 3D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),\
                                     o.GetXaxis().GetNbins(),\
                                     o.GetXaxis().GetXmin(),\
                                     o.GetXaxis().GetXmax(),\
                                     o.GetYaxis().GetNbins(),\
                                     o.GetYaxis().GetXmin(),\
                                     o.GetYaxis().GetXmax(),\
                                     o.GetZaxis().GetNbins(),\
                                     o.GetZaxis().GetXmin(),\
                                     o.GetZaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTProfile( self, n, o ) :
        '''
        Register a ROOT TProfile to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
                                         o.GetXaxis().GetNbins(),\
                                         o.GetXaxis().GetXmin(),\
                                         o.GetXaxis().GetXmax(),\
                                         o.GetOption() )
        aida2root(obj).Add(o)

    def bookTProfile2D( self, n, o ) :
        '''
        Register a ROOT TProfile2D to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
                                         o.GetXaxis().GetNbins(),\
                                         o.GetXaxis().GetXmin(),\
                                         o.GetXaxis().GetXmax(),\
                                         o.GetYaxis().GetNbins(),\
                                         o.GetYaxis().GetXmin(),\
                                         o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)

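# A rough sketch of the writer-side flow, assuming 'component' is a GMP
# component exposing the hvt (histogram service), hq (histo queue), log,
# nWorkers and sEvent attributes used above :
#
#   agent = HistoAgent( component )
#   agent.Receive()             # blocks until the reader and every worker
#                               #  has sent 'HISTOS_SENT'
#   agent.RebuildHistoStore()   # merge or book everything into one store
#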
# =============================================================================

class FileRecordsAgent( ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q   = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn  = []   # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp( self, tupA, tupB ) :
        # sort tuples by a particular element
        # for the sort() method
        ind  = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if   valA<valB : return -1
        elif valA>valB : return  1
        else           : return  0

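    # (illustrative equivalent : a key-based sort, which Python 3 requires,
    #  as list.sort(cmp=...) is Python 2 only :
    #      self.objectsIn.sort( key=lambda tup : tup[0] )  )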

    def SendFileRecords( self ) :
        # send the FileRecords data as part of finalisation

        # Take Care of FileRecords!
        # There are two main things to consider here
        # 1) The DataObjects in the FileRecords Transient Store
        # 2) The fact that they are Keyed Containers, containing other objects
        #
        # The Lead Worker, nodeID=0, sends everything in the FSR store, as
        #   a completeness guarantee
        #
        # send in form ( nodeID, path, object )
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # Check Validity
        if not lst :
            self.log.info('No FileRecords Data to send to Writer.')
            self.q.put( 'END_FSR' )
            return SUCCESS

        # no need to send the root node
        if '/FileRecords' in lst : lst.remove('/FileRecords')

        for l in lst :
            o = self.fsr.retrieveObject( l )
            if hasattr(o, "configureDirectAccess") :
                o.configureDirectAccess()
            # lead worker sends everything, as completeness guarantee
            if self._gmpc.nodeID == 0 :
                self.objectsOut.append( (0, l, pickle.dumps(o)) )
            else :
                # only add the Event Counter
                # and non-Empty Keyed Containers (ignore empty ones)
                if l == '/FileRecords/EventCountFSR' :
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append( tup )
                else :
                    # It's a Keyed Container
                    assert "KeyedContainer" in o.__class__.__name__
                    nObjects = o.numberOfObjects()
                    if nObjects :
                        self.log.debug("Keyed Container %s with %i objects"\
                                       %(l, nObjects))
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append( tup )
        self.log.debug('Done with FSR store; sending to Writer.')

        if self.objectsOut :
            self.log.debug('%i FSR objects to Writer'%(len(self.objectsOut)))
            for ob in self.objectsOut :
                self.log.debug('\t%s'%(ob[0]))
            self.q.put( self.objectsOut )
        else :
            self.log.info('Valid FSR Store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put( 'END_FSR' )
        return SUCCESS


    def Receive( self ) :
        # Receive contents of all Workers FileRecords Transient Stores
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0 :
            objects = self.q.get( )
            if objects == 'END_FSR' :
                # a worker has finished sending; the while condition ends
                # the loop once all workers have reported
                nc -= 1
                continue
            # but if it's regular objects...
            for o in objects :
                self.objectsIn.append(o)
        # Now sort it by which worker it came from
        # an object is : (nodeID, path, pickledObject)
        self.objectsIn.sort(cmp=self.localCmp)
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild( self ) :
        # self.objectsIn is a list of (nodeID, path, serializedObject) tuples
        for sourceNode, path, serialob in self.objectsIn :
            self.log.debug('Working with %s'%(path))
            ob = pickle.loads(serialob)
            if hasattr( ob, 'update' ) :
                ob.update()
            if hasattr( ob, 'numberOfObjects' ) :
                nCont = ob.numberOfObjects()
                self.log.debug( '\t %s has containedObjects : %i'\
                                %(type(ob).__name__, nCont) )
            if sourceNode == 0 :
                self.log.debug('Registering Object to : %s'%(path))
                self.fsr.registerObject( path, ob )
            else :
                self.log.debug('Merging Object to : %s'%(path))
                self.MergeFSRobject( sourceNode, path, ob )
        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)

        self.log.info('FSR Store Rebuilt.  Correcting EventCountFSR')
        if bool( self.fsr._idp ) :   # There might not be an FSR stream (Gauss)
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount] :
                self.fsr[ecount].setOutput( self._gmpc.nIn )
                self.log.info( 'Event Counter Output set : %s : %i'\
                               %(ecount, self.fsr[ecount].output()) )
            # Do some reporting
            self.log.debug('FSR store reconstructed!')
            lst = self.fsr.getHistoNames()
            if lst :
                for l in lst :
                    ob = self.fsr.retrieveObject(l)
                    if hasattr( ob, 'configureDirectAccess' ) :
                        ob.configureDirectAccess()
                    if hasattr( ob, 'containedObjects' ) :
                        self.log.debug('\t%s (cont. objects : %i)'\
                                       %(l, ob.numberOfObjects()))
                    else :
                        self.log.debug('\t%s'%(l))
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject( self, sourceNode, path, ob ) :
        # Merge Non-Empty Keyed Container from Worker>0
        if path == '/FileRecords/TimeSpanFSR' :
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR( path, ob )
        elif path == '/FileRecords/EventCountFSR' :
            # Event Counter is also easy
            self.ProcessEventCountFSR( path, ob )
        # now other cases may not be so easy...
        elif "KeyedContainer" in ob.__class__.__name__ :
            # Keyed Container of LumiFSRs : extract and re-register
            if "LumiFSR" in ob.__class__.__name__ :
                self.MergeLumiFSR( path, ob )
            else :
                self.log.info("Skipping Merge of Keyed Container %s for %s"\
                              %(ob.__class__.__name__, path))

    def ProcessTimeSpanFSR( self, path, ob ) :
        sz = ob.containedObjects().size()
        for j in xrange(sz) :
            cob = ob.containedObjects()[j]
            self.log.debug('Adding TimeSpanFSR')
            # this is annoying : it has to be rebuilt, without a key, and added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest( cob.earliest() )
            tsfsr.setLatest( cob.latest() )
            self.fsr[path].add(tsfsr)

    def ProcessEventCountFSR( self, path, ob ) :
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput( self.fsr[path].input()+ob.input() )

    def MergeLumiFSR( self, path, keyedC ) :
        from ROOT import string
        # Fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # The LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR( l )
        # Now deal with the argument : a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in xrange(nCont) :
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR( obj )
            baseLumi.merge( nextLumi )
        # Now Rebuild and ReRegister
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs :
            newLumi.addRunNumber( r )
        for f in baseLumi.files :
            newLumi.addFileID( string(f) )
        for k in baseLumi.keys :
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear existing Keyed Container
        self.fsr[path].clear()
        # Add newly merged lumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS

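# A rough sketch of the FSR flow, assuming 'component' is a GMP component
# exposing the fsr (FileRecords store), fq (FSR queue), log, nodeID and
# nWorkers attributes used above :
#
#   agent = FileRecordsAgent( component )
#   agent.SendFileRecords()   # on each reader/worker : ship the FSR store
#   agent.Receive()           # on the writer : collect from all workers
#   agent.Rebuild()           # on the writer : register, then merge
#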
# =============================================================================

class LumiFSR( ) :
    def __init__(self, lumi) :
        # lumi looks like :
        # {  runs : 69857 69858
        #    files : root:/castor/cer.../069857_0000000006.raw
        #    info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # class variables
        self.runs  = []
        self.files = []
        self.info  = {}
        self.keys  = []

        # get run numbers
        for r in lumi.runNumbers() :
            self.runs.append(r)
        # get file ids
        for f in lumi.fileIDs() :
            self.files.append(f)
        # Now the tricky bit : the info is not accessible via Python
        # except as a string
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa :
            k, i, t = rec.split()
            k = int(k)
            i = int(i)
            t = int(t)
            self.info[k] = (i, t)
        self.keys = self.info.keys()

    def merge( self, otherLumi ) :
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs :
            if r not in self.runs :
                self.runs.append( r )
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files :
            if f not in self.files :
                self.files.append( f )
        self.files.sort()
        # Now add any extra records
        for k in otherLumi.keys :
            increment, integral = otherLumi.info[k]
            if k in self.keys :
                myIncrement, myIntegral = self.info[k]
                self.info[k] = ( myIncrement+increment, myIntegral+integral )
            else :
                self.info[k] = ( increment, integral )
        # don't forget to update keys
        self.keys = self.info.keys()

    def __repr__( self ) :
        s  = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs :
            s += "\t\t%i\n"%(r)
        s += "\tFiles : \n"
        for f in self.files :
            s += "\t\t%s\n"%(f)
        s += "\tInfo : \n"
        for k in self.keys :
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n"%(k, increment, integral)
        return s

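# An illustrative sketch of the merge behaviour, assuming lumiA and lumiB
# are LHCb LumiFSR objects retrieved from two different workers' stores :
#
#   a = LumiFSR( lumiA )
#   a.merge( LumiFSR( lumiB ) )   # union of runs/files; (incr, integral)
#                                 #  records summed key by key
#   print a                       # uses __repr__ above
#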
# =============================================================================

class PackedCaloHypo() :
    def __init__(self, o) :
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr  = (o.cerr00, o.cerr10, o.cerr11)
        self.cov   = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit   = o.firstDigit
        self.firstHypo    = o.firstHypo
        self.hypothesis   = o.hypothesis
        self.key          = o.key
        self.lastCluster  = o.lastCluster
        self.lastDigit    = o.lastDigit
        self.lastHypo     = o.lastHypo
        self.lh           = o.lh
        self.pos          = (o.posE, o.posX, o.posY)
        self.z            = o.z

    def __repr__( self ) :
        s  = "PackedCaloHypo : \n"
        s += "\tcentX        : %s\n"%( str(self.centX) )
        s += "\tcentY        : %s\n"%( str(self.centY) )
        s += "\tcerr         : %s\n"%( str(self.cerr) )
        s += "\tcov          : %s\n"%( str(self.cov) )
        s += "\tfirstCluster : %s\n"%( str(self.firstCluster) )
        s += "\tfirstDigit   : %s\n"%( str(self.firstDigit) )
        s += "\tfirstHypo    : %s\n"%( str(self.firstHypo) )
        s += "\thypothesis   : %s\n"%( str(self.hypothesis) )
        s += "\tkey          : %s\n"%( str(self.key) )
        s += "\tlastCluster  : %s\n"%( str(self.lastCluster) )
        s += "\tlastDigit    : %s\n"%( str(self.lastDigit) )
        s += "\tlastHypo     : %s\n"%( str(self.lastHypo) )
        s += "\tlh           : %s\n"%( str(self.lh) )
        s += "\tpos          : %s\n"%( str(self.pos) )
        s += "\tz            : %s\n"%( str(self.z) )
        s += "---------------------------------------\n"
        return s

# =============================================================================

class SyncMini( object ) :
    def __init__( self, event, lastEvent=None ) :
        self.event = event
        self.t     = 0.0
        self.lastEvent = None
        if lastEvent :
            self.lastEvent = lastEvent

    def check( self ) :
        return self.event.is_set()

    def checkLast( self ) :
        return self.lastEvent.is_set()

    def reset( self ) :
        self.event.clear()
        self.t = time.time()

    def getTime( self ) :
        return self.t

    def set( self ) :
        self.event.set()

    def __repr__( self ) :
        s  = "---------- SyncMini --------------\n"
        s += "    Status : %s\n"%(self.event.is_set())
        s += "         t : %5.2f\n"%(self.t)
        if self.lastEvent :
            s += "Last Event : %s\n"%(self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s

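# For illustration, the basic flag protocol (Event is the multiprocessing
# Event imported at the top of this file) :
#
#   sm = SyncMini( Event(), lastEvent=Event() )
#   sm.set()       # worker side : signal that the step is done
#   sm.check()     # syncer side : True once set
#   sm.reset()     # clear the flag and stamp the time, ready to re-check
#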
# =============================================================================

class Syncer( object ) :
    def __init__( self, nWorkers, log, manyEvents=False,
                  limit=None, step=None, firstEvent=None ) :
        # Class to help synchronise the sub-processes
        self.limit  = limit
        self.step   = step
        self.d = {}
        self.manyEvents = manyEvents
        if self.manyEvents :
            self.limitFirst = firstEvent
        for i in xrange(-2, nWorkers) :
            self.d[ i ] = SyncMini( Event(), lastEvent=Event() )
        self.keys       = self.d.keys()
        self.nWorkers   = nWorkers
        self.log        = log

    def syncAll( self, step="Not specified" ) :
        # Decide whether the one-shot sync below or the rolling,
        # per-event version is needed; if rolling, delegate

        if self.manyEvents :
            sc = self.syncAllRolling( )
            return sc

        # Regular version ----------------------------
        for i in xrange( 0, self.limit, self.step ) :
            if self.checkAll( ) :
                self.log.info('%s : All procs done @ %i s'%(step, i))
                break
            else :
                time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll() :
            self.log.info("All processes : %s ok."%(step))
            return SUCCESS
        else :
            self.log.critical('Some process is hanging on : %s'%(step))
            for k in self.keys :
                hangString = "%s : Proc/Stat : %i/%s"%(step, k, self.d[k].check())
                self.log.critical( hangString )
            return FAILURE

    def syncAllRolling( self ) :
        # Keep track of the progress of Event processing
        # Each process syncs after each event, so keep clearing
        #  the sync Event, and re-checking
        # Note the time between True checks too; if the wait for an event
        #  exceeds the limit (limitFirst for the first event), this is
        #  considered a hang

        # set the initial time
        begin   = time.time()
        firstEv = {}
        timers  = {}
        for k in self.keys :
            self.d[k].reset()
            firstEv[k] = False
            timers[k]  = 0.0

        # work on a copy, so removals do not disturb self.keys
        active = list( self.keys )
        while True :
            # check the status of each sync object
            # (iterate over a snapshot : removing from a list while
            #  iterating over it would skip elements)
            for k in list( active ) :
                sMini = self.d[k]
                if sMini.check() :
                    if sMini.checkLast() :
                        # if last Event set, then event loop finished
                        active.remove( k )
                        alive = time.time()-begin
                        self.log.info( "Audit : Node %i alive for %5.2f"\
                                       %(k,alive) )
                    else :
                        sMini.reset()
                else :
                    # the event still has not been checked; how long is that?
                    # is it the first Event?
                    wait = time.time()-sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k] :
                        cond       = wait > self.limitFirst
                        firstEv[k] = True
                    if cond :
                        # It is hanging!
                        self.log.critical('Single event wait : %5.2f'%(wait))
                        self.processHang()
                        return FAILURE

            # Termination Criteria : if all procs have set their last-event
            # flags, we're done
            if self.checkLastEvents() :
                self.log.info('TC met for event loop')
                break
            else :
                # sleep, loop again
                time.sleep(self.step)

        self.log.info("All processes Completed all Events ok")
        return SUCCESS

    def processHang( self ) :
        self.log.critical('Some proc is hanging during Event processing!')
        for k in self.keys :
            self.log.critical( "Proc/Stat : %i / %s"%(k, self.d[k].check()) )
        return

    def checkAll( self ) :
        # Check the status of each Sync object
        # return True or False
        currentStatus = [ mini.check() for mini in self.d.values() ]
        return all( currentStatus )

    def checkLastEvents( self ) :
        # check if the lastEvent flag of every SyncMini in self.d is set
        stat = [ sMini.checkLast() for sMini in self.d.values() ]
        return all(stat)

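# An illustrative sketch of driving a Syncer; 'log' stands for any logger
# with info/critical methods, and the Syncer must be created before the
# sub-processes are forked so that its Events are shared :
#
#   s = Syncer( 4, log, limit=60, step=2 )
#   # ... each process calls s.d[myID].set() when it completes the step ...
#   sc = s.syncAll( 'Initialise' )   # SUCCESS, or FAILURE on a hang
#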
# =========================== Methods =========================================

def getEventNumber( evt ) :
    # The class-independent version of the Event Number Retrieval method
    #
    n = None
    # First attempt : Unpacked Event Data
    lst = [ '/Event/Gen/Header',
            '/Event/Rec/Header' ]
    for l in lst :
        try :
            n = evt[l].evtNumber()
            return n
        except :
            # No evt number at this path
            continue

    # Second attempt : try DAQ/RawEvent data
    # The Evt Number is in bank type 16, bank 0, data pt 4
    try :
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except :
        pass

    # Default Action
    return n

# ================================= EOF =======================================