Gaudi Framework, version v21r8

Home   Generated: 17 Mar 2010

GaudiPython::Parallel::Worker Class Reference

List of all members.

Public Member Functions

def __init__
def workerConfig
def workerExecuteWithOutput
def workerExecuteNoOutput
def work
def finalize

Public Attributes

 id
 c
 inq
 qLimit
 commonQ
 cstatq
 qToParent
 nprocs
 outList
 tempCt
 putErrs
 completed
 KeepGoing
 itmlst
 outq
 eventOutput
 ct
 a
 evt
 hvt
 nvt
 ts
 omitHistos
 Finished

Private Attributes

 _app


Detailed Description

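Worker of the GaudiPython parallel framework. On construction it starts a daemon Process running work(), which takes serialized events from the common input queue (reader to workers), executes each one, and puts the result (or a placeholder when there is no event output) on its own output queue to the writer.

Definition at line 499 of file Parallel.py.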


Member Function Documentation

def GaudiPython::Parallel::Worker::__init__ (   self,
  wid,
  inq,
  cq,
  outq,
  cstatq,
  qToParent,
  nprocs,
  config,
  qLimit,
  _app,
  itemlist = None 
)

Definition at line 500 of file Parallel.py.

00500                                                                                                              :
00501 
00502         # wid      : an integer (0...Nworkers-1) identifying the worker
00503         # in_q     : the common queue from reader to workers
00504         # out_q    : the queue from a worker to the writer (unique queue for each worker)
00505         # common_q : the common queue from workers to writer, along which 'id' will be sent on event completion
00506         self.id           = wid
00507         self.c            = config
00508         self.inq          = inq
00509         self.qLimit       = qLimit
00510         self.commonQ      = cq
00511         self.cstatq       = cstatq
00512         self.qToParent    = qToParent
00513         self.nprocs       = nprocs
00514         self.outList      = []
00515         self.tempCt       = 0
00516         self.putErrs      = 0
00517         self.completed    = 0
00518         self.KeepGoing    = True
00519         self.itmlst       = itemlist
00520         self._app      = _app
00521 
00522         self.outq         = outq
00523 
00524         # from ParallelStats import constructWorkerDict
00525         # self.constructWorkerDict = constructWorkerDict
00526 
00527         if self.itmlst : self.eventOutput = True
00528         else           : self.eventOutput = False
00529 
00530         w = Process( target=self.work )
00531         w.setDaemon(True)
00532         w.start()
00533 
    def workerConfig( self ) :

def GaudiPython::Parallel::Worker::workerConfig (   self  ) 

Definition at line 534 of file Parallel.py.

00534                              :
00535         ks = self.c.keys()
00536         if 'ApplicationMgr' in ks :
00537             self.c['ApplicationMgr'].OutStream = []
00538         else :
00539             self.qToParent.put(None)
00540             print 'Worker %i : workerConfig : ApplicationMgr not available for configuration?'%( self.id )
00541         if 'EventSelector' in ks :
00542             self.c['EventSelector'].Input      = []
00543         else :
00544             print 'Worker %i : workerConfig : EventSelector not available for configuration?'%( self.id )
00545         try    : self.c['HistogramPersistencySvc'].OutputFile = ''
00546         except : print 'Worker-%i: No Histogram output to cancel! Config continues...'%self.id
00547         formatHead = '[Worker %i]'%self.id
00548         if 'MessageSvc' in ks :
00549             self.c['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'
00550             self.c['MessageSvc'].OutputLevel = INFO
00551         else :
00552             print 'Worker %i : workerConfig : MessageSvc not available for configuration?'%( self.id )
00553 
00554         if self.id :
00555             for k in ks :
00556                 if hasattr( self.c[k], 'OutputLevel' ) : self.c[k].OutputLevel = ERROR
00557 
00558         if self._app == 'Gauss' :
00559             gs = self.c[ 'GaussSequencer' ]
00560             # Sequencer has two stages : generation and simulation, so let
00561             # the reader generate events one by one, and pass to workers for full Simulation step
00562             ed = [ gs.Members[1] ]
00563             gs.Members = ed
00564 
00565         if self._app == 'DaVinci' :
00566             if 'NTupleSvc' in self.c.keys() :
00567                 self.c['NTupleSvc'].Output = ["FILE1 DATAFILE='Worker-%i-Hlt12-StatsTuple.root' TYP='ROOT' OPT='NEW'"%self.id]
00568                 self.c['NTupleSvc'].OutputLevel = VERBOSE
00569 
    def workerExecuteWithOutput( self, tbuf ) :

def GaudiPython::Parallel::Worker::workerExecuteWithOutput (   self,
  tbuf 
)

Definition at line 570 of file Parallel.py.

00570                                               :
00571         loadTES( self, tbuf )
00572         self.a._evtpro.executeEvent()
00573         buf = dumpTES( self )
00574         self.outq.put(buf)
00575         # allow the background thread to feed the Queue; not 100% guaranteed to finish before next line
00576         while self.outq._buffer : pass
00577         self.evt.clearStore()
00578         self.completed += 1
00579         return True
00580 
    def workerExecuteNoOutput( self, tbuf ) :

def GaudiPython::Parallel::Worker::workerExecuteNoOutput (   self,
  tbuf 
)

Definition at line 581 of file Parallel.py.

00581                                             :
00582         loadTES( self, tbuf )
00583         self.a._evtpro.executeEvent()
00584         self.outq.put('dummy')
00585         self.evt.clearStore()
00586         self.completed += 1
00587         return True
00588 
    def work( self ):

def GaudiPython::Parallel::Worker::work (   self  ) 

Definition at line 589 of file Parallel.py.

00589                     :
00590         # print 'Worker %d: starting...%d, at %5.6f' % (self.id, os.getpid(), alive)
00591         cName = currentProcess().getName()
00592         currentProcess().setName('Worker '+cName)
00593         appendPostConfigAction( self.workerConfig() )
00594 
00595         self.ct = 0
00596 
00597         # take first event
00598         buf = self.inq.get()
00599         if buf is not None :
00600             # set up GaudiPython tools
00601             self.a   = AppMgr()
00602             self.evt = self.a.evtsvc()
00603             self.hvt = self.a.histsvc()
00604             self.nvt = self.a.ntuplesvc()
00605             first = True
00606             self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)
00607             self.omitHistos = ['/stat/CaloPIDs']
00608             collectHistos = CollectHistograms( self )
00609             self.a.addAlgorithm( collectHistos )
00610             self.a.initialize()
00611             self.a.start()
00612 
00613             # worker execution changes depending on output/no-output scenario
00614             if self.eventOutput :
00615                 [ self.ts.addOptItem(itm,1) for itm in self.itmlst ]
00616                 wFunction = self.workerExecuteWithOutput
00617             else :
00618                 wFunction = self.workerExecuteNoOutput
00619 
00620 
00621             # iter over the Queue to workers, receiving serialized events. When done, workers place None on queue
00622             while self.KeepGoing :
00623                 if first : pass
00624                 else : buf = self.inq.get()
00625                 if buf is not None :
00626                     sc = wFunction( buf )
00627                     if sc : pass
00628                     self.ct += 1
00629                     if first : first = False
00630                 else :
00631                     self.KeepGoing = False
00632 
00633             # print '='*80
00634             # print 'Worker %i Finished'%( self.id )
00635             self.outq.put(None)
00636             # print line
00637             # print 'Events Recd : %i'%( self.ct )
00638             # print '='*80
00639 
00640             sc = self.finalize()
00641             self.qToParent.put(None)
00642             # print '[ GaudiPython Parallel ] : Worker has sent dict and None flag back to Parent'
00643         else : # if the buffer is None...
00644             self.outq.put(None)
00645             sc = self.finalize()
00646             self.qToParent.put(None)
00647             self.cstatq.put( (self.id, {}) )
00648             self.cstatq.put( None )
00649             for item in iter(self.commonQ.get, None) : print 'Worker %i : Got an item on the Common Queue?'%(self.id)
00650 
    def finalize( self ) :

def GaudiPython::Parallel::Worker::finalize (   self  ) 

Definition at line 651 of file Parallel.py.

00651                          :
00652         self.a.stop()
00653         self.a.finalize()
00654         self.Finished = True
00655         return True
00656 
00657 # ===========================================================================================
00658 # The Writer
00659 # ===========================================================================================
00660 
class Writer( ) :


Member Data Documentation

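GaudiPython::Parallel::Worker::id

Definition at line 506 of file Parallel.py.

GaudiPython::Parallel::Worker::c

Definition at line 507 of file Parallel.py.

GaudiPython::Parallel::Worker::inq

Definition at line 508 of file Parallel.py.

GaudiPython::Parallel::Worker::qLimit

Definition at line 509 of file Parallel.py.

GaudiPython::Parallel::Worker::commonQ

Definition at line 510 of file Parallel.py.

GaudiPython::Parallel::Worker::cstatq

Definition at line 511 of file Parallel.py.

GaudiPython::Parallel::Worker::qToParent

Definition at line 512 of file Parallel.py.

GaudiPython::Parallel::Worker::nprocs

Definition at line 513 of file Parallel.py.

GaudiPython::Parallel::Worker::outList

Definition at line 514 of file Parallel.py.

GaudiPython::Parallel::Worker::tempCt

Definition at line 515 of file Parallel.py.

GaudiPython::Parallel::Worker::putErrs

Definition at line 516 of file Parallel.py.

GaudiPython::Parallel::Worker::completed

Definition at line 517 of file Parallel.py.

GaudiPython::Parallel::Worker::KeepGoing

Definition at line 518 of file Parallel.py.

GaudiPython::Parallel::Worker::itmlst

Definition at line 519 of file Parallel.py.

GaudiPython::Parallel::Worker::_app [private]

Definition at line 520 of file Parallel.py.

GaudiPython::Parallel::Worker::outq

Definition at line 522 of file Parallel.py.

GaudiPython::Parallel::Worker::eventOutput

Definition at line 527 of file Parallel.py.

GaudiPython::Parallel::Worker::ct

Definition at line 595 of file Parallel.py.

GaudiPython::Parallel::Worker::a

Definition at line 601 of file Parallel.py.

GaudiPython::Parallel::Worker::evt

Definition at line 602 of file Parallel.py.

GaudiPython::Parallel::Worker::hvt

Definition at line 603 of file Parallel.py.

GaudiPython::Parallel::Worker::nvt

Definition at line 604 of file Parallel.py.

GaudiPython::Parallel::Worker::ts

Definition at line 606 of file Parallel.py.

GaudiPython::Parallel::Worker::omitHistos

Definition at line 607 of file Parallel.py.

GaudiPython::Parallel::Worker::Finished

Definition at line 654 of file Parallel.py.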


The documentation for this class was generated from the following file:

Parallel.py

Generated at Wed Mar 17 18:21:56 2010 for Gaudi Framework, version v21r8 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004