13 """GaudiMP.Parallel module.
14 This module provides 'parallel' processing support for GaudiPyhton.
15 It is adding some sugar on top of public domain packages such as
16 the 'multiprocessing' or the 'pp' packages. The interface can be made
17 independent of the underlying implementation package.
18 Two main class are defined: Task and WorkManager
__all__ = ["Task", "WorkManager"]
excluded_varnames = ["HOSTNAME", "SSH_CLIENT", "SSH_CONNECTION", "DISPLAY"]

import copy
import multiprocessing
import os
import sys
import time


def _prefunction(f, task, item):
    return f((task, item))


def _ppfunction(args):
    # Unpack the (task, item) pair prepared by the work manager
    task, item = args
    stat = Statistics()
    # Initialize the remote side once per worker, replicating the
    # caller's environment except for host-specific variables
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # Reset the output, process the item and collect statistics
    task._resetOutput()
    task.process(item)
    stat.stop()
    return (copy.deepcopy(task.output), stat)


class Statistics(object):
    def __init__(self):
        self.name = os.getenv("HOSTNAME")
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        self.time = time.time() - self.start


67 """Basic base class to encapsulate any processing that is going to be porcessed in parallel.
68 User class much inherit from it and implement the methods initializeLocal,
69 initializeRemote, process and finalize."""
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def _mergeResults(self, result):
        if not isinstance(result, type(self.output)):
            raise TypeError("output type is not same as obtained result")
        # Non-iterable result: add it to the output directly
        if not hasattr(result, "__iter__"):
            if hasattr(self.output, "Add"):
                self.output.Add(result)
            elif hasattr(self.output, "__iadd__"):
                self.output += result
            elif hasattr(self.output, "__add__"):
                self.output = self.output + result
            else:
                raise TypeError("result cannot be added")
        # Dictionary result: merge entry by entry, adding values for
        # existing keys and inserting new keys as they are
        elif isinstance(result, dict):
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], "Add"):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], "__iadd__"):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], "__add__"):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError("result cannot be added")
                else:
                    self.output[key] = result[key]
        # Any other iterable (e.g. list): merge element by element
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], "Add"):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], "__iadd__"):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], "__add__"):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError("result cannot be added")

    def _resetOutput(self):
        output = self.output.values() if isinstance(self.output, dict) else self.output
        for o in output:
            if hasattr(o, "Reset"):
                o.Reset()

141 """Class to in charge of managing the tasks and distributing them to
142 the workers. They can be local (using other cores) or remote
143 using other nodes in the local cluster"""
    def __init__(self, ncpus="autodetect", ppservers=None):
        if ncpus == "autodetect":
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            import pp

            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=ppservers)
            self.mode = "cluster"
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = "multicore"
        self.stats = {}

    def __del__(self):
        if hasattr(self, "server"):
            self.server.destroy()
        if hasattr(self, "pool"):
            self.pool.close()

    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
        # Local initialization
        task.initializeLocal()
        # Schedule all the jobs and wait for the results
        if self.mode == "cluster":
            jobs = [
                self.server.submit(
                    _prefunction,
                    (_ppfunction, task, item),
                    (),
                    ("GaudiMP.Parallel", "time"),
                )
                for item in items
            ]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
        elif self.mode == "multicore":
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print("Time elapsed since server creation %f" % (end - start))
        # Local finalization
        task.finalize()

    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print("Job execution statistics:")
        print("job count | % of all jobs | job time sum | time per job | job server")
        for name, stat in self.stats.items():
            print(
                " %d | %6.2f | %8.3f | %8.3f | %s"
                % (stat.njob, 100.0 * stat.njob / njobs, stat.time, stat.time / stat.njob, name)
            )

    def _mergeStatistics(self, stat):
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1


class SshSession(object):
    def __init__(self, hostname):
        import pp
        import pyssh

        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        # Replicate the local working directory and environment, then
        # start a ppserver on the remote node
        self.session.write("cd %s\n" % os.getcwd())
        self.session.write("setenv PYTHONPATH %s\n" % os.environ["PYTHONPATH"])
        self.session.write(
            "setenv LD_LIBRARY_PATH %s\n" % os.environ["LD_LIBRARY_PATH"]
        )
        self.session.write("setenv ROOTSYS %s\n" % os.environ["ROOTSYS"])
        self.session.write(
            "%s %s/scripts-%s/ppserver.py \n"
            % (sys.executable, ppprefix, sys.version.split()[0])
        )
        print("started ppserver in ", hostname)

    def __del__(self):
        self.session.close()
        print("killed ppserver in ", self.host)
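

# Hedged usage sketch, assuming the hypothetical SquareSumTask defined
# above and a local multicore (no ppservers) setup:
#
#     wm = WorkManager(ncpus=4)           # or ncpus="autodetect"
#     task = SquareSumTask()
#     wm.process(task, list(range(100)))  # prints statistics, then the total
#
# In cluster mode, WorkManager(ppservers=("nodeA", "nodeB")) starts a
# ppserver on each node through SshSession and distributes the jobs
# with the pp package instead of multiprocessing.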