The Gaudi Framework  v36r16 (ea80daf8)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Parallel.py
Go to the documentation of this file.
1 
13 """ GaudiMP.Parallel module.
14  This module provides 'parallel' processing support for GaudiPyhton.
15  It is adding some sugar on top of public domain packages such as
16  the 'multiprocessing' or the 'pp' packages. The interface can be made
17  independent of the underlying implementation package.
18  Two main class are defined: Task and WorkManager
19 """
20 from __future__ import print_function
21 
22 __all__ = ["Task", "WorkManager"]
23 excluded_varnames = ["HOSTNAME", "SSH_CLIENT", "SSH_CONNECTION", "DISPLAY"]
24 
25 import copy
26 import multiprocessing
27 import os
28 import sys
29 import time
30 
31 
32 def _prefunction(f, task, item):
33  return f((task, item))
34 
35 
def _ppfunction(args):
    """Worker-side entry point: run one (task, item) pair and return the
    pair (deep-copied task output, Statistics for this job)."""
    task, item = args
    stat = Statistics()
    # One-time per-process setup: import the caller's environment (minus
    # host-specific variables) and run the remote initialization hook.
    if not task.__class__._initializeDone:
        safe_env = {
            name: value
            for name, value in task.environ.items()
            if name not in excluded_varnames
        }
        os.environ.update(safe_env)
        task.initializeRemote()
        task.__class__._initializeDone = True
    # Fresh output for this item, then process it.
    task._resetOutput()
    task.process(item)
    stat.stop()
    # Deep-copy so the returned output is detached from the worker's task.
    return (copy.deepcopy(task.output), stat)
54 
55 
class Statistics(object):
    """Lightweight per-worker bookkeeping: host name, elapsed wall-clock
    time and number of jobs executed."""

    def __init__(self):
        # Tag the record with the worker's host and start the clock now.
        self.name = os.getenv("HOSTNAME")
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        """Freeze the elapsed wall-clock time since construction."""
        elapsed = time.time() - self.start
        self.time = elapsed
65 
66 
class Task(object):
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel.  User classes must inherit from it and implement
    the methods initializeLocal, initializeRemote, process and finalize."""

    # Class-level flag: initializeRemote() runs at most once per worker process.
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        # Set up the common attributes here (instead of __init__) so every
        # subclass gets them even if it overrides __init__ without chaining.
        task = object.__new__(cls)
        task.output = ()                 # placeholder; subclasses set the real container
        task.environ = dict(os.environ)  # snapshot of the caller's environment
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        """Hook executed once in the parent process before scheduling."""
        pass

    def initializeRemote(self):
        """Hook executed once in each worker process."""
        pass

    def process(self, item):
        """Process a single work item, accumulating into self.output."""
        pass

    def finalize(self):
        """Hook executed in the parent process after all results are merged."""
        pass

    @staticmethod
    def _combine(target, value):
        """Combine *value* into *target* and return the combined object.

        Prefers a ROOT-style Add() method, then in-place +=, then +.
        The return value matters for immutable targets (ints, tuples, ...).
        Raises TypeError when none of the three protocols is available.
        """
        if hasattr(target, "Add"):
            target.Add(value)
            return target
        if hasattr(target, "__iadd__"):
            target += value
            return target
        if hasattr(target, "__add__"):
            return target + value
        raise TypeError("result cannot be added")

    def _mergeResults(self, result):
        """Merge a worker's *result* into self.output.

        Supports scalar-like (non-iterable) outputs, dictionaries (merged
        key by key, new keys adopted as-is) and sequences (merged
        element-wise up to the shorter length).

        Raises TypeError when *result* is not an instance of the output's
        type or cannot be combined.
        """
        if not isinstance(result, type(self.output)):
            raise TypeError("output type is not same as obtained result")
        if not hasattr(result, "__iter__"):
            # --- Non-iterable (scalar-like) result.
            self.output = self._combine(self.output, result)
        elif isinstance(result, dict):
            # --- Dictionary: merge per key.  isinstance (not `type is dict`)
            # so dict subclasses do not fall into the sequence branch below.
            for key, value in result.items():
                if key in self.output:
                    self.output[key] = self._combine(self.output[key], value)
                else:
                    self.output[key] = value
        else:
            # --- Anything else (list-like): element-wise merge.
            for i in range(min(len(self.output), len(result))):
                self.output[i] = self._combine(self.output[i], result[i])

    def _resetOutput(self):
        """Reset ROOT-style objects (those with a Reset method) in self.output."""
        items = self.output.values() if isinstance(self.output, dict) else self.output
        for obj in items:
            if hasattr(obj, "Reset"):
                obj.Reset()
139 
140 
class WorkManager(object):
    """Class in charge of managing the tasks and distributing them to
    the workers. They can be local (using other cores) or remote
    using other nodes in the local cluster"""

    def __init__(self, ncpus="autodetect", ppservers=None):
        # ncpus: number of local worker processes, or "autodetect" to use
        #        every core reported by multiprocessing.cpu_count().
        # ppservers: optional list of remote host names; when given, work is
        #        distributed with the 'pp' package over ssh ("cluster" mode),
        #        otherwise a local multiprocessing.Pool is used ("multicore").
        if ncpus == "autodetect":
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            import pp

            self.ppservers = ppservers
            # Creating an SshSession starts a ppserver on each remote host.
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = "cluster"
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = "multicore"
        # Per-host Statistics aggregates, keyed by host name (see _mergeStatistics).
        self.stats = {}

    def __del__(self):
        # Best-effort cleanup of whichever backend was created in __init__.
        # NOTE(review): pool.close() is not followed by join(); workers may
        # still be shutting down when the manager is garbage-collected.
        if hasattr(self, "server"):
            self.server.destroy()
        if hasattr(self, "pool"):
            self.pool.close()

    def process(self, task, items, timeout=90000):
        """Run task.process(item) for every item in *items* in parallel,
        merging each worker's output back into *task* and its Statistics
        into self.stats.

        timeout applies (in seconds) to result collection in multicore
        mode only.  Raises TypeError when *task* is not a Task instance.
        """
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be an 'Task' instance")
        # --- Call the Local initialialization
        task.initializeLocal()
        # --- Schedule all the jobs ....
        if self.mode == "cluster":
            jobs = [
                self.server.submit(
                    _prefunction,
                    (_ppfunction, task, item),
                    (),
                    ("GaudiMP.Parallel", "time"),
                )
                for item in items
            ]
            # Each job() call blocks until that remote result is available.
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == "multicore":
            start = time.time()
            # One (task, item) pair per work unit; the task object itself is
            # pickled and shipped to every worker.
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print("Time elapsed since server creation %f" % (end - start))
        # --- Call the Local Finalize
        task.finalize()

    def _printStatistics(self):
        # Print a per-host summary table of the statistics accumulated in
        # self.stats (job count, share of all jobs, total and mean time).
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print("Job execution statistics:")
        print("job count | % of all jobs | job time sum | time per job | job server")
        for name, stat in self.stats.items():
            print(
                " %d | %6.2f | %8.3f | %8.3f | %s"
                % (
                    stat.njob,
                    100.0 * stat.njob / njobs,
                    stat.time,
                    stat.time / stat.njob,
                    name,
                )
            )

    def _mergeStatistics(self, stat):
        # Fold one worker's Statistics into the per-host aggregate,
        # creating the aggregate entry on first sight of the host.
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
227 
228 
class SshSession(object):
    """Interactive ssh session that starts a 'pp' ppserver on a remote host.

    Used by WorkManager in cluster mode: the remote server is started in a
    directory mirroring the local cwd, with the local PYTHONPATH,
    LD_LIBRARY_PATH and ROOTSYS propagated.
    """

    def __init__(self, hostname):
        import pp
        import pyssh

        self.host = hostname
        # Root of the 'pp' installation; the ppserver script lives under it.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Mirror the local working directory and environment remotely.
        # NOTE(review): 'setenv' assumes a csh-style remote login shell —
        # confirm this holds for the target hosts.
        self.session.write("cd %s\n" % os.getcwd())
        self.session.read_lazy()
        self.session.write("setenv PYTHONPATH %s\n" % os.environ["PYTHONPATH"])
        self.session.read_lazy()
        self.session.write(
            "setenv LD_LIBRARY_PATH %s\n" % os.environ["LD_LIBRARY_PATH"]
        )
        self.session.read_lazy()
        self.session.write("setenv ROOTSYS %s\n" % os.environ["ROOTSYS"])
        self.session.read_lazy()
        # Launch the ppserver script matching the local Python version.
        self.session.write(
            "%s %s/scripts-%s/ppserver.py \n"
            % (sys.executable, ppprefix, sys.version.split()[0])
        )
        self.session.read_lazy()
        self.session.read_lazy()
        print("started ppserver in ", hostname)

    def __del__(self):
        # Closing the ssh session also terminates the remote ppserver.
        self.session.close()
        print("killed ppserver in ", self.host)
GaudiMP.Parallel.WorkManager.mode
mode
Definition: Parallel.py:157
GaudiMP.Parallel.Statistics.start
start
Definition: Parallel.py:59
GaudiMP.Parallel.SshSession.session
session
Definition: Parallel.py:236
GaudiMP.Parallel.WorkManager.__init__
def __init__(self, ncpus="autodetect", ppservers=None)
Definition: Parallel.py:146
GaudiMP.Parallel.WorkManager.ppservers
ppservers
Definition: Parallel.py:154
GaudiMP.Parallel.Statistics.__init__
def __init__(self)
Definition: Parallel.py:57
GaudiMP.Parallel.WorkManager.pool
pool
Definition: Parallel.py:159
GaudiMP.Parallel.WorkManager.__del__
def __del__(self)
Definition: Parallel.py:163
GaudiMP.Parallel.Task._resetOutput
def _resetOutput(self)
Definition: Parallel.py:134
GaudiMP.Parallel.Task.output
output
Definition: Parallel.py:105
GaudiMP.Parallel.WorkManager.sessions
sessions
Definition: Parallel.py:155
GaudiMP.Parallel.WorkManager.process
def process(self, task, items, timeout=90000)
Definition: Parallel.py:169
GaudiMP.Parallel._ppfunction
def _ppfunction(args)
Definition: Parallel.py:36
GaudiMP.Parallel.Task.__new__
def __new__(cls, *args, **kwargs)
Definition: Parallel.py:74
GaudiMP.Parallel.Statistics.njob
njob
Definition: Parallel.py:61
GaudiMP.Parallel.SshSession.__del__
def __del__(self)
Definition: Parallel.py:257
GaudiMP.Parallel.WorkManager
Definition: Parallel.py:141
GaudiMP.Parallel.SshSession
Definition: Parallel.py:229
GaudiMP.Parallel.Task.initializeLocal
def initializeLocal(self)
Definition: Parallel.py:83
GaudiMP.Parallel._prefunction
def _prefunction(f, task, item)
Definition: Parallel.py:32
min
EventIDBase min(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:212
GaudiMP.Parallel.Statistics.time
time
Definition: Parallel.py:60
GaudiMP.Parallel.SshSession.host
host
Definition: Parallel.py:234
GaudiMP.Parallel.Statistics.stop
def stop(self)
Definition: Parallel.py:63
GaudiMP.Parallel.Task.process
def process(self, item)
Definition: Parallel.py:89
GaudiMP.Parallel.Task
Definition: Parallel.py:67
gaudirun.type
type
Definition: gaudirun.py:162
GaudiMP.Parallel.Statistics
Definition: Parallel.py:56
GaudiMP.Parallel.WorkManager._mergeStatistics
def _mergeStatistics(self, stat)
Definition: Parallel.py:221
GaudiMP.Parallel.Task.initializeRemote
def initializeRemote(self)
Definition: Parallel.py:86
GaudiMP.Parallel.WorkManager.ncpus
ncpus
Definition: Parallel.py:148
GaudiMP.Parallel.Task._mergeResults
def _mergeResults(self, result)
Definition: Parallel.py:95
GaudiMP.Parallel.Statistics.name
name
Definition: Parallel.py:58
GaudiMP.Parallel.Task.finalize
def finalize(self)
Definition: Parallel.py:92
GaudiMP.Parallel.WorkManager._printStatistics
def _printStatistics(self)
Definition: Parallel.py:203
GaudiMP.Parallel.SshSession.__init__
def __init__(self, hostname)
Definition: Parallel.py:230
GaudiMP.Parallel.WorkManager.stats
stats
Definition: Parallel.py:161
GaudiMP.Parallel.WorkManager.server
server
Definition: Parallel.py:156
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:102
GaudiPython.Pythonizations.items
items
Definition: Pythonizations.py:546