![]() |
|
|
Generated: 8 Jan 2009 |
00001 # File: GaudiPython/Parallel.py 00002 # Author: Pere Mato (pere.mato@cern.ch) 00003 00004 """ GaudiPython.Parallel module. 00005 This module provides 'parallel' processing support for GaudiPyhton. 00006 It is adding some sugar on top of public domain packages such as 00007 the 'processing' or the 'pp' packages. The interface can be made 00008 independent of the underlying implementation package. 00009 Two main class are defined: Task and WorkManager 00010 """ 00011 00012 __all__ = [ 'Task','WorkManager' ] 00013 excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY'] 00014 00015 import sys, os, time, copy 00016 00017 def _prefunction( f, task, item) : 00018 return f((task,item)) 00019 def _ppfunction( args ) : 00020 #--- Unpack arguments 00021 task, item = args 00022 stat = Statistics() 00023 #--- Initialize the remote side (at least once) 00024 if not task.__class__._initializeDone : 00025 for k,v in task.environ.items() : 00026 if k not in excluded_varnames : os.environ[k] = v 00027 task.initializeRemote() 00028 task.__class__._initializeDone = True 00029 #--- Reset the task output 00030 task._resetOutput() 00031 #--- Call processing 00032 task.process(item) 00033 #--- Collect statistics 00034 stat.stop() 00035 return (copy.deepcopy(task.output), stat) 00036 00037 def _detect_ncpus(): 00038 """Detects the number of effective CPUs in the system""" 00039 #for Linux, Unix and MacOS 00040 if hasattr(os, "sysconf"): 00041 if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"): 00042 #Linux and Unix 00043 ncpus = os.sysconf("SC_NPROCESSORS_ONLN") 00044 if isinstance(ncpus, int) and ncpus > 0: 00045 return ncpus 00046 else: 00047 #MacOS X 00048 return int(os.popen2("sysctl -n hw.ncpu")[1].read()) 00049 #for Windows 00050 if os.environ.has_key("NUMBER_OF_PROCESSORS"): 00051 ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]); 00052 if ncpus > 0: 00053 return ncpus 00054 #return the default value 00055 return 1 00056 00057 class Statistics(object): 00058 def __init__(self): 00059 self.name = os.getenv('HOSTNAME') 00060 self.start = time.time() 00061 self.time = 0.0 00062 self.njob = 0 00063 def stop(self): 00064 self.time = time.time() - self.start 00065 00066 class Task(object) : 00067 """ Basic base class to encapsulate any processing that is going to be porcessed in parallel. 00068 User class much inherit from it and implement the methods initializeLocal, 00069 initializeRemote, process and finalize. """ 00070 _initializeDone = False 00071 def __new__ ( cls, *args, **kwargs ): 00072 task = object.__new__( cls, *args, **kwargs ) 00073 task.output = () 00074 task.environ = {} 00075 for k,v in os.environ.items(): task.environ[k] = v 00076 task.cwd = os.getcwd() 00077 return task 00078 def initializeLocal(self): 00079 pass 00080 def initializeRemote(self): 00081 pass 00082 def process(self, item): 00083 pass 00084 def finalize(self) : 00085 pass 00086 def _mergeResults(self, result) : 00087 if type(result) is not type(self.output) : 00088 raise TypeError("output type is not same as obtained result") 00089 #--No iteratable--- 00090 if not hasattr( result , '__iter__' ): 00091 if hasattr(self.output,'Add') : self.output.Add(result) 00092 elif hasattr(self.output,'__iadd__') : self.output += result 00093 elif hasattr(self.output,'__add__') : self.output = self.output + result 00094 else : raise TypeError('result cannot be added') 00095 #--Dictionary--- 00096 elif type(result) is dict : 00097 if self.output.keys() <= result.keys(): minkeys = self.output.keys() 00098 else: minkeys = result.keys() 00099 for key in result.keys() : 00100 if key in self.output : 00101 if hasattr(self.output[key],'Add') : self.output[key].Add(result[key]) 00102 elif hasattr(self.output[key],'__iadd__') : self.output[key] += result[key] 00103 elif hasattr(self.output[key],'__add__') : self.output[key] = self.output[key] + result[key] 00104 else : raise TypeError('result cannot be added') 00105 else : 00106 self.output[key] = result[key] 00107 #--Anything else (list) 00108 else : 00109 for i in range( min( len(self.output) , len(result)) ): 00110 if hasattr(self.output[i],'Add') : self.output[i].Add(result[i]) 00111 elif hasattr(self.output[i],'__iadd__') : self.output[i] += result[i] 00112 elif hasattr(self.output[i],'__add__') : self.output[i] = self.output[i] + result[i] 00113 else : raise TypeError('result cannot be added') 00114 def _resetOutput(self): 00115 output = (type(self.output) is dict) and self.output.values() or self.output 00116 for o in output : 00117 if hasattr(o, 'Reset'): o.Reset() 00118 00119 00120 class WorkManager(object) : 00121 """ Class to in charge of managing the tasks and distributing them to 00122 the workers. They can be local (using other cores) or remote 00123 using other nodes in the local cluster """ 00124 00125 def __init__( self, ncpus='autodetect', ppservers=None) : 00126 if ncpus == 'autodetect' : self.ncpus = _detect_ncpus() 00127 else : self.ncpus = ncpus 00128 if ppservers : 00129 import pp 00130 self.ppservers = ppservers 00131 self.sessions = [ SshSession(srv) for srv in ppservers ] 00132 self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers) 00133 self.mode = 'cluster' 00134 else : 00135 import processing 00136 self.pool = processing.Pool(self.ncpus) 00137 self.mode = 'multicore' 00138 self.stats = {} 00139 00140 def __del__(self): 00141 if hasattr(self,'server') : self.server.destroy() 00142 00143 def process(self, task, items, timeout=90000): 00144 if not isinstance(task,Task) : 00145 raise TypeError("task argument needs to be an 'Task' instance") 00146 # --- Call the Local initialialization 00147 task.initializeLocal() 00148 # --- Schedule all the jobs .... 00149 if self.mode == 'cluster' : 00150 jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiPython.Parallel','time')) for item in items] 00151 for job in jobs : 00152 result, stat = job() 00153 task._mergeResults(result) 00154 self._mergeStatistics(stat) 00155 self._printStatistics() 00156 self.server.print_stats() 00157 elif self.mode == 'multicore' : 00158 start = time.time() 00159 jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items )) 00160 for result, stat in jobs.get(timeout) : 00161 task._mergeResults(result) 00162 self._mergeStatistics(stat) 00163 end = time.time() 00164 self._printStatistics() 00165 print 'Time elapsed since server creation %f' %(end-start) 00166 # --- Call te Local Finalize 00167 task.finalize() 00168 def _printStatistics(self): 00169 njobs = 0 00170 for stat in self.stats.values(): 00171 njobs += stat.njob 00172 print 'Job execution statistics:' 00173 print 'job count | % of all jobs | job time sum | time per job | job server' 00174 for name, stat in self.stats.items(): 00175 print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name) 00176 00177 def _mergeStatistics(self, stat): 00178 if stat.name not in self.stats : self.stats[stat.name] = Statistics() 00179 s = self.stats[stat.name] 00180 s.time += stat.time 00181 s.njob += 1 00182 00183 00184 class SshSession(object) : 00185 def __init__(self, hostname): 00186 import pyssh 00187 import pp 00188 self.host = hostname 00189 ppprefix = os.path.dirname(os.path.dirname(pp.__file__)) 00190 self.session = pyssh.Ssh(host=hostname) 00191 self.session.open() 00192 self.session.read_lazy() 00193 self.session.write('cd %s\n' % os.getcwd()) 00194 self.session.read_lazy() 00195 self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH']) 00196 self.session.read_lazy() 00197 self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH']) 00198 self.session.read_lazy() 00199 self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS']) 00200 self.session.read_lazy() 00201 self.session.write('%s %s/scripts-%s/ppserver.py \n'%(sys.executable, ppprefix, sys.version.split()[0] )) 00202 self.session.read_lazy() 00203 self.session.read_lazy() 00204 print 'started ppserver in ', hostname 00205 def __del__(self): 00206 self.session.close() 00207 print 'killed ppserver in ', self.host