The Gaudi Framework  v36r7 (7f57a304)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Parallel.py
Go to the documentation of this file.
1 
13 """ GaudiMP.Parallel module.
14  This module provides 'parallel' processing support for GaudiPyhton.
15  It is adding some sugar on top of public domain packages such as
16  the 'multiprocessing' or the 'pp' packages. The interface can be made
17  independent of the underlying implementation package.
18  Two main class are defined: Task and WorkManager
19 """
20 from __future__ import print_function
21 
22 __all__ = ["Task", "WorkManager"]
23 excluded_varnames = ["HOSTNAME", "SSH_CLIENT", "SSH_CONNECTION", "DISPLAY"]
24 
25 import copy
26 import multiprocessing
27 import os
28 import sys
29 import time
30 
31 
32 def _prefunction(f, task, item):
33  return f((task, item))
34 
35 
def _ppfunction(args):
    """Worker-side entry point: execute one (task, item) pair.

    Unpacks *args*, replays the submitter's environment into this worker the
    first time a task of this class runs here, processes the item, and
    returns a deep copy of the task output together with timing statistics.
    """
    task, item = args
    stat = Statistics()
    # --- One-time remote-side initialization per task class in this process
    if not task.__class__._initializeDone:
        safe_env = {
            name: value
            for name, value in task.environ.items()
            if name not in excluded_varnames
        }
        os.environ.update(safe_env)
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Start from a clean output before processing this item
    task._resetOutput()
    task.process(item)
    # --- Record the wall-clock time spent on this job
    stat.stop()
    return (copy.deepcopy(task.output), stat)
54 
55 
class Statistics(object):
    """Timing record for one worker: host name, start time, elapsed time, job count."""

    def __init__(self):
        # Identifies which worker host produced these numbers.
        self.name = os.getenv("HOSTNAME")
        self.njob = 0
        self.time = 0.0
        self.start = time.time()

    def stop(self):
        """Freeze the wall-clock time elapsed since construction into self.time."""
        self.time = time.time() - self.start
65 
66 
class Task(object):
    """Basic base class to encapsulate any processing that is going to be processed in parallel.

    User classes must inherit from it and implement the methods
    initializeLocal, initializeRemote, process and finalize.
    """

    # Flipped to True on the worker after the first initializeRemote() call.
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        # Use __new__ (not __init__) so every subclass instance gets
        # output/environ/cwd even if its __init__ does not call super().
        task = object.__new__(cls)
        task.output = ()
        # Snapshot the submitter's environment for replay on remote workers.
        task.environ = dict(os.environ)
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        """Hook: run once on the submitting process before jobs are scheduled."""
        pass

    def initializeRemote(self):
        """Hook: run once per worker process before the first item is processed."""
        pass

    def process(self, item):
        """Hook: process a single work item, accumulating into self.output."""
        pass

    def finalize(self):
        """Hook: run once on the submitting process after all results are merged."""
        pass

    def _mergeResults(self, result):
        """Merge a worker's *result* into self.output.

        Supports non-iterable values (combined via Add/__iadd__/__add__),
        dictionaries (per-key merge, new keys are inserted) and other
        iterables such as lists (element-wise merge up to the shorter length).

        Raises:
            TypeError: if *result* is not the same type as self.output, or
                a value provides no Add/__iadd__/__add__ way to be combined.
        """
        if type(result) is not type(self.output):
            raise TypeError("output type is not same as obtained result")
        # --- Not iterable: combine the single value directly.
        if not hasattr(result, "__iter__"):
            if hasattr(self.output, "Add"):
                self.output.Add(result)  # in-place merge (e.g. ROOT objects)
            elif hasattr(self.output, "__iadd__"):
                self.output += result
            elif hasattr(self.output, "__add__"):
                self.output = self.output + result
            else:
                raise TypeError("result cannot be added")
        # --- Dictionary: merge matching keys, adopt new ones.
        elif type(result) is dict:
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], "Add"):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], "__iadd__"):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], "__add__"):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError("result cannot be added")
                else:
                    self.output[key] = result[key]
        # --- Any other iterable (e.g. a list): element-wise merge.
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], "Add"):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], "__iadd__"):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], "__add__"):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError("result cannot be added")

    def _resetOutput(self):
        # Reset every accumulator that supports it (dict values or sequence
        # elements), so a reused task starts each item from a clean slate.
        output = self.output.values() if type(self.output) is dict else self.output
        for o in output:
            if hasattr(o, "Reset"):
                o.Reset()
143 
144 
class WorkManager(object):
    """Class in charge of managing the tasks and distributing them to
    the workers. They can be local (using other cores) or remote
    using other nodes in the local cluster"""

    def __init__(self, ncpus="autodetect", ppservers=None):
        # ncpus: number of worker processes, or "autodetect" to use all cores.
        # ppservers: iterable of remote host names; if given, run in
        # 'cluster' mode via the third-party 'pp' package over ssh,
        # otherwise in local 'multicore' mode via multiprocessing.
        if ncpus == "autodetect":
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            import pp

            self.ppservers = ppservers
            # Start one ppserver on each host before creating the pp server.
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = "cluster"
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = "multicore"
        # Per-host Statistics aggregates, filled by _mergeStatistics.
        self.stats = {}

    def __del__(self):
        # Best-effort teardown of whichever backend was created in __init__.
        if hasattr(self, "server"):
            self.server.destroy()
        if hasattr(self, "pool"):
            self.pool.close()

    def process(self, task, items, timeout=90000):
        """Run task.process(item) for every element of *items* in parallel.

        Results are merged back into *task* via Task._mergeResults and timing
        statistics accumulated per worker host.  *timeout* is only honoured
        in 'multicore' mode (passed to multiprocessing's AsyncResult.get).

        Raises:
            TypeError: if *task* is not a Task instance.
        """
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be an 'Task' instance")
        # --- Call the Local initialization
        task.initializeLocal()
        # --- Schedule all the jobs ....
        if self.mode == "cluster":
            # Submit one pp job per item; _prefunction unpacks the tuple on
            # the remote side and GaudiMP.Parallel is imported there.
            jobs = [
                self.server.submit(
                    _prefunction,
                    (_ppfunction, task, item),
                    (),
                    ("GaudiMP.Parallel", "time"),
                )
                for item in items
            ]
            # Calling a pp job object blocks until its result is available.
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == "multicore":
            start = time.time()
            # Pair the (shared) task with each item; workers get copies.
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print("Time elapsed since server creation %f" % (end - start))
        # --- Call the Local Finalize
        task.finalize()

    def _printStatistics(self):
        # Print a per-host summary table of the data gathered in self.stats.
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print("Job execution statistics:")
        print("job count | % of all jobs | job time sum | time per job | job server")
        for name, stat in self.stats.items():
            print(
                " %d | %6.2f | %8.3f | %8.3f | %s"
                % (
                    stat.njob,
                    100.0 * stat.njob / njobs,
                    stat.time,
                    stat.time / stat.njob,
                    name,
                )
            )

    def _mergeStatistics(self, stat):
        # Fold one job's Statistics into the per-host aggregate entry.
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
231 
232 
class SshSession(object):
    """Open an ssh session to a host and start a ppserver process there.

    NOTE(review): the remote shell is assumed to be csh-like ('setenv' is
    used), and PYTHONPATH, LD_LIBRARY_PATH and ROOTSYS must be set locally
    (KeyError otherwise) — confirm against the deployment environment.
    """

    def __init__(self, hostname):
        import pp
        import pyssh

        self.host = hostname
        # Root of the 'pp' installation; ppserver.py lives in scripts-<ver>.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Mirror the local working directory and environment remotely.
        self.session.write("cd %s\n" % os.getcwd())
        self.session.read_lazy()
        self.session.write("setenv PYTHONPATH %s\n" % os.environ["PYTHONPATH"])
        self.session.read_lazy()
        self.session.write(
            "setenv LD_LIBRARY_PATH %s\n" % os.environ["LD_LIBRARY_PATH"]
        )
        self.session.read_lazy()
        self.session.write("setenv ROOTSYS %s\n" % os.environ["ROOTSYS"])
        self.session.read_lazy()
        # Launch ppserver.py with the same Python version as the local one.
        self.session.write(
            "%s %s/scripts-%s/ppserver.py \n"
            % (sys.executable, ppprefix, sys.version.split()[0])
        )
        self.session.read_lazy()
        self.session.read_lazy()
        print("started ppserver in ", hostname)

    def __del__(self):
        # Closing the ssh session terminates the remote ppserver.
        self.session.close()
        print("killed ppserver in ", self.host)
GaudiMP.Parallel.WorkManager.mode
mode
Definition: Parallel.py:161
GaudiMP.Parallel.Statistics.start
start
Definition: Parallel.py:59
GaudiMP.Parallel.SshSession.session
session
Definition: Parallel.py:240
GaudiMP.Parallel.WorkManager.__init__
def __init__(self, ncpus="autodetect", ppservers=None)
Definition: Parallel.py:150
GaudiMP.Parallel.WorkManager.ppservers
ppservers
Definition: Parallel.py:158
GaudiMP.Parallel.Statistics.__init__
def __init__(self)
Definition: Parallel.py:57
GaudiMP.Parallel.WorkManager.pool
pool
Definition: Parallel.py:163
GaudiMP.Parallel.WorkManager.__del__
def __del__(self)
Definition: Parallel.py:167
GaudiMP.Parallel.Task._resetOutput
def _resetOutput(self)
Definition: Parallel.py:138
GaudiMP.Parallel.Task.output
output
Definition: Parallel.py:105
GaudiMP.Parallel.WorkManager.sessions
sessions
Definition: Parallel.py:159
GaudiMP.Parallel.WorkManager.process
def process(self, task, items, timeout=90000)
Definition: Parallel.py:173
GaudiMP.Parallel._ppfunction
def _ppfunction(args)
Definition: Parallel.py:36
GaudiMP.Parallel.Task.__new__
def __new__(cls, *args, **kwargs)
Definition: Parallel.py:74
GaudiMP.Parallel.Statistics.njob
njob
Definition: Parallel.py:61
GaudiMP.Parallel.SshSession.__del__
def __del__(self)
Definition: Parallel.py:261
GaudiMP.Parallel.WorkManager
Definition: Parallel.py:145
GaudiMP.Parallel.SshSession
Definition: Parallel.py:233
GaudiMP.Parallel.Task.initializeLocal
def initializeLocal(self)
Definition: Parallel.py:83
GaudiMP.Parallel._prefunction
def _prefunction(f, task, item)
Definition: Parallel.py:32
min
EventIDBase min(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:212
GaudiMP.Parallel.Statistics.time
time
Definition: Parallel.py:60
GaudiMP.Parallel.SshSession.host
host
Definition: Parallel.py:238
GaudiMP.Parallel.Statistics.stop
def stop(self)
Definition: Parallel.py:63
GaudiMP.Parallel.Task.process
def process(self, item)
Definition: Parallel.py:89
GaudiMP.Parallel.Task
Definition: Parallel.py:67
gaudirun.type
type
Definition: gaudirun.py:160
GaudiMP.Parallel.Statistics
Definition: Parallel.py:56
GaudiMP.Parallel.WorkManager._mergeStatistics
def _mergeStatistics(self, stat)
Definition: Parallel.py:225
GaudiMP.Parallel.Task.initializeRemote
def initializeRemote(self)
Definition: Parallel.py:86
GaudiMP.Parallel.WorkManager.ncpus
ncpus
Definition: Parallel.py:152
GaudiMP.Parallel.Task._mergeResults
def _mergeResults(self, result)
Definition: Parallel.py:95
GaudiMP.Parallel.Statistics.name
name
Definition: Parallel.py:58
GaudiMP.Parallel.Task.finalize
def finalize(self)
Definition: Parallel.py:92
GaudiMP.Parallel.WorkManager._printStatistics
def _printStatistics(self)
Definition: Parallel.py:207
GaudiMP.Parallel.SshSession.__init__
def __init__(self, hostname)
Definition: Parallel.py:234
GaudiMP.Parallel.WorkManager.stats
stats
Definition: Parallel.py:165
GaudiMP.Parallel.WorkManager.server
server
Definition: Parallel.py:160
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:102
StringKeyEx.keys
list keys
Definition: StringKeyEx.py:67
GaudiPython.Pythonizations.items
items
Definition: Pythonizations.py:546