pTools.py

from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle, time

# Eoin Smith
# 3 Aug 2010

#
# This script contains the ancillary classes and functions used in the
#  GaudiPython Parallel model
#
# Classes :
# - HistoAgent : In charge of extracting Histograms from their Transient Store
#                 on a reader/worker, communicating them to the writer, and on
#                 the writer receiving them and rebuilding a single store
#
# - FileRecordsAgent : Similar to HistoAgent, but for FileRecords Data
#
# - LumiFSR : FSR data from different workers needs to be carefully merged to
#              replicate the serial version; this class aids in that task by
#              representing an LHCb LumiFSR object as a Python class
#
# - PackedCaloHypo : Pythonization of an LHCb class, used for inspecting some
#                     differences in serial and parallel output
#
# - Syncer : This class is responsible for syncing processes for a specified
#             section of execution.  For example, one Syncer object might be
#             syncing Initialisation, one Running, and one Finalisation.
#             Syncer uses multiprocessing.Event() objects as flags, which are
#             visible across the N processes sharing them.
#             IMPORTANT : The Syncer objects in the GaudiPython Parallel model
#              ensure that there is no hanging; they, in effect, allow a
#              timeout for Initialisation, Run, and Finalise on all processes
#
# - SyncMini : A class wrapper for a multiprocessing.Event() object
#
# Methods :
# - getEventNumber(evt) : pass a valid instance of the GaudiPython TES
#                          ( AppMgr().evtsvc() ) to this to get the current
#                          Event Number as an integer (even from RawEvents!)
#


# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

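# Illustrative sketch : aida2root takes a histogram booked through the Gaudi
# Histogram Service (AIDA interface) and returns the underlying ROOT object,
# so that ROOT arithmetic such as Add() can be applied.  "h" here stands for
# a hypothetical AIDA histogram already present in the store :
#
#   rootHist = aida2root( h )      # ROOT TH1D/TH2D/... view of the AIDA histo
#   rootHist.Add( otherRootHist )  # merge bin contents in ROOT
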
# =========================== Classes =========================================

class HistoAgent( object ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # There are many methods for booking Histogram Objects to the Histo
        # store; they are collected here in a dictionary, keyed by class name
        self.bookingDict = {}
        self.bookingDict['DataObject']        = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File']      = self.bookDataObject
        self.bookingDict['TH1D']       = self.bookTH1D
        self.bookingDict['TH2D']       = self.bookTH2D
        self.bookingDict['TH3D']       = self.bookTH3D
        self.bookingDict['TProfile']   = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D

    def register( self, tup ) :
        # add a tuple of (worker-id, histoDict) to self.histos
        assert isinstance( tup, tuple )
        self.histos.append( tup )

    def Receive( self ) :
        hstatus = self._gmpc.nWorkers + 1    # +1 for the Reader!
        while True :
            tup = self.qin.get()
            if tup == 'HISTOS_SENT' :
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus : break
            else :
                self.register( tup )
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS

    def RebuildHistoStore( self ) :
        '''
        Rebuild the Histogram Store from the histos received by Receive().
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict.
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the stored histo for compatibility.
        '''
        errors = 0
        for tup in self.histos :
            workerID, histDict = tup
            added = 0 ; registered = 0 ; booked = 0
            for n in histDict.keys() :
                o = histDict[ n ]
                obj = self.hvt.retrieve( n )
                if obj :
                    try :
                        aida2root(obj).Add(o)
                    except :
                        self.log.warning('FAILED TO ADD : %s'%(str(obj)))
                        errors += 1
                    added += 1
                else :
                    if o.__class__.__name__ in self.bookingDict.keys() :
                        try :
                            self.bookingDict[o.__class__.__name__](n, o)
                        except :
                            self.log.warning('FAILED TO REGISTER : %s\tto %s'\
                                             %(o.__class__.__name__, n))
                            errors += 1
                    else :
                        self.log.warning( 'No booking method for: %s\t%s\t%s'\
                                          %(n,type(o),o.__class__.__name__) )
                        errors += 1
                    booked += 1
        hs = self.hvt.getHistoNames()
        self.log.info( 'Histo Store Rebuilt : ' )
        self.log.info( '  Contains %i objects.'%(len(hs)) )
        self.log.info( '  Errors in Rebuilding : %i'%(errors) )
        return SUCCESS

    def bookDataObject( self, n, o ) :
        '''
        Register a DataObject to the Histo Store
        '''
        self._gmpc.hvt.registerObject( n, o )

    def bookTH1D( self, n, o ) :
        '''
        Register a ROOT 1D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),\
                                     o.GetXaxis().GetNbins(),\
                                     o.GetXaxis().GetXmin(),\
                                     o.GetXaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH2D( self, n, o ) :
        '''
        Register a ROOT 2D THisto to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),\
                                     o.GetXaxis().GetNbins(),\
                                     o.GetXaxis().GetXmin(),\
                                     o.GetXaxis().GetXmax(),\
                                     o.GetYaxis().GetNbins(),\
                                     o.GetYaxis().GetXmin(),\
                                     o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH3D( self, n, o ) :
        '''
        Register a ROOT 3D THisto to the Histo Store
        '''
        # note : GetNbins(), not GetXbins(); GetXbins() returns the array of
        #        bin edges, whereas the booking call expects the bin count
        obj = self.hvt._ihs.book( n, o.GetTitle(),\
                                     o.GetXaxis().GetNbins(),\
                                     o.GetXaxis().GetXmin(),\
                                     o.GetXaxis().GetXmax(),\
                                     o.GetYaxis().GetNbins(),\
                                     o.GetYaxis().GetXmin(),\
                                     o.GetYaxis().GetXmax(),\
                                     o.GetZaxis().GetNbins(),\
                                     o.GetZaxis().GetXmin(),\
                                     o.GetZaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTProfile( self, n, o ) :
        '''
        Register a ROOT TProfile to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
                                         o.GetXaxis().GetNbins(),\
                                         o.GetXaxis().GetXmin(),\
                                         o.GetXaxis().GetXmax(),\
                                         o.GetOption() )
        aida2root(obj).Add(o)

    def bookTProfile2D( self, n, o ) :
        '''
        Register a ROOT TProfile2D to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
                                         o.GetXaxis().GetNbins(),\
                                         o.GetXaxis().GetXmin(),\
                                         o.GetXaxis().GetXmax(),\
                                         o.GetYaxis().GetNbins(),\
                                         o.GetYaxis().GetXmin(),\
                                         o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)

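# Sketch of the intended flow (hypothetical "writer" GMP component providing
# hvt, hq, log, nWorkers and the sEvent flag; not a runnable recipe on its
# own) :
#
#   hAgent = HistoAgent( writer )
#   hAgent.Receive()              # collect histo bundles from Reader/Workers
#   hAgent.RebuildHistoStore()    # book new histos, Add() matching ones
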
# =============================================================================

class FileRecordsAgent( object ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q   = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn  = []   # used for collecting FSR store objects
        self.objectsOut = []

    def localCmp( self, tupA, tupB ) :
        # compare tuples by their first element (the nodeID),
        # for use with the sort() method
        ind  = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if   valA < valB : return -1
        elif valA > valB : return  1
        else             : return  0

    def SendFileRecords( self ) :
        # send the FileRecords data as part of finalisation

        # Take care of FileRecords!
        # There are two main things to consider here :
        # 1) The DataObjects in the FileRecords Transient Store
        # 2) The fact that they are Keyed Containers, containing other objects
        #
        # The Lead Worker, nodeID=0, sends everything in the FSR store, as
        #   a completeness guarantee
        #
        # send in the form ( nodeID, path, object )
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # Check Validity
        if not lst :
            self.log.info('No FileRecords Data to send to Writer.')
            self.q.put( 'END_FSR' )
            return SUCCESS

        # no need to send the root node
        if '/FileRecords' in lst : lst.remove('/FileRecords')

        for l in lst :
            o = self.fsr.retrieveObject( l )
            if hasattr(o, "configureDirectAccess") :
                o.configureDirectAccess()
            # lead worker sends everything, as completeness guarantee
            if self._gmpc.nodeID == 0 :
                self.objectsOut.append( (0, l, pickle.dumps(o)) )
            else :
                # only add the Event Counter
                # and non-Empty Keyed Containers (ignore empty ones)
                if l == '/FileRecords/EventCountFSR' :
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append( tup )
                else :
                    # It's a Keyed Container
                    assert "KeyedContainer" in o.__class__.__name__
                    nObjects = o.numberOfObjects()
                    if nObjects :
                        self.log.debug("Keyed Container %s with %i objects"\
                                       %(l, nObjects))
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append( tup )
        self.log.debug('Done with FSR store; sending to Writer.')

        if self.objectsOut :
            self.log.debug('%i FSR objects to Writer'%(len(self.objectsOut)))
            for ob in self.objectsOut :
                self.log.debug('\t%s'%(ob[0]))
            self.q.put( self.objectsOut )
        else :
            self.log.info('Valid FSR Store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put( 'END_FSR' )
        return SUCCESS

    def Receive( self ) :
        # Receive contents of all Workers' FileRecords Transient Stores
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0 :
            objects = self.q.get( )
            if objects == 'END_FSR' :
                nc -= 1
                continue
            # otherwise it's a list of regular objects...
            for o in objects :
                self.objectsIn.append(o)
        # Now sort it by which worker it came from
        # an object is : (nodeID, path, pickledObject)
        self.objectsIn.sort(cmp=self.localCmp)
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild( self ) :
        # objectsIn is a list of (nodeID, path, serializedObject) tuples
        for sourceNode, path, serialob in self.objectsIn :
            self.log.debug('Working with %s'%(path))
            ob = pickle.loads(serialob)
            if hasattr( ob, 'update' ) :
                ob.update()
            if hasattr( ob, 'numberOfObjects' ) :
                nCont = ob.numberOfObjects()
                self.log.debug( '\t %s has containedObjects : %i'\
                                %(type(ob).__name__, nCont) )
            if sourceNode == 0 :
                self.log.debug('Registering Object to : %s'%(path))
                self.fsr.registerObject( path, ob )
            else :
                self.log.debug('Merging Object to : %s'%(path))
                self.MergeFSRobject( sourceNode, path, ob )
        # As RecordStream has been split into Worker and Writer parts, the
        # count for output is wrong... fix that here, as every event received
        # by the writer is written (validation testing occurs on the worker)

        self.log.info('FSR Store Rebuilt.  Correcting EventCountFSR')
        if bool( self.fsr._idp ) :   # There might not be an FSR stream (Gauss)
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount] :
                self.fsr[ecount].setOutput( self._gmpc.nIn )
                self.log.info( 'Event Counter Output set : %s : %i'\
                               %(ecount, self.fsr[ecount].output()) )
            # Do some reporting
            self.log.debug('FSR store reconstructed!')
            lst = self.fsr.getHistoNames()
            if lst :
                for l in lst :
                    ob = self.fsr.retrieveObject(l)
                    if hasattr( ob, 'configureDirectAccess' ) :
                        ob.configureDirectAccess()
                    if hasattr( ob, 'containedObjects' ) :
                        self.log.debug('\t%s (cont. objects : %i)'\
                                       %(l, ob.numberOfObjects()))
                    else :
                        self.log.debug('\t%s'%(l))
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject( self, sourceNode, path, ob ) :
        # Merge a non-empty Keyed Container from a Worker with nodeID > 0
        if path == '/FileRecords/TimeSpanFSR' :
            # TimeSpanFSR is a straightforward case
            self.ProcessTimeSpanFSR( path, ob )
        elif path == '/FileRecords/EventCountFSR' :
            # Event Counter is also easy
            self.ProcessEventCountFSR( path, ob )
        # now other cases may not be so easy...
        elif "KeyedContainer" in ob.__class__.__name__ :
            # Keyed Container of LumiFSRs : extract and re-register
            if "LumiFSR" in ob.__class__.__name__ :
                self.MergeLumiFSR( path, ob )
            else :
                self.log.info("Skipping Merge of Keyed Container %s for %s"\
                              %(ob.__class__.__name__, path))

    def ProcessTimeSpanFSR( self, path, ob ) :
        ob2 = self.fsr.retrieveObject( path )
        if ob.containedObjects().size() :
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]
            tMin = cob.earliest()
            tMax = cob.latest()
            for j in xrange( sz ) :
                cob = ob.containedObjects()[j]
                self.log.debug( 'Adding TimeSpanFSR' )
                if cob.earliest() < tMin :
                    tMin = cob.earliest()
                if cob.latest() > tMax :
                    tMax = cob.latest()
            # the TimeSpanFSR has to be rebuilt, without a key, and added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest( tMin )
            tsfsr.setLatest( tMax )
            self.fsr[path].clear()
            self.fsr[path].add( tsfsr )

    def ProcessEventCountFSR( self, path, ob ) :
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput( self.fsr[path].input() + ob.input() )

    def MergeLumiFSR( self, path, keyedC ) :
        from ROOT import string
        # Fetch the first lumi
        keyedContainer = self.fsr.retrieveObject(path)
        # The LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR( l )
        # Now deal with the argument : a non-empty Keyed Container of LumiFSRs
        nCont = keyedC.numberOfObjects()
        for i in xrange(nCont) :
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR( obj )
            baseLumi.merge( nextLumi )
        # Now rebuild and re-register
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs :
            newLumi.addRunNumber( r )
        for f in baseLumi.files :
            newLumi.addFileID( string(f) )
        for k in baseLumi.keys :
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear existing Keyed Container
        self.fsr[path].clear()
        # Add newly merged LumiFSR
        self.fsr[path].add(newLumi)
        return SUCCESS

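# Sketch of the intended finalisation flow (hypothetical "worker" / "writer"
# GMP components providing fsr, fq, log, nWorkers, nodeID and nIn) :
#
#   fAgent = FileRecordsAgent( worker )   # on each worker process
#   fAgent.SendFileRecords()
#
#   fAgent = FileRecordsAgent( writer )   # on the writer process
#   fAgent.Receive()                      # gather (nodeID, path, pickle) tuples
#   fAgent.Rebuild()                      # register node-0 data, merge the rest
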
# =============================================================================

class LumiFSR( object ) :
    def __init__( self, lumi ) :
        # lumi looks like :
        # {  runs : 69857 69858
        #    files : root:/castor/cer.../069857_0000000006.raw
        #    info (key/incr/integral) : 0 8 0 / 1 8 259 / 2 8 76 ... }

        # class variables
        self.runs  = []
        self.files = []
        self.info  = {}
        self.keys  = []

        # get run numbers
        for r in lumi.runNumbers() :
            self.runs.append(r)
        # get file ids
        for f in lumi.fileIDs() :
            self.files.append(f)
        # Now the tricky bit : the info is not accessible via Python
        # except as a string
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa :
            k, i, t = rec.split()
            self.info[int(k)] = ( int(i), int(t) )
        self.keys = self.info.keys()

    def merge( self, otherLumi ) :
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any extra runs
        for r in otherLumi.runs :
            if r not in self.runs :
                self.runs.append( r )
        self.runs.sort()
        # add any extra fileIDs
        for f in otherLumi.files :
            if f not in self.files :
                self.files.append( f )
        self.files.sort()
        # Now add any extra records
        for k in otherLumi.keys :
            increment, integral = otherLumi.info[k]
            if k in self.keys :
                myIncrement, myIntegral = self.info[k]
                self.info[k] = ( myIncrement+increment, myIntegral+integral )
            else :
                self.info[k] = ( increment, integral )
        # don't forget to update keys
        self.keys = self.info.keys()

    def __repr__( self ) :
        s  = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs :
            s += "\t\t%i\n"%(r)
        s += "\tFiles : \n"
        for f in self.files :
            s += "\t\t%s\n"%(f)
        s += "\tInfo : \n"
        for k in self.keys :
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n"%(k, increment, integral)
        return s

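# Worked example (sketch, with made-up numbers) of the merge rule : increments
# and integrals are summed key by key; runs and files are united and sorted.
#
#   a.info = { 0 : (8, 0), 1 : (8, 259) }
#   b.info = { 1 : (8, 41), 2 : (8, 76) }
#   a.merge(b)
#   # a.info == { 0 : (8, 0), 1 : (16, 300), 2 : (8, 76) }
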
# =============================================================================

class PackedCaloHypo( object ) :
    def __init__( self, o ) :
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr  = (o.cerr00, o.cerr10, o.cerr11)
        self.cov   = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit   = o.firstDigit
        self.firstHypo    = o.firstHypo
        self.hypothesis   = o.hypothesis
        self.key          = o.key
        self.lastCluster  = o.lastCluster
        self.lastDigit    = o.lastDigit
        self.lastHypo     = o.lastHypo
        self.lh           = o.lh
        self.pos          = (o.posE, o.posX, o.posY)
        self.z            = o.z

    def __repr__( self ) :
        s  = "PackedCaloHypo : \n"
        s += "\tcentX        : %s\n"%( str(self.centX) )
        s += "\tcentY        : %s\n"%( str(self.centY) )
        s += "\tcerr         : %s\n"%( str(self.cerr) )
        s += "\tcov          : %s\n"%( str(self.cov) )
        s += "\tfirstCluster : %s\n"%( str(self.firstCluster) )
        s += "\tfirstDigit   : %s\n"%( str(self.firstDigit) )
        s += "\tfirstHypo    : %s\n"%( str(self.firstHypo) )
        s += "\thypothesis   : %s\n"%( str(self.hypothesis) )
        s += "\tkey          : %s\n"%( str(self.key) )
        s += "\tlastCluster  : %s\n"%( str(self.lastCluster) )
        s += "\tlastDigit    : %s\n"%( str(self.lastDigit) )
        s += "\tlastHypo     : %s\n"%( str(self.lastHypo) )
        s += "\tlh           : %s\n"%( str(self.lh) )
        s += "\tpos          : %s\n"%( str(self.pos) )
        s += "\tz            : %s\n"%( str(self.z) )
        s += "---------------------------------------\n"
        return s

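# Inspection sketch : "packed" stands for a hypothetical LHCb::PackedCaloHypo
# retrieved from the event store; printing the wrapper gives a field-by-field
# dump that can be diffed between serial and parallel output :
#
#   print PackedCaloHypo( packed )
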
# =============================================================================

class SyncMini( object ) :
    def __init__( self, event, lastEvent=None ) :
        self.event     = event
        self.t         = 0.0
        self.lastEvent = lastEvent

    def check( self ) :
        return self.event.is_set()

    def checkLast( self ) :
        return self.lastEvent.is_set()

    def reset( self ) :
        self.event.clear()
        self.t = time.time()

    def getTime( self ) :
        return self.t

    def set( self ) :
        self.event.set()

    def __repr__( self ) :
        s  = "---------- SyncMini --------------\n"
        s += "    Status : %s\n"%(self.event.is_set())
        s += "         t : %5.2f\n"%(self.t)
        if self.lastEvent :
            s += "Last Event : %s\n"%(self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s

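# Minimal sketch of SyncMini across processes (assuming a fork-based start,
# the shared Event set in the child is visible in the parent) :
#
#   from multiprocessing import Process, Event
#   sm = SyncMini( Event() )
#   p = Process( target=sm.set )   # child flags completion
#   p.start() ; p.join()
#   print sm.check()               # True in the parent
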
# =============================================================================

class Syncer( object ) :
    def __init__( self, nWorkers, log, manyEvents=False,
                  limit=None, step=None, firstEvent=None ) :
        # Class to help synchronise the sub-processes
        self.limit = limit
        self.step  = step
        self.d = {}
        self.manyEvents = manyEvents
        if self.manyEvents :
            self.limitFirst = firstEvent
        # one SyncMini per process (-2 and -1 are the non-worker processes)
        for i in xrange(-2, nWorkers) :
            self.d[ i ] = SyncMini( Event(), lastEvent=Event() )
        self.keys     = self.d.keys()
        self.nWorkers = nWorkers
        self.log      = log

    def syncAll( self, step="Not specified" ) :
        # is it this method, or the rolling version, that is needed?
        # if the latter, drop through...

        if self.manyEvents :
            sc = self.syncAllRolling( )
            return sc

        # Regular version ----------------------------
        for i in xrange( 0, self.limit, self.step ) :
            if self.checkAll( ) :
                self.log.info('%s : All procs done @ %i s'%(step, i))
                break
            else :
                time.sleep(self.step)

        # Now the time limit is up... check the status one final time
        if self.checkAll() :
            self.log.info("All processes : %s ok."%(step))
            return SUCCESS
        else :
            self.log.critical('Some process is hanging on : %s'%(step))
            for k in self.keys :
                hangString = "%s : Proc/Stat : %i/%s"%(step, k, self.d[k].check())
                self.log.critical( hangString )
            return FAILURE

    def syncAllRolling( self ) :
        # Keep track of the progress of Event processing.
        # Each process syncs after each event, so keep clearing
        #  the sync Event, and re-checking.
        # Note the time between True checks too : if the time
        #  between events exceeds the limit, this is considered a hang.

        # set the initial time
        begin   = time.time()
        firstEv = {}
        timers  = {}
        for k in self.keys :
            self.d[k].reset()
            firstEv[k] = False
            timers[k]  = 0.0

        # take a copy, as finished processes are removed from the active list
        active = list( self.keys )
        while True :
            # check the status of each sync object
            # (iterate over a snapshot, as active may shrink during the loop)
            for k in list( active ) :
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast() :
                    if sMini.checkLast() and sMini.check() :
                        # if lastEvent is set, the event loop has finished
                        active.remove( k )
                        alive = time.time() - begin
                        self.log.info( "Audit : Node %i alive for %5.2f"\
                                       %(k, alive) )
                    else :
                        sMini.reset()
                else :
                    # the event has still not been checked; how long is that?
                    # is it the first Event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k] :
                        cond       = wait > self.limitFirst
                        firstEv[k] = True
                    if cond :
                        # It is hanging!
                        self.log.critical('Single event wait : %5.2f'%(wait))
                        self.processHang()
                        return FAILURE

            # Termination Criterion : if all procs have flagged their last
            # event, we're done
            if self.checkLastEvents() :
                self.log.info('TC met for event loop')
                break
            else :
                # sleep, loop again
                time.sleep(self.step)

        self.log.info("All processes completed all Events ok")
        return SUCCESS

00662 
00663     def processHang( self ) :
00664         self.log.critical('Some proc is hanging during Event processing!')
00665         for k in self.keys :
00666             self.log.critical( "Proc/Stat : %i / %s"%(k,self.d[k].check()) )
00667         return
00668 
00669     def checkAll( self ) :
00670         # Check the status of each Sync object
00671         # return True or False
00672         currentStatus = [ mini.check() for mini in self.d.values() ]
00673         return all( currentStatus )
00674 
00675     def checkLastEvents( self ) :
00676        # check if all of the lastEvents are set to true in self.d[k][1]
00677        stat    = [ sMini.checkLast() for sMini in self.d.values() ]
00678        return all(stat)
00679 
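# Usage sketch (hypothetical numbers; "log" is any logging-like object).
# The parent and the sub-processes share the same Syncer, so each child
# flags its own SyncMini while the parent polls them all :
#
#   syncer = Syncer( nWorkers=4, log=log, limit=60, step=2 )
#   # in the child process with nodeID i :  syncer.d[i].set()
#   # in the parent :
#   sc = syncer.syncAll( step='Initialise' )   # SUCCESS or FAILURE
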
# =========================== Methods =========================================

def getEventNumber( evt ) :
    # The class-independent version of the Event Number Retrieval method
    #
    n = None
    # First attempt : Unpacked Event Data
    lst = [ '/Event/Gen/Header',
            '/Event/Rec/Header' ]
    for l in lst :
        try :
            n = evt[l].evtNumber()
            return n
        except :
            # No evt number at this path
            continue

    # Second attempt : try the DAQ/RawEvent data
    # The Evt Number is in bank type 16, bank 0, data point 4
    try :
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except :
        pass

    # Default action : give up and return None
    return n

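# Illustrative usage (sketch; assumes a configured GaudiPython job) :
#
#   from GaudiPython import AppMgr
#   appMgr = AppMgr()
#   evt = appMgr.evtsvc()   # the TES handle expected by getEventNumber
#   appMgr.run(1)
#   print getEventNumber( evt )
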
# ================================= EOF =======================================