The Gaudi Framework  v36r1 (3e2fb5a8)
Parallel.py
Go to the documentation of this file.
1 
13 """ GaudiMP.Parallel module.
14  This module provides 'parallel' processing support for GaudiPython.
15  It is adding some sugar on top of public domain packages such as
16  the 'multiprocessing' or the 'pp' packages. The interface can be made
17  independent of the underlying implementation package.
18  Two main classes are defined: Task and WorkManager
19 """
20 from __future__ import print_function
21 
22 __all__ = ['Task', 'WorkManager']
23 excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
24 
25 import sys
26 import os
27 import time
28 import copy
29 import multiprocessing
30 
31 
32 def _prefunction(f, task, item):
33  return f((task, item))
34 
35 
def _ppfunction(args):
    """Worker-side driver executed once per scheduled item.

    *args* is a (task, item) tuple.  On the first call in a given worker
    process the task's captured environment is replayed (minus the
    excluded variables) and initializeRemote() is run; subsequent calls
    skip that step via the per-class _initializeDone flag.  Returns a
    (deep-copied task output, Statistics) pair.
    """
    task, item = args
    stat = Statistics()
    # One-time per-process setup: replay the parent's environment and
    # run the remote initialization hook.
    if not task.__class__._initializeDone:
        wanted = {k: v for k, v in task.environ.items()
                  if k not in excluded_varnames}
        os.environ.update(wanted)
        task.initializeRemote()
        task.__class__._initializeDone = True
    # Clear any previous partial output, then process this item.
    task._resetOutput()
    task.process(item)
    # Record wall-clock time spent on this item.
    stat.stop()
    return (copy.deepcopy(task.output), stat)
54 
55 
class Statistics(object):
    """Per-worker bookkeeping: host name, start timestamp, elapsed
    wall-clock time and number of jobs merged so far."""

    def __init__(self):
        # Identify the worker by host; None if HOSTNAME is unset.
        self.name = os.getenv('HOSTNAME')
        # Timer starts at construction; stop() records the delta.
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        """Record the wall-clock time elapsed since construction."""
        elapsed = time.time() - self.start
        self.time = elapsed
66 
class Task(object):
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel.  User classes must inherit from it and implement
    the methods initializeLocal, initializeRemote, process and finalize."""
    # Flipped to True (per concrete class) the first time a worker process
    # runs initializeRemote -- see _ppfunction.
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        # Done in __new__ (not __init__) so subclasses need not remember to
        # call a base constructor: capture the creating process environment
        # and working directory for replay on remote workers.
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        """Hook run once in the parent process before scheduling jobs."""
        pass

    def initializeRemote(self):
        """Hook run once per worker process before its first item."""
        pass

    def process(self, item):
        """Process a single work item; results must be left in self.output."""
        pass

    def finalize(self):
        """Hook run in the parent process after all results are merged."""
        pass

    def _mergeResults(self, result):
        """Accumulate a worker's partial *result* into self.output.

        Supports non-iterable accumulables (via an Add method, __iadd__
        or __add__), dicts (key-wise merge, unknown keys copied in) and
        sequences (element-wise merge up to the shorter length).

        Raises:
            TypeError: if *result* is not the same type as self.output,
                or a value offers no way to be added.
        """
        if type(result) is not type(self.output):
            raise TypeError("output type is not same as obtained result")
        # --- Not iterable: accumulate the object directly ---
        if not hasattr(result, '__iter__'):
            if hasattr(self.output, 'Add'):
                self.output.Add(result)  # e.g. ROOT histogram-like objects
            elif hasattr(self.output, '__iadd__'):
                self.output += result
            elif hasattr(self.output, '__add__'):
                self.output = self.output + result
            else:
                raise TypeError('result cannot be added')
        # --- Dictionary: merge value by value, copy new keys ---
        # (a dead 'minkeys' computation that was never used has been removed)
        elif type(result) is dict:
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], 'Add'):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], '__iadd__'):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], '__add__'):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError('result cannot be added')
                else:
                    self.output[key] = result[key]
        # --- Anything else (e.g. list/tuple): element-wise merge ---
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], 'Add'):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], '__iadd__'):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], '__add__'):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError('result cannot be added')

    def _resetOutput(self):
        """Call Reset() on every resettable object held in self.output."""
        output = self.output.values() if type(self.output) is dict else self.output
        for o in output:
            if hasattr(o, 'Reset'):
                o.Reset()
143 
144 
class WorkManager(object):
    """Class in charge of managing the tasks and distributing them to
    the workers. They can be local (using other cores) or remote
    using other nodes in the local cluster."""

    def __init__(self, ncpus='autodetect', ppservers=None):
        # Number of workers: all visible cores unless given explicitly.
        if ncpus == 'autodetect':
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            # Cluster mode: one ssh session per remote host to launch a
            # ppserver there, plus a local 'pp' scheduler pointing at them.
            import pp
            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else:
            # Multicore mode: a plain multiprocessing pool on this host.
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = 'multicore'
        # Per-host Statistics objects, filled in as jobs complete.
        self.stats = {}

    def __del__(self):
        # Only cluster mode owns a 'pp' server that needs tearing down.
        if hasattr(self, 'server'):
            self.server.destroy()

    def process(self, task, items, timeout=90000):
        """Run task.process() over all *items* in parallel and merge the
        results back into *task*.  *timeout* (seconds) bounds the wait for
        the multicore result set."""
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be an 'Task' instance")
        # --- Call the Local initialization
        task.initializeLocal()
        # --- Schedule all the jobs ....
        if self.mode == 'cluster':
            # Submit via pp; _prefunction re-packs args for _ppfunction.
            jobs = [
                self.server.submit(_prefunction, (_ppfunction, task, item), (),
                                   ('GaudiMP.Parallel', 'time'))
                for item in items
            ]
            # job() blocks until that job's (result, stat) pair is ready.
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore':
            start = time.time()
            # Each worker receives its own pickled copy of the task.
            jobs = self.pool.map_async(_ppfunction,
                                       zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print('Time elapsed since server creation %f' % (end - start))
        # --- Call the Local Finalize
        task.finalize()

    def _printStatistics(self):
        """Print a per-host summary table of the collected job statistics."""
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print('Job execution statistics:')
        print(
            'job count | % of all jobs | job time sum | time per job | job server'
        )
        for name, stat in self.stats.items():
            print(' %d | %6.2f | %8.3f | %8.3f | %s' %
                  (stat.njob, 100. * stat.njob / njobs, stat.time,
                   stat.time / stat.njob, name))

    def _mergeStatistics(self, stat):
        """Fold one job's Statistics into the per-host aggregate."""
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
220 
221 
class SshSession(object):
    """Interactive ssh session that starts a 'pp' ppserver on a remote host,
    mirroring this process's working directory and key environment variables.
    NOTE(review): the 'setenv' commands assume a csh-style remote login
    shell -- confirm for the target hosts."""

    def __init__(self, hostname):
        import pyssh
        import pp
        self.host = hostname
        # Directory two levels above the pp package: location of the
        # versioned scripts-* directory containing ppserver.py.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Reproduce the local working directory and environment remotely;
        # each write is followed by read_lazy() to drain the remote echo.
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write(
            'setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        # Launch the ppserver with the same interpreter version as here.
        self.session.write('%s %s/scripts-%s/ppserver.py \n' %
                           (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print('started ppserver in ', hostname)

    def __del__(self):
        # Closing the session terminates the remote ppserver.
        self.session.close()
        print('killed ppserver in ', self.host)
GaudiMP.Parallel.WorkManager.mode
mode
Definition: Parallel.py:160
GaudiMP.Parallel.Statistics.start
start
Definition: Parallel.py:59
GaudiMP.Parallel.SshSession.session
session
Definition: Parallel.py:228
GaudiMP.Parallel.WorkManager.ppservers
ppservers
Definition: Parallel.py:157
GaudiMP.Parallel.Statistics.__init__
def __init__(self)
Definition: Parallel.py:57
GaudiMP.Parallel.WorkManager.pool
pool
Definition: Parallel.py:162
GaudiMP.Parallel.WorkManager.__del__
def __del__(self)
Definition: Parallel.py:166
GaudiMP.Parallel.Task._resetOutput
def _resetOutput(self)
Definition: Parallel.py:137
GaudiMP.Parallel.Task.output
output
Definition: Parallel.py:104
GaudiMP.Parallel.WorkManager.sessions
sessions
Definition: Parallel.py:158
GaudiMP.Parallel.WorkManager.process
def process(self, task, items, timeout=90000)
Definition: Parallel.py:170
GaudiMP.Parallel._ppfunction
def _ppfunction(args)
Definition: Parallel.py:36
GaudiMP.Parallel.Task.__new__
def __new__(cls, *args, **kwargs)
Definition: Parallel.py:73
GaudiMP.Parallel.Statistics.njob
njob
Definition: Parallel.py:61
GaudiMP.Parallel.SshSession.__del__
def __del__(self)
Definition: Parallel.py:246
GaudiMP.Parallel.WorkManager
Definition: Parallel.py:145
GaudiMP.Parallel.SshSession
Definition: Parallel.py:222
GaudiMP.Parallel.Task.initializeLocal
def initializeLocal(self)
Definition: Parallel.py:82
GaudiMP.Parallel._prefunction
def _prefunction(f, task, item)
Definition: Parallel.py:32
min
EventIDBase min(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:212
GaudiMP.Parallel.Statistics.time
time
Definition: Parallel.py:60
GaudiMP.Parallel.SshSession.host
host
Definition: Parallel.py:226
GaudiMP.Parallel.Statistics.stop
def stop(self)
Definition: Parallel.py:63
GaudiMP.Parallel.Task.process
def process(self, item)
Definition: Parallel.py:88
GaudiMP.Parallel.Task
Definition: Parallel.py:67
gaudirun.type
type
Definition: gaudirun.py:154
GaudiMP.Parallel.Statistics
Definition: Parallel.py:56
GaudiMP.Parallel.WorkManager._mergeStatistics
def _mergeStatistics(self, stat)
Definition: Parallel.py:214
GaudiMP.Parallel.Task.initializeRemote
def initializeRemote(self)
Definition: Parallel.py:85
GaudiMP.Parallel.WorkManager.ncpus
ncpus
Definition: Parallel.py:152
GaudiMP.Parallel.Task._mergeResults
def _mergeResults(self, result)
Definition: Parallel.py:94
GaudiMP.Parallel.Statistics.name
name
Definition: Parallel.py:58
GaudiMP.Parallel.Task.finalize
def finalize(self)
Definition: Parallel.py:91
GaudiMP.Parallel.WorkManager._printStatistics
def _printStatistics(self)
Definition: Parallel.py:201
GaudiMP.Parallel.SshSession.__init__
def __init__(self, hostname)
Definition: Parallel.py:223
GaudiMP.Parallel.WorkManager.__init__
def __init__(self, ncpus='autodetect', ppservers=None)
Definition: Parallel.py:150
GaudiMP.Parallel.WorkManager.stats
stats
Definition: Parallel.py:164
GaudiMP.Parallel.WorkManager.server
server
Definition: Parallel.py:159
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:97
StringKeyEx.keys
list keys
Definition: StringKeyEx.py:67
GaudiPython.Pythonizations.items
items
Definition: Pythonizations.py:526