|
Gaudi Framework, version v21r9 |
| Home | Generated: 3 May 2010 |
Public Member Functions | |
| def | __init__ |
| def | __del__ |
| def | process |
Public Attributes | |
| ncpus | |
| ppservers | |
| sessions | |
| server | |
| mode | |
| pool | |
| stats | |
Private Member Functions | |
| def | _printStatistics |
| def | _mergeStatistics |
Class to in charge of managing the tasks and distributing them to
the workers. They can be local (using other cores) or remote
using other nodes in the local cluster Definition at line 128 of file Parallel.py.
| def GaudiPython::Parallel::WorkManager::__init__ | ( | self, | ||
ncpus = 'autodetect', |
||||
ppservers = None | ||||
| ) |
Definition at line 133 of file Parallel.py.
00133 : 00134 if ncpus == 'autodetect' : self.ncpus = _detect_ncpus() 00135 else : self.ncpus = ncpus 00136 if ppservers : 00137 import pp 00138 self.ppservers = ppservers 00139 self.sessions = [ SshSession(srv) for srv in ppservers ] 00140 self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers) 00141 self.mode = 'cluster' 00142 else : 00143 import processing 00144 self.pool = processing.Pool(self.ncpus) 00145 self.mode = 'multicore' 00146 self.stats = {} 00147 def __del__(self):
| def GaudiPython::Parallel::WorkManager::__del__ | ( | self | ) |
Definition at line 148 of file Parallel.py.
00148 : 00149 if hasattr(self,'server') : self.server.destroy() 00150 def process(self, task, items, timeout=90000):
| def GaudiPython::Parallel::WorkManager::process | ( | self, | ||
| task, | ||||
| items, | ||||
timeout = 90000 | ||||
| ) |
Definition at line 151 of file Parallel.py.
00151 : 00152 if not isinstance(task,Task) : 00153 raise TypeError("task argument needs to be an 'Task' instance") 00154 # --- Call the Local initialialization 00155 task.initializeLocal() 00156 # --- Schedule all the jobs .... 00157 if self.mode == 'cluster' : 00158 jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiPython.Parallel','time')) for item in items] 00159 for job in jobs : 00160 result, stat = job() 00161 task._mergeResults(result) 00162 self._mergeStatistics(stat) 00163 self._printStatistics() 00164 self.server.print_stats() 00165 elif self.mode == 'multicore' : 00166 start = time.time() 00167 jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items )) 00168 for result, stat in jobs.get(timeout) : 00169 task._mergeResults(result) 00170 self._mergeStatistics(stat) 00171 end = time.time() 00172 self._printStatistics() 00173 print 'Time elapsed since server creation %f' %(end-start) 00174 # --- Call the Local Finalize 00175 task.finalize() def _printStatistics(self):
| def GaudiPython::Parallel::WorkManager::_printStatistics | ( | self | ) | [private] |
Definition at line 176 of file Parallel.py.
00176 : 00177 njobs = 0 00178 for stat in self.stats.values(): 00179 njobs += stat.njob 00180 print 'Job execution statistics:' 00181 print 'job count | % of all jobs | job time sum | time per job | job server' 00182 for name, stat in self.stats.items(): 00183 print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name) 00184 def _mergeStatistics(self, stat):
| def GaudiPython::Parallel::WorkManager::_mergeStatistics | ( | self, | ||
| stat | ||||
| ) | [private] |
Definition at line 185 of file Parallel.py.
00185 : 00186 if stat.name not in self.stats : self.stats[stat.name] = Statistics() 00187 s = self.stats[stat.name] 00188 s.time += stat.time 00189 s.njob += 1 00190 00191 class SshSession(object) :
Definition at line 134 of file Parallel.py.
Definition at line 138 of file Parallel.py.
Definition at line 139 of file Parallel.py.
Definition at line 140 of file Parallel.py.
Definition at line 141 of file Parallel.py.
Definition at line 144 of file Parallel.py.
Definition at line 146 of file Parallel.py.