"""GaudiMP.Parallel module.

This module provides 'parallel' processing support for GaudiPython.
It is adding some sugar on top of public domain packages such as
the 'multiprocessing' or the 'pp' packages. The interface can be made
independent of the underlying implementation package.
Two main classes are defined: Task and WorkManager.
"""
# Public API of this module.
__all__ = ["Task", "WorkManager"]

# Environment variables that are host/session specific on the client and
# therefore must NOT be propagated to the remote worker environment.
excluded_varnames = ["HOSTNAME", "SSH_CLIENT", "SSH_CONNECTION", "DISPLAY"]
32 return f((task, item))
def _ppfunction(args):
    """Worker-side entry point executed once per work item.

    *args* is a (task, item) pair.  On the first call in a given worker
    process the Task class is initialized remotely: the client environment
    captured in task.environ is replayed (minus excluded_varnames) and
    task.initializeRemote() is invoked.  Returns a (deep-copied task
    output, Statistics) pair so results can be merged on the client.
    """
    # --- Unpack arguments
    task, item = args
    stat = Statistics()
    # --- Initialize the remote side (only once per worker process)
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            # Skip variables that only make sense on the client host.
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Reset the output and process this item
    task._resetOutput()
    task.process(item)
    # --- Collect timing statistics for this job
    stat.stop()
    # Deep copy: the output must travel back to the client detached from
    # the worker-side task instance.
    return (copy.deepcopy(task.output), stat)
class Statistics(object):
    """Per-host bookkeeping: how many jobs ran and how much time they took.

    Instances are created on the worker (name = worker HOSTNAME) and merged
    per host name by WorkManager._mergeStatistics.
    """

    def __init__(self):
        # Host this statistics record belongs to (None if HOSTNAME unset).
        self.name = os.getenv("HOSTNAME")
        # Wall-clock start, used by stop() to compute the elapsed time.
        self.start = time.time()
        # Accumulated job time and job count (incremented when merging).
        self.time = 0.0
        self.njob = 0

    def stop(self):
        """Record the elapsed wall-clock time since construction."""
        self.time = time.time() - self.start
class Task(object):
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel.  User classes must inherit from it and implement
    the methods initializeLocal, initializeRemote, process and finalize."""

    # Class-level flag: set to True by _ppfunction after the one-time
    # remote initialization in a worker process.
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        # __new__ (not __init__) is used so derived classes get this
        # bookkeeping even if they forget to call the base constructor.
        task = object.__new__(cls)
        task.output = ()
        # Snapshot of the client environment, replayed on the remote side.
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        """Hook: one-time initialization on the client. Override as needed."""
        pass

    def initializeRemote(self):
        """Hook: one-time initialization in each worker. Override as needed."""
        pass

    def process(self, item):
        """Hook: process a single work item, filling self.output."""
        pass

    def finalize(self):
        """Hook: final clean-up on the client after all items are merged."""
        pass
95 if not isinstance(result, type(self.
output)):
96 raise TypeError(
"output type is not same as obtained result")
98 if not hasattr(result,
"__iter__"):
99 if hasattr(self.
output,
"Add"):
101 elif hasattr(self.
output,
"__iadd__"):
103 elif hasattr(self.
output,
"__add__"):
106 raise TypeError(
"result cannot be added")
108 elif isinstance(result, dict):
109 for key
in result.keys():
111 if hasattr(self.
output[key],
"Add"):
112 self.
output[key].Add(result[key])
113 elif hasattr(self.
output[key],
"__iadd__"):
114 self.
output[key] += result[key]
115 elif hasattr(self.
output[key],
"__add__"):
118 raise TypeError(
"result cannot be added")
120 self.
output[key] = result[key]
123 for i
in range(min(len(self.
output), len(result))):
124 if hasattr(self.
output[i],
"Add"):
125 self.
output[i].Add(result[i])
126 elif hasattr(self.
output[i],
"__iadd__"):
127 self.
output[i] += result[i]
128 elif hasattr(self.
output[i],
"__add__"):
131 raise TypeError(
"result cannot be added")
136 if hasattr(o,
"Reset"):
class WorkManager(object):
    """Class in charge of managing the tasks and distributing them to
    the workers.  They can be local (using other cores) or remote,
    using other nodes in the local cluster."""

    def __init__(self, ncpus="autodetect", ppservers=None):
        """ncpus: worker count or 'autodetect' (= all cores).
        ppservers: optional list of remote hosts; when given, a pp cluster
        is set up (one SshSession per host), otherwise a local
        multiprocessing pool is used."""
        if ncpus == "autodetect":
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            # Deferred import: pp is only needed in cluster mode.
            import pp

            self.ppservers = ppservers
            # One ssh session per remote host to spawn its ppserver.
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = "cluster"
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = "multicore"
        # Per-host Statistics, filled by _mergeStatistics.
        self.stats = {}

    def __del__(self):
        # Only one of server/pool exists, depending on the mode.
        if hasattr(self, "server"):
            self.server.destroy()
        if hasattr(self, "pool"):
            self.pool.close()
168 def process(self, task, items, timeout=90000):
169 if not isinstance(task, Task):
170 raise TypeError("task argument needs to be an 'Task' instance")
171 # --- Call the Local initialialization
172 task.initializeLocal()
173 # --- Schedule all the jobs ....
174 if self.mode == "cluster":
178 (_ppfunction, task, item),
180 ("GaudiMP.Parallel", "time"),
186 task._mergeResults(result)
187 self._mergeStatistics(stat)
188 self._printStatistics()
189 self.server.print_stats()
190 elif self.mode == "multicore":
192 jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
193 for result, stat in jobs.get(timeout):
194 task._mergeResults(result)
195 self._mergeStatistics(stat)
197 self._printStatistics()
198 print("Time elapsed since server creation %f" % (end - start))
199 # --- Call the Local Finalize
202 def _printStatistics(self):
204 for stat in self.stats.values():
206 print("Job execution statistics:")
207 print("job count | % of all jobs | job time sum | time per job | job server")
208 for name, stat in self.stats.items():
210 " %d | %6.2f | %8.3f | %8.3f | %s"
213 100.0 * stat.njob / njobs,
215 stat.time / stat.njob,
220 def _mergeStatistics(self, stat):
221 if stat.name not in self.stats:
222 self.stats[stat.name] = Statistics()
223 s = self.stats[stat.name]
class SshSession(object):
    """Open an ssh connection to *hostname* and start a ppserver.py there,
    so the host can serve jobs as part of the pp cluster.

    NOTE(review): relies on the third-party 'pyssh' and 'pp' packages and on
    a csh-style remote shell ('setenv'); PYTHONPATH, LD_LIBRARY_PATH and
    ROOTSYS must be set in the client environment (KeyError otherwise).
    """

    def __init__(self, hostname):
        import pp
        import pyssh

        self.host = hostname
        # Root of the pp installation; the ppserver.py script lives in a
        # version-suffixed scripts directory next to it.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Replicate cwd and the relevant environment on the remote shell.
        self.session.write("cd %s\n" % os.getcwd())
        self.session.read_lazy()
        self.session.write("setenv PYTHONPATH %s\n" % os.environ["PYTHONPATH"])
        self.session.read_lazy()
        self.session.write(
            "setenv LD_LIBRARY_PATH %s\n" % os.environ["LD_LIBRARY_PATH"]
        )
        self.session.read_lazy()
        self.session.write("setenv ROOTSYS %s\n" % os.environ["ROOTSYS"])
        self.session.read_lazy()
        # Launch the remote ppserver with the matching Python version.
        self.session.write(
            "%s %s/scripts-%s/ppserver.py \n"
            % (sys.executable, ppprefix, sys.version.split()[0])
        )
        self.session.read_lazy()
        self.session.read_lazy()
        print("started ppserver in ", hostname)

    def __del__(self):
        # Closing the session terminates the remote ppserver.
        self.session.close()
        print("killed ppserver in ", self.host)
# NOTE(review): extraction residue — a signature index that is not part of
# the module; kept here, commented out, for reference:
# __new__(cls, *args, **kwargs)
# _mergeResults(self, result)
# _prefunction(f, task, item)