13 """ GaudiMP.Parallel module. 
   14     This module provides 'parallel' processing support for GaudiPyhton. 
   15     It is adding some sugar on top of public domain packages such as 
   16     the 'multiprocessing' or the 'pp' packages. The interface can be made 
   17     independent of the underlying implementation package. 
   18     Two main class are defined: Task and WorkManager 
   20 from __future__ 
import print_function
 
__all__ = ["Task", "WorkManager"]
 
excluded_varnames = ["HOSTNAME", "SSH_CLIENT", "SSH_CONNECTION", "DISPLAY"]
 
import copy
import multiprocessing
import os
import sys
import time
 
def _prefunction(f, task, item):
    # Trampoline used for 'pp' job submission: repack the arguments
    # into the single-tuple form expected by _ppfunction
    return f((task, item))
 
def _ppfunction(args):
    # --- Unpack the (task, item) pair sent to the worker
    task, item = args
    stat = Statistics()
    # --- Replicate the caller's environment on the remote side (only once)
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames:
                os.environ[k] = v
        os.chdir(task.cwd)
        task.initializeRemote()
        task.__class__._initializeDone = True
    # --- Reset the output, process the item and collect statistics
    task._resetOutput()
    task.process(item)
    stat.stop()
    return (copy.deepcopy(task.output), stat)
 
class Statistics(object):
    def __init__(self):
        self.name = os.getenv("HOSTNAME")
        self.start = time.time()
        self.time = 0.0
        self.njob = 0

    def stop(self):
        self.time = time.time() - self.start
 
class Task(object):
    """Basic base class to encapsulate any processing that is going to be
    processed in parallel. User classes must inherit from it and implement
    the methods initializeLocal, initializeRemote, process and finalize
    (see the example sketch after this class)."""

    _initializeDone = False

    def __new__(cls, *args, **kwargs):
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items():
            task.environ[k] = v
        task.cwd = os.getcwd()
        return task

    def initializeLocal(self):
        pass

    def initializeRemote(self):
        pass

    def process(self, item):
        pass

    def finalize(self):
        pass
 
    def _mergeResults(self, result):
        if type(result) is not type(self.output):
            raise TypeError("output type is not same as obtained result")
 
        # --- Scalar (non-iterable) result
        if not hasattr(result, "__iter__"):
            if hasattr(self.output, "Add"):
                self.output.Add(result)
            elif hasattr(self.output, "__iadd__"):
                self.output += result
            elif hasattr(self.output, "__add__"):
                self.output = self.output + result
            else:
                raise TypeError("result cannot be added")
 
        # --- Dictionary result: merge value by value
        elif type(result) is dict:
            if self.output.keys() > result.keys():
                minkeys = result.keys()
            else:
                minkeys = self.output.keys()
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], "Add"):
                        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], "__iadd__"):
                        self.output[key] += result[key]
                    elif hasattr(self.output[key], "__add__"):
                        self.output[key] = self.output[key] + result[key]
                    else:
                        raise TypeError("result cannot be added")
                else:
                    self.output[key] = result[key]
 
        # --- Any other iterable (e.g. tuple or list): merge element by element
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], "Add"):
                    self.output[i].Add(result[i])
                elif hasattr(self.output[i], "__iadd__"):
                    self.output[i] += result[i]
                elif hasattr(self.output[i], "__add__"):
                    self.output[i] = self.output[i] + result[i]
                else:
                    raise TypeError("result cannot be added")
 
    def _resetOutput(self):
        output = self.output.values() if type(self.output) is dict else self.output
        for o in output:
            if hasattr(o, "Reset"):
                o.Reset()
 
class WorkManager(object):
    """Class in charge of managing the tasks and distributing them to
    the workers. They can be local (using other cores) or remote,
    using other nodes in the local cluster."""

    def __init__(self, ncpus="autodetect", ppservers=None):
        if ncpus == "autodetect":
            self.ncpus = multiprocessing.cpu_count()
        else:
            self.ncpus = ncpus
        if ppservers:
            import pp

            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=ppservers)
            self.mode = "cluster"
        else:
            self.pool = multiprocessing.Pool(self.ncpus)
            self.mode = "multicore"
        self.stats = {}

    def __del__(self):
        if hasattr(self, "server"):
            self.server.destroy()
        if hasattr(self, "pool"):
            self.pool.close()
 
    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
        # --- Call the local initialization
        task.initializeLocal()
        start = time.time()
        # --- Schedule all the jobs
        if self.mode == "cluster":
            jobs = [
                self.server.submit(
                    _prefunction,
                    (_ppfunction, task, item),
                    (),
                    ("GaudiMP.Parallel", "time"),
                )
                for item in items
            ]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
        elif self.mode == "multicore":
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
        end = time.time()
        self._printStatistics()
        print("Time elapsed since server creation %f" % (end - start))
        # --- Call the local finalization
        task.finalize()
 
    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print("Job execution statistics:")
        print("job count | % of all jobs | job time sum | time per job | job server")
        for name, stat in self.stats.items():
            print(
                "       %d |        %6.2f |     %8.3f |    %8.3f | %s"
                % (
                    stat.njob,
                    100.0 * stat.njob / njobs,
                    stat.time,
                    stat.time / stat.njob,
                    name,
                )
            )
 
    def _mergeStatistics(self, stat):
        if stat.name not in self.stats:
            self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
 
class SshSession(object):
    def __init__(self, hostname):
        import pp
        import pyssh

        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        # --- Replicate the caller's working directory and environment
        self.session.write("cd %s\n" % os.getcwd())
        self.session.read_lazy()
        self.session.write("setenv PYTHONPATH %s\n" % os.environ["PYTHONPATH"])
        self.session.read_lazy()
        self.session.write(
            "setenv LD_LIBRARY_PATH %s\n" % os.environ["LD_LIBRARY_PATH"]
        )
        self.session.read_lazy()
        self.session.write("setenv ROOTSYS %s\n" % os.environ["ROOTSYS"])
        self.session.read_lazy()
        # --- Launch a ppserver on the remote node
        self.session.write(
            "%s %s/scripts-%s/ppserver.py \n"
            % (sys.executable, ppprefix, sys.version.split()[0])
        )
        self.session.read_lazy()
        print("started ppserver in ", hostname)

    def __del__(self):
        self.session.close()
        print("killed ppserver in ", self.host)
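
# --- Illustrative usage sketch only, not part of the original module. It uses
# --- the hypothetical WordCountTask defined above, and the file names are
# --- placeholders. The __main__ guard keeps multiprocessing workers from
# --- re-running the demo when they import this module.
if __name__ == "__main__":
    task = WordCountTask()
    wmgr = WorkManager(ncpus="autodetect")  # multicore mode (no ppservers)
    wmgr.process(task, ["file1.txt", "file2.txt", "file3.txt"])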