4 """ GaudiMP.Parallel module.
5 This module provides 'parallel' processing support for GaudiPython.
6 It adds some sugar on top of public domain packages such as
7 the 'multiprocessing' or the 'pp' packages. The interface can be made
8 independent of the underlying implementation package.
9 Two main classes are defined: Task and WorkManager
12 __all__ = [
'Task',
'WorkManager' ]
13 excluded_varnames = [
'HOSTNAME',
'SSH_CLIENT',
'SSH_CONNECTION',
'DISPLAY']
15 import sys, os, time, copy
16 import multiprocessing
25 if not task.__class__._initializeDone :
26 for k,v
in task.environ.items() :
27 if k
not in excluded_varnames : os.environ[k] = v
28 task.initializeRemote()
29 task.__class__._initializeDone =
True
36 return (copy.deepcopy(task.output), stat)
40 self.
name = os.getenv(
'HOSTNAME')
48 """ Basic base class to encapsulate any processing that is going to be processed in parallel.
49 User classes must inherit from it and implement the methods initializeLocal,
50 initializeRemote, process and finalize. """
51 _initializeDone =
False
53 task = object.__new__( cls )
56 for k,v
in os.environ.items(): task.environ[k] = v
57 task.cwd = os.getcwd()
68 if type(result)
is not type(self.output) :
69 raise TypeError(
"output type is not same as obtained result")
71 if not hasattr( result ,
'__iter__' ):
72 if hasattr(self.output,
'Add') : self.output.Add(result)
73 elif hasattr(self.output,
'__iadd__') : self.output += result
75 else :
raise TypeError(
'result cannot be added')
77 elif type(result)
is dict :
78 if self.output.keys() <= result.keys(): minkeys = self.output.keys()
79 else: minkeys = result.keys()
80 for key
in result.keys() :
82 if hasattr(self.
output[key],
'Add') : self.
output[key].Add(result[key])
83 elif hasattr(self.
output[key],
'__iadd__') : self.
output[key] += result[key]
84 elif hasattr(self.
output[key],
'__add__') : self.
output[key] = self.
output[key] + result[key]
85 else :
raise TypeError(
'result cannot be added')
87 self.
output[key] = result[key]
91 if hasattr(self.
output[i],
'Add') : self.
output[i].Add(result[i])
92 elif hasattr(self.
output[i],
'__iadd__') : self.
output[i] += result[i]
94 else :
raise TypeError(
'result cannot be added')
98 if hasattr(o,
'Reset'): o.Reset()
102 """ Class in charge of managing the tasks and distributing them to
103 the workers. They can be local (using other cores) or remote
104 using other nodes in the local cluster """
106 def __init__( self, ncpus='autodetect', ppservers=None) :
107 if ncpus ==
'autodetect' : self.
ncpus = multiprocessing.cpu_count()
108 else : self.
ncpus = ncpus
117 self.
mode =
'multicore'
121 if hasattr(self,
'server') : self.server.destroy()
123 def process(self, task, items, timeout=90000):
124 if not isinstance(task,Task) :
125 raise TypeError(
"task argument needs to be an 'Task' instance")
127 task.initializeLocal()
129 if self.
mode ==
'cluster' :
130 jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), (
'GaudiMP.Parallel',
'time'))
for item
in items]
133 task._mergeResults(result)
136 self.server.print_stats()
137 elif self.
mode ==
'multicore' :
139 jobs = self.pool.map_async(_ppfunction, zip([task
for i
in items] , items ))
140 for result, stat
in jobs.get(timeout) :
141 task._mergeResults(result)
145 print 'Time elapsed since server creation %f' %(end-start)
150 for stat
in self.stats.values():
152 print 'Job execution statistics:'
153 print 'job count | % of all jobs | job time sum | time per job | job server'
154 for name, stat
in self.stats.items():
155 print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)
159 s = self.
stats[stat.name]
169 ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
172 self.session.read_lazy()
173 self.session.write(
'cd %s\n' % os.getcwd())
174 self.session.read_lazy()
175 self.session.write(
'setenv PYTHONPATH %s\n' % os.environ[
'PYTHONPATH'])
176 self.session.read_lazy()
177 self.session.write(
'setenv LD_LIBRARY_PATH %s\n' % os.environ[
'LD_LIBRARY_PATH'])
178 self.session.read_lazy()
179 self.session.write(
'setenv ROOTSYS %s\n' % os.environ[
'ROOTSYS'])
180 self.session.read_lazy()
181 self.session.write(
'%s %s/scripts-%s/ppserver.py \n'%(sys.executable, ppprefix, sys.version.split()[0] ))
182 self.session.read_lazy()
183 self.session.read_lazy()
184 print 'started ppserver in ', hostname
187 print 'killed ppserver in ', self.
host
NamedRange_< CONTAINER > range(const CONTAINER &cnt, const std::string &name)
simple function to create the named range from an arbitrary container