GaudiMP.Parallel.WorkManager Class Reference
Inheritance diagram for GaudiMP.Parallel.WorkManager:
Collaboration diagram for GaudiMP.Parallel.WorkManager:

Public Member Functions

def __init__
 
def __del__ (self)
 
def process
 
def __init__
 
def __del__ (self)
 
def process
 

Public Attributes

 ncpus
 
 ppservers
 
 sessions
 
 server
 
 mode
 
 pool
 
 stats
 

Private Member Functions

def _printStatistics (self)
 
def _mergeStatistics (self, stat)
 
def _printStatistics (self)
 
def _mergeStatistics (self, stat)
 

Detailed Description

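Class in charge of managing the tasks and distributing them to
    the workers. The workers can be local (other cores of this machine,
    driven through a multiprocessing pool) or remote (other nodes in the
    local cluster, driven through a Parallel Python server). Remote mode
    is used when a list of ppservers is passed to the constructor;
    otherwise local mode is used.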

Definition at line 101 of file Parallel.py.

Constructor & Destructor Documentation

def GaudiMP.Parallel.WorkManager.__init__ (   self,
  ncpus = 'autodetect',
  ppservers = None 
)

Definition at line 106 of file Parallel.py.

106  def __init__( self, ncpus='autodetect', ppservers=None) :
107  if ncpus == 'autodetect' : self.ncpus = multiprocessing.cpu_count()
108  else : self.ncpus = ncpus
109  if ppservers :
110  import pp
111  self.ppservers = ppservers
112  self.sessions = [ SshSession(srv) for srv in ppservers ]
113  self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
114  self.mode = 'cluster'
115  else :
116  self.pool = multiprocessing.Pool(self.ncpus)
117  self.mode = 'multicore'
118  self.stats = {}
119 
def GaudiMP.Parallel.WorkManager.__del__ (   self)

Definition at line 120 of file Parallel.py.

120  def __del__(self):
121  if hasattr(self,'server') : self.server.destroy()
122 
def GaudiMP.Parallel.WorkManager.__init__ (   self,
  ncpus = 'autodetect',
  ppservers = None 
)

Definition at line 106 of file Parallel.py.

106  def __init__( self, ncpus='autodetect', ppservers=None) :
107  if ncpus == 'autodetect' : self.ncpus = multiprocessing.cpu_count()
108  else : self.ncpus = ncpus
109  if ppservers :
110  import pp
111  self.ppservers = ppservers
112  self.sessions = [ SshSession(srv) for srv in ppservers ]
113  self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
114  self.mode = 'cluster'
115  else :
116  self.pool = multiprocessing.Pool(self.ncpus)
117  self.mode = 'multicore'
118  self.stats = {}
119 
def GaudiMP.Parallel.WorkManager.__del__ (   self)

Definition at line 120 of file Parallel.py.

120  def __del__(self):
121  if hasattr(self,'server') : self.server.destroy()
122 

Member Function Documentation

def GaudiMP.Parallel.WorkManager._mergeStatistics (   self,
  stat 
)
private

Definition at line 157 of file Parallel.py.

157  def _mergeStatistics(self, stat):
158  if stat.name not in self.stats : self.stats[stat.name] = Statistics()
159  s = self.stats[stat.name]
160  s.time += stat.time
161  s.njob += 1
162 
163 
def _mergeStatistics(self, stat)
Definition: Parallel.py:157
def GaudiMP.Parallel.WorkManager._mergeStatistics (   self,
  stat 
)
private

Definition at line 157 of file Parallel.py.

157  def _mergeStatistics(self, stat):
158  if stat.name not in self.stats : self.stats[stat.name] = Statistics()
159  s = self.stats[stat.name]
160  s.time += stat.time
161  s.njob += 1
162 
163 
def _mergeStatistics(self, stat)
Definition: Parallel.py:157
def GaudiMP.Parallel.WorkManager._printStatistics (   self)
private

Definition at line 148 of file Parallel.py.

148  def _printStatistics(self):
149  njobs = 0
150  for stat in self.stats.values():
151  njobs += stat.njob
152  print 'Job execution statistics:'
153  print 'job count | % of all jobs | job time sum | time per job | job server'
154  for name, stat in self.stats.items():
155  print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)
156 
def GaudiMP.Parallel.WorkManager._printStatistics (   self)
private

Definition at line 148 of file Parallel.py.

148  def _printStatistics(self):
149  njobs = 0
150  for stat in self.stats.values():
151  njobs += stat.njob
152  print 'Job execution statistics:'
153  print 'job count | % of all jobs | job time sum | time per job | job server'
154  for name, stat in self.stats.items():
155  print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)
156 
def GaudiMP.Parallel.WorkManager.process (   self,
  task,
  items,
  timeout = 90000 
)

Definition at line 123 of file Parallel.py.

123  def process(self, task, items, timeout=90000):
124  if not isinstance(task,Task) :
125  raise TypeError("task argument needs to be an 'Task' instance")
126  # --- Call the Local initialialization
127  task.initializeLocal()
128  # --- Schedule all the jobs ....
129  if self.mode == 'cluster' :
130  jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiMP.Parallel','time')) for item in items]
131  for job in jobs :
132  result, stat = job()
133  task._mergeResults(result)
134  self._mergeStatistics(stat)
135  self._printStatistics()
136  self.server.print_stats()
137  elif self.mode == 'multicore' :
138  start = time.time()
139  jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items ))
140  for result, stat in jobs.get(timeout) :
141  task._mergeResults(result)
142  self._mergeStatistics(stat)
143  end = time.time()
144  self._printStatistics()
145  print 'Time elapsed since server creation %f' %(end-start)
146  # --- Call the Local Finalize
147  task.finalize()
def _mergeStatistics(self, stat)
Definition: Parallel.py:157
def GaudiMP.Parallel.WorkManager.process (   self,
  task,
  items,
  timeout = 90000 
)

Definition at line 123 of file Parallel.py.

123  def process(self, task, items, timeout=90000):
124  if not isinstance(task,Task) :
125  raise TypeError("task argument needs to be an 'Task' instance")
126  # --- Call the Local initialialization
127  task.initializeLocal()
128  # --- Schedule all the jobs ....
129  if self.mode == 'cluster' :
130  jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiMP.Parallel','time')) for item in items]
131  for job in jobs :
132  result, stat = job()
133  task._mergeResults(result)
134  self._mergeStatistics(stat)
135  self._printStatistics()
136  self.server.print_stats()
137  elif self.mode == 'multicore' :
138  start = time.time()
139  jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items ))
140  for result, stat in jobs.get(timeout) :
141  task._mergeResults(result)
142  self._mergeStatistics(stat)
143  end = time.time()
144  self._printStatistics()
145  print 'Time elapsed since server creation %f' %(end-start)
146  # --- Call the Local Finalize
147  task.finalize()
def _mergeStatistics(self, stat)
Definition: Parallel.py:157

Member Data Documentation

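GaudiMP.Parallel.WorkManager.mode

Definition at line 114 of file Parallel.py.

Either 'cluster' (set when ppservers is given to the constructor) or 'multicore' (set otherwise). process() uses it to choose how jobs are distributed.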

GaudiMP.Parallel.WorkManager.ncpus

Definition at line 107 of file Parallel.py.

GaudiMP.Parallel.WorkManager.pool

Definition at line 116 of file Parallel.py.

GaudiMP.Parallel.WorkManager.ppservers

Definition at line 111 of file Parallel.py.

GaudiMP.Parallel.WorkManager.server

Definition at line 113 of file Parallel.py.

GaudiMP.Parallel.WorkManager.sessions

Definition at line 112 of file Parallel.py.

GaudiMP.Parallel.WorkManager.stats

Definition at line 118 of file Parallel.py.


The documentation for this class was generated from the following file: