The Gaudi Framework  master (37c0b60a)
Parallel.py
Go to the documentation of this file.
1 
"""GaudiMP.Parallel module.
This module provides 'parallel' processing support for GaudiPython.
It is adding some sugar on top of public domain packages such as
the 'multiprocessing' or the 'pp' packages. The interface can be made
independent of the underlying implementation package.
Two main classes are defined: Task and WorkManager
"""

__all__ = ["Task", "WorkManager"]
# Host/session-specific environment variables that must NOT be propagated
# from the submitting host to remote worker processes.
excluded_varnames = ["HOSTNAME", "SSH_CLIENT", "SSH_CONNECTION", "DISPLAY"]
23 
24 import copy
25 import multiprocessing
26 import os
27 import sys
28 import time
29 
30 
31 def _prefunction(f, task, item):
32  return f((task, item))
33 
34 
def _ppfunction(args):
    """Worker-side entry point: run a single work item through a task.

    *args* is a ``(task, item)`` tuple.  Returns a ``(output, stat)``
    pair, where *output* is a deep copy of the task's accumulated output
    and *stat* is a Statistics record timing this call.
    """
    task, item = args
    stat = Statistics()
    # First call on this worker process: replicate the caller's
    # environment (minus host-specific variables) and run the task's
    # one-time remote initialization hook.
    if not task.__class__._initializeDone:
        for name, value in task.environ.items():
            if name not in excluded_varnames:
                os.environ[name] = value
        task.initializeRemote()
        task.__class__._initializeDone = True
    # Clear any output left over from a previous item, then process.
    task._resetOutput()
    task.process(item)
    stat.stop()
    # Deep-copy so the returned output is detached from the worker's
    # live task object before it is pickled back to the parent.
    return (copy.deepcopy(task.output), stat)
54 
class Statistics(object):
    """Per-host timing record for processed jobs."""

    def __init__(self):
        # Host the job ran on (None if HOSTNAME is not set).
        self.name = os.getenv("HOSTNAME")
        # Wall-clock creation time; stop() measures relative to this.
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        """Record the elapsed wall-clock time since construction."""
        self.time = time.time() - self.start
65 
class Task(object):
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel.

    User classes must inherit from it and implement the methods
    ``initializeLocal``, ``initializeRemote``, ``process`` and
    ``finalize``.  The accumulated result must be stored in
    ``self.output`` (a single addable object, a dict, or a sequence of
    addable objects).
    """

    # Set to True on a worker process once initializeRemote() has run,
    # so remote initialization happens at most once per process.
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        # Capture the output holder, a snapshot of the environment and
        # the current working directory at construction time, so they
        # can be shipped (pickled) to remote worker processes.
        task = object.__new__(cls)
        task.output = ()
        task.environ = dict(os.environ)
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        """Hook run once in the parent process before any job is scheduled."""
        pass

    def initializeRemote(self):
        """Hook run once per worker process before its first job."""
        pass

    def process(self, item):
        """Process a single work item; results go into ``self.output``."""
        pass

    def finalize(self):
        """Hook run once in the parent process after all jobs completed."""
        pass

    def _mergeResults(self, result):
        """Merge a worker's *result* into ``self.output``.

        Supports three shapes: a single addable object (ROOT ``Add``,
        ``+=`` or ``+``), a dict of addable objects (merged key by key,
        new keys copied in), or a sequence of addable objects (merged
        element-wise).

        Raises:
            TypeError: if *result* is not the same type as ``self.output``
                or an element cannot be combined.
        """
        if not isinstance(result, type(self.output)):
            raise TypeError("output type is not same as obtained result")
        # --Not iterable: merge as a single object---
        if not hasattr(result, "__iter__"):
            if hasattr(self.output, "Add"):
                self.output.Add(result)
            elif hasattr(self.output, "__iadd__"):
                self.output += result
            elif hasattr(self.output, "__add__"):
                self.output = self.output + result
            else:
                raise TypeError("result cannot be added")
        # --Dictionary: merge per key, copy unseen keys---
        elif isinstance(result, dict):
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], "Add"):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], "__iadd__"):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], "__add__"):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError("result cannot be added")
                else:
                    self.output[key] = result[key]
        # --Anything else (list/tuple): merge element-wise---
        else:
            # NOTE(review): extra elements in the longer of the two
            # sequences are silently ignored (historical behavior).
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], "Add"):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], "__iadd__"):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], "__add__"):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError("result cannot be added")

    def _resetOutput(self):
        """Call ``Reset()`` on every output object that supports it
        (e.g. ROOT histograms) before processing a new item."""
        # Conditional expression instead of the old `and/or` trick,
        # which relied on the truthiness of dict.values().
        output = self.output.values() if isinstance(self.output, dict) else self.output
        for o in output:
            if hasattr(o, "Reset"):
                o.Reset()
138 
139 
class WorkManager(object):
    """Manager in charge of distributing Task work items to workers.

    Workers can be local (other cores, via ``multiprocessing``,
    ``mode == "multicore"``) or remote nodes of the local cluster
    (via the ``pp`` package over ssh, ``mode == "cluster"``).
    """

    def __init__(self, ncpus="autodetect", ppservers=None):
        self.ncpus = multiprocessing.cpu_count() if ncpus == "autodetect" else ncpus
        if ppservers:
            import pp

            # Cluster mode: start a ppserver on each remote host and a
            # local pp server connected to all of them.
            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = "cluster"
        else:
            # Multicore mode: plain multiprocessing pool on this host.
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = "multicore"
        # Per-host Statistics, filled in as job results come back.
        self.stats = {}

    def __del__(self):
        # Only one of `server`/`pool` exists, depending on the mode.
        if hasattr(self, "server"):
            self.server.destroy()
        if hasattr(self, "pool"):
            self.pool.close()

    def process(self, task, items, timeout=90000):
        """Run ``task.process`` over every element of *items* in parallel,
        merging results and per-host statistics as jobs complete."""
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be an 'Task' instance")
        # --- Call the Local initialialization
        task.initializeLocal()
        # --- Schedule all the jobs ....
        if self.mode == "cluster":
            jobs = [
                self.server.submit(
                    _prefunction,
                    (_ppfunction, task, item),
                    (),
                    ("GaudiMP.Parallel", "time"),
                )
                for item in items
            ]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == "multicore":
            start = time.time()
            # Pair every item with (a pickled copy of) the task.
            jobs = self.pool.map_async(_ppfunction, zip([task for _ in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print("Time elapsed since server creation %f" % (end - start))
        # --- Call the Local Finalize
        task.finalize()

    def _printStatistics(self):
        """Print a per-host summary table of the collected statistics."""
        njobs = sum(stat.njob for stat in self.stats.values())
        print("Job execution statistics:")
        print("job count | % of all jobs | job time sum | time per job | job server")
        for name, stat in self.stats.items():
            print(
                " %d | %6.2f | %8.3f | %8.3f | %s"
                % (
                    stat.njob,
                    100.0 * stat.njob / njobs,
                    stat.time,
                    stat.time / stat.njob,
                    name,
                )
            )

    def _mergeStatistics(self, stat):
        """Accumulate one job's Statistics into the per-host totals."""
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        entry = self.stats[stat.name]
        entry.time += stat.time
        entry.njob += 1
226 
227 
class SshSession(object):
    """Interactive ssh session that starts a pp ``ppserver.py`` on a
    remote host, exporting the local environment needed to run there.

    NOTE(review): the exact write/read_lazy sequence is a session
    protocol — statement order must be preserved.  The ``setenv``
    commands imply the remote login shell is csh/tcsh — confirm.
    """

    def __init__(self, hostname):
        import pp
        import pyssh

        self.host = hostname
        # Directory two levels above the pp package; the ppserver.py
        # launcher script is expected under <prefix>/scripts-<version>/.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Mirror the local working directory on the remote side.
        self.session.write("cd %s\n" % os.getcwd())
        self.session.read_lazy()
        # Export the environment the remote python needs.
        # NOTE(review): raises KeyError if PYTHONPATH, LD_LIBRARY_PATH
        # or ROOTSYS is not set locally.
        self.session.write("setenv PYTHONPATH %s\n" % os.environ["PYTHONPATH"])
        self.session.read_lazy()
        self.session.write(
            "setenv LD_LIBRARY_PATH %s\n" % os.environ["LD_LIBRARY_PATH"]
        )
        self.session.read_lazy()
        self.session.write("setenv ROOTSYS %s\n" % os.environ["ROOTSYS"])
        self.session.read_lazy()
        # Launch the pp server with the same python version as locally.
        self.session.write(
            "%s %s/scripts-%s/ppserver.py \n"
            % (sys.executable, ppprefix, sys.version.split()[0])
        )
        self.session.read_lazy()
        self.session.read_lazy()
        print("started ppserver in ", hostname)

    def __del__(self):
        # Closing the session terminates the remote ppserver.
        self.session.close()
        print("killed ppserver in ", self.host)
GaudiMP.Parallel.WorkManager.mode
mode
Definition: Parallel.py:156
GaudiMP.Parallel.Statistics.start
start
Definition: Parallel.py:58
GaudiMP.Parallel.SshSession.session
session
Definition: Parallel.py:235
GaudiMP.Parallel.WorkManager.__init__
def __init__(self, ncpus="autodetect", ppservers=None)
Definition: Parallel.py:145
GaudiMP.Parallel.WorkManager.ppservers
ppservers
Definition: Parallel.py:153
GaudiMP.Parallel.Statistics.__init__
def __init__(self)
Definition: Parallel.py:56
GaudiMP.Parallel.WorkManager.pool
pool
Definition: Parallel.py:158
GaudiMP.Parallel.WorkManager.__del__
def __del__(self)
Definition: Parallel.py:162
GaudiMP.Parallel.Task._resetOutput
def _resetOutput(self)
Definition: Parallel.py:133
GaudiMP.Parallel.Task.output
output
Definition: Parallel.py:104
GaudiMP.Parallel.WorkManager.sessions
sessions
Definition: Parallel.py:154
GaudiMP.Parallel.WorkManager.process
def process(self, task, items, timeout=90000)
Definition: Parallel.py:168
GaudiMP.Parallel._ppfunction
def _ppfunction(args)
Definition: Parallel.py:35
GaudiMP.Parallel.Task.__new__
def __new__(cls, *args, **kwargs)
Definition: Parallel.py:73
GaudiMP.Parallel.Statistics.njob
njob
Definition: Parallel.py:60
GaudiMP.Parallel.SshSession.__del__
def __del__(self)
Definition: Parallel.py:256
GaudiMP.Parallel.WorkManager
Definition: Parallel.py:140
GaudiMP.Parallel.SshSession
Definition: Parallel.py:228
GaudiMP.Parallel.Task.initializeLocal
def initializeLocal(self)
Definition: Parallel.py:82
GaudiMP.Parallel._prefunction
def _prefunction(f, task, item)
Definition: Parallel.py:31
GaudiMP.Parallel.Statistics.time
time
Definition: Parallel.py:59
GaudiMP.Parallel.SshSession.host
host
Definition: Parallel.py:233
GaudiMP.Parallel.Statistics.stop
def stop(self)
Definition: Parallel.py:62
GaudiMP.Parallel.Task.process
def process(self, item)
Definition: Parallel.py:88
GaudiMP.Parallel.Task
Definition: Parallel.py:66
gaudirun.type
type
Definition: gaudirun.py:160
GaudiMP.Parallel.Statistics
Definition: Parallel.py:55
GaudiMP.Parallel.WorkManager._mergeStatistics
def _mergeStatistics(self, stat)
Definition: Parallel.py:220
GaudiMP.Parallel.Task.initializeRemote
def initializeRemote(self)
Definition: Parallel.py:85
GaudiMP.Parallel.WorkManager.ncpus
ncpus
Definition: Parallel.py:147
GaudiMP.Parallel.Task._mergeResults
def _mergeResults(self, result)
Definition: Parallel.py:94
GaudiMP.Parallel.Statistics.name
name
Definition: Parallel.py:57
GaudiMP.Parallel.Task.finalize
def finalize(self)
Definition: Parallel.py:91
GaudiMP.Parallel.WorkManager._printStatistics
def _printStatistics(self)
Definition: Parallel.py:202
GaudiMP.Parallel.SshSession.__init__
def __init__(self, hostname)
Definition: Parallel.py:229
GaudiMP.Parallel.WorkManager.stats
stats
Definition: Parallel.py:160
GaudiMP.Parallel.WorkManager.server
server
Definition: Parallel.py:155
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: details.h:97