The Gaudi Framework  master (181af51f)
Parallel.py
13"""GaudiMP.Parallel module.
14This module provides 'parallel' processing support for GaudiPyhton.
15It is adding some sugar on top of public domain packages such as
16the 'multiprocessing' or the 'pp' packages. The interface can be made
17independent of the underlying implementation package.
18Two main class are defined: Task and WorkManager
19"""

__all__ = ["Task", "WorkManager"]
excluded_varnames = ["HOSTNAME", "SSH_CLIENT", "SSH_CONNECTION", "DISPLAY"]

import copy
import multiprocessing
import os
import sys
import time


def _prefunction(f, task, item):
    return f((task, item))


def _ppfunction(args):
    # --- Unpack arguments
    task, item = args
    stat = Statistics()
    # --- Initialize the remote side (at least once)
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Reset the task output
    task._resetOutput()
    # --- Call processing
    task.process(item)
    # --- Collect statistics
    stat.stop()
    return (copy.deepcopy(task.output), stat)

class Statistics(object):
    def __init__(self):
        self.name = os.getenv("HOSTNAME")
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        self.time = time.time() - self.start

class Task(object):
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel. User classes must inherit from it and implement
    the methods initializeLocal, initializeRemote, process and finalize."""

    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        task = object.__new__(cls)
        task.output = ()
        # --- Snapshot the local environment and working directory so they
        #     can be replayed on the remote side (see _ppfunction)
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        pass

    def initializeRemote(self):
        pass

    def process(self, item):
        pass

    def finalize(self):
        pass

    def _mergeResults(self, result):
        if not isinstance(result, type(self.output)):
            raise TypeError("result type does not match the task output type")
        # --- Not iterable ---
        if not hasattr(result, "__iter__"):
            if hasattr(self.output, "Add"):
                self.output.Add(result)
            elif hasattr(self.output, "__iadd__"):
                self.output += result
            elif hasattr(self.output, "__add__"):
                self.output = self.output + result
            else:
                raise TypeError("result cannot be added")
        # --- Dictionary ---
        elif isinstance(result, dict):
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], "Add"):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], "__iadd__"):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], "__add__"):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError("result cannot be added")
                else:
                    self.output[key] = result[key]
        # --- Anything else (e.g. a list) ---
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], "Add"):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], "__iadd__"):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], "__add__"):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError("result cannot be added")

    def _resetOutput(self):
        output = self.output.values() if isinstance(self.output, dict) else self.output
        for o in output:
            if hasattr(o, "Reset"):
                o.Reset()

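# --- Illustrative sketch (not part of the original module): a minimal Task
#     subclass, assuming plain integers as work items and a dict as output.
#     The name SumTask, the "sum" key and the squaring logic are made up for
#     illustration; real tasks typically fill objects with an Add() method,
#     such as ROOT histograms, which _mergeResults also supports.
class SumTask(Task):
    def initializeLocal(self):
        # Runs once in the parent process, before any job is scheduled.
        self.output = {"sum": 0}

    def initializeRemote(self):
        # Runs once per worker process, after the environment is replayed.
        pass

    def process(self, item):
        # Runs on a worker for each item; the (deep-copied) output travels
        # back to the parent, which folds it in via Task._mergeResults.
        self.output = {"sum": item * item}

    def finalize(self):
        # Runs once in the parent process, after all results are merged.
        print("sum of squares:", self.output["sum"])
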
class WorkManager(object):
    """Class in charge of managing the tasks and distributing them to
    the workers. These can be local (using other cores) or remote,
    using other nodes in the local cluster."""

    def __init__(self, ncpus="autodetect", ppservers=None):
        if ncpus == "autodetect":
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            import pp

            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = "cluster"
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = "multicore"
        self.stats = {}

    def __del__(self):
        if hasattr(self, "server"):
            self.server.destroy()
        if hasattr(self, "pool"):
            self.pool.close()

    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
        # --- Call the local initialization
        task.initializeLocal()
        # --- Schedule all the jobs ....
        if self.mode == "cluster":
            jobs = [
                self.server.submit(
                    _prefunction,
                    (_ppfunction, task, item),
                    (),
                    ("GaudiMP.Parallel", "time"),
                )
                for item in items
            ]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == "multicore":
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print("Time elapsed since server creation %f" % (end - start))
        # --- Call the local finalize
        task.finalize()

    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print("Job execution statistics:")
        print("job count | % of all jobs | job time sum | time per job | job server")
        for name, stat in self.stats.items():
            print(
                " %d | %6.2f | %8.3f | %8.3f | %s"
                % (
                    stat.njob,
                    100.0 * stat.njob / njobs,
                    stat.time,
                    stat.time / stat.njob,
                    name,
                )
            )

    def _mergeStatistics(self, stat):
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1


class SshSession(object):
    def __init__(self, hostname):
        import pp
        import pyssh

        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        self.session.write("cd %s\n" % os.getcwd())
        self.session.read_lazy()
        self.session.write("setenv PYTHONPATH %s\n" % os.environ["PYTHONPATH"])
        self.session.read_lazy()
        self.session.write(
            "setenv LD_LIBRARY_PATH %s\n" % os.environ["LD_LIBRARY_PATH"]
        )
        self.session.read_lazy()
        self.session.write("setenv ROOTSYS %s\n" % os.environ["ROOTSYS"])
        self.session.read_lazy()
        self.session.write(
            "%s %s/scripts-%s/ppserver.py \n"
            % (sys.executable, ppprefix, sys.version.split()[0])
        )
        self.session.read_lazy()
        self.session.read_lazy()
        print("started ppserver in ", hostname)

    def __del__(self):
        self.session.close()
        print("killed ppserver in ", self.host)