""" GaudiMP.Parallel module.
This module provides 'parallel' processing support for GaudiPython.
It is adding some sugar on top of public domain packages such as
the 'multiprocessing' or the 'pp' packages. The interface can be made
independent of the underlying implementation package.
Two main classes are defined: Task and WorkManager.
"""
from __future__ import print_function

# Standard-library modules used throughout this module (the garbled source
# dropped these import lines; sys/os/time/copy are referenced below).
import sys
import os
import time
import copy
import multiprocessing

# Public API of this module.
__all__ = ['Task', 'WorkManager']

# Environment variables that are host/session specific and must NOT be
# replayed into the environment of a remote worker process.
excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
33 return f((task, item))
def _ppfunction(args):
    """Worker-side entry point executed for one (task, item) work unit.

    Returns a (deep-copied task.output, Statistics) pair so the master can
    merge results and timing information.

    NOTE(review): the def line, the os.environ replay, the call to
    task.process and the statistics stop were missing from the garbled
    source and are reconstructed — confirm against the original module.
    """
    task, item = args
    stat = Statistics()
    # Replay the submitting environment once per worker process, skipping
    # host/session specific variables (excluded_varnames).
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # Fresh output for this item, then process it.
    task._resetOutput()
    task.process(item)
    stat.stop()
    # Deep copy: the output must survive pickling back to the master.
    return (copy.deepcopy(task.output), stat)
class Statistics(object):
    """Per-worker timing record: host name, accumulated wall time and
    number of jobs (njob/time are read by WorkManager's statistics merge).
    """

    def __init__(self):
        # Worker host identification; None if HOSTNAME is not set.
        self.name = os.getenv('HOSTNAME')
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        """Record the elapsed wall time since construction.

        NOTE(review): reconstructed — the worker returns a Statistics whose
        .time is later summed by the master; confirm against the original.
        """
        self.time = time.time() - self.start
class Task(object):
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel.  User classes must inherit from it and implement
    the methods initializeLocal, initializeRemote, process and finalize.
    """

    # Flipped to True (per concrete class) after the first remote
    # initialization in a worker process.
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        # Capture the creating environment and cwd so they can be replayed
        # on the remote side before initializeRemote() runs.
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        """Hook: one-time initialization in the local (master) process."""
        pass

    def initializeRemote(self):
        """Hook: one-time initialization in each worker process."""
        pass

    def process(self, item):
        """Hook: process one work item, filling self.output."""
        pass

    def finalize(self):
        """Hook: finalization in the master after all results are merged."""
        pass

    def _mergeResults(self, result):
        """Merge one worker *result* into self.output.

        Supports three shapes: a non-iterable scalar-like object, a dict
        (merged key by key, new keys adopted), or a sequence (merged
        element-wise).  Each element is combined via ROOT-style .Add() if
        available, else in-place +=, else +.

        Raises TypeError when result and self.output differ in type or an
        element supports none of the merge protocols.
        """
        if type(result) is not type(self.output):
            raise TypeError("output type is not same as obtained result")
        if not hasattr(result, '__iter__'):
            # --- scalar-like output ---
            if hasattr(self.output, 'Add'):
                self.output.Add(result)
            elif hasattr(self.output, '__iadd__'):
                self.output += result
            elif hasattr(self.output, '__add__'):
                self.output = self.output + result
            else:
                raise TypeError('result cannot be added')
        elif type(result) is dict:
            # --- dictionary output ---
            # (a dead local `minkeys = result.keys()` in the original was
            # never read and has been removed)
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], 'Add'):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], '__iadd__'):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], '__add__'):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError('result cannot be added')
                else:
                    # New key: adopt the worker's value directly.
                    self.output[key] = result[key]
        else:
            # --- any other iterable: merge element-wise over the common
            # prefix of the two sequences ---
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], 'Add'):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], '__iadd__'):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], '__add__'):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError('result cannot be added')

    def _resetOutput(self):
        """Reset ROOT-style objects (those exposing Reset()) held in
        self.output before processing a new item on the worker side."""
        output = (type(self.output) is dict) and self.output.values() \
            or self.output
        for o in output:
            if hasattr(o, 'Reset'):
                o.Reset()
class WorkManager(object):
    """Class in charge of managing the tasks and distributing them to
    the workers.  They can be local (using other cores) or remote,
    using other nodes in the local cluster via a pp server.
    """

    def __init__(self, ncpus='autodetect', ppservers=None):
        """ncpus: worker count or 'autodetect' (= cpu_count());
        ppservers: optional sequence of remote hosts -> 'cluster' mode
        (requires the third-party 'pp' package), else 'multicore' mode
        backed by a multiprocessing.Pool."""
        if ncpus == 'autodetect':
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            # NOTE(review): cluster-mode setup reconstructed from the
            # submit/print_stats usage below — confirm pp.Server arguments.
            import pp
            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=ppservers)
            self.mode = 'cluster'
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = 'multicore'
        # Per-host Statistics, filled by _mergeStatistics.
        self.stats = {}

    def __del__(self):
        # Only cluster mode owns a pp server that must be torn down.
        if hasattr(self, 'server'):
            self.server.destroy()

    def process(self, task, items, timeout=90000):
        """Run task.process(item) for every item in *items* in parallel,
        merging each worker's output and timing back into *task*/self.

        Raises TypeError if *task* is not a Task instance.
        """
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be an 'Task' instance")
        # Local initialization happens once, in the master process.
        task.initializeLocal()
        if self.mode == 'cluster':
            jobs = [
                self.server.submit(_prefunction, (_ppfunction, task, item),
                                   (), ('GaudiMP.Parallel', 'time'))
                for item in items
            ]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore':
            start = time.time()
            # Each worker receives the same task object paired with one item.
            jobs = self.pool.map_async(_ppfunction,
                                       zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print('Time elapsed since server creation %f' % (end - start))
        # Local finalization with the fully merged output.
        task.finalize()

    def _printStatistics(self):
        """Print a per-host summary of job counts and timings."""
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print('Job execution statistics:')
        print(
            'job count | % of all jobs | job time sum | time per job | job server'
        )
        for name, stat in self.stats.items():
            print(
                ' %d | %6.2f | %8.3f | %8.3f | %s' %
                (stat.njob, 100. * stat.njob / njobs, stat.time,
                 stat.time / stat.njob, name))

    def _mergeStatistics(self, stat):
        """Accumulate one worker Statistics into the per-host totals."""
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
class SshSession(object):
    """Open an ssh session to *hostname*, export the caller's environment
    and launch a ppserver there (cluster mode helper).

    NOTE(review): parts of this class (session open/read_lazy calls, the
    __init__ signature) were missing from the garbled source and are
    reconstructed — the 'setenv' syntax assumes a csh-family remote shell.
    """

    def __init__(self, hostname):
        # Third-party, deliberately imported lazily: only cluster mode
        # needs pyssh/pp.
        import pyssh
        import pp
        self.host = hostname
        # Root of the pp installation, used to locate ppserver.py below.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ[
            'LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        self.session.write('%s %s/scripts-%s/ppserver.py \n' %
                           (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        print('started ppserver in ', hostname)

    def __del__(self):
        # Best-effort teardown of the remote ppserver's ssh session.
        self.session.close()
        print('killed ppserver in ', self.host)