Parallel.py
# File: GaudiMP/Parallel.py
# Author: Pere Mato (pere.mato@cern.ch)

""" GaudiMP.Parallel module.
    This module provides 'parallel' processing support for GaudiPython.
    It adds some sugar on top of public domain packages such as
    'multiprocessing' or 'pp'. The interface can be made independent
    of the underlying implementation package.
    Two main classes are defined: Task and WorkManager.
"""

__all__ = ['Task', 'WorkManager']
excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']

import sys, os, time, copy
import multiprocessing

def _prefunction(f, task, item):
    return f((task, item))

def _ppfunction(args):
    #--- Unpack arguments
    task, item = args
    stat = Statistics()
    #--- Initialize the remote side (at least once)
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    #--- Reset the task output
    task._resetOutput()
    #--- Call processing
    task.process(item)
    #--- Collect statistics
    stat.stop()
    return (copy.deepcopy(task.output), stat)

class Statistics(object):
    def __init__(self):
        self.name = os.getenv('HOSTNAME')
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        self.time = time.time() - self.start

class Task(object):
    """ Basic base class to encapsulate any processing that is going to be executed in parallel.
        User classes must inherit from it and implement the methods initializeLocal,
        initializeRemote, process and finalize. """
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        pass

    def initializeRemote(self):
        pass

    def process(self, item):
        pass

    def finalize(self):
        pass

    def _mergeResults(self, result):
        if type(result) is not type(self.output):
            raise TypeError("output type is not the same as the result type")
        #--- Not iterable ---
        if not hasattr(result, '__iter__'):
            if hasattr(self.output, 'Add'): self.output.Add(result)
            elif hasattr(self.output, '__iadd__'): self.output += result
            elif hasattr(self.output, '__add__'): self.output = self.output + result
            else: raise TypeError('result cannot be added')
        #--- Dictionary ---
        elif type(result) is dict:
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], 'Add'): self.output[key].Add(result[key])
                    elif hasattr(self.output[key], '__iadd__'): self.output[key] += result[key]
                    elif hasattr(self.output[key], '__add__'): self.output[key] = self.output[key] + result[key]
                    else: raise TypeError('result cannot be added')
                else:
                    self.output[key] = result[key]
        #--- Anything else (e.g. a list or tuple) ---
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], 'Add'): self.output[i].Add(result[i])
                elif hasattr(self.output[i], '__iadd__'): self.output[i] += result[i]
                elif hasattr(self.output[i], '__add__'): self.output[i] = self.output[i] + result[i]
                else: raise TypeError('result cannot be added')

    def _resetOutput(self):
        output = self.output.values() if type(self.output) is dict else self.output
        for o in output:
            if hasattr(o, 'Reset'): o.Reset()


class WorkManager(object):
    """ Class in charge of managing the tasks and distributing them to
        the workers. These can be local (using other cores) or remote
        (using other nodes in the local cluster). """

    def __init__(self, ncpus='autodetect', ppservers=None):
        if ncpus == 'autodetect':
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            import pp
            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = 'multicore'
        self.stats = {}

    def __del__(self):
        if hasattr(self, 'server'):
            self.server.destroy()

    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
        # --- Call the local initialization
        task.initializeLocal()
        # --- Schedule all the jobs ...
        if self.mode == 'cluster':
            jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiMP.Parallel', 'time'))
                    for item in items]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore':
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print 'Time elapsed since server creation %f' % (end - start)
        # --- Call the local finalize
        task.finalize()

    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print 'Job execution statistics:'
        print 'job count | % of all jobs | job time sum | time per job | job server'
        for name, stat in self.stats.items():
            print '       %d |        %6.2f |     %8.3f |     %8.3f | %s' % (stat.njob, 100. * stat.njob / njobs, stat.time, stat.time / stat.njob, name)

    def _mergeStatistics(self, stat):
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1


class SshSession(object):
    def __init__(self, hostname):
        import pyssh
        import pp
        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        self.session.write('%s %s/scripts-%s/ppserver.py \n' % (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print 'started ppserver in ', hostname

    def __del__(self):
        self.session.close()
        print 'killed ppserver in ', self.host

# == EOF ====================================================================================
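
Usage sketch (not part of the file above): a minimal Task subclass run through a WorkManager in 'multicore' mode. SumOfSquaresTask, the ncpus value and the integer items are illustrative assumptions; any output whose elements support Add, __iadd__ or __add__ (e.g. ROOT histograms) is merged the same way by Task._mergeResults.

# Hypothetical example, not part of GaudiMP itself.
from GaudiMP.Parallel import Task, WorkManager

class SumOfSquaresTask(Task):
    def initializeLocal(self):
        # The output container must be set up before jobs are scheduled;
        # a dict is merged key by key in Task._mergeResults.
        self.output = {'sum': 0}
    def initializeRemote(self):
        pass  # nothing extra to prepare on the worker side in this sketch
    def process(self, item):
        # Runs in a worker process; each job fills its own copy of the output.
        self.output['sum'] += item * item
    def finalize(self):
        # Called locally after all partial outputs have been merged.
        print 'sum of squares =', self.output['sum']

if __name__ == '__main__':
    wm = WorkManager(ncpus=4)        # no ppservers given, so 'multicore' mode
    task = SumOfSquaresTask()
    wm.process(task, range(1000))    # merged result is left in task.output
    # finalize() has printed: sum of squares = 332833500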