13 """GaudiMP.Parallel module. 
   14 This module provides 'parallel' processing support for GaudiPyhton. 
   15 It is adding some sugar on top of public domain packages such as 
   16 the 'multiprocessing' or the 'pp' packages. The interface can be made 
   17 independent of the underlying implementation package. 
   18 Two main class are defined: Task and WorkManager 
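
# Illustrative usage sketch (not part of the original module): a Task
# subclass is defined by the user (see the example under Task below) and
# handed to a WorkManager together with the items to process.  MyTask and
# items are hypothetical names.
#
#     from GaudiMP.Parallel import Task, WorkManager
#
#     wm = WorkManager(ncpus="autodetect")  # one worker per detected core
#     wm.process(MyTask(), items)           # distribute items to workers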
__all__ = ["Task", "WorkManager"]
 
# Environment variables that must not be propagated to remote workers.
excluded_varnames = ["HOSTNAME", "SSH_CLIENT", "SSH_CONNECTION", "DISPLAY"]
 
import copy
import multiprocessing
import os
import sys
import time
 
def _prefunction(f, task, item):
    # Trampoline used by the cluster mode: lets the pp server call f with
    # a single (task, item) tuple argument.
    return f((task, item))
 
    # Perform the remote initialization only once per worker process,
    # replaying the environment captured on the client side.
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True

    # Return a deep copy so the per-item output is detached from the task
    # instance that will be reused for the next item.
    return (copy.deepcopy(task.output), stat)
 
        self.name = os.getenv("HOSTNAME")  # worker host, used as the statistics key
 
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel. User classes must inherit from it and implement
    the methods initializeLocal, initializeRemote, process and finalize."""
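    # Illustrative sketch (not from the original file): a minimal Task
    # subclass whose integer output is combined across items by the
    # __iadd__ branch of _mergeResults below.  The name SquareTask and
    # its body are hypothetical.
    #
    #     class SquareTask(Task):
    #         def initializeLocal(self):
    #             self.output = 0               # runs once, on the client
    #         def initializeRemote(self):
    #             pass                          # runs once per worker
    #         def process(self, item):
    #             self.output = item * item     # called for each work item
    #         def finalize(self):
    #             print("sum of squares =", self.output)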
    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        # Capture the client environment at construction time so it can be
        # replayed on the remote workers (see _ppfunction above).
        task = object.__new__(cls)
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
 
        if not isinstance(result, type(self.output)):
            raise TypeError("output type is not the same as the obtained result")
 
        if not hasattr(result, "__iter__"):
            # Scalar-like results: prefer an in-place Add (ROOT style),
            # then the usual Python addition protocols.
            if hasattr(self.output, "Add"):
                self.output.Add(result)
            elif hasattr(self.output, "__iadd__"):
                self.output += result
            elif hasattr(self.output, "__add__"):
                self.output = self.output + result
            else:
                raise TypeError("result cannot be added")
 
        elif isinstance(result, dict):
            # Dictionary results: merge value by value, adopting keys that
            # are not yet present in the output.
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], "Add"):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], "__iadd__"):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], "__add__"):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError("result cannot be added")
                else:
                    self.output[key] = result[key]
 
        else:
            # Iterable (list-like) results: merge element by element.
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], "Add"):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], "__iadd__"):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], "__add__"):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError("result cannot be added")
 
            if hasattr(o, "Reset"):
                o.Reset()
 
    """Class in charge of managing the tasks and distributing them to
    the workers. They can be local (using other cores) or remote,
    using other nodes in the local cluster."""
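    # Illustrative sketch (not from the original file), reusing the
    # hypothetical SquareTask shown above; host names are placeholders.
    #
    #     wm = WorkManager()                              # local, all cores
    #     wm.process(SquareTask(), range(100))
    #
    #     wm = WorkManager(ppservers=("node1", "node2"))  # pp cluster mode
    #     wm.process(SquareTask(), range(100))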
    def __init__(self, ncpus="autodetect", ppservers=None):
 
        if ncpus == "autodetect":
            self.ncpus = multiprocessing.cpu_count()
 
            self.mode = "multicore"
        if hasattr(self, "server"):
            self.server.destroy()  # shut down the pp server (assumed cleanup)

        if hasattr(self, "pool"):
            self.pool.close()  # release the multiprocessing pool (assumed cleanup)
 
    def process(self, task, items, timeout=90000):
 
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
 
        task.initializeLocal()
 
        if self.mode == "cluster":
 
            jobs = [
                self.server.submit(
                    _prefunction,
                    (_ppfunction, task, item),
                    (),
                    ("GaudiMP.Parallel", "time"),
                )
                for item in items
            ]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
 
        elif self.mode == "multicore":
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            print("Time elapsed since server creation %f" % (end - start))
 
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
 
        print("Job execution statistics:")
        print("job count | % of all jobs | job time sum | time per job | job server")
        for name, stat in self.stats.items():
            print(
                "       %d |        %6.2f |     %8.3f |    %8.3f | %s"
                % (
                    stat.njob,
                    100.0 * stat.njob / njobs,
                    stat.time,
                    stat.time / stat.njob,
                    name,
                )
            )
 
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
 
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
 
        self.session.write("cd %s\n" % os.getcwd())

        self.session.write("setenv PYTHONPATH %s\n" % os.environ["PYTHONPATH"])

        self.session.write(
            "setenv LD_LIBRARY_PATH %s\n" % os.environ["LD_LIBRARY_PATH"]
        )

        self.session.write("setenv ROOTSYS %s\n" % os.environ["ROOTSYS"])

        # Launch the ppserver matching the Python version in use on the client.
        self.session.write(
            "%s %s/scripts-%s/ppserver.py \n"
            % (sys.executable, ppprefix, sys.version.split()[0])
        )
 
        print("started ppserver in ", hostname)
 
        print("killed ppserver in ", self.host)