4 """ GaudiMP.Parallel module.
5 This module provides 'parallel' processing support for GaudiPyhton.
6 It is adding some sugar on top of public domain packages such as
7 the 'multiprocessing' or the 'pp' packages. The interface can be made
8 independent of the underlying implementation package.
9 Two main class are defined: Task and WorkManager
import sys
import os
import time
import copy
import multiprocessing

# Public API of this module.
__all__ = ['Task', 'WorkManager']

# Environment variables that must NOT be replayed on the remote workers
# (they describe the submitting host / ssh connection, not the job).
excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
def _ppfunction(args):
    """Worker-side driver: run one item of a Task and return its output.

    *args* is a ``(task, item)`` pair (packed in a single tuple so the
    function can be used with ``Pool.map_async``). Returns a
    ``(deep-copied task.output, Statistics)`` pair so the results can be
    merged on the caller side.
    """
    # --- Unpack arguments
    task, item = args
    stat = Statistics()
    # --- Initialize the remote side (once per worker process/class)
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            # Replay the submitter's environment, minus host/ssh specifics.
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Reset the task output before processing this item
    # NOTE(review): the reset/process/stop calls fall in lines lost from
    # this extraction; reconstructed from the visible return statement
    # and the Task/Statistics interfaces below — confirm against upstream.
    task._resetOutput()
    # --- Call the user processing
    task.process(item)
    # --- Collect timing statistics
    stat.stop()
    # Deep-copy so the returned output is detached from the worker state.
    return (copy.deepcopy(task.output), stat)
class Statistics(object):
    """Per-worker job statistics: host name, job count and elapsed time."""

    def __init__(self):
        # Worker identity; None when HOSTNAME is not set in the environment.
        self.name = os.getenv('HOSTNAME')
        self.start = time.time()
        # Accumulators; njob is incremented by WorkManager._mergeStatistics.
        self.time = 0.0
        self.njob = 0

    def stop(self):
        """Record the elapsed wall-clock time since this object's creation."""
        self.time = time.time() - self.start
48 """ Basic base class to encapsulate any processing that is going to be porcessed in parallel.
49 User class much inherit from it and implement the methods initializeLocal,
50 initializeRemote, process and finalize. """
51 _initializeDone =
False
52 def __new__ ( cls, *args, **kwargs ):
53 task = object.__new__( cls )
56 for k,v
in os.environ.items(): task.environ[k] = v
57 task.cwd = os.getcwd()
59 def initializeLocal(self):
61 def initializeRemote(self):
63 def process(self, item):
67 def _mergeResults(self, result) :
68 if type(result)
is not type(self.output) :
69 raise TypeError(
"output type is not same as obtained result")
71 if not hasattr( result ,
'__iter__' ):
72 if hasattr(self.output,
'Add') : self.output.Add(result)
73 elif hasattr(self.output,
'__iadd__') : self.output += result
74 elif hasattr(self.output,
'__add__') : self.output = self.output + result
75 else :
raise TypeError(
'result cannot be added')
77 elif type(result)
is dict :
78 if self.output.keys() <= result.keys(): minkeys = self.output.keys()
79 else: minkeys = result.keys()
80 for key
in result.keys() :
81 if key
in self.output :
82 if hasattr(self.output[key],
'Add') : self.output[key].Add(result[key])
83 elif hasattr(self.output[key],
'__iadd__') : self.output[key] += result[key]
84 elif hasattr(self.output[key],
'__add__') : self.output[key] = self.output[key] + result[key]
85 else :
raise TypeError(
'result cannot be added')
87 self.output[key] = result[key]
90 for i
in range(
min( len(self.output) , len(result)) ):
91 if hasattr(self.output[i],
'Add') : self.output[i].Add(result[i])
92 elif hasattr(self.output[i],
'__iadd__') : self.output[i] += result[i]
93 elif hasattr(self.output[i],
'__add__') : self.output[i] = self.output[i] + result[i]
94 else :
raise TypeError(
'result cannot be added')
95 def _resetOutput(self):
96 output = (
type(self.output)
is dict)
and self.output.values()
or self.output
98 if hasattr(o,
'Reset'): o.Reset()
class WorkManager(object):
    """Class in charge of managing the tasks and distributing them to
    the workers. They can be local (using other cores via multiprocessing)
    or remote, using other nodes in the local cluster (via the 'pp' package).
    """

    def __init__(self, ncpus='autodetect', ppservers=None):
        """Create the manager.

        ncpus: number of local workers, or 'autodetect' for cpu_count().
        ppservers: optional list of remote host names; when given, the
            'pp' cluster mode is used instead of a local process pool.
        """
        if ncpus == 'autodetect':
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            import pp
            self.ppservers = ppservers
            # One ssh session per remote host to bootstrap a ppserver there.
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = 'multicore'
        # Per-hostname Statistics, filled by _mergeStatistics.
        self.stats = {}

    def __del__(self):
        # Only the cluster mode owns a pp server that must be destroyed.
        if hasattr(self, 'server'):
            self.server.destroy()

    def process(self, task, items, timeout=90000):
        """Run task.process(item) for every item, merging outputs and stats.

        Raises:
            TypeError: if *task* is not a Task instance.
        """
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be an 'Task' instance")
        # --- Local initialization on the submitting side
        task.initializeLocal()
        # --- Schedule all the jobs
        if self.mode == 'cluster':
            jobs = [self.server.submit(_prefunction,
                                       (_ppfunction, task, item), (),
                                       ('GaudiMP.Parallel', 'time'))
                    for item in items]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore':
            start = time.time()
            # Pair every item with the (shared) task for _ppfunction.
            jobs = self.pool.map_async(_ppfunction,
                                       zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            # print() call form: valid in both Python 2 and 3.
            print('Time elapsed since server creation %f' % (end - start))
        # --- Local finalization on the submitting side
        task.finalize()

    def _printStatistics(self):
        """Print a per-host summary table of the accumulated statistics."""
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print('Job execution statistics:')
        print('job count | % of all jobs | job time sum | time per job | job server')
        for name, stat in self.stats.items():
            print(' %d | %6.2f | %8.3f | %8.3f | %s' % (
                stat.njob, 100. * stat.njob / njobs,
                stat.time, stat.time / stat.njob, name))

    def _mergeStatistics(self, stat):
        """Fold one job's Statistics into the per-hostname aggregate."""
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
class SshSession(object):
    """Ssh connection to a remote node, used to start a ppserver there.

    NOTE(review): the shell commands use 'setenv', i.e. they assume the
    remote login shell is csh/tcsh — confirm for the target cluster.
    """

    def __init__(self, hostname):
        # Third-party modules, imported lazily so the module loads without them.
        import pyssh
        import pp
        self.host = hostname
        # ppserver.py ships under the 'pp' installation prefix.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Replicate the local working directory and environment remotely.
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        # Launch the ppserver on the remote node with the matching Python.
        self.session.write('%s %s/scripts-%s/ppserver.py \n' % (
            sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        # print() call form: valid in both Python 2 and 3.
        print('started ppserver in %s' % hostname)

    def __del__(self):
        self.session.close()
        print('killed ppserver in %s' % self.host)
# NOTE(review): the two lines below are a stray C++ doxygen fragment
# (NamedRange helper) pasted here from an unrelated file; preserved as a
# comment so the module remains valid Python.
#   NamedRange_< CONTAINER > range(const CONTAINER &cnt, const std::string &name)
#   simple function to create the named range form arbitrary container
def _prefunction(f, task, item)