4 """ GaudiMP.Parallel module. 5 This module provides 'parallel' processing support for GaudiPyhton. 6 It is adding some sugar on top of public domain packages such as 7 the 'multiprocessing' or the 'pp' packages. The interface can be made 8 independent of the underlying implementation package. 9 Two main class are defined: Task and WorkManager 12 __all__ = [
'Task',
'WorkManager' ]
13 excluded_varnames = [
'HOSTNAME',
'SSH_CLIENT',
'SSH_CONNECTION',
'DISPLAY']
15 import sys, os, time, copy
16 import multiprocessing
def _ppfunction(args):
    """ Function executed on a remote worker for one work item.

    args is a single (task, item) tuple (packed so it can be mapped with
    pool.map_async). Returns a (deep copy of task.output, Statistics) pair.
    NOTE(review): the unpacking/processing lines were reconstructed from a
    garbled source — confirm against the original file.
    """
    # --- Unpack the single mapped argument
    task, item = args
    stat = Statistics()
    # --- One-time per-process remote initialization: replicate the client
    # --- environment (minus login-session variables) and let the task run
    # --- its own remote setup.
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Reset the output, process this item, stop the clock
    task._resetOutput()
    task.process(item)
    stat.stop()
    # Deep-copy the output so the parent process gets an independent object.
    return (copy.deepcopy(task.output), stat)
class Statistics(object):
    """ Per-job execution statistics, filled on the worker side and merged
    back into the WorkManager's per-host records. """

    def __init__(self):
        # Worker host name (None if HOSTNAME is not set in the environment);
        # used as the merge key by WorkManager._mergeStatistics.
        self.name = os.getenv('HOSTNAME')
        # Wall-clock start of this job.
        self.start = time.time()
        # Accumulated elapsed seconds and number of jobs merged into this
        # record (both read by WorkManager._printStatistics).
        self.time = 0.0
        self.njob = 0

    def stop(self):
        # NOTE(review): reconstructed — records the elapsed wall time since
        # construction; confirm against the original file.
        self.time = time.time() - self.start
48 """ Basic base class to encapsulate any processing that is going to be porcessed in parallel. 49 User class much inherit from it and implement the methods initializeLocal, 50 initializeRemote, process and finalize. """ 51 _initializeDone =
False 53 task = object.__new__( cls )
56 for k,v
in os.environ.items(): task.environ[k] = v
57 task.cwd = os.getcwd()
68 if type(result)
is not type(self.output) :
69 raise TypeError(
"output type is not same as obtained result")
71 if not hasattr( result ,
'__iter__' ):
72 if hasattr(self.output,
'Add') : self.output.Add(result)
73 elif hasattr(self.output,
'__iadd__') : self.output += result
75 else :
raise TypeError(
'result cannot be added')
77 elif type(result)
is dict :
78 if self.output.keys() <= result.keys(): minkeys = self.output.keys()
79 else: minkeys = result.keys()
80 for key
in result.keys() :
82 if hasattr(self.
output[key],
'Add') : self.
output[key].Add(result[key])
83 elif hasattr(self.
output[key],
'__iadd__') : self.
output[key] += result[key]
84 elif hasattr(self.
output[key],
'__add__') : self.
output[key] = self.
output[key] + result[key]
85 else :
raise TypeError(
'result cannot be added')
87 self.
output[key] = result[key]
90 for i
in range( min( len(self.
output) , len(result)) ):
91 if hasattr(self.
output[i],
'Add') : self.
output[i].Add(result[i])
92 elif hasattr(self.
output[i],
'__iadd__') : self.
output[i] += result[i]
94 else :
raise TypeError(
'result cannot be added')
98 if hasattr(o,
'Reset'): o.Reset()
102 """ Class to in charge of managing the tasks and distributing them to 103 the workers. They can be local (using other cores) or remote 104 using other nodes in the local cluster """ 106 def __init__( self, ncpus='autodetect', ppservers=None) :
107 if ncpus ==
'autodetect' : self.
ncpus = multiprocessing.cpu_count()
108 else : self.
ncpus = ncpus
117 self.
mode =
'multicore' 121 if hasattr(self,
'server') : self.server.destroy()
123 def process(self, task, items, timeout=90000):
124 if not isinstance(task,Task) :
125 raise TypeError(
"task argument needs to be an 'Task' instance")
127 task.initializeLocal()
129 if self.
mode ==
'cluster' :
130 jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), (
'GaudiMP.Parallel',
'time'))
for item
in items]
133 task._mergeResults(result)
136 self.server.print_stats()
137 elif self.
mode ==
'multicore' :
139 jobs = self.pool.map_async(_ppfunction, zip([task
for i
in items] , items ))
140 for result, stat
in jobs.get(timeout) :
141 task._mergeResults(result)
145 print 'Time elapsed since server creation %f' %(end-start)
150 for stat
in self.stats.values():
152 print 'Job execution statistics:' 153 print 'job count | % of all jobs | job time sum | time per job | job server' 154 for name, stat
in self.stats.items():
155 print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)
159 s = self.
stats[stat.name]
class SshSession(object):
    """ Helper that opens an ssh session to a remote node, replicates the
    essential environment there (csh-style 'setenv') and launches a ppserver
    for cluster mode. The server is killed when the session is deleted. """

    def __init__(self, hostname):
        # NOTE(review): reconstructed from a garbled source — depends on the
        # third-party 'pyssh' and 'pp' packages; confirm against the original.
        import pyssh
        import pp
        self.host = hostname
        # Root of the 'pp' installation; ppserver.py lives under
        # <prefix>/scripts-<version>/.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Replicate working directory and the environment the remote Python
        # needs, interleaving lazy reads to drain the remote shell output.
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        # Start the ppserver with the same interpreter version as the client.
        self.session.write('%s %s/scripts-%s/ppserver.py \n' %
                           (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print('started ppserver in %s' % hostname)

    def __del__(self):
        self.session.close()
        print('killed ppserver in %s' % self.host)
host def _mergeStatistics(self, stat)
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
def _printStatistics(self)
def __init__(self, ncpus='autodetect', ppservers=None)
def initializeLocal(self)
def __init__(self, hostname)
def process(self, task, items, timeout=90000)
def __new__(cls, args, kwargs)
def _prefunction(f, task, item)
def _mergeResults(self, result)
def initializeRemote(self)