00001 from Gaudi.Configuration import *
00002 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE
00003 from ROOT import TBufferFile, TBuffer
00004 from multiprocessing import Process, Queue, JoinableQueue, Event
00005 from multiprocessing import cpu_count, current_process
00006 from multiprocessing.queues import Empty
00007 from pTools import *
00008 import time, sys, os
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 NAP = 0.001
00036 MB = 1024.0*1024.0
00037
00038 WAIT_INITIALISE = 60*5
00039 WAIT_FIRST_EVENT = 60*3
00040 WAIT_SINGLE_EVENT = 60*3
00041 WAIT_FINALISE = 60*2
00042 STEP_INITIALISE = 10
00043 STEP_EVENT = 2
00044 STEP_FINALISE = 5
00045
00046
00047 SMAPS = False
00048
00049
00050
00051
00052
00053
00054 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
00055
00056
00057
00058 aidatypes = ( gbl.AIDA.IHistogram,
00059 gbl.AIDA.IHistogram1D,
00060 gbl.AIDA.IHistogram2D,
00061 gbl.AIDA.IHistogram3D,
00062 gbl.AIDA.IProfile1D,
00063 gbl.AIDA.IProfile2D,
00064 gbl.AIDA.IBaseHistogram )
00065
00066
00067 thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
00068
00069
00070 WRITERTYPES = { 'EvtCollectionStream' : "tuples",
00071 'InputCopyStream' : "events",
00072 'OutputStream' : "events",
00073 'RecordStream' : "records",
00074 'RunRecordStream' : "records",
00075 'SequentialOutputStream' : "events",
00076 'TagCollectionStream' : "tuples" }
00077
00078
00079
class MiniWriter( object ) :
    '''
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string.
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring.
    '''
    def __init__( self, writer, wType, config ) :
        self.w = writer
        self.wType = wType
        # Resolved later by set()/getItemLists(); stay None if the writer has
        # no Output property and its data service has none either
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None

        self.wName = writer.getName()

        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr( self.w, "Output" ) :
            # Output file is configured directly on the writer
            self.woutput = self.w.Output
            self.getItemLists( config )
            self.set( self.wName, self.w.Output )
            return
        else :
            # No Output on the writer; fall back to its event data service
            self.datasvcName = self.w.EvtDataSvc
            datasvc = config[ self.datasvcName ]
            if hasattr( datasvc, "Output" ) :
                self.getItemLists( config )
                self.set( self.datasvcName, datasvc.Output )
            return

    def getNewName( self, replaceThis, withThis, extra='' ) :
        '''
        Return a copy of self.output with every occurrence of replaceThis
        substituted by withThis, and `extra` appended; the list-vs-string
        form of self.output is preserved (a one-element list stays a list).
        '''
        # isinstance (rather than exact class-name comparison) also accepts
        # str subclasses, and is the idiomatic type check
        assert isinstance( replaceThis, str )
        assert isinstance( withThis, str )
        old = self.output
        lst = False
        if isinstance( old, list ) :
            # Only the first entry is rewritten; writers carry one file
            old = self.output[0]
            lst = True
        new = old.replace( replaceThis, withThis )
        new += extra
        if lst :
            return [ new ]
        else :
            return new

    def getItemLists( self, config ) :
        '''
        Fetch the ItemList/OptItemList, preferring the writer itself and
        falling back to its event data service.
        '''
        if hasattr( self.w, "ItemList" ) :
            self.ItemList = self.w.ItemList
        else :
            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "ItemList" ) :
                self.ItemList = datasvc.ItemList

        if hasattr( self.w, "OptItemList" ) :
            self.OptItemList = self.w.OptItemList
        else :
            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "OptItemList" ) :
                self.OptItemList = datasvc.OptItemList
        return

    def set( self, key, output ) :
        '''Record the config key and output file for this writer.'''
        self.key = key
        self.output = output
        return

    def __repr__( self ) :
        '''Multi-line human-readable summary, used in debug logging.'''
        s = ""
        line = '-'*80
        s += (line+'\n')
        s += "Writer         : %s\n"%( self.wName )
        s += "Writer Type    : %s\n"%( self.wType )
        s += "Writer Output  : %s\n"%( self.output )
        s += "DataSvc        : %s\n"%( self.datasvcName )
        s += "DataSvc Output : %s\n"%( self.svcOutput )
        s += '\n'
        s += "Key for config : %s\n"%( self.key )
        s += "Output File    : %s\n"%( self.output )
        s += "ItemList       : %s\n"%( self.ItemList )
        s += "OptItemList    : %s\n"%( self.OptItemList )
        s += (line+'\n')
        return s
00178
00179
00180
class CollectHistograms( PyAlgorithm ) :
    '''
    GaudiPython algorithm used to clean up histos on the Reader and Workers.
    Only has a finalize method().
    This retrieves a dictionary of path:histo objects and sends it to the
    writer. It then waits for a None flag : THIS IS IMPORTANT, as if
    the algorithm returns before ALL histos have been COMPLETELY RECEIVED
    at the writer end, there will be an error.
    '''
    def __init__( self, gmpcomponent ) :
        PyAlgorithm.__init__( self )
        # The owning GMPComponent (Reader or Worker) provides the histogram
        # store, the histogram queue and the sync event
        self._gmpc = gmpcomponent
        self.log = self._gmpc.log
        return None
    def execute( self ) :
        # Nothing to do per event; all work happens in finalize()
        return SUCCESS
    def finalize( self ) :
        self.log.info('CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms( )
        ks = self._gmpc.hDict.keys()
        self.log.info('%i Objects in Histogram Store'%(len(ks)))
        # Send the histogram dict in chunks of 100 entries so no single
        # queue item grows too large to pickle/transfer
        chunk = 100
        reps = len(ks)/chunk + 1
        for i in xrange(reps) :
            someKeys = ks[i*chunk : (i+1)*chunk]
            smalld = dict( [(key, self._gmpc.hDict[key]) for key in someKeys] )
            self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
        # Flag the end of this node's histogram transfer
        self.log.debug('Signalling end of histos to Writer')
        self._gmpc.hq.put( 'HISTOS_SENT' )
        self.log.debug( 'Waiting on Sync Event' )
        # Block until the Writer confirms full receipt (see class docstring)
        self._gmpc.sEvent.wait()
        self.log.debug( 'Histo Sync Event set, clearing and returning' )
        # Clear the local histogram store and install a fresh root node
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()
        setOwnership(root, False)
        self._gmpc.hvt.setRoot( '/stat', root )
        return SUCCESS
00222
00223
00224
class EventCommunicator( object ) :
    '''
    One-way communication channel between two processes, wrapping an input
    and an output multiprocessing queue and keeping transfer statistics.
    Items sent are (eventNumber, TBufferFile) tuples.
    '''
    def __init__( self, GMPComponent, qin, qout ) :
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # Advisory queue size limit (not enforced here)
        self.maxsize = 50
        # qin/qout may be None for one-directional ends (e.g. Writer input)
        self.qin = qin
        self.qout = qout

        # Transfer-state flags
        self.allsent = False
        self.allrecv = False

        # Statistics for the final report
        self.nSent = 0
        self.nRecv = 0
        self.sizeSent = 0
        self.sizeRecv = 0
        self.qinTime = 0
        self.qoutTime = 0

    def send( self, item ) :
        '''
        Put an (eventNumber, TBufferFile) tuple on the output queue and
        block until it has actually been pushed to the pipe.
        '''
        assert item.__class__.__name__ == 'tuple'
        startTransmission = time.time()
        self.qout.put( item )
        # Block until the item has left the feeder buffer; _buffer is a
        # CPython multiprocessing implementation detail — TODO confirm on
        # other interpreters/versions
        while self.qout._buffer : time.sleep( NAP )
        self.qoutTime += time.time()-startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive( self, timeout=None ) :
        '''
        Get the next item from the input queue; returns None on timeout.
        Non-tuple items (control flags such as 'FINISHED') are not counted
        in the event statistics.
        '''
        startWait = time.time()
        try :
            itemIn = self.qin.get( timeout=timeout )
        except Empty :
            return None
        self.qinTime += time.time()-startWait
        self.nRecv += 1
        if itemIn.__class__.__name__ == 'tuple' :
            self.sizeRecv += itemIn[1].Length()
        else :
            # Control flag, not an event packet
            self.nRecv -= 1
        try :
            self.qin.task_done()
        except ValueError :
            # task_done() raises ValueError when called more times than
            # there were items; anything else should propagate
            self._gmpc.log.warning('TASK_DONE called too often by : %s'\
                                   %(self._gmpc.nodeType))
        return itemIn

    def finalize( self ) :
        '''Send the 'FINISHED' flag downstream and flush the output queue.'''
        self.log.info('Finalize Event Communicator : %s'%(self._gmpc.nodeType))
        # The Reader feeds all workers through one queue; each worker feeds
        # the single writer; the writer has nothing downstream
        if self._gmpc.nodeType == 'Reader' : downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer' : downstream = 0
        elif self._gmpc.nodeType == 'Worker' : downstream = 1
        for i in xrange(downstream) :
            self.qout.put( 'FINISHED' )
        if self._gmpc.nodeType != 'Writer' :
            self.qout.join()
        self.statistics( )

    def statistics( self ) :
        '''Log the transfer statistics for this channel.'''
        self.log.name = '%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
        self.log.info ( 'Items Sent     : %i'%(self.nSent) )
        self.log.info ( 'Items Received : %i'%(self.nRecv) )
        self.log.info ( 'Data Sent      : %i'%(self.sizeSent) )
        self.log.info ( 'Data Received  : %i'%(self.sizeRecv) )
        self.log.info ( 'Q-out Time     : %5.2f'%(self.qoutTime) )
        self.log.info ( 'Q-in  Time     : %5.2f'%(self.qinTime ) )
00307
00308
00309
class TESSerializer( object ) :
    '''
    Wrapper around the GaudiPython TESSerializer, timing and book-keeping
    every load (deserialise into the TES) and dump (serialise out of it).
    '''
    def __init__( self, gaudiTESSerializer, evtDataSvc,
                  nodeType, nodeID, log ) :
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        # Per-transfer buffer sizes (bytes), for statistics
        self.buffersIn = []
        self.buffersOut = []
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # Identity, for log labelling
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log
    def Load( self, tbuf ) :
        '''Reset the event store root and deserialise tbuf into the TES.'''
        root = gbl.DataObject()
        setOwnership( root, False )
        self.evt.setRoot( '/Event', root )
        t = time.time()
        self.T.loadBuffer( tbuf )
        self.tLoad += (time.time() - t)
        self.nIn += 1
        self.buffersIn.append( tbuf.Length() )
    def Dump( self ) :
        '''Serialise the current TES contents into a new TBufferFile.'''
        t = time.time()
        tb = TBufferFile( TBuffer.kWrite )
        self.T.dumpBuffer(tb)
        self.tDump += ( time.time()-t )
        self.nOut += 1
        self.buffersOut.append( tb.Length() )
        return tb
    def Report( self ) :
        '''Log a summary of all load/dump activity for this node.'''
        evIn     = "Events Loaded    : %i"%( self.nIn )
        evOut    = "Events Dumped    : %i"%( self.nOut )
        din      = sum( self.buffersIn )
        dataIn   = "Data Loaded      : %i"%(din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb"%(din/MB)
        if self.nIn :
            avgIn = "Avg Buf Loaded   : %5.2f Mb"\
                    %( din/(self.nIn*MB) )
            maxIn = "Max Buf Loaded   : %5.2f Mb"\
                    %( max(self.buffersIn)/MB )
        else :
            avgIn = "Avg Buf Loaded   : N/A"
            maxIn = "Max Buf Loaded   : N/A"
        dout      = sum( self.buffersOut )
        dataOut   = "Data Dumped      : %i"%(dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb"%(dout/MB)
        if self.nOut :
            # BUG FIX: the average dumped-buffer size was computed from
            # `din` (bytes loaded) instead of `dout` (bytes dumped)
            avgOut = "Avg Buf Dumped   : %5.2f Mb"\
                     %( dout/(self.nOut*MB) )
            maxOut = "Max Buf Dumped   : %5.2f Mb"\
                     %( max(self.buffersOut)/MB )
        else :
            avgOut = "Avg Buf Dumped   : N/A"
            maxOut = "Max Buf Dumped   : N/A"
        dumpTime = "Total Dump Time  : %5.2f"%( self.tDump )
        loadTime = "Total Load Time  : %5.2f"%( self.tLoad )

        lines = evIn     ,\
                evOut    ,\
                dataIn   ,\
                dataInMb ,\
                avgIn    ,\
                maxIn    ,\
                dataOut  ,\
                dataOutMb,\
                avgOut   ,\
                maxOut   ,\
                dumpTime ,\
                loadTime
        self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
        for line in lines :
            self.log.info( line )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
00386
00387
00388
class GMPComponent( object ) :
    '''
    Base class for the three process types of the parallel system
    (Reader, Worker, Writer).  Holds the shared machinery: queues,
    synchronisation events, the GaudiPython AppMgr bootstrap, timing
    counters, and the multiprocessing.Process that runs Engine().
    Subclasses override processConfiguration() and Engine().
    '''
    def __init__( self, nodeType, nodeID, queues, events, params ) :
        # nodeType is one of 'Reader', 'Worker', 'Writer'
        self.nodeType = nodeType
        # Name the OS process after its role, for ps/log readability
        current_process().name = nodeType

        # Unpack the three sync events: init done, per-event loop, final done
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer

        # Shared parameters from the Coord object
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.nodeID = nodeID

        # Number of the event currently being processed (None until known)
        self.currentEvent = None

        # queues = (event-queue pair, histogram queue, file-records queue)
        qPair, histq, fq = queues

        # Readers and Workers have a single in/out channel; the Writer has
        # one input channel per worker
        if self.nodeType == 'Reader' or self.nodeType == 'Worker' :
            qin, qout = qPair
            self.evcom = EventCommunicator( self, qin, qout )
        else :
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin :
                ec = EventCommunicator( self, q, None )
                self.evcoms.append( ec )

        # Histogram and FileRecords queues (shared, many-to-one)
        self.hq = histq
        self.fq = fq

        # Event counters: received / sent
        self.nIn = 0
        self.nOut = 0

        # Overall status of this node
        self.stat = SUCCESS

        # Label all log output with "<type>-<id>"
        self.log.name = '%s-%i'%(self.nodeType, self.nodeID)

        # Timing counters: initialise, run, finalise, first event, total
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        # The child process; Engine() is the process body
        self.proc = Process( target=self.Engine )

    def Start( self ) :
        # Fork/spawn the child process running Engine()
        self.proc.start()

    def Engine( self ) :
        # Process main loop; implemented by each subclass
        pass

    def processConfiguration( self ) :
        # Node-specific adjustment of the Gaudi configuration; implemented
        # by each subclass
        pass

    def SetupGaudiPython( self ) :
        '''
        Create the AppMgr and cache handles to the services this node needs:
        event store, histogram store, file-records store, persistency service
        and the TES serializer.
        '''
        self.a = AppMgr()
        if SMAPS :
            # Optional per-process memory profiling
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType+'-'+str(self.nodeID)+'.smp'
            ss = SmapShot( logname=smapsLog )
            self.a.addAlgorithm( ss )
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
        self.ts = gbl.GaudiPython.TESSerializer( self.evt._idp, self.pers )
        # Wrap the raw serializer in the timing/book-keeping helper
        self.TS = TESSerializer( self.ts, self.evt,
                                 self.nodeType, self.nodeID, self.log )
        return SUCCESS

    def StartGaudiPython( self ) :
        # Bring the application to the running state
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES( self, tbufferfile ) :
        '''Reset the event store root and deserialise a buffer into it.'''
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot( '/Event', root )
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber( self ) :
        '''
        Best-effort determination of the current event number: try the
        Gen/Rec headers first, then the raw-event ODIN bank, else -1.
        '''
        lst = [ '/Event/Gen/Header',
                '/Event/Rec/Header' ]
        for l in lst :
            path = l
            try :
                n = self.evt[path].evtNumber()
                return n
            except :
                # Path absent or object lacks evtNumber(); try the next one
                continue
        # Fall back to the raw event: bank type 16, first bank, word 4
        # (presumably the ODIN event number — TODO confirm)
        try :
            n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
            return n
        except :
            pass
        # Only warn once real events have been seen; before that, failure
        # to find a number is expected
        if self.nIn > 0 or self.nOut > 0 :
            pass
        else :
            self.log.warning('Could not determine Event Number')
        return -1

    def IdentifyWriters( self ) :
        '''
        Scan the configuration for known output-stream types and return a
        dict { "events"|"records"|"tuples" : [MiniWriter,...],
               "histos" : [output file name] }.
        '''
        d = {}
        keys = [ "events", "records", "tuples", "histos" ]
        for k in keys :
            d[k] = []

        # Wrap every configured writer of a known type in a MiniWriter
        wkeys = WRITERTYPES.keys()
        for v in self.config.values() :
            if v.__class__.__name__ in wkeys :
                writerType = WRITERTYPES[ v.__class__.__name__ ]
                d[writerType].append( MiniWriter(v, writerType, self.config) )
                if self.nodeID == 0 :
                    self.log.info('Writer Found : %s'%(v.name()))

        # The histogram file is configured on the persistency service
        if 'HistogramPersistencySvc' in self.config.keys() :
            hfile =self.config['HistogramPersistencySvc'].getProp('OutputFile')
            d[ "histos" ].append( hfile )
        return d

    def dumpHistograms( self ) :
        '''
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames( )
        histDict = {}
        objects = 0 ; histos = 0
        if nlist :
            for n in nlist :
                o = self.hvt[ n ]
                if type(o) in aidatypes :
                    # Convert AIDA interfaces to ROOT so they can be sent
                    # over the queue and merged by the Writer
                    o = aida2root(o)
                    histos += 1
                else :
                    objects += 1
                histDict[ n ] = o
        else :
            print 'WARNING : no histograms to recover?'
        return histDict

    def Initialize( self ) :
        '''Configure and start Gaudi; signal the init sync event.'''
        start = time.time()
        self.processConfiguration( )
        self.SetupGaudiPython( )
        # Tell the Coord this node finished initialising
        self.initEvent.set()
        self.StartGaudiPython( )
        self.iTime = time.time() - start

    def Finalize( self ) :
        '''Stop and finalise Gaudi; signal the finalise sync event.'''
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info( '%s-%i Finalized'%(self.nodeType, self.nodeID) )
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report( self ) :
        '''Log the timing summary for this node.'''
        self.log.name = "%s-%i Audit"%(self.nodeType, self.nodeID)
        allTime  = "Alive Time     : %5.2f"%(self.tTime)
        initTime = "Init Time      : %5.2f"%(self.iTime)
        frstTime = "1st Event Time : %5.2f"%(self.firstEvTime)
        runTime  = "Run Time       : %5.2f"%(self.rTime)
        finTime  = "Finalise Time  : %5.2f"%(self.fTime)
        tup = ( allTime, initTime, frstTime, runTime, finTime )
        for t in tup :
            self.log.info( t )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
        # Add the serializer's own statistics
        self.TS.Report()
00613
00614
00615
class Reader( GMPComponent ) :
    '''
    The single Reader process: runs the configured event input, serialises
    each event's TES contents and distributes the buffers to the Workers.
    '''
    def __init__( self, queues, events, params ) :
        # The Reader has the fixed nodeID -1
        GMPComponent.__init__(self, 'Reader', -1, queues, events, params )

    def processConfiguration( self ) :
        # The Reader only reads: strip all algorithms and output streams,
        # and disable histogram output
        self.config[ 'ApplicationMgr' ].TopAlg = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        self.config['MessageSvc'].Format = '[Reader]% F%18W%S%7W%R%T %0W%M'
        self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax

    def DumpEvent( self ) :
        # Serialise the current TES contents into a fresh buffer
        tb = TBufferFile( TBuffer.kWrite )
        self.ts.dumpBuffer( tb )
        return tb

    def DoFirstEvent( self ) :
        '''
        Run the first event separately: it is used to discover the TES
        paths (which are registered with the serializer) and is timed
        independently of the steady-state event loop.
        '''
        startFirst = time.time()
        self.log.info('Reader : First Event')
        if self.nOut == self.evtMax :
            self.log.info('evtMax( %i ) reached'%(self.evtMax))
            self.lastEvent.set()
            return SUCCESS
        else :
            self.a.run(1)
            if not bool(self.evt['/Event']) :
                # No event loaded: input exhausted already
                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
                self.lastEvent.set()
                return SUCCESS
            else :
                # Register every TES location with the serializer; this
                # defines what gets shipped for all subsequent events
                try :
                    lst = self.evt.getList()
                except :
                    self.log.critical('Reader could not acquire TES List!')
                    self.lastEvent.set()
                    return FAILURE
                self.log.info('Reader : TES List : %i items'%(len(lst)))
                for l in lst :
                    self.ts.addItem(l)
                self.currentEvent = self.getEventNumber( )
                tb = self.TS.Dump( )
                self.log.info('First Event Sent')
                self.evcom.send( (self.currentEvent, tb) )
                self.nOut += 1
                self.eventLoopSyncer.set()
                self.evt.clearStore( )
                self.firstEvTime = time.time()-startFirst
                return SUCCESS

    def Engine( self ) :
        '''Process body: read all events and distribute them to Workers.'''
        startEngine = time.time()
        self.log.name = 'Reader'
        self.log.info('Reader Process starting')

        self.Initialize()

        # Ship histograms to the Writer at finalize time
        self.a.addAlgorithm( CollectHistograms(self) )

        self.log.info('Reader Beginning Distribution')
        sc = self.DoFirstEvent( )
        if sc.isSuccess() :
            self.log.info('Reader First Event OK')
        else :
            self.log.critical('Reader Failed on First Event')
            self.stat = FAILURE

        # Steady-state event loop
        while True :
            # Stop when the configured event limit is reached
            if self.nOut == self.evtMax :
                self.log.info('evtMax( %i ) reached'%(self.evtMax))
                break
            if not self.stat.isSuccess() :
                self.log.critical( 'Reader is Damaged!' )
                break
            t = time.time()
            self.a.run(1)
            self.rTime += (time.time()-t)
            if not bool(self.evt['/Event']) :
                # Input exhausted
                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
                break
            self.currentEvent = self.getEventNumber( )
            tb = self.TS.Dump( )
            self.evcom.send( (self.currentEvent, tb) )
            # Event sent
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Finalize
        self.log.info( 'Reader : Event Distribution complete.' )
        self.evcom.finalize()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()
00728
00729
00730
class Worker( GMPComponent ) :
    '''
    A Worker process: receives serialised events from the Reader, runs the
    configured algorithms on them, and forwards accepted events to the
    Writer.
    '''
    def __init__( self, workerID, queues, events, params ) :
        GMPComponent.__init__(self,'Worker', workerID, queues, events, params)
        # Output writers declared in the configuration, by type
        self.writerDict = self.IdentifyWriters( )
        # Algorithm names used for the OutputStream accept/require/veto check
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.debug("Worker-%i Created OK"%(self.nodeID))
        self.eventOutput = True

    def processConfiguration( self ) :
        # Workers receive events from the Reader, not from files, and do
        # not run output streams themselves
        self.config[ 'EventSelector' ].Input = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        formatHead = '[Worker-%i] '%(self.nodeID)
        self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'

        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

        # Tuples are written per worker; rename each tuple file so the
        # workers do not collide ( name.ext -> name.w<i>.ext )
        for m in self.writerDict[ "tuples" ] :
            newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
            self.config[ m.key ].Output = newName

        # Suppress duplicate log chatter from all but the first worker
        if self.nodeID != 0 :
            self.config[ 'MessageSvc' ].OutputLevel = ERROR

    def Engine( self ) :
        '''Process body: the worker's event loop.'''
        startEngine = time.time()
        self.log.name = "Worker-%i"%(self.nodeID)
        self.log.info("Worker %i starting Engine"%(self.nodeID))
        self.Initialize()
        self.filerecordsAgent = FileRecordsAgent(self)

        # Register the output item lists with the serializer so accepted
        # events can be shipped to the Writer
        self.log.info('EVT WRITERS ON WORKER : %i'\
                      %( len(self.writerDict['events'])))

        nEventWriters = len( self.writerDict[ "events" ] )
        if nEventWriters :
            for m in self.writerDict[ "events" ] :
                # ItemList/OptItemList may legitimately be None when the
                # config declares neither on the writer nor on its data
                # service (MiniWriter leaves them None); guard the loops
                for item in ( m.ItemList or [] ) :
                    self.log.debug(' adding ItemList Item to ts : %s'%(item))
                    self.ts.addItem( item )
                for item in ( m.OptItemList or [] ) :
                    self.log.debug(' adding Optional Item to ts : %s'%(item))
                    self.ts.addOptItem( item )
        else :
            self.log.info( 'There is no Event Output for this app' )
            self.eventOutput = False

        # Ship histograms to the Writer at finalize time
        self.a.addAlgorithm( CollectHistograms(self) )

        # Event loop
        self.log.name = "Worker-%i"%(self.nodeID)
        Go = True
        while Go :
            packet = self.evcom.receive( )
            if not packet :
                continue
            if packet == 'FINISHED' :
                break
            evtNumber, tbin = packet
            self.nIn += 1
            self.TS.Load( tbin )

            t = time.time()
            sc = self.a.executeEvent()
            # Time the first event separately: it includes lazy
            # initialisation costs
            if self.nIn == 1 :
                self.firstEvTime = time.time()-t
            else :
                self.rTime += (time.time()-t)
            if not sc.isSuccess() :
                self.log.warning('Did not Execute Event')
                self.evt.clearStore()
                continue
            if not self.isEventPassed() :
                self.log.warning( 'Event did not pass : %i'%(evtNumber) )
                self.evt.clearStore()
                continue
            if self.eventOutput :
                # It may be the case of generating event tags; hence
                # no event output to forward
                self.currentEvent = self.getEventNumber( )
                tb = self.TS.Dump( )
                self.evcom.send( (self.currentEvent, tb) )
                self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        self.evcom.finalize()
        self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
        # Send any file records, then finalise
        self.filerecordsAgent.SendFileRecords()
        self.Finalize()
        self.tTime = time.time()-startEngine
        self.Report()

    def getCheckAlgs( self ) :
        '''
        For some output writers, a check is performed to see if the event has
        executed certain algorithms.
        These reside in the AcceptAlgs property for those writers
        '''
        acc = []
        req = []
        vet = []
        for m in self.writerDict[ "events" ] :
            if hasattr(m.w, 'AcceptAlgs')  : acc += m.w.AcceptAlgs
            if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
            if hasattr(m.w, 'VetoAlgs')    : vet += m.w.VetoAlgs
        return (acc, req, vet)

    def checkExecutedPassed( self, algName ) :
        '''True if the named algorithm was executed and its filter passed.'''
        # Look the algorithm up once instead of twice
        alg = self.a.algorithm( algName )._ialg
        return alg.isExecuted() and alg.filterPassed()

    def isEventPassed( self ) :
        '''
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream
        '''
        passed = False

        # Accept : at least one accept-algorithm must have passed
        self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
        if self.acceptAlgs :
            for name in self.acceptAlgs :
                if self.checkExecutedPassed( name ) :
                    passed = True
                    break
        else :
            passed = True

        # Require : every require-algorithm must have passed
        self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
        for name in self.requireAlgs :
            if self.checkExecutedPassed( name ) :
                pass
            else :
                self.log.info('Evt declined (requireAlgs) : %s'%(name) )
                passed = False

        # Veto : every veto-algorithm must have passed
        self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
        for name in self.vetoAlgs :
            if self.checkExecutedPassed( name ) :
                pass
            else :
                self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
                passed = False
        return passed
00902
00903
00904
class Writer( GMPComponent ) :
    '''
    The single Writer process: collects processed events from all Workers,
    runs the output streams, then rebuilds the histogram store and file
    records from the contributions of every node.
    '''
    def __init__( self, queues, events, params ) :
        # The Writer has the fixed nodeID -2
        GMPComponent.__init__(self,'Writer', -2, queues, events, params)
        # Output writers declared in the configuration, by type
        self.writerDict = self.IdentifyWriters( )
        # One 'FINISHED' flag is expected from each worker
        self.status = [False]*self.nWorkers
        self.log.name = "Writer--2"

    def processConfiguration( self ) :
        # The Writer runs no algorithms and reads no input of its own;
        # events arrive pre-processed from the workers
        self.config[ 'ApplicationMgr' ].TopAlg = []
        self.config[ 'EventSelector' ].Input = []

        self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'

        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

        # Rename output files to mark the parallel run
        # ( name.ext -> name.p<nWorkers>.ext )
        for m in self.writerDict[ "events" ] :
            self.log.debug( 'Processing Event Writer : %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers )
            self.config[ m.key ].Output = newName

        # FileRecords streams additionally need OPT='RECREATE' so the
        # rebuilt records replace, rather than update, the file
        for m in self.writerDict[ "records" ] :
            self.log.debug( 'Processing FileRecords Writer: %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers,
                                    extra=" OPT='RECREATE'" )
            self.config[ m.key ].Output = newName

        # Same renaming for the histogram output file, if any
        hs = "HistogramPersistencySvc"
        n = None
        if hs in self.config.keys() :
            n = self.config[ hs ].OutputFile
        if n :
            newName=self.config[hs].OutputFile.replace('.',\
                                                       '.p%i.'%(self.nWorkers))
            self.config[ hs ].OutputFile = newName

    def Engine( self ) :
        '''Process body: receive events round-robin from all workers.'''
        startEngine = time.time()
        self.Initialize()
        self.histoAgent = HistoAgent( self )
        self.filerecordsAgent = FileRecordsAgent( self )

        # Event loop: poll each worker's channel in turn with a short
        # timeout so a quiet worker does not stall the others
        Go = True
        current = -1
        while Go :
            current = (current+1)%self.nWorkers
            packet = self.evcoms[current].receive( timeout=0.01 )
            if packet == None :
                continue
            if packet == 'FINISHED' :
                self.log.info('Writer got FINISHED flag : Worker %i'%(current))
                self.status[current] = True
                if all(self.status) :
                    self.log.info('FINISHED recd from all workers, break loop')
                    break
                continue

            # Real event packet
            self.nIn += 1
            evtNumber, tbin = packet
            self.TS.Load( tbin )
            t = time.time()
            self.a.executeEvent()
            self.rTime += ( time.time()-t )
            self.currentEvent = self.getEventNumber( )
            self.evt.clearStore( )
            self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Flush and close every worker channel
        [ e.finalize() for e in self.evcoms ]

        # Rebuild the combined histogram store from all nodes
        sc = self.histoAgent.Receive()
        sc = self.histoAgent.RebuildHistoStore()
        if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
        else              : self.log.warning( 'Histo Store Error in Rebuild' )

        # Rebuild the combined file records
        sc = self.filerecordsAgent.Receive()
        self.filerecordsAgent.Rebuild()
        self.Finalize()
        # BUG FIX: this previously assigned to self.rTime, clobbering the
        # per-event run time accumulated above and leaving tTime ("Alive
        # Time") at 0.0; Reader and Worker both set tTime here
        self.tTime = time.time()-startEngine
        self.Report()
01009
01010
01011
01012
01013
01014
01015
class Coord( object ) :
    '''
    Co-ordinator of the parallel system: builds the queues, the three
    phase-syncers and all processes (1 Reader, N Workers, 1 Writer), then
    drives them through initialise / run / finalise.
    '''
    def __init__( self, nWorkers, config, log ) :
        self.log = log
        self.config = config

        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )

        # -1 means "use every available core"
        if nWorkers == -1 :
            self.nWorkers = cpu_count()
        else :
            self.nWorkers = nWorkers

        # Event queues (per-node pairs), plus shared histogram and
        # file-records queues
        self.qs = self.SetupQueues( )
        self.hq = JoinableQueue( )
        self.fq = JoinableQueue( )

        # One syncer per phase; the run-phase syncer also tracks per-event
        # progress and the special first-event timeout
        self.sInit = Syncer( self.nWorkers, self.log,
                             limit=WAIT_INITIALISE,
                             step=STEP_INITIALISE )
        self.sRun = Syncer( self.nWorkers, self.log,
                            manyEvents=True,
                            limit=WAIT_SINGLE_EVENT,
                            step=STEP_EVENT,
                            firstEvent=WAIT_FIRST_EVENT )
        self.sFin = Syncer( self.nWorkers, self.log,
                            limit=WAIT_FINALISE,
                            step=STEP_FINALISE )

        # Set once the Writer has received all histograms; releases the
        # CollectHistograms algorithms on Reader/Workers
        self.histSyncEvent = Event()

        # Shared parameters handed to every process
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Build the processes: Reader (-1), Workers (0..N-1), Writer (-2)
        self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params)
        self.workers = []
        for i in xrange( self.nWorkers ) :
            wk = Worker( i, self.getQueues(i), self.getSyncEvents(i), params )
            self.workers.append( wk )
        self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params)

        # Start order: Writer first, then Workers, then Reader (consumers
        # before producers)
        self.system = []
        self.system.append(self.writer)
        [ self.system.append(w) for w in self.workers ]
        self.system.append(self.reader)

    def getSyncEvents( self, nodeID ) :
        '''Return the (init, (run, lastEvent), fin) events for a node.'''
        init = self.sInit.d[nodeID].event
        run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
        fin = self.sFin.d[nodeID].event
        return ( init, run, fin )

    def getQueues( self, nodeID ) :
        '''Return the (event-queue pair, histo queue, fsr queue) for a node.'''
        eventQ = self.qs[ nodeID ]
        histQ = self.hq
        fsrQ = self.fq
        return ( eventQ, histQ, fsrQ )

    def Go( self ) :
        '''
        Drive the full lifecycle: start all processes, then wait for each
        phase to complete; any phase timeout terminates the whole system.
        '''
        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'INITIALISING SYSTEM' )
        for p in self.system :
            p.Start()
        sc = self.sInit.syncAll(step="Initialise")
        if sc : pass
        else : self.Terminate() ; return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'RUNNING SYSTEM' )
        sc = self.sRun.syncAll(step="Run")
        if sc : pass
        else : self.Terminate() ; return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'FINALISING SYSTEM' )
        sc = self.sFin.syncAll(step="Finalise")
        if sc : pass
        else : self.Terminate() ; return FAILURE

        # Cleanly join all processes
        self.log.info( "Cleanly join all Processes" )
        self.Stop()
        self.log.info( "Report Total Success to Main.py" )
        return SUCCESS

    def Terminate( self ) :
        # Hard-kill every child process (phase timeout / failure path)
        self.writer.proc.terminate()
        [ w.proc.terminate() for w in self.workers]
        self.reader.proc.terminate()

    def Stop( self ) :
        # Join in reverse start order: Reader first, Writer last
        self.system.reverse()
        for s in self.system :
            s.proc.join()
        return SUCCESS

    def SetupQueues( self ) :
        '''
        Build the event transfer queues:
          one shared Reader->Workers queue, and one Worker->Writer queue
          per worker.  Returned as a dict keyed by nodeID with
          (inputQueue, outputQueue) pairs; None where a node has no such
          direction (Reader has no input, Writer no output).
        '''
        rwk = JoinableQueue()
        # One queue per worker into the Writer
        workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
        d = {}
        d[-1] = (None, rwk)           # Reader
        d[-2] = (workersWriter, None) # Writer
        for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
        return d
01139
01140