from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle, time
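

# =============================================================================
# Helper classes for parallel Gaudi jobs : a Writer process collects
# histograms and File Summary Record (FSR) data sent by the other processes
# over multiprocessing queues, rebuilds the corresponding stores, and keeps
# all processes in step via shared multiprocessing Events.
# =============================================================================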

# Convert an AIDA histogram (as held by the Gaudi histogram service) to its
# underlying ROOT object, so that ROOT's Add() can be used for merging
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root


class HistoAgent( object ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # Map the class name of a received object to the method
        # that books it in the Histogram Transient Store
        self.bookingDict = {}
        self.bookingDict['DataObject'] = self.bookDataObject
        self.bookingDict['NTuple::Directory'] = self.bookDataObject
        self.bookingDict['NTuple::File'] = self.bookDataObject
        self.bookingDict['TH1D'] = self.bookTH1D
        self.bookingDict['TH2D'] = self.bookTH2D
        self.bookingDict['TH3D'] = self.bookTH3D
        self.bookingDict['TProfile'] = self.bookTProfile
        self.bookingDict['TProfile2D'] = self.bookTProfile2D

    def register( self, tup ) :
        # Register a (workerID, histDict) bundle received from a worker
        assert isinstance( tup, tuple )
        self.histos.append( tup )

    def Receive( self ) :
        # Collect histogram bundles until every sender has posted its
        # 'HISTOS_SENT' sentinel (there are nWorkers + 1 senders)
        hstatus = self._gmpc.nWorkers + 1
        while True :
            tup = self.qin.get()
            if tup == 'HISTOS_SENT' :
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus : break
            else :
                self.register( tup )
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS
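
    # The sender side of this protocol, for reference -- a minimal sketch;
    # the queue 'hq', the worker ID and the {path : histogram} dict are
    # illustrative, not taken from this module:
    #
    #   bundle = ( workerID, {'/stat/myAlg/1' : myTH1D} )
    #   hq.put( bundle )
    #   hq.put( 'HISTOS_SENT' )   # one sentinel per sender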


    def RebuildHistoStore( self ) :
        '''
        Rebuild the Histogram Store from the histos received by Receive().
        If a received histo is not yet in the store, book and fill it
        according to self.bookingDict.  If it has a matching histo in the
        store, add the two, remembering that aida2root must be used on
        the stored histo for compatibility.
        '''
        errors = 0
        for tup in self.histos :
            workerID, histDict = tup
            added = 0 ; booked = 0
            for n in histDict.keys() :
                o = histDict[ n ]
                obj = self.hvt.retrieve( n )
                if obj :
                    # Already in the store : merge with ROOT's Add()
                    try :
                        aida2root(obj).Add(o)
                    except :
                        self.log.warning( 'FAILED TO ADD : %s'%(str(obj)) )
                        errors += 1
                    added += 1
                else :
                    # Not yet in the store : book it first
                    if o.__class__.__name__ in self.bookingDict :
                        try :
                            self.bookingDict[o.__class__.__name__](n, o)
                        except :
                            self.log.warning( 'FAILED TO REGISTER : %s\tto %s'\
                                              %(o.__class__.__name__, n) )
                            errors += 1
                    else :
                        self.log.warning( 'No booking method for: %s\t%s\t%s'\
                                          %(n, type(o), o.__class__.__name__) )
                        errors += 1
                    booked += 1
            self.log.debug( 'Worker %i : %i histos added, %i booked'\
                            %(workerID, added, booked) )
        hs = self.hvt.getHistoNames()
        self.log.info( 'Histo Store Rebuilt : ' )
        self.log.info( '  Contains %i objects.'%(len(hs)) )
        self.log.info( '  Errors in Rebuilding : %i'%(errors) )
        return SUCCESS
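
    # For reference, the ROOT-level merge that RebuildHistoStore relies on --
    # a standalone sketch using plain ROOT, independent of the Gaudi stores:
    #
    #   import ROOT
    #   hA = ROOT.TH1D( 'hA', 'demo', 10, 0., 1. )
    #   hB = ROOT.TH1D( 'hB', 'demo', 10, 0., 1. )
    #   hA.Fill( 0.3 ) ; hB.Fill( 0.7 )
    #   hA.Add( hB )    # bin-by-bin sum, as done for each worker's copy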


    def bookDataObject( self, n, o ) :
        '''
        Register a DataObject to the Histo Store
        '''
        self._gmpc.hvt.registerObject( n, o )

    def bookTH1D( self, n, o ) :
        '''
        Register a ROOT 1D histogram (TH1D) to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),\
                                  o.GetXaxis().GetNbins(),\
                                  o.GetXaxis().GetXmin(),\
                                  o.GetXaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH2D( self, n, o ) :
        '''
        Register a ROOT 2D histogram (TH2D) to the Histo Store
        '''
        obj = self.hvt._ihs.book( n, o.GetTitle(),\
                                  o.GetXaxis().GetNbins(),\
                                  o.GetXaxis().GetXmin(),\
                                  o.GetXaxis().GetXmax(),\
                                  o.GetYaxis().GetNbins(),\
                                  o.GetYaxis().GetXmin(),\
                                  o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTH3D( self, n, o ) :
        '''
        Register a ROOT 3D histogram (TH3D) to the Histo Store
        '''
        # Note : GetNbins(), not GetXbins() -- the latter returns the
        # TArrayD of bin edges, not the number of bins
        obj = self.hvt._ihs.book( n, o.GetTitle(),\
                                  o.GetXaxis().GetNbins(),\
                                  o.GetXaxis().GetXmin(),\
                                  o.GetXaxis().GetXmax(),\
                                  o.GetYaxis().GetNbins(),\
                                  o.GetYaxis().GetXmin(),\
                                  o.GetYaxis().GetXmax(),\
                                  o.GetZaxis().GetNbins(),\
                                  o.GetZaxis().GetXmin(),\
                                  o.GetZaxis().GetXmax() )
        aida2root(obj).Add(o)

    def bookTProfile( self, n, o ) :
        '''
        Register a ROOT TProfile to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
                                      o.GetXaxis().GetNbins(),\
                                      o.GetXaxis().GetXmin(),\
                                      o.GetXaxis().GetXmax(),\
                                      o.GetOption() )
        aida2root(obj).Add(o)

    def bookTProfile2D( self, n, o ) :
        '''
        Register a ROOT TProfile2D to the Histo Store
        '''
        obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
                                      o.GetXaxis().GetNbins(),\
                                      o.GetXaxis().GetXmin(),\
                                      o.GetXaxis().GetXmax(),\
                                      o.GetYaxis().GetNbins(),\
                                      o.GetYaxis().GetXmin(),\
                                      o.GetYaxis().GetXmax() )
        aida2root(obj).Add(o)
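

# A minimal sketch of how a Writer might drive HistoAgent -- 'gmpc' stands
# for any GMPComponent-like object providing hvt, hq, log, nWorkers and
# sEvent; the name is illustrative:
#
#   hAgent = HistoAgent( gmpc )
#   if hAgent.Receive() == SUCCESS :    # block until all bundles arrive
#       hAgent.RebuildHistoStore()      # merge them into the local store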


class FileRecordsAgent( object ) :
    def __init__( self, gmpComponent ) :
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []
        self.objectsOut = []

    def localCmp( self, tupA, tupB ) :
        # Order (sourceNode, path, pickledObject) tuples by source node,
        # so that node 0's objects are rebuilt before the others are merged
        ind = 0
        valA = tupA[ind]
        valB = tupB[ind]
        if valA < valB : return -1
        elif valA > valB : return 1
        else : return 0

    def SendFileRecords( self ) :
        '''
        Send the contents of the local FileRecords (FSR) store to the Writer.
        Each object is sent as a (nodeID, path, pickledObject) tuple, and the
        message 'END_FSR' signals that this node has finished sending.
        Node 0 sends everything; other nodes send only their EventCountFSR
        and any non-empty KeyedContainers, to avoid duplication.
        '''
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # Check validity of the store
        if not lst :
            self.log.info('No FileRecords Data to send to Writer.')
            self.q.put( 'END_FSR' )
            return SUCCESS

        # No need to send the root node itself
        if '/FileRecords' in lst : lst.remove('/FileRecords')

        for l in lst :
            o = self.fsr.retrieveObject( l )
            if hasattr( o, "configureDirectAccess" ) :
                o.configureDirectAccess()
            # Node 0 sends everything
            if self._gmpc.nodeID == 0 :
                self.objectsOut.append( (0, l, pickle.dumps(o)) )
            else :
                # Other nodes send only a subset, to avoid duplication
                if l == '/FileRecords/EventCountFSR' :
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append( tup )
                else :
                    # Should be a KeyedContainer ; send only if non-empty
                    assert "KeyedContainer" in o.__class__.__name__
                    nObjects = o.numberOfObjects()
                    if nObjects :
                        self.log.debug( "Keyed Container %s with %i objects"\
                                        %(l, nObjects) )
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append( tup )
        self.log.debug('Done with FSR store, ready to send to Writer.')

        if self.objectsOut :
            self.log.debug( '%i FSR objects to Writer'%(len(self.objectsOut)) )
            for ob in self.objectsOut :
                self.log.debug( '\t%s'%(ob[0]) )
            self.q.put( self.objectsOut )
        else :
            self.log.info('Valid FSR Store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put( 'END_FSR' )
        return SUCCESS
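
    # What actually travels on the queue, for reference -- a sketch with an
    # illustrative node ID and payload:
    #
    #   q.put( [ (1, '/FileRecords/EventCountFSR', pickle.dumps(ecFSR)) ] )
    #   q.put( 'END_FSR' )   # one sentinel per sending node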

    def Receive( self ) :
        # Receive the FSR data sent by every worker; each worker ends its
        # transmission with an 'END_FSR' sentinel
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0 :
            objects = self.q.get()
            if objects == 'END_FSR' :
                nc -= 1
                continue
            for o in objects :
                self.objectsIn.append(o)

        # Sort the accumulated objects by source node ID
        self.objectsIn.sort(cmp=self.localCmp)
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild( self ) :
        # Rebuild the FSR store : objects from node 0 are registered as-is,
        # objects from other nodes are merged into the existing records
        for sourceNode, path, serialob in self.objectsIn :
            self.log.debug( 'Working with %s'%(path) )
            ob = pickle.loads(serialob)
            if hasattr( ob, 'update' ) :
                ob.update()
            if hasattr( ob, 'numberOfObjects' ) :
                nCont = ob.numberOfObjects()
                self.log.debug( '\t %s has containedObjects : %i'\
                                %(type(ob).__name__, nCont) )
            if sourceNode == 0 :
                self.log.debug( 'Registering Object to : %s'%(path) )
                self.fsr.registerObject( path, ob )
            else :
                self.log.debug( 'Merging Object to : %s'%(path) )
                self.MergeFSRobject( sourceNode, path, ob )

        # Correct the EventCountFSR output to the total number of
        # input events, now that everything is merged
        self.log.info('FSR Store Rebuilt. Correcting EventCountFSR')
        if bool( self.fsr._idp ) :    # the FSR store is valid
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount] :
                self.fsr[ecount].setOutput( self._gmpc.nIn )
                self.log.info( 'Event Counter Output set : %s : %i'\
                               %(ecount, self.fsr[ecount].output()) )

        # Summarise the rebuilt store
        self.log.debug('FSR store reconstructed!')
        lst = self.fsr.getHistoNames()
        if lst :
            for l in lst :
                ob = self.fsr.retrieveObject(l)
                if hasattr( ob, 'configureDirectAccess' ) :
                    ob.configureDirectAccess()
                if hasattr( ob, 'containedObjects' ) :
                    self.log.debug( '\t%s (cont. objects : %i)'\
                                    %(l, ob.numberOfObjects()) )
                else :
                    self.log.debug( '\t%s'%(l) )
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject( self, sourceNode, path, ob ) :
        # Dispatch the merge according to the type of FSR object
        if path == '/FileRecords/TimeSpanFSR' :
            # Extend the stored time span
            self.ProcessTimeSpanFSR( path, ob )
        elif path == '/FileRecords/EventCountFSR' :
            # Add the input counters
            self.ProcessEventCountFSR( path, ob )
        elif "KeyedContainer" in ob.__class__.__name__ :
            # Keyed Containers of LumiFSRs are merged ;
            # other containers are skipped
            if "LumiFSR" in ob.__class__.__name__ :
                self.MergeLumiFSR( path, ob )
            else :
                self.log.info( "Skipping Merge of Keyed Container %s for %s"\
                               %(ob.__class__.__name__, path) )

    def ProcessTimeSpanFSR( self, path, ob ) :
        # Merge the incoming TimeSpanFSR with the stored one : the result
        # spans from the earliest to the latest time seen by any node
        ob2 = self.fsr.retrieveObject( path )
        if ob.containedObjects().size() :
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]
            tMin = cob.earliest()
            tMax = cob.latest()
            for j in xrange( sz ) :
                cob = ob.containedObjects()[j]
                self.log.debug( 'Adding TimeSpanFSR' )
                if cob.earliest() < tMin :
                    tMin = cob.earliest()
                if cob.latest() > tMax :
                    tMax = cob.latest()
            # The stored TimeSpanFSR has to be rebuilt, without a key,
            # and added back to the container
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest( tMin )
            tsfsr.setLatest( tMax )
            self.fsr[path].clear()
            self.fsr[path].add( tsfsr )
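
    # The span merge above is just a running min/max -- illustrative
    # values only:
    #
    #   spans = [ (100, 200), (150, 250), (90, 120) ]
    #   tMin = min( s[0] for s in spans )   # -> 90
    #   tMax = max( s[1] for s in spans )   # -> 250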

    def ProcessEventCountFSR( self, path, ob ) :
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput( self.fsr[path].input() + ob.input() )

    def MergeLumiFSR( self, path, keyedC ) :
        from ROOT import string

        # Fetch the stored container ; it should hold exactly one LumiFSR,
        # which becomes the base object to merge into
        keyedContainer = self.fsr.retrieveObject(path)
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR( l )
        # Merge in every LumiFSR of the incoming container
        nCont = keyedC.numberOfObjects()
        for i in xrange(nCont) :
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR( obj )
            baseLumi.merge( nextLumi )
        # Rebuild a C++ LHCb::LumiFSR from the merged python object
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs :
            newLumi.addRunNumber( r )
        for f in baseLumi.files :
            newLumi.addFileID( string(f) )
        for k in baseLumi.keys :
            increment, integral = baseLumi.info[k]
            newLumi.addInfo( k, increment, integral )
        # Clear the stored container and add the merged LumiFSR
        self.fsr[path].clear()
        self.fsr[path].add( newLumi )
        return SUCCESS


class LumiFSR( object ) :
    def __init__( self, lumi ) :
        '''
        A plain-python copy of an LHCb::LumiFSR, holding its run numbers,
        file IDs and info dictionary (key -> (increment, integral)), so
        that LumiFSRs from different processes can be merged.
        '''
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # Run numbers
        for r in lumi.runNumbers() :
            self.runs.append(r)
        # File IDs
        for f in lumi.fileIDs() :
            self.files.append(f)
        # Parse the (key, increment, integral) triplets out of the
        # printed representation of the C++ object
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa :
            k, i, t = rec.split()
            k = int(k)
            i = int(i)
            t = int(t)
            self.info[k] = (i, t)
        self.keys = self.info.keys()

    def merge( self, otherLumi ) :
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # Merge run numbers (keep each only once, sorted)
        for r in otherLumi.runs :
            if r not in self.runs :
                self.runs.append( r )
        self.runs.sort()
        # Merge file IDs (keep each only once, sorted)
        for f in otherLumi.files :
            if f not in self.files :
                self.files.append( f )
        self.files.sort()
        # Merge the info : sum increments and integrals for shared keys
        for k in otherLumi.keys :
            increment, integral = otherLumi.info[k]
            if k in self.keys :
                myIncrement, myIntegral = self.info[k]
                self.info[k] = ( myIncrement+increment, myIntegral+integral )
            else :
                self.info[k] = ( increment, integral )
        # Update keys
        self.keys = self.info.keys()

    def __repr__( self ) :
        s = "LumiFSR Python class\n"
        s += "\tRuns : \n"
        for r in self.runs :
            s += "\t\t%i\n"%(r)
        s += "\tFiles : \n"
        for f in self.files :
            s += "\t\t%s\n"%(f)
        s += "\tInfo : \n"
        for k in self.keys :
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n"%(k, increment, integral)
        return s
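
    # The merge arithmetic, on plain data for reference -- illustrative
    # values, with a and b standing for two LumiFSR instances:
    #
    #   a.info = { 1 : (2, 100) }               # key -> (incr, integral)
    #   b.info = { 1 : (3, 50), 2 : (1, 10) }
    #   a.merge( b )                            # a.info == { 1 : (5, 150),
    #                                           #             2 : (1, 10) }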


class PackedCaloHypo( object ) :
    def __init__( self, o ) :
        # A plain-python copy of the data members of an LHCb::PackedCaloHypo
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
        self.centX = o.centX
        self.centY = o.centY
        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster = o.firstCluster
        self.firstDigit = o.firstDigit
        self.firstHypo = o.firstHypo
        self.hypothesis = o.hypothesis
        self.key = o.key
        self.lastCluster = o.lastCluster
        self.lastDigit = o.lastDigit
        self.lastHypo = o.lastHypo
        self.lh = o.lh
        self.pos = (o.posE, o.posX, o.posY)
        self.z = o.z

    def __repr__( self ) :
        s = "PackedCaloHypo : \n"
        s += "\tcentX        : %s\n"%( str(self.centX) )
        s += "\tcentY        : %s\n"%( str(self.centY) )
        s += "\tcerr         : %s\n"%( str(self.cerr) )
        s += "\tcov          : %s\n"%( str(self.cov) )
        s += "\tfirstCluster : %s\n"%( str(self.firstCluster) )
        s += "\tfirstDigit   : %s\n"%( str(self.firstDigit) )
        s += "\tfirstHypo    : %s\n"%( str(self.firstHypo) )
        s += "\thypothesis   : %s\n"%( str(self.hypothesis) )
        s += "\tkey          : %s\n"%( str(self.key) )
        s += "\tlastCluster  : %s\n"%( str(self.lastCluster) )
        s += "\tlastDigit    : %s\n"%( str(self.lastDigit) )
        s += "\tlastHypo     : %s\n"%( str(self.lastHypo) )
        s += "\tlh           : %s\n"%( str(self.lh) )
        s += "\tpos          : %s\n"%( str(self.pos) )
        s += "\tz            : %s\n"%( str(self.z) )
        s += "---------------------------------------\n"
        return s


class SyncMini( object ) :
    # Wrap a multiprocessing Event (and, optionally, a second 'last event'
    # Event) together with the time of the last reset, for use by Syncer
    def __init__( self, event, lastEvent=None ) :
        self.event = event
        self.t = 0.0
        self.lastEvent = lastEvent

    def check( self ) :
        return self.event.is_set()

    def checkLast( self ) :
        return self.lastEvent.is_set()

    def reset( self ) :
        self.event.clear()
        self.t = time.time()

    def getTime( self ) :
        return self.t

    def set( self ) :
        self.event.set()

    def __repr__( self ) :
        s = "---------- SyncMini --------------\n"
        s += "    Status : %s\n"%(self.event.is_set())
        s += "         t : %5.2f\n"%(self.t)
        if self.lastEvent :
            s += "Last Event : %s\n"%(self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s
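
# A minimal sketch of the SyncMini handshake -- names are illustrative:
#
#   from multiprocessing import Event
#   sm = SyncMini( Event(), lastEvent=Event() )
#   sm.set()              # worker : "step done"
#   assert sm.check()     # supervisor sees it
#   sm.reset()            # supervisor re-arms and timestamps the wait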


class Syncer( object ) :
    def __init__( self, nWorkers, log, manyEvents=False,
                  limit=None, step=None, firstEvent=None ) :
        # Set up a SyncMini for each node : -2, -1 and 0..nWorkers-1
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents
        for i in xrange( -2, nWorkers ) :
            self.d[ i ] = SyncMini( Event(), lastEvent=Event() )
        if self.manyEvents :
            self.limitFirst = firstEvent
        self.keys = self.d.keys()
        self.nWorkers = nWorkers
        self.log = log

    def syncAll( self, step="Not specified" ) :
        # Poll until all processes have set their sync event, or declare
        # failure after self.limit seconds ; 'step' here is just a label
        # for the log messages
        if self.manyEvents :
            sc = self.syncAllRolling()
            return sc

        # Wait a maximum of self.limit seconds, checking every self.step
        for i in xrange( 0, self.limit, self.step ) :
            if self.checkAll() :
                self.log.info( '%s : All procs done @ %i s'%(step, i) )
                break
            else :
                time.sleep(self.step)

        # Final check
        if self.checkAll() :
            self.log.info( "All processes : %s ok."%(step) )
            return SUCCESS
        else :
            self.log.critical( 'Some process is hanging on : %s'%(step) )
            for k in self.keys :
                hangString = "%s : Proc/Stat : %i/%s"%(step, k, self.d[k].check())
                self.log.critical( hangString )
            return FAILURE

    def syncAllRolling( self ) :
        '''
        Synchronise the event loop : each node must set its sync event
        once per event, within self.limit seconds (self.limitFirst for
        the first event), and set its lastEvent once it has finished all
        of its events.  Return FAILURE as soon as any node times out.
        '''
        begin = time.time()
        firstEv = {}
        timers = {}
        for k in self.keys :
            self.d[k].reset()
            firstEv[k] = False
            timers[k] = 0.0

        # Iterate over copies : removing from a list while iterating over
        # it (or over self.keys itself) would skip entries
        active = list( self.keys )
        while True :
            # Check the status of each active node
            for k in list( active ) :
                sMini = self.d[k]

                if sMini.check() or sMini.checkLast() :
                    if sMini.checkLast() and sMini.check() :
                        # This node has finished all of its events
                        active.remove( k )
                        alive = time.time() - begin
                        self.log.info( "Audit : Node %i alive for %5.2f"\
                                       %(k, alive) )
                    else :
                        # Event done on time : re-arm for the next one
                        sMini.reset()
                else :
                    # The node is still working on the current event :
                    # check how long it has been waiting
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k] :
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond :
                        # Timeout : report the hang and give up
                        self.log.critical( 'Single event wait : %5.2f'%(wait) )
                        self.processHang()
                        return FAILURE

            # Termination condition : all nodes have set their last event
            if self.checkLastEvents() :
                self.log.info('TC met for event loop')
                break
            else :
                # Sleep, then poll again
                time.sleep(self.step)

        self.log.info("All processes Completed all Events ok")
        return SUCCESS

    def processHang( self ) :
        self.log.critical('Some proc is hanging during Event processing!')
        for k in self.keys :
            self.log.critical( "Proc/Stat : %i / %s"%(k, self.d[k].check()) )
        return

    def checkAll( self ) :
        # True if all nodes have set their sync event
        currentStatus = [ mini.check() for mini in self.d.values() ]
        return all( currentStatus )

    def checkLastEvents( self ) :
        # True if all nodes have set their 'last event' event
        stat = [ sMini.checkLast() for sMini in self.d.values() ]
        return all(stat)
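
# A minimal sketch of how a supervisor might use Syncer -- worker processes
# are assumed to call set() on their SyncMini events ; the parameters and
# the 'log' object are illustrative:
#
#   syncer = Syncer( nWorkers=2, log=log, manyEvents=False,
#                    limit=60, step=5 )
#   # ... hand syncer.d[i].event to process i, start the processes ...
#   if syncer.syncAll( step="Initialise" ) == FAILURE :
#       log.critical('Initialisation did not complete')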


def getEventNumber( evt ) :
    # Get the number of the current event, looking first in the Gen and
    # Rec headers, then falling back on the ODIN raw bank
    n = None
    # First the direct way : try the headers
    lst = [ '/Event/Gen/Header',
            '/Event/Rec/Header' ]
    for l in lst :
        try :
            n = evt[l].evtNumber()
            return n
        except :
            # No header at this location ; try the next one
            continue

    # Not there?  Try the ODIN bank of the RawEvent
    # (bank type 16 is the ODIN bank ; the event number is word 4)
    try :
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except :
        pass

    # Return None if all else fails
    return n
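
# Usage, for reference -- 'evt' is any event data service exposing the
# dict-style lookup used above ; the 'log' object is illustrative:
#
#   n = getEventNumber( evt )
#   if n is None :
#       log.warning('Could not determine event number')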