""" GaudiPython.Parallel module.
This module provides 'parallel' processing support for GaudiPython.
It adds some sugar on top of public-domain packages such as
the 'processing' or the 'pp' packages.  The interface can be made
independent of the underlying implementation package.
Two main classes are defined: Task and WorkManager.
"""

__all__ = [ 'Task', 'WorkManager' ]
excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']

import sys, os, time, copy

from processing import Process, Queue, Pool, currentProcess
from ROOT import TBufferFile, TBuffer
from Gaudi.Configuration import appendPostConfigAction, Configurable, INFO, ERROR, VERBOSE
from GaudiPython import AppMgr, gbl, setOwnership, SUCCESS, PyAlgorithm

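# --- Helper functions executed in the worker processes ------------------------
# They are kept at module level so that the 'processing'/'pp' backends can ship
# them to the worker processes; _prefunction only adapts the calling convention
# used by the pp server, _ppfunction does the actual per-item work.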
def _prefunction( f, task, item ) :
    return f((task, item))

def _ppfunction( args ) :
    task, item = args
    stat = Statistics()
    # make sure the environment is set up and initializeRemote() has run
    # exactly once in this worker process
    if not task.__class__._initializeDone :
        for k, v in task.environ.items() :
            if k not in excluded_varnames : os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    # reset the output, process the item and return a copy of the output
    task._resetOutput()
    task.process(item)
    stat.stop()
    return (copy.deepcopy(task.output), stat)

def _detect_ncpus():
    """Detects the number of effective CPUs in the system"""
    # Linux, Unix and MacOS X
    if hasattr(os, "sysconf"):
        if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"):
            # Linux and Unix
            ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
            if isinstance(ncpus, int) and ncpus > 0:
                return ncpus
        else:
            # MacOS X
            return int(os.popen2("sysctl -n hw.ncpu")[1].read())
    # Windows
    if os.environ.has_key("NUMBER_OF_PROCESSORS"):
        ncpus = int(os.environ["NUMBER_OF_PROCESSORS"])
        if ncpus > 0:
            return ncpus
    # default
    return 1

class Statistics(object):
    def __init__(self):
        self.name = os.getenv('HOSTNAME')
        self.start = time.time()
        self.time = 0.0
        self.njob = 0
    def stop(self):
        self.time = time.time() - self.start

class Task(object) :
    """ Basic base class to encapsulate any processing that is going to be
        executed in parallel.  User classes must inherit from it and implement
        the methods initializeLocal, initializeRemote, process and finalize. """
    _initializeDone = False
    def __new__ ( cls, *args, **kwargs ):
        task = object.__new__( cls )
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items(): task.environ[k] = v
        task.cwd = os.getcwd()
        return task
    def initializeLocal(self):
        pass
    def initializeRemote(self):
        pass
    def process(self, item):
        pass
    def finalize(self) :
        pass
    def _mergeResults(self, result) :
        if type(result) is not type(self.output) :
            raise TypeError("output type is not the same as the obtained result")
        # --- scalar output: add the result directly
        if not hasattr( result, '__iter__' ):
            if hasattr(self.output, 'Add')        : self.output.Add(result)
            elif hasattr(self.output, '__iadd__') : self.output += result
            elif hasattr(self.output, '__add__')  : self.output = self.output + result
            else : raise TypeError('result cannot be added')
        # --- dictionary output: merge key by key
        elif type(result) is dict :
            for key in result.keys() :
                if key in self.output :
                    if hasattr(self.output[key], 'Add')        : self.output[key].Add(result[key])
                    elif hasattr(self.output[key], '__iadd__') : self.output[key] += result[key]
                    elif hasattr(self.output[key], '__add__')  : self.output[key] = self.output[key] + result[key]
                    else : raise TypeError('result cannot be added')
                else :
                    self.output[key] = result[key]
        # --- any other iterable output: merge element by element
        else :
            for i in range( min( len(self.output), len(result) ) ):
                if hasattr(self.output[i], 'Add')        : self.output[i].Add(result[i])
                elif hasattr(self.output[i], '__iadd__') : self.output[i] += result[i]
                elif hasattr(self.output[i], '__add__')  : self.output[i] = self.output[i] + result[i]
                else : raise TypeError('result cannot be added')
    def _resetOutput(self):
        output = (type(self.output) is dict) and self.output.values() or self.output
        for o in output :
            if hasattr(o, 'Reset'): o.Reset()
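
# --------------------------------------------------------------------------- #
# Illustrative sketch only (not part of the original API): a minimal Task
# subclass showing the expected usage pattern.  The class name and the choice
# of a dict-of-numbers output are assumptions made for the example.
class _ExampleSquareSumTask( Task ) :
    """ Example task: sums the squares of the processed items. """
    def initializeLocal(self):
        # the type of 'output' decides how WorkManager merges partial results
        self.output = {'sum': 0.0, 'n': 0}
    def initializeRemote(self):
        pass
    def process(self, item):
        self.output['sum'] += item * item
        self.output['n'] += 1
    def finalize(self):
        print 'processed %d items, sum of squares = %f' % (self.output['n'], self.output['sum'])
# --------------------------------------------------------------------------- #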


class WorkManager(object) :
    """ Class in charge of managing the tasks and distributing them to
        the workers.  They can be local (using other cores) or remote,
        using other nodes in the local cluster. """

    def __init__( self, ncpus='autodetect', ppservers=None ) :
        if ncpus == 'autodetect' : self.ncpus = _detect_ncpus()
        else : self.ncpus = ncpus
        if ppservers :
            import pp
            self.ppservers = ppservers
            self.sessions = [ SshSession(srv) for srv in ppservers ]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else :
            import processing
            self.pool = processing.Pool(self.ncpus)
            self.mode = 'multicore'
        self.stats = {}

    def __del__(self):
        if hasattr(self, 'server') : self.server.destroy()

    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task) :
            raise TypeError("task argument needs to be a 'Task' instance")
        # --- initialization of the task on the master process
        task.initializeLocal()
        # --- schedule the jobs and merge the results as they come back
        if self.mode == 'cluster' :
            jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiPython.Parallel', 'time')) for item in items]
            for job in jobs :
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore' :
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout) :
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print 'Time elapsed since server creation %f' % (end - start)
        # --- finalization of the task on the master process
        task.finalize()

    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print 'Job execution statistics:'
        print 'job count | % of all jobs | job time sum | time per job | job server'
        for name, stat in self.stats.items():
            print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100. * stat.njob / njobs, stat.time, stat.time / stat.njob, name)

    def _mergeStatistics(self, stat):
        if stat.name not in self.stats : self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1
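
# Illustrative sketch only (not part of the original API): how a user script
# would typically drive a Task through WorkManager.  Nothing here runs at
# import time; '_ExampleSquareSumTask' is the hypothetical subclass above.
def _exampleUsage( items=None, ncpus='autodetect' ) :
    if items is None : items = range(10)
    task = _ExampleSquareSumTask()
    wmgr = WorkManager( ncpus=ncpus )    # local 'multicore' mode (no ppservers)
    wmgr.process( task, items )          # distributes the items, merges the outputs
    return task.output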


class SshSession(object) :
    def __init__(self, hostname):
        import pyssh
        import pp
        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        self.session.write('%s %s/scripts-%s/ppserver.py \n' % (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print 'started ppserver in ', hostname
    def __del__(self):
        self.session.close()
        print 'killed ppserver in ', self.host


aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
aidatypes = ( gbl.AIDA.IHistogram,
              gbl.AIDA.IHistogram1D,
              gbl.AIDA.IHistogram2D,
              gbl.AIDA.IHistogram3D,
              gbl.AIDA.IProfile1D,
              gbl.AIDA.IProfile2D )
thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
gppHead = '[ GaudiPythonParallel ] '
line = '-' * 80
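
# --- Queue-based event-level parallelism --------------------------------------
# The code below is a separate, queue-based scheme: setupSystem() spawns one
# Reader process (reads and serializes events), nWorkers Worker processes
# (deserialize, run the algorithms, optionally re-serialize the output) and one
# Writer process (writes the events and rebuilds the histogram store), all
# connected through 'processing' Queues.  The parent waits on qToParent for a
# None from every child before returning.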
def setupSystem(nWorkers, config) :
    qLimit = 50
    inq = Queue()
    outQs = [ Queue(qLimit) for i in xrange(nWorkers) ]

    commonQueue = Queue()
    qToParent = Queue()

    cStatQueue = Queue()
    rHistQ = Queue()

    ks = config.keys()
    ks.sort()

    # identify which application is being run
    apps = ['Gauss', 'Boole', 'Brunel', 'DaVinci']
    _app = None
    for app in apps :
        if app in ks : _app = app

    # collect the configurables that define some kind of output
    outs = ['Output', 'OutputFile', 'OutStream']
    outlist = []
    for k in ks :
        for o in outs :
            if hasattr(config[k], o) : outlist.append(k)

    itmlst = getOutputList(config)

    # start the Writer, the Workers and the Reader processes
    w = Writer(commonQueue, outQs, cStatQueue, rHistQ, qToParent, nWorkers, config, qLimit, _app)
    wks = [Worker(i, inq, commonQueue, outQs[i], cStatQueue, qToParent, nWorkers, config, qLimit, _app, itemlist=itmlst) for i in xrange(nWorkers)]
    r = Reader(inq, commonQueue, rHistQ, qToParent, nWorkers, config, qLimit, _app)

    nProcesses = nWorkers + 2
    nSenders = nWorkers + 1
    nc = 0

    # wait until every child process has sent its terminating None;
    # an 'h' flag from the Writer releases the processes waiting on the common queue
    store = []
    hFlagCt = 0
    while nc < nProcesses :
        item = qToParent.get()
        if item == 'h' :
            print 'HFLAG RECEIVED BY SETUPSYSTEM'
            for i in xrange(nSenders) : commonQueue.put(None)
        if not item : nc += 1

    wks = []
    for dictionary in store :
        if dictionary['name'] == 'Worker' : wks.append(dictionary)
        if dictionary['name'] == 'Reader' : reader = dictionary
        if dictionary['name'] == 'Writer' : writer = dictionary

    return True

def getOutputList( configuration ) :
    writers = configuration['ApplicationMgr'].OutStream
    print 'getOutputList: OutStreams identified : ', writers
    # build the list of TES locations (without the '#depth' suffix) that the
    # OutStreams are configured to write
    itmlst = []
    for w in writers :
        if hasattr(w, 'Output') :
            for itm in w.ItemList + w.OptItemList :
                hsh = itm.find('#')
                itmlst.append(itm[:hsh])
    return itmlst

def dumpHistograms( hvt, node='Unspecified', omitList=None ) :
    if omitList is None : omitList = []
    nlist = hvt.getHistoNames()
    # drop the histograms whose path starts with one of the omitted prefixes
    throw = []
    for om in omitList :
        for name in nlist :
            if name.startswith(om) : throw.append(name)
    for name in throw : nlist.remove(name)
    del throw
    histDict = {}
    objects = 0 ; histos = 0
    if nlist :
        for n in nlist :
            o = hvt[n]
            if type(o) in aidatypes :
                o = aida2root(o)
                histos += 1
            else :
                objects += 1
            histDict[n] = o
    else :
        print gppHead + 'WARNING : no histograms to recover?'
    return histDict

def dumpTES( someClass ) :
    buf = TBufferFile(TBuffer.kWrite)
    someClass.ts.dumpBuffer(buf)
    return buf

def dumpTEStoMessage( self ) :
    buf = gbl.TMessage()
    self.ts.dumpBuffer(buf)
    return buf

def loadTES( someClass, tbuf ) :
    # rewind the buffer, switch it to read mode and deserialize it into the
    # freshly re-rooted transient event store
    tbuf.SetBufferOffset()
    tbuf.SetReadMode()
    root = gbl.DataObject()
    someClass.evt.setRoot('/Event', root)
    setOwnership(root, False)
    someClass.ts.loadBuffer(tbuf)
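
# CollectHistograms is attached to the Reader and Worker AppMgrs; at finalize
# time it ships the contents of the histogram service to the Writer in chunks
# over the 'cstatq' queue, then waits on the common queue for the shutdown
# signal before clearing the local histogram store.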
class CollectHistograms( PyAlgorithm ) :
    def __init__( self, node ) :
        self.node = node
        self.omit = node.omitHistos
        PyAlgorithm.__init__( self )
        return None
    def execute( self ) :
        return SUCCESS
    def finalize( self ) :
        header = 'CollectHistograms : '
        w = self.node
        nodeName = w.__class__.__name__
        w.histDict = dumpHistograms( w.hvt, node=nodeName, omitList=self.omit )
        ks = w.histDict.keys()
        # ship the histograms in chunks of 100 to keep the queue items small
        chunk = 100
        reps = len(ks) / chunk + 1
        for i in xrange(reps) :
            someKeys = ks[i * chunk : (i + 1) * chunk]
            smalld = dict([(key, w.histDict[key]) for key in someKeys])
            w.cstatq.put( (w.id, smalld) )
        w.cstatq.put( None )
        # wait for the shutdown signal before clearing the local histogram store
        for item in iter(w.commonQ.get, None) : print 'Worker %i : Got an item on the Common Queue?' % (w.id)
        w.hvt.clearStore()
        root = gbl.DataObject()
        w.hvt.setRoot('/stat', root)
        w.hvt.dump()
        return SUCCESS

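
# The Reader runs the input/generation part of the job in its own process,
# serializes each event with TESSerializer into a TBufferFile and feeds it to
# the Workers through the shared input queue; one None per Worker marks the end
# of the event stream.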
class Reader(object) :
    def __init__( self, inq, commonQueue, rstatq, qToParent, workers, config, qLimit, _app ) :
        self.inq = inq
        self.c = config
        self.commonQ = commonQueue
        self.cstatq = rstatq
        self.workers = workers
        self.qToParent = qToParent
        self.qLimit = qLimit
        self.id = -1
        self._app = _app
        self.errs = 0

        r = Process( target=self.read )
        r.start()

    def readerConfig( self ) :
        ks = self.c.keys()
        if 'ApplicationMgr' in ks :
            self.c['ApplicationMgr'].OutStream = []
            if self._app == 'Gauss' : pass
            else : self.c['ApplicationMgr'].TopAlg = []
        else :
            self.qToParent.put(None)
            print 'Reader : readerConfig : ApplicationMgr not available for configuration?'

        try : self.c['HistogramPersistencySvc'].OutputFile = ''
        except : print 'Reader : No Histogram output to cancel! Config continues...'

        if 'MessageSvc' in ks :
            self.c['MessageSvc'].Format = '[Reader]% F%18W%S%7W%R%T %0W%M'
            self.c['MessageSvc'].OutputLevel = ERROR
        else :
            self.qToParent.put(None)
            print 'Reader : readerConfig : MessageSvc not available for configuration?'
        if self._app == 'Gauss' :
            # for Gauss the Reader keeps only the first member of the GaussSequencer
            gs = self.c['GaussSequencer']
            ed = [gs.Members[0]]
            gs.Members = ed
    def read( self ):
        currentProcess().setName('+Reader+')
        # apply the Reader-specific configuration before the AppMgr is created
        self.readerConfig()

        self.ct = 0

        self.a = AppMgr()
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)
        self.omitHistos = ['/stat/CaloPIDs']
        collectHistos = CollectHistograms( self )
        self.a.addAlgorithm( collectHistos )
        self.a.initialize()
        self.a.start()

        for i in xrange( self.c['ApplicationMgr'].EvtMax ) :
            self.a.run(1)
            self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)
            # declare everything currently in the event store to the serializer
            if self._app == 'Gauss' :
                if self.evt.getHistoNames() :
                    for itm in self.evt.getHistoNames() : self.ts.addOptItem(itm, 1)
            else :
                if self.evt.getList() :
                    for itm in self.evt.getList() : self.ts.addOptItem(itm, 1)
            buf = dumpTES( self )
            sent = False
            while not sent :
                try :
                    self.inq.put( buf, block=True )
                    # wait until the queue feeder thread has flushed the buffer
                    while self.inq._buffer : pass
                    sent = True
                    self.ct += 1
                except :
                    self.errs += 1
            self.evt.clearStore()
        # one None per Worker marks the end of the event stream
        for w in range(self.workers) : self.inq.put(None)

        self.a.stop()
        self.a.finalize()

        self.qToParent.put(None)

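# Each Worker deserializes events from the input queue, runs the configured
# algorithms on them and, if an output item list was given, re-serializes the
# event for the Writer; otherwise it sends only a placeholder token on its
# output queue (the terminating None still signals completion to the Writer).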
class Worker(object) :
    def __init__( self, wid, inq, cq, outq, cstatq, qToParent, nprocs, config, qLimit, _app, itemlist=None ) :
        self.id = wid
        self.c = config
        self.inq = inq
        self.qLimit = qLimit
        self.commonQ = cq
        self.cstatq = cstatq
        self.qToParent = qToParent
        self.nprocs = nprocs
        self.outList = []
        self.tempCt = 0
        self.putErrs = 0
        self.completed = 0
        self.KeepGoing = True
        self.itmlst = itemlist
        self._app = _app

        self.outq = outq

        # serialized event output is produced only if an item list was provided
        if self.itmlst : self.eventOutput = True
        else : self.eventOutput = False

        w = Process( target=self.work )
        w.setDaemon(True)
        w.start()

    def workerConfig( self ) :
        ks = self.c.keys()
        if 'ApplicationMgr' in ks :
            self.c['ApplicationMgr'].OutStream = []
        else :
            self.qToParent.put(None)
            print 'Worker %i : workerConfig : ApplicationMgr not available for configuration?' % (self.id)
        if 'EventSelector' in ks :
            self.c['EventSelector'].Input = []
        else :
            print 'Worker %i : workerConfig : EventSelector not available for configuration?' % (self.id)
        try : self.c['HistogramPersistencySvc'].OutputFile = ''
        except : print 'Worker-%i: No Histogram output to cancel! Config continues...' % self.id
        formatHead = '[Worker %i]' % self.id
        if 'MessageSvc' in ks :
            self.c['MessageSvc'].Format = formatHead + '% F%18W%S%7W%R%T %0W%M'
            self.c['MessageSvc'].OutputLevel = INFO
        else :
            print 'Worker %i : workerConfig : MessageSvc not available for configuration?' % (self.id)
        # only worker 0 keeps the normal output level
        if self.id :
            for k in ks :
                if hasattr( self.c[k], 'OutputLevel' ) : self.c[k].OutputLevel = ERROR

        if self._app == 'Gauss' :
            # for Gauss the Workers keep only the second member of the GaussSequencer
            gs = self.c['GaussSequencer']
            ed = [ gs.Members[1] ]
            gs.Members = ed

        if self._app == 'DaVinci' :
            if 'NTupleSvc' in self.c.keys() :
                self.c['NTupleSvc'].Output = ["FILE1 DATAFILE='Worker-%i-Hlt12-StatsTuple.root' TYP='ROOT' OPT='NEW'" % self.id]
                self.c['NTupleSvc'].OutputLevel = VERBOSE

    def workerExecuteWithOutput( self, tbuf ) :
        loadTES( self, tbuf )
        self.a._evtpro.executeEvent()
        buf = dumpTES( self )
        self.outq.put(buf)
        # wait until the queue feeder thread has flushed the buffer
        while self.outq._buffer : pass
        self.evt.clearStore()
        self.completed += 1
        return True

    def workerExecuteNoOutput( self, tbuf ) :
        loadTES( self, tbuf )
        self.a._evtpro.executeEvent()
        self.outq.put('dummy')
        self.evt.clearStore()
        self.completed += 1
        return True
    def work( self ):
        cName = currentProcess().getName()
        currentProcess().setName('Worker ' + cName)
        # apply the Worker-specific configuration before the AppMgr is created
        self.workerConfig()

        self.ct = 0

        # the first get() also tells us whether there is any work at all
        buf = self.inq.get()
        if buf is not None :
            self.a = AppMgr()
            self.evt = self.a.evtsvc()
            self.hvt = self.a.histsvc()
            self.nvt = self.a.ntuplesvc()
            first = True
            self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)
            self.omitHistos = ['/stat/CaloPIDs']
            collectHistos = CollectHistograms( self )
            self.a.addAlgorithm( collectHistos )
            self.a.initialize()
            self.a.start()

            if self.eventOutput :
                for itm in self.itmlst : self.ts.addOptItem(itm, 1)
                wFunction = self.workerExecuteWithOutput
            else :
                wFunction = self.workerExecuteNoOutput

            # event loop: keep processing buffers until the Reader sends None
            while self.KeepGoing :
                if first : pass
                else : buf = self.inq.get()
                if buf is not None :
                    sc = wFunction( buf )
                    self.ct += 1
                    if first : first = False
                else :
                    self.KeepGoing = False

            # tell the Writer that this worker has no more events
            self.outq.put(None)

            sc = self.finalize()
            self.qToParent.put(None)

        else :
            # no events at all: close down immediately
            self.outq.put(None)
            sc = self.finalize()
            self.qToParent.put(None)
            self.cstatq.put( (self.id, {}) )
            self.cstatq.put( None )
            for item in iter(self.commonQ.get, None) : print 'Worker %i : Got an item on the Common Queue?' % (self.id)

    def finalize( self ) :
        # the AppMgr exists only if at least one event buffer was received
        if hasattr(self, 'a') :
            self.a.stop()
            self.a.finalize()
        self.Finished = True
        return True

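# The Writer owns the output streams: it polls the Workers' output queues in a
# round-robin, deserializes and writes the events, and at the end collects the
# histogram chunks sent by the Reader and the Workers and rebuilds them in its
# own histogram service (RebuildHistoStore) before finalizing.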
class Writer(object) :
    def __init__( self, common_queue, out_qList, cstatq, rstatq, qToParent, workers, config, qLimit, _app ) :
        self.qList = out_qList
        self.cq = common_queue
        self.cstatq = cstatq
        self.rstatq = rstatq
        self.qToParent = qToParent
        self.qLimit = qLimit
        self.workers = workers
        self.output = True
        self.flags = 0
        self.c = config
        self._app = _app

        # map class names to the method able to (re)book an object of that type
        self.bookingDict = {}
        self.bookingDict['DataObject'] = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File'] = self.bookDataObject
        self.bookingDict['TH1D'] = self.bookTH1D
        self.bookingDict['TH2D'] = self.bookTH2D
        self.bookingDict['TH3D'] = self.bookTH3D
        self.bookingDict['TProfile'] = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D

        w = Process( target=self.write )
        w.start()

    def bookDataObject( self, n, o ):
        self.hvt.registerObject( n, o )

    def bookTH1D( self, n, o ) :
        obj = self.hvt._ihs.book(n, o.GetTitle(), o.GetXaxis().GetNbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH2D( self, n, o ) :
        obj = self.hvt._ihs.book(n, o.GetTitle(), o.GetXaxis().GetNbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(), o.GetYaxis().GetXmin(), o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH3D( self, n, o ) :
        obj = self.hvt._ihs.book(n, o.GetTitle(), o.GetXaxis().GetNbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(), o.GetYaxis().GetXmin(), o.GetYaxis().GetXmax(),
                                 o.GetZaxis().GetNbins(), o.GetZaxis().GetXmin(), o.GetZaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTProfile( self, n, o ) :
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(), o.GetXaxis().GetNbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax(), o.GetOption())
        aida2root(obj).Add(o)

    def bookTProfile2D( self, n, o ) :
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(), o.GetXaxis().GetNbins(), o.GetXaxis().GetXmin(), o.GetXaxis().GetXmax(),
                                     o.GetYaxis().GetNbins(), o.GetYaxis().GetXmin(), o.GetYaxis().GetXmax(), o.GetOption())
        aida2root(obj).Add(o)

    def writerConfig( self ) :
        ks = self.c.keys()
        if 'EventSelector' in ks :
            self.c['EventSelector'].Input = []
        else :
            print 'Writer : writerConfig : EventSelector not available for configuration?'
        if 'ApplicationMgr' in ks :
            self.c['ApplicationMgr'].TopAlg = []
        else :
            print 'Writer : writerConfig : ApplicationMgr not available for configuration?'
        if 'MessageSvc' in ks :
            self.c['MessageSvc'].Format = '[Writer]% F%18W%S%7W%R%T %0W%M'
        else :
            print 'Writer : writerConfig : MessageSvc not available for configuration?'

        # the Writer runs no algorithms, so drop any RequireAlgs requirements
        for k in self.c.keys() :
            if hasattr(self.c[k], 'RequireAlgs') : self.c[k].RequireAlgs = []

    def write( self ):
        currentProcess().setName('+Writer+')
        # apply the Writer-specific configuration before the AppMgr is created
        print 'WRITER : applying configuration'
        self.writerConfig()
        print '[ GaudiPython.Parallel ] Writer started : Process %i' % (os.getpid())

        self.a = AppMgr()
        self.a.initialize()
        self.a.start()
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)

        self.ct = 0
        status = [True] * self.workers
        recvd = [0] * self.workers
        cqnc = 0
        gotOK = True

        ind = 0 ; gotWhich = False ; ifblock = [] ; tryBlock = [] ; starttry = None ; outs = []

        # poll the worker output queues in a round-robin until every worker
        # has sent its terminating None
        waitForFlag = True ; whichQ = 0
        while sum(status) > 0 :
            whichQ = (whichQ + 1) % self.workers
            outq = self.qList[whichQ]
            trying = True
            try :
                tbuf = outq.get(timeout=0.01)
                trying = False
            except :
                continue
            if tbuf :
                recvd[whichQ] += 1
                try :
                    loadTES( self, tbuf )
                    self.a._evtpro.executeEvent()
                    self.evt.clearStore()
                    self.ct += 1
                    if not self.ct % 20 : print '[ GaudiPython.Parallel ] Writer Progress (n. Events) : %i' % self.ct
                except :
                    print '[ GaudiPython.Parallel ] Writer could not load a ', type(tbuf), '... skipping to next'
            else :
                status[whichQ] = False

        if self.output : self.finalize()
        else : self.a.stop() ; self.a.finalize()

        self.qToParent.put(None)

    def composition( self ) :
        # survey the histogram service: count AIDA histograms vs. other objects
        lst = self.hvt.getHistoNames()
        record = []
        objects = 0 ; histos = 0
        if lst :
            for n in lst :
                o = self.hvt[n]
                if type(o) in aidatypes :
                    histos += 1
                    record.append( (n, o.entries()) )
                else :
                    objects += 1
        else :
            print 'Empty list!'
        return record
    def finalize( self ) :
        # collect the histogram dictionaries sent by the Reader and the Workers
        nc = 0
        self.HistoCollection = []
        for item in iter(self.rstatq.get, None) : self.HistoCollection.append( item )
        while nc < self.workers :
            tup = self.cstatq.get()
            if tup : self.HistoCollection.append( tup )
            else : nc += 1
        self.HistoCollection.sort()
        # tell the parent that the histogram transfer is complete ('h' flag)
        self.qToParent.put('h')
        self.RebuildHistoStore()

        self.a.stop()
        self.a.finalize()
    def RebuildHistoStore( self ) :
        # add each received histogram to the existing one, or (re)book it if it
        # is not yet present in the local histogram service
        for tup in self.HistoCollection :
            workerID, histDict = tup
            added = 0 ; registered = 0 ; booked = 0
            for n in histDict.keys() :
                o = histDict[n]
                obj = self.hvt.retrieve( n )
                if obj :
                    aida2root(obj).Add(o)
                    added += 1
                else :
                    if o.__class__.__name__ in self.bookingDict.keys() :
                        self.bookingDict[o.__class__.__name__](n, o)
                    else :
                        print 'No booking method for: ', n, o, type(o), o.__class__.__name__
                    booked += 1