4 """ GaudiMP.Parallel module. 5 This module provides 'parallel' processing support for GaudiPyhton. 6 It is adding some sugar on top of public domain packages such as 7 the 'multiprocessing' or the 'pp' packages. The interface can be made 8 independent of the underlying implementation package. 9 Two main class are defined: Task and WorkManager 12 __all__ = [
'Task',
'WorkManager']
13 excluded_varnames = [
'HOSTNAME',
'SSH_CLIENT',
'SSH_CONNECTION',
'DISPLAY']

import sys
import os
import time
import copy
import multiprocessing


def _prefunction(f, task, item):
    # Helper shipped to the pp workers: unwraps the (task, item) pair for f.
    return f((task, item))


def _ppfunction(args):
    task, item = args
    stat = Statistics()
    # --- Initialize the remote side (at least once per worker)
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Reset the output, process the item, collect statistics
    task._resetOutput()
    task.process(item)
    stat.stop()
    return (copy.deepcopy(task.output), stat)


class Statistics(object):
    def __init__(self):
        self.name = os.getenv('HOSTNAME')
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        self.time = time.time() - self.start
58 """ Basic base class to encapsulate any processing that is going to be porcessed in parallel. 59 User class much inherit from it and implement the methods initializeLocal, 60 initializeRemote, process and finalize. """ 61 _initializeDone =
False 64 task = object.__new__(cls)
67 for k, v
in os.environ.items():
69 task.cwd = os.getcwd()

    def _mergeResults(self, result):
        if type(result) is not type(self.output):
            raise TypeError("output type is not same as obtained result")
        # --- Non-iterable output: add the result directly
        if not hasattr(result, '__iter__'):
            if hasattr(self.output, 'Add'):
                self.output.Add(result)
            elif hasattr(self.output, '__iadd__'):
                self.output += result
            elif hasattr(self.output, '__add__'):
                self.output = self.output + result
            else:
                raise TypeError('result cannot be added')
        # --- Dictionary output: merge value by value
        elif type(result) is dict:
            if self.output.keys() <= result.keys():
                minkeys = self.output.keys()
            else:
                minkeys = result.keys()
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], 'Add'):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], '__iadd__'):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], '__add__'):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError('result cannot be added')
                else:
                    self.output[key] = result[key]
        # --- Any other iterable: merge element by element
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], 'Add'):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], '__iadd__'):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], '__add__'):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError('result cannot be added')

    def _resetOutput(self):
        output = (type(self.output) is dict) and self.output.values() or self.output
        for o in output:
            if hasattr(o, 'Reset'):
                o.Reset()
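

# --- Illustrative sketch, not part of the original module: a minimal Task
#     subclass (hypothetical name 'SumTask') showing the contract described
#     in the Task docstring. initializeLocal() runs once on the master,
#     initializeRemote() once per worker, process() once per work item, and
#     partial outputs come back through _mergeResults(): plain ints provide
#     __add__, so the dict branch above accumulates them key by key.
class SumTask(Task):
    def initializeLocal(self):
        self.output = {'sum': 0}    # dict output is merged key by key

    def initializeRemote(self):
        pass                        # nothing extra to set up on the workers

    def process(self, item):
        self.output['sum'] += item

    def finalize(self):
        print('total = %d' % self.output['sum'])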
136 """ Class to in charge of managing the tasks and distributing them to 137 the workers. They can be local (using other cores) or remote 138 using other nodes in the local cluster """ 140 def __init__(self, ncpus='autodetect', ppservers=None):
141 if ncpus ==
'autodetect':
142 self.
ncpus = multiprocessing.cpu_count()
153 self.
mode =
'multicore' 157 if hasattr(self,
'server'):
158 self.server.destroy()

    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
        # --- Call the local initialization
        task.initializeLocal()
        # --- Schedule all the jobs
        if self.mode == 'cluster':
            jobs = [self.server.submit(
                _prefunction, (_ppfunction, task, item), (),
                ('GaudiMP.Parallel', 'time')) for item in items]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore':
            start = time.time()
            jobs = self.pool.map_async(
                _ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print('Time elapsed since server creation %f' % (end - start))
        # --- Call the local finalization
        task.finalize()

    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print('Job execution statistics:')
        print('job count | % of all jobs | job time sum | time per job | job server')
        print('%9d | %13.2f | %12.3f | %12.3f | %s' % (
            stat.njob, 100. * stat.njob / njobs, stat.time,
            stat.time / stat.njob, name) for name, stat in self.stats.items())

    def _mergeStatistics(self, stat):
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
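

# --- Illustrative sketch, not part of the original module: remote 'cluster'
#     mode needs the 'pp' and 'pyssh' packages plus ssh access to the
#     ppservers ('node1'/'node2' are hypothetical host names), e.g.:
#
#   wm = WorkManager(ppservers=('node1', 'node2'))
#   wm.process(SumTask(), range(100))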


class SshSession(object):
    def __init__(self, hostname):
        import pp
        import pyssh
        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' %
                           os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        self.session.write('%s %s/scripts-%s/ppserver.py \n' %
                           (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print('started ppserver in %s' % hostname)

    def __del__(self):
        self.session.close()
        print('killed ppserver in %s' % self.host)
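

# --- Illustrative sketch, not part of the original module: a small self-test
#     in local 'multicore' mode, using the hypothetical SumTask defined above.
#     Guarded so it only runs when this file is executed directly.
if __name__ == '__main__':
    wm = WorkManager(ncpus=2)
    wm.process(SumTask(), list(range(10)))   # finalize() prints 'total = 45'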