The Gaudi Framework  v29r0 (ff2e7097)
Parallel.py
Go to the documentation of this file.
1 # File: GaudiMP/Parallel.py
2 # Author: Pere Mato (pere.mato@cern.ch)
3 
4 """ GaudiMP.Parallel module.
5  This module provides 'parallel' processing support for GaudiPython.
6  It is adding some sugar on top of public domain packages such as
7  the 'multiprocessing' or the 'pp' packages. The interface can be made
8  independent of the underlying implementation package.
9  Two main classes are defined: Task and WorkManager
10 """
11 
12 __all__ = ['Task', 'WorkManager']
13 excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
14 
15 import sys
16 import os
17 import time
18 import copy
19 import multiprocessing
20 
21 
22 def _prefunction(f, task, item):
23  return f((task, item))
24 
25 
def _ppfunction(args):
    """Worker-side function executed once per work item.

    *args* is a (task, item) pair.  On the first call in a given worker
    process, the task's captured environment is exported (minus the
    excluded variables) and task.initializeRemote() is run; a class-level
    flag prevents re-initialization.  Returns a (deep copy of task.output,
    Statistics) pair so results can be merged back in the parent.
    """
    task, item = args
    stats = Statistics()
    # --- One-time per-process initialisation, guarded by the class flag
    cls = type(task)
    if not cls._initializeDone:
        for name, value in task.environ.items():
            if name not in excluded_varnames:
                os.environ[name] = value
        task.initializeRemote()
        cls._initializeDone = True
    # --- Start from a clean output, process the item, record timing
    task._resetOutput()
    task.process(item)
    stats.stop()
    return (copy.deepcopy(task.output), stats)
44 
45 
class Statistics(object):
    """Per-worker bookkeeping: host name, elapsed wall-clock time, job count."""

    def __init__(self):
        self.name = os.getenv('HOSTNAME')  # worker host (None if unset)
        self.start = time.time()           # creation timestamp
        self.time = 0.0                    # elapsed time, set by stop()
        self.njob = 0                      # jobs accumulated by the manager

    def stop(self):
        """Record the wall-clock time elapsed since construction."""
        self.time = time.time() - self.start
55 
56 
class Task(object):
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel.

    User classes must inherit from it and implement the methods
    initializeLocal(), initializeRemote(), process() and finalize().
    """
    # Set to True in each worker process once initializeRemote() has run.
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        # State is created in __new__ so it exists even if a subclass
        # defines __init__ without calling the base class.
        task = object.__new__(cls)
        task.output = ()    # results container; concrete shape set by subclass
        task.environ = {}   # snapshot of os.environ to replay on the worker
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        """Hook run once in the parent process before jobs are scheduled."""
        pass

    def initializeRemote(self):
        """Hook run once per worker process before the first item."""
        pass

    def process(self, item):
        """Process one work item, accumulating results into self.output."""
        pass

    def finalize(self):
        """Hook run in the parent process after all results are merged."""
        pass

    def _mergeResults(self, result):
        """Merge a worker's *result* into self.output.

        Elements are combined with a ROOT-style Add() when available,
        otherwise with += or +.

        :raises TypeError: if *result* is not the same type as self.output
                           or its elements cannot be added.
        """
        if type(result) is not type(self.output):
            raise TypeError("output type is not same as obtained result")
        # --- Not iterable: merge as a single value
        if not hasattr(result, '__iter__'):
            if hasattr(self.output, 'Add'):
                self.output.Add(result)
            elif hasattr(self.output, '__iadd__'):
                self.output += result
            elif hasattr(self.output, '__add__'):
                self.output = self.output + result
            else:
                raise TypeError('result cannot be added')
        # --- Dictionary: merge key by key, adopting keys we do not have yet
        elif type(result) is dict:
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], 'Add'):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], '__iadd__'):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], '__add__'):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError('result cannot be added')
                else:
                    self.output[key] = result[key]
        # --- Any other iterable (e.g. list): merge element by element
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], 'Add'):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], '__iadd__'):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], '__add__'):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError('result cannot be added')

    def _resetOutput(self):
        """Reset output elements that support a ROOT-style Reset()."""
        output = (type(self.output)
                  is dict) and self.output.values() or self.output
        for o in output:
            if hasattr(o, 'Reset'):
                o.Reset()
133 
134 
class WorkManager(object):
    """Manage the tasks and distribute them to the workers.

    Workers can be local (other cores, via multiprocessing -- 'multicore'
    mode) or remote (other nodes of the cluster, via the 'pp' package and
    ssh-started ppservers -- 'cluster' mode).
    """

    def __init__(self, ncpus='autodetect', ppservers=None):
        """
        :param ncpus: number of local worker processes, or 'autodetect'
                      to use every available core.
        :param ppservers: optional sequence of remote host names; when
                          given, 'pp' cluster mode is used instead of a
                          local multiprocessing pool.
        """
        if ncpus == 'autodetect':
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            import pp
            self.ppservers = ppservers
            # Keep the ssh sessions alive: they host the remote ppservers.
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = 'multicore'
        self.stats = {}   # per-host Statistics, filled by _mergeStatistics()

    def __del__(self):
        # Best-effort shutdown of the pp server (cluster mode only).
        if hasattr(self, 'server'):
            self.server.destroy()

    def process(self, task, items, timeout=90000):
        """Run task.process(item) for every element of *items* in parallel,
        merge all results and statistics, then call task.finalize().

        :param task: a Task instance (its output collects the results).
        :param items: iterable of work items.
        :param timeout: seconds to wait for the multicore result set.
        :raises TypeError: if *task* is not a Task instance.
        """
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be an 'Task' instance")
        # --- Call the local initialization
        task.initializeLocal()
        # --- Schedule all the jobs ....
        if self.mode == 'cluster':
            jobs = [self.server.submit(
                _prefunction, (_ppfunction, task, item), (), ('GaudiMP.Parallel', 'time')) for item in items]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore':
            start = time.time()
            jobs = self.pool.map_async(
                _ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print('Time elapsed since server creation %f' % (end - start))
        # --- Call the local finalize
        task.finalize()

    def _printStatistics(self):
        """Print a per-host summary table of the accumulated statistics."""
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print('Job execution statistics:')
        print('job count | % of all jobs | job time sum | time per job | job server')
        for name, stat in self.stats.items():
            print(' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100. * stat.njob / njobs, stat.time, stat.time / stat.njob, name))

    def _mergeStatistics(self, stat):
        """Accumulate one worker Statistics record into the per-host totals."""
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
203 
204 
class SshSession(object):
    """Interactive ssh session that starts a 'pp' ppserver on a remote host.

    The remote shell receives 'setenv' commands, so a csh-like shell is
    expected, and the remote host is assumed to share the local working
    directory and installation paths (PYTHONPATH, LD_LIBRARY_PATH,
    ROOTSYS) -- NOTE(review): this only works when the remote node sees
    the same file system layout; confirm for each cluster.
    """

    def __init__(self, hostname):
        # Deferred imports: 'pyssh' and 'pp' are only needed in cluster mode.
        import pyssh
        import pp
        self.host = hostname
        # Root of the 'pp' installation; used below to locate ppserver.py.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Mirror the local working directory and environment on the remote side.
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' %
                           os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        # Launch the ppserver matching the local Python version.
        self.session.write('%s %s/scripts-%s/ppserver.py \n' %
                           (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print 'started ppserver in ', hostname

    def __del__(self):
        # Closing the ssh session terminates the remote ppserver.
        self.session.close()
        print 'killed ppserver in ', self.host
def _mergeStatistics(self, stat)
Definition: Parallel.py:197
def finalize(self)
Definition: Parallel.py:81
def __init__(self, ncpus='autodetect', ppservers=None)
Definition: Parallel.py:140
def initializeLocal(self)
Definition: Parallel.py:72
def __init__(self, hostname)
Definition: Parallel.py:206
def _ppfunction(args)
Definition: Parallel.py:26
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
def process(self, task, items, timeout=90000)
Definition: Parallel.py:160
def __new__(cls, args, kwargs)
Definition: Parallel.py:63
def _prefunction(f, task, item)
Definition: Parallel.py:22
def _mergeResults(self, result)
Definition: Parallel.py:84
def process(self, item)
Definition: Parallel.py:78
def initializeRemote(self)
Definition: Parallel.py:75
def _resetOutput(self)
Definition: Parallel.py:127