|
Gaudi Framework, version v21r8 |
| Home | Generated: 17 Mar 2010 |
Public Member Functions | |
| def | __init__ |
| def | workerConfig |
| def | workerExecuteWithOutput |
| def | workerExecuteNoOutput |
| def | work |
| def | finalize |
Public Attributes | |
| id | |
| c | |
| inq | |
| qLimit | |
| commonQ | |
| cstatq | |
| qToParent | |
| nprocs | |
| outList | |
| tempCt | |
| putErrs | |
| completed | |
| KeepGoing | |
| itmlst | |
| outq | |
| eventOutput | |
| ct | |
| a | |
| evt | |
| hvt | |
| nvt | |
| ts | |
| omitHistos | |
| Finished | |
Private Attributes | |
| _app | |
Definition at line 499 of file Parallel.py.
def GaudiPython::Parallel::Worker::__init__ ( self, wid, inq, cq, outq, cstatq, qToParent, nprocs, config, qLimit, _app, itemlist = None )
Definition at line 500 of file Parallel.py.
00500 :
00501
00502         # wid : an integer (0...Nworkers-1) identifying the worker
00503         # in_q : the common queue from reader to workers
00504         # out_q : the queue from a worker to the writer (unique queue for each worker)
00505         # common_q : the common queue from workers to writer, along which 'id' will be sent on event completion
00506         self.id = wid
00507         self.c = config
00508         self.inq = inq
00509         self.qLimit = qLimit
00510         self.commonQ = cq
00511         self.cstatq = cstatq
00512         self.qToParent = qToParent
00513         self.nprocs = nprocs
00514         self.outList = []
00515         self.tempCt = 0
00516         self.putErrs = 0
00517         self.completed = 0
00518         self.KeepGoing = True
00519         self.itmlst = itemlist
00520         self._app = _app
00521
00522         self.outq = outq
00523
00524         # from ParallelStats import constructWorkerDict
00525         # self.constructWorkerDict = constructWorkerDict
00526
00527         if self.itmlst : self.eventOutput = True
00528         else : self.eventOutput = False
00529
00530         w = Process( target=self.work )
00531         w.setDaemon(True)
00532         w.start()
00533     def workerConfig( self ) :
| def GaudiPython::Parallel::Worker::workerConfig | ( | self | ) |
Definition at line 534 of file Parallel.py.
00534 :
00535         ks = self.c.keys()
00536         if 'ApplicationMgr' in ks :
00537             self.c['ApplicationMgr'].OutStream = []
00538         else :
00539             self.qToParent.put(None)
00540             print 'Worker %i : workerConfig : ApplicationMgr not available for configuration?'%( self.id )
00541         if 'EventSelector' in ks :
00542             self.c['EventSelector'].Input = []
00543         else :
00544             print 'Worker %i : workerConfig : EventSelector not available for configuration?'%( self.id )
00545         try : self.c['HistogramPersistencySvc'].OutputFile = ''
00546         except : print 'Worker-%i: No Histogram output to cancel! Config continues...'%self.id
00547         formatHead = '[Worker %i]'%self.id
00548         if 'MessageSvc' in ks :
00549             self.c['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'
00550             self.c['MessageSvc'].OutputLevel = INFO
00551         else :
00552             print 'Worker %i : workerConfig : MessageSvc not available for configuration?'%( self.id )
00553
00554         if self.id :
00555             for k in ks :
00556                 if hasattr( self.c[k], 'OutputLevel' ) : self.c[k].OutputLevel = ERROR
00557
00558         if self._app == 'Gauss' :
00559             gs = self.c[ 'GaussSequencer' ]
00560             # Sequencer has two stages : generation and simulation, so let
00561             # the reader generate events one by one, and pass to workers for full Simulation step
00562             ed = [ gs.Members[1] ]
00563             gs.Members = ed
00564
00565         if self._app == 'DaVinci' :
00566             if 'NTupleSvc' in self.c.keys() :
00567                 self.c['NTupleSvc'].Output = ["FILE1 DATAFILE='Worker-%i-Hlt12-StatsTuple.root' TYP='ROOT' OPT='NEW'"%self.id]
00568                 self.c['NTupleSvc'].OutputLevel = VERBOSE
00569     def workerExecuteWithOutput( self, tbuf ) :
def GaudiPython::Parallel::Worker::workerExecuteWithOutput ( self, tbuf )
Definition at line 570 of file Parallel.py.
00570 :
00571         loadTES( self, tbuf )
00572         self.a._evtpro.executeEvent()
00573         buf = dumpTES( self )
00574         self.outq.put(buf)
00575         # allow the background thread to feed the Queue; not 100% guaranteed to finish before next line
00576         while self.outq._buffer : pass
00577         self.evt.clearStore()
00578         self.completed += 1
00579         return True
00580     def workerExecuteNoOutput( self, tbuf ) :
def GaudiPython::Parallel::Worker::workerExecuteNoOutput ( self, tbuf )
| def GaudiPython::Parallel::Worker::work | ( | self | ) |
Definition at line 589 of file Parallel.py.
00589 :
00590         # print 'Worker %d: starting...%d, at %5.6f' % (self.id, os.getpid(), alive)
00591         cName = currentProcess().getName()
00592         currentProcess().setName('Worker '+cName)
00593         appendPostConfigAction( self.workerConfig() )
00594
00595         self.ct = 0
00596
00597         # take first event
00598         buf = self.inq.get()
00599         if buf is not None :
00600             # set up GaudiPython tools
00601             self.a = AppMgr()
00602             self.evt = self.a.evtsvc()
00603             self.hvt = self.a.histsvc()
00604             self.nvt = self.a.ntuplesvc()
00605             first = True
00606             self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)
00607             self.omitHistos = ['/stat/CaloPIDs']
00608             collectHistos = CollectHistograms( self )
00609             self.a.addAlgorithm( collectHistos )
00610             self.a.initialize()
00611             self.a.start()
00612
00613             # worker execution changes depending on output/no-output scenario
00614             if self.eventOutput :
00615                 [ self.ts.addOptItem(itm,1) for itm in self.itmlst ]
00616                 wFunction = self.workerExecuteWithOutput
00617             else :
00618                 wFunction = self.workerExecuteNoOutput
00619
00620
00621             # iter over the Queue to workers, receiving serialized events. When done, workers place None on queue
00622             while self.KeepGoing :
00623                 if first : pass
00624                 else : buf = self.inq.get()
00625                 if buf is not None :
00626                     sc = wFunction( buf )
00627                     if sc : pass
00628                     self.ct += 1
00629                     if first : first = False
00630                 else :
00631                     self.KeepGoing = False
00632
00633             # print '='*80
00634             # print 'Worker %i Finished'%( self.id )
00635             self.outq.put(None)
00636             # print line
00637             # print 'Events Recd : %i'%( self.ct )
00638             # print '='*80
00639
00640             sc = self.finalize()
00641             self.qToParent.put(None)
00642             # print '[ GaudiPython Parallel ] : Worker has sent dict and None flag back to Parent'
00643         else : # if the buffer is None...
00644 self.outq.put(None) 00645 sc = self.finalize() 00646 self.qToParent.put(None) 00647 self.cstatq.put( (self.id, {}) ) 00648 self.cstatq.put( None ) 00649 for item in iter(self.commonQ.get, None) : print 'Worker %i : Got an item on the Common Queue?'%(self.id) 00650 def finalize( self ) :
| def GaudiPython::Parallel::Worker::finalize | ( | self | ) |
Definition at line 651 of file Parallel.py.
00651 :
00652         self.a.stop()
00653         self.a.finalize()
00654         self.Finished = True
00655         return True
00656
00657 # ===========================================================================================
00658 # The Writer
00659 # ===========================================================================================
00660 class Writer( ) :
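GaudiPython::Parallel::Worker::id |
Definition at line 506 of file Parallel.py.
GaudiPython::Parallel::Worker::c |
Definition at line 507 of file Parallel.py.
GaudiPython::Parallel::Worker::inq |
Definition at line 508 of file Parallel.py.
GaudiPython::Parallel::Worker::qLimit |
Definition at line 509 of file Parallel.py.
GaudiPython::Parallel::Worker::commonQ |
Definition at line 510 of file Parallel.py.
GaudiPython::Parallel::Worker::cstatq |
Definition at line 511 of file Parallel.py.
GaudiPython::Parallel::Worker::qToParent |
Definition at line 512 of file Parallel.py.
GaudiPython::Parallel::Worker::nprocs |
Definition at line 513 of file Parallel.py.
GaudiPython::Parallel::Worker::outList |
Definition at line 514 of file Parallel.py.
GaudiPython::Parallel::Worker::tempCt |
Definition at line 515 of file Parallel.py.
GaudiPython::Parallel::Worker::putErrs |
Definition at line 516 of file Parallel.py.
GaudiPython::Parallel::Worker::completed |
Definition at line 517 of file Parallel.py.
GaudiPython::Parallel::Worker::KeepGoing |
Definition at line 518 of file Parallel.py.
GaudiPython::Parallel::Worker::itmlst |
Definition at line 519 of file Parallel.py.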
GaudiPython::Parallel::Worker::_app [private] |
Definition at line 520 of file Parallel.py.
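GaudiPython::Parallel::Worker::outq |
Definition at line 522 of file Parallel.py.
GaudiPython::Parallel::Worker::eventOutput |
Definition at line 527 of file Parallel.py.
GaudiPython::Parallel::Worker::ct |
Definition at line 595 of file Parallel.py.
GaudiPython::Parallel::Worker::a |
Definition at line 601 of file Parallel.py.
GaudiPython::Parallel::Worker::evt |
Definition at line 602 of file Parallel.py.
GaudiPython::Parallel::Worker::hvt |
Definition at line 603 of file Parallel.py.
GaudiPython::Parallel::Worker::nvt |
Definition at line 604 of file Parallel.py.
GaudiPython::Parallel::Worker::ts |
Definition at line 606 of file Parallel.py.
GaudiPython::Parallel::Worker::omitHistos |
Definition at line 607 of file Parallel.py.
GaudiPython::Parallel::Worker::Finished |
Definition at line 654 of file Parallel.py.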