# NOTE(review): this file is a garbled capture of GaudiMP/Parallel.py.
# The original file's own line numbers are fused into the text and several
# original lines are missing (the docstring below has lost its closing
# quotes, original lines 9-10). Recover the pristine source before making
# any functional edit; only comment/typo-level fixes are applied here.
3 """ GaudiMP.Parallel module. 4 This module provides 'parallel' processing support for GaudiPython. 5 It is adding some sugar on top of public domain packages such as 6 the 'multiprocessing' or the 'pp' packages. The interface can be made 7 independent of the underlying implementation package. 8 Two main classes are defined: Task and WorkManager 11 __all__ = [
'Task',
'WorkManager']
12 excluded_varnames = [
'HOSTNAME',
'SSH_CLIENT',
'SSH_CONNECTION',
'DISPLAY']
18 import multiprocessing
22 return f((task, item))
# NOTE(review): fragment of the worker function ``_ppfunction`` (original
# lines 30-42). Original lines 33 and 36-41 are missing from this capture
# and several statements are split mid-line, so the logic below is
# incomplete: it appears to perform one-time remote initialization of the
# task (restoring environment variables not in ``excluded_varnames``,
# then calling ``task.initializeRemote()``) and to return a deep copy of
# ``task.output`` together with a ``stat`` object — TODO confirm against
# the pristine source. Do not edit without recovering the missing lines.
30 if not task.__class__._initializeDone:
31 for k, v
in task.environ.items():
32 if k
not in excluded_varnames:
34 task.initializeRemote()
35 task.__class__._initializeDone =
True 42 return (copy.deepcopy(task.output), stat)
# NOTE(review): fragment of a constructor (original line 47) that records
# the local host name from the HOSTNAME environment variable — presumably
# ``Statistics.__init__``; the rest of the initializer (original lines
# 48 onward) is missing from this capture.
47 self.
name = os.getenv(
'HOSTNAME')
# NOTE(review): fragment of the ``Task`` class header and its ``__new__``
# (original lines 57-68). Original lines 61-62, 64-65 and 67 are missing,
# so the allocation logic below is incomplete: it appears to create the
# instance via ``object.__new__`` and snapshot ``os.environ`` and the
# current working directory for later remote initialization — TODO
# confirm against the pristine source.
57 """ Basic base class to encapsulate any processing that is going to be processed in parallel. 58 User class must inherit from it and implement the methods initializeLocal, 59 initializeRemote, process and finalize. """ 60 _initializeDone =
False 63 task = object.__new__(cls)
66 for k, v
in os.environ.items():
68 task.cwd = os.getcwd()
# NOTE(review): fragment of ``Task._mergeResults`` (original lines
# 84-124), which folds a worker's partial ``result`` into
# ``self.output``. Many original lines are missing (86, 91, 93-94, 96,
# 100, 103, 109-110, 112, 114-116, 122-123) — in particular the
# assignment bodies after several ``elif hasattr(...)`` tests — so the
# merge strategy below is only partially visible: it type-checks the
# result against the output, then dispatches per element on ``Add`` /
# ``__iadd__`` / ``__add__`` for scalars, dict values and (apparently)
# indexable sequences, raising TypeError when nothing applies. Do not
# edit without recovering the missing lines.
84 if type(result)
is not type(self.output):
85 raise TypeError(
"output type is not same as obtained result")
87 if not hasattr(result,
'__iter__'):
88 if hasattr(self.output,
'Add'):
89 self.output.Add(result)
90 elif hasattr(self.output,
'__iadd__'):
92 elif hasattr(self.output,
'__add__'):
95 raise TypeError(
'result cannot be added')
97 elif type(result)
is dict:
98 if self.output.keys() <= result.keys():
99 minkeys = self.output.keys()
101 minkeys = result.keys()
102 for key
in result.keys():
104 if hasattr(self.
output[key],
'Add'):
105 self.
output[key].Add(result[key])
106 elif hasattr(self.
output[key],
'__iadd__'):
107 self.
output[key] += result[key]
108 elif hasattr(self.
output[key],
'__add__'):
111 raise TypeError(
'result cannot be added')
113 self.
output[key] = result[key]
117 if hasattr(self.
output[i],
'Add'):
118 self.
output[i].Add(result[i])
119 elif hasattr(self.
output[i],
'__iadd__'):
120 self.
output[i] += result[i]
121 elif hasattr(self.
output[i],
'__add__'):
124 raise TypeError(
'result cannot be added')
# NOTE(review): fragment of what appears to be ``Task._resetOutput``
# (original lines 128-130): it selects ``self.output.values()`` when the
# output is a dict (otherwise the output itself) and then calls
# ``Reset()`` on each element that has one — TODO confirm; the start of
# the statement (original line ~127) and the loop body (131+) are missing
# from this capture.
dict)
and self.output.values()
or self.
output 130 if hasattr(o,
'Reset'):
# NOTE(review): fragment of the ``WorkManager`` class header, its
# ``__init__`` and (apparently) its ``__del__`` (original lines 135-157).
# Original lines 142-151, 153-155 are missing, so only the 'autodetect'
# CPU-count branch and the 'multicore' mode assignment are visible; the
# ppservers/'cluster' branch is lost. ``self.server.destroy()`` guarded
# by ``hasattr(self, 'server')`` suggests destructor cleanup of a pp
# server — TODO confirm against the pristine source.
135 """ Class in charge of managing the tasks and distributing them to 136 the workers. They can be local (using other cores) or remote 137 using other nodes in the local cluster """ 139 def __init__(self, ncpus='autodetect', ppservers=None):
140 if ncpus ==
'autodetect':
141 self.
ncpus = multiprocessing.cpu_count()
152 self.
mode =
'multicore' 156 if hasattr(self,
'server'):
157 self.server.destroy()
# NOTE(review): fragment of ``WorkManager.process`` (original lines
# 159-186). It validates the task type, runs ``task.initializeLocal()``,
# then dispatches per ``self.mode``: 'cluster' submits
# ``_prefunction(_ppfunction, task, item)`` jobs to a pp server;
# 'multicore' uses ``pool.map_async`` over ``zip([task]*len(items),
# items)`` and merges each result. Original lines 162, 164, 166,
# 169-172, 174-175, 178, 183-185 are missing (including the item loop
# header for the cluster branch and the timing setup for ``end - start``),
# so this body is not runnable as captured. The ``print`` statement is
# Python 2 syntax.
159 def process(self, task, items, timeout=90000):
160 if not isinstance(task, Task):
161 raise TypeError(
"task argument needs to be an 'Task' instance")
163 task.initializeLocal()
165 if self.
mode ==
'cluster':
167 self.server.submit(_prefunction, (_ppfunction, task, item), (),
168 (
'GaudiMP.Parallel',
'time'))
173 task._mergeResults(result)
176 self.server.print_stats()
177 elif self.
mode ==
'multicore':
179 jobs = self.pool.map_async(_ppfunction,
180 zip([task
for i
in items], items))
181 for result, stat
in jobs.get(timeout):
182 task._mergeResults(result)
186 print 'Time elapsed since server creation %f' % (end - start)
# NOTE(review): fragment of ``WorkManager.print_statistics`` (original
# lines 192-199): it iterates ``self.stats`` and prints a per-server job
# table (count, percentage of ``njobs``, total and per-job time).
# Original line 193 — presumably the accumulation of ``njobs`` — is
# missing from this capture. Python 2 ``print`` syntax.
192 for stat
in self.stats.values():
194 print 'Job execution statistics:' 195 print 'job count | % of all jobs | job time sum | time per job | job server' 196 for name, stat
in self.stats.items():
197 print ' %d | %6.2f | %8.3f | %8.3f | %s' % (
198 stat.njob, 100. * stat.njob / njobs, stat.time,
199 stat.time / stat.njob, name)
# NOTE(review): fragment of ``WorkManager._mergeStatistics`` (original
# lines 202-204): it looks up ``stat.name`` in ``self.stats`` — the
# branch creating a missing entry (original line 203) and the lines
# updating the fetched entry ``s`` (205+) are absent from this capture.
202 if stat.name
not in self.
stats:
204 s = self.
stats[stat.name]
# NOTE(review): fragment of what appears to be the ``SshSession`` helper
# (original lines 214-235). Visible behavior: over an interactive
# ``self.session`` (telnet/ssh-like, with ``write``/``read_lazy``) it
# cd's to the current directory, exports PYTHONPATH, LD_LIBRARY_PATH and
# ROOTSYS via csh ``setenv``, then launches ``ppserver.py`` from the pp
# installation prefix with the current interpreter. Original lines
# 215-216, 222, 232-234 are missing (including the session construction
# and the ``write`` call for the LD_LIBRARY_PATH line, whose arguments
# survive at original 223). The trailing ``print`` statements (Python 2
# syntax) belong to startup confirmation and to a kill/cleanup method.
214 ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
217 self.session.read_lazy()
218 self.session.write(
'cd %s\n' % os.getcwd())
219 self.session.read_lazy()
220 self.session.write(
'setenv PYTHONPATH %s\n' % os.environ[
'PYTHONPATH'])
221 self.session.read_lazy()
223 'setenv LD_LIBRARY_PATH %s\n' % os.environ[
'LD_LIBRARY_PATH'])
224 self.session.read_lazy()
225 self.session.write(
'setenv ROOTSYS %s\n' % os.environ[
'ROOTSYS'])
226 self.session.read_lazy()
227 self.session.write(
'%s %s/scripts-%s/ppserver.py \n' %
228 (sys.executable, ppprefix, sys.version.split()[0]))
229 self.session.read_lazy()
230 self.session.read_lazy()
231 print 'started ppserver in ', hostname
235 print 'killed ppserver in ', self.
host def _mergeStatistics(self, stat)
EventIDBase min(const EventIDBase &lhs, const EventIDBase &rhs)
def _printStatistics(self)
def __init__(self, ncpus='autodetect', ppservers=None)
def initializeLocal(self)
def __init__(self, hostname)
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
def process(self, task, items, timeout=90000)
def __new__(cls, args, kwargs)
def _prefunction(f, task, item)
def _mergeResults(self, result)
def initializeRemote(self)