Gaudi Framework, version v23r4

Home   Generated: Mon Sep 17 2012

GMPBase.py

Go to the documentation of this file.
00001 from Gaudi.Configuration import *
00002 from Configurables import EvtCounter
00003 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
00004 from ROOT import TBufferFile, TBuffer
00005 import multiprocessing
00006 from multiprocessing import Process, Queue, JoinableQueue, Event
00007 from multiprocessing import cpu_count, current_process
00008 from multiprocessing.queues import Empty
00009 from pTools import *
00010 import time, sys, os
00011 
00012 # This script contains the bases for the Gaudi MultiProcessing (GMP)
00013 # classes
00014 
00015 # There are three classes :
00016 #   Reader
00017 #   Worker
00018 #   Writer
00019 
00020 # Each class needs to perform communication with the others
00021 # For this, we need a means of communication, which will be based on
00022 # the python multiprocessing package
00023 # This is provided in SPI pytools package
00024 # cmt line : use pytools v1.1 LCG_Interfaces
00025 # The PYTHONPATH env variable may need to be modified, as this might
00026 # still point to 1.0_python2.5
00027 
00028 # Each class will need Queues, and a defined method for using these
00029 # queues.
00030 # For example, as long as there is something in the Queue, both ends
00031 # of the queue must be open
00032 # Also, there needs to be proper Termination flags and criteria
00033 # The System should be error proof.
00034 
00035 
# Constants -------------------------------------------------------------------
# Polling sleep interval, in seconds, for busy-wait loops (e.g. waiting on
# a queue feeder thread to drain)
NAP = 0.001
# Bytes per megabyte (float), used for buffer-size reporting
MB  = 1024.0*1024.0
# waits to guard against hanging, in seconds
WAIT_INITIALISE   = 60*5    # 5 min for a node to initialise
WAIT_FIRST_EVENT  = 60*3    # 3 min for the first event to arrive
WAIT_SINGLE_EVENT = 60*6    # 6 min for any subsequent event
WAIT_FINALISE     = 60*2    # 2 min for finalisation
# step sizes used while waiting on the limits above
# (presumably seconds per polling step -- confirm at the call sites)
STEP_INITIALISE   = 10
STEP_EVENT        = 2
STEP_FINALISE     = 5

# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
SMAPS = False
00050 
# -----------------------------------------------------------------------------

# definitions
# ----------
# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# used to check which type of histo we are dealing with
# i.e. if currentHisto in aidatypes : pass
aidatypes = ( gbl.AIDA.IHistogram,
              gbl.AIDA.IHistogram1D,
              gbl.AIDA.IHistogram2D,
              gbl.AIDA.IHistogram3D,
              gbl.AIDA.IProfile1D,
              gbl.AIDA.IProfile2D,
              gbl.AIDA.IBaseHistogram  )  # extra?

# similar to aidatypes : native ROOT histogram/profile classes
thtypes   = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )

# Types of OutputStream in Gaudi
# maps writer class name -> category of data it writes out
# (used by IdentifyWriters to classify the configuration's writers)
WRITERTYPES  = {  'EvtCollectionStream'    : "tuples",
                  'InputCopyStream'        : "events",
                  'OutputStream'           : "events",
                  'RecordStream'           : "records",
                  'RunRecordStream'        : "records",
                  'SequentialOutputStream' : "events",
                  'TagCollectionStream'    : "tuples"   }
00079 
00080 # =============================================================================
00081 
class MiniWriter( object ) :
    '''
    A class to represent a writer in the GaudiPython configuration
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring
    '''
    def __init__( self, writer, wType, config ) :
        '''
        @param writer : the writer Configurable from the configuration
        @param wType  : one of "events", "records", "tuples" (see WRITERTYPES)
        @param config : full configuration mapping, used to reach the writer's
                        data service when the writer itself has no Output
        '''
        self.w     = writer
        self.wType = wType
        # set parameters for directly accessing the correct
        #   part of the configuration, so that later, we can do
        #   config[ key ].Output = modified(output)
        self.key          = None
        self.output       = None
        self.ItemList     = None
        self.OptItemList  = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput     = None
        self.datasvcName = None
        self.svcOutput   = None
        if hasattr( self.w, "Output" ) :
            # output file is named directly on the writer
            self.woutput = self.w.Output
            self.getItemLists( config )
            self.set( self.wName, self.w.Output )
        else :
            # if there is no output file, get it via the datasvc
            # (every writer always has a datasvc property)
            self.datasvcName = self.w.EvtDataSvc
            datasvc = config[ self.datasvcName ]
            if hasattr( datasvc, "Output" ) :
                self.getItemLists( config )
                self.set( self.datasvcName, datasvc.Output )

    def getNewName( self, replaceThis, withThis, extra='' ) :
        '''
        Replace one pattern in the output name string with another, and
        return the new Output name.  The stored output may be a plain
        string or a single-element list; the same container kind is
        returned.

        @param extra : might need to add ROOT flags
                       i.e.: OPT='RECREATE', or such
        '''
        # isinstance instead of comparing __class__.__name__ strings
        assert isinstance( replaceThis, str )
        assert isinstance( withThis,    str )
        old = self.output
        wasList = isinstance( old, list )
        if wasList :
            # it *might* be in a list, so unwrap the first element
            old = self.output[0]
        new = old.replace( replaceThis, withThis ) + extra
        return [ new ] if wasList else new

    def getItemLists( self, config ) :
        '''
        Fill self.ItemList / self.OptItemList, preferring the writer's own
        properties and falling back to those of its data service.
        '''
        # fetch the datasvc lazily, and at most once
        datasvc = None
        # the item list
        if hasattr( self.w, "ItemList" ) :
            self.ItemList = self.w.ItemList
        else :
            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "ItemList" ) :
                self.ItemList = datasvc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr( self.w, "OptItemList" ) :
            self.OptItemList = self.w.OptItemList
        else :
            if datasvc is None :
                datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "OptItemList" ) :
                self.OptItemList = datasvc.OptItemList

    def set( self, key, output ) :
        # record which configuration key owns the Output property,
        # plus the current output destination itself
        self.key    = key
        self.output = output

    def __repr__( self ) :
        # multi-line human-readable summary, used for configuration logging
        s  = ""
        line = '-'*80
        s += (line+'\n')
        s += "Writer         : %s\n"%( self.wName  )
        s += "Writer Type    : %s\n"%( self.wType  )
        s += "Writer Output  : %s\n"%( self.output )
        s += "DataSvc        : %s\n"%( self.datasvcName )
        s += "DataSvc Output : %s\n"%( self.svcOutput   )
        s += '\n'
        s += "Key for config : %s\n"%( self.key    )
        s += "Output File    : %s\n"%( self.output )
        s += "ItemList       : %s\n"%( self.ItemList )
        s += "OptItemList    : %s\n"%( self.OptItemList )
        s += (line+'\n')
        return s
00180 
00181 # =============================================================================
00182 
class CollectHistograms( PyAlgorithm ) :
    '''
    GaudiPython algorithm used to clean up histos on the Reader and Workers
    Only has a finalize method()
    This retrieves a dictionary of path:histo objects and sends it to the
    writer.  It then waits for a None flag : THIS IS IMPORTANT, as if
    the algorithm returns before ALL histos have been COMPLETELY RECEIVED
    at the writer end, there will be an error.
    '''
    def __init__( self, gmpcomponent ) :
        # gmpcomponent : the owning GMP node (Reader or Worker); provides the
        # logger, the histogram queue (hq) and the histo sync Event (sEvent)
        PyAlgorithm.__init__( self )
        self._gmpc = gmpcomponent
        self.log = self._gmpc.log
        return None
    def execute( self ) :
        # nothing per-event; all the work happens in finalize()
        return SUCCESS
    def finalize( self ) :
        # Ship the whole histogram store to the Writer in chunks, then block
        # until the Writer signals (via sEvent) that everything was received.
        self.log.info('CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms( )
        # Python 2 : dict.keys() returns a list, which is sliced below
        ks = self._gmpc.hDict.keys()
        self.log.info('%i Objects in Histogram Store'%(len(ks)))
        # crashes occurred due to Memory Error during the sending of hundreds
        # histos on slc5 machines, so instead, break into chunks
        # send 100 at a time
        chunk = 100
        # Python-2 integer division; when len(ks) is an exact multiple of
        # chunk this sends one extra, empty dict (harmless)
        reps = len(ks)/chunk + 1
        for i in xrange(reps) :
            someKeys = ks[i*chunk : (i+1)*chunk]
            smalld = dict( [(key, self._gmpc.hDict[key]) for key in someKeys] )
            self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
        # "finished" Notification
        self.log.debug('Signalling end of histos to Writer')
        self._gmpc.hq.put( 'HISTOS_SENT' )
        self.log.debug( 'Waiting on Sync Event' )
        # block until the Writer confirms full receipt; returning earlier
        # would tear down this process's end of the queue mid-transfer
        self._gmpc.sEvent.wait()
        self.log.debug( 'Histo Sync Event set, clearing and returning' )
        # reset the histogram store with a fresh root node
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()
        setOwnership(root, False)
        self._gmpc.hvt.setRoot( '/stat', root )
        return SUCCESS
00224 
00225 # =============================================================================
00226 
class EventCommunicator( object ) :
    # This class is responsible for communicating Gaudi Events via Queues
    # Events are communicated as TBufferFiles, filled either by the
    # TESSerializer, or the GaudiSvc, "IPCSvc"
    # Items on the queues are either tuples ( evtNumber, TBufferFile ) or
    # plain string flags (e.g. 'FINISHED').
    def __init__( self, GMPComponent, qin, qout ) :
        # GMPComponent : the owning node (Reader/Worker/Writer)
        # qin / qout   : multiprocessing queues; either may be None
        #                (the Writer has no qout)
        self._gmpc = GMPComponent
        self.log   = self._gmpc.log
        # maximum capacity of Queues
        # NOTE(review): maxsize is not used elsewhere in this class —
        # presumably enforced where the queues are created; confirm
        self.maxsize = 50
        # central variables : Queues
        self.qin  = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent    = 0    # counter : items sent
        self.nRecv    = 0    # counter : items received
        self.sizeSent = 0    # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0    # measure : size of events in   ( tbuf.Length() )
        self.qinTime  = 0    # time    : receiving from self.qin
        self.qoutTime = 0    # time    : sending on qout

    def send( self, item ) :
        # This class manages the sending of a TBufferFile Event to a Queue
        # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
        assert item.__class__.__name__ == 'tuple'
        startTransmission = time.time()
        self.qout.put( item )
        # allow the background thread to feed the Queue; not 100% guaranteed to
        # finish before next line
        # NOTE(review): _buffer is a CPython implementation detail of
        # multiprocessing.Queue (the feeder thread's pending deque)
        while self.qout._buffer : time.sleep( NAP )
        self.qoutTime += time.time()-startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive( self, timeout=None ) :
        # Receive items from self.qin; returns None on timeout.
        startWait = time.time()
        try :
            itemIn = self.qin.get( timeout=timeout )
        except Empty :
            return None
        self.qinTime += time.time()-startWait
        self.nRecv += 1
        if itemIn.__class__.__name__ == 'tuple' :
            # a real event : ( evtNumber, TBufferFile )
            self.sizeRecv += itemIn[1].Length()
        else :
            # a string flag ('FINISHED', ...) — do not count it as an event
            self.nRecv -= 1
        try :
            # balance the JoinableQueue counter for qin.join() on the sender
            self.qin.task_done()
        except :
            self._gmpc.log.warning('TASK_DONE called too often by : %s'\
                                    %(self._gmpc.nodeType))
        return itemIn

    def finalize( self ) :
        # Send termination flags downstream, wait for them to be consumed,
        # then report statistics.
        self.log.info('Finalize Event Communicator : %s'%(self._gmpc.nodeType))
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        if   self._gmpc.nodeType == 'Reader' : downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer' : downstream = 0
        elif self._gmpc.nodeType == 'Worker' : downstream = 1
        for i in xrange(downstream) :
            self.qout.put( 'FINISHED' )
        if self._gmpc.nodeType != 'Writer' :
            # block until the receivers have task_done()'d every item
            self.qout.join()
        # Now some reporting...
        self.statistics( )

    def statistics( self ) :
        # Log the send/receive counters and timings gathered above.
        self.log.name = '%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
        self.log.info ( 'Items Sent     : %i'%(self.nSent) )
        self.log.info ( 'Items Received : %i'%(self.nRecv) )
        self.log.info ( 'Data  Sent     : %i'%(self.sizeSent) )
        self.log.info ( 'Data  Received : %i'%(self.sizeRecv) )
        self.log.info ( 'Q-out Time     : %5.2f'%(self.qoutTime) )
        self.log.info ( 'Q-in  Time     : %5.2f'%(self.qinTime ) )
00309 
00310 # =============================================================================
00311 
class TESSerializer( object ) :
    '''
    Bookkeeping wrapper around the GaudiMP TESSerializer.
    Tracks counts, per-buffer sizes and cumulative timings for loads
    (deserialisation into the TES) and dumps (serialisation out of it),
    and can log a summary report.
    '''
    def __init__( self, gaudiTESSerializer, evtDataSvc,
                        nodeType, nodeID, log ) :
        # the raw GaudiMP serializer and the event data service it fills
        self.T   = gaudiTESSerializer
        self.evt = evtDataSvc
        # per-buffer sizes, kept for avg/max reporting
        self.buffersIn  = []
        self.buffersOut = []
        self.nIn     = 0
        self.nOut    = 0
        self.tDump   = 0.0
        self.tLoad   = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID   = nodeID
        self.log      = log
    def Load( self, tbuf ) :
        '''Reset /Event with a fresh root and deserialize tbuf into it.'''
        root = gbl.DataObject()
        setOwnership( root, False )
        self.evt.setRoot( '/Event', root )
        t = time.time()
        self.T.loadBuffer( tbuf )
        self.tLoad   += (time.time() - t)
        self.nIn     += 1
        self.buffersIn.append( tbuf.Length() )
    def Dump( self ) :
        '''Serialize the current TES into a new TBufferFile and return it.'''
        t = time.time()
        tb = TBufferFile( TBuffer.kWrite )
        self.T.dumpBuffer(tb)
        self.tDump += ( time.time()-t )
        self.nOut  += 1
        self.buffersOut.append( tb.Length() )
        return tb
    def Report( self ) :
        '''Log a summary of load/dump counts, volumes and timings.'''
        din  = sum( self.buffersIn )
        dout = sum( self.buffersOut )
        lines = [ "Events Loaded    : %i"%( self.nIn  ),
                  "Events Dumped    : %i"%( self.nOut ) ]
        lines.append( "Data Loaded      : %i"%(din) )
        lines.append( "Data Loaded (MB) : %5.2f Mb"%(din/MB) )
        if self.nIn :
            lines.append( "Avg Buf Loaded   : %5.2f Mb"\
                          %( din/(self.nIn*MB) ) )
            lines.append( "Max Buf Loaded   : %5.2f Mb"\
                          %( max(self.buffersIn)/MB ) )
        else :
            lines.append( "Avg Buf Loaded   : N/A" )
            lines.append( "Max Buf Loaded   : N/A" )
        lines.append( "Data Dumped      : %i"%(dout) )
        lines.append( "Data Dumped (MB) : %5.2f Mb"%(dout/MB) )
        if self.nOut :
            # BUGFIX : previously averaged din (bytes *loaded*) over the
            # number of dumped buffers; the dumped volume is dout
            lines.append( "Avg Buf Dumped   : %5.2f Mb"\
                          %( dout/(self.nOut*MB) ) )
            lines.append( "Max Buf Dumped   : %5.2f Mb"\
                          %( max(self.buffersOut)/MB ) )
        else :
            lines.append( "Avg Buf Dumped   : N/A" )
            lines.append( "Max Buf Dumped   : N/A" )
        lines.append( "Total Dump Time  : %5.2f"%( self.tDump ) )
        lines.append( "Total Load Time  : %5.2f"%( self.tLoad ) )
        # temporarily rebadge the logger for the report block
        self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
        for line in lines :
            self.log.info( line )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
00388 
00389 # =============================================================================
00390 
class GMPComponent( object ) :
    '''
    Template (base) class for the Reader, Worker and Writer nodes,
    containing all common components.

    nodeID is a numerical identifier for the node :
       -1 for the Reader
       -2 for the Writer
       0,...,nWorkers-1 for the Workers
    '''
    def __init__( self, nodeType, nodeID, queues, events, params  ) :
        # declare a Gaudi MultiProcessing Node
        # the nodeType is going to be one of Reader, Worker, Writer
        # qPair is going to be a tuple of ( qin, qout )
        # for sending and receiving
        # if nodeType is "Writer", it will be a list of qPairs,
        # as there's one queue-in from each Worker
        #
        # params is a tuple of (nWorkers, sEvent, config, log)

        self.nodeType = nodeType
        current_process().name = nodeType

        # Synchronisation Event() objects for keeping track of the system
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer   # unpack tuple

        # necessary for knowledge of the system
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.nodeID   = nodeID

        # describe the state of the node by the current Event Number
        self.currentEvent = None

        # Unpack the Queues : (events, histos, filerecords)
        qPair, histq, fq = queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker' :
            # Reader or Worker Node : a single in/out queue pair
            qin, qout = qPair
            self.evcom = EventCommunicator( self, qin, qout )
        else :
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin :
                ec = EventCommunicator( self, q, None )
                self.evcoms.append( ec )
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn  = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i'%(self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime       = 0.0
        self.rTime       = 0.0
        self.fTime       = 0.0
        self.firstEvTime = 0.0
        self.tTime       = 0.0

        # define the separate process
        self.proc = Process( target=self.Engine )

        # event counter used by the Gauss-specific getEventNumber path
        self.num = 0

        # Detect which application this configuration belongs to
        # (renamed from 'list', which shadowed the builtin)
        ks = self.config.keys()
        self.app = None
        knownApps = ["Brunel", "DaVinci", "Boole", "Gauss"]
        for k in knownApps :
            if k in ks : self.app = k

    def Start( self ) :
        # Fork and start the separate process
        self.proc.start()

    def Engine( self ) :
        # This will be the method carried out by the Node
        # Different for all ; overridden by each subclass
        pass

    def processConfiguration( self ) :
        # Different for all ; customize Configuration for multicore
        pass

    def SetupGaudiPython( self ) :
        '''
        Initialize the GaudiPython tools : the AppMgr, the principal
        services, and the TESSerializer pair (raw + bookkeeping wrapper).
        '''
        self.a   = AppMgr()
        if SMAPS :
            # optional memory-map snapshot algorithm, logging per node
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType+'-'+str(self.nodeID)+'.smp'
            ss = SmapShot( logname=smapsLog )
            self.a.addAlgorithm( ss )
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc','IIncidentSvc')
        self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
        self.ts  = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
        self.TS  = TESSerializer( self.ts, self.evt,
                                  self.nodeType, self.nodeID, self.log )
        return SUCCESS

    def StartGaudiPython( self ) :
        # initialize and start the application manager
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES( self, tbufferfile ) :
        # Reset /Event with a fresh root node, then deserialize the
        # buffer contents into the event store
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot( '/Event', root )
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber( self ) :
        '''
        Return the current event number, or -1 when it cannot be found.
        For Gauss there is no event header to query : the Reader simply
        counts the events it has seen.
        '''
        if self.app == 'Gauss' :
            if self.nodeID == -1 :
                self.num = self.num + 1
                self.log.info( 'Evt Number %d' % self.num )
                return self.num
            # NOTE(review): Gauss nodes other than the Reader get None
            # here (as in the original implicit fall-through) — confirm
            # callers tolerate that
            return None

        # Using getList or getHistoNames can result in the EventSelector
        # re-initialising connection to RootDBase, which costs a lot of
        # time... try to build a set of Header paths??

        # First Attempt : Unpacked Event Data
        lst = [ '/Event/Gen/Header',
                '/Event/Rec/Header' ]
        for l in lst :
            path = l
            try :
                n = self.evt[path].evtNumber()
                return n
            except :
                # No evt number at this path
                continue

        # second attempt : try DAQ/RawEvent data
        # The Evt Number is in bank type 16, bank 0, data pt 4
        try :
            n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
            return n
        except :
            pass

        # Default Action : only warn before any event has flowed through
        if self.nIn > 0 or self.nOut > 0 :
            pass
        else :
            self.log.warning('Could not determine Event Number')
        return -1

    def IdentifyWriters( self ) :
        '''
        Identify Writers in the Configuration and classify them by the
        kind of data they write; returns a dict
        { "events"/"records"/"tuples" : [MiniWriter,...], "histos" : [file] }
        '''
        d = {}
        keys = [ "events", "records", "tuples", "histos" ]
        for k in keys :
            d[k] = []

        # Identify Writers and Classify
        wkeys = WRITERTYPES.keys()
        for v in self.config.values() :
            if v.__class__.__name__ in wkeys :
                writerType = WRITERTYPES[ v.__class__.__name__ ]
                d[writerType].append( MiniWriter(v, writerType, self.config) )
                if self.nodeID == 0 :
                    self.log.info('Writer Found : %s'%(v.name()))

        # Now Check for the Histogram Service
        if 'HistogramPersistencySvc' in  self.config.keys() :
            hfile =self.config['HistogramPersistencySvc'].getProp('OutputFile')
            d[ "histos" ].append( hfile )
        return d

    def dumpHistograms( self ) :
        '''
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames( )
        histDict = {}
        objects = 0 ; histos = 0
        if nlist :
            for n in nlist :
                o = self.hvt[ n ]
                if type(o) in aidatypes :
                    # convert AIDA histograms to their ROOT equivalents
                    o = aida2root(o)
                    histos  += 1
                else :
                    objects += 1
                histDict[ n ] = o
        else :
            print('WARNING : no histograms to recover?')
        return histDict

    def Initialize( self ) :
        '''
        Full start-up sequence : process the configuration, set up
        GaudiPython, signal initEvent, then initialize/start Gaudi.
        For Gauss, also acquire the EvtCounter tool.
        '''
        start = time.time()
        self.processConfiguration( )
        self.SetupGaudiPython( )
        self.initEvent.set()
        self.StartGaudiPython( )

        if self.app == 'Gauss':
            # Gauss needs the EvtCounter tool for event numbering
            tool = self.a.tool( "ToolSvc.EvtCounter" )
            self.cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
            self.log.info("interface %s" %self.cntr)
        else:
            self.cntr = None

        self.iTime = time.time() - start

    def Finalize( self ) :
        # stop/finalize Gaudi and signal completion via finalEvent
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info( '%s-%i Finalized'%(self.nodeType, self.nodeID) )
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report( self ) :
        # log the timing heuristics collected over the node's lifetime
        self.log.name = "%s-%i Audit"%(self.nodeType, self.nodeID)
        allTime  = "Alive Time     : %5.2f"%(self.tTime)
        initTime = "Init Time      : %5.2f"%(self.iTime)
        frstTime = "1st Event Time : %5.2f"%(self.firstEvTime)
        runTime  = "Run Time       : %5.2f"%(self.rTime)
        finTime  = "Finalise Time  : %5.2f"%(self.fTime)
        tup = ( allTime, initTime, frstTime, runTime, finTime )
        for t in tup :
            self.log.info( t )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
        # and report from the TESSerializer
        self.TS.Report()
00638 
00639 # =============================================================================
00640 
class Reader( GMPComponent )  :
    '''
    The Reader node (nodeID -1) : reads events from the input, serializes
    the TES into TBufferFiles and distributes them to the Workers over its
    out-queue via the EventCommunicator.
    '''
    def __init__( self, queues, events, params ) :
        GMPComponent.__init__(self, 'Reader', -1, queues, events, params )

    def processConfiguration( self ) :
        # Reader :
        #   No algorithms
        #   No output
        #   No histos
        self.config[ 'ApplicationMgr' ].TopAlg    = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        self.config['MessageSvc'].Format    = '[Reader]% F%18W%S%7W%R%T %0W%M'
        self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax

    def DumpEvent( self ) :
        # Serialize the current TES contents into a fresh TBufferFile
        tb = TBufferFile( TBuffer.kWrite )
        # print '----Reader dumping Buffer!!!'
        self.ts.dumpBuffer( tb )
        # print '\tBuffer Dumped, size : %i'%( tb.Length() )
        return tb

    def DoFirstEvent( self ) :
        # Do First Event ------------------------------------------------------
        # The first event is special : it is used to discover the TES
        # structure (item list), which is registered with the TESSerializer
        # once and reused for all subsequent events.
        # Check Termination Criteria
        startFirst = time.time()
        self.log.info('Reader : First Event')
        if self.nOut == self.evtMax :
            self.log.info('evtMax( %i ) reached'%(self.evtMax))
            self.lastEvent.set()
            return SUCCESS
        else :
            # Continue to read, dump and send event
            self.a.run(1)
            if not bool(self.evt['/Event']) :
                # input exhausted before the first event
                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
                self.lastEvent.set()
                return SUCCESS
            else :
                # Populate TESSerializer list and send Event
                if self.app == "Gauss":
                    lst = self.evt.getHistoNames()
                else:
                  try :
                    lst = self.evt.getList()
                    if self.app == "DaVinci":
                      # DaVinci : also pick up the /Event/DAQ subtree
                      daqnode = self.evt.retrieveObject( '/Event/DAQ' ).registry()
                      setOwnership( daqnode, False )
                      self.evt.getList( daqnode, lst, daqnode.address().par() )
                  except :
                    self.log.critical('Reader could not acquire TES List!')
                    self.lastEvent.set()
                    return FAILURE
                self.log.info('Reader : TES List : %i items'%(len(lst)))
                # register every discovered path with the raw serializer
                for l in lst :
                    self.ts.addItem(l)
                self.currentEvent = self.getEventNumber( )
                tb = self.TS.Dump( )
                self.log.info('First Event Sent')
                self.evcom.send( (self.currentEvent, tb) )
                self.nOut += 1
                self.eventLoopSyncer.set()
                self.evt.clearStore( )
                self.firstEvTime = time.time()-startFirst
                return SUCCESS

    def Engine( self ) :
        # Main loop of the Reader process : initialize, distribute the
        # first (special) event, then read/dump/send until evtMax or the
        # input is exhausted; finally signal <Last> Event and finalize.

        startEngine = time.time()
        self.log.name = 'Reader'
        self.log.info('Reader Process starting')

        self.Initialize()

        # add the Histogram Collection Algorithm
        self.a.addAlgorithm( CollectHistograms(self) )

        self.log.info('Reader Beginning Distribution')
        sc = self.DoFirstEvent( )
        if sc.isSuccess() :
            self.log.info('Reader First Event OK')
        else :
            self.log.critical('Reader Failed on First Event')
            self.stat = FAILURE

        # Do All Others -------------------------------------------------------
        while True :
            # Check Termination Criteria
            if self.nOut == self.evtMax :
                self.log.info('evtMax( %i ) reached'%(self.evtMax))
                break
            # Check Health
            if not self.stat.isSuccess() :
                self.log.critical( 'Reader is Damaged!' )
                break
            # Continue to read, dump and send event
            t = time.time()
            self.a.run(1)
            self.rTime += (time.time()-t)
            if not bool(self.evt['/Event']) :
                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
                break
            self.currentEvent = self.getEventNumber( )
            tb = self.TS.Dump( )
            self.evcom.send( (self.currentEvent, tb) )
            # clean up
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Finalize
        self.log.info( 'Reader : Event Distribution complete.' )
        self.evcom.finalize()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()
00760 
00761 # =============================================================================
00762 
class Worker( GMPComponent ) :
    '''
    GMP Worker process.

    Receives serialised events from the Reader, runs the configured
    algorithm chain on each one and, if the event passes the output-stream
    accept/require/veto checks, forwards the re-serialised event to the
    Writer process.
    '''
    def __init__( self, workerID, queues, events, params ) :
        GMPComponent.__init__(self,'Worker', workerID, queues, events, params)
        # Identify the writer streams
        self.writerDict = self.IdentifyWriters( )
        # Identify the accept/veto checks for each event
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.debug("Worker-%i Created OK"%(self.nodeID))
        # cleared later in Engine() if the application has no event output
        self.eventOutput = True

    def processConfiguration( self ) :
        '''
        Strip the configuration for a Worker : no input, no output streams,
        no histogram file.  Tuple output files are renamed with a per-worker
        appendix so they can be merged later, and the EvtCounter is switched
        to explicit (non-incident) counting.
        '''
        # Worker :
        #   No input
        #   No output
        #   No Histos
        self.config[ 'EventSelector'  ].Input     = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        formatHead = '[Worker-%i] '%(self.nodeID)
        self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'

        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

        for m in self.writerDict[ "tuples" ] :
            # rename Tuple output file with an appendix
            # based on worker id, for merging later
            newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
            self.config[ m.key ].Output = newName

        # Suppress INFO Output for all but Worker-0
        #if self.nodeID == 0 :
        #    pass
        #else                :
        #    self.config[ 'MessageSvc' ].OutputLevel = ERROR

        try:
            if "ToolSvc.EvtCounter" not in self.config:
                # EvtCounter is imported at module level from Configurables
                counter = EvtCounter()
            else:
                counter = self.config["ToolSvc.EvtCounter"]
            # the counter is driven explicitly via setEventCounter in Engine
            counter.UseIncident = False
        except Exception:
            # ignore errors when trying to change the configuration of the EvtCounter
            # (was a bare 'except:', which also swallowed SystemExit/KeyboardInterrupt)
            self.log.warning('Cannot configure EvtCounter')

    def Engine( self ) :
        '''
        Main Worker loop.

        Initialise, build the TESSerializer item list from the event
        writers, then receive/process/forward events until the Reader
        signals 'FINISHED'; finally send the FileRecords and finalise.
        '''
        startEngine = time.time()
        self.log.name = "Worker-%i"%(self.nodeID)
        self.log.info("Worker %i starting Engine"%(self.nodeID))
        self.Initialize()
        self.filerecordsAgent = FileRecordsAgent(self)

        # populate the TESSerializer itemlist
        self.log.info('EVT WRITERS ON WORKER : %i'\
                       %( len(self.writerDict['events'])))

        nEventWriters = len( self.writerDict[ "events" ] )
        if nEventWriters :
            itemList = set()
            optItemList = set()
            for m in self.writerDict[ "events" ] :
                for item in m.ItemList :
                    # strip any depth specifier : 'path#depth' -> 'path'
                    hsh = item.find( '#' )
                    if hsh != -1:
                        item = item[:hsh]
                    itemList.add( item )
                for item in m.OptItemList :
                    hsh = item.find( '#' )
                    if hsh != -1:
                        item = item[:hsh]
                    optItemList.add( item )
            # If an item is mandatory and optional, keep it only in the optional list
            itemList -= optItemList
            for item in sorted( itemList ):
                self.log.info( ' adding ItemList Item to ts : %s' % ( item ) )
                self.ts.addItem( item )
            for item in sorted( optItemList ):
                self.log.info( ' adding Optional Item to ts : %s' % ( item ) )
                self.ts.addOptItem( item )
        else :
            self.log.info( 'There is no Event Output for this app' )
            self.eventOutput = False

        # add the Histogram Collection Algorithm
        self.a.addAlgorithm( CollectHistograms(self) )

        # Begin processing
        self.log.name = "Worker-%i"%(self.nodeID)
        while True :
            packet = self.evcom.receive( )
            if not packet :
                continue
            if packet == 'FINISHED' : break
            evtNumber, tbin = packet    # unpack
            if self.cntr is not None:
                # keep the EvtCounter in step with the Reader's numbering
                self.cntr.setEventCounter( evtNumber )

            self.nIn += 1
            self.TS.Load( tbin )
            t = time.time()
            sc = self.a.executeEvent()
            if self.nIn == 1 :
                self.firstEvTime = time.time()-t
            else :
                self.rTime += (time.time()-t)
            if not sc.isSuccess() :
                self.log.warning('Did not Execute Event')
                self.evt.clearStore()
                continue
            if not self.isEventPassed() :
                self.log.warning( 'Event did not pass : %i'%(evtNumber) )
                self.evt.clearStore()
                continue
            if self.eventOutput :
                # It may be the case of generating Event Tags; hence
                #   no event output
                self.currentEvent = self.getEventNumber( )
                tb = self.TS.Dump( )
                self.evcom.send( (self.currentEvent, tb) )
                self.nOut += 1
            self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        self.evcom.finalize()
        self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
        # Now send the FileRecords and stop/finalize the appMgr
        self.filerecordsAgent.SendFileRecords()
        self.Finalize()
        self.tTime = time.time()-startEngine
        self.Report()

    def getCheckAlgs( self ) :
        '''
        For some output writers, a check is performed to see if the event has
        executed certain algorithms.
        These reside in the AcceptAlgs property for those writers

        Returns (acceptAlgs, requireAlgs, vetoAlgs) : three lists of
        algorithm names collected over all event writers.
        '''
        acc = []
        req = []
        vet = []
        for m in self.writerDict[ "events" ] :
            if hasattr(m.w, 'AcceptAlgs')  : acc += m.w.AcceptAlgs
            if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
            if hasattr(m.w, 'VetoAlgs')    : vet += m.w.VetoAlgs
        return (acc, req, vet)

    def checkExecutedPassed( self, algName ) :
        # True iff the named algorithm has been executed for this event
        # AND its filter-passed flag is set
        if  self.a.algorithm( algName )._ialg.isExecuted()\
        and self.a.algorithm( algName )._ialg.filterPassed() :
            return True
        else :
            return False

    def isEventPassed( self ) :
        '''
        Check the algorithm status for an event.
        Depending on output writer settings, the event
          may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream
        '''
        passed = False

        self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
        if self.acceptAlgs :
            # accepted if ANY accept algorithm executed and passed
            for name in self.acceptAlgs :
                if self.checkExecutedPassed( name ) :
                    passed = True
                    break
        else :
            passed = True

        self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
        # ALL require algorithms must have executed and passed
        for name in self.requireAlgs :
            if not self.checkExecutedPassed( name ) :
                self.log.info('Evt declined (requireAlgs) : %s'%(name) )
                passed = False

        self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
        # BUGFIX : in OutputStream::isEventAccepted a veto algorithm that
        # executed AND passed REJECTS the event; the previous code had this
        # condition inverted (it declined when the veto alg did NOT pass)
        for name in self.vetoAlgs :
            if self.checkExecutedPassed( name ) :
                self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
                passed = False
        return passed
00964 
00965 # =============================================================================
00966 
class Writer( GMPComponent ) :
    '''
    GMP Writer process.

    Collects processed events from all Workers (round-robin over the
    per-worker queues) and produces the event, FileRecords and histogram
    output of the parallel job.
    '''
    def __init__( self, queues, events, params ) :
        GMPComponent.__init__(self,'Writer', -2, queues, events, params)
        # Identify the writer streams
        self.writerDict = self.IdentifyWriters( )
        # This keeps track of workers as they finish
        self.status = [False]*self.nWorkers
        self.log.name = "Writer--2"

    def processConfiguration( self ) :
        '''
        Strip the configuration for the Writer (no input, no algorithms)
        and rename every output file with a '.pN.' appendix (N = number of
        workers) to mark it as parallel-processing output.
        '''
        # Writer :
        #   No input
        #   No Algs
        self.config[ 'ApplicationMgr' ].TopAlg = []
        self.config[ 'EventSelector'  ].Input  = []

        self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'

        # Now process the output writers
        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

        # Modify the name of the output file to reflect that it came
        #  from a parallel processing
        #
        # Event Writers
        for m in self.writerDict[ "events" ] :
            self.log.debug( 'Processing Event Writer : %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers )
            self.config[ m.key ].Output = newName

        # Now, if there are no event writers, the FileRecords file
        #   will fail to open, as it only opens an UPDATE version
        #   of the existing Event Output File
        # So, if there are no event writers, edit the string of the
        #   FileRecord Writer

        # FileRecords Writers
        for m in self.writerDict[ "records" ] :
            self.log.debug( 'Processing FileRecords Writer: %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers,
                                       extra=" OPT='RECREATE'" )
            self.config[ m.key ].Output = newName

        # same for histos
        hs = "HistogramPersistencySvc"
        n = None
        if hs in self.config.keys() :
            n = self.config[ hs ].OutputFile
        if n :
            newName = self.config[ hs ].OutputFile.replace(
                          '.', '.p%i.'%(self.nWorkers) )
            self.config[ hs ].OutputFile = newName

    def Engine( self ) :
        '''
        Main Writer loop.

        Poll the worker queues round-robin, write each received event, and
        stop once every worker has sent 'FINISHED'; then rebuild the
        histogram and FileRecords stores and finalise.
        '''
        startEngine = time.time()
        self.Initialize()
        self.histoAgent = HistoAgent( self )
        self.filerecordsAgent = FileRecordsAgent( self )

        # Begin processing : round-robin over the per-worker queues
        # (previous version carried unused 'Go' and 'stopCriteria' locals)
        current = -1
        while True :
            current = (current+1)%self.nWorkers
            packet = self.evcoms[current].receive( timeout=0.01 )
            if packet is None :
                # nothing from this worker right now; try the next one
                continue
            if packet == 'FINISHED' :
                self.log.info('Writer got FINISHED flag : Worker %i'%(current))
                self.status[current] = True
                if all(self.status) :
                    self.log.info('FINISHED recd from all workers, break loop')
                    break
                continue
            # otherwise, continue as normal
            self.nIn += 1    # update central count (maybe needed by FSR store)
            evtNumber, tbin = packet    # unpack
            self.TS.Load( tbin )
            t = time.time()
            self.a.executeEvent()
            self.rTime += ( time.time()-t )
            self.currentEvent = self.getEventNumber( )
            self.evt.clearStore( )
            self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # finalisation steps
        for e in self.evcoms :
            e.finalize()
        # Now do Histograms
        sc = self.histoAgent.Receive()
        sc = self.histoAgent.RebuildHistoStore()
        if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
        else              : self.log.warning( 'Histo Store Error in Rebuild' )

        # Now do FileRecords
        sc = self.filerecordsAgent.Receive()
        self.filerecordsAgent.Rebuild()
        self.Finalize()
        # record total engine time, as Reader and Worker do before Report()
        # (was commented out here, leaving tTime unset for the Writer)
        self.tTime = time.time()-startEngine
        self.Report()
01071 
01072 # =============================================================================
01073 
01074 
01075 
01076 # =============================================================================
01077 
class Coord( object ) :
    '''
    Co-ordinator of the parallel system.

    Builds the Reader, the Workers and the Writer together with their
    connecting queues and synchronisation events, then drives the three
    synchronised phases : Initialise, Run, Finalise.
    '''
    def __init__( self, nWorkers, config, log ) :
        '''
        nWorkers : number of Worker processes; -1 means one per available cpu.
        config   : the full Gaudi configuration dictionary.
        log      : logger shared with all subprocesses.
        '''
        self.log = log
        self.config = config
        # set up Logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )

        if nWorkers == -1 :
            # The user has requested all available cpus in the machine
            self.nWorkers = cpu_count()
        else :
            self.nWorkers = nWorkers

        self.qs = self.SetupQueues( )    # a dictionary of queues (for Events)
        self.hq = JoinableQueue( )       # for Histogram data
        self.fq = JoinableQueue( )       # for FileRecords data

        # Make a Syncer for Initialise, Run, and Finalise
        self.sInit = Syncer( self.nWorkers, self.log,
                             limit=WAIT_INITIALISE,
                             step=STEP_INITIALISE    )
        self.sRun  = Syncer( self.nWorkers, self.log,
                             manyEvents=True,
                             limit=WAIT_SINGLE_EVENT,
                             step=STEP_EVENT,
                             firstEvent=WAIT_FIRST_EVENT )
        self.sFin  = Syncer( self.nWorkers, self.log,
                             limit=WAIT_FINALISE,
                             step=STEP_FINALISE )
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Declare SubProcesses!
        self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params)
        self.workers = []
        for i in range( self.nWorkers ) :
            wk = Worker( i, self.getQueues(i), self.getSyncEvents(i), params )
            self.workers.append( wk )
        self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params)

        # launch order : Writer first, then the Workers, Reader last
        self.system = []
        self.system.append(self.writer)
        self.system.extend(self.workers)
        self.system.append(self.reader)

    def getSyncEvents( self, nodeID ) :
        # (init, (run, lastEvent), fin) synchronisation events for a node
        init = self.sInit.d[nodeID].event
        run  = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
        fin  = self.sFin.d[nodeID].event
        return ( init, run, fin )

    def getQueues( self, nodeID ) :
        # (eventQueue, histogramQueue, fileRecordsQueue) for a node
        eventQ = self.qs[ nodeID ]
        histQ  = self.hq
        fsrQ   = self.fq
        return ( eventQ, histQ, fsrQ )

    def Go( self ) :
        '''
        Run the three synchronised phases.  On any synchronisation failure
        the subprocesses are killed and FAILURE is returned; otherwise the
        processes are joined cleanly and SUCCESS is returned.
        '''
        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'INITIALISING SYSTEM' )
        for p in self.system :
            p.Start()
        sc = self.sInit.syncAll(step="Initialise")
        # 'not (sc == SUCCESS)' : StatusCode proxies are only known to bind ==
        if not (sc == SUCCESS) :
            self.Terminate()
            return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'RUNNING SYSTEM' )
        sc = self.sRun.syncAll(step="Run")
        if not (sc == SUCCESS) :
            self.Terminate()
            return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'FINALISING SYSTEM' )
        sc = self.sFin.syncAll(step="Finalise")
        if not (sc == SUCCESS) :
            self.Terminate()
            return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info( "Cleanly join all Processes" )
        self.Stop()
        self.log.info( "Report Total Success to Main.py" )
        return SUCCESS

    def Terminate( self ) :
        # Brutally kill sub-processes
        for child in multiprocessing.active_children():
            child.terminate()

    def Stop( self ) :
        # procs should be joined in reverse order to launch
        # (reversed() iterates backwards without mutating self.system)
        for s in reversed( self.system ) :
            s.proc.join()
        return SUCCESS

    def SetupQueues( self ) :
        '''
        Set up the network of Queues needed :
          N Queues = nWorkers + 1
          Each Worker has a Queue in, and a Queue out
          Reader has Queue out only
          Writer has nWorkers Queues in

        Returns a dict : nodeID -> (inQueue, outQueue), with nodeID -1 for
        the Reader, -2 for the Writer and 0..nWorkers-1 for the Workers.
        '''
        # one queue from Reader-Workers
        rwk = JoinableQueue()
        # one queue from each worker to writer
        workersWriter = [ JoinableQueue() for i in range(self.nWorkers) ]
        d = {}
        d[-1] = (None, rwk)              # Reader
        d[-2] = (workersWriter, None)    # Writer
        for i in range(self.nWorkers) : d[i] = (rwk, workersWriter[i])
        return d
01205 
01206 # ============================= EOF ===========================================
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Defines

Generated at Mon Sep 17 2012 13:49:35 for Gaudi Framework, version v23r4 by Doxygen version 1.7.2 written by Dimitri van Heesch, © 1997-2004