|
Gaudi Framework, version v21r6 |
| Home | Generated: 11 Nov 2009 |
Classes | |
| class | Statistics |
| class | Task |
| class | WorkManager |
| class | SshSession |
| class | CollectHistograms |
| class | Reader |
| class | Worker |
| class | Writer |
Functions | |
| def | _prefunction |
| def | _ppfunction |
| def | _detect_ncpus |
| def | setupSystem |
| def | getOutputList |
| def | dumpHistograms |
| def | dumpTES |
| def | dumpTEStoMessage |
| def | loadTES |
Variables | |
| list | __all__ = [ 'Task','WorkManager' ] |
| list | excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY'] |
| aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root | |
| tuple | aidatypes |
| tuple | thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D ) |
| string | gppHead = '[ GaudiPythonParallel ] ' |
| string | line = '-' |
| def GaudiPython::Parallel::_detect_ncpus | ( | ) | [private] |
Detects the number of effective CPUs in the system
Definition at line 45 of file Parallel.py.
00045 :
00046     """Detects the number of effective CPUs in the system"""
00047     #for Linux, Unix and MacOS
00048     if hasattr(os, "sysconf"):
00049         if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"):
00050             #Linux and Unix
00051             ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
00052             if isinstance(ncpus, int) and ncpus > 0:
00053                 return ncpus
00054         else:
00055             #MacOS X
00056             return int(os.popen2("sysctl -n hw.ncpu")[1].read())
00057     #for Windows
00058     if os.environ.has_key("NUMBER_OF_PROCESSORS"):
00059         ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]);
00060         if ncpus > 0:
00061             return ncpus
00062     #return the default value
00063     return 1
00064 class Statistics(object):
| def GaudiPython::Parallel::_ppfunction | ( | args | ) | [private] |
Definition at line 27 of file Parallel.py.
00027 :
00028     #--- Unpack arguments
00029     task, item = args
00030     stat = Statistics()
00031     #--- Initialize the remote side (at least once)
00032     if not task.__class__._initializeDone :
00033         for k,v in task.environ.items() :
00034             if k not in excluded_varnames : os.environ[k] = v
00035         task.initializeRemote()
00036         task.__class__._initializeDone = True
00037     #--- Reset the task output
00038     task._resetOutput()
00039     #--- Call processing
00040     task.process(item)
00041     #--- Collect statistics
00042     stat.stop()
00043     return (copy.deepcopy(task.output), stat)
00044 def _detect_ncpus():
| def GaudiPython::Parallel::_prefunction | ( | f, task, item | ) | [private] |
Definition at line 25 of file Parallel.py.
00025 :
00026     return f((task,item))
def _ppfunction( args ) :
| def GaudiPython::Parallel::dumpHistograms | ( | hvt, node = 'Unspecified', omitList = [] | ) |
Definition at line 306 of file Parallel.py.
00306 :
00307     nlist = hvt.getHistoNames( )
00308     throw = []
00309     for om in omitList :
00310         for name in nlist :
00311             if name.startswith(om) : throw.append(name)
00312     [ nlist.remove(name) for name in throw ]
00313     del throw
00314     histDict = {}
00315     objects = 0 ; histos = 0
00316     if nlist :
00317         for n in nlist :
00318             o = hvt[ n ]
00319             if type(o) in aidatypes :
00320                 o = aida2root(o)
00321                 histos += 1
00322             else :
00323                 objects += 1
00324             histDict[ n ] = o
00325     else :
00326         print head+'WARNING : no histograms to recover?'
00327     # print line
00328     # print '%s : Histos collected'%(node)
00329     # print 'Objects : %i'%( objects )
00330     # print 'Histos : %i'%( histos )
00331     # print 'Total : %i'%( len(histDict.keys()) )
00332     # print line
00333     return histDict
00334 def dumpTES( someClass ) :
| def GaudiPython::Parallel::dumpTES | ( | someClass | ) |
Definition at line 335 of file Parallel.py.
00335 :
00336     buf = TBufferFile(TBuffer.kWrite)
00337     someClass.ts.dumpBuffer(buf)
00338     return buf
00339 def dumpTEStoMessage( self ) :
| def GaudiPython::Parallel::dumpTEStoMessage | ( | self | ) |
Definition at line 340 of file Parallel.py.
00340 :
00341     buf = TMessage()
00342     self.ts.dumpBuffer(buf)
00343     return buf
00344 def loadTES( someClass, tbuf ) :
| def GaudiPython::Parallel::getOutputList | ( | configuration | ) |
Definition at line 292 of file Parallel.py.
00292 :
00293     writers = configuration['ApplicationMgr'].OutStream
00294     print 'setupSystem: OutStreams identified : ', writers
00295     # check which tes items are for writing by looking at the writer algorithm settings
00296     # remove the trailing '#n' (sub-level notation for writer algorithms)
00297     outct = 0
00298     itmlst = []
00299     for w in writers :
00300         if hasattr(w,'Output') :
00301             for itm in w.ItemList + w.OptItemList :
00302                 hsh = itm.find('#')
00303                 itmlst.append(itm[:hsh])
00304     return itmlst
00305 def dumpHistograms( hvt, node='Unspecified', omitList=[] ) :
| def GaudiPython::Parallel::loadTES | ( | someClass, tbuf | ) |
Definition at line 345 of file Parallel.py.
00345 :
00346     # Using TMessages/TSockets, we DON'T do SetBufferOffset/ReadMode. But we are using TBufferFiles
00347     # so it must be done always
00348     # if someClass.__class__.__name__ == 'Worker' : tbuf.SetBufferOffset() ; tbuf.SetReadMode()
00349     # if someClass.__class__.__name__ == 'Writer' : tbuf.SetBufferOffset() ; tbuf.SetReadMode()
00350     tbuf.SetBufferOffset() ; tbuf.SetReadMode()
00351     root = gbl.DataObject()
00352     someClass.evt.setRoot('/Event', root)
00353     setOwnership(root, False)
00354     someClass.ts.loadBuffer(tbuf)
00355 class CollectHistograms( PyAlgorithm ) :
| def GaudiPython::Parallel::setupSystem | ( | nWorkers, config | ) |
Definition at line 234 of file Parallel.py.
00234 :
00235     qLimit = 50
00236     inq = Queue( ) # the Queue from Reader->(Workers)
00237     outQs = [ Queue( qLimit ) for i in xrange(nWorkers) ] # One outQueue for each Worker->Writer
00238
00239     commonQueue = Queue( ) # Common Queue from setupSystem->(everyone) (for signals)
00240     qToParent = Queue( ) # Common Queue from (everyone)->setupSystem (for signals)
00241
00242     cStatQueue = Queue( ) # A common queue, shared for (Workers)->Writer
00243     rHistQ = Queue( ) # A special Queue from Reader->Writer (bypassing workers)
00244
00245     # keys of the config dictionary
00246     ks = config.keys()
00247     ks.sort()
00248
00249     # determine application
00250     apps = ['Gauss', 'Boole', 'Brunel', 'DaVinci']
00251     _app = None
00252     for app in apps :
00253         if app in ks : _app = app
00254
00255     # determine output streams
00256     outs = [ 'Output', 'OutputFile', 'OutStream' ]
00257     outlist = []
00258     for k in ks :
00259         for o in outs :
00260             if hasattr( config[k], o ) : outlist.append(k)
00261
00262     itmlst = getOutputList( config )
00263
00264     # create Reader/Worker/Writer classes
00265     w = Writer( commonQueue, outQs, cStatQueue, rHistQ, qToParent, nWorkers, config, qLimit, _app )
00266     wks = [Worker(i, inq, commonQueue, outQs[i], cStatQueue, qToParent, nWorkers, config, qLimit, _app, itemlist=itmlst) for i in xrange(nWorkers)]
00267     r = Reader( inq, commonQueue, rHistQ, qToParent, nWorkers, config, qLimit, _app )
00268
00269     nProcesses = nWorkers+2 # workers + (reader + writer)
00270     nSenders = nWorkers+1 # workers + reader, the writer doesn't send anything
00271     nc = 0
00272
00273     # collect statistics dicts from each worker
00274     store = [] ; hFlagCt = 0
00275     while nc < nProcesses :
00276         item = qToParent.get()
00277         if item == 'h' : print 'HFLAG RECEIVED BY SETUPSYSTEM' ; [commonQueue.put(None) for i in xrange(nSenders)]
00278         if not item : nc += 1 # ; print 'Parent : NC now: %i'%( nc )
00279     # print ' PARENT : all None flags received'
00280     wks = []
00281     for dictionary in store :
00282         if dictionary['name'] == 'Worker' : wks.append(dictionary)
00283         if dictionary['name'] == 'Reader' : reader = dictionary
00284         if dictionary['name'] == 'Writer' : writer = dictionary
00285     # print 'PARENT recording stats'
00286     # recordReaderStats( reader )
00287     # recordWorkerStats( wks )
00288     # recordWriterStats( writer )
00289     # print ' ==================================================== PARENT FINISHED, RETURNING'
00290     return True
00291 def getOutputList( configuration ) :
| list GaudiPython::Parallel::__all__ = [ 'Task','WorkManager' ] |
Definition at line 12 of file Parallel.py.
| GaudiPython::Parallel::aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root |
Definition at line 223 of file Parallel.py.
| tuple GaudiPython::Parallel::aidatypes |
Initial value:
( gbl.AIDA.IHistogram,
gbl.AIDA.IHistogram1D,
gbl.AIDA.IHistogram2D,
gbl.AIDA.IHistogram3D,
gbl.AIDA.IProfile1D,
gbl.AIDA.IProfile2D )
Definition at line 224 of file Parallel.py.
| list GaudiPython::Parallel::excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY'] |
Definition at line 13 of file Parallel.py.
| string GaudiPython::Parallel::gppHead = '[ GaudiPythonParallel ] ' |
Definition at line 231 of file Parallel.py.
| string GaudiPython::Parallel::line = '-' |
Definition at line 232 of file Parallel.py.
| tuple GaudiPython::Parallel::thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D ) |
Definition at line 230 of file Parallel.py.