13 """ GaudiMP.Parallel module.
14 This module provides 'parallel' processing support for GaudiPyhton.
15 It is adding some sugar on top of public domain packages such as
16 the 'multiprocessing' or the 'pp' packages. The interface can be made
17 independent of the underlying implementation package.
18 Two main class are defined: Task and WorkManager
from __future__ import print_function

__all__ = ["Task", "WorkManager"]

# Environment variables that must not be propagated to remote workers
excluded_varnames = ["HOSTNAME", "SSH_CLIENT", "SSH_CONNECTION", "DISPLAY"]

import copy
import multiprocessing
import os
import sys
import time


def _prefunction(f, task, item):
    # Helper used with pp.Server.submit to repack the arguments for _ppfunction
    return f((task, item))


def _ppfunction(args):
    # --- Unpack the (task, item) pair sent by the work manager
    task, item = args
    stat = Statistics()
    # --- Initialize the remote side once per worker process
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Reset the output and process this item
    task._resetOutput()
    task.process(item)
    # --- Collect timing statistics and return a copy of the output
    stat.stop()
    return (copy.deepcopy(task.output), stat)


class Statistics(object):
    # Per-job timing record, keyed later by the worker host name
    def __init__(self):
        self.name = os.getenv("HOSTNAME")
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        self.time = time.time() - self.start
68 """Basic base class to encapsulate any processing that is going to be porcessed in parallel.
69 User class much inherit from it and implement the methods initializeLocal,
70 initializeRemote, process and finalize."""

    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        # Snapshot the local environment so it can be replayed on the workers
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        pass

    def initializeRemote(self):
        pass

    def process(self, item):
        pass

    def finalize(self):
        pass

    def _mergeResults(self, result):
        if type(result) is not type(self.output):
            raise TypeError("output type is not same as obtained result")
        # --- Non-iterable output: merge the single object directly
        if not hasattr(result, "__iter__"):
            if hasattr(self.output, "Add"):
                self.output.Add(result)
            elif hasattr(self.output, "__iadd__"):
                self.output += result
            elif hasattr(self.output, "__add__"):
                self.output = self.output + result
            else:
                raise TypeError("result cannot be added")
        # --- Dictionary output: merge key by key, adopting new keys as-is
        elif type(result) is dict:
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], "Add"):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], "__iadd__"):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], "__add__"):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError("result cannot be added")
                else:
                    self.output[key] = result[key]
        # --- Any other iterable (e.g. list or tuple): merge element-wise
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], "Add"):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], "__iadd__"):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], "__add__"):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError("result cannot be added")

    def _resetOutput(self):
        # Clear mergeable objects (e.g. ROOT histograms) between items
        output = self.output.values() if isinstance(self.output, dict) else self.output
        for o in output:
            if hasattr(o, "Reset"):
                o.Reset()
146 """Class to in charge of managing the tasks and distributing them to
147 the workers. They can be local (using other cores) or remote
148 using other nodes in the local cluster"""
150 def __init__(self, ncpus="autodetect", ppservers=None):
151 if ncpus ==
"autodetect":
152 self.
ncpus = multiprocessing.cpu_count()
164 self.
mode =
"multicore"

    def __del__(self):
        if hasattr(self, "server"):
            self.server.destroy()
        if hasattr(self, "pool"):
            self.pool.close()

    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
        # --- Call the local initialization
        task.initializeLocal()
        # --- Schedule all the jobs
        if self.mode == "cluster":
            jobs = [
                self.server.submit(
                    _prefunction,
                    (_ppfunction, task, item),
                    (),
                    ("GaudiMP.Parallel", "time"),
                )
                for item in items
            ]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == "multicore":
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print("Time elapsed since server creation %f" % (end - start))
        # --- Call the local finalization
        task.finalize()

    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print("Job execution statistics:")
        print("job count | % of all jobs | job time sum | time per job | job server")
        for name, stat in self.stats.items():
            print(
                "   %6d |        %6.2f |     %8.3f |     %8.3f | %s"
                % (stat.njob, 100.0 * stat.njob / njobs, stat.time, stat.time / stat.njob, name)
            )

    def _mergeStatistics(self, stat):
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1


class SshSession(object):
    # Opens an ssh connection to a remote node, replays the relevant local
    # environment there and starts a ppserver for cluster-mode processing.
    def __init__(self, hostname):
        import pp
        import pyssh

        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        self.session.write("cd %s\n" % os.getcwd())
        self.session.read_lazy()
        self.session.write("setenv PYTHONPATH %s\n" % os.environ["PYTHONPATH"])
        self.session.read_lazy()
        self.session.write(
            "setenv LD_LIBRARY_PATH %s\n" % os.environ["LD_LIBRARY_PATH"]
        )
        self.session.read_lazy()
        self.session.write("setenv ROOTSYS %s\n" % os.environ["ROOTSYS"])
        self.session.read_lazy()
        self.session.write(
            "%s %s/scripts-%s/ppserver.py \n"
            % (sys.executable, ppprefix, sys.version.split()[0])
        )
        self.session.read_lazy()
        print("started ppserver in ", hostname)

    def __del__(self):
        self.session.close()
        print("killed ppserver in ", self.host)
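

# --- Illustrative usage sketch (an assumption for demonstration, not part of
#     the original module): run the hypothetical SquareSumTask defined above
#     on the local cores in 'multicore' mode.
if __name__ == "__main__":
    task = SquareSumTask()
    wm = WorkManager(ncpus=2)
    wm.process(task, list(range(10)))
    print("merged output:", task.output)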