3 """ GaudiMP.Parallel module. 4 This module provides 'parallel' processing support for GaudiPyhton. 5 It is adding some sugar on top of public domain packages such as 6 the 'multiprocessing' or the 'pp' packages. The interface can be made 7 independent of the underlying implementation package. 8 Two main class are defined: Task and WorkManager 10 from __future__
import print_function
12 __all__ = [
'Task',
'WorkManager']
13 excluded_varnames = [
'HOSTNAME',
'SSH_CLIENT',
'SSH_CONNECTION',
'DISPLAY']
19 import multiprocessing
23 return f((task, item))
def _ppfunction(args):
    """Worker-side entry point executed in a remote/child process.

    args -- tuple (task, item) as packed by _prefunction or by
            WorkManager.process for pool.map_async.

    Returns a tuple (deep copy of task.output, Statistics record).
    """
    # --- Unpack arguments
    task, item = args
    stat = Statistics()
    # --- Initialize the remote side (only once per worker process)
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                # NOTE(review): restoring the caller's environment was lost in
                # this copy of the source; reconstructed -- verify upstream.
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Reset the output so results of previous items are not double counted
    task._resetOutput()
    task.process(item)
    # --- Collect statistics and ship back a deep copy of the output
    stat.stop()
    return (copy.deepcopy(task.output), stat)
class Statistics(object):
    """Record of where (host) and for how long a job executed.

    NOTE(review): only the 'name' assignment survived in this copy; the
    start/time/njob fields and stop() are reconstructed from their uses in
    _ppfunction and WorkManager._mergeStatistics -- verify upstream.
    """

    def __init__(self):
        self.name = os.getenv('HOSTNAME')  # worker host name (None if unset)
        self.start = time.time()           # creation time of the record
        self.time = 0.0                    # elapsed wall-clock time, set by stop()
        self.njob = 0                      # jobs accumulated into this record

    def stop(self):
        """Freeze the elapsed wall-clock time since creation."""
        self.time = time.time() - self.start
58 """ Basic base class to encapsulate any processing that is going to be porcessed in parallel. 59 User class much inherit from it and implement the methods initializeLocal, 60 initializeRemote, process and finalize. """ 61 _initializeDone =
False 64 task = object.__new__(cls)
67 for k, v
in os.environ.items():
69 task.cwd = os.getcwd()
85 if type(result)
is not type(self.output):
86 raise TypeError(
"output type is not same as obtained result")
88 if not hasattr(result,
'__iter__'):
89 if hasattr(self.output,
'Add'):
90 self.output.Add(result)
91 elif hasattr(self.output,
'__iadd__'):
93 elif hasattr(self.output,
'__add__'):
96 raise TypeError(
'result cannot be added')
98 elif type(result)
is dict:
99 if self.
output.keys() <= result.keys():
100 minkeys = self.
output.keys()
102 minkeys = result.keys()
103 for key
in result.keys():
105 if hasattr(self.
output[key],
'Add'):
106 self.
output[key].Add(result[key])
107 elif hasattr(self.
output[key],
'__iadd__'):
108 self.
output[key] += result[key]
109 elif hasattr(self.
output[key],
'__add__'):
112 raise TypeError(
'result cannot be added')
114 self.
output[key] = result[key]
118 if hasattr(self.
output[i],
'Add'):
119 self.
output[i].Add(result[i])
120 elif hasattr(self.
output[i],
'__iadd__'):
121 self.
output[i] += result[i]
122 elif hasattr(self.
output[i],
'__add__'):
125 raise TypeError(
'result cannot be added')
131 if hasattr(o,
'Reset'):
136 """ Class to in charge of managing the tasks and distributing them to 137 the workers. They can be local (using other cores) or remote 138 using other nodes in the local cluster """ 140 def __init__(self, ncpus='autodetect', ppservers=None):
141 if ncpus ==
'autodetect':
142 self.
ncpus = multiprocessing.cpu_count()
153 self.
mode =
'multicore' 157 if hasattr(self,
'server'):
160 def process(self, task, items, timeout=90000):
161 if not isinstance(task, Task):
162 raise TypeError(
"task argument needs to be an 'Task' instance")
164 task.initializeLocal()
166 if self.
mode ==
'cluster':
168 self.
server.submit(_prefunction, (_ppfunction, task, item), (),
169 (
'GaudiMP.Parallel',
'time'))
174 task._mergeResults(result)
178 elif self.
mode ==
'multicore':
180 jobs = self.
pool.map_async(_ppfunction,
181 zip([task
for i
in items], items))
182 for result, stat
in jobs.get(timeout):
183 task._mergeResults(result)
187 print(
'Time elapsed since server creation %f' % (end - start))
193 for stat
in self.
stats.values():
195 print(
'Job execution statistics:')
197 'job count | % of all jobs | job time sum | time per job | job server' 200 print(
' %d | %6.2f | %8.3f | %8.3f | %s' %
201 (stat.njob, 100. * stat.njob / njobs, stat.time,
202 stat.time / stat.njob, name))
205 if stat.name
not in self.
stats:
207 s = self.
stats[stat.name]
class SshSession(object):
    """Opens an ssh session to 'hostname' and launches a ppserver there.

    NOTE(review): relies on the third-party 'pyssh' and 'pp' packages and on
    a csh-style shell ('setenv') on the remote host.  Several read/ack lines
    were lost in this copy of the source; reconstructed -- verify upstream.
    """

    def __init__(self, hostname):
        import pyssh
        import pp
        self.host = hostname
        # Root of the pp installation; ppserver.py lives in its scripts dir.
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # Replicate the submitting environment on the remote host.
        self.session.write(
            'cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write(
            'setenv PYTHONPATH %s\n' % os.environ[
                'PYTHONPATH'])
        self.session.read_lazy()
        self.session.write(
            'setenv LD_LIBRARY_PATH %s\n' % os.environ[
                'LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write(
            'setenv ROOTSYS %s\n' % os.environ[
                'ROOTSYS'])
        self.session.read_lazy()
        # Start the ppserver with the same interpreter version as the client.
        self.session.write(
            '%s %s/scripts-%s/ppserver.py \n' %
            (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        print(
            'started ppserver in ', hostname)

    def __del__(self):
        # Best-effort shutdown of the remote session.
        self.session.close()
        print(
            'killed ppserver in ', self.host)
def _mergeStatistics(self, stat)
def __new__(cls, *args, **kwargs)
EventIDBase min(const EventIDBase &lhs, const EventIDBase &rhs)
def _printStatistics(self)
def __init__(self, ncpus='autodetect', ppservers=None)
def initializeLocal(self)
def __init__(self, hostname)
def process(self, task, items, timeout=90000)
def _prefunction(f, task, item)
def _mergeResults(self, result)
def initializeRemote(self)
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.