![]() |
|
|
Generated: 8 Jan 2009 |
Class in charge of managing the tasks and distributing them to
the workers. The workers can be local (using other cores) or remote
(using other nodes in the local cluster). Definition at line 120 of file Parallel.py.
Public Member Functions | |
| def | __init__ |
| def | __del__ |
| def | process |
Public Attributes | |
| ncpus | |
| ppservers | |
| sessions | |
| server | |
| mode | |
| pool | |
| stats | |
Private Member Functions | |
| def | _printStatistics |
| def | _mergeStatistics |
def GaudiPython::Parallel::WorkManager::__init__ ( self, ncpus = 'autodetect', ppservers = None )
Definition at line 125 of file Parallel.py.
00125 : 00126 if ncpus == 'autodetect' : self.ncpus = _detect_ncpus() 00127 else : self.ncpus = ncpus 00128 if ppservers : 00129 import pp 00130 self.ppservers = ppservers 00131 self.sessions = [ SshSession(srv) for srv in ppservers ] 00132 self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers) 00133 self.mode = 'cluster' 00134 else : 00135 import processing 00136 self.pool = processing.Pool(self.ncpus) 00137 self.mode = 'multicore' 00138 self.stats = {} 00139 def __del__(self):
def GaudiPython::Parallel::WorkManager::__del__ ( self )
Definition at line 140 of file Parallel.py.
00140 : 00141 if hasattr(self,'server') : self.server.destroy() 00142 def process(self, task, items, timeout=90000):
def GaudiPython::Parallel::WorkManager::process ( self, task, items, timeout = 90000 )
Definition at line 143 of file Parallel.py.
00143 : 00144 if not isinstance(task,Task) : 00145 raise TypeError("task argument needs to be an 'Task' instance") 00146 # --- Call the Local initialialization 00147 task.initializeLocal() 00148 # --- Schedule all the jobs .... 00149 if self.mode == 'cluster' : 00150 jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiPython.Parallel','time')) for item in items] 00151 for job in jobs : 00152 result, stat = job() 00153 task._mergeResults(result) 00154 self._mergeStatistics(stat) 00155 self._printStatistics() 00156 self.server.print_stats() 00157 elif self.mode == 'multicore' : 00158 start = time.time() 00159 jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items )) 00160 for result, stat in jobs.get(timeout) : 00161 task._mergeResults(result) 00162 self._mergeStatistics(stat) 00163 end = time.time() 00164 self._printStatistics() 00165 print 'Time elapsed since server creation %f' %(end-start) 00166 # --- Call te Local Finalize 00167 task.finalize() def _printStatistics(self):
def GaudiPython::Parallel::WorkManager::_printStatistics ( self ) [private]
Definition at line 168 of file Parallel.py.
00168 : 00169 njobs = 0 00170 for stat in self.stats.values(): 00171 njobs += stat.njob 00172 print 'Job execution statistics:' 00173 print 'job count | % of all jobs | job time sum | time per job | job server' 00174 for name, stat in self.stats.items(): 00175 print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name) 00176 def _mergeStatistics(self, stat):
def GaudiPython::Parallel::WorkManager::_mergeStatistics ( self, stat ) [private]
Definition at line 177 of file Parallel.py.
00177 : 00178 if stat.name not in self.stats : self.stats[stat.name] = Statistics() 00179 s = self.stats[stat.name] 00180 s.time += stat.time 00181 s.njob += 1 00182 00183 class SshSession(object) :
ncpus: Definition at line 126 of file Parallel.py.
ppservers: Definition at line 130 of file Parallel.py.
sessions: Definition at line 131 of file Parallel.py.
server: Definition at line 132 of file Parallel.py.
mode: Definition at line 133 of file Parallel.py.
pool: Definition at line 136 of file Parallel.py.
stats: Definition at line 138 of file Parallel.py.