Gaudi Framework, version v23r4

Home   Generated: Mon Sep 17 2012

Parallel.py

Go to the documentation of this file.
00001 # File: GaudiMP/Parallel.py
00002 # Author: Pere Mato (pere.mato@cern.ch)
00003 
""" GaudiMP.Parallel module.
    This module provides 'parallel' processing support for GaudiPython.
    It is adding some sugar on top of public domain packages such as
    the 'multiprocessing' or the 'pp' packages. The interface can be made
    independent of the underlying implementation package.
    Two main classes are defined: Task and WorkManager
"""
00011 
00012 __all__ = [ 'Task','WorkManager' ]
00013 excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
00014 
00015 import sys, os, time, copy
00016 import multiprocessing
00017 
00018 def _prefunction( f, task, item) :
00019     return f((task,item))
def _ppfunction( args ) :
    """Worker-side wrapper: execute one (task, item) pair and return the
    pair (deep-copied task output, Statistics record)."""
    task, item = args
    stat = Statistics()
    # One-time per-process setup: replicate the submitter's environment
    # (minus host-specific variables) and run the remote initialization.
    cls = task.__class__
    if not cls._initializeDone :
        for name, value in task.environ.items() :
            if name not in excluded_varnames :
                os.environ[name] = value
        task.initializeRemote()
        cls._initializeDone = True
    # Clear any previous output, then process the current item.
    task._resetOutput()
    task.process(item)
    stat.stop()
    # Deep-copy so the parent process receives an independent object.
    return (copy.deepcopy(task.output), stat)
00037 
class Statistics(object):
    """Per-worker bookkeeping record: host name, elapsed wall-clock time
    and number of jobs accounted to that worker."""
    def __init__(self):
        self.name  = os.environ.get('HOSTNAME')  # worker host (None if unset)
        self.start = time.time()                 # creation timestamp
        self.time  = 0.0                         # elapsed seconds, filled by stop()
        self.njob  = 0                           # job counter, updated by the manager
    def stop(self):
        """Record the wall-clock time elapsed since this record was created."""
        self.time = time.time() - self.start
00046 
class Task(object) :
    """ Basic base class to encapsulate any processing that is going to be processed in parallel.
        User classes must inherit from it and implement the methods initializeLocal,
        initializeRemote, process and finalize.   """
    # Set to True on a worker process once initializeRemote() has run there.
    _initializeDone = False
    def __new__ ( cls, *args, **kwargs ):
        # Capture an empty output placeholder, a snapshot of the environment
        # and the current directory so they can be shipped to remote workers.
        task = object.__new__( cls )
        task.output = ()
        task.environ = dict(os.environ.items())
        task.cwd = os.getcwd()
        return task
    def initializeLocal(self):
        """Hook: one-time initialization executed on the submitting node."""
        pass
    def initializeRemote(self):
        """Hook: one-time initialization executed on each worker process."""
        pass
    def process(self, item):
        """Hook: process a single work item, accumulating into self.output."""
        pass
    def finalize(self) :
        """Hook: called locally after all results have been merged."""
        pass
    def _mergeResults(self, result) :
        """Merge a worker's result into self.output.

        Supports ROOT-style objects (via Add), values supporting += or +,
        dictionaries (merged key by key) and sequences (merged element-wise).
        Raises TypeError when the result type differs from self.output or a
        value cannot be combined.
        """
        if type(result) is not type(self.output) :
            raise TypeError("output type is not same as obtained result")
        #--No iterable---
        if not hasattr( result , '__iter__' ):
            if hasattr(self.output,'Add') : self.output.Add(result)
            elif hasattr(self.output,'__iadd__') : self.output += result
            elif hasattr(self.output,'__add__') : self.output = self.output + result
            else : raise TypeError('result cannot be added')
        #--Dictionary: merge values for common keys, copy new keys---
        elif type(result) is dict :
            for key in result.keys() :
                if key in self.output :
                    if hasattr(self.output[key],'Add') : self.output[key].Add(result[key])
                    elif hasattr(self.output[key],'__iadd__') : self.output[key] += result[key]
                    elif hasattr(self.output[key],'__add__') : self.output[key] = self.output[key] + result[key]
                    else : raise TypeError('result cannot be added')
                else :
                    self.output[key] = result[key]
        #--Anything else (list): element-wise merge up to the shorter length
        else :
            for i in range( min( len(self.output) , len(result)) ):
                if hasattr(self.output[i],'Add') : self.output[i].Add(result[i])
                elif hasattr(self.output[i],'__iadd__') : self.output[i] += result[i]
                elif hasattr(self.output[i],'__add__') : self.output[i] = self.output[i] + result[i]
                else : raise TypeError('result cannot be added')
    def _resetOutput(self):
        """Reset ROOT-style objects held in self.output before processing an item."""
        output = self.output.values() if type(self.output) is dict else self.output
        for o in output :
            if hasattr(o, 'Reset'): o.Reset()
00099 
00100 
class WorkManager(object) :
    """ Class in charge of managing the tasks and distributing them to
        the workers. They can be local (using other cores) or remote
        using other nodes in the local cluster """

    def __init__( self, ncpus='autodetect', ppservers=None) :
        """ncpus: number of local worker processes, or 'autodetect' to use
        every available core.  ppservers: optional sequence of host names;
        when given, a pp cluster is set up with one ppserver per host."""
        if ncpus == 'autodetect' : self.ncpus = multiprocessing.cpu_count()
        else :                     self.ncpus = ncpus
        if ppservers :
            import pp  # third-party parallel-python, needed only in cluster mode
            self.ppservers = ppservers
            self.sessions = [ SshSession(srv) for srv in ppservers ]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else :
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = 'multicore'
        self.stats = {}

    def __del__(self):
        # Only cluster mode owns a pp server that must be shut down.
        if hasattr(self,'server') : self.server.destroy()

    def process(self, task, items, timeout=90000):
        """Run task.process(item) for every item in parallel, merging each
        worker's output and statistics back into the task / manager.
        Raises TypeError if task is not a Task instance."""
        if not isinstance(task,Task) :
            raise TypeError("task argument needs to be an 'Task' instance")
        # --- Call the Local initialization
        task.initializeLocal()
        # --- Schedule all the jobs ....
        if self.mode == 'cluster' :
            jobs = [self.server.submit(_prefunction,
                                       (_ppfunction, task, item),
                                       (),
                                       ('GaudiMP.Parallel', 'time'))
                    for item in items]
            for job in jobs :
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore' :
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, [(task, item) for item in items])
            for result, stat in jobs.get(timeout) :
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            # print() with a single pre-formatted string works on both py2 and py3
            print('Time elapsed since server creation %f' % (end - start))
        # --- Call the Local Finalize
        task.finalize()

    def _printStatistics(self):
        """Print a per-host summary table of the collected job statistics."""
        njobs = sum(stat.njob for stat in self.stats.values())
        print('Job execution statistics:')
        print('job count | % of all jobs | job time sum | time per job | job server')
        for name, stat in self.stats.items():
            print('       %d |        %6.2f |     %8.3f |    %8.3f | %s' %
                  (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name))

    def _mergeStatistics(self, stat):
        """Accumulate one worker Statistics record into the per-host totals."""
        if stat.name not in self.stats : self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
00162 
00163 
class SshSession(object) :
    """Ssh connection to a remote host on which a ppserver is started.

    NOTE(review): relies on the third-party 'pyssh' and 'pp' packages and
    on the remote login shell understanding 'setenv' (csh/tcsh) — confirm
    for new hosts."""
    def __init__(self, hostname):
        import pyssh
        import pp
        self.host = hostname
        # Directory containing the pp installation; ppserver.py lives in a
        # version-suffixed 'scripts-<pyver>' directory next to it.
        ppprefix =  os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Replicate the local working directory and the environment needed
        # by the worker, then launch the ppserver.
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        self.session.write('%s %s/scripts-%s/ppserver.py \n'%(sys.executable, ppprefix, sys.version.split()[0] ))
        self.session.read_lazy()
        self.session.read_lazy()
        # print() with a single pre-formatted string works on both py2 and py3
        print('started ppserver in %s' % hostname)
    def __del__(self):
        self.session.close()
        print('killed ppserver in %s' % self.host)
00188 
00189 # == EOF ====================================================================================
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Defines

Generated at Mon Sep 17 2012 13:49:35 for Gaudi Framework, version v23r4 by Doxygen version 1.7.2 written by Dimitri van Heesch, © 1997-2004