""" GaudiMP.Parallel module.
This module provides 'parallel' processing support for GaudiPython.
It is adding some sugar on top of public domain packages such as
the 'multiprocessing' or the 'pp' packages. The interface can be made
independent of the underlying implementation package.
Two main classes are defined: Task and WorkManager.
"""
# Python 2/3 compatibility: make print() a function in this module.
from __future__ import print_function
# Public API of this module: the task abstraction and its scheduler.
__all__ = ["Task", "WorkManager"]
# Environment variables that are host/session specific and therefore must
# not be propagated from the parent process to remote workers.
excluded_varnames = [
    "HOSTNAME",
    "SSH_CLIENT",
    "SSH_CONNECTION",
    "DISPLAY",
]
26 import multiprocessing
33 return f((task, item))
41 if not task.__class__._initializeDone:
42 for k, v
in task.environ.items():
43 if k
not in excluded_varnames:
45 task.initializeRemote()
46 task.__class__._initializeDone =
True
53 return (copy.deepcopy(task.output), stat)
58 self.
name = os.getenv(
"HOSTNAME")
"""Basic base class to encapsulate any processing that is going to be processed in parallel.
User class must inherit from it and implement the methods initializeLocal,
initializeRemote, process and finalize."""
72 _initializeDone =
False
75 task = object.__new__(cls)
78 for k, v
in os.environ.items():
80 task.cwd = os.getcwd()
96 if not isinstance(result,
type(self.output)):
97 raise TypeError(
"output type is not same as obtained result")
99 if not hasattr(result,
"__iter__"):
100 if hasattr(self.output,
"Add"):
101 self.output.Add(result)
102 elif hasattr(self.output,
"__iadd__"):
103 self.output += result
104 elif hasattr(self.output,
"__add__"):
107 raise TypeError(
"result cannot be added")
109 elif type(result)
is dict:
110 for key
in result.keys():
112 if hasattr(self.
output[key],
"Add"):
113 self.
output[key].Add(result[key])
114 elif hasattr(self.
output[key],
"__iadd__"):
115 self.
output[key] += result[key]
116 elif hasattr(self.
output[key],
"__add__"):
119 raise TypeError(
"result cannot be added")
121 self.
output[key] = result[key]
125 if hasattr(self.
output[i],
"Add"):
126 self.
output[i].Add(result[i])
127 elif hasattr(self.
output[i],
"__iadd__"):
128 self.
output[i] += result[i]
129 elif hasattr(self.
output[i],
"__add__"):
132 raise TypeError(
"result cannot be added")
137 if hasattr(o,
"Reset"):
"""Class in charge of managing the tasks and distributing them to
the workers. They can be local (using other cores) or remote
using other nodes in the local cluster."""
146 def __init__(self, ncpus="autodetect", ppservers=None):
147 if ncpus ==
"autodetect":
148 self.
ncpus = multiprocessing.cpu_count()
160 self.
mode =
"multicore"
164 if hasattr(self,
"server"):
166 if hasattr(self,
"pool"):
169 def process(self, task, items, timeout=90000):
170 if not isinstance(task, Task):
171 raise TypeError(
"task argument needs to be an 'Task' instance")
173 task.initializeLocal()
175 if self.
mode ==
"cluster":
179 (_ppfunction, task, item),
181 (
"GaudiMP.Parallel",
"time"),
187 task._mergeResults(result)
191 elif self.
mode ==
"multicore":
193 jobs = self.
pool.map_async(_ppfunction, zip([task
for i
in items], items))
194 for result, stat
in jobs.get(timeout):
195 task._mergeResults(result)
199 print(
"Time elapsed since server creation %f" % (end - start))
205 for stat
in self.
stats.values():
207 print(
"Job execution statistics:")
208 print(
"job count | % of all jobs | job time sum | time per job | job server")
211 " %d | %6.2f | %8.3f | %8.3f | %s"
214 100.0 * stat.njob / njobs,
216 stat.time / stat.njob,
222 if stat.name
not in self.
stats:
224 s = self.
stats[stat.name]
235 ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
239 self.
session.write(
"cd %s\n" % os.getcwd())
241 self.
session.write(
"setenv PYTHONPATH %s\n" % os.environ[
"PYTHONPATH"])
244 "setenv LD_LIBRARY_PATH %s\n" % os.environ[
"LD_LIBRARY_PATH"]
247 self.
session.write(
"setenv ROOTSYS %s\n" % os.environ[
"ROOTSYS"])
250 "%s %s/scripts-%s/ppserver.py \n"
251 % (sys.executable, ppprefix, sys.version.split()[0])
255 print(
"started ppserver in ", hostname)
259 print(
"killed ppserver in ", self.
host)