|
Gaudi Framework, version v21r9 |
| Home | Generated: 3 May 2010 |
Public Member Functions | |
| def | __init__ |
| def | bookDataObject |
| def | bookTH1D |
| def | bookTH2D |
| def | bookTH3D |
| def | bookTProfile |
| def | bookTProfile2D |
| def | writerConfig |
| def | write |
| def | composition |
| def | finalize |
| def | RebuildHistoStore |
Public Attributes | |
| qList | |
| cq | |
| cstatq | |
| rstatq | |
| qToParent | |
| qLimit | |
| workers | |
| output | |
| flags | |
| c | |
| bookingDict | |
| a | |
| evt | |
| hvt | |
| ts | |
| ct | |
| HistoCollection | |
Private Attributes | |
| _app | |
Definition at line 661 of file Parallel.py.
| def GaudiPython::Parallel::Writer::__init__ | ( | self, | ||
| common_queue, | ||||
| out_qList, | ||||
| cstatq, | ||||
| rstatq, | ||||
| qToParent, | ||||
| workers, | ||||
| config, | ||||
| qLimit, | ||||
| _app | ||||
| ) |
Definition at line 662 of file Parallel.py.
def __init__(self, common_queue, out_qList, cstatq, rstatq, qToParent,
             workers, config, qLimit, _app):
    """Store the queues and settings, then start the writer process.

    Every argument is kept on the instance under the attribute names used
    by the other methods (``qList``, ``cq``, ``cstatq``, ``rstatq``,
    ``qToParent``, ``qLimit``, ``workers``, ``c``, ``_app``).  A dispatch
    table from class name to booking method is built in ``bookingDict``,
    and finally a ``Process`` targeting ``self.write`` is created and
    started, so constructing a Writer immediately launches it.
    """
    # Queues handed over by the caller.
    self.cq, self.qList = common_queue, out_qList
    self.cstatq, self.rstatq = cstatq, rstatq
    self.qToParent = qToParent

    # Plain settings and initial state.
    self.workers, self.qLimit = workers, qLimit
    self.c = config
    self._app = _app
    self.output = True
    self.flags = 0

    # Class name of an incoming object -> method able to book it.
    self.bookingDict = {
        'DataObject': self.bookDataObject,
        'NTuple::Directory': self.bookDataObject,
        'NTuple::File': self.bookDataObject,
        'TH1D': self.bookTH1D,
        'TH2D': self.bookTH2D,
        'TH3D': self.bookTH3D,
        'TProfile': self.bookTProfile,
        'TProfile2D': self.bookTProfile2D,
    }

    # The handle is not kept; the process runs self.write on its own.
    writer_process = Process(target=self.write)
    writer_process.start()
| def GaudiPython::Parallel::Writer::bookDataObject | ( | self, | ||
| n, | ||||
| o | ||||
| ) |
| def GaudiPython::Parallel::Writer::bookTH1D | ( | self, | ||
| n, | ||||
| o | ||||
| ) |
Definition at line 694 of file Parallel.py.
def bookTH1D(self, n, o):
    """Book a 1D histogram under ``n`` shaped like ``o`` and add ``o`` to it.

    ``n`` is the name/path passed to the histogram service and ``o`` is the
    incoming histogram (presumably a ROOT TH1D -- confirm against caller).
    The new histogram takes its title and x-axis bin count/range from
    ``o``; the contents of ``o`` are then added into it via ``aida2root``.
    """
    x_axis = o.GetXaxis()
    booked = self.hvt._ihs.book(
        n, o.GetTitle(),
        x_axis.GetNbins(), x_axis.GetXmin(), x_axis.GetXmax())
    aida2root(booked).Add(o)
| def GaudiPython::Parallel::Writer::bookTH2D | ( | self, | ||
| n, | ||||
| o | ||||
| ) |
Definition at line 698 of file Parallel.py.
def bookTH2D(self, n, o):
    """Book a 2D histogram under ``n`` shaped like ``o`` and add ``o`` to it.

    The title and the (bin count, min, max) of the x and y axes are read
    from ``o`` (presumably a ROOT TH2D -- confirm against caller); the
    contents of ``o`` are then added to the freshly booked histogram via
    ``aida2root``.
    """
    x_axis = o.GetXaxis()
    y_axis = o.GetYaxis()
    booked = self.hvt._ihs.book(
        n, o.GetTitle(),
        x_axis.GetNbins(), x_axis.GetXmin(), x_axis.GetXmax(),
        y_axis.GetNbins(), y_axis.GetXmin(), y_axis.GetXmax())
    aida2root(booked).Add(o)
| def GaudiPython::Parallel::Writer::bookTH3D | ( | self, | ||
| n, | ||||
| o | ||||
| ) |
Definition at line 702 of file Parallel.py.
def bookTH3D(self, n, o):
    """Book a 3D histogram under ``n`` shaped like ``o`` and add ``o`` to it.

    The title and the (bin count, min, max) of the x, y and z axes are read
    from ``o`` (presumably a ROOT TH3D -- confirm against caller); the
    contents of ``o`` are then added to the freshly booked histogram via
    ``aida2root``.

    The bin count is taken with ``GetNbins()``, exactly as bookTH1D and
    bookTH2D do.  The previous code passed ``GetXbins()``, which is the
    accessor for the array of bin edges rather than the number of bins, so
    the (nbins, min, max) triplets handed to ``book`` were malformed.
    """
    x_axis = o.GetXaxis()
    y_axis = o.GetYaxis()
    z_axis = o.GetZaxis()
    obj = self.hvt._ihs.book(
        n, o.GetTitle(),
        x_axis.GetNbins(), x_axis.GetXmin(), x_axis.GetXmax(),
        y_axis.GetNbins(), y_axis.GetXmin(), y_axis.GetXmax(),
        z_axis.GetNbins(), z_axis.GetXmin(), z_axis.GetXmax())
    aida2root(obj).Add(o)
| def GaudiPython::Parallel::Writer::bookTProfile | ( | self, | ||
| n, | ||||
| o | ||||
| ) |
Definition at line 708 of file Parallel.py.
def bookTProfile(self, n, o):
    """Book a 1D profile under ``n`` shaped like ``o`` and add ``o`` to it.

    The title, the x-axis bin count/range and the option string are read
    from ``o`` (presumably a ROOT TProfile -- confirm against caller) and
    passed to ``bookProf``; the contents of ``o`` are then added to the new
    profile via ``aida2root``.
    """
    x_axis = o.GetXaxis()
    booked = self.hvt._ihs.bookProf(
        n, o.GetTitle(),
        x_axis.GetNbins(), x_axis.GetXmin(), x_axis.GetXmax(),
        o.GetOption())
    aida2root(booked).Add(o)
| def GaudiPython::Parallel::Writer::bookTProfile2D | ( | self, | ||
| n, | ||||
| o | ||||
| ) |
Definition at line 712 of file Parallel.py.
def bookTProfile2D(self, n, o):
    """Book a 2D profile under ``n`` shaped like ``o`` and add ``o`` to it.

    The title, the (bin count, min, max) of the x and y axes and the option
    string are read from ``o`` (presumably a ROOT TProfile2D -- confirm
    against caller) and passed to ``bookProf``; the contents of ``o`` are
    then added to the new profile via ``aida2root``.
    """
    x_axis = o.GetXaxis()
    y_axis = o.GetYaxis()
    booked = self.hvt._ihs.bookProf(
        n, o.GetTitle(),
        x_axis.GetNbins(), x_axis.GetXmin(), x_axis.GetXmax(),
        y_axis.GetNbins(), y_axis.GetXmin(), y_axis.GetXmax(),
        o.GetOption())
    aida2root(booked).Add(o)
| def GaudiPython::Parallel::Writer::writerConfig | ( | self | ) |
Definition at line 717 of file Parallel.py.
def writerConfig(self):
    """Adjust the stored configuration ``self.c`` for the writer process.

    For each of the three named components that is present in ``self.c``
    one option is overwritten (EventSelector.Input and
    ApplicationMgr.TopAlg are emptied, MessageSvc.Format gets a
    ``[Writer]`` prefix); a missing component only produces a printed
    warning.  Afterwards ``RequireAlgs`` is emptied on every configuration
    entry that has such an attribute.  Nothing is returned.
    """
    available = self.c.keys()

    # (component, option, new value, message printed when it is missing)
    overrides = (
        ('EventSelector', 'Input', [],
         'Writer : writerConfig : EventSelector not available for configuration?'),
        ('ApplicationMgr', 'TopAlg', [],
         'Writer : writerConfig : ApplicationMgr not available for configuration?'),
        ('MessageSvc', 'Format', '[Writer]% F%18W%S%7W%R%T %0W%M',
         'Writer : writerConfig : MessageSvc not available for configuration?'),
    )
    for component, option, value, complaint in overrides:
        if component in available:
            setattr(self.c[component], option, value)
        else:
            print(complaint)

    # Output streams sometimes require that certain algorithms have
    # completed; those algorithms never run in the independent writer, so
    # the requirement is dropped everywhere.
    for key in self.c.keys():
        entry = self.c[key]
        if hasattr(entry, 'RequireAlgs'):
            entry.RequireAlgs = []
| def GaudiPython::Parallel::Writer::write | ( | self | ) |
Definition at line 736 of file Parallel.py.
def write(self):
    """Body of the writer process: receive event buffers and write them.

    Steps, in order:

    1. Name the process, apply ``writerConfig`` and create/initialize/start
       an ``AppMgr``; keep handles to its event and histogram services and
       a ``TESSerializer`` on the instance (``a``, ``evt``, ``hvt``, ``ts``).
    2. Poll the per-worker queues in ``self.qList`` round-robin.  A truthy
       item is loaded with ``loadTES``, written by executing one event, and
       the event store is cleared; ``self.ct`` counts such events.  A falsy
       item marks that worker as finished.  The loop ends once every worker
       has been marked finished.
    3. Call ``self.finalize()`` when ``self.output`` is set, otherwise just
       stop and finalize the application manager.
    4. Put ``None`` on ``self.qToParent``.

    Compared with the previous revision the two bare ``except:`` clauses
    are narrowed to ``except Exception`` so that KeyboardInterrupt and
    SystemExit are no longer swallowed inside the polling loop, and locals
    that were assigned but never read have been removed.
    """
    currentProcess().setName('+Writer+')
    print('WRITER : applying PostConfigAction')
    # NOTE(review): writerConfig() is *called* here, so the configuration is
    # changed immediately and its return value (None) is what gets passed to
    # appendPostConfigAction.  Left untouched because the writer may rely on
    # the immediate application -- confirm the intent.
    appendPostConfigAction(self.writerConfig())
    print('[ GaudiPython.Parallel ] Writer started : Process %i' % (os.getpid()))

    self.a = AppMgr()
    self.a.initialize()
    self.a.start()
    self.evt = self.a.evtsvc()
    self.hvt = self.a.histsvc()
    self.ts = gbl.GaudiPython.TESSerializer(self.evt._idp)

    self.ct = 0
    status = [True] * self.workers   # one flag per worker, False once done

    whichQ = 0
    while sum(status) > 0:
        whichQ = (whichQ + 1) % self.workers
        outq = self.qList[whichQ]
        try:
            # NOTE(review): per the standard Queue.get contract the timeout
            # is ignored when block=False, so this is a non-blocking poll.
            tbuf = outq.get(timeout=0.01, block=False)
        except Exception:
            # Nothing available on this queue right now; try the next one.
            continue
        if tbuf:
            try:
                loadTES(self, tbuf)
                self.a._evtpro.executeEvent()   # fire the writing of TES to output file
                self.evt.clearStore()           # and clear out the TES
                self.ct += 1
                if not self.ct % 20:
                    print('[ GaudiPython.Parallel ] Writer Progress (n. Events) : %i' % self.ct)
            except Exception:
                # Best effort: report the offending buffer and carry on.
                print(' '.join(str(part) for part in (
                    '[ GaudiPython.Parallel ] Writer trying to load a ',
                    type(tbuf), tbuf[:10], '??... skipping to next')))
        else:
            status[whichQ] = False   # that worker is finished...

    if self.output:
        self.finalize()
    else:
        self.a.stop()
        self.a.finalize()
    self.qToParent.put(None)
| def GaudiPython::Parallel::Writer::composition | ( | self | ) |
Definition at line 788 of file Parallel.py.
def composition(self):
    """Tally the contents of the histogram store.

    Walks the names returned by ``self.hvt.getHistoNames()`` and, for each
    entry, either counts it as a histogram (its type is in ``aidatypes``)
    and records ``(name, entries)``, or counts it as a plain object.  When
    the name list is empty ``'Empty list!'`` is printed instead.  The
    tallies are local only; nothing is returned or stored.
    """
    names = self.hvt.getHistoNames()
    if not names:
        print('Empty list!')
        return

    entries_by_name = []
    n_histos = 0
    n_objects = 0
    for name in names:
        item = self.hvt[name]
        if type(item) in aidatypes:
            n_histos += 1
            entries_by_name.append((name, item.entries()))
        else:
            n_objects += 1
| def GaudiPython::Parallel::Writer::finalize | ( | self | ) |
Definition at line 808 of file Parallel.py.
def finalize(self):
    """Collect the histogram sets, merge them and shut the application down.

    ``self.HistoCollection`` is filled first with everything read from
    ``self.rstatq`` up to the first ``None``, then with the truthy items
    read from ``self.cstatq`` until ``self.workers`` falsy items have been
    seen.  The collection is sorted, ``'h'`` is put on ``self.qToParent``,
    the histogram store is rebuilt and the application manager is stopped
    and finalized.
    """
    self.HistoCollection = []
    self.HistoCollection.extend(iter(self.rstatq.get, None))

    # One falsy item is expected per worker as its end marker.
    pending = self.workers
    while pending > 0:
        received = self.cstatq.get()
        if received:
            # received is (worker-id, histoDict)
            self.HistoCollection.append(received)
        else:
            pending -= 1
    self.HistoCollection.sort()

    # send signal to Parent that all Histos have been received
    self.qToParent.put('h')
    self.RebuildHistoStore()

    self.a.stop()
    self.a.finalize()
| def GaudiPython::Parallel::Writer::RebuildHistoStore | ( | self | ) |
Definition at line 825 of file Parallel.py.
def RebuildHistoStore(self):
    """Merge every collected histogram set into the local histogram store.

    Each element of ``self.HistoCollection`` is a ``(workerID, histDict)``
    pair.  For every name in ``histDict``: when ``self.hvt.retrieve(name)``
    finds an existing object the incoming one is added to it via
    ``aida2root``; otherwise the incoming object's class name selects a
    booking method from ``self.bookingDict``, and a message is printed when
    no booking method is known for that class.
    """
    for _worker_id, histograms in self.HistoCollection:
        # Tallies per set; they are not reported anywhere in this method.
        merged = 0
        booked = 0
        for path, incoming in histograms.items():
            existing = self.hvt.retrieve(path)
            if existing:
                aida2root(existing).Add(incoming)
                merged += 1
                continue

            kind = incoming.__class__.__name__
            if kind in self.bookingDict:
                self.bookingDict[kind](path, incoming)
            else:
                print(' '.join(str(part) for part in (
                    'No booking method for: ',
                    path, incoming, type(incoming), kind)))
            booked += 1

# == EOF ====================================================================================
Definition at line 663 of file Parallel.py.
Definition at line 664 of file Parallel.py.
Definition at line 665 of file Parallel.py.
Definition at line 666 of file Parallel.py.
Definition at line 667 of file Parallel.py.
Definition at line 668 of file Parallel.py.
Definition at line 669 of file Parallel.py.
Definition at line 670 of file Parallel.py.
Definition at line 671 of file Parallel.py.
Definition at line 672 of file Parallel.py.
GaudiPython::Parallel::Writer::_app [private] |
Definition at line 673 of file Parallel.py.
Definition at line 678 of file Parallel.py.
Definition at line 742 of file Parallel.py.
Definition at line 745 of file Parallel.py.
Definition at line 746 of file Parallel.py.
Definition at line 747 of file Parallel.py.
Definition at line 749 of file Parallel.py.
Definition at line 810 of file Parallel.py.