Gaudi Framework, version v21r9

Home   Generated: 3 May 2010

Parallel.py

Go to the documentation of this file.
00001 # File: GaudiPython/Parallel.py
00002 # Author: Pere Mato (pere.mato@cern.ch)
00003 
00004 """ GaudiPython.Parallel module.
00005     This module provides 'parallel' processing support for GaudiPyhton.
00006     It is adding some sugar on top of public domain packages such as
00007     the 'processing' or the 'pp' packages. The interface can be made
00008     independent of the underlying implementation package.
00009     Two main class are defined: Task and WorkManager
00010 """
00011 
00012 __all__ = [ 'Task','WorkManager' ]
00013 excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
00014 
00015 import sys, os, time, copy
00016 
00017 # == Eoin's adds for Parallel Mode ====
00018 from processing import Process, Queue, Pool, currentProcess
00019 from ROOT import TBufferFile, TBuffer
00020 from Gaudi.Configuration import appendPostConfigAction, Configurable, INFO, ERROR, VERBOSE
00021 from GaudiPython import AppMgr, gbl, setOwnership, SUCCESS, PyAlgorithm
00022 # from ParallelStats import recordReaderStats, recordWorkerStats, recordWriterStats
00023 # =====================================
00024 
00025 def _prefunction( f, task, item) :
00026     return f((task,item))
00027 def _ppfunction( args ) :
00028     #--- Unpack arguments
00029     task, item = args
00030     stat = Statistics()
00031     #--- Initialize the remote side (at least once)
00032     if not task.__class__._initializeDone :
00033         for k,v in task.environ.items() :
00034             if k not in excluded_varnames : os.environ[k] = v
00035         task.initializeRemote()
00036         task.__class__._initializeDone = True
00037     #--- Reset the task output
00038     task._resetOutput()
00039     #--- Call processing
00040     task.process(item)
00041     #--- Collect statistics
00042     stat.stop()
00043     return (copy.deepcopy(task.output), stat)
00044 
00045 def _detect_ncpus():
00046     """Detects the number of effective CPUs in the system"""
00047     #for Linux, Unix and MacOS
00048     if hasattr(os, "sysconf"):
00049         if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"):
00050             #Linux and Unix
00051             ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
00052             if isinstance(ncpus, int) and ncpus > 0:
00053                 return ncpus
00054             else:
00055                 #MacOS X
00056                 return int(os.popen2("sysctl -n hw.ncpu")[1].read())
00057     #for Windows
00058     if os.environ.has_key("NUMBER_OF_PROCESSORS"):
00059         ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]);
00060         if ncpus > 0:
00061             return ncpus
00062     #return the default value
00063     return 1
00064 
00065 class Statistics(object):
00066     def __init__(self):
00067         self.name  = os.getenv('HOSTNAME')
00068         self.start = time.time()
00069         self.time  = 0.0
00070         self.njob  = 0
00071     def stop(self):
00072         self.time = time.time() - self.start
00073 
00074 class Task(object) :
00075     """ Basic base class to encapsulate any processing that is going to be porcessed in parallel.
00076         User class much inherit from it and implement the methods initializeLocal,
00077         initializeRemote, process and finalize.   """
00078     _initializeDone = False
00079     def __new__ ( cls, *args, **kwargs ):
00080         task = object.__new__( cls, *args, **kwargs )
00081         task.output = ()
00082         task.environ = {}
00083         for k,v in os.environ.items(): task.environ[k] = v
00084         task.cwd = os.getcwd()
00085         return task
00086     def initializeLocal(self):
00087         pass
00088     def initializeRemote(self):
00089         pass
00090     def process(self, item):
00091         pass
00092     def finalize(self) :
00093         pass
00094     def _mergeResults(self, result) :
00095         if type(result) is not type(self.output) :
00096             raise TypeError("output type is not same as obtained result")
00097         #--No iteratable---
00098         if not hasattr( result , '__iter__' ):
00099             if hasattr(self.output,'Add') : self.output.Add(result)
00100             elif hasattr(self.output,'__iadd__') : self.output += result
00101             elif hasattr(self.output,'__add__') : self.output = self.output + result
00102             else : raise TypeError('result cannot be added')
00103         #--Dictionary---
00104         elif type(result) is dict :
00105             if self.output.keys() <= result.keys(): minkeys = self.output.keys()
00106             else: minkeys = result.keys()
00107             for key in result.keys() :
00108                 if key in self.output :
00109                     if hasattr(self.output[key],'Add') : self.output[key].Add(result[key])
00110                     elif hasattr(self.output[key],'__iadd__') : self.output[key] += result[key]
00111                     elif hasattr(self.output[key],'__add__') : self.output[key] = self.output[key] + result[key]
00112                     else : raise TypeError('result cannot be added')
00113                 else :
00114                     self.output[key] = result[key]
00115         #--Anything else (list)
00116         else :
00117             for i in range( min( len(self.output) , len(result)) ):
00118                 if hasattr(self.output[i],'Add') : self.output[i].Add(result[i])
00119                 elif hasattr(self.output[i],'__iadd__') : self.output[i] += result[i]
00120                 elif hasattr(self.output[i],'__add__') : self.output[i] = self.output[i] + result[i]
00121                 else : raise TypeError('result cannot be added')
00122     def _resetOutput(self):
00123         output =  (type(self.output) is dict) and self.output.values() or self.output
00124         for o in output :
00125             if hasattr(o, 'Reset'): o.Reset()
00126 
00127 
00128 class WorkManager(object) :
00129     """ Class to in charge of managing the tasks and distributing them to
00130         the workers. They can be local (using other cores) or remote
00131         using other nodes in the local cluster """
00132 
00133     def __init__( self, ncpus='autodetect', ppservers=None) :
00134         if ncpus == 'autodetect' : self.ncpus = _detect_ncpus()
00135         else :                     self.ncpus = ncpus
00136         if ppservers :
00137             import pp
00138             self.ppservers = ppservers
00139             self.sessions = [ SshSession(srv) for srv in ppservers ]
00140             self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
00141             self.mode = 'cluster'
00142         else :
00143             import processing
00144             self.pool = processing.Pool(self.ncpus)
00145             self.mode = 'multicore'
00146         self.stats = {}
00147 
00148     def __del__(self):
00149         if hasattr(self,'server') : self.server.destroy()
00150 
00151     def process(self, task, items, timeout=90000):
00152         if not isinstance(task,Task) :
00153             raise TypeError("task argument needs to be an 'Task' instance")
00154         # --- Call the Local initialialization
00155         task.initializeLocal()
00156         # --- Schedule all the jobs ....
00157         if self.mode == 'cluster' :
00158             jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiPython.Parallel','time')) for item in items]
00159             for job in jobs :
00160                 result, stat = job()
00161                 task._mergeResults(result)
00162                 self._mergeStatistics(stat)
00163             self._printStatistics()
00164             self.server.print_stats()
00165         elif self.mode == 'multicore' :
00166             start = time.time()
00167             jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items ))
00168             for result, stat in  jobs.get(timeout) :
00169                 task._mergeResults(result)
00170                 self._mergeStatistics(stat)
00171             end = time.time()
00172             self._printStatistics()
00173             print 'Time elapsed since server creation %f' %(end-start)
00174         # --- Call the Local Finalize
00175         task.finalize()
00176     def _printStatistics(self):
00177         njobs = 0
00178         for stat in self.stats.values():
00179             njobs += stat.njob
00180         print 'Job execution statistics:'
00181         print 'job count | % of all jobs | job time sum | time per job | job server'
00182         for name, stat  in self.stats.items():
00183             print '       %d |        %6.2f |     %8.3f |    %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)
00184 
00185     def _mergeStatistics(self, stat):
00186         if stat.name not in self.stats : self.stats[stat.name] = Statistics()
00187         s = self.stats[stat.name]
00188         s.time += stat.time
00189         s.njob += 1
00190 
00191 
00192 class SshSession(object) :
00193     def __init__(self, hostname):
00194         import pyssh
00195         import pp
00196         self.host = hostname
00197         ppprefix =  os.path.dirname(os.path.dirname(pp.__file__))
00198         self.session = pyssh.Ssh(host=hostname)
00199         self.session.open()
00200         self.session.read_lazy()
00201         self.session.write('cd %s\n' % os.getcwd())
00202         self.session.read_lazy()
00203         self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
00204         self.session.read_lazy()
00205         self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
00206         self.session.read_lazy()
00207         self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
00208         self.session.read_lazy()
00209         self.session.write('%s %s/scripts-%s/ppserver.py \n'%(sys.executable, ppprefix, sys.version.split()[0] ))
00210         self.session.read_lazy()
00211         self.session.read_lazy()
00212         print 'started ppserver in ', hostname
00213     def __del__(self):
00214         self.session.close()
00215         print 'killed ppserver in ', self.host
00216 
00217 # ==== Extra stuff for the Event-Parallel Model =======================
00218 
00219 # ===========================================================================================
00220 # Miscellaneous Tools
00221 # ===========================================================================================
00222 
00223 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
00224 aidatypes = ( gbl.AIDA.IHistogram,
00225               gbl.AIDA.IHistogram1D,
00226               gbl.AIDA.IHistogram2D,
00227               gbl.AIDA.IHistogram3D,
00228               gbl.AIDA.IProfile1D,
00229               gbl.AIDA.IProfile2D )
00230 thtypes   = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
00231 gppHead   = '[ GaudiPythonParallel ] '
00232 line      = '-'*80
00233 
00234 def setupSystem(nWorkers, config) :
00235     qLimit = 50
00236     inq         = Queue( )                                        # the Queue from Reader->(Workers)
00237     outQs       = [ Queue( qLimit ) for i in xrange(nWorkers) ]   # One outQueue for each Worker->Writer
00238 
00239     commonQueue = Queue( )                                        # Common Queue from setupSystem->(everyone) (for signals)
00240     qToParent   = Queue( )                                        # Common Queue from (everyone)->setupSystem (for signals)
00241 
00242     cStatQueue  = Queue( )                                        # A common queue, shared for (Workers)->Writer
00243     rHistQ      = Queue( )                                        # A special Queue from Reader->Writer (bypassing workers)
00244 
00245     # keys of the config dictionary
00246     ks = config.keys()
00247     ks.sort()
00248 
00249     # determine application
00250     apps = ['Gauss', 'Boole', 'Brunel', 'DaVinci']
00251     _app = None
00252     for app in apps :
00253         if app in ks : _app = app
00254 
00255     # determine output streams
00256     outs = [ 'Output', 'OutputFile', 'OutStream' ]
00257     outlist = []
00258     for k in ks :
00259         for o in outs :
00260             if hasattr( config[k], o ) :  outlist.append(k)
00261 
00262     itmlst = getOutputList( config )
00263 
00264     # create Reader/Worker/Writer classes
00265     w   =  Writer(        commonQueue, outQs,    cStatQueue, rHistQ, qToParent, nWorkers, config, qLimit, _app )
00266     wks = [Worker(i, inq, commonQueue, outQs[i], cStatQueue,         qToParent, nWorkers, config, qLimit, _app, itemlist=itmlst) for i in xrange(nWorkers)]
00267     r   =  Reader(   inq, commonQueue,                       rHistQ, qToParent, nWorkers, config, qLimit, _app )
00268 
00269     nProcesses = nWorkers+2   # workers + (reader + writer)
00270     nSenders   = nWorkers+1   # workers + reader, the writer doesn't send anything
00271     nc = 0
00272 
00273     # collect statistics dicts from each worker
00274     store = [] ; hFlagCt = 0
00275     while nc < nProcesses :
00276         item = qToParent.get()
00277         if item    == 'h' : print 'HFLAG RECEIVED BY SETUPSYSTEM' ; [commonQueue.put(None) for i in xrange(nSenders)]
00278         if not item : nc += 1 # ; print 'Parent : NC now: %i'%( nc )
00279     # print ' PARENT : all None flags received'
00280     wks = []
00281     for dictionary in store :
00282         if dictionary['name'] == 'Worker' : wks.append(dictionary)
00283         if dictionary['name'] == 'Reader' : reader = dictionary
00284         if dictionary['name'] == 'Writer' : writer = dictionary
00285     # print 'PARENT recording stats'
00286     # recordReaderStats( reader )
00287     # recordWorkerStats( wks )
00288     # recordWriterStats( writer )
00289     # print ' ==================================================== PARENT FINISHED, RETURNING'
00290     return True
00291 
00292 def getOutputList( configuration ) :
00293     writers = configuration['ApplicationMgr'].OutStream
00294     print 'setupSystem: OutStreams identified : ', writers
00295     # check which tes items are for writing by looking at the writer algorithm settings
00296     # remove the trailing '#n' (sub-level notation for writer algorithms)
00297     outct = 0
00298     itmlst = []
00299     for w in writers :
00300         if hasattr(w,'Output') :
00301             for itm in w.ItemList + w.OptItemList :
00302                 hsh = itm.find('#')
00303                 itmlst.append(itm[:hsh])
00304     return itmlst
00305 
00306 def dumpHistograms( hvt, node='Unspecified', omitList=[] ) :
00307     nlist = hvt.getHistoNames( )
00308     throw = []
00309     for om in omitList :
00310         for name in nlist :
00311             if name.startswith(om) : throw.append(name)
00312     [ nlist.remove(name) for name in throw ]
00313     del throw
00314     histDict = {}
00315     objects = 0 ; histos = 0
00316     if nlist :
00317         for n in nlist :
00318             o = hvt[ n ]
00319             if type(o) in aidatypes :
00320                 o = aida2root(o)
00321                 histos  += 1
00322             else :
00323                 objects += 1
00324             histDict[ n ] = o
00325     else :
00326         print head+'WARNING : no histograms to recover?'
00327     # print line
00328     # print '%s : Histos collected'%(node)
00329     # print 'Objects : %i'%( objects    )
00330     # print 'Histos  : %i'%( histos     )
00331     # print 'Total   : %i'%( len(histDict.keys()) )
00332     # print line
00333     return histDict
00334 
00335 def dumpTES( someClass ) :
00336     buf = TBufferFile(TBuffer.kWrite)
00337     someClass.ts.dumpBuffer(buf)
00338     return buf
00339 
00340 def dumpTEStoMessage( self ) :
00341     buf = TMessage()
00342     self.ts.dumpBuffer(buf)
00343     return buf
00344 
00345 def loadTES( someClass, tbuf ) :
00346     # Using TMessages/TSockets, we DON'T do SetBufferOffset/ReadMode.  But we are using TBufferFiles
00347     # so it must be done always
00348     # if someClass.__class__.__name__ == 'Worker'  : tbuf.SetBufferOffset() ; tbuf.SetReadMode()
00349     # if someClass.__class__.__name__ == 'Writer'  : tbuf.SetBufferOffset() ; tbuf.SetReadMode()
00350     tbuf.SetBufferOffset() ; tbuf.SetReadMode()
00351     root = gbl.DataObject()
00352     someClass.evt.setRoot('/Event', root)
00353     setOwnership(root, False)
00354     someClass.ts.loadBuffer(tbuf)
00355 
00356 class CollectHistograms( PyAlgorithm ) :
00357     def __init__( self, node ) :
00358         self.node = node
00359         self.omit = node.omitHistos
00360         PyAlgorithm.__init__( self )
00361         return None
00362     def execute( self ) :
00363         return SUCCESS
00364     def finalize( self ) :
00365         header = 'CollectHistograms : '
00366         w = self.node
00367         nodeName = w.__class__.__name__
00368         # print '*'*80
00369         w.histDict = dumpHistograms( w.hvt, node=nodeName, omitList= self.omit )
00370         # print header+'%s %i : about to send %i histos on output queue'%( nodeName, w.id, len(w.histDict.keys()) )
00371         # w.cstatq.put( (w.id, w.histDict) )
00372         ks = w.histDict.keys()
00373         # send 100 at a time
00374         chunk = 100
00375         reps = len(ks)/chunk + 1
00376         for i in xrange(reps) :
00377             someKeys = ks[i*chunk : (i+1)*chunk]
00378             smalld = dict( [(key, w.histDict[key]) for key in someKeys] )
00379             w.cstatq.put( (w.id, smalld) )
00380         w.cstatq.put( None )
00381         # print header+'%s %i: Histos queued to writer'%( nodeName, w.id )
00382         for item in iter(w.commonQ.get, None) : print 'Worker %i : Got an item on the Common Queue?'%(w.id)
00383         # now clear the histogram store
00384         # print 'Clearing Histo store'
00385         w.hvt.clearStore()
00386         root = gbl.DataObject()
00387         w.hvt.setRoot('/stat', root)
00388         w.hvt.dump()
00389         # print '*'*80
00390         return SUCCESS
00391 
00392 # ===========================================================================================
00393 # The Reader
00394 # ===========================================================================================
00395 
00396 class Reader( ) :
00397     def __init__( self, inq, commonQueue, rstatq, qToParent, workers, config, qLimit, _app ) :
00398         self.inq       = inq
00399         self.c         = config
00400         self.commonQ   = commonQueue
00401         self.cstatq    = rstatq
00402         self.workers   = workers
00403         self.qToParent = qToParent
00404         self.qLimit    = qLimit
00405         self.id        = -1
00406         self._app      = _app
00407 
00408         # from ParallelStats import constructReaderDict
00409         # self.constructReaderDict = constructReaderDict
00410 
00411         r = Process( target=self.read )
00412         r.start()
00413 
00414     def readerConfig( self ) :
00415         ks = self.c.keys()
00416         if 'ApplicationMgr' in ks :
00417             self.c['ApplicationMgr'].OutStream = []
00418             if self._app == 'Gauss' : pass
00419             else : self.c['ApplicationMgr'].TopAlg  = []
00420         else :
00421             self.qToParent.put(None)
00422             print 'Reader : readerConfig : ApplicationMgr not available for configuration?'
00423 
00424         try    : self.c['HistogramPersistencySvc'].OutputFile = ''
00425         except : print 'Reader : No Histogram output to cancel!  Config continues...'
00426 
00427         if 'MessageSvc' in ks :
00428             self.c['MessageSvc'].Format      = '[Reader]% F%18W%S%7W%R%T %0W%M'
00429             self.c['MessageSvc'].OutputLevel = ERROR
00430         else :
00431             self.qToParent.put(None)
00432             print 'Reader : readerConfig : MessageSvc not available for configuration?'
00433         if self._app == 'Gauss' :
00434             gs = self.c['GaussSequencer']
00435             # Sequencer has two stages : generation and simulation, so let
00436             # the reader generate events one by one, and pass to workers for full Simulation step
00437             ed = [gs.Members[0]]
00438             gs.Members = ed
00439 
00440     def read( self ):
00441         currentProcess().setName('+Reader+')
00442         appendPostConfigAction( self.readerConfig() )
00443 
00444         # print '[ GaudiPython.Parallel ] Reader Started : Process %i'%( os.getpid() )
00445 
00446         self.ct       = 0
00447 
00448         # GaudiPython Tools
00449         self.a = AppMgr()
00450         self.evt = self.a.evtsvc()
00451         self.hvt = self.a.histsvc()
00452         self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)
00453         self.omitHistos = ['/stat/CaloPIDs']
00454         collectHistos = CollectHistograms( self )
00455         self.a.addAlgorithm( collectHistos )
00456         self.a.initialize()
00457         self.a.start()
00458 
00459         for i in xrange( self.c['ApplicationMgr'].EvtMax ) :
00460             self.a.run(1)
00461             self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)
00462 
00463             if self._app == 'Gauss' :
00464                 if self.evt.getHistoNames() :
00465                     for i in self.evt.getHistoNames() : self.ts.addOptItem(i,1)
00466             else :
00467                 if self.evt.getList() :
00468                     for i in self.evt.getList() : self.ts.addOptItem(i,1)
00469             buf = dumpTES( self )
00470             sent = False
00471             while sent == False :
00472                 try :
00473                     self.inq.put( buf, block=True )
00474                     while self.inq._buffer : pass # try pause until the background thread is complete
00475                     sent = True ; self.ct += 1
00476                 except :
00477                     self.errs += 1
00478             self.evt.clearStore()
00479         # signal the end
00480         for w in range(self.workers) : self.inq.put(None)
00481 
00482 
00483         # Termination
00484         self.a.stop()
00485         self.a.finalize()
00486 
00487         # print '='*80
00488         # print 'Reader Finished, AppMgr stopped, finalized.'
00489         # print line
00490         # print 'Events Sent : %i'%( self.ct )
00491         # print '='*80
00492 
00493         self.qToParent.put(None) # ; print 'READER has returned None Flag to setupSystem'
00494 
00495 # ===========================================================================================
00496 # The Worker
00497 # ===========================================================================================
00498 
00499 class Worker( ) :
00500     def __init__( self, wid, inq, cq, outq, cstatq, qToParent, nprocs, config, qLimit, _app, itemlist=None ) :
00501 
00502         # wid      : an integer (0...Nworkers-1) identifying the worker
00503         # in_q     : the common queue from reader to workers
00504         # out_q    : the queue from a worker to the writer (unique queue for each worker)
00505         # common_q : the common queue from workers to writer, along which 'id' will be sent on event completion
00506         self.id           = wid
00507         self.c            = config
00508         self.inq          = inq
00509         self.qLimit       = qLimit
00510         self.commonQ      = cq
00511         self.cstatq       = cstatq
00512         self.qToParent    = qToParent
00513         self.nprocs       = nprocs
00514         self.outList      = []
00515         self.tempCt       = 0
00516         self.putErrs      = 0
00517         self.completed    = 0
00518         self.KeepGoing    = True
00519         self.itmlst       = itemlist
00520         self._app      = _app
00521 
00522         self.outq         = outq
00523 
00524         # from ParallelStats import constructWorkerDict
00525         # self.constructWorkerDict = constructWorkerDict
00526 
00527         if self.itmlst : self.eventOutput = True
00528         else           : self.eventOutput = False
00529 
00530         w = Process( target=self.work )
00531         w.setDaemon(True)
00532         w.start()
00533 
00534     def workerConfig( self ) :
00535         ks = self.c.keys()
00536         if 'ApplicationMgr' in ks :
00537             self.c['ApplicationMgr'].OutStream = []
00538         else :
00539             self.qToParent.put(None)
00540             print 'Worker %i : workerConfig : ApplicationMgr not available for configuration?'%( self.id )
00541         if 'EventSelector' in ks :
00542             self.c['EventSelector'].Input      = []
00543         else :
00544             print 'Worker %i : workerConfig : EventSelector not available for configuration?'%( self.id )
00545         try    : self.c['HistogramPersistencySvc'].OutputFile = ''
00546         except : print 'Worker-%i: No Histogram output to cancel! Config continues...'%self.id
00547         formatHead = '[Worker %i]'%self.id
00548         if 'MessageSvc' in ks :
00549             self.c['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'
00550             self.c['MessageSvc'].OutputLevel = INFO
00551         else :
00552             print 'Worker %i : workerConfig : MessageSvc not available for configuration?'%( self.id )
00553 
00554         if self.id :
00555             for k in ks :
00556                 if hasattr( self.c[k], 'OutputLevel' ) : self.c[k].OutputLevel = ERROR
00557 
00558         if self._app == 'Gauss' :
00559             gs = self.c[ 'GaussSequencer' ]
00560             # Sequencer has two stages : generation and simulation, so let
00561             # the reader generate events one by one, and pass to workers for full Simulation step
00562             ed = [ gs.Members[1] ]
00563             gs.Members = ed
00564 
00565         if self._app == 'DaVinci' :
00566             if 'NTupleSvc' in self.c.keys() :
00567                 self.c['NTupleSvc'].Output = ["FILE1 DATAFILE='Worker-%i-Hlt12-StatsTuple.root' TYP='ROOT' OPT='NEW'"%self.id]
00568                 self.c['NTupleSvc'].OutputLevel = VERBOSE
00569 
00570     def workerExecuteWithOutput( self, tbuf ) :
00571         loadTES( self, tbuf )
00572         self.a._evtpro.executeEvent()
00573         buf = dumpTES( self )
00574         self.outq.put(buf)
00575         # allow the background thread to feed the Queue; not 100% guaranteed to finish before next line
00576         while self.outq._buffer : pass
00577         self.evt.clearStore()
00578         self.completed += 1
00579         return True
00580 
00581     def workerExecuteNoOutput( self, tbuf ) :
00582         loadTES( self, tbuf )
00583         self.a._evtpro.executeEvent()
00584         self.outq.put('dummy')
00585         self.evt.clearStore()
00586         self.completed += 1
00587         return True
00588 
00589     def work( self ):
00590         # print 'Worker %d: starting...%d, at %5.6f' % (self.id, os.getpid(), alive)
00591         cName = currentProcess().getName()
00592         currentProcess().setName('Worker '+cName)
00593         appendPostConfigAction( self.workerConfig() )
00594 
00595         self.ct = 0
00596 
00597         # take first event
00598         buf = self.inq.get()
00599         if buf is not None :
00600             # set up GaudiPython tools
00601             self.a   = AppMgr()
00602             self.evt = self.a.evtsvc()
00603             self.hvt = self.a.histsvc()
00604             self.nvt = self.a.ntuplesvc()
00605             first = True
00606             self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)
00607             self.omitHistos = ['/stat/CaloPIDs']
00608             collectHistos = CollectHistograms( self )
00609             self.a.addAlgorithm( collectHistos )
00610             self.a.initialize()
00611             self.a.start()
00612 
00613             # worker execution changes depending on output/no-output scenario
00614             if self.eventOutput :
00615                 [ self.ts.addOptItem(itm,1) for itm in self.itmlst ]
00616                 wFunction = self.workerExecuteWithOutput
00617             else :
00618                 wFunction = self.workerExecuteNoOutput
00619 
00620 
00621             # iter over the Queue to workers, receiving serialized events. When done, workers place None on queue
00622             while self.KeepGoing :
00623                 if first : pass
00624                 else : buf = self.inq.get()
00625                 if buf is not None :
00626                     sc = wFunction( buf )
00627                     if sc : pass
00628                     self.ct += 1
00629                     if first : first = False
00630                 else :
00631                     self.KeepGoing = False
00632 
00633             # print '='*80
00634             # print 'Worker %i Finished'%( self.id )
00635             self.outq.put(None)
00636             # print line
00637             # print 'Events Recd : %i'%( self.ct )
00638             # print '='*80
00639 
00640             sc = self.finalize()
00641             self.qToParent.put(None)
00642             # print '[ GaudiPython Parallel ] : Worker has sent dict and None flag back to Parent'
00643         else : # if the buffer is None...
00644             self.outq.put(None)
00645             sc = self.finalize()
00646             self.qToParent.put(None)
00647             self.cstatq.put( (self.id, {}) )
00648             self.cstatq.put( None )
00649             for item in iter(self.commonQ.get, None) : print 'Worker %i : Got an item on the Common Queue?'%(self.id)
00650 
00651     def finalize( self ) :
00652         self.a.stop()
00653         self.a.finalize()
00654         self.Finished = True
00655         return True
00656 
00657 # ===========================================================================================
00658 # The Writer
00659 # ===========================================================================================
00660 
00661 class Writer( ) :
00662     def __init__( self, common_queue, out_qList, cstatq, rstatq, qToParent, workers, config, qLimit, _app ) :
00663         self.qList     = out_qList
00664         self.cq        = common_queue
00665         self.cstatq    = cstatq
00666         self.rstatq    = rstatq
00667         self.qToParent = qToParent
00668         self.qLimit    = qLimit
00669         self.workers   = workers
00670         self.output    = True
00671         self.flags     = 0
00672         self.c         = config
00673         self._app      = _app
00674 
00675         # from ParallelStats import constructWriterDict
00676         # self.constructWriterDict = constructWriterDict
00677 
00678         self.bookingDict = {}
00679         self.bookingDict['DataObject']        = self.bookDataObject
00680         self.bookingDict['NTuple::Directory'] = self.bookDataObject
00681         self.bookingDict['NTuple::File']      = self.bookDataObject
00682         self.bookingDict['TH1D']       = self.bookTH1D
00683         self.bookingDict['TH2D']       = self.bookTH2D
00684         self.bookingDict['TH3D']       = self.bookTH3D
00685         self.bookingDict['TProfile']   = self.bookTProfile
00686         self.bookingDict['TProfile2D'] = self.bookTProfile2D
00687 
00688         w = Process( target=self.write )
00689         w.start()
00690 
00691     def bookDataObject( self, n, o ):
00692         self.hvt.registerObject( n, o )
00693 
00694     def bookTH1D( self, n, o ) :
00695         obj = self.hvt._ihs.book(n, o.GetTitle(), o.GetXaxis().GetNbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax())
00696         aida2root(obj).Add(o)
00697 
00698     def bookTH2D( self, n, o ) :
00699         obj = self.hvt._ihs.book(n, o.GetTitle(), o.GetXaxis().GetNbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax(), o.GetYaxis().GetNbins(), o.GetYaxis().GetXmin(), o.GetYaxis().GetXmax())
00700         aida2root(obj).Add(o)
00701 
00702     def bookTH3D( self, n, o ) :
00703         obj = self.hvt._ihs.book(n, o.GetTitle(), o.GetXaxis().GetXbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax(),
00704                                                   o.GetYaxis().GetXbins(), o.GetYaxis().GetXmin(), o.GetYaxis().GetXmax(),
00705                                                   o.GetZaxis().GetXbins(), o.GetZaxis().GetXmin(), o.GetZaxis().GetXmax() )
00706         aida2root(obj).Add(o)
00707 
00708     def bookTProfile( self, n, o ) :
00709         obj = self.hvt._ihs.bookProf(n, o.GetTitle(), o.GetXaxis().GetNbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax(), o.GetOption())
00710         aida2root(obj).Add(o)
00711 
00712     def bookTProfile2D( self, n, o ) :
00713         obj = self.hvt._ihs.bookProf(n, o.GetTitle(), o.GetXaxis().GetNbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax(), o.GetYaxis().GetNbins(), o.GetYaxis().GetXmin(), o.GetYaxis().GetXmax(), o.GetOption())
00714         aida2root(obj).Add(o)
00715 
00716 
00717     def writerConfig( self ) :
00718         ks = self.c.keys()
00719         if 'EventSelector' in ks :
00720             self.c['EventSelector'].Input      = []
00721         else :
00722             print 'Writer : writerConfig : EventSelector not available for configuration?'
00723         if 'ApplicationMgr' in ks :
00724             self.c['ApplicationMgr'].TopAlg    = []
00725         else :
00726             print 'Writer : writerConfig : ApplicationMgr not available for configuration?'
00727         if 'MessageSvc' in ks :
00728             self.c['MessageSvc'].Format        = '[Writer]% F%18W%S%7W%R%T %0W%M'
00729         else :
00730             print 'Writer : writerConfig : MessageSvc not available for configuration?'
00731         # sometimes the outstreams require that certain algs have completed
00732         # obviously, these algs aren't going to be run on our independent writer
00733         for k in self.c.keys() :
00734             if hasattr(self.c[k], 'RequireAlgs') : self.c[k].RequireAlgs = []
00735 
00736     def write( self ):
00737         currentProcess().setName('+Writer+')
00738         print 'WRITER : applying PostConfigAction'
00739         appendPostConfigAction( self.writerConfig() )
00740         print '[ GaudiPython.Parallel ] Writer started : Process %i'%( os.getpid() )
00741 
00742         self.a = AppMgr()
00743         self.a.initialize()
00744         self.a.start()
00745         self.evt = self.a.evtsvc()
00746         self.hvt = self.a.histsvc()
00747         self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)
00748 
00749         self.ct = 0
00750         status = [True]*self.workers
00751         recvd  = [0]*self.workers
00752         cqnc = 0  # none count from the common queue
00753         gotOK = True
00754 
00755         ind = 0 ; gotWhich = False ; ifblock = [] ; tryBlock = [] ; starttry = None ; outs = []
00756 
00757         waitForFlag = True ; whichQ = 0
00758         while sum(status) > 0 :
00759             whichQ = (whichQ+1)%self.workers
00760             outq = self.qList[whichQ]
00761             trying = True
00762             try :
00763                 tbuf = outq.get(timeout=0.01,block=False)
00764                 trying = False
00765             except :
00766                 continue
00767             if tbuf :
00768                 recvd[whichQ] += 1
00769                 try :
00770                     loadTES( self, tbuf )
00771                     self.a._evtpro.executeEvent()   # fire the writing of TES to output file
00772                     self.evt.clearStore()           # and clear out the TES
00773                     self.ct += 1
00774                     if not self.ct%20 : print '[ GaudiPython.Parallel ] Writer Progress (n. Events) : %i'%self.ct
00775                 except :
00776                     print '[ GaudiPython.Parallel ] Writer trying to load a ', type(tbuf), tbuf[:10] ,  '??... skipping to next'
00777             else :
00778                 status[whichQ] = False   # that worker is finished...
00779                 # print 'Writer received None from worker %i'%( whichQ )
00780 
00781         # print '[ GaudiPython.Parallel ] Writer Complete : %i Events received'%self.ct
00782         if self.output : self.finalize()
00783         else           : self.a.stop()   ; self.a.finalize()
00784         # self.constructWriterDict()
00785         # self.qToParent.put(self.d)
00786         self.qToParent.put(None)
00787 
00788     def composition( self ) :
00789         lst = self.hvt.getHistoNames()
00790         record = []
00791         objects = 0 ; histos = 0
00792         if lst :
00793             for n in lst :
00794                 o = self.hvt[ n ]
00795                 if type(o) in aidatypes :
00796                     histos  += 1
00797                     record.append( (n, o.entries()) )
00798                 else :
00799                     objects += 1
00800         else :
00801             print 'Empty list!'
00802         # print line
00803         # print 'Size of Histo Store : %i'%( len(lst) )
00804         # print ' - Histos           : %i'%( histos   )
00805         # print ' - Objects          : %i'%( objects  )
00806         # print line
00807 
00808     def finalize( self ) :
00809         nc = 0
00810         self.HistoCollection = []
00811         for item in iter(self.rstatq.get, None) : self.HistoCollection.append( item )
00812         while nc < self.workers :
00813             tup = self.cstatq.get()
00814             if tup : self.HistoCollection.append( tup )   # tup is (worker-id, histoDict)
00815             else   : nc += 1
00816         self.HistoCollection.sort()
00817         # send signal to Parent that all Histos have been received
00818         self.qToParent.put('h')
00819         self.RebuildHistoStore()
00820 
00821         self.a.stop()
00822         self.a.finalize()
00823         # print '[ GaudiPython.Parallel ] Writer Complete.'
00824 
00825     def RebuildHistoStore( self ) :
00826         for tup in self.HistoCollection :
00827             workerID, histDict = tup
00828             added = 0 ; registered = 0; booked = 0
00829             for n in histDict.keys() :
00830                 o = histDict[ n ]
00831                 obj = self.hvt.retrieve( n )
00832                 if obj :
00833                     aida2root(obj).Add(o)
00834                     added += 1
00835                 else :
00836                     if o.__class__.__name__ in self.bookingDict.keys() :
00837                         self.bookingDict[o.__class__.__name__](n, o)
00838                     else :
00839                         print 'No booking method for: ', n, o, type(o), o.__class__.__name__
00840                     booked += 1
00841             # print '='*80
00842             # print 'Set of Histos complete'
00843             # print 'Added      (histos)  : %i'%( added      )
00844             # print 'Registered (objects) : %i'%( registered )
00845             # print 'Booked     (histos)  : %i'%( booked     )
00846             # print line
00847             # print 'Size of Histo Store  : %i'%( len( self.hvt.getHistoNames() ) )
00848             # print '='*80
00849         # print 'o'*80
00850         # print 'Writer : All Histogram sets added.'
00851         # self.composition( )
00852         # print 'o'*80
00853 
00854 # == EOF ====================================================================================

Generated at Mon May 3 12:14:39 2010 for Gaudi Framework, version v21r9 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004