00001 from Gaudi.Configuration import *
00002 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE
00003 from ROOT import TBufferFile, TBuffer
00004 from multiprocessing import Process, Queue, JoinableQueue, Event
00005 from multiprocessing import cpu_count, current_process
00006 from multiprocessing.queues import Empty
00007 from pTools import *
00008 import time, sys, os
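
# This module provides the pieces needed to run a GaudiPython job over
# multiple processes :
#   Reader : reads events and serialises the transient event store into
#            TBufferFile buffers
#   Worker : deserialises the buffers, runs the configured algorithms and
#            forwards accepted events
#   Writer : collects events from all Workers and writes the output
#            (events, file records, histograms)
# The Coord class wires the processes together with multiprocessing queues
# and synchronisation events ; Syncer, HistoAgent and FileRecordsAgent are
# provided by pTools.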
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
# Polling interval (seconds) used when waiting on queue buffers
NAP = 0.001
# Bytes per megabyte, for the data-volume reports
MB = 1024.0*1024.0

# Time limits and step sizes (seconds) passed to the Syncer objects that
# co-ordinate the initialise, event-loop and finalise phases
WAIT_INITIALISE = 60*5
WAIT_FIRST_EVENT = 60*3
WAIT_SINGLE_EVENT = 60*3
WAIT_FINALISE = 60*2
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 5

# If True, attach the SmapShot algorithm (AlgSmapShot) to each node to
# record memory snapshots
SMAPS = False
00048
00049
00050
00051
00052
00053
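# Helper converting AIDA histogram interfaces to their ROOT equivalents
# (used before shipping histograms between processes)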
00054 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
00055
00056
00057
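# AIDA histogram interface types that may be found in the histogram store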
00058 aidatypes = ( gbl.AIDA.IHistogram,
00059 gbl.AIDA.IHistogram1D,
00060 gbl.AIDA.IHistogram2D,
00061 gbl.AIDA.IHistogram3D,
00062 gbl.AIDA.IProfile1D,
00063 gbl.AIDA.IProfile2D,
00064 gbl.AIDA.IBaseHistogram )
00065
00066
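# Native ROOT histogram types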
00067 thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
00068
00069
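# Known Gaudi output stream classes, mapped to the kind of output they write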
00070 WRITERTYPES = { 'EvtCollectionStream' : "tuples",
00071 'InputCopyStream' : "events",
00072 'OutputStream' : "events",
00073 'RecordStream' : "records",
00074 'RunRecordStream' : "records",
00075 'SequentialOutputStream' : "events",
00076 'TagCollectionStream' : "tuples" }
00077
00078
00079
00080 class MiniWriter( object ) :
    '''
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file : it may be
    specified on the DataSvc or on the writer itself, and it may be a list
    or a plain string. Also, there are three different types of writer
    (events, records, tuples), so this bootstrap class provides easy access
    to this info while configuring.
    '''
00088 def __init__( self, writer, wType, config ) :
00089 self.w = writer
00090 self.wType = wType
00091
00092
00093
00094 self.key = None
00095 self.output = None
00096 self.ItemList = None
00097 self.OptItemList = None
00098
00099 self.wName = writer.getName()
00100
00101 self.woutput = None
00102 self.datasvcName = None
00103 self.svcOutput = None
00104 if hasattr( self.w, "Output" ) :
00105 self.woutput = self.w.Output
00106 self.getItemLists( config )
00107 self.set( self.wName, self.w.Output )
00108 return
00109 else :
00110
00111
00112 self.datasvcName = self.w.EvtDataSvc
00113 datasvc = config[ self.datasvcName ]
00114 if hasattr( datasvc, "Output" ) :
00115 self.getItemLists( config )
00116 self.set( self.datasvcName, datasvc.Output )
00117 return
00118
    def getNewName( self, replaceThis, withThis, extra='' ) :
        '''
        Build a per-process variant of the output file name by substring
        replacement, optionally appending extra text.
        Preserves whether the original Output was a list or a plain string.
        '''
        assert replaceThis.__class__.__name__ == 'str'
        assert withThis.__class__.__name__ == 'str'
00128 old = self.output
00129 lst = False
00130 if old.__class__.__name__ == 'list' :
00131 old = self.output[0]
00132 lst = True
00133 new = old.replace( replaceThis, withThis )
00134 new += extra
00135 if lst :
00136 return [ new ]
00137 else :
00138 return new
00139
00140 def getItemLists( self, config ) :
00141
00142 if hasattr( self.w, "ItemList" ) :
00143 self.ItemList = self.w.ItemList
00144 else :
00145 datasvc = config[ self.w.EvtDataSvc ]
00146 if hasattr( datasvc, "ItemList" ) :
00147 self.ItemList = datasvc.ItemList
00148
00149 if hasattr( self.w, "OptItemList" ) :
00150 self.OptItemList = self.w.OptItemList
00151 else :
00152 datasvc = config[ self.w.EvtDataSvc ]
00153 if hasattr( datasvc, "OptItemList" ) :
00154 self.OptItemList = datasvc.OptItemList
00155 return
00156
00157 def set( self, key, output ) :
00158 self.key = key
00159 self.output = output
00160 return
00161
00162 def __repr__( self ) :
00163 s = ""
00164 line = '-'*80
00165 s += (line+'\n')
00166 s += "Writer : %s\n"%( self.wName )
00167 s += "Writer Type : %s\n"%( self.wType )
00168 s += "Writer Output : %s\n"%( self.output )
00169 s += "DataSvc : %s\n"%( self.datasvcName )
00170 s += "DataSvc Output : %s\n"%( self.svcOutput )
00171 s += '\n'
00172 s += "Key for config : %s\n"%( self.key )
00173 s += "Output File : %s\n"%( self.output )
00174 s += "ItemList : %s\n"%( self.ItemList )
00175 s += "OptItemList : %s\n"%( self.OptItemList )
00176 s += (line+'\n')
00177 return s
00178
00179
00180
00181 class CollectHistograms( PyAlgorithm ) :
    '''
    GaudiPython algorithm used to collect and clean up histograms on the
    Reader and Worker nodes. Only the finalize() method does real work :
    it retrieves a dictionary of { path : histo } objects and sends it to
    the Writer, then waits on the histogram sync Event. THIS IS IMPORTANT :
    if the algorithm returned before ALL histos had been COMPLETELY RECEIVED
    at the Writer end, there would be an error.
    '''
00190 def __init__( self, gmpcomponent ) :
00191 PyAlgorithm.__init__( self )
00192 self._gmpc = gmpcomponent
00193 self.log = self._gmpc.log
00194 return None
00195 def execute( self ) :
00196 return SUCCESS
00197 def finalize( self ) :
00198 self.log.info('CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
00199 self._gmpc.hDict = self._gmpc.dumpHistograms( )
00200 ks = self._gmpc.hDict.keys()
00201 self.log.info('%i Objects in Histogram Store'%(len(ks)))
00202
00203
00204
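        # Send the histogram dictionary to the Writer in chunks, so that
        # individual queue items stay a manageable size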
00205 chunk = 100
00206 reps = len(ks)/chunk + 1
00207 for i in xrange(reps) :
00208 someKeys = ks[i*chunk : (i+1)*chunk]
00209 smalld = dict( [(key, self._gmpc.hDict[key]) for key in someKeys] )
00210 self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
00211
00212 self.log.debug('Signalling end of histos to Writer')
00213 self._gmpc.hq.put( 'HISTOS_SENT' )
00214 self.log.debug( 'Waiting on Sync Event' )
00215 self._gmpc.sEvent.wait()
00216 self.log.debug( 'Histo Sync Event set, clearing and returning' )
00217 self._gmpc.hvt.clearStore()
00218 root = gbl.DataObject()
00219 setOwnership(root, False)
00220 self._gmpc.hvt.setRoot( '/stat', root )
00221 return SUCCESS
00222
00223
00224
00225 class EventCommunicator( object ) :
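    '''
    Handles the event traffic for one node over a pair of multiprocessing
    queues (qin/qout). Serialised events travel as (eventNumber, TBufferFile)
    tuples ; statistics on items, bytes and queue time are kept for the
    final report.
    '''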
00226
00227
00228
00229 def __init__( self, GMPComponent, qin, qout ) :
00230 self._gmpc = GMPComponent
00231 self.log = self._gmpc.log
00232
00233 self.maxsize = 50
00234
00235 self.qin = qin
00236 self.qout = qout
00237
00238
00239 self.allsent = False
00240 self.allrecv = False
00241
00242
00243 self.nSent = 0
00244 self.nRecv = 0
00245 self.sizeSent = 0
00246 self.sizeRecv = 0
00247 self.qinTime = 0
00248 self.qoutTime = 0
00249
00250 def send( self, item ) :
00251
00252
00253 assert item.__class__.__name__ == 'tuple'
00254 startTransmission = time.time()
00255 self.qout.put( item )
00256
00257
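        # Block until the queue's feeder thread has flushed its internal
        # buffer (note : this relies on a private attribute of
        # multiprocessing.Queue)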
00258 while self.qout._buffer : time.sleep( NAP )
00259 self.qoutTime += time.time()-startTransmission
00260 self.sizeSent += item[1].Length()
00261 self.nSent += 1
00262 return SUCCESS
00263
00264 def receive( self, timeout=None ) :
00265
00266 startWait = time.time()
00267 try :
00268 itemIn = self.qin.get( timeout=timeout )
00269 except Empty :
00270 return None
00271 self.qinTime += time.time()-startWait
00272 self.nRecv += 1
00273 if itemIn.__class__.__name__ == 'tuple' :
00274 self.sizeRecv += itemIn[1].Length()
00275 else :
00276 self.nRecv -= 1
00277 try :
00278 self.qin.task_done()
00279 except :
00280 self._gmpc.log.warning('TASK_DONE called too often by : %s'\
00281 %(self._gmpc.nodeType))
00282 return itemIn
00283
00284 def finalize( self ) :
00285 self.log.info('Finalize Event Communicator : %s'%(self._gmpc.nodeType))
00286
00287
00288
00289 if self._gmpc.nodeType == 'Reader' : downstream = self._gmpc.nWorkers
00290 elif self._gmpc.nodeType == 'Writer' : downstream = 0
00291 elif self._gmpc.nodeType == 'Worker' : downstream = 1
00292 for i in xrange(downstream) :
00293 self.qout.put( 'FINISHED' )
00294 if self._gmpc.nodeType != 'Writer' :
00295 self.qout.join()
00296
00297 self.statistics( )
00298
00299 def statistics( self ) :
00300 self.log.name = '%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
00301 self.log.info ( 'Items Sent : %i'%(self.nSent) )
00302 self.log.info ( 'Items Received : %i'%(self.nRecv) )
00303 self.log.info ( 'Data Sent : %i'%(self.sizeSent) )
00304 self.log.info ( 'Data Received : %i'%(self.sizeRecv) )
00305 self.log.info ( 'Q-out Time : %5.2f'%(self.qoutTime) )
00306 self.log.info ( 'Q-in Time : %5.2f'%(self.qinTime ) )
00307
00308
00309
00310 class TESSerializer( object ) :
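    '''
    Python wrapper around the GaudiMP TESSerializer.
    Load() deserialises a TBufferFile into the transient event store ;
    Dump() serialises the current event into a new TBufferFile.
    Counters and timings are kept for the Report() printed at the end.
    '''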
00311 def __init__( self, gaudiTESSerializer, evtDataSvc,
00312 nodeType, nodeID, log ) :
00313 self.T = gaudiTESSerializer
00314 self.evt = evtDataSvc
00315 self.buffersIn = []
00316 self.buffersOut = []
00317 self.nIn = 0
00318 self.nOut = 0
00319 self.tDump = 0.0
00320 self.tLoad = 0.0
00321
00322 self.nodeType = nodeType
00323 self.nodeID = nodeID
00324 self.log = log
00325 def Load( self, tbuf ) :
00326 root = gbl.DataObject()
00327 setOwnership( root, False )
00328 self.evt.setRoot( '/Event', root )
00329 t = time.time()
00330 self.T.loadBuffer( tbuf )
00331 self.tLoad += (time.time() - t)
00332 self.nIn += 1
00333 self.buffersIn.append( tbuf.Length() )
00334 def Dump( self ) :
00335 t = time.time()
00336 tb = TBufferFile( TBuffer.kWrite )
00337 self.T.dumpBuffer(tb)
00338 self.tDump += ( time.time()-t )
00339 self.nOut += 1
00340 self.buffersOut.append( tb.Length() )
00341 return tb
00342 def Report( self ) :
00343 evIn = "Events Loaded : %i"%( self.nIn )
00344 evOut = "Events Dumped : %i"%( self.nOut )
00345 din = sum( self.buffersIn )
00346 dataIn = "Data Loaded : %i"%(din)
00347 dataInMb = "Data Loaded (MB) : %5.2f Mb"%(din/MB)
00348 if self.nIn :
00349 avgIn = "Avg Buf Loaded : %5.2f Mb"\
00350 %( din/(self.nIn*MB) )
00351 maxIn = "Max Buf Loaded : %5.2f Mb"\
00352 %( max(self.buffersIn)/MB )
00353 else :
00354 avgIn = "Avg Buf Loaded : N/A"
00355 maxIn = "Max Buf Loaded : N/A"
00356 dout = sum( self.buffersOut )
00357 dataOut = "Data Dumped : %i"%(dout)
00358 dataOutMb = "Data Dumped (MB) : %5.2f Mb"%(dout/MB)
00359 if self.nOut :
            avgOut = "Avg Buf Dumped : %5.2f Mb"\
                     %( dout/(self.nOut*MB) )
00362 maxOut = "Max Buf Dumped : %5.2f Mb"\
00363 %( max(self.buffersOut)/MB )
00364 else :
00365 avgOut = "Avg Buf Dumped : N/A"
00366 maxOut = "Max Buf Dumped : N/A"
00367 dumpTime = "Total Dump Time : %5.2f"%( self.tDump )
00368 loadTime = "Total Load Time : %5.2f"%( self.tLoad )
00369
00370 lines = evIn ,\
00371 evOut ,\
00372 dataIn ,\
00373 dataInMb ,\
00374 avgIn ,\
00375 maxIn ,\
00376 dataOut ,\
00377 dataOutMb,\
00378 avgOut ,\
00379 maxOut ,\
00380 dumpTime ,\
00381 loadTime
00382 self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
00383 for line in lines :
00384 self.log.info( line )
00385 self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
00386
00387
00388
00389 class GMPComponent( object ) :
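    '''
    Base class for the forked process objects (Reader, Worker, Writer).
    Holds the queues and synchronisation events shared with the
    co-ordinator, the GaudiPython application handles, the book-keeping
    counters and the multiprocessing.Process that runs Engine().
    '''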
00390
00391
00392
00393
00394
00395
00396 def __init__( self, nodeType, nodeID, queues, events, params ) :
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406 self.nodeType = nodeType
00407 current_process().name = nodeType
00408
00409
00410 self.initEvent, eventLoopSyncer, self.finalEvent = events
00411 self.eventLoopSyncer, self.lastEvent = eventLoopSyncer
00412
00413
00414 self.nWorkers, self.sEvent, self.config, self.log = params
00415 self.nodeID = nodeID
00416
00417
00418 self.currentEvent = None
00419
00420
00421 qPair, histq, fq = queues
00422
00423
00424 if self.nodeType == 'Reader' or self.nodeType == 'Worker' :
00425
00426 qin, qout = qPair
00427 self.evcom = EventCommunicator( self, qin, qout )
00428 else :
00429
00430 assert self.nodeType == 'Writer'
00431 self.evcoms = []
00432 qsin = qPair[0]
00433 for q in qsin :
00434 ec = EventCommunicator( self, q, None )
00435 self.evcoms.append( ec )
00436
00437 self.hq = histq
00438
00439 self.fq = fq
00440
00441
00442
00443 self.nIn = 0
00444 self.nOut = 0
00445
00446
00447 self.stat = SUCCESS
00448
00449
00450 self.log.name = '%s-%i'%(self.nodeType, self.nodeID)
00451
00452
00453
00454 self.iTime = 0.0
00455 self.rTime = 0.0
00456 self.fTime = 0.0
00457 self.firstEvTime = 0.0
00458 self.tTime = 0.0
00459
00460
00461 self.proc = Process( target=self.Engine )
00462
00463 def Start( self ) :
00464
00465 self.proc.start()
00466
    def Engine( self ) :
        # The main loop of the child process ; overridden by Reader,
        # Worker and Writer
        pass

    def processConfiguration( self ) :
        # Tailor the job configuration for this node type ; overridden
        # by the subclasses
        pass
00475
00476 def SetupGaudiPython( self ) :
00477
00478
00479 self.a = AppMgr()
00480 if SMAPS :
00481 from AlgSmapShot import SmapShot
00482 smapsLog = self.nodeType+'-'+str(self.nodeID)+'.smp'
00483 ss = SmapShot( logname=smapsLog )
00484 self.a.addAlgorithm( ss )
00485 self.evt = self.a.evtsvc()
00486 self.hvt = self.a.histsvc()
00487 self.fsr = self.a.filerecordsvc()
00488 self.inc = self.a.service('IncidentSvc','IIncidentSvc')
00489 self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
00490 self.ts = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
00491 self.TS = TESSerializer( self.ts, self.evt,
00492 self.nodeType, self.nodeID, self.log )
00493 return SUCCESS
00494
00495 def StartGaudiPython( self ) :
00496 self.a.initialize()
00497 self.a.start()
00498 return SUCCESS
00499
00500 def LoadTES( self, tbufferfile ) :
00501 root = gbl.DataObject()
00502 setOwnership(root, False)
00503 self.evt.setRoot( '/Event', root )
00504 self.ts.loadBuffer(tbufferfile)
00505
00506 def getEventNumber( self ) :
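        '''
        Return the current event number : try the Gen/Rec event headers
        first, then fall back to the raw event ; return -1 (with a warning
        before any event has been counted) if it cannot be determined.
        '''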
00507
00508
00509
00510
00511
00512 lst = [ '/Event/Gen/Header',
00513 '/Event/Rec/Header' ]
00514 for l in lst :
00515 path = l
00516 try :
00517 n = self.evt[path].evtNumber()
00518 return n
00519 except :
00520
00521 continue
00522
00523
00524
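        # Fall back to reading the event number directly from a raw-event
        # bank (hard-coded bank type and word offset)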
00525 try :
00526 n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
00527 return n
00528 except :
00529 pass
00530
00531
00532 if self.nIn > 0 or self.nOut > 0 :
00533 pass
00534 else :
00535 self.log.warning('Could not determine Event Number')
00536 return -1
00537
00538 def IdentifyWriters( self ) :
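        '''
        Scan the configuration for the known output stream types
        (WRITERTYPES) and return a dict of MiniWriter helpers keyed by
        output category ('events', 'records', 'tuples'), plus the
        histogram output file under 'histos'.
        '''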
00539
00540
00541
00542 d = {}
00543 keys = [ "events", "records", "tuples", "histos" ]
00544 for k in keys :
00545 d[k] = []
00546
00547
00548 wkeys = WRITERTYPES.keys()
00549 for v in self.config.values() :
00550 if v.__class__.__name__ in wkeys :
00551 writerType = WRITERTYPES[ v.__class__.__name__ ]
00552 d[writerType].append( MiniWriter(v, writerType, self.config) )
00553 if self.nodeID == 0 :
00554 self.log.info('Writer Found : %s'%(v.name()))
00555
00556
00557 if 'HistogramPersistencySvc' in self.config.keys() :
00558 hfile =self.config['HistogramPersistencySvc'].getProp('OutputFile')
00559 d[ "histos" ].append( hfile )
00560 return d
00561
00562 def dumpHistograms( self ) :
00563 '''
        Method used by the GaudiPython algorithm CollectHistograms
00565 to obtain a dictionary of form { path : object }
00566 representing the Histogram Store
00567 '''
00568 nlist = self.hvt.getHistoNames( )
00569 histDict = {}
00570 objects = 0 ; histos = 0
00571 if nlist :
00572 for n in nlist :
00573 o = self.hvt[ n ]
00574 if type(o) in aidatypes :
00575 o = aida2root(o)
00576 histos += 1
00577 else :
00578 objects += 1
00579 histDict[ n ] = o
00580 else :
            self.log.warning( 'no histograms to recover?' )
00582 return histDict
00583
00584 def Initialize( self ) :
00585 start = time.time()
00586 self.processConfiguration( )
00587 self.SetupGaudiPython( )
00588
00589 self.initEvent.set()
00590 self.StartGaudiPython( )
00591 self.iTime = time.time() - start
00592
00593 def Finalize( self ) :
00594 start = time.time()
00595 self.a.stop()
00596 self.a.finalize()
00597 self.log.info( '%s-%i Finalized'%(self.nodeType, self.nodeID) )
00598 self.finalEvent.set()
00599 self.fTime = time.time() - start
00600
00601 def Report( self ) :
00602 self.log.name = "%s-%i Audit"%(self.nodeType, self.nodeID)
00603 allTime = "Alive Time : %5.2f"%(self.tTime)
00604 initTime = "Init Time : %5.2f"%(self.iTime)
00605 frstTime = "1st Event Time : %5.2f"%(self.firstEvTime)
00606 runTime = "Run Time : %5.2f"%(self.rTime)
00607 finTime = "Finalise Time : %5.2f"%(self.fTime)
00608 tup = ( allTime, initTime, frstTime, runTime, finTime )
00609 for t in tup :
00610 self.log.info( t )
00611 self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
00612
00613 self.TS.Report()
00614
00615
00616
00617 class Reader( GMPComponent ) :
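    '''
    The Reader process : reads events from the input, serialises the whole
    transient event store and distributes the buffers to the Workers
    through a shared queue.
    '''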
00618 def __init__( self, queues, events, params ) :
00619 GMPComponent.__init__(self, 'Reader', -1, queues, events, params )
00620
00621 def processConfiguration( self ) :
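        '''
        Strip all algorithms and output streams from the configuration ;
        the Reader only reads and serialises events.
        '''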
00622
00623
00624
00625
00626 self.config[ 'ApplicationMgr' ].TopAlg = []
00627 self.config[ 'ApplicationMgr' ].OutStream = []
00628 if "HistogramPersistencySvc" in self.config.keys() :
00629 self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
00630 self.config['MessageSvc'].Format = '[Reader]% F%18W%S%7W%R%T %0W%M'
00631 self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax
00632
00633 def DumpEvent( self ) :
00634 tb = TBufferFile( TBuffer.kWrite )
00635
00636 self.ts.dumpBuffer( tb )
00637
00638 return tb
00639
00640 def DoFirstEvent( self ) :
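        '''
        The first event needs special treatment : run one event, use the
        resulting TES contents to register the item list with the
        serialiser, then dump and send the buffer. The time taken is
        recorded separately as the first-event time.
        '''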
00641
00642
00643 startFirst = time.time()
00644 self.log.info('Reader : First Event')
00645 if self.nOut == self.evtMax :
00646 self.log.info('evtMax( %i ) reached'%(self.evtMax))
00647 self.lastEvent.set()
00648 return SUCCESS
00649 else :
00650
00651 self.a.run(1)
00652 if not bool(self.evt['/Event']) :
00653 self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
00654 self.lastEvent.set()
00655 return SUCCESS
00656 else :
00657
00658 try :
00659 lst = self.evt.getList()
00660 except :
00661 self.log.critical('Reader could not acquire TES List!')
00662 self.lastEvent.set()
00663 return FAILURE
00664 self.log.info('Reader : TES List : %i items'%(len(lst)))
00665 for l in lst :
00666 self.ts.addItem(l)
00667 self.currentEvent = self.getEventNumber( )
00668 tb = self.TS.Dump( )
00669 self.log.info('First Event Sent')
00670 self.evcom.send( (self.currentEvent, tb) )
00671 self.nOut += 1
00672 self.eventLoopSyncer.set()
00673 self.evt.clearStore( )
00674 self.firstEvTime = time.time()-startFirst
00675 return SUCCESS
00676
00677 def Engine( self ) :
00678
00679 startEngine = time.time()
00680 self.log.name = 'Reader'
00681 self.log.info('Reader Process starting')
00682
00683 self.Initialize()
00684
00685
00686 self.a.addAlgorithm( CollectHistograms(self) )
00687
00688 self.log.info('Reader Beginning Distribution')
00689 sc = self.DoFirstEvent( )
00690 if sc.isSuccess() :
00691 self.log.info('Reader First Event OK')
00692 else :
00693 self.log.critical('Reader Failed on First Event')
00694 self.stat = FAILURE
00695
00696
00697 while True :
00698
00699 if self.nOut == self.evtMax :
00700 self.log.info('evtMax( %i ) reached'%(self.evtMax))
00701 break
00702
00703 if not self.stat.isSuccess() :
00704 self.log.critical( 'Reader is Damaged!' )
00705 break
00706
00707 t = time.time()
00708 self.a.run(1)
00709 self.rTime += (time.time()-t)
00710 if not bool(self.evt['/Event']) :
00711 self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
00712 break
00713 self.currentEvent = self.getEventNumber( )
00714 tb = self.TS.Dump( )
00715 self.evcom.send( (self.currentEvent, tb) )
00716
00717 self.nOut += 1
00718 self.eventLoopSyncer.set()
00719 self.evt.clearStore( )
00720 self.log.info('Setting <Last> Event')
00721 self.lastEvent.set()
00722
00723
00724 self.log.info( 'Reader : Event Distribution complete.' )
00725 self.evcom.finalize()
00726 self.Finalize()
00727 self.tTime = time.time() - startEngine
00728 self.Report()
00729
00730
00731
00732 class Worker( GMPComponent ) :
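    '''
    A Worker process : receives serialised events from the Reader, runs
    the configured algorithms on them and forwards accepted events to the
    Writer.
    '''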
00733 def __init__( self, workerID, queues, events, params ) :
00734 GMPComponent.__init__(self,'Worker', workerID, queues, events, params)
00735
00736 self.writerDict = self.IdentifyWriters( )
00737
00738 self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
00739 self.log.debug("Worker-%i Created OK"%(self.nodeID))
00740 self.eventOutput = True
00741
00742 def processConfiguration( self ) :
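        '''
        Remove the input and the event output streams (the Reader and the
        Writer handle those), give tuple writers per-worker file names and
        silence the MessageSvc on all but Worker-0.
        '''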
00743
00744
00745
00746
00747 self.config[ 'EventSelector' ].Input = []
00748 self.config[ 'ApplicationMgr' ].OutStream = []
00749 if "HistogramPersistencySvc" in self.config.keys() :
00750 self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
00751 formatHead = '[Worker-%i] '%(self.nodeID)
00752 self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'
00753
00754 for key, lst in self.writerDict.iteritems() :
00755 self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )
00756
00757 for m in self.writerDict[ "tuples" ] :
00758
00759
00760 newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
00761 self.config[ m.key ].Output = newName
00762
00763
00764 if self.nodeID == 0 :
00765 pass
00766 else :
00767 self.config[ 'MessageSvc' ].OutputLevel = ERROR
00768
00769 def Engine( self ) :
00770 startEngine = time.time()
00771 self.log.name = "Worker-%i"%(self.nodeID)
00772 self.log.info("Worker %i starting Engine"%(self.nodeID))
00773 self.Initialize()
00774 self.filerecordsAgent = FileRecordsAgent(self)
00775
00776
00777 self.log.info('EVT WRITERS ON WORKER : %i'\
00778 %( len(self.writerDict['events'])))
00779
00780 nEventWriters = len( self.writerDict[ "events" ] )
00781 if nEventWriters :
00782 for m in self.writerDict[ "events" ] :
00783 for item in m.ItemList :
00784 self.log.debug(' adding ItemList Item to ts : %s'%(item))
00785 self.ts.addItem( item )
00786 for item in m.OptItemList :
00787 self.log.debug(' adding Optional Item to ts : %s'%(item))
00788 self.ts.addOptItem( item )
00789 else :
00790 self.log.info( 'There is no Event Output for this app' )
00791 self.eventOutput = False
00792
00793
00794 self.a.addAlgorithm( CollectHistograms(self) )
00795
00796
00797 self.log.name = "Worker-%i"%(self.nodeID)
00798 Go = True
00799 while Go :
00800 packet = self.evcom.receive( )
00801 if packet : pass
00802 else : continue
00803 if packet == 'FINISHED' : break
00804 evtNumber, tbin = packet
00805 self.nIn += 1
00806 self.TS.Load( tbin )
00807
00808 t = time.time()
00809 sc = self.a.executeEvent()
00810 if self.nIn == 1 :
00811 self.firstEvTime = time.time()-t
00812 else :
00813 self.rTime += (time.time()-t)
00814 if sc.isSuccess() :
00815 pass
00816 else :
00817 self.log.warning('Did not Execute Event')
00818 self.evt.clearStore()
00819 continue
00820 if self.isEventPassed() :
00821 pass
00822 else :
00823 self.log.warning( 'Event did not pass : %i'%(evtNumber) )
00824 self.evt.clearStore()
00825 continue
00826 if self.eventOutput :
00827
00828
00829 self.currentEvent = self.getEventNumber( )
00830 tb = self.TS.Dump( )
00831 self.evcom.send( (self.currentEvent, tb) )
00832 self.nOut += 1
00833 self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
00834 self.eventLoopSyncer.set()
00835 self.evt.clearStore( )
00836 self.log.info('Setting <Last> Event')
00837 self.lastEvent.set()
00838
00839 self.evcom.finalize()
00840 self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
00841
00842 self.filerecordsAgent.SendFileRecords()
00843 self.Finalize()
00844 self.tTime = time.time()-startEngine
00845 self.Report()
00846
00847 def getCheckAlgs( self ) :
00848 '''
00849 For some output writers, a check is performed to see if the event has
00850 executed certain algorithms.
00851 These reside in the AcceptAlgs property for those writers
00852 '''
00853 acc = []
00854 req = []
00855 vet = []
00856 for m in self.writerDict[ "events" ] :
00857 if hasattr(m.w, 'AcceptAlgs') : acc += m.w.AcceptAlgs
00858 if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
00859 if hasattr(m.w, 'VetoAlgs') : vet += m.w.VetoAlgs
00860 return (acc, req, vet)
00861
00862
00863 def checkExecutedPassed( self, algName ) :
00864 if self.a.algorithm( algName )._ialg.isExecuted()\
00865 and self.a.algorithm( algName )._ialg.filterPassed() :
00866 return True
00867 else :
00868 return False
00869
00870 def isEventPassed( self ) :
00871 '''
00872 Check the algorithm status for an event.
00873 Depending on output writer settings, the event
00874 may be declined based on various criteria.
00875 This is a transcript of the check that occurs in GaudiSvc::OutputStream
00876 '''
00877 passed = False
00878
00879 self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
00880 if self.acceptAlgs :
00881 for name in self.acceptAlgs :
00882 if self.checkExecutedPassed( name ) :
00883 passed = True
00884 break
00885 else :
00886 passed = True
00887
00888 self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
00889 for name in self.requireAlgs :
00890 if self.checkExecutedPassed( name ) :
00891 pass
00892 else :
00893 self.log.info('Evt declined (requireAlgs) : %s'%(name) )
00894 passed = False
00895
00896 self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
00897 for name in self.vetoAlgs :
00898 if self.checkExecutedPassed( name ) :
00899 pass
00900 else :
00901 self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
00902 passed = False
00903 return passed
00904
00905
00906
00907 class Writer( GMPComponent ) :
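    '''
    The Writer process : collects serialised events from all Workers,
    writes the output files and rebuilds the histogram store and file
    records at the end of the job.
    '''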
00908 def __init__( self, queues, events, params ) :
00909 GMPComponent.__init__(self,'Writer', -2, queues, events, params)
00910
00911 self.writerDict = self.IdentifyWriters( )
00912
00913 self.status = [False]*self.nWorkers
00914 self.log.name = "Writer--2"
00915
00916 def processConfiguration( self ) :
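        '''
        Keep only the output side of the job : remove algorithms and input,
        and rename the event, record and histogram output files to include
        the number of workers.
        '''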
00917
00918
00919
00920 self.config[ 'ApplicationMgr' ].TopAlg = []
00921 self.config[ 'EventSelector' ].Input = []
00922
00923 self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'
00924
00925
00926 for key, lst in self.writerDict.iteritems() :
00927 self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )
00928
00929
00930
00931
00932
00933 for m in self.writerDict[ "events" ] :
00934 self.log.debug( 'Processing Event Writer : %s'%(m) )
00935 newName = m.getNewName( '.', '.p%i.'%self.nWorkers )
00936 self.config[ m.key ].Output = newName
00937
00938
00939
00940
00941
00942
00943
00944
00945 for m in self.writerDict[ "records" ] :
00946 self.log.debug( 'Processing FileRecords Writer: %s'%(m) )
00947 newName = m.getNewName( '.', '.p%i.'%self.nWorkers,
00948 extra=" OPT='RECREATE'" )
00949 self.config[ m.key ].Output = newName
00950
00951
00952 hs = "HistogramPersistencySvc"
00953 n = None
00954 if hs in self.config.keys() :
00955 n = self.config[ hs ].OutputFile
00956 if n :
00957 newName=self.config[hs].OutputFile.replace('.',\
00958 '.p%i.'%(self.nWorkers))
00959 self.config[ hs ].OutputFile = newName
00960
00961 def Engine( self ) :
00962 startEngine = time.time()
00963 self.Initialize()
00964 self.histoAgent = HistoAgent( self )
00965 self.filerecordsAgent = FileRecordsAgent( self )
00966
00967
00968 Go = True
00969 current = -1
00970 stopCriteria = self.nWorkers
00971 while Go :
00972 current = (current+1)%self.nWorkers
00973 packet = self.evcoms[current].receive( timeout=0.01 )
            if packet is None :
                continue
00976 if packet == 'FINISHED' :
00977 self.log.info('Writer got FINISHED flag : Worker %i'%(current))
00978 self.status[current] = True
00979 if all(self.status) :
00980 self.log.info('FINISHED recd from all workers, break loop')
00981 break
00982 continue
00983
00984 self.nIn += 1
00985 evtNumber, tbin = packet
00986 self.TS.Load( tbin )
00987 t = time.time()
00988 self.a.executeEvent()
00989 self.rTime += ( time.time()-t )
00990 self.currentEvent = self.getEventNumber( )
00991 self.evt.clearStore( )
00992 self.eventLoopSyncer.set()
00993 self.log.name = "Writer--2"
00994 self.log.info('Setting <Last> Event')
00995 self.lastEvent.set()
00996
00997
00998 [ e.finalize() for e in self.evcoms ]
00999
01000 sc = self.histoAgent.Receive()
01001 sc = self.histoAgent.RebuildHistoStore()
01002 if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
01003 else : self.log.warning( 'Histo Store Error in Rebuild' )
01004
01005
01006 sc = self.filerecordsAgent.Receive()
01007 self.filerecordsAgent.Rebuild()
01008 self.Finalize()
        self.tTime = time.time()-startEngine
01010 self.Report()
01011
01012
01013
01014
01015
01016
01017
01018 class Coord( object ) :
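    '''
    Co-ordinator of the parallel job : builds the event queues and
    synchronisation objects, creates the Reader, Worker and Writer
    processes and drives the three phases (initialise, run, finalise)
    from Go().

    A minimal usage sketch (hypothetical driver ; the real caller supplies
    the parsed configuration dictionary and a logger) :

        coord = Coord( nWorkers=4, config=configDict, log=logger )
        sc = coord.Go()
    '''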
01019 def __init__( self, nWorkers, config, log ) :
01020
01021 self.log = log
01022 self.config = config
01023
01024 self.log.name = 'GaudiPython-Parallel-Logger'
01025 self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )
01026
01027 if nWorkers == -1 :
01028
01029 self.nWorkers = cpu_count()
01030 else :
01031 self.nWorkers = nWorkers
01032
01033
01034 self.qs = self.SetupQueues( )
01035 self.hq = JoinableQueue( )
01036 self.fq = JoinableQueue( )
01037
01038
01039 self.sInit = Syncer( self.nWorkers, self.log,
01040 limit=WAIT_INITIALISE,
01041 step=STEP_INITIALISE )
01042 self.sRun = Syncer( self.nWorkers, self.log,
01043 manyEvents=True,
01044 limit=WAIT_SINGLE_EVENT,
01045 step=STEP_EVENT,
01046 firstEvent=WAIT_FIRST_EVENT )
01047 self.sFin = Syncer( self.nWorkers, self.log,
01048 limit=WAIT_FINALISE,
01049 step=STEP_FINALISE )
01050
01051 self.histSyncEvent = Event()
01052
01053
01054 params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
01055
01056
01057 self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params)
01058 self.workers = []
01059 for i in xrange( self.nWorkers ) :
01060 wk = Worker( i, self.getQueues(i), self.getSyncEvents(i), params )
01061 self.workers.append( wk )
01062 self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params)
01063
01064 self.system = []
01065 self.system.append(self.writer)
01066 [ self.system.append(w) for w in self.workers ]
01067 self.system.append(self.reader)
01068
01069 def getSyncEvents( self, nodeID ) :
01070 init = self.sInit.d[nodeID].event
01071 run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
01072 fin = self.sFin.d[nodeID].event
01073 return ( init, run, fin )
01074
01075 def getQueues( self, nodeID ) :
01076 eventQ = self.qs[ nodeID ]
01077 histQ = self.hq
01078 fsrQ = self.fq
01079 return ( eventQ, histQ, fsrQ )
01080
01081 def Go( self ) :
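        '''
        Start all processes and synchronise the initialise, run and
        finalise phases, terminating everything if any phase fails or
        times out.
        '''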
01082
01083
01084 self.log.name = 'GaudiPython-Parallel-Logger'
01085 self.log.info( 'INITIALISING SYSTEM' )
01086 for p in self.system :
01087 p.Start()
01088 sc = self.sInit.syncAll(step="Initialise")
01089 if sc : pass
01090 else : self.Terminate() ; return FAILURE
01091
01092
01093 self.log.name = 'GaudiPython-Parallel-Logger'
01094 self.log.info( 'RUNNING SYSTEM' )
01095 sc = self.sRun.syncAll(step="Run")
01096 if sc : pass
01097 else : self.Terminate() ; return FAILURE
01098
01099
01100 self.log.name = 'GaudiPython-Parallel-Logger'
01101 self.log.info( 'FINALISING SYSTEM' )
01102 sc = self.sFin.syncAll(step="Finalise")
01103 if sc : pass
01104 else : self.Terminate() ; return FAILURE
01105
01106
01107 self.log.info( "Cleanly join all Processes" )
01108 self.Stop()
01109 self.log.info( "Report Total Success to Main.py" )
01110 return SUCCESS
01111
01112 def Terminate( self ) :
01113
01114 self.writer.proc.terminate()
01115 [ w.proc.terminate() for w in self.workers]
01116 self.reader.proc.terminate()
01117
01118 def Stop( self ) :
01119
01120 self.system.reverse()
01121 for s in self.system :
01122 s.proc.join()
01123 return SUCCESS
01124
01125 def SetupQueues( self ) :
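        '''
        Build the event queues : one shared JoinableQueue from the Reader
        to all Workers, and one JoinableQueue per Worker to the Writer.
        Returned as a dict keyed by nodeID ( -1 : Reader, -2 : Writer,
        0..nWorkers-1 : Workers ) of (qin, qout) pairs ; the Writer
        receives the full list of per-Worker queues as its input.
        '''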
01126
01127
01128
01129
01130
01131
01132
01133 rwk = JoinableQueue()
01134
01135 workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
01136 d = {}
01137 d[-1] = (None, rwk)
01138 d[-2] = (workersWriter, None)
01139 for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
01140 return d
01141
01142