The Gaudi Framework  v32r2 (46d42edc)
Parallel.py
# File: GaudiMP/Parallel.py
# Author: Pere Mato (pere.mato@cern.ch)
""" GaudiMP.Parallel module.
    This module provides 'parallel' processing support for GaudiPython.
    It adds some sugar on top of public domain packages such as
    'multiprocessing' or 'pp'. The interface can be made independent
    of the underlying implementation package.
    Two main classes are defined: Task and WorkManager.
"""
from __future__ import print_function

__all__ = ['Task', 'WorkManager']
excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']

import sys
import os
import time
import copy
import multiprocessing


def _prefunction(f, task, item):
    return f((task, item))


def _ppfunction(args):
    # --- Unpack arguments
    task, item = args
    stat = Statistics()
    # --- Initialize the remote side (at least once per worker process)
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Reset the task output
    task._resetOutput()
    # --- Call processing
    task.process(item)
    # --- Collect statistics
    stat.stop()
    return (copy.deepcopy(task.output), stat)


class Statistics(object):
    def __init__(self):
        self.name = os.getenv('HOSTNAME')
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        self.time = time.time() - self.start


class Task(object):
    """ Basic base class to encapsulate any processing that is going to be
        executed in parallel. User classes must inherit from it and implement
        the methods initializeLocal, initializeRemote, process and finalize. """
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        pass

    def initializeRemote(self):
        pass

    def process(self, item):
        pass

    def finalize(self):
        pass

    def _mergeResults(self, result):
        if type(result) is not type(self.output):
            raise TypeError("output type is not the same as the result type")
        # --- Not iterable: merge the result as a whole
        if not hasattr(result, '__iter__'):
            if hasattr(self.output, 'Add'):
                self.output.Add(result)
            elif hasattr(self.output, '__iadd__'):
                self.output += result
            elif hasattr(self.output, '__add__'):
                self.output = self.output + result
            else:
                raise TypeError('result cannot be added')
        # --- Dictionary: merge key by key
        elif type(result) is dict:
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], 'Add'):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], '__iadd__'):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], '__add__'):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError('result cannot be added')
                else:
                    self.output[key] = result[key]
        # --- Anything else (e.g. a list): merge element by element
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], 'Add'):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], '__iadd__'):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], '__add__'):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError('result cannot be added')

    def _resetOutput(self):
        output = self.output.values() if type(self.output) is dict else self.output
        for o in output:
            if hasattr(o, 'Reset'):
                o.Reset()


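# --- Example: a minimal user-defined task (illustrative sketch only; the
#     name 'SquareSumTask' is hypothetical and not part of the framework).
#     It keeps a dict output, so _mergeResults combines the partial results
#     coming back from the workers key by key.
class SquareSumTask(Task):
    def initializeLocal(self):
        # Runs once in the parent process, before any job is scheduled.
        self.output = {'sum': 0}

    def process(self, item):
        # Runs in a worker process; a deep copy of self.output is sent back.
        self.output['sum'] += item * item

    def finalize(self):
        # Runs in the parent process, after all results have been merged.
        print('sum of squares:', self.output['sum'])

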
class WorkManager(object):
    """ Class in charge of managing the tasks and distributing them to
        the workers. The workers can be local (using other cores) or
        remote (using other nodes in the local cluster). """

    def __init__(self, ncpus='autodetect', ppservers=None):
        if ncpus == 'autodetect':
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            import pp
            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = 'multicore'
        self.stats = {}

    def __del__(self):
        if hasattr(self, 'server'):
            self.server.destroy()

    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
        # --- Call the local initialization
        task.initializeLocal()
        # --- Schedule all the jobs
        if self.mode == 'cluster':
            jobs = [
                self.server.submit(_prefunction, (_ppfunction, task, item), (),
                                   ('GaudiMP.Parallel', 'time'))
                for item in items
            ]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore':
            start = time.time()
            jobs = self.pool.map_async(_ppfunction,
                                       zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print('Time elapsed since server creation %f' % (end - start))
        # --- Call the local finalization
        task.finalize()

    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print('Job execution statistics:')
        print('job count | % of all jobs | job time sum | time per job | job server')
        for name, stat in self.stats.items():
            print(' %d | %6.2f | %8.3f | %8.3f | %s' %
                  (stat.njob, 100. * stat.njob / njobs, stat.time,
                   stat.time / stat.njob, name))

    def _mergeStatistics(self, stat):
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1


class SshSession(object):
    def __init__(self, hostname):
        import pyssh
        import pp
        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # --- Replicate the local working directory and environment on the
        #     remote node (the commands use csh 'setenv' syntax)
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write(
            'setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        # --- Start the parallel-python server on the remote node
        self.session.write('%s %s/scripts-%s/ppserver.py \n' %
                           (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print('started ppserver on', hostname)

    def __del__(self):
        self.session.close()
        print('killed ppserver on', self.host)
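

# --- Example usage (illustrative sketch only; 'SquareSumTask' is the
#     hypothetical task sketched above). In multicore mode the items are
#     distributed over a multiprocessing.Pool and the partial outputs are
#     merged back into the parent task.
if __name__ == '__main__':
    wmgr = WorkManager(ncpus=4)      # or ncpus='autodetect' (the default)
    task = SquareSumTask()
    wmgr.process(task, range(100))   # prints job statistics and the final sum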