Gaudi Framework, version v21r6

Home   Generated: 11 Nov 2009

GaudiPython::Parallel Namespace Reference


Classes

class  Statistics
class  Task
class  WorkManager
class  SshSession
class  CollectHistograms
class  Reader
class  Worker
class  Writer

Functions

def _prefunction
def _ppfunction
def _detect_ncpus
def setupSystem
def getOutputList
def dumpHistograms
def dumpTES
def dumpTEStoMessage
def loadTES

Variables

list __all__ = [ 'Task','WorkManager' ]
list excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
tuple aidatypes
tuple thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
string gppHead = '[ GaudiPythonParallel ] '
string line = '-'


Function Documentation

def GaudiPython::Parallel::_detect_ncpus (  )  [private]

Detects the number of effective CPUs in the system

Definition at line 45 of file Parallel.py.

00045                    :
00046     """Detects the number of effective CPUs in the system"""
00047     #for Linux, Unix and MacOS
00048     if hasattr(os, "sysconf"):
00049         if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"):
00050             #Linux and Unix
00051             ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
00052             if isinstance(ncpus, int) and ncpus > 0:
00053                 return ncpus
00054             else:
00055                 #MacOS X
00056                 return int(os.popen2("sysctl -n hw.ncpu")[1].read())
00057     #for Windows
00058     if os.environ.has_key("NUMBER_OF_PROCESSORS"):
00059         ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]);
00060         if ncpus > 0:
00061             return ncpus
00062     #return the default value
00063     return 1
00064 
class Statistics(object):

def GaudiPython::Parallel::_ppfunction (   args  )  [private]

Definition at line 27 of file Parallel.py.

00027                         :
00028     #--- Unpack arguments
00029     task, item = args
00030     stat = Statistics()
00031     #--- Initialize the remote side (at least once)
00032     if not task.__class__._initializeDone :
00033         for k,v in task.environ.items() :
00034             if k not in excluded_varnames : os.environ[k] = v
00035         task.initializeRemote()
00036         task.__class__._initializeDone = True
00037     #--- Reset the task output
00038     task._resetOutput()
00039     #--- Call processing
00040     task.process(item)
00041     #--- Collect statistics
00042     stat.stop()
00043     return (copy.deepcopy(task.output), stat)
00044 
def _detect_ncpus():

def GaudiPython::Parallel::_prefunction (   f,
  task,
  item 
) [private]

Definition at line 25 of file Parallel.py.

00025                                  :
00026     return f((task,item))
def _ppfunction( args ) :

def GaudiPython::Parallel::dumpHistograms (   hvt,
  node = 'Unspecified',
  omitList = [] 
)

Definition at line 306 of file Parallel.py.

00306                                                            :
00307     nlist = hvt.getHistoNames( )
00308     throw = []
00309     for om in omitList :
00310         for name in nlist :
00311             if name.startswith(om) : throw.append(name)
00312     [ nlist.remove(name) for name in throw ]
00313     del throw
00314     histDict = {}
00315     objects = 0 ; histos = 0
00316     if nlist :
00317         for n in nlist :
00318             o = hvt[ n ]
00319             if type(o) in aidatypes :
00320                 o = aida2root(o)
00321                 histos  += 1
00322             else :
00323                 objects += 1
00324             histDict[ n ] = o
00325     else :
00326         print head+'WARNING : no histograms to recover?'
00327     # print line
00328     # print '%s : Histos collected'%(node)
00329     # print 'Objects : %i'%( objects    )
00330     # print 'Histos  : %i'%( histos     )
00331     # print 'Total   : %i'%( len(histDict.keys()) )
00332     # print line
00333     return histDict
00334 
def dumpTES( someClass ) :

def GaudiPython::Parallel::dumpTES (   someClass  ) 

Definition at line 335 of file Parallel.py.

00335                          :
00336     buf = TBufferFile(TBuffer.kWrite)
00337     someClass.ts.dumpBuffer(buf)
00338     return buf
00339 
def dumpTEStoMessage( self ) :

def GaudiPython::Parallel::dumpTEStoMessage (   self  ) 

Definition at line 340 of file Parallel.py.

00340                              :
00341     buf = TMessage()
00342     self.ts.dumpBuffer(buf)
00343     return buf
00344 
def loadTES( someClass, tbuf ) :

def GaudiPython::Parallel::getOutputList (   configuration  ) 

Definition at line 292 of file Parallel.py.

00292                                    :
00293     writers = configuration['ApplicationMgr'].OutStream
00294     print 'setupSystem: OutStreams identified : ', writers
00295     # check which tes items are for writing by looking at the writer algorithm settings
00296     # remove the trailing '#n' (sub-level notation for writer algorithms)
00297     outct = 0
00298     itmlst = []
00299     for w in writers :
00300         if hasattr(w,'Output') :
00301             for itm in w.ItemList + w.OptItemList :
00302                 hsh = itm.find('#')
00303                 itmlst.append(itm[:hsh])
00304     return itmlst
00305 
def dumpHistograms( hvt, node='Unspecified', omitList=[] ) :

def GaudiPython::Parallel::loadTES (   someClass,
  tbuf 
)

Definition at line 345 of file Parallel.py.

00345                                :
00346     # Using TMessages/TSockets, we DON'T do SetBufferOffset/ReadMode.  But we are using TBufferFiles
00347     # so it must be done always
00348     # if someClass.__class__.__name__ == 'Worker'  : tbuf.SetBufferOffset() ; tbuf.SetReadMode()
00349     # if someClass.__class__.__name__ == 'Writer'  : tbuf.SetBufferOffset() ; tbuf.SetReadMode()
00350     tbuf.SetBufferOffset() ; tbuf.SetReadMode()
00351     root = gbl.DataObject()
00352     someClass.evt.setRoot('/Event', root)
00353     setOwnership(root, False)
00354     someClass.ts.loadBuffer(tbuf)
00355 
class CollectHistograms( PyAlgorithm ) :

def GaudiPython::Parallel::setupSystem (   nWorkers,
  config 
)

Definition at line 234 of file Parallel.py.

00234                                   :
00235     qLimit = 50
00236     inq         = Queue( )                                        # the Queue from Reader->(Workers)
00237     outQs       = [ Queue( qLimit ) for i in xrange(nWorkers) ]   # One outQueue for each Worker->Writer
00238 
00239     commonQueue = Queue( )                                        # Common Queue from setupSystem->(everyone) (for signals)
00240     qToParent   = Queue( )                                        # Common Queue from (everyone)->setupSystem (for signals)
00241 
00242     cStatQueue  = Queue( )                                        # A common queue, shared for (Workers)->Writer
00243     rHistQ      = Queue( )                                        # A special Queue from Reader->Writer (bypassing workers)
00244 
00245     # keys of the config dictionary
00246     ks = config.keys()
00247     ks.sort()
00248 
00249     # determine application
00250     apps = ['Gauss', 'Boole', 'Brunel', 'DaVinci']
00251     _app = None
00252     for app in apps :
00253         if app in ks : _app = app
00254 
00255     # determine output streams
00256     outs = [ 'Output', 'OutputFile', 'OutStream' ]
00257     outlist = []
00258     for k in ks :
00259         for o in outs :
00260             if hasattr( config[k], o ) :  outlist.append(k)
00261 
00262     itmlst = getOutputList( config )
00263 
00264     # create Reader/Worker/Writer classes
00265     w   =  Writer(        commonQueue, outQs,    cStatQueue, rHistQ, qToParent, nWorkers, config, qLimit, _app )
00266     wks = [Worker(i, inq, commonQueue, outQs[i], cStatQueue,         qToParent, nWorkers, config, qLimit, _app, itemlist=itmlst) for i in xrange(nWorkers)]
00267     r   =  Reader(   inq, commonQueue,                       rHistQ, qToParent, nWorkers, config, qLimit, _app )
00268 
00269     nProcesses = nWorkers+2   # workers + (reader + writer)
00270     nSenders   = nWorkers+1   # workers + reader, the writer doesn't send anything
00271     nc = 0
00272 
00273     # collect statistics dicts from each worker
00274     store = [] ; hFlagCt = 0
00275     while nc < nProcesses :
00276         item = qToParent.get()
00277         if item    == 'h' : print 'HFLAG RECEIVED BY SETUPSYSTEM' ; [commonQueue.put(None) for i in xrange(nSenders)]
00278         if not item : nc += 1 # ; print 'Parent : NC now: %i'%( nc )
00279     # print ' PARENT : all None flags received'
00280     wks = []
00281     for dictionary in store :
00282         if dictionary['name'] == 'Worker' : wks.append(dictionary)
00283         if dictionary['name'] == 'Reader' : reader = dictionary
00284         if dictionary['name'] == 'Writer' : writer = dictionary
00285     # print 'PARENT recording stats'
00286     # recordReaderStats( reader )
00287     # recordWorkerStats( wks )
00288     # recordWriterStats( writer )
00289     # print ' ==================================================== PARENT FINISHED, RETURNING'
00290     return True
00291 
def getOutputList( configuration ) :


Variable Documentation

list GaudiPython::Parallel::__all__ = [ 'Task','WorkManager' ]

Definition at line 12 of file Parallel.py.

GaudiPython::Parallel::aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

Definition at line 223 of file Parallel.py.

tuple GaudiPython::Parallel::aidatypes

Initial value:

( gbl.AIDA.IHistogram,
              gbl.AIDA.IHistogram1D,
              gbl.AIDA.IHistogram2D,
              gbl.AIDA.IHistogram3D,
              gbl.AIDA.IProfile1D,
              gbl.AIDA.IProfile2D )

Definition at line 224 of file Parallel.py.

list GaudiPython::Parallel::excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']

Definition at line 13 of file Parallel.py.

string GaudiPython::Parallel::gppHead = '[ GaudiPythonParallel ] '

Definition at line 231 of file Parallel.py.

string GaudiPython::Parallel::line = '-'

Definition at line 232 of file Parallel.py.

tuple GaudiPython::Parallel::thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )

Definition at line 230 of file Parallel.py.


Generated at Wed Nov 11 16:37:03 2009 for Gaudi Framework, version v21r6 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004