00001 from Gaudi.Configuration import *
00002 from Configurables import EvtCounter
00003 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
00004 from ROOT import TBufferFile, TBuffer
00005 import multiprocessing
00006 from multiprocessing import Process, Queue, JoinableQueue, Event
00007 from multiprocessing import cpu_count, current_process
00008 from multiprocessing.queues import Empty
00009 from pTools import *
00010 import time, sys, os
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
# Polling interval (seconds) for busy-wait loops (e.g. flushing queue buffers)
NAP = 0.001
# Bytes per megabyte, used for data-volume reporting
MB = 1024.0*1024.0

# Timeouts (seconds) for the inter-process synchronisation barriers
WAIT_INITIALISE = 60*5
WAIT_FIRST_EVENT = 60*3
WAIT_SINGLE_EVENT = 60*6
WAIT_FINALISE = 60*2
# Check/report intervals (seconds) used while waiting on the barriers
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 5

# If True, attach the SmapShot algorithm to each node to record
# memory snapshots (see SetupGaudiPython)
SMAPS = False

# Convert an AIDA histogram interface to the underlying ROOT object
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# AIDA interface types; these must be converted to ROOT objects
# before being sent between processes (see dumpHistograms)
aidatypes = ( gbl.AIDA.IHistogram,
              gbl.AIDA.IHistogram1D,
              gbl.AIDA.IHistogram2D,
              gbl.AIDA.IHistogram3D,
              gbl.AIDA.IProfile1D,
              gbl.AIDA.IProfile2D,
              gbl.AIDA.IBaseHistogram )

# Native ROOT histogram types
thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )

# Map each known output-stream configurable class to the kind of
# output it produces (events / records / tuples)
WRITERTYPES = { 'EvtCollectionStream' : "tuples",
                'InputCopyStream' : "events",
                'OutputStream' : "events",
                'RecordStream' : "records",
                'RunRecordStream' : "records",
                'SequentialOutputStream' : "events",
                'TagCollectionStream' : "tuples" }
00079
00080
00081
class MiniWriter( object ) :
    '''
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string.
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring.
    '''
    def __init__( self, writer, wType, config ) :
        '''
        writer : the writer configurable
        wType  : one of "events" / "records" / "tuples" (see WRITERTYPES)
        config : the full configuration dictionary (to reach the DataSvc)
        '''
        self.w = writer
        self.wType = wType
        # key/output remain None if no output location can be found
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None

        self.wName = writer.getName()
        # The output may live on the writer itself, or on its DataSvc
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr( self.w, "Output" ) :
            self.woutput = self.w.Output
            self.getItemLists( config )
            self.set( self.wName, self.w.Output )
            return
        else :
            # No output property on the writer itself; look it up on the
            # associated event data service instead
            self.datasvcName = self.w.EvtDataSvc
            datasvc = config[ self.datasvcName ]
            if hasattr( datasvc, "Output" ) :
                self.getItemLists( config )
                self.set( self.datasvcName, datasvc.Output )
                return

    def getNewName( self, replaceThis, withThis, extra='' ) :
        '''
        Return the output name with every occurrence of replaceThis
        substituted by withThis, and `extra` appended (e.g. " OPT='RECREATE'").
        Preserves the list/string form of the stored output.
        '''
        # isinstance is the idiomatic (and subclass-safe) type check;
        # the original compared __class__.__name__ strings
        assert isinstance( replaceThis, str )
        assert isinstance( withThis, str )
        old = self.output
        lst = False
        if isinstance( old, list ) :
            old = self.output[0]
            lst = True
        new = old.replace( replaceThis, withThis )
        new += extra
        if lst :
            return [ new ]
        else :
            return new

    def getItemLists( self, config ) :
        '''Fetch ItemList/OptItemList from the writer, else from its DataSvc.'''
        if hasattr( self.w, "ItemList" ) :
            self.ItemList = self.w.ItemList
        else :
            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "ItemList" ) :
                self.ItemList = datasvc.ItemList
        # Optional items may also be on either object
        if hasattr( self.w, "OptItemList" ) :
            self.OptItemList = self.w.OptItemList
        else :
            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "OptItemList" ) :
                self.OptItemList = datasvc.OptItemList
        return

    def set( self, key, output ) :
        '''Record which config key owns the output, and the output itself.'''
        self.key = key
        self.output = output
        return

    def __repr__( self ) :
        '''Multi-line human-readable summary of this writer.'''
        s = ""
        line = '-'*80
        s += (line+'\n')
        s += "Writer : %s\n"%( self.wName )
        s += "Writer Type : %s\n"%( self.wType )
        s += "Writer Output : %s\n"%( self.output )
        s += "DataSvc : %s\n"%( self.datasvcName )
        s += "DataSvc Output : %s\n"%( self.svcOutput )
        s += '\n'
        s += "Key for config : %s\n"%( self.key )
        s += "Output File : %s\n"%( self.output )
        s += "ItemList : %s\n"%( self.ItemList )
        s += "OptItemList : %s\n"%( self.OptItemList )
        s += (line+'\n')
        return s
00180
00181
00182
class CollectHistograms( PyAlgorithm ) :
    '''
    GaudiPython algorithm used to clean up histos on the Reader and Workers.
    Only has a finalize method().
    This retrieves a dictionary of path:histo objects and sends it to the
    writer. It then waits for a None flag : THIS IS IMPORTANT, as if
    the algorithm returns before ALL histos have been COMPLETELY RECEIVED
    at the writer end, there will be an error.
    '''
    def __init__( self, gmpcomponent ) :
        # Keep a handle on the owning GMPComponent (Reader or Worker) so
        # finalize() can reach its histogram store and queues
        PyAlgorithm.__init__( self )
        self._gmpc = gmpcomponent
        self.log = self._gmpc.log
        return None
    def execute( self ) :
        # Nothing to do per-event; histograms are only shipped at finalize
        return SUCCESS
    def finalize( self ) :
        # Ship the whole histogram store to the Writer, then block until the
        # Writer signals (via the shared sync Event) that all histos arrived
        self.log.info('CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms( )
        ks = self._gmpc.hDict.keys()
        self.log.info('%i Objects in Histogram Store'%(len(ks)))

        # Send the histograms in chunks of 100 so that no single queue item
        # becomes too large to transmit
        chunk = 100
        reps = len(ks)/chunk + 1
        for i in xrange(reps) :
            someKeys = ks[i*chunk : (i+1)*chunk]
            smalld = dict( [(key, self._gmpc.hDict[key]) for key in someKeys] )
            self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
        # "Finished" Flag terminates the stream of chunks
        self.log.debug('Signalling end of histos to Writer')
        self._gmpc.hq.put( 'HISTOS_SENT' )
        self.log.debug( 'Waiting on Sync Event' )
        self._gmpc.sEvent.wait()
        self.log.debug( 'Histo Sync Event set, clearing and returning' )
        # Safe to clear now: reset the local histogram store and give it a
        # fresh root so the component can finalize cleanly
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()
        setOwnership(root, False)
        self._gmpc.hvt.setRoot( '/stat', root )
        return SUCCESS
00224
00225
00226
class EventCommunicator( object ) :
    '''
    Handles transmission of (eventNumber, TBufferFile) packets between two
    processes over a pair of queues, keeping transfer statistics.
    One of qin/qout may be None: the Reader has no input queue and the
    Writer has no output queue.
    '''
    def __init__( self, GMPComponent, qin, qout ) :
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # advisory maximum queue capacity (not enforced here)
        self.maxsize = 50
        # the queue pair this communicator serves
        self.qin = qin
        self.qout = qout
        # state flags
        self.allsent = False
        self.allrecv = False
        # statistics storage
        self.nSent = 0
        self.nRecv = 0
        self.sizeSent = 0
        self.sizeRecv = 0
        self.qinTime = 0
        self.qoutTime = 0

    def send( self, item ) :
        '''
        Put one (eventNumber, TBufferFile) tuple on the output queue and
        block until the queue's local buffer has drained to the pipe.
        '''
        assert isinstance( item, tuple )
        startTransmission = time.time()
        self.qout.put( item )
        # Wait until the sending buffer is empty, i.e. the item has really
        # left this process, before accounting it as sent
        while self.qout._buffer : time.sleep( NAP )
        self.qoutTime += time.time()-startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive( self, timeout=None ) :
        '''
        Get the next item from the input queue; returns None on timeout.
        Only tuple packets count towards the data statistics (string
        control flags such as 'FINISHED' do not).
        '''
        startWait = time.time()
        try :
            itemIn = self.qin.get( timeout=timeout )
        except Empty :
            return None
        self.qinTime += time.time()-startWait
        self.nRecv += 1
        if isinstance( itemIn, tuple ) :
            self.sizeRecv += itemIn[1].Length()
        else :
            # control flag, not event data; undo the counter increment
            self.nRecv -= 1
        try :
            self.qin.task_done()
        except Exception :
            # task_done() raises if called more times than items were queued;
            # narrowed from a bare except so real signals still propagate
            self._gmpc.log.warning('TASK_DONE called too often by : %s'\
                                   %(self._gmpc.nodeType))
        return itemIn

    def finalize( self ) :
        '''
        Send the 'FINISHED' flag to each downstream consumer, wait for the
        output queue to drain, then log the transfer statistics.
        '''
        self.log.info('Finalize Event Communicator : %s'%(self._gmpc.nodeType))
        # The Reader notifies all nWorkers, each Worker notifies the single
        # Writer, and the Writer has nothing downstream
        if self._gmpc.nodeType == 'Reader' : downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer' : downstream = 0
        elif self._gmpc.nodeType == 'Worker' : downstream = 1
        for i in xrange(downstream) :
            self.qout.put( 'FINISHED' )
        if self._gmpc.nodeType != 'Writer' :
            self.qout.join()
        # Now some reporting
        self.statistics( )

    def statistics( self ) :
        '''Log the send/receive counters and timings for this communicator.'''
        self.log.name = '%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
        self.log.info ( 'Items Sent : %i'%(self.nSent) )
        self.log.info ( 'Items Received : %i'%(self.nRecv) )
        self.log.info ( 'Data Sent : %i'%(self.sizeSent) )
        self.log.info ( 'Data Received : %i'%(self.sizeRecv) )
        self.log.info ( 'Q-out Time : %5.2f'%(self.qoutTime) )
        self.log.info ( 'Q-in Time : %5.2f'%(self.qinTime ) )
00309
00310
00311
class TESSerializer( object ) :
    '''
    Bookkeeping wrapper around the GaudiMP TESSerializer: loads and dumps
    whole-TES TBufferFiles while accumulating counts, sizes and timings
    for the end-of-job report.
    '''
    def __init__( self, gaudiTESSerializer, evtDataSvc,
                  nodeType, nodeID, log ) :
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        # per-buffer sizes in bytes, for max/average reporting
        self.buffersIn = []
        self.buffersOut = []
        self.nIn = 0
        self.nOut = 0
        # cumulative times (seconds)
        self.tDump = 0.0
        self.tLoad = 0.0
        # for logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log
    def Load( self, tbuf ) :
        '''Reset the TES root and deserialize the given buffer into it.'''
        root = gbl.DataObject()
        setOwnership( root, False )
        self.evt.setRoot( '/Event', root )
        t = time.time()
        self.T.loadBuffer( tbuf )
        self.tLoad += (time.time() - t)
        self.nIn += 1
        self.buffersIn.append( tbuf.Length() )
    def Dump( self ) :
        '''Serialize the current TES into a new TBufferFile and return it.'''
        t = time.time()
        tb = TBufferFile( TBuffer.kWrite )
        self.T.dumpBuffer(tb)
        self.tDump += ( time.time()-t )
        self.nOut += 1
        self.buffersOut.append( tb.Length() )
        return tb
    def Report( self ) :
        '''Log a summary of all load/dump activity on this node.'''
        evIn = "Events Loaded : %i"%( self.nIn )
        evOut = "Events Dumped : %i"%( self.nOut )
        din = sum( self.buffersIn )
        dataIn = "Data Loaded : %i"%(din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb"%(din/MB)
        if self.nIn :
            avgIn = "Avg Buf Loaded : %5.2f Mb"\
                    %( din/(self.nIn*MB) )
            maxIn = "Max Buf Loaded : %5.2f Mb"\
                    %( max(self.buffersIn)/MB )
        else :
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"
        dout = sum( self.buffersOut )
        dataOut = "Data Dumped : %i"%(dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb"%(dout/MB)
        if self.nOut :
            # BUGFIX: the dumped-buffer average must use dout, not din
            # (the original copy-pasted din here)
            avgOut = "Avg Buf Dumped : %5.2f Mb"\
                     %( dout/(self.nOut*MB) )
            maxOut = "Max Buf Dumped : %5.2f Mb"\
                     %( max(self.buffersOut)/MB )
        else :
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f"%( self.tDump )
        loadTime = "Total Load Time : %5.2f"%( self.tLoad )

        lines = ( evIn,
                  evOut,
                  dataIn,
                  dataInMb,
                  avgIn,
                  maxIn,
                  dataOut,
                  dataOutMb,
                  avgOut,
                  maxOut,
                  dumpTime,
                  loadTime )
        self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
        for line in lines :
            self.log.info( line )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
00388
00389
00390
class GMPComponent( object ) :
    '''
    Base class for the three node types of the parallel system
    (Reader, Worker, Writer). Holds the communication machinery
    (event/histogram/filerecord queues), the GaudiPython AppMgr
    bootstrap, and the common timing/statistics bookkeeping.
    Subclasses override processConfiguration() and Engine().
    '''

    def __init__( self, nodeType, nodeID, queues, events, params ) :
        '''
        nodeType : 'Reader', 'Worker' or 'Writer'
        nodeID   : -1 for the Reader, -2 for the Writer, 0..n-1 for Workers
        queues   : ( eventQueuePair, histogramQueue, fileRecordsQueue )
        events   : ( initEvent, (loopSyncer, lastEvent), finalEvent )
        params   : ( nWorkers, histSyncEvent, config, log )
        '''
        self.nodeType = nodeType
        current_process().name = nodeType

        # Synchronisation events shared with the Coord process
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer

        # Shared parameters
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.nodeID = nodeID

        self.currentEvent = None

        # Set up the communication machinery
        qPair, histq, fq = queues

        if self.nodeType == 'Reader' or self.nodeType == 'Worker' :
            # Reader and Worker use a single (in, out) queue pair
            qin, qout = qPair
            self.evcom = EventCommunicator( self, qin, qout )
        else :
            # The Writer gets one receiving communicator per Worker
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin :
                ec = EventCommunicator( self, q, None )
                self.evcoms.append( ec )

        # Queue for transmitting histograms to the Writer
        self.hq = histq
        # Queue for transmitting file records to the Writer
        self.fq = fq

        # Traffic counters
        self.nIn = 0
        self.nOut = 0

        # Overall status of this node's event loop
        self.stat = SUCCESS

        self.log.name = '%s-%i'%(self.nodeType, self.nodeID)

        # Timing accumulators (seconds)
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        # The subprocess in which Engine() will run
        self.proc = Process( target=self.Engine )

        # Simple event counter used in the Gauss case
        self.num = 0

        # Identify which application is being run; this affects how the
        # event number is determined (see getEventNumber)
        ks = self.config.keys()
        self.app = None
        # local renamed from 'list' to avoid shadowing the builtin
        apps = ["Brunel", "DaVinci", "Boole", "Gauss"]
        for k in apps:
            if k in ks: self.app = k

    def Start( self ) :
        '''Launch this node's Engine() in its own process.'''
        self.proc.start()

    def Engine( self ) :
        # The method that runs in the subprocess; overridden by subclasses
        pass

    def processConfiguration( self ) :
        # Node-specific manipulation of the configuration; overridden
        pass

    def SetupGaudiPython( self ) :
        '''Create the AppMgr and collect the services and tools needed.'''
        self.a = AppMgr()
        if SMAPS :
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType+'-'+str(self.nodeID)+'.smp'
            ss = SmapShot( logname=smapsLog )
            self.a.addAlgorithm( ss )
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc','IIncidentSvc')
        self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
        self.ts = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
        self.TS = TESSerializer( self.ts, self.evt,
                                 self.nodeType, self.nodeID, self.log )
        return SUCCESS

    def StartGaudiPython( self ) :
        '''Initialize and start the application manager.'''
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES( self, tbufferfile ) :
        '''Reset the TES root and deserialize a TBufferFile into it.'''
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot( '/Event', root )
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber( self ) :
        '''
        Determine the current event number.
        For Gauss the Reader (nodeID == -1) maintains a simple counter;
        otherwise the Gen/Rec headers are consulted, falling back on the
        raw-event bank. Returns -1 if the number cannot be determined.
        '''
        if self.app != 'Gauss':
            # Try the Gen and Rec headers in turn
            lst = [ '/Event/Gen/Header',
                    '/Event/Rec/Header' ]
            for l in lst :
                path = l
                try :
                    n = self.evt[path].evtNumber()
                    return n
                except Exception :
                    # no header at this location; try the next one
                    # (narrowed from a bare except)
                    continue

            # Fall back on the raw event (bank type 16)
            try :
                n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
                return n
            except Exception :
                pass

            # Warn only the first time the number cannot be found
            if self.nIn > 0 or self.nOut > 0 :
                pass
            else :
                self.log.warning('Could not determine Event Number')
            return -1
        else:
            # Gauss: the Reader drives the counter, everyone else reads it
            if self.nodeID == -1:
                self.num = self.num + 1
                self.log.info( 'Evt Number %d' % self.num )
            return self.num

    def IdentifyWriters( self ) :
        '''
        Inspect the configuration and classify all output writers present.
        Returns a dict mapping "events"/"records"/"tuples" to lists of
        MiniWriter objects, plus "histos" to the histogram output file(s).
        '''
        d = {}
        keys = [ "events", "records", "tuples", "histos" ]
        for k in keys :
            d[k] = []

        # Identify Writers in the Configuration
        wkeys = WRITERTYPES.keys()
        for v in self.config.values() :
            if v.__class__.__name__ in wkeys :
                writerType = WRITERTYPES[ v.__class__.__name__ ]
                d[writerType].append( MiniWriter(v, writerType, self.config) )
                if self.nodeID == 0 :
                    self.log.info('Writer Found : %s'%(v.name()))

        # Now check for the Histogram Service
        if 'HistogramPersistencySvc' in self.config.keys() :
            hfile =self.config['HistogramPersistencySvc'].getProp('OutputFile')
            d[ "histos" ].append( hfile )
        return d

    def dumpHistograms( self ) :
        '''
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames( )
        histDict = {}
        objects = 0 ; histos = 0
        if nlist :
            for n in nlist :
                o = self.hvt[ n ]
                if type(o) in aidatypes :
                    # AIDA interfaces cannot be transmitted; convert to ROOT
                    o = aida2root(o)
                    histos += 1
                else :
                    objects += 1
                histDict[ n ] = o
        else :
            # consistency: use the node logger rather than a bare print
            self.log.warning( 'no histograms to recover?' )
        return histDict

    def Initialize( self ) :
        '''Configure, bootstrap and start Gaudi; records the time taken.'''
        start = time.time()
        self.processConfiguration( )
        self.SetupGaudiPython( )
        self.initEvent.set()
        self.StartGaudiPython( )

        if self.app == 'Gauss':
            # Gauss propagates the event number via the EvtCounter tool
            tool = self.a.tool( "ToolSvc.EvtCounter" )
            self.cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
            self.log.info("interface %s" %self.cntr)
        else:
            self.cntr = None

        self.iTime = time.time() - start

    def Finalize( self ) :
        '''Stop and finalize the AppMgr; records the time taken.'''
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info( '%s-%i Finalized'%(self.nodeType, self.nodeID) )
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report( self ) :
        '''Log the timing summary for this node, then the TES statistics.'''
        self.log.name = "%s-%i Audit"%(self.nodeType, self.nodeID)
        allTime = "Alive Time : %5.2f"%(self.tTime)
        initTime = "Init Time : %5.2f"%(self.iTime)
        frstTime = "1st Event Time : %5.2f"%(self.firstEvTime)
        runTime = "Run Time : %5.2f"%(self.rTime)
        finTime = "Finalise Time : %5.2f"%(self.fTime)
        tup = ( allTime, initTime, frstTime, runTime, finTime )
        for t in tup :
            self.log.info( t )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
        # and this node's serializer statistics
        self.TS.Report()
00638
00639
00640
class Reader( GMPComponent ) :
    '''
    The Reader node: runs only the event selector, serializes each event's
    TES contents into a TBufferFile and distributes the buffers to the
    Workers over the shared event queue.
    '''
    def __init__( self, queues, events, params ) :
        GMPComponent.__init__(self, 'Reader', -1, queues, events, params )

    def processConfiguration( self ) :
        # Reader :
        #   No algorithms
        #   No output
        #   No histograms
        self.config[ 'ApplicationMgr' ].TopAlg = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        self.config['MessageSvc'].Format = '[Reader]% F%18W%S%7W%R%T %0W%M'
        self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax

    def DumpEvent( self ) :
        '''Serialize the current TES contents into a new TBufferFile.'''
        tb = TBufferFile( TBuffer.kWrite )
        self.ts.dumpBuffer( tb )
        return tb

    def DoFirstEvent( self ) :
        '''
        Read, serialize and send the first event. The first event is
        special: the TES is inspected to populate the serializer's item
        list before anything can be dumped.
        '''
        startFirst = time.time()
        self.log.info('Reader : First Event')
        # Check termination criteria before reading anything
        if self.nOut == self.evtMax :
            self.log.info('evtMax( %i ) reached'%(self.evtMax))
            self.lastEvent.set()
            return SUCCESS
        else :
            # Read one event
            self.a.run(1)
            if not bool(self.evt['/Event']) :
                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
                self.lastEvent.set()
                return SUCCESS
            else :
                # Populate the TESSerializer item list from the store
                if self.app == "Gauss":
                    lst = self.evt.getHistoNames()
                else:
                    try :
                        lst = self.evt.getList()
                        if self.app == "DaVinci":
                            # DaVinci needs the DAQ sub-tree added explicitly
                            daqnode = self.evt.retrieveObject( '/Event/DAQ' ).registry()
                            setOwnership( daqnode, False )
                            self.evt.getList( daqnode, lst, daqnode.address().par() )
                    except Exception :
                        # narrowed from a bare except; any failure here means
                        # nothing can be serialized, so the job is dead
                        self.log.critical('Reader could not acquire TES List!')
                        self.lastEvent.set()
                        return FAILURE
                self.log.info('Reader : TES List : %i items'%(len(lst)))
                for l in lst :
                    self.ts.addItem(l)
                self.currentEvent = self.getEventNumber( )
                tb = self.TS.Dump( )
                self.log.info('First Event Sent')
                self.evcom.send( (self.currentEvent, tb) )
                self.nOut += 1
                self.eventLoopSyncer.set()
                self.evt.clearStore( )
                self.firstEvTime = time.time()-startFirst
                return SUCCESS

    def Engine( self ) :
        '''Main loop of the Reader process: read, dump, send, repeat.'''
        startEngine = time.time()
        self.log.name = 'Reader'
        self.log.info('Reader Process starting')

        self.Initialize()

        # add the histogram collection algorithm (ships histos at finalize)
        self.a.addAlgorithm( CollectHistograms(self) )

        self.log.info('Reader Beginning Distribution')
        sc = self.DoFirstEvent( )
        if sc.isSuccess() :
            self.log.info('Reader First Event OK')
        else :
            self.log.critical('Reader Failed on First Event')
            self.stat = FAILURE

        # Event distribution loop
        while True :
            # Check termination criteria
            if self.nOut == self.evtMax :
                self.log.info('evtMax( %i ) reached'%(self.evtMax))
                break
            # Check health
            if not self.stat.isSuccess() :
                self.log.critical( 'Reader is Damaged!' )
                break
            t = time.time()
            self.a.run(1)
            self.rTime += (time.time()-t)
            if not bool(self.evt['/Event']) :
                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
                break
            self.currentEvent = self.getEventNumber( )
            tb = self.TS.Dump( )
            self.evcom.send( (self.currentEvent, tb) )
            # Event sent; release and continue
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Finalize
        self.log.info( 'Reader : Event Distribution complete.' )
        self.evcom.finalize()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()
00760
00761
00762
class Worker( GMPComponent ) :
    '''
    A Worker node: receives serialized events from the Reader, runs the
    full algorithm sequence on them, and forwards accepted events to the
    Writer.
    '''
    def __init__( self, workerID, queues, events, params ) :
        GMPComponent.__init__(self,'Worker', workerID, queues, events, params)
        # Bootstrap the writers so tuple output can be redirected per-worker
        self.writerDict = self.IdentifyWriters( )
        # Cache the Accept/Require/Veto algorithm lists for event selection
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.debug("Worker-%i Created OK"%(self.nodeID))
        self.eventOutput = True

    def processConfiguration( self ) :
        # Worker :
        #   No input
        #   No output
        #   No histograms
        self.config[ 'EventSelector' ].Input = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        formatHead = '[Worker-%i] '%(self.nodeID)
        self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'

        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

        for m in self.writerDict[ "tuples" ] :
            # Rename the tuple output file with a per-worker suffix so the
            # files can be merged after the job
            newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
            self.config[ m.key ].Output = newName

        # The EvtCounter tool must not advance on incidents; the event
        # number is injected explicitly from the received packet
        try:
            if "ToolSvc.EvtCounter" not in self.config:
                from Configurables import EvtCounter
                counter = EvtCounter()
            else:
                counter = self.config["ToolSvc.EvtCounter"]
            counter.UseIncident = False
        except Exception:
            # best effort; some application versions have no EvtCounter
            # (narrowed from a bare except)
            self.log.warning('Cannot configure EvtCounter')

    def Engine( self ) :
        '''Main loop of a Worker: receive, process, select, forward.'''
        startEngine = time.time()
        self.log.name = "Worker-%i"%(self.nodeID)
        self.log.info("Worker %i starting Engine"%(self.nodeID))
        self.Initialize()
        self.filerecordsAgent = FileRecordsAgent(self)

        # Populate the TESSerializer item list from the event writers
        self.log.info('EVT WRITERS ON WORKER : %i'\
                      %( len(self.writerDict['events'])))

        nEventWriters = len( self.writerDict[ "events" ] )
        if nEventWriters :
            itemList = set()
            optItemList = set()
            for m in self.writerDict[ "events" ] :
                for item in m.ItemList :
                    # strip any '#depth' suffix from the path
                    hsh = item.find( '#' )
                    if hsh != -1:
                        item = item[:hsh]
                    itemList.add( item )
                for item in m.OptItemList :
                    hsh = item.find( '#' )
                    if hsh != -1:
                        item = item[:hsh]
                    optItemList.add( item )
            # If an item is both mandatory and optional, keep it only in
            # the mandatory list
            itemList -= optItemList
            for item in sorted( itemList ):
                self.log.info( ' adding ItemList Item to ts : %s' % ( item ) )
                self.ts.addItem( item )
            for item in sorted( optItemList ):
                self.log.info( ' adding Optional Item to ts : %s' % ( item ) )
                self.ts.addOptItem( item )
        else :
            self.log.info( 'There is no Event Output for this app' )
            self.eventOutput = False

        # Add the histogram collection algorithm (ships histos at finalize)
        self.a.addAlgorithm( CollectHistograms(self) )

        # Begin processing
        self.log.name = "Worker-%i"%(self.nodeID)
        Go = True
        while Go :
            packet = self.evcom.receive( )
            if not packet : continue
            if packet == 'FINISHED' : break
            evtNumber, tbin = packet
            if self.cntr is not None:
                # Gauss: propagate the event number through the counter tool
                self.cntr.setEventCounter( evtNumber )

            self.nIn += 1
            self.TS.Load( tbin )

            t = time.time()
            sc = self.a.executeEvent()
            if self.nIn == 1 :
                self.firstEvTime = time.time()-t
            else :
                self.rTime += (time.time()-t)
            if not sc.isSuccess() :
                self.log.warning('Did not Execute Event')
                self.evt.clearStore()
                continue
            if not self.isEventPassed() :
                self.log.warning( 'Event did not pass : %i'%(evtNumber) )
                self.evt.clearStore()
                continue
            if self.eventOutput :
                # It may be the case of generating Event Tags; hence
                # no event output
                self.currentEvent = self.getEventNumber( )
                tb = self.TS.Dump( )
                self.evcom.send( (self.currentEvent, tb) )
                self.nOut += 1
            self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        self.evcom.finalize()
        self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
        # Send any FileRecords, then finalize and report
        self.filerecordsAgent.SendFileRecords()
        self.Finalize()
        self.tTime = time.time()-startEngine
        self.Report()

    def getCheckAlgs( self ) :
        '''
        For some output writers, a check is performed to see if the event has
        executed certain algorithms.
        These reside in the AcceptAlgs property for those writers
        '''
        acc = []
        req = []
        vet = []
        for m in self.writerDict[ "events" ] :
            if hasattr(m.w, 'AcceptAlgs') : acc += m.w.AcceptAlgs
            if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
            if hasattr(m.w, 'VetoAlgs') : vet += m.w.VetoAlgs
        return (acc, req, vet)

    def checkExecutedPassed( self, algName ) :
        '''True if the named algorithm was executed and passed its filter.'''
        if self.a.algorithm( algName )._ialg.isExecuted()\
           and self.a.algorithm( algName )._ialg.filterPassed() :
            return True
        else :
            return False

    def isEventPassed( self ) :
        '''
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream
        '''
        passed = False

        self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
        if self.acceptAlgs :
            # accepted if ANY accept-alg executed and passed
            for name in self.acceptAlgs :
                if self.checkExecutedPassed( name ) :
                    passed = True
                    break
        else :
            passed = True

        self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
        for name in self.requireAlgs :
            if self.checkExecutedPassed( name ) :
                pass
            else :
                self.log.info('Evt declined (requireAlgs) : %s'%(name) )
                passed = False

        self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
        for name in self.vetoAlgs :
            if self.checkExecutedPassed( name ) :
                pass
            else :
                self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
                passed = False
        return passed
00964
00965
00966
class Writer( GMPComponent ) :
    '''
    The Writer node: collects processed events round-robin from the
    Workers and produces the event/record/histogram output files.
    '''
    def __init__( self, queues, events, params ) :
        GMPComponent.__init__(self,'Writer', -2, queues, events, params)
        # Identify the file outputs to be manipulated
        self.writerDict = self.IdentifyWriters( )
        # One flag per worker; when all are True the event loop is complete
        self.status = [False]*self.nWorkers
        self.log.name = "Writer--2"

    def processConfiguration( self ) :
        # Writer :
        #   No algorithms
        #   No input
        self.config[ 'ApplicationMgr' ].TopAlg = []
        self.config[ 'EventSelector' ].Input = []

        self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'

        # Now process the output writers
        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

        # Rename the event output files to reflect the parallel run
        for m in self.writerDict[ "events" ] :
            self.log.debug( 'Processing Event Writer : %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers )
            self.config[ m.key ].Output = newName

        # FileRecords output must be RECREATEd here, as the original file
        # may already be held open elsewhere
        for m in self.writerDict[ "records" ] :
            self.log.debug( 'Processing FileRecords Writer: %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers,
                                    extra=" OPT='RECREATE'" )
            self.config[ m.key ].Output = newName

        # Same renaming for the histogram output file, if any
        hs = "HistogramPersistencySvc"
        n = None
        if hs in self.config.keys() :
            n = self.config[ hs ].OutputFile
        if n :
            newName=self.config[hs].OutputFile.replace('.',\
                                                       '.p%i.'%(self.nWorkers))
            self.config[ hs ].OutputFile = newName

    def Engine( self ) :
        '''Main loop of the Writer: poll workers round-robin and write.'''
        startEngine = time.time()
        self.Initialize()
        self.histoAgent = HistoAgent( self )
        self.filerecordsAgent = FileRecordsAgent( self )

        # Begin processing
        Go = True
        current = -1
        stopCriteria = self.nWorkers
        while Go :
            # Poll each worker's queue in turn with a short timeout
            current = (current+1)%self.nWorkers
            packet = self.evcoms[current].receive( timeout=0.01 )
            if packet is None :
                continue
            if packet == 'FINISHED' :
                self.log.info('Writer got FINISHED flag : Worker %i'%(current))
                self.status[current] = True
                if all(self.status) :
                    self.log.info('FINISHED recd from all workers, break loop')
                    break
                continue

            # A normal event packet: load it and run the output streams
            self.nIn += 1
            evtNumber, tbin = packet
            self.TS.Load( tbin )
            t = time.time()
            self.a.executeEvent()
            self.rTime += ( time.time()-t )
            self.currentEvent = self.getEventNumber( )
            self.evt.clearStore( )
            self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Finalize all of the event communicators (plain loop, not a
        # side-effect list comprehension)
        for e in self.evcoms :
            e.finalize()
        # Rebuild the histogram store from the workers' contributions
        sc = self.histoAgent.Receive()
        sc = self.histoAgent.RebuildHistoStore()
        if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
        else : self.log.warning( 'Histo Store Error in Rebuild' )

        # Rebuild the file records, then finish
        sc = self.filerecordsAgent.Receive()
        self.filerecordsAgent.Rebuild()
        self.Finalize()

        self.Report()
01071
01072
01073
01074
01075
01076
01077
class Coord( object ) :
    '''
    The process coordinator: builds the queues, the synchronisation
    barriers and the Reader/Worker/Writer nodes, then drives the three
    phases (initialise, run, finalise) of a parallel GaudiPython job.
    '''
    def __init__( self, nWorkers, config, log ) :

        self.log = log
        self.config = config
        # set up logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )

        if nWorkers == -1 :
            # The user requested all available cpus on the machine
            self.nWorkers = cpu_count()
        else :
            self.nWorkers = nWorkers

        # Event-transmission queues, plus shared queues for histograms
        # and file records
        self.qs = self.SetupQueues( )
        self.hq = JoinableQueue( )
        self.fq = JoinableQueue( )

        # One Syncer (barrier) per phase: Initialise, Run, Finalise
        self.sInit = Syncer( self.nWorkers, self.log,
                             limit=WAIT_INITIALISE,
                             step=STEP_INITIALISE )
        self.sRun = Syncer( self.nWorkers, self.log,
                            manyEvents=True,
                            limit=WAIT_SINGLE_EVENT,
                            step=STEP_EVENT,
                            firstEvent=WAIT_FIRST_EVENT )
        self.sFin = Syncer( self.nWorkers, self.log,
                            limit=WAIT_FINALISE,
                            step=STEP_FINALISE )

        # Event set by the Writer once all histograms have been received
        self.histSyncEvent = Event()

        # params are common to all the nodes
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Create the node objects; their processes start later, in Go()
        self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params)
        self.workers = []
        for i in xrange( self.nWorkers ) :
            wk = Worker( i, self.getQueues(i), self.getSyncEvents(i), params )
            self.workers.append( wk )
        self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params)

        # Start order: Writer first, Reader last (Stop() reverses this)
        self.system = []
        self.system.append(self.writer)
        self.system.extend(self.workers)
        self.system.append(self.reader)

    def getSyncEvents( self, nodeID ) :
        '''Return the (init, (loop, last), final) events for a node.'''
        init = self.sInit.d[nodeID].event
        run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
        fin = self.sFin.d[nodeID].event
        return ( init, run, fin )

    def getQueues( self, nodeID ) :
        '''Return the (event, histogram, filerecords) queues for a node.'''
        eventQ = self.qs[ nodeID ]
        histQ = self.hq
        fsrQ = self.fq
        return ( eventQ, histQ, fsrQ )

    def Go( self ) :
        '''Run the full job: initialise, run and finalise all nodes.'''
        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'INITIALISING SYSTEM' )
        for p in self.system :
            p.Start()
        sc = self.sInit.syncAll(step="Initialise")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'RUNNING SYSTEM' )
        sc = self.sRun.syncAll(step="Run")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'FINALISING SYSTEM' )
        sc = self.sFin.syncAll(step="Finalise")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # If we get here, all phases succeeded
        self.log.info( "Cleanly join all Processes" )
        self.Stop()
        self.log.info( "Report Total Success to Main.py" )
        return SUCCESS

    def Terminate( self ) :
        '''Emergency stop: terminate every child process.'''
        children = multiprocessing.active_children()
        for i in children:
            i.terminate()

    def Stop( self ) :
        '''Join all processes, Reader first and Writer last.'''
        self.system.reverse()
        for s in self.system :
            s.proc.join()
        return SUCCESS

    def SetupQueues( self ) :
        '''
        Build the event-transmission queues:
          - one shared Reader -> Workers queue
          - one Worker -> Writer queue per worker
        Returns a dict keyed by nodeID holding (qin, qout) pairs.
        '''
        # Reader -> all Workers
        rwk = JoinableQueue()
        # one private Worker -> Writer queue per worker
        workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
        d = {}
        d[-1] = (None, rwk)              # Reader : output only
        d[-2] = (workersWriter, None)    # Writer : inputs only
        for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
        return d
01205
01206