Gaudi Framework, version v21r8

Generated: 17 Mar 2010

GaudiPython::Parallel::WorkManager Class Reference

Public Member Functions

def __init__
def __del__
def process

Public Attributes

 ncpus
 ppservers
 sessions
 server
 mode
 pool
 stats

Private Member Functions

def _printStatistics
def _mergeStatistics


Detailed Description

Class in charge of managing the tasks and distributing them to
    the workers. The workers can be local (using other cores) or
    remote (using other nodes in the local cluster).

Definition at line 128 of file Parallel.py.


Member Function Documentation

def GaudiPython::Parallel::WorkManager::__init__ (   self,
  ncpus = 'autodetect',
  ppservers = None 
)

Definition at line 133 of file Parallel.py.

    def __init__(self, ncpus='autodetect', ppservers=None):
        if ncpus == 'autodetect' : self.ncpus = _detect_ncpus()
        else :                     self.ncpus = ncpus
        if ppservers :
            # Remote workers requested: run through a pp job server
            import pp
            self.ppservers = ppservers
            self.sessions = [ SshSession(srv) for srv in ppservers ]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else :
            # No remote workers: use a local process pool
            import processing
            self.pool = processing.Pool(self.ncpus)
            self.mode = 'multicore'
        self.stats = {}
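
The two decisions the constructor makes (how many workers, and which mode) can be sketched with the standard library alone. This is an illustration under stated assumptions, not the Gaudi implementation: `resolve_ncpus` and `select_mode` are hypothetical helper names, and `multiprocessing.cpu_count()` stands in for the module's private `_detect_ncpus()`, whose body is not shown on this page.

```python
import multiprocessing


def resolve_ncpus(ncpus='autodetect'):
    """Return an explicit worker count, detecting it when asked to."""
    if ncpus == 'autodetect':
        # Assumption: cpu_count() plays the role of _detect_ncpus().
        return multiprocessing.cpu_count()
    return ncpus


def select_mode(ppservers=None):
    """Mirror the constructor's rule: a non-empty server list means cluster mode."""
    return 'cluster' if ppservers else 'multicore'
```

Because the test is plain truthiness, an empty server tuple falls back to 'multicore' just as `None` does.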

def GaudiPython::Parallel::WorkManager::__del__ (   self  ) 

Definition at line 148 of file Parallel.py.

    def __del__(self):
        if hasattr(self,'server') : self.server.destroy()
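
The `hasattr` guard matters because `server` is assigned only in cluster mode; in multicore mode the attribute never exists. A minimal sketch of the pattern follows, using hypothetical stand-in classes `FakeServer` and `Manager` (neither is part of GaudiPython):

```python
class FakeServer:
    """Hypothetical stand-in for a job server with a destroy() method."""

    def __init__(self):
        self.destroyed = False

    def destroy(self):
        self.destroyed = True


class Manager:
    """Hypothetical owner that only sometimes holds a server."""

    def __init__(self, server=None):
        if server is not None:
            self.server = server  # attribute exists only in this branch

    def __del__(self):
        # Without the guard, objects built without a server would hit
        # an AttributeError during finalization.
        if hasattr(self, 'server'):
            self.server.destroy()
```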

def GaudiPython::Parallel::WorkManager::process (   self,
  task,
  items,
  timeout = 90000 
)

Definition at line 151 of file Parallel.py.

    def process(self, task, items, timeout=90000):
        if not isinstance(task,Task) :
            raise TypeError("task argument needs to be an 'Task' instance")
        # --- Call the local initialization
        task.initializeLocal()
        # --- Schedule all the jobs ....
        if self.mode == 'cluster' :
            jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiPython.Parallel','time')) for item in items]
            for job in jobs :
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore' :
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items ))
            for result, stat in  jobs.get(timeout) :
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print 'Time elapsed since server creation %f' %(end-start)
        # --- Call the local finalize
        task.finalize()
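
The multicore branch follows a schedule-then-merge pattern: pair the task with every item, submit the pairs with `map_async`, wait on `get(timeout)`, and fold each result back into the task. The sketch below reproduces that pattern in Python 3 under several assumptions: `SumTask`, `run_item` and `process_all` are hypothetical names, and the thread-backed `multiprocessing.dummy.Pool` replaces the `processing.Pool` used in the source so that the example stays self-contained. With threads the task object is shared rather than copied to each worker, which is a difference from the original.

```python
from multiprocessing.dummy import Pool  # thread-backed, same map_async API


class SumTask:
    """Hypothetical task: squares each item and accumulates the total."""

    def __init__(self):
        self.total = 0

    def process(self, item):
        return item * item

    def merge(self, result):
        self.total += result


def run_item(args):
    """Unpack one (task, item) pair; the pool passes a single argument."""
    task, item = args
    return task.process(item)


def process_all(task, items, nworkers=2, timeout=60):
    """Schedule every item asynchronously, then merge results in input order."""
    pool = Pool(nworkers)
    try:
        jobs = pool.map_async(run_item, [(task, item) for item in items])
        # get() raises multiprocessing.TimeoutError if results arrive late.
        for result in jobs.get(timeout):
            task.merge(result)
    finally:
        pool.close()
        pool.join()
    return task.total
```

Merging happens in the calling thread only, so the accumulated state needs no locking even though the items are processed concurrently.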

def GaudiPython::Parallel::WorkManager::_printStatistics (   self  )  [private]

Definition at line 176 of file Parallel.py.

    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print 'Job execution statistics:'
        print 'job count | % of all jobs | job time sum | time per job | job server'
        for name, stat  in self.stats.items():
            print '       %d |        %6.2f |     %8.3f |    %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)
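
The arithmetic behind each report row is a share of the total job count plus a mean time per job. A minimal Python 3 sketch follows, assuming a plain `{server_name: (njob, total_time)}` dictionary instead of the `Statistics` objects used in the source. `format_statistics` is a hypothetical name, and the column widths are chosen here to line up with the header, so they differ from the original format string.

```python
def format_statistics(stats):
    """Build the report lines from a {server_name: (njob, total_time)} dict."""
    njobs = sum(njob for njob, _ in stats.values())
    lines = ['job count | % of all jobs | job time sum | time per job | job server']
    for name, (njob, total_time) in stats.items():
        lines.append('%9d | %13.2f | %12.3f | %12.3f | %s' % (
            njob, 100.0 * njob / njobs, total_time, total_time / njob, name))
    return lines
```

Returning the lines instead of printing them keeps the formatting testable; a caller can emit them with `print('\n'.join(lines))`.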

def GaudiPython::Parallel::WorkManager::_mergeStatistics (   self,
  stat 
) [private]

Definition at line 185 of file Parallel.py.

    def _mergeStatistics(self, stat):
        if stat.name not in self.stats : self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
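
Merging keeps one running record per worker name: the first record seen for a name creates the entry, and every record adds its time and counts as one job. The sketch below shows the same accumulation in Python 3. The `Statistics` class here is an assumed shape (name, time sum, job count) inferred from the attributes the listing uses; its real definition is not shown on this page, and `merge_statistics` is a hypothetical free-function version of the method.

```python
class Statistics:
    """Assumed shape of a per-server record: name, time sum, job count."""

    def __init__(self, name='', time=0.0, njob=0):
        self.name = name
        self.time = time
        self.njob = njob


def merge_statistics(stats, stat):
    """Fold one per-job record into the running totals keyed by server name."""
    if stat.name not in stats:
        stats[stat.name] = Statistics(name=stat.name)
    entry = stats[stat.name]
    entry.time += stat.time
    entry.njob += 1  # each incoming record counts as exactly one job
    return stats
```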


Member Data Documentation

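GaudiPython::Parallel::WorkManager::ncpus

Definition at line 134 of file Parallel.py.

GaudiPython::Parallel::WorkManager::ppservers

Definition at line 138 of file Parallel.py.

GaudiPython::Parallel::WorkManager::sessions

Definition at line 139 of file Parallel.py.

GaudiPython::Parallel::WorkManager::server

Definition at line 140 of file Parallel.py.

GaudiPython::Parallel::WorkManager::mode

Definition at line 141 of file Parallel.py.

GaudiPython::Parallel::WorkManager::pool

Definition at line 144 of file Parallel.py.

GaudiPython::Parallel::WorkManager::stats

Definition at line 146 of file Parallel.py.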


The documentation for this class was generated from the file Parallel.py.

Generated at Wed Mar 17 18:21:55 2010 for Gaudi Framework, version v21r8 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004