Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v31r0 (aeb156f0)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Parallel.py
Go to the documentation of this file.
1 # File: GaudiMP/Parallel.py
2 # Author: Pere Mato (pere.mato@cern.ch)
3 """ GaudiMP.Parallel module.
4  This module provides 'parallel' processing support for GaudiPython.
5  It is adding some sugar on top of public domain packages such as
6  the 'multiprocessing' or the 'pp' packages. The interface can be made
7  independent of the underlying implementation package.
8  Two main classes are defined: Task and WorkManager
9 """
10 
11 __all__ = ['Task', 'WorkManager']
12 excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
13 
14 import sys
15 import os
16 import time
17 import copy
18 import multiprocessing
19 
20 
21 def _prefunction(f, task, item):
22  return f((task, item))
23 
24 
def _ppfunction(args):
    """Worker-side entry point: execute one (task, item) pair and return a
    (deep-copied task output, Statistics) tuple for the parent to merge."""
    task, item = args
    stat = Statistics()
    # Replicate the parent's environment and run the remote initialization
    # exactly once per worker process (flag lives on the task's class).
    cls = task.__class__
    if not cls._initializeDone:
        for name, value in task.environ.items():
            if name not in excluded_varnames:
                os.environ[name] = value
        task.initializeRemote()
        cls._initializeDone = True
    # Start each item from a clean output, then process it.
    task._resetOutput()
    task.process(item)
    # Record elapsed wall-clock time for this job.
    stat.stop()
    # Deep copy so the returned output is independent of the live task.
    return (copy.deepcopy(task.output), stat)
43 
44 
class Statistics(object):
    """Per-host accounting record for parallel job execution.

    Attributes:
        name  -- host name this record belongs to (None if HOSTNAME unset)
        start -- creation time (seconds since the epoch)
        time  -- accumulated wall-clock time (seconds)
        njob  -- number of jobs accounted against this host
    """

    def __init__(self):
        self.name = os.getenv('HOSTNAME')
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        """Record the wall-clock time elapsed since this record was created."""
        elapsed = time.time() - self.start
        self.time = elapsed
54 
55 
class Task(object):
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel.  User classes must inherit from it and implement
    the methods initializeLocal, initializeRemote, process and finalize."""
    # Remote-side initialization runs at most once per worker process;
    # the flag is checked and set in _ppfunction.
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        # Capture the environment and working directory at creation time so
        # they can be replicated on remote workers (see _ppfunction).
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        """Called once in the parent process before any job is scheduled."""
        pass

    def initializeRemote(self):
        """Called once per worker process before its first item is handled."""
        pass

    def process(self, item):
        """Process one work item; results are accumulated in self.output."""
        pass

    def finalize(self):
        """Called once in the parent process after all results are merged."""
        pass

    @staticmethod
    def _combine(current, update):
        """Merge *update* into *current* and return the merged value.

        Prefers a ROOT-style Add() method, then in-place +=, then plain +.
        Raises TypeError when none of these is available."""
        if hasattr(current, 'Add'):
            current.Add(update)
            return current
        elif hasattr(current, '__iadd__'):
            current += update
            return current
        elif hasattr(current, '__add__'):
            return current + update
        else:
            raise TypeError('result cannot be added')

    def _mergeResults(self, result):
        """Merge one worker's *result* into self.output.

        *result* must be of the same type as self.output: a plain value,
        a dict (merged key by key, adopting new keys) or a sequence
        (merged element-wise over the common length).
        Raises TypeError on a type mismatch or an unmergeable value."""
        if type(result) is not type(self.output):
            raise TypeError("output type is not same as obtained result")
        # --- Not iterable: merge the scalar directly
        if not hasattr(result, '__iter__'):
            self.output = self._combine(self.output, result)
        # --- Dictionary: merge per key
        elif type(result) is dict:
            # (Dead 'minkeys' computation from the original removed: it was
            # never used and compared dict key views with '<='.)
            for key in result.keys():
                if key in self.output:
                    self.output[key] = self._combine(self.output[key],
                                                     result[key])
                else:
                    self.output[key] = result[key]
        # --- Anything else (list/tuple): merge element-wise
        else:
            for i in range(min(len(self.output), len(result))):
                merged = self._combine(self.output[i], result[i])
                # Skip the item assignment when merging happened in place:
                # this keeps Add()-style merging working even for immutable
                # containers such as tuples.
                if merged is not self.output[i]:
                    self.output[i] = merged

    def _resetOutput(self):
        """Reset (via Reset()) every output element that supports it, so a
        recycled task starts each item with a clean output."""
        output = self.output.values() if type(self.output) is dict \
            else self.output
        for o in output:
            if hasattr(o, 'Reset'):
                o.Reset()
132 
133 
134 class WorkManager(object):
135  """ Class to in charge of managing the tasks and distributing them to
136  the workers. They can be local (using other cores) or remote
137  using other nodes in the local cluster """
138 
139  def __init__(self, ncpus='autodetect', ppservers=None):
140  if ncpus == 'autodetect':
141  self.ncpus = multiprocessing.cpu_count()
142  else:
143  self.ncpus = ncpus
144  if ppservers:
145  import pp
146  self.ppservers = ppservers
147  self.sessions = [SshSession(srv) for srv in ppservers]
148  self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
149  self.mode = 'cluster'
150  else:
151  self.pool = multiprocessing.Pool(self.ncpus)
152  self.mode = 'multicore'
153  self.stats = {}
154 
155  def __del__(self):
156  if hasattr(self, 'server'):
157  self.server.destroy()
158 
159  def process(self, task, items, timeout=90000):
160  if not isinstance(task, Task):
161  raise TypeError("task argument needs to be an 'Task' instance")
162  # --- Call the Local initialialization
163  task.initializeLocal()
164  # --- Schedule all the jobs ....
165  if self.mode == 'cluster':
166  jobs = [
167  self.server.submit(_prefunction, (_ppfunction, task, item), (),
168  ('GaudiMP.Parallel', 'time'))
169  for item in items
170  ]
171  for job in jobs:
172  result, stat = job()
173  task._mergeResults(result)
174  self._mergeStatistics(stat)
175  self._printStatistics()
176  self.server.print_stats()
177  elif self.mode == 'multicore':
178  start = time.time()
179  jobs = self.pool.map_async(_ppfunction,
180  zip([task for i in items], items))
181  for result, stat in jobs.get(timeout):
182  task._mergeResults(result)
183  self._mergeStatistics(stat)
184  end = time.time()
185  self._printStatistics()
186  print 'Time elapsed since server creation %f' % (end - start)
187  # --- Call the Local Finalize
188  task.finalize()
189 
190  def _printStatistics(self):
191  njobs = 0
192  for stat in self.stats.values():
193  njobs += stat.njob
194  print 'Job execution statistics:'
195  print 'job count | % of all jobs | job time sum | time per job | job server'
196  for name, stat in self.stats.items():
197  print ' %d | %6.2f | %8.3f | %8.3f | %s' % (
198  stat.njob, 100. * stat.njob / njobs, stat.time,
199  stat.time / stat.njob, name)
200 
201  def _mergeStatistics(self, stat):
202  if stat.name not in self.stats:
203  self.stats[stat.name] = Statistics()
204  s = self.stats[stat.name]
205  s.time += stat.time
206  s.njob += 1
207 
208 
class SshSession(object):
    """Ssh helper that prepares a remote host as a 'pp' worker node.

    Opens an ssh connection to *hostname*, mirrors the local working
    directory and key environment variables there, and launches a ppserver
    so the host can accept jobs from the cluster-mode WorkManager.

    NOTE(review): the 'setenv' commands assume a csh-like remote login
    shell — confirm for the target hosts.
    """

    def __init__(self, hostname):
        import pyssh
        import pp
        self.host = hostname
        # Root of the local 'pp' installation; the ppserver startup script
        # is expected under <prefix>/scripts-<python-version>/.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Mirror the local working directory on the remote side.
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        # Export the environment the remote Python process needs; the
        # read_lazy() after each write drains the shell's echo/prompt.
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write(
            'setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        # Launch the ppserver script matching the local Python version.
        self.session.write('%s %s/scripts-%s/ppserver.py \n' %
                           (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print 'started ppserver in ', hostname

    def __del__(self):
        # Best-effort shutdown: closing the ssh session terminates the
        # remote ppserver started in __init__.
        self.session.close()
        print 'killed ppserver in ', self.host
def _mergeStatistics(self, stat)
Definition: Parallel.py:201
EventIDBase min(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:202
def finalize(self)
Definition: Parallel.py:80
def __init__(self, ncpus='autodetect', ppservers=None)
Definition: Parallel.py:139
def initializeLocal(self)
Definition: Parallel.py:71
def __init__(self, hostname)
Definition: Parallel.py:210
def _ppfunction(args)
Definition: Parallel.py:25
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
def process(self, task, items, timeout=90000)
Definition: Parallel.py:159
def __new__(cls, args, kwargs)
Definition: Parallel.py:62
def _prefunction(f, task, item)
Definition: Parallel.py:21
def _mergeResults(self, result)
Definition: Parallel.py:83
def process(self, item)
Definition: Parallel.py:77
def initializeRemote(self)
Definition: Parallel.py:74
def _resetOutput(self)
Definition: Parallel.py:126