00004 """ GaudiMP.Parallel module.
00005 This module provides 'parallel' processing support for GaudiPyhton.
00006 It is adding some sugar on top of public domain packages such as
00007 the 'multiprocessing' or the 'pp' packages. The interface can be made
00008 independent of the underlying implementation package.
00009 Two main class are defined: Task and WorkManager
00010 """

__all__ = ['Task', 'WorkManager']
excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']

import sys, os, time, copy
import multiprocessing

def _prefunction(f, task, item):
    return f((task, item))

def _ppfunction(args):
    #--- Unpack the (task, item) pair
    task, item = args
    stat = Statistics()
    #--- Propagate the caller's environment and initialize the remote side (once per worker)
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames: os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    #--- Reset the output and process this item
    task._resetOutput()
    task.process(item)
    #--- Collect results and timing statistics
    stat.stop()
    return (copy.deepcopy(task.output), stat)

class Statistics(object):
    def __init__(self):
        self.name = os.getenv('HOSTNAME')
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        self.time = time.time() - self.start

class Task(object):
    """ Base class to encapsulate any processing that is going to be executed
        in parallel. User classes must inherit from it and implement the methods
        initializeLocal, initializeRemote, process and finalize. """
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items(): task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        pass

    def initializeRemote(self):
        pass

    def process(self, item):
        pass

    def finalize(self):
        pass

    def _mergeResults(self, result):
        if type(result) is not type(self.output):
            raise TypeError('output type is not the same as the obtained result')
        #--- Scalar output: merge the result directly
        if not hasattr(result, '__iter__'):
            if   hasattr(self.output, 'Add')     : self.output.Add(result)
            elif hasattr(self.output, '__iadd__'): self.output += result
            elif hasattr(self.output, '__add__') : self.output = self.output + result
            else: raise TypeError('result cannot be added')
        #--- Dictionary output: merge entry by entry, adding missing keys
        elif type(result) is dict:
            for key in result.keys():
                if key in self.output:
                    if   hasattr(self.output[key], 'Add')     : self.output[key].Add(result[key])
                    elif hasattr(self.output[key], '__iadd__'): self.output[key] += result[key]
                    elif hasattr(self.output[key], '__add__') : self.output[key] = self.output[key] + result[key]
                    else: raise TypeError('result cannot be added')
                else:
                    self.output[key] = result[key]
        #--- Any other sequence output: merge element by element
        else:
            for i in range(min(len(self.output), len(result))):
                if   hasattr(self.output[i], 'Add')     : self.output[i].Add(result[i])
                elif hasattr(self.output[i], '__iadd__'): self.output[i] += result[i]
                elif hasattr(self.output[i], '__add__') : self.output[i] = self.output[i] + result[i]
                else: raise TypeError('result cannot be added')

    def _resetOutput(self):
        output = self.output.values() if type(self.output) is dict else self.output
        for o in output:
            if hasattr(o, 'Reset'): o.Reset()

class WorkManager(object):
    """ Class in charge of managing the tasks and distributing them to
        the workers. The workers can be local (using other cores) or
        remote (using other nodes in the local cluster). """

    def __init__(self, ncpus='autodetect', ppservers=None):
        if ncpus == 'autodetect': self.ncpus = multiprocessing.cpu_count()
        else:                     self.ncpus = ncpus
        if ppservers:
            import pp
            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = 'multicore'
        self.stats = {}

    def __del__(self):
        if hasattr(self, 'server'): self.server.destroy()

    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
        #--- Call the Task initialization on the local host
        task.initializeLocal()

        if self.mode == 'cluster':
            jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (),
                                       ('GaudiMP.Parallel', 'time')) for item in items]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore':
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print 'Time elapsed since server creation %f' % (end - start)
        #--- Call the Task finalization on the local host
        task.finalize()

    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print 'Job execution statistics:'
        print 'job count | % of all jobs | job time sum | time per job | job server'
        for name, stat in self.stats.items():
            print '%9d | %13.2f | %12.3f | %12.3f | %s' % (stat.njob, 100. * stat.njob / njobs, stat.time, stat.time / stat.njob, name)

    def _mergeStatistics(self, stat):
        if stat.name not in self.stats: self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1

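# Cluster-mode sketch (hypothetical host names; requires the 'pp' and 'pyssh'
# packages and password-less ssh access to the nodes; reuses the MyTask
# example from the top of this module):
#
#   wmgr = WorkManager(ncpus=4, ppservers=('node1.mydomain', 'node2.mydomain'))
#   wmgr.process(MyTask(), ['item1', 'item2', 'item3'])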
class SshSession(object):
    def __init__(self, hostname):
        import pyssh
        import pp
        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        #--- Replicate the local working directory and environment on the
        #--- remote node (the 'setenv' syntax assumes a csh-like remote shell)
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        #--- Launch a 'pp' server on the remote node
        self.session.write('%s %s/scripts-%s/ppserver.py \n' % (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print 'started ppserver in ', hostname

    def __del__(self):
        self.session.close()
        print 'killed ppserver in ', self.host