Gaudi Framework, version v22r0

Home   Generated: 9 Feb 2011

GMPBase.py

Go to the documentation of this file.
00001 from Gaudi.Configuration import *
00002 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE
00003 from ROOT import TBufferFile, TBuffer
00004 from multiprocessing import Process, Queue, JoinableQueue, Event
00005 from multiprocessing import cpu_count, current_process
00006 from multiprocessing.queues import Empty
00007 from pTools import *
00008 import time, sys, os
00009 
00010 # This script contains the bases for the Gaudi MultiProcessing (GMP)
00011 # classes
00012 
00013 # There are three classes :
00014 #   Reader
00015 #   Worker
00016 #   Writer
00017 
00018 # Each class needs to perform communication with the others
00019 # For this, we need a means of communication, which will be based on
00020 # the python multiprocessing package
00021 # This is provided in SPI pytools package
00022 # cmt line : use pytools v1.1 LCG_Interfaces
00023 # The PYTHONPATH env variable may need to be modified, as this might
00024 # still point to 1.0_python2.5
00025 
00026 # Each class will need Queues, and a defined method for using these
00027 # queues.
00028 # For example, as long as there is something in the Queue, both ends
00029 # of the queue must be open
00030 # Also, there needs to be proper Termination flags and criteria
00031 # The System should be error proof.
00032 
00033 
# Constants -------------------------------------------------------------------
# nap time used when busy-waiting (e.g. on queue buffers), in seconds
NAP = 0.001
# bytes per megabyte; float, so size reports use float division
MB  = 1024.0*1024.0
# waits to guard against hanging, in seconds
WAIT_INITIALISE   = 60*5
WAIT_FIRST_EVENT  = 60*3
WAIT_SINGLE_EVENT = 60*3
WAIT_FINALISE     = 60*2
# step sizes for the waits above (presumably polling intervals in
# seconds -- confirm against the code that consumes these constants)
STEP_INITIALISE   = 10
STEP_EVENT        = 2
STEP_FINALISE     = 5

# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
SMAPS = False

# -----------------------------------------------------------------------------

# definitions
# ----------
# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# used to check which type of histo we are dealing with
# i.e. if currentHisto in aidatypes : pass
aidatypes = ( gbl.AIDA.IHistogram,
              gbl.AIDA.IHistogram1D,
              gbl.AIDA.IHistogram2D,
              gbl.AIDA.IHistogram3D,
              gbl.AIDA.IProfile1D,
              gbl.AIDA.IProfile2D,
              gbl.AIDA.IBaseHistogram  )  # extra?

# similar to aidatypes, but for native ROOT histogram classes
thtypes   = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )

# Types of OutputStream in Gaudi, mapped to the writer category
# ( "events" / "records" / "tuples" ) that the GMP framework assigns them
WRITERTYPES  = {  'EvtCollectionStream'    : "tuples",
                  'InputCopyStream'        : "events",
                  'OutputStream'           : "events",
                  'RecordStream'           : "records",
                  'RunRecordStream'        : "records",
                  'SequentialOutputStream' : "events",
                  'TagCollectionStream'    : "tuples"   }
00078 # =============================================================================
00079 
class MiniWriter( object ) :
    '''
    Lightweight wrapper around a writer in the GaudiPython configuration.

    Locating the output file of a writer is non-trivial : the name may
    live on the writer itself or on its DataSvc, and may be a string or
    a list.  There are also three flavours of writer (events, records,
    tuples).  This bootstrap object gathers that information once, so
    that the configuration step can later simply do
        config[ self.key ].Output = modified( self.output )
    '''
    def __init__( self, writer, wType, config ) :
        self.w     = writer
        self.wType = wType
        # (key, output) record where the Output property lives in the
        # configuration and its current value, for later rewriting
        self.key          = None
        self.output       = None
        self.ItemList     = None
        self.OptItemList  = None
        #
        self.wName = writer.getName()
        # Locate the output file name : first look on the writer itself
        self.woutput     = None
        self.datasvcName = None
        self.svcOutput   = None
        if hasattr( self.w, "Output" ) :
            self.woutput = self.w.Output
            self.getItemLists( config )
            self.set( self.wName, self.w.Output )
            return
        # ... otherwise go through the data service
        # (every writer always carries an EvtDataSvc property)
        self.datasvcName = self.w.EvtDataSvc
        dsvc = config[ self.datasvcName ]
        if hasattr( dsvc, "Output" ) :
            self.getItemLists( config )
            self.set( self.datasvcName, dsvc.Output )

    def getNewName( self, replaceThis, withThis, extra='' ) :
        '''
        Return the Output name with one pattern substituted for another.
        The stored output may be a list; if so, a one-element list is
        returned.

        @param extra : might need to add ROOT flags,
                       i.e.: OPT='RECREATE', or such
        '''
        assert replaceThis.__class__.__name__ == 'str'
        assert    withThis.__class__.__name__ == 'str'
        wasList = ( self.output.__class__.__name__ == 'list' )
        current = self.output[0] if wasList else self.output
        renamed = current.replace( replaceThis, withThis ) + extra
        return [ renamed ] if wasList else renamed

    def getItemLists( self, config ) :
        '''
        Fill self.ItemList / self.OptItemList, taking each from the
        writer when present, otherwise from its data service.
        '''
        # the (mandatory) item list
        source = self.w
        if not hasattr( source, "ItemList" ) :
            source = config[ self.w.EvtDataSvc ]
        if hasattr( source, "ItemList" ) :
            self.ItemList = source.ItemList
        # the optional item list; possibly not a valid variable
        source = self.w
        if not hasattr( source, "OptItemList" ) :
            source = config[ self.w.EvtDataSvc ]
        if hasattr( source, "OptItemList" ) :
            self.OptItemList = source.OptItemList
        return

    def set( self, key, output ) :
        # record the configuration key holding Output, and its value
        self.key    = key
        self.output = output
        return

    def __repr__( self ) :
        rule = '-'*80
        parts = [
            rule+'\n',
            "Writer         : %s\n"%( self.wName  ),
            "Writer Type    : %s\n"%( self.wType  ),
            "Writer Output  : %s\n"%( self.output ),
            "DataSvc        : %s\n"%( self.datasvcName ),
            "DataSvc Output : %s\n"%( self.svcOutput   ),
            '\n',
            "Key for config : %s\n"%( self.key    ),
            "Output File    : %s\n"%( self.output ),
            "ItemList       : %s\n"%( self.ItemList ),
            "OptItemList    : %s\n"%( self.OptItemList ),
            rule+'\n',
        ]
        return ''.join( parts )
00178 
00179 # =============================================================================
00180 
class CollectHistograms( PyAlgorithm ) :
    '''
    GaudiPython algorithm used to clean up histos on the Reader and Workers.
    Only the finalize() method does real work : it retrieves a dictionary
    of { path : histo } objects and sends it to the writer over the
    histogram queue.  It then waits for the sync Event : THIS IS IMPORTANT,
    as if the algorithm returns before ALL histos have been COMPLETELY
    RECEIVED at the writer end, there will be an error.
    '''
    def __init__( self, gmpcomponent ) :
        # gmpcomponent : the owning GMPComponent node; supplies the
        # histogram queue (hq), histogram service (hvt), sync event
        # (sEvent) and logger used below
        PyAlgorithm.__init__( self )
        self._gmpc = gmpcomponent
        self.log = self._gmpc.log
        return None
    def execute( self ) :
        # per-event no-op; all work happens in finalize()
        return SUCCESS
    def finalize( self ) :
        self.log.info('CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms( )
        ks = self._gmpc.hDict.keys()
        self.log.info('%i Objects in Histogram Store'%(len(ks)))
        # crashes occurred due to Memory Error during the sending of hundreds
        # histos on slc5 machines, so instead, break into chunks
        # send 100 at a time
        # NOTE(review): when len(ks) is an exact multiple of chunk this
        # sends one trailing empty dict; harmless, since the receiver
        # stops on the 'HISTOS_SENT' flag below
        chunk = 100
        reps = len(ks)/chunk + 1
        for i in xrange(reps) :
            someKeys = ks[i*chunk : (i+1)*chunk]
            smalld = dict( [(key, self._gmpc.hDict[key]) for key in someKeys] )
            self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
        # "finished" Notification
        self.log.debug('Signalling end of histos to Writer')
        self._gmpc.hq.put( 'HISTOS_SENT' )
        self.log.debug( 'Waiting on Sync Event' )
        # block until the Writer confirms full receipt of the histos
        self._gmpc.sEvent.wait()
        self.log.debug( 'Histo Sync Event set, clearing and returning' )
        # clear the histogram store and re-root it with a fresh DataObject;
        # ownership is handed to the store (hence setOwnership False)
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()
        setOwnership(root, False)
        self._gmpc.hvt.setRoot( '/stat', root )
        return SUCCESS
00222 
00223 # =============================================================================
00224 
class EventCommunicator( object ) :
    # This class is responsible for communicating Gaudi Events via Queues
    # Events are communicated as TBufferFiles, filled either by the
    # TESSerializer, or the GaudiSvc, "IPCSvc"
    def __init__( self, GMPComponent, qin, qout ) :
        # GMPComponent : the owning node (Reader/Worker/Writer), used for
        #                logging and identification
        # qin / qout   : multiprocessing queues for receiving / sending;
        #                either may be None at the ends of the chain
        self._gmpc = GMPComponent
        self.log   = self._gmpc.log
        # maximum capacity of Queues
        # NOTE(review): stored but not applied to the queues here;
        # presumably enforced where the queues are created -- confirm
        self.maxsize = 50
        # central variables : Queues
        self.qin  = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent    = 0    # counter : items sent
        self.nRecv    = 0    # counter : items received
        self.sizeSent = 0    # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0    # measure : size of events in   ( tbuf.Length() )
        self.qinTime  = 0    # time    : receiving from self.qin
        self.qoutTime = 0    # time    : sending on qout

    def send( self, item ) :
        # This method manages the sending of a TBufferFile Event to a Queue
        # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
        assert item.__class__.__name__ == 'tuple'
        startTransmission = time.time()
        self.qout.put( item )
        # allow the background thread to feed the Queue; not 100% guaranteed to
        # finish before next line
        # NOTE(review): relies on the private _buffer attribute of
        # multiprocessing.Queue -- implementation specific
        while self.qout._buffer : time.sleep( NAP )
        self.qoutTime += time.time()-startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive( self, timeout=None ) :
        # Receive one item from self.qin; returns the item, or None if
        # the queue stayed empty for *timeout* seconds
        startWait = time.time()
        try :
            itemIn = self.qin.get( timeout=timeout )
        except Empty :
            return None
        self.qinTime += time.time()-startWait
        self.nRecv += 1
        if itemIn.__class__.__name__ == 'tuple' :
            self.sizeRecv += itemIn[1].Length()
        else :
            # control flags (plain strings) do not count as received events
            self.nRecv -= 1
        try :
            self.qin.task_done()
        except :
            # task_done() raises if called more times than items queued
            self._gmpc.log.warning('TASK_DONE called too often by : %s'\
                                    %(self._gmpc.nodeType))
        return itemIn

    def finalize( self ) :
        # Send termination flags downstream, wait for them to be
        # consumed, then log the transfer statistics
        self.log.info('Finalize Event Communicator : %s'%(self._gmpc.nodeType))
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        if   self._gmpc.nodeType == 'Reader' : downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer' : downstream = 0
        elif self._gmpc.nodeType == 'Worker' : downstream = 1
        for i in xrange(downstream) :
            self.qout.put( 'FINISHED' )
        if self._gmpc.nodeType != 'Writer' :
            # block until every item sent on qout has been task_done()'d
            self.qout.join()
        # Now some reporting...
        self.statistics( )

    def statistics( self ) :
        # Log the accumulated counters and timings for this communicator
        self.log.name = '%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
        self.log.info ( 'Items Sent     : %i'%(self.nSent) )
        self.log.info ( 'Items Received : %i'%(self.nRecv) )
        self.log.info ( 'Data  Sent     : %i'%(self.sizeSent) )
        self.log.info ( 'Data  Received : %i'%(self.sizeRecv) )
        self.log.info ( 'Q-out Time     : %5.2f'%(self.qoutTime) )
        self.log.info ( 'Q-in  Time     : %5.2f'%(self.qinTime ) )
00307 
00308 # =============================================================================
00309 
class TESSerializer( object ) :
    '''
    Bookkeeping wrapper around the raw GaudiPython TESSerializer.

    Load() deserialises a TBufferFile into the Transient Event Store and
    Dump() serialises the current event out of it, while accumulating
    counters, buffer sizes and timings which Report() prints through the
    node's logger.

    Bug fix vs. the original : "Avg Buf Dumped" was computed from the
    total bytes *loaded* (din) instead of the total bytes *dumped* (dout).
    '''
    def __init__( self, gaudiTESSerializer, evtDataSvc,
                        nodeType, nodeID, log ) :
        # gaudiTESSerializer : the underlying gbl.GaudiPython.TESSerializer
        # evtDataSvc         : event data service whose root is reset
        #                      before each load
        self.T   = gaudiTESSerializer
        self.evt = evtDataSvc
        # per-buffer sizes in bytes, for average/maximum statistics
        self.buffersIn  = []
        self.buffersOut = []
        # counters of buffers loaded / dumped
        self.nIn     = 0
        self.nOut    = 0
        # cumulative wall-clock time spent dumping / loading, seconds
        self.tDump   = 0.0
        self.tLoad   = 0.0
        # logging identity
        self.nodeType = nodeType
        self.nodeID   = nodeID
        self.log      = log

    def Load( self, tbuf ) :
        '''Deserialise *tbuf* (a TBufferFile) into the event store.'''
        # give the store a fresh root first; ownership goes to the store
        root = gbl.DataObject()
        setOwnership( root, False )
        self.evt.setRoot( '/Event', root )
        t = time.time()
        self.T.loadBuffer( tbuf )
        self.tLoad   += (time.time() - t)
        self.nIn     += 1
        self.buffersIn.append( tbuf.Length() )

    def Dump( self ) :
        '''Serialise the current event; return the filled TBufferFile.'''
        t = time.time()
        tb = TBufferFile( TBuffer.kWrite )
        self.T.dumpBuffer(tb)
        self.tDump += ( time.time()-t )
        self.nOut  += 1
        self.buffersOut.append( tb.Length() )
        return tb

    def Report( self ) :
        '''
        Log a summary : events and bytes moved in each direction, the
        average/maximum buffer sizes, and cumulative dump/load times.
        '''
        din  = sum( self.buffersIn )
        dout = sum( self.buffersOut )
        lines = [ "Events Loaded    : %i"%( self.nIn  ),
                  "Events Dumped    : %i"%( self.nOut ),
                  "Data Loaded      : %i"%( din ),
                  "Data Loaded (MB) : %5.2f Mb"%( din/MB ) ]
        if self.nIn :
            lines.append( "Avg Buf Loaded   : %5.2f Mb"
                          %( din/(self.nIn*MB) ) )
            lines.append( "Max Buf Loaded   : %5.2f Mb"
                          %( max(self.buffersIn)/MB ) )
        else :
            # guard against division by zero when nothing was loaded
            lines.append( "Avg Buf Loaded   : N/A" )
            lines.append( "Max Buf Loaded   : N/A" )
        lines.append( "Data Dumped      : %i"%( dout ) )
        lines.append( "Data Dumped (MB) : %5.2f Mb"%( dout/MB ) )
        if self.nOut :
            # fixed : use dout (bytes dumped), not din (bytes loaded)
            lines.append( "Avg Buf Dumped   : %5.2f Mb"
                          %( dout/(self.nOut*MB) ) )
            lines.append( "Max Buf Dumped   : %5.2f Mb"
                          %( max(self.buffersOut)/MB ) )
        else :
            lines.append( "Avg Buf Dumped   : N/A" )
            lines.append( "Max Buf Dumped   : N/A" )
        lines.append( "Total Dump Time  : %5.2f"%( self.tDump ) )
        lines.append( "Total Load Time  : %5.2f"%( self.tLoad ) )
        # report under a distinctive logger name, then restore it
        self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
        for line in lines :
            self.log.info( line )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
00386 
00387 # =============================================================================
00388 
class GMPComponent( object ) :
    # This class is the template for Reader, Worker and Writer,
    # containing all common components.
    # nodeID is a numerical identifier for the node :
    #   -1 for reader
    #   -2 for writer
    #   0,...,nWorkers-1 for the Workers
    def __init__( self, nodeType, nodeID, queues, events, params  ) :
        # declare a Gaudi MultiProcessing Node
        # the nodeType is going to be one of Reader, Worker, Writer
        # qPair is going to be a tuple of ( qin, qout )
        # for sending and receiving
        # if nodeType is "Writer", it will be a list of qPairs,
        # as there's one queue-in from each Worker
        #
        # params is a tuple of (nWorkers, sEvent, config, log)
        # (stale comment fixed : the 4-way unpack below includes sEvent)

        self.nodeType = nodeType
        current_process().name = nodeType

        # Synchronisation Event() objects for keeping track of the system
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer   # unpack tuple

        # necessary for knowledge of the system
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.nodeID   = nodeID

        # describe the state of the node by the current Event Number
        self.currentEvent = None

        # Unpack the Queues : (events, histos, filerecords)
        qPair, histq, fq = queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker' :
            # Reader or Worker Node : one communicator over (qin, qout)
            qin, qout = qPair
            self.evcom = EventCommunicator( self, qin, qout )
        else :
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin :
                ec = EventCommunicator( self, q, None )
                self.evcoms.append( ec )
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn  = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i'%(self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime       = 0.0
        self.rTime       = 0.0
        self.fTime       = 0.0
        self.firstEvTime = 0.0
        self.tTime       = 0.0

        # define the separate process (not started until Start())
        self.proc = Process( target=self.Engine )

    def Start( self ) :
        # Fork and start the separate process
        self.proc.start()

    def Engine( self ) :
        # This will be the method carried out by the Node
        # Different for all ; overridden by each subclass
        pass

    def processConfiguration( self ) :
        # Different for all ; customize Configuration for multicore
        pass

    def SetupGaudiPython( self ) :
        # This method will initialize the GaudiPython Tools
        # such as the AppMgr and so on
        self.a   = AppMgr()
        if SMAPS :
            # optional memory-profiling algorithm, logging to a
            # per-node .smp file
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType+'-'+str(self.nodeID)+'.smp'
            ss = SmapShot( logname=smapsLog )
            self.a.addAlgorithm( ss )
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
        # TES serializer pair : the raw GaudiPython serializer and the
        # statistics-keeping wrapper defined above
        self.ts  = gbl.GaudiPython.TESSerializer( self.evt._idp, self.pers )
        self.TS  = TESSerializer( self.ts, self.evt,
                                  self.nodeType, self.nodeID, self.log )
        return SUCCESS

    def StartGaudiPython( self ) :
        # Initialize and start the application manager
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES( self, tbufferfile ) :
        # Deserialize an event (a TBufferFile) into this node's TES,
        # after re-rooting the store with a fresh DataObject
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot( '/Event', root )
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber( self ) :
        # Return the current event number, or -1 if it cannot be found.
        # Using getList or getHistoNames can result in the EventSelector
        # re-initialising connection to RootDBase, which costs a lot of
        # time... try to build a set of Header paths??

        # First Attempt : Unpacked Event Data
        lst = [ '/Event/Gen/Header',
                '/Event/Rec/Header' ]
        for l in lst :
            path = l
            try :
                n = self.evt[path].evtNumber()
                return n
            except :
                # No evt number at this path
                continue

        # second attempt : try DAQ/RawEvent data
        # The Evt Number is in bank type 16, bank 0, data pt 4
        try :
            n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
            return n
        except :
            pass

        # Default Action
        # warn only before any events have been processed; afterwards a
        # missing number is expected noise
        if self.nIn > 0 or self.nOut > 0 :
            pass
        else :
            self.log.warning('Could not determine Event Number')
        return -1

    def IdentifyWriters( self ) :
        #
        # Identify Writers in the Configuration.
        # Returns a dict with keys "events"/"records"/"tuples" mapping to
        # lists of MiniWriter objects, and "histos" mapping to the
        # HistogramPersistencySvc output file name(s), if configured.
        #
        d = {}
        keys = [ "events", "records", "tuples", "histos" ]
        for k in keys :
            d[k] = []

        # Identify Writers and Classify
        wkeys = WRITERTYPES.keys()
        for v in self.config.values() :
            if v.__class__.__name__ in wkeys :
                writerType = WRITERTYPES[ v.__class__.__name__ ]
                d[writerType].append( MiniWriter(v, writerType, self.config) )
                if self.nodeID == 0 :
                    self.log.info('Writer Found : %s'%(v.name()))

        # Now Check for the Histogram Service
        if 'HistogramPersistencySvc' in  self.config.keys() :
            hfile =self.config['HistogramPersistencySvc'].getProp('OutputFile')
            d[ "histos" ].append( hfile )
        return d

    def dumpHistograms( self ) :
        '''
        Method used by the GaudiPython algorithm CollectHistograms
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames( )
        histDict = {}
        objects = 0 ; histos = 0
        if nlist :
            for n in nlist :
                o = self.hvt[ n ]
                if type(o) in aidatypes :
                    # convert AIDA histograms to ROOT before shipping
                    o = aida2root(o)
                    histos  += 1
                else :
                    objects += 1
                histDict[ n ] = o
        else :
            print 'WARNING : no histograms to recover?'
        return histDict

    def Initialize( self ) :
        # Common initialisation sequence : configure, set up GaudiPython,
        # signal readiness, start the AppMgr; records iTime
        start = time.time()
        self.processConfiguration( )
        self.SetupGaudiPython( )
        # Set the initialisation flag!
        self.initEvent.set()
        self.StartGaudiPython( )
        self.iTime = time.time() - start

    def Finalize( self ) :
        # Common finalisation : stop and finalize the AppMgr, then set
        # the final Event flag; records fTime
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info( '%s-%i Finalized'%(self.nodeType, self.nodeID) )
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report( self ) :
        # Log this node's timing summary, then the TESSerializer's
        self.log.name = "%s-%i Audit"%(self.nodeType, self.nodeID)
        allTime  = "Alive Time     : %5.2f"%(self.tTime)
        initTime = "Init Time      : %5.2f"%(self.iTime)
        frstTime = "1st Event Time : %5.2f"%(self.firstEvTime)
        runTime  = "Run Time       : %5.2f"%(self.rTime)
        finTime  = "Finalise Time  : %5.2f"%(self.fTime)
        tup = ( allTime, initTime, frstTime, runTime, finTime )
        for t in tup :
            self.log.info( t )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
        # and report from the TESSerializer
        self.TS.Report()
00613 
00614 # =============================================================================
00615 
class Reader( GMPComponent )  :
    # The Reader node (nodeID -1) : reads events from the input files,
    # serializes each into a TBufferFile and distributes them to the
    # Workers over the event queue.
    def __init__( self, queues, events, params ) :
        GMPComponent.__init__(self, 'Reader', -1, queues, events, params )

    def processConfiguration( self ) :
        # Reader :
        #   No algorithms
        #   No output
        #   No histos
        self.config[ 'ApplicationMgr' ].TopAlg    = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        self.config['MessageSvc'].Format    = '[Reader]% F%18W%S%7W%R%T %0W%M'
        self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax

    def DumpEvent( self ) :
        # Serialize the current event into a fresh TBufferFile
        # (bypasses the TS wrapper, so no statistics are recorded)
        tb = TBufferFile( TBuffer.kWrite )
        # print '----Reader dumping Buffer!!!'
        self.ts.dumpBuffer( tb )
        # print '\tBuffer Dumped, size : %i'%( tb.Length() )
        return tb

    def DoFirstEvent( self ) :
        # Do First Event ------------------------------------------------------
        # The first event is special : the TESSerializer item list is
        # populated here from the TES contents of that event
        # Check Termination Criteria
        startFirst = time.time()
        self.log.info('Reader : First Event')
        if self.nOut == self.evtMax :
            self.log.info('evtMax( %i ) reached'%(self.evtMax))
            self.lastEvent.set()
            return SUCCESS
        else :
            # Continue to read, dump and send event
            self.a.run(1)
            if not bool(self.evt['/Event']) :
                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
                self.lastEvent.set()
                return SUCCESS
            else :
                # Populate TESSerializer list and send Event
                try :
                    lst = self.evt.getList()
                except :
                    self.log.critical('Reader could not acquire TES List!')
                    self.lastEvent.set()
                    return FAILURE
                self.log.info('Reader : TES List : %i items'%(len(lst)))
                for l in lst :
                    self.ts.addItem(l)
                self.currentEvent = self.getEventNumber( )
                tb = self.TS.Dump( )
                self.log.info('First Event Sent')
                self.evcom.send( (self.currentEvent, tb) )
                self.nOut += 1
                self.eventLoopSyncer.set()
                self.evt.clearStore( )
                self.firstEvTime = time.time()-startFirst
                return SUCCESS

    def Engine( self ) :
        # Main loop of the Reader process : distribute all events to the
        # Workers, then finalize and report

        startEngine = time.time()
        self.log.name = 'Reader'
        self.log.info('Reader Process starting')

        self.Initialize()

        # add the Histogram Collection Algorithm
        self.a.addAlgorithm( CollectHistograms(self) )

        self.log.info('Reader Beginning Distribution')
        sc = self.DoFirstEvent( )
        if sc.isSuccess() :
            self.log.info('Reader First Event OK')
        else :
            self.log.critical('Reader Failed on First Event')
            self.stat = FAILURE

        # Do All Others -------------------------------------------------------
        while True :
            # Check Termination Criteria
            if self.nOut == self.evtMax :
                self.log.info('evtMax( %i ) reached'%(self.evtMax))
                break
            # Check Health
            if not self.stat.isSuccess() :
                self.log.critical( 'Reader is Damaged!' )
                break
            # Continue to read, dump and send event
            t = time.time()
            self.a.run(1)
            self.rTime += (time.time()-t)
            if not bool(self.evt['/Event']) :
                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
                break
            self.currentEvent = self.getEventNumber( )
            tb = self.TS.Dump( )
            self.evcom.send( (self.currentEvent, tb) )
            # clean up
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Finalize
        self.log.info( 'Reader : Event Distribution complete.' )
        self.evcom.finalize()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()
00728 
00729 # =============================================================================
00730 
class Worker( GMPComponent ) :
    '''
    GMP Worker process.

    Receives serialised events from the Reader, executes the full
    algorithm chain on each event, applies the OutputStream
    accept/require/veto selection, and forwards accepted events
    (re-serialised) to the Writer process.
    '''
    def __init__( self, workerID, queues, events, params ) :
        GMPComponent.__init__(self,'Worker', workerID, queues, events, params)
        # Identify the writer streams
        self.writerDict = self.IdentifyWriters( )
        # Identify the accept/veto checks for each event
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.debug("Worker-%i Created OK"%(self.nodeID))
        # True while this application has configured Event output streams;
        # cleared in Engine() if no event writers are found
        self.eventOutput = True

    def processConfiguration( self ) :
        # Worker :
        #   No input
        #   No output
        #   No Histos
        self.config[ 'EventSelector'  ].Input     = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        formatHead = '[Worker-%i] '%(self.nodeID)
        self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'

        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

        for m in self.writerDict[ "tuples" ] :
            # rename Tuple output file with an appendix
            # based on worker id, for merging later
            newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
            self.config[ m.key ].Output = newName

        # Suppress INFO Output for all but Worker-0
        if self.nodeID == 0 :
            pass
        else                :
            self.config[ 'MessageSvc' ].OutputLevel = ERROR

    def Engine( self ) :
        '''
        Main event-processing loop of the Worker.

        Initialises the AppMgr, populates the TESSerializer item list
        from the configured event writers, then loops : receive a
        serialised event, execute it, apply the accept/require/veto
        selection and forward accepted events to the Writer.  The loop
        ends on the 'FINISHED' sentinel from the Reader.
        '''
        startEngine = time.time()
        self.log.name = "Worker-%i"%(self.nodeID)
        self.log.info("Worker %i starting Engine"%(self.nodeID))
        self.Initialize()
        self.filerecordsAgent = FileRecordsAgent(self)

        # populate the TESSerializer itemlist
        self.log.info('EVT WRITERS ON WORKER : %i'\
                       %( len(self.writerDict['events'])))

        nEventWriters = len( self.writerDict[ "events" ] )
        if nEventWriters :
            for m in self.writerDict[ "events" ] :
                for item in m.ItemList :
                    self.log.debug(' adding ItemList Item to ts : %s'%(item))
                    self.ts.addItem( item )
                for item in m.OptItemList :
                    self.log.debug(' adding Optional Item to ts : %s'%(item))
                    self.ts.addOptItem( item )
        else :
            self.log.info( 'There is no Event Output for this app' )
            self.eventOutput = False

        # add the Histogram Collection Algorithm
        self.a.addAlgorithm( CollectHistograms(self) )

        # Begin processing
        self.log.name = "Worker-%i"%(self.nodeID)
        Go = True
        while Go :
            packet = self.evcom.receive( )
            if not packet :
                # nothing available yet; poll again
                continue
            if packet == 'FINISHED' :
                # Reader has run out of events
                break
            evtNumber, tbin = packet    # unpack
            self.nIn += 1
            self.TS.Load( tbin )
            t = time.time()
            sc = self.a.executeEvent()
            # keep the first event's time separate : it includes one-off
            # initialisation costs and would skew the per-event average
            if self.nIn == 1 :
                self.firstEvTime = time.time()-t
            else :
                self.rTime += (time.time()-t)
            if not sc.isSuccess() :
                self.log.warning('Did not Execute Event')
                self.evt.clearStore()
                continue
            if not self.isEventPassed() :
                self.log.warning( 'Event did not pass : %i'%(evtNumber) )
                self.evt.clearStore()
                continue
            if self.eventOutput :
                # It may be the case of generating Event Tags; hence
                #   no event output
                self.currentEvent = self.getEventNumber( )
                tb = self.TS.Dump( )
                self.evcom.send( (self.currentEvent, tb) )
                self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        self.evcom.finalize()
        self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
        # Now send the FileRecords and stop/finalize the appMgr
        self.filerecordsAgent.SendFileRecords()
        self.Finalize()
        self.tTime = time.time()-startEngine
        self.Report()

    def getCheckAlgs( self ) :
        '''
        For some output writers, a check is performed to see if the event has
        executed certain algorithms.
        These reside in the AcceptAlgs property for those writers

        Returns a tuple (acceptAlgs, requireAlgs, vetoAlgs) of algorithm
        name lists, aggregated over all event writers.
        '''
        acc = []
        req = []
        vet = []
        for m in self.writerDict[ "events" ] :
            if hasattr(m.w, 'AcceptAlgs')  : acc += m.w.AcceptAlgs
            if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
            if hasattr(m.w, 'VetoAlgs')    : vet += m.w.VetoAlgs
        return (acc, req, vet)


    def checkExecutedPassed( self, algName ) :
        # True iff the named algorithm has been executed for this event
        # AND its filter decision was positive
        if  self.a.algorithm( algName )._ialg.isExecuted()\
        and self.a.algorithm( algName )._ialg.filterPassed() :
            return True
        else :
            return False

    def isEventPassed( self ) :
        '''
        Check the algorithm status for an event.
        Depending on output writer settings, the event
          may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream
        '''
        passed = False

        self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
        if self.acceptAlgs :
            for name in self.acceptAlgs :
                if self.checkExecutedPassed( name ) :
                    passed = True
                    break
        else :
            passed = True

        self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
        for name in self.requireAlgs :
            if self.checkExecutedPassed( name ) :
                pass
            else :
                self.log.info('Evt declined (requireAlgs) : %s'%(name) )
                passed = False

        self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
        for name in self.vetoAlgs :
            # FIX : in OutputStream a veto algorithm rejects the event when
            # it has executed AND passed; the previous check was inverted
            # (it declined the event when the veto alg did NOT pass)
            if self.checkExecutedPassed( name ) :
                self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
                passed = False
        return passed
00902 
00903 # =============================================================================
00904 
class Writer( GMPComponent ) :
    '''
    GMP Writer process.

    Collects processed events from all Workers (polled round-robin) and
    writes them out through the configured output streams, then rebuilds
    the Histogram and FileRecords stores from the data the Workers sent.
    '''
    def __init__( self, queues, events, params ) :
        GMPComponent.__init__(self,'Writer', -2, queues, events, params)
        # Identify the writer streams
        self.writerDict = self.IdentifyWriters( )
        # This keeps track of workers as they finish
        self.status = [False]*self.nWorkers
        self.log.name = "Writer--2"

    def processConfiguration( self ) :
        # Writer :
        #   No input
        #   No Algs
        self.config[ 'ApplicationMgr' ].TopAlg = []
        self.config[ 'EventSelector'  ].Input  = []

        self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'

        # Now process the output writers
        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

        # Modify the name of the output file to reflect that it came
        #  from a parallel processing
        #
        # Event Writers
        for m in self.writerDict[ "events" ] :
            self.log.debug( 'Processing Event Writer : %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers )
            self.config[ m.key ].Output = newName

        # Now, if there are no event writers, the FileRecords file
        #   will fail to open, as it only opens an UPDATE version
        #   of the existing Event Output File
        # So, if there are no event writers, edit the string of the
        #   FileRecord Writer

        # FileRecords Writers
        for m in self.writerDict[ "records" ] :
            self.log.debug( 'Processing FileRecords Writer: %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers,
                                       extra=" OPT='RECREATE'" )
            self.config[ m.key ].Output = newName

        # same for histos
        hs = "HistogramPersistencySvc"
        n = None
        if hs in self.config.keys() :
            n = self.config[ hs ].OutputFile
        if n :
            newName=self.config[hs].OutputFile.replace('.',\
                                                       '.p%i.'%(self.nWorkers))
            self.config[ hs ].OutputFile = newName

    def Engine( self ) :
        '''
        Main event-collection loop of the Writer.

        Polls each Worker's queue in round-robin order; writes out every
        received event and stops once the 'FINISHED' sentinel has arrived
        from every Worker.  Afterwards the Histogram and FileRecords
        stores are rebuilt from the Workers' data.
        '''
        startEngine = time.time()
        self.Initialize()
        self.histoAgent = HistoAgent( self )
        self.filerecordsAgent = FileRecordsAgent( self )

        # Begin processing
        Go      = True
        current = -1
        while Go :
            current = (current+1)%self.nWorkers
            packet = self.evcoms[current].receive( timeout=0.01 )
            if packet is None :
                # this worker had nothing ready; try the next one
                continue
            if packet == 'FINISHED' :
                self.log.info('Writer got FINISHED flag : Worker %i'%(current))
                self.status[current] = True
                if all(self.status) :
                    self.log.info('FINISHED recd from all workers, break loop')
                    break
                continue
            # otherwise, continue as normal
            self.nIn += 1    # update central count (maybe needed by FSR store)
            evtNumber, tbin = packet    # unpack
            self.TS.Load( tbin )
            t  = time.time()
            self.a.executeEvent()
            self.rTime += ( time.time()-t )
            self.currentEvent = self.getEventNumber( )
            self.evt.clearStore( )
            self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # finalisation steps
        [ e.finalize() for e in self.evcoms ]
        # Now do Histograms
        sc = self.histoAgent.Receive()
        sc = self.histoAgent.RebuildHistoStore()
        if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
        else              : self.log.warning( 'Histo Store Error in Rebuild' )

        # Now do FileRecords
        sc = self.filerecordsAgent.Receive()
        self.filerecordsAgent.Rebuild()
        self.Finalize()
        # FIX : store the total engine time in tTime, as Reader and Worker
        # do; previously this overwrote the accumulated per-event rTime
        self.tTime = time.time()-startEngine
        self.Report()
01009 
01010 # =============================================================================
01011 
01012 
01013 
01014 # =============================================================================
01015 
class Coord( object ) :
    '''
    Co-ordinator of the parallel system.

    Builds the network of event queues, the phase Syncers and the
    Reader/Worker/Writer sub-processes, then drives the whole system
    through its Initialise, Run and Finalise phases.
    '''
    def __init__( self, nWorkers, config, log ) :

        self.log = log
        self.config = config
        # set up Logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )

        # nWorkers == -1 means : use every cpu available on this machine
        self.nWorkers = cpu_count() if nWorkers == -1 else nWorkers

        self.qs = self.SetupQueues( )    # a dictionary of queues (for Events)
        self.hq = JoinableQueue( )       # for Histogram data
        self.fq = JoinableQueue( )       # for FileRecords data

        # Make a Syncer for Initalise, Run, and Finalise
        self.sInit = Syncer( self.nWorkers, self.log,
                             limit=WAIT_INITIALISE,
                             step=STEP_INITIALISE    )
        self.sRun  = Syncer( self.nWorkers, self.log,
                             manyEvents=True,
                             limit=WAIT_SINGLE_EVENT,
                             step=STEP_EVENT,
                             firstEvent=WAIT_FIRST_EVENT )
        self.sFin  = Syncer( self.nWorkers, self.log,
                             limit=WAIT_FINALISE,
                             step=STEP_FINALISE )
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Declare SubProcesses!
        self.reader = Reader( self.getQueues(-1), self.getSyncEvents(-1),
                              params )
        self.workers = [ Worker( i, self.getQueues(i),
                                 self.getSyncEvents(i), params )
                         for i in xrange( self.nWorkers ) ]
        self.writer = Writer( self.getQueues(-2), self.getSyncEvents(-2),
                              params )

        # full system, in launch order : writer first, reader last
        self.system = [ self.writer ] + self.workers + [ self.reader ]

    def getSyncEvents( self, nodeID ) :
        # per-node synchronisation events for the three phases
        runNode = self.sRun.d[ nodeID ]
        return ( self.sInit.d[ nodeID ].event,
                 ( runNode.event, runNode.lastEvent ),
                 self.sFin.d[ nodeID ].event )

    def getQueues( self, nodeID ) :
        # the node's event queue pair, plus the shared histo/FSR queues
        return ( self.qs[ nodeID ], self.hq, self.fq )

    def Go( self ) :
        '''
        Drive all sub-processes through Initialise, Run and Finalise.
        Returns SUCCESS, or FAILURE after terminating everything if any
        phase fails to synchronise.
        '''
        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'INITIALISING SYSTEM' )
        for p in self.system :
            p.Start()
        if not self.sInit.syncAll(step="Initialise") :
            self.Terminate()
            return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'RUNNING SYSTEM' )
        if not self.sRun.syncAll(step="Run") :
            self.Terminate()
            return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'FINALISING SYSTEM' )
        if not self.sFin.syncAll(step="Finalise") :
            self.Terminate()
            return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info( "Cleanly join all Processes" )
        self.Stop()
        self.log.info( "Report Total Success to Main.py" )
        return SUCCESS

    def Terminate( self ) :
        # Brutally kill sub-processes : writer, then workers, then reader
        for node in [ self.writer ] + self.workers + [ self.reader ] :
            node.proc.terminate()

    def Stop( self ) :
        # procs should be joined in reverse order to launch
        self.system.reverse()
        for s in self.system :
            s.proc.join()
        return SUCCESS

    def SetupQueues( self ) :
        '''
        Build the network of Queues needed : nWorkers + 1 in total.
        One shared queue carries events Reader -> Workers; each Worker
        has its own queue to the Writer.  Returns a dict keyed by nodeID
        (-1 Reader, -2 Writer, 0..n-1 Workers) of (inQueue, outQueue).
        '''
        readerToWorkers = JoinableQueue()
        workersToWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
        d = { -1 : (None, readerToWorkers),     # Reader : output only
              -2 : (workersToWriter, None) }    # Writer : input only
        for i in xrange(self.nWorkers) :
            d[i] = (readerToWorkers, workersToWriter[i])
        return d
01139 
01140 # ============================= EOF ===========================================
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Defines

Generated at Wed Feb 9 16:24:57 2011 for Gaudi Framework, version v22r0 by Doxygen version 1.6.2 written by Dimitri van Heesch, © 1997-2004