Gaudi Framework, version v22r4

GMPBase.py

00001 from Gaudi.Configuration import *
00002 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE
00003 from ROOT import TBufferFile, TBuffer
00004 from multiprocessing import Process, Queue, JoinableQueue, Event
00005 from multiprocessing import cpu_count, current_process
00006 from multiprocessing.queues import Empty
00007 from pTools import *
00008 import time, sys, os
00009 
00010 # This script contains the bases for the Gaudi MultiProcessing (GMP)
00011 # classes
00012 
00013 # There are three classes :
00014 #   Reader
00015 #   Worker
00016 #   Writer
00017 
00018 # Each class needs to perform communication with the others
00019 # For this, we need a means of communication, which will be based on
00020 # the python multiprocessing package
00021 # This is provided in the SPI pytools package
00022 # cmt line : use pytools v1.1 LCG_Interfaces
00023 # The PYTHONPATH env variable may need to be modified, as this might
00024 # still point to 1.0_python2.5
00025 
00026 # Each class will need Queues, and a defined method for using these
00027 # queues.
00028 # For example, as long as there is something in the Queue, both ends
00029 # of the queue must be open
00030 # Also, there need to be proper termination flags and criteria.
00031 # The system should be error-proof.
00032 
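# A minimal sketch (not part of the original file) of the queue pattern just
# described, using only the multiprocessing API imported above: the producer
# puts items on a JoinableQueue, appends a termination flag, and the consumer
# acknowledges every item with task_done() so the producer can join().
#
#   def producer( q ) :
#       for i in range(3) :
#           q.put( i )            # send work items
#       q.put( 'FINISHED' )       # termination flag (same convention as below)
#       q.join()                  # block until every item has been acknowledged
#
#   def consumer( q ) :
#       while True :
#           item = q.get()
#           q.task_done()         # acknowledge, so q.join() can return
#           if item == 'FINISHED' : break
#
#   q = JoinableQueue()
#   p = Process( target=consumer, args=(q,) )
#   p.start() ; producer( q ) ; p.join()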
00033 
00034 # Constants -------------------------------------------------------------------
00035 NAP = 0.001
00036 MB  = 1024.0*1024.0
00037 # waits to guard against hanging, in seconds
00038 WAIT_INITIALISE   = 60*5
00039 WAIT_FIRST_EVENT  = 60*3
00040 WAIT_SINGLE_EVENT = 60*3
00041 WAIT_FINALISE     = 60*2
00042 STEP_INITIALISE   = 10
00043 STEP_EVENT        = 2
00044 STEP_FINALISE     = 5
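# Illustrative only : the Syncer class (available via the pTools import above,
# not shown in this file) consumes these limits; a guard of the following shape
# would poll an Event every `step` seconds and give up after `limit` seconds.
#
#   def waitWithTimeout( event, limit=WAIT_INITIALISE, step=STEP_INITIALISE ) :
#       waited = 0
#       while not event.is_set() :
#           event.wait( step )        # returns after `step` seconds, or earlier if set
#           waited += step
#           if waited >= limit :
#               return False          # timed out; the caller should terminate
#       return True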
00045 
00046 # Switch for directly enabling/disabling the Smaps Algorithm in the GaudiPython AppMgr
00047 SMAPS = False
00048 
00049 # -----------------------------------------------------------------------------
00050 
00051 # definitions
00052 # ----------
00053 # used to convert stored histos (in AIDA format) to ROOT format
00054 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
00055 
00056 # used to check which type of histo we are dealing with
00057 # i.e. if currentHisto in aidatypes : pass
00058 aidatypes = ( gbl.AIDA.IHistogram,
00059               gbl.AIDA.IHistogram1D,
00060               gbl.AIDA.IHistogram2D,
00061               gbl.AIDA.IHistogram3D,
00062               gbl.AIDA.IProfile1D,
00063               gbl.AIDA.IProfile2D,
00064               gbl.AIDA.IBaseHistogram  )  # extra?
00065 
00066 # similar to aidatypes
00067 thtypes   = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
00068 
00069 # Types of OutputStream in Gaudi
00070 WRITERTYPES  = {  'EvtCollectionStream'    : "tuples",
00071                   'InputCopyStream'        : "events",
00072                   'OutputStream'           : "events",
00073                   'RecordStream'           : "records",
00074                   'RunRecordStream'        : "records",
00075                   'SequentialOutputStream' : "events",
00076                   'TagCollectionStream'    : "tuples"   }
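# e.g. a configurable whose class name is 'InputCopyStream' is classified as
# an event writer :
#   WRITERTYPES[ 'InputCopyStream' ]   # -> "events"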
00077 
00078 # =============================================================================
00079 
00080 class MiniWriter( object ) :
00081     '''
00082     A class to represent a writer in the GaudiPython configuration
00083     It can be non-trivial to access the name of the output file; it may be
00084     specified on the DataSvc or on the writer itself, and may be a list or a string.
00085     Also, there are three different types of writer (events, records, tuples)
00086     so this bootstrap class provides easy access to this info while configuring
00087     '''
00088     def __init__( self, writer, wType, config ) :
00089         self.w     = writer
00090         self.wType = wType
00091         # set parameters for directly accessing the correct
00092         #   part of the configuration, so that later, we can do
00093         #   config[ key ].Output = modified(output)
00094         self.key          = None
00095         self.output       = None
00096         self.ItemList     = None
00097         self.OptItemList  = None
00098         #
00099         self.wName = writer.getName()
00100         # Now process the Writer, find where the output is named
00101         self.woutput     = None
00102         self.datasvcName = None
00103         self.svcOutput   = None
00104         if hasattr( self.w, "Output" ) :
00105             self.woutput = self.w.Output
00106             self.getItemLists( config )
00107             self.set( self.wName, self.w.Output )
00108             return
00109         else :
00110             # if there is no output file, get it via the datasvc
00111             # (every writer always has a datasvc property)
00112             self.datasvcName = self.w.EvtDataSvc
00113             datasvc = config[ self.datasvcName ]
00114             if hasattr( datasvc, "Output" ) :
00115                 self.getItemLists( config )
00116                 self.set( self.datasvcName, datasvc.Output )
00117                 return
00118 
00119     def getNewName( self, replaceThis, withThis, extra='' ) :
00120         # replace one pattern in the output name string
00121         #  with another, and return the Output name
00122         # It *might* be in a list, so check for this
00123         #
00124         # @param extra : might need to add ROOT flags
00125         #                i.e.: OPT='RECREATE', or such
00126         assert replaceThis.__class__.__name__ == 'str'
00127         assert    withThis.__class__.__name__ == 'str'
00128         old = self.output
00129         lst = False
00130         if old.__class__.__name__ == 'list' :
00131             old = self.output[0]
00132             lst = True
00133         new = old.replace( replaceThis, withThis )
00134         new += extra
00135         if lst :
00136             return [ new ]
00137         else :
00138             return new
00139 
00140     def getItemLists( self, config ) :
00141         # the item list
00142         if hasattr( self.w, "ItemList" ) :
00143             self.ItemList = self.w.ItemList
00144         else :
00145             datasvc = config[ self.w.EvtDataSvc ]
00146             if hasattr( datasvc, "ItemList" ) :
00147                 self.ItemList = datasvc.ItemList
00148         # The option item list; possibly not a valid variable
00149         if hasattr( self.w, "OptItemList" ) :
00150             self.OptItemList = self.w.OptItemList
00151         else :
00152             datasvc = config[ self.w.EvtDataSvc ]
00153             if hasattr( datasvc, "OptItemList" ) :
00154                 self.OptItemList = datasvc.OptItemList
00155         return
00156 
00157     def set( self, key, output ) :
00158         self.key         = key
00159         self.output      = output
00160         return
00161 
00162     def __repr__( self ) :
00163         s  = ""
00164         line = '-'*80
00165         s += (line+'\n')
00166         s += "Writer         : %s\n"%( self.wName  )
00167         s += "Writer Type    : %s\n"%( self.wType  )
00168         s += "Writer Output  : %s\n"%( self.output )
00169         s += "DataSvc        : %s\n"%( self.datasvcName )
00170         s += "DataSvc Output : %s\n"%( self.svcOutput   )
00171         s += '\n'
00172         s += "Key for config : %s\n"%( self.key    )
00173         s += "Output File    : %s\n"%( self.output )
00174         s += "ItemList       : %s\n"%( self.ItemList )
00175         s += "OptItemList    : %s\n"%( self.OptItemList )
00176         s += (line+'\n')
00177         return s
00178 
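# Hypothetical usage sketch (the file name is illustrative, not from this
# file) : given a writer whose Output is 'myfile.root', getNewName derives
# per-worker or per-job file names, as the Worker and Writer classes below do.
# Note that str.replace substitutes *every* occurrence of the first pattern.
#
#   mw = MiniWriter( writer, 'events', config )
#   mw.getNewName( '.', '.w3.' )                          # -> 'myfile.w3.root'
#   mw.getNewName( '.', '.p8.', extra=" OPT='RECREATE'" ) # -> "myfile.p8.root OPT='RECREATE'"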
00179 # =============================================================================
00180 
00181 class CollectHistograms( PyAlgorithm ) :
00182     '''
00183     GaudiPython algorithm used to clean up histos on the Reader and Workers
00184     Only the finalize() method does any real work.
00185     This retrieves a dictionary of path:histo objects and sends it to the
00186     writer.  It then waits on the histogram sync Event : THIS IS IMPORTANT, as if
00187     the algorithm returns before ALL histos have been COMPLETELY RECEIVED
00188     at the writer end, there will be an error.
00189     '''
00190     def __init__( self, gmpcomponent ) :
00191         PyAlgorithm.__init__( self )
00192         self._gmpc = gmpcomponent
00193         self.log = self._gmpc.log
00194         return None
00195     def execute( self ) :
00196         return SUCCESS
00197     def finalize( self ) :
00198         self.log.info('CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
00199         self._gmpc.hDict = self._gmpc.dumpHistograms( )
00200         ks = self._gmpc.hDict.keys()
00201         self.log.info('%i Objects in Histogram Store'%(len(ks)))
00202         # crashes occurred due to MemoryError during the sending of hundreds
00203         # of histos on slc5 machines, so instead, break into chunks and
00204         # send 100 at a time
00205         chunk = 100
00206         reps = len(ks)/chunk + 1
00207         for i in xrange(reps) :
00208             someKeys = ks[i*chunk : (i+1)*chunk]
00209             smalld = dict( [(key, self._gmpc.hDict[key]) for key in someKeys] )
00210             self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
00211         # "finished" Notification
00212         self.log.debug('Signalling end of histos to Writer')
00213         self._gmpc.hq.put( 'HISTOS_SENT' )
00214         self.log.debug( 'Waiting on Sync Event' )
00215         self._gmpc.sEvent.wait()
00216         self.log.debug( 'Histo Sync Event set, clearing and returning' )
00217         self._gmpc.hvt.clearStore()
00218         root = gbl.DataObject()
00219         setOwnership(root, False)
00220         self._gmpc.hvt.setRoot( '/stat', root )
00221         return SUCCESS
00222 
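# Worked example of the chunking in finalize() above (Python 2 integer
# division) : with 250 histogram keys and chunk = 100, reps = 250/100 + 1 = 3,
# giving the slices [0:100], [100:200] and [200:300]; an exact multiple of 100
# simply produces one extra, empty dictionary at the end.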
00223 # =============================================================================
00224 
00225 class EventCommunicator( object ) :
00226     # This class is responsible for communicating Gaudi Events via Queues
00227     # Events are communicated as TBufferFiles, filled either by the
00228     # TESSerializer, or the GaudiSvc, "IPCSvc"
00229     def __init__( self, GMPComponent, qin, qout ) :
00230         self._gmpc = GMPComponent
00231         self.log   = self._gmpc.log
00232         # maximum capacity of Queues
00233         self.maxsize = 50
00234         # central variables : Queues
00235         self.qin  = qin
00236         self.qout = qout
00237 
00238         # flags
00239         self.allsent = False
00240         self.allrecv = False
00241 
00242         # statistics storage
00243         self.nSent    = 0    # counter : items sent
00244         self.nRecv    = 0    # counter : items received
00245         self.sizeSent = 0    # measure : size of events sent ( tbuf.Length() )
00246         self.sizeRecv = 0    # measure : size of events in   ( tbuf.Length() )
00247         self.qinTime  = 0    # time    : receiving from self.qin
00248         self.qoutTime = 0    # time    : sending on qout
00249 
00250     def send( self, item ) :
00251         # This method manages the sending of a TBufferFile Event to a Queue
00252         # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
00253         assert item.__class__.__name__ == 'tuple'
00254         startTransmission = time.time()
00255         self.qout.put( item )
00256         # allow the background thread to feed the Queue; not 100% guaranteed to
00257         # finish before next line
00258         while self.qout._buffer : time.sleep( NAP )
00259         self.qoutTime += time.time()-startTransmission
00260         self.sizeSent += item[1].Length()
00261         self.nSent += 1
00262         return SUCCESS
00263 
00264     def receive( self, timeout=None ) :
00265         # Receive items from self.qin
00266         startWait = time.time()
00267         try :
00268             itemIn = self.qin.get( timeout=timeout )
00269         except Empty :
00270             return None
00271         self.qinTime += time.time()-startWait
00272         self.nRecv += 1
00273         if itemIn.__class__.__name__ == 'tuple' :
00274             self.sizeRecv += itemIn[1].Length()
00275         else :
00276             self.nRecv -= 1
00277         try :
00278             self.qin.task_done()
00279         except :
00280             self._gmpc.log.warning('TASK_DONE called too often by : %s'\
00281                                     %(self._gmpc.nodeType))
00282         return itemIn
00283 
00284     def finalize( self ) :
00285         self.log.info('Finalize Event Communicator : %s'%(self._gmpc.nodeType))
00286         # Reader sends one flag for each worker
00287         # Workers send one flag each
00288         # Writer sends nothing (it's at the end of the chain)
00289         if   self._gmpc.nodeType == 'Reader' : downstream = self._gmpc.nWorkers
00290         elif self._gmpc.nodeType == 'Writer' : downstream = 0
00291         elif self._gmpc.nodeType == 'Worker' : downstream = 1
00292         for i in xrange(downstream) :
00293             self.qout.put( 'FINISHED' )
00294         if self._gmpc.nodeType != 'Writer' :
00295             self.qout.join()
00296         # Now some reporting...
00297         self.statistics( )
00298 
00299     def statistics( self ) :
00300         self.log.name = '%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
00301         self.log.info ( 'Items Sent     : %i'%(self.nSent) )
00302         self.log.info ( 'Items Received : %i'%(self.nRecv) )
00303         self.log.info ( 'Data  Sent     : %i'%(self.sizeSent) )
00304         self.log.info ( 'Data  Received : %i'%(self.sizeRecv) )
00305         self.log.info ( 'Q-out Time     : %5.2f'%(self.qoutTime) )
00306         self.log.info ( 'Q-in  Time     : %5.2f'%(self.qinTime ) )
00307 
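# Sketch of the traffic an EventCommunicator carries (values illustrative) :
# every normal item is a tuple ( eventNumber, TBufferFile ); the plain string
# 'FINISHED' is the termination flag that finalize() above puts on qout once
# per downstream node.
#
#   evcom.send( (42, tbuf) )     # one serialised event
#   item = evcom.receive()       # a tuple, 'FINISHED', or None on Empty/timeout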
00308 # =============================================================================
00309 
00310 class TESSerializer( object ) :
00311     def __init__( self, gaudiTESSerializer, evtDataSvc,
00312                         nodeType, nodeID, log ) :
00313         self.T   = gaudiTESSerializer
00314         self.evt = evtDataSvc
00315         self.buffersIn  = []
00316         self.buffersOut = []
00317         self.nIn     = 0
00318         self.nOut    = 0
00319         self.tDump   = 0.0
00320         self.tLoad   = 0.0
00321         # logging
00322         self.nodeType = nodeType
00323         self.nodeID   = nodeID
00324         self.log      = log
00325     def Load( self, tbuf ) :
00326         root = gbl.DataObject()
00327         setOwnership( root, False )
00328         self.evt.setRoot( '/Event', root )
00329         t = time.time()
00330         self.T.loadBuffer( tbuf )
00331         self.tLoad   += (time.time() - t)
00332         self.nIn     += 1
00333         self.buffersIn.append( tbuf.Length() )
00334     def Dump( self ) :
00335         t = time.time()
00336         tb = TBufferFile( TBuffer.kWrite )
00337         self.T.dumpBuffer(tb)
00338         self.tDump += ( time.time()-t )
00339         self.nOut  += 1
00340         self.buffersOut.append( tb.Length() )
00341         return tb
00342     def Report( self ) :
00343         evIn       = "Events Loaded    : %i"%( self.nIn  )
00344         evOut      = "Events Dumped    : %i"%( self.nOut )
00345         din = sum( self.buffersIn )
00346         dataIn     = "Data Loaded      : %i"%(din)
00347         dataInMb   = "Data Loaded (MB) : %5.2f Mb"%(din/MB)
00348         if self.nIn :
00349             avgIn      = "Avg Buf Loaded   : %5.2f Mb"\
00350                           %( din/(self.nIn*MB) )
00351             maxIn      = "Max Buf Loaded   : %5.2f Mb"\
00352                           %( max(self.buffersIn)/MB )
00353         else :
00354             avgIn      = "Avg Buf Loaded   : N/A"
00355             maxIn      = "Max Buf Loaded   : N/A"
00356         dout = sum( self.buffersOut )
00357         dataOut    = "Data Dumped      : %i"%(dout)
00358         dataOutMb  = "Data Dumped (MB) : %5.2f Mb"%(dout/MB)
00359         if self.nOut :
00360             avgOut     = "Avg Buf Dumped   : %5.2f Mb"\
00361                           %( dout/(self.nOut*MB) )
00362             maxOut     = "Max Buf Dumped   : %5.2f Mb"\
00363                           %( max(self.buffersOut)/MB )
00364         else :
00365             avgOut     = "Avg Buf Dumped   : N/A"
00366             maxOut     = "Max Buf Dumped   : N/A"
00367         dumpTime   = "Total Dump Time  : %5.2f"%( self.tDump )
00368         loadTime   = "Total Load Time  : %5.2f"%( self.tLoad )
00369 
00370         lines =  evIn     ,\
00371                  evOut    ,\
00372                  dataIn   ,\
00373                  dataInMb ,\
00374                  avgIn    ,\
00375                  maxIn    ,\
00376                  dataOut  ,\
00377                  dataOutMb,\
00378                  avgOut   ,\
00379                  maxOut   ,\
00380                  dumpTime ,\
00381                  loadTime
00382         self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
00383         for line in lines :
00384             self.log.info( line )
00385         self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
00386 
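# The Reader/Worker exchange below is essentially this round trip : the sender
# dumps the transient event store into a TBufferFile and the receiver loads it
# back before executing its algorithms (sketch only, with a plain Queue
# standing in for the EventCommunicator; readerTS/workerTS are illustrative
# names for the two TESSerializer instances) :
#
#   tb = readerTS.Dump()                 # serialise /Event on the sending node
#   queue.put( (evtNumber, tb) )
#   evtNumber, tb = queue.get()
#   workerTS.Load( tb )                  # repopulate /Event on the receiving node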
00387 # =============================================================================
00388 
00389 class GMPComponent( object ) :
00390     # This class will be the template for Reader, Worker and Writer
00391     # containing all common components
00392     # nodeId will be a numerical identifier for the node
00393     # -1 for reader
00394     # -2 for writer
00395     # 0,...,nWorkers-1 for the Workers
00396     def __init__( self, nodeType, nodeID, queues, events, params  ) :
00397         # declare a Gaudi MultiProcessing Node
00398         # the nodeType is going to be one of Reader, Worker, Writer
00399         # qPair is going to be a tuple of ( qin, qout )
00400         # for sending and receiving
00401         # if nodeType is "Writer", it will be a list of qPairs,
00402         # as there's one queue-in from each Worker
00403         #
00404         # params is a tuple of (nWorkers, config, log)
00405 
00406         self.nodeType = nodeType
00407         current_process().name = nodeType
00408 
00409         # Synchronisation Event() objects for keeping track of the system
00410         self.initEvent, eventLoopSyncer, self.finalEvent = events
00411         self.eventLoopSyncer, self.lastEvent = eventLoopSyncer   # unpack tuple
00412 
00413         # necessary for knowledge of the system
00414         self.nWorkers, self.sEvent, self.config, self.log = params
00415         self.nodeID   = nodeID
00416 
00417         # describe the state of the node by the current Event Number
00418         self.currentEvent = None
00419 
00420         # Unpack the Queues : (events, histos, filerecords)
00421         qPair, histq, fq = queues
00422 
00423         # Set up the Queue Mechanisms ( Event Communicators )
00424         if self.nodeType == 'Reader' or self.nodeType == 'Worker' :
00425             # Reader or Worker Node
00426             qin, qout = qPair
00427             self.evcom = EventCommunicator( self, qin, qout )
00428         else :
00429             # Writer : many queues in, no queue out
00430             assert self.nodeType == 'Writer'
00431             self.evcoms = []
00432             qsin = qPair[0]
00433             for q in qsin :
00434                 ec = EventCommunicator( self, q, None )
00435                 self.evcoms.append( ec )
00436         # Histogram Queue
00437         self.hq = histq
00438         # FileRecords Queue
00439         self.fq = fq
00440 
00441         # Universal Counters (available to all nodes)
00442         # Use sensibly!!!
00443         self.nIn  = 0
00444         self.nOut = 0
00445 
00446         # Status Flag (possibly remove later)
00447         self.stat = SUCCESS
00448 
00449         # Set logger name
00450         self.log.name = '%s-%i'%(self.nodeType, self.nodeID)
00451 
00452         # Heuristic variables
00453         # time for init, run, final, firstEventTime, totalTime
00454         self.iTime       = 0.0
00455         self.rTime       = 0.0
00456         self.fTime       = 0.0
00457         self.firstEvTime = 0.0
00458         self.tTime       = 0.0
00459 
00460         # define the separate process
00461         self.proc = Process( target=self.Engine )
00462 
00463     def Start( self ) :
00464         # Fork and start the separate process
00465         self.proc.start()
00466 
00467     def Engine( self ) :
00468         # This will be the method carried out by the Node
00469         # Different for all
00470         pass
00471 
00472     def processConfiguration( self ) :
00473         # Different for all ; customize Configuration for multicore
00474         pass
00475 
00476     def SetupGaudiPython( self ) :
00477         # This method will initialize the GaudiPython Tools
00478         # such as the AppMgr and so on
00479         self.a   = AppMgr()
00480         if SMAPS :
00481             from AlgSmapShot import SmapShot
00482             smapsLog = self.nodeType+'-'+str(self.nodeID)+'.smp'
00483             ss = SmapShot( logname=smapsLog )
00484             self.a.addAlgorithm( ss )
00485         self.evt = self.a.evtsvc()
00486         self.hvt = self.a.histsvc()
00487         self.fsr = self.a.filerecordsvc()
00488         self.inc = self.a.service('IncidentSvc','IIncidentSvc')
00489         self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
00490         self.ts  = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
00491         self.TS  = TESSerializer( self.ts, self.evt,
00492                                   self.nodeType, self.nodeID, self.log )
00493         return SUCCESS
00494 
00495     def StartGaudiPython( self ) :
00496         self.a.initialize()
00497         self.a.start()
00498         return SUCCESS
00499 
00500     def LoadTES( self, tbufferfile ) :
00501         root = gbl.DataObject()
00502         setOwnership(root, False)
00503         self.evt.setRoot( '/Event', root )
00504         self.ts.loadBuffer(tbufferfile)
00505 
00506     def getEventNumber( self ) :
00507         # Using getList or getHistoNames can result in the EventSelector
00508         # re-initialising connection to RootDBase, which costs a lot of
00509         # time... try to build a set of Header paths??
00510 
00511         # First Attempt : Unpacked Event Data
00512         lst = [ '/Event/Gen/Header',
00513                 '/Event/Rec/Header' ]
00514         for l in lst :
00515             path = l
00516             try :
00517                 n = self.evt[path].evtNumber()
00518                 return n
00519             except :
00520                 # No evt number at this path
00521                 continue
00522 
00523         # second attempt : try DAQ/RawEvent data
00524         # The Evt Number is in bank type 16, bank 0, data pt 4
00525         try :
00526             n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
00527             return n
00528         except :
00529             pass
00530 
00531         # Default Action
00532         if self.nIn > 0 or self.nOut > 0 :
00533             pass
00534         else :
00535             self.log.warning('Could not determine Event Number')
00536         return -1
00537 
00538     def IdentifyWriters( self ) :
00539         #
00540         # Identify Writers in the Configuration
00541         #
00542         d = {}
00543         keys = [ "events", "records", "tuples", "histos" ]
00544         for k in keys :
00545             d[k] = []
00546 
00547         # Identify Writers and Classify
00548         wkeys = WRITERTYPES.keys()
00549         for v in self.config.values() :
00550             if v.__class__.__name__ in wkeys :
00551                 writerType = WRITERTYPES[ v.__class__.__name__ ]
00552                 d[writerType].append( MiniWriter(v, writerType, self.config) )
00553                 if self.nodeID == 0 :
00554                     self.log.info('Writer Found : %s'%(v.name()))
00555 
00556         # Now Check for the Histogram Service
00557         if 'HistogramPersistencySvc' in  self.config.keys() :
00558             hfile =self.config['HistogramPersistencySvc'].getProp('OutputFile')
00559             d[ "histos" ].append( hfile )
00560         return d
00561 
00562     def dumpHistograms( self ) :
00563         '''
00564         Method used by the GaudiPython algorithm CollectHistos
00565         to obtain a dictionary of form { path : object }
00566         representing the Histogram Store
00567         '''
00568         nlist = self.hvt.getHistoNames( )
00569         histDict = {}
00570         objects = 0 ; histos = 0
00571         if nlist :
00572             for n in nlist :
00573                 o = self.hvt[ n ]
00574                 if type(o) in aidatypes :
00575                     o = aida2root(o)
00576                     histos  += 1
00577                 else :
00578                     objects += 1
00579                 histDict[ n ] = o
00580         else :
00581             print 'WARNING : no histograms to recover?'
00582         return histDict
00583 
00584     def Initialize( self ) :
00585         start = time.time()
00586         self.processConfiguration( )
00587         self.SetupGaudiPython( )
00588         # Set the initialisation flag!
00589         self.initEvent.set()
00590         self.StartGaudiPython( )
00591         self.iTime = time.time() - start
00592 
00593     def Finalize( self ) :
00594         start = time.time()
00595         self.a.stop()
00596         self.a.finalize()
00597         self.log.info( '%s-%i Finalized'%(self.nodeType, self.nodeID) )
00598         self.finalEvent.set()
00599         self.fTime = time.time() - start
00600 
00601     def Report( self ) :
00602         self.log.name = "%s-%i Audit"%(self.nodeType, self.nodeID)
00603         allTime  = "Alive Time     : %5.2f"%(self.tTime)
00604         initTime = "Init Time      : %5.2f"%(self.iTime)
00605         frstTime = "1st Event Time : %5.2f"%(self.firstEvTime)
00606         runTime  = "Run Time       : %5.2f"%(self.rTime)
00607         finTime  = "Finalise Time  : %5.2f"%(self.fTime)
00608         tup = ( allTime, initTime, frstTime, runTime, finTime )
00609         for t in tup :
00610             self.log.info( t )
00611         self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
00612         # and report from the TESSerializer
00613         self.TS.Report()
00614 
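# Shapes of the tuples that GMPComponent.__init__ unpacks (built by Coord at
# the bottom of this file); for a Worker, for example (queue/event names here
# are illustrative) :
#
#   queues = ( (qReaderToWorkers, qWorkerToWriter), histQueue, fileRecordsQueue )
#   events = ( initEvent, (loopEvent, lastEvent), finalEvent )
#   params = ( nWorkers, histSyncEvent, config, log )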
00615 # =============================================================================
00616 
00617 class Reader( GMPComponent )  :
00618     def __init__( self, queues, events, params ) :
00619         GMPComponent.__init__(self, 'Reader', -1, queues, events, params )
00620 
00621     def processConfiguration( self ) :
00622         # Reader :
00623         #   No algorithms
00624         #   No output
00625         #   No histos
00626         self.config[ 'ApplicationMgr' ].TopAlg    = []
00627         self.config[ 'ApplicationMgr' ].OutStream = []
00628         if "HistogramPersistencySvc" in self.config.keys() :
00629             self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
00630         self.config['MessageSvc'].Format    = '[Reader]% F%18W%S%7W%R%T %0W%M'
00631         self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax
00632 
00633     def DumpEvent( self ) :
00634         tb = TBufferFile( TBuffer.kWrite )
00635         # print '----Reader dumping Buffer!!!'
00636         self.ts.dumpBuffer( tb )
00637         # print '\tBuffer Dumped, size : %i'%( tb.Length() )
00638         return tb
00639 
00640     def DoFirstEvent( self ) :
00641         # Do First Event ------------------------------------------------------
00642         # Check Termination Criteria
00643         startFirst = time.time()
00644         self.log.info('Reader : First Event')
00645         if self.nOut == self.evtMax :
00646             self.log.info('evtMax( %i ) reached'%(self.evtMax))
00647             self.lastEvent.set()
00648             return SUCCESS
00649         else :
00650             # Continue to read, dump and send event
00651             self.a.run(1)
00652             if not bool(self.evt['/Event']) :
00653                 self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
00654                 self.lastEvent.set()
00655                 return SUCCESS
00656             else :
00657                 # Populate TESSerializer list and send Event
00658                 try :
00659                     lst = self.evt.getList()
00660                 except :
00661                     self.log.critical('Reader could not acquire TES List!')
00662                     self.lastEvent.set()
00663                     return FAILURE
00664                 self.log.info('Reader : TES List : %i items'%(len(lst)))
00665                 for l in lst :
00666                     self.ts.addItem(l)
00667                 self.currentEvent = self.getEventNumber( )
00668                 tb = self.TS.Dump( )
00669                 self.log.info('First Event Sent')
00670                 self.evcom.send( (self.currentEvent, tb) )
00671                 self.nOut += 1
00672                 self.eventLoopSyncer.set()
00673                 self.evt.clearStore( )
00674                 self.firstEvTime = time.time()-startFirst
00675                 return SUCCESS
00676 
00677     def Engine( self ) :
00678 
00679         startEngine = time.time()
00680         self.log.name = 'Reader'
00681         self.log.info('Reader Process starting')
00682 
00683         self.Initialize()
00684 
00685         # add the Histogram Collection Algorithm
00686         self.a.addAlgorithm( CollectHistograms(self) )
00687 
00688         self.log.info('Reader Beginning Distribution')
00689         sc = self.DoFirstEvent( )
00690         if sc.isSuccess() :
00691             self.log.info('Reader First Event OK')
00692         else :
00693             self.log.critical('Reader Failed on First Event')
00694             self.stat = FAILURE
00695 
00696         # Do All Others -------------------------------------------------------
00697         while True :
00698             # Check Termination Criteria
00699             if self.nOut == self.evtMax :
00700                 self.log.info('evtMax( %i ) reached'%(self.evtMax))
00701                 break
00702             # Check Health
00703             if not self.stat.isSuccess() :
00704                 self.log.critical( 'Reader is Damaged!' )
00705                 break
00706             # Continue to read, dump and send event
00707             t = time.time()
00708             self.a.run(1)
00709             self.rTime += (time.time()-t)
00710             if not bool(self.evt['/Event']) :
00711                 self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
00712                 break
00713             self.currentEvent = self.getEventNumber( )
00714             tb = self.TS.Dump( )
00715             self.evcom.send( (self.currentEvent, tb) )
00716             # clean up
00717             self.nOut += 1
00718             self.eventLoopSyncer.set()
00719             self.evt.clearStore( )
00720         self.log.info('Setting <Last> Event')
00721         self.lastEvent.set()
00722 
00723         # Finalize
00724         self.log.info( 'Reader : Event Distribution complete.' )
00725         self.evcom.finalize()
00726         self.Finalize()
00727         self.tTime = time.time() - startEngine
00728         self.Report()
00729 
00730 # =============================================================================
00731 
00732 class Worker( GMPComponent ) :
00733     def __init__( self, workerID, queues, events, params ) :
00734         GMPComponent.__init__(self,'Worker', workerID, queues, events, params)
00735         # Identify the writer streams
00736         self.writerDict = self.IdentifyWriters( )
00737         # Identify the accept/veto checks for each event
00738         self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
00739         self.log.debug("Worker-%i Created OK"%(self.nodeID))
00740         self.eventOutput = True
00741 
00742     def processConfiguration( self ) :
00743         # Worker :
00744         #   No input
00745         #   No output
00746         #   No Histos
00747         self.config[ 'EventSelector'  ].Input     = []
00748         self.config[ 'ApplicationMgr' ].OutStream = []
00749         if "HistogramPersistencySvc" in self.config.keys() :
00750             self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
00751         formatHead = '[Worker-%i] '%(self.nodeID)
00752         self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'
00753 
00754         for key, lst in self.writerDict.iteritems() :
00755             self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )
00756 
00757         for m in self.writerDict[ "tuples" ] :
00758             # rename Tuple output file with an appendix
00759             # based on worker id, for merging later
00760             newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
00761             self.config[ m.key ].Output = newName
00762 
00763         # Suppress INFO Output for all but Worker-0
00764         if self.nodeID == 0 :
00765             pass
00766         else                :
00767             self.config[ 'MessageSvc' ].OutputLevel = ERROR
00768 
00769     def Engine( self ) :
00770         startEngine = time.time()
00771         self.log.name = "Worker-%i"%(self.nodeID)
00772         self.log.info("Worker %i starting Engine"%(self.nodeID))
00773         self.Initialize()
00774         self.filerecordsAgent = FileRecordsAgent(self)
00775 
00776         # populate the TESSerializer itemlist
00777         self.log.info('EVT WRITERS ON WORKER : %i'\
00778                        %( len(self.writerDict['events'])))
00779 
00780         nEventWriters = len( self.writerDict[ "events" ] )
00781         if nEventWriters :
00782             for m in self.writerDict[ "events" ] :
00783                 for item in m.ItemList :
00784                     self.log.debug(' adding ItemList Item to ts : %s'%(item))
00785                     self.ts.addItem( item )
00786                 for item in m.OptItemList :
00787                     self.log.debug(' adding Optional Item to ts : %s'%(item))
00788                     self.ts.addOptItem( item )
00789         else :
00790             self.log.info( 'There is no Event Output for this app' )
00791             self.eventOutput = False
00792 
00793         # add the Histogram Collection Algorithm
00794         self.a.addAlgorithm( CollectHistograms(self) )
00795 
00796         # Begin processing
00797         self.log.name = "Worker-%i"%(self.nodeID)
00798         Go = True
00799         while Go :
00800             packet = self.evcom.receive( )
00801             if packet : pass
00802             else      : continue
00803             if packet == 'FINISHED' : break
00804             evtNumber, tbin = packet    # unpack
00805             self.nIn += 1
00806             self.TS.Load( tbin )
00807             # print 'Worker-%i : Event %i'%(self.nodeID, evtNumber)
00808             t = time.time()
00809             sc = self.a.executeEvent()
00810             if self.nIn == 1 :
00811                 self.firstEvTime = time.time()-t
00812             else :
00813                 self.rTime += (time.time()-t)
00814             if sc.isSuccess() :
00815                 pass
00816             else :
00817                 self.log.warning('Did not Execute Event')
00818                 self.evt.clearStore()
00819                 continue
00820             if self.isEventPassed() :
00821                 pass
00822             else :
00823                 self.log.warning( 'Event did not pass : %i'%(evtNumber) )
00824                 self.evt.clearStore()
00825                 continue
00826             if self.eventOutput :
00827                 # The job may only be generating Event Tags; in that case
00828                 #   there is no event output
00829                 self.currentEvent = self.getEventNumber( )
00830                 tb = self.TS.Dump( )
00831                 self.evcom.send( (self.currentEvent, tb) )
00832                 self.nOut += 1
00833             self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
00834             self.eventLoopSyncer.set()
00835             self.evt.clearStore( )
00836         self.log.info('Setting <Last> Event')
00837         self.lastEvent.set()
00838 
00839         self.evcom.finalize()
00840         self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
00841         # Now send the FileRecords and stop/finalize the appMgr
00842         self.filerecordsAgent.SendFileRecords()
00843         self.Finalize()
00844         self.tTime = time.time()-startEngine
00845         self.Report()
00846 
00847     def getCheckAlgs( self ) :
00848         '''
00849         For some output writers, a check is performed to see if the event has
00850         executed certain algorithms.
00851         These reside in the AcceptAlgs, RequireAlgs and VetoAlgs properties of those writers
00852         '''
00853         acc = []
00854         req = []
00855         vet = []
00856         for m in self.writerDict[ "events" ] :
00857             if hasattr(m.w, 'AcceptAlgs')  : acc += m.w.AcceptAlgs
00858             if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
00859             if hasattr(m.w, 'VetoAlgs')    : vet += m.w.VetoAlgs
00860         return (acc, req, vet)
00861 
00862 
00863     def checkExecutedPassed( self, algName ) :
00864         if  self.a.algorithm( algName )._ialg.isExecuted()\
00865         and self.a.algorithm( algName )._ialg.filterPassed() :
00866             return True
00867         else :
00868             return False
00869 
00870     def isEventPassed( self ) :
00871         '''
00872         Check the algorithm status for an event.
00873         Depending on output writer settings, the event
00874           may be declined based on various criteria.
00875         This is a transcript of the check that occurs in GaudiSvc::OutputStream
00876         '''
00877         passed = False
00878 
00879         self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
00880         if self.acceptAlgs :
00881             for name in self.acceptAlgs :
00882                 if self.checkExecutedPassed( name ) :
00883                     passed = True
00884                     break
00885         else :
00886             passed = True
00887 
00888         self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
00889         for name in self.requireAlgs :
00890             if self.checkExecutedPassed( name ) :
00891                 pass
00892             else :
00893                 self.log.info('Evt declined (requireAlgs) : %s'%(name) )
00894                 passed = False
00895 
00896         self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
00897         for name in self.vetoAlgs :
00898             if self.checkExecutedPassed( name ) :
00899                 pass
00900             else :
00901                 self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
00902                 passed = False
00903         return passed
00904 
00905 # =============================================================================
00906 
00907 class Writer( GMPComponent ) :
00908     def __init__( self, queues, events, params ) :
00909         GMPComponent.__init__(self,'Writer', -2, queues, events, params)
00910         # Identify the writer streams
00911         self.writerDict = self.IdentifyWriters( )
00912         # This keeps track of workers as they finish
00913         self.status = [False]*self.nWorkers
00914         self.log.name = "Writer--2"
00915 
00916     def processConfiguration( self ) :
00917         # Writer :
00918         #   No input
00919         #   No Algs
00920         self.config[ 'ApplicationMgr' ].TopAlg = []
00921         self.config[ 'EventSelector'  ].Input  = []
00922 
00923         self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'
00924 
00925         # Now process the output writers
00926         for key, lst in self.writerDict.iteritems() :
00927             self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )
00928 
00929         # Modify the name of the output file to reflect that it came
00930         #  from parallel processing
00931         #
00932         # Event Writers
00933         for m in self.writerDict[ "events" ] :
00934             self.log.debug( 'Processing Event Writer : %s'%(m) )
00935             newName = m.getNewName( '.', '.p%i.'%self.nWorkers )
00936             self.config[ m.key ].Output = newName
00937 
00938         # Now, if there are no event writers, the FileRecords file
00939         #   will fail to open, as it only opens an UPDATE version
00940         #   of the existing Event Output File
00941         # So, if there are no event writers, edit the string of the
00942         #   FileRecord Writer
00943 
00944         # FileRecords Writers
00945         for m in self.writerDict[ "records" ] :
00946             self.log.debug( 'Processing FileRecords Writer: %s'%(m) )
00947             newName = m.getNewName( '.', '.p%i.'%self.nWorkers,
00948                                        extra=" OPT='RECREATE'" )
00949             self.config[ m.key ].Output = newName
00950 
00951         # same for histos
00952         hs = "HistogramPersistencySvc"
00953         n = None
00954         if hs in self.config.keys() :
00955             n = self.config[ hs ].OutputFile
00956         if n :
00957             newName=self.config[hs].OutputFile.replace('.',\
00958                                                        '.p%i.'%(self.nWorkers))
00959             self.config[ hs ].OutputFile = newName
00960 
00961     def Engine( self ) :
00962         startEngine = time.time()
00963         self.Initialize()
00964         self.histoAgent = HistoAgent( self )
00965         self.filerecordsAgent = FileRecordsAgent( self )
00966 
00967         # Begin processing
00968         Go      = True
00969         current = -1
00970         stopCriteria = self.nWorkers
00971         while Go :
00972             current = (current+1)%self.nWorkers
00973             packet = self.evcoms[current].receive( timeout=0.01 )
00974             if packet == None :
00975                 continue
00976             if packet == 'FINISHED' :
00977                 self.log.info('Writer got FINISHED flag : Worker %i'%(current))
00978                 self.status[current] = True
00979                 if all(self.status) :
00980                     self.log.info('FINISHED recd from all workers, break loop')
00981                     break
00982                 continue
00983             # otherwise, continue as normal
00984             self.nIn += 1    # update central count (maybe needed by FSR store)
00985             evtNumber, tbin = packet    # unpack
00986             self.TS.Load( tbin )
00987             t  = time.time()
00988             self.a.executeEvent()
00989             self.rTime += ( time.time()-t )
00990             self.currentEvent = self.getEventNumber( )
00991             self.evt.clearStore( )
00992             self.eventLoopSyncer.set()
00993         self.log.name = "Writer--2"
00994         self.log.info('Setting <Last> Event')
00995         self.lastEvent.set()
00996 
00997         # finalisation steps
00998         [ e.finalize() for e in self.evcoms ]
00999         # Now do Histograms
01000         sc = self.histoAgent.Receive()
01001         sc = self.histoAgent.RebuildHistoStore()
01002         if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
01003         else              : self.log.warning( 'Histo Store Error in Rebuild' )
01004 
01005         # Now do FileRecords
01006         sc = self.filerecordsAgent.Receive()
01007         self.filerecordsAgent.Rebuild()
01008         self.Finalize()
01009         self.rTime = time.time()-startEngine
01010         self.Report()
01011 
01012 # =============================================================================
01013 
01014 
01015 
01016 # =============================================================================
01017 
01018 class Coord( object ) :
01019     def __init__( self, nWorkers, config, log ) :
01020 
01021         self.log = log
01022         self.config = config
01023         # set up Logging
01024         self.log.name = 'GaudiPython-Parallel-Logger'
01025         self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )
01026 
01027         if nWorkers == -1 :
01028             # The user has requested all available cpus in the machine
01029             self.nWorkers = cpu_count()
01030         else :
01031             self.nWorkers = nWorkers
01032 
01033 
01034         self.qs = self.SetupQueues( )    # a dictionary of queues (for Events)
01035         self.hq = JoinableQueue( )       # for Histogram data
01036         self.fq = JoinableQueue( )       # for FileRecords data
01037 
01038         # Make a Syncer for Initialise, Run, and Finalise
01039         self.sInit = Syncer( self.nWorkers, self.log,
01040                              limit=WAIT_INITIALISE,
01041                              step=STEP_INITIALISE    )
01042         self.sRun  = Syncer( self.nWorkers, self.log,
01043                              manyEvents=True,
01044                              limit=WAIT_SINGLE_EVENT,
01045                              step=STEP_EVENT,
01046                              firstEvent=WAIT_FIRST_EVENT )
01047         self.sFin  = Syncer( self.nWorkers, self.log,
01048                              limit=WAIT_FINALISE,
01049                              step=STEP_FINALISE )
01050         # and one final one for Histogram Transfer
01051         self.histSyncEvent = Event()
01052 
01053         # params are common to all subprocesses
01054         params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
01055 
01056         # Declare SubProcesses!
01057         self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params)
01058         self.workers = []
01059         for i in xrange( self.nWorkers ) :
01060             wk = Worker( i, self.getQueues(i), self.getSyncEvents(i), params )
01061             self.workers.append( wk )
01062         self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params)
01063 
01064         self.system = []
01065         self.system.append(self.writer)
01066         [ self.system.append(w) for w in self.workers ]
01067         self.system.append(self.reader)
01068 
01069     def getSyncEvents( self, nodeID ) :
01070         init = self.sInit.d[nodeID].event
01071         run  = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
01072         fin  = self.sFin.d[nodeID].event
01073         return ( init, run, fin )
01074 
01075     def getQueues( self, nodeID ) :
01076         eventQ = self.qs[ nodeID ]
01077         histQ  = self.hq
01078         fsrQ   = self.fq
01079         return ( eventQ, histQ, fsrQ )
01080 
01081     def Go( self ) :
01082 
01083         # Initialise
01084         self.log.name = 'GaudiPython-Parallel-Logger'
01085         self.log.info( 'INITIALISING SYSTEM' )
01086         for p in self.system :
01087             p.Start()
01088         sc = self.sInit.syncAll(step="Initialise")
01089         if sc : pass
01090         else  : self.Terminate() ; return FAILURE
01091 
01092         # Run
01093         self.log.name = 'GaudiPython-Parallel-Logger'
01094         self.log.info( 'RUNNING SYSTEM' )
01095         sc = self.sRun.syncAll(step="Run")
01096         if sc : pass
01097         else  : self.Terminate() ; return FAILURE
01098 
01099         # Finalise
01100         self.log.name = 'GaudiPython-Parallel-Logger'
01101         self.log.info( 'FINALISING SYSTEM' )
01102         sc = self.sFin.syncAll(step="Finalise")
01103         if sc : pass
01104         else  : self.Terminate() ; return FAILURE
01105 
01106         # if we've got this far, finally report SUCCESS
01107         self.log.info( "Cleanly join all Processes" )
01108         self.Stop()
01109         self.log.info( "Report Total Success to Main.py" )
01110         return SUCCESS
01111 
01112     def Terminate( self ) :
01113         # Brutally kill sub-processes
01114         self.writer.proc.terminate()
01115         [ w.proc.terminate() for w in self.workers]
01116         self.reader.proc.terminate()
01117 
01118     def Stop( self ) :
01119         # procs should be joined in the reverse of their launch order
01120         self.system.reverse()
01121         for s in self.system :
01122             s.proc.join()
01123         return SUCCESS
01124 
01125     def SetupQueues( self ) :
01126         # This method will set up the network of Queues needed
01127         # N Queues = nWorkers + 1
01128         # Each Worker has a Queue in, and a Queue out
01129         # Reader has Queue out only
01130         # Writer has nWorkers Queues in
01131 
01132         # one queue from the Reader to the Workers
01133         rwk = JoinableQueue()
01134         # one queue from each worker to writer
01135         workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
01136         d = {}
01137         d[-1] = (None, rwk)              # Reader
01138         d[-2] = (workersWriter, None)    # Writer
01139         for i in xrange(self.nWorkers) : d[i] =  (rwk, workersWriter[i])
01140         return d
01141 
01142 # ============================= EOF ===========================================