'''Helper classes for GaudiPython multiprocessing (GMP).

HistoAgent        : receive histogram bundles and rebuild the histogram store
FileRecordsAgent  : ship, receive and merge File Summary Records (FSRs)
LumiFSR           : pickle-friendly wrapper around LHCb::LumiFSR for merging
PackedCaloHypo    : pickle-friendly snapshot of an LHCb::PackedCaloHypo
SyncMini, Syncer  : event-based synchronisation of the GMP processes
getEventNumber    : fetch the current event number from the event store
'''
from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle
import time

# Convert an AIDA histogram interface to the underlying ROOT object
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root


class HistoAgent(object):
00050 def __init__( self, gmpComponent ) :
00051 self._gmpc = gmpComponent
00052 self.hvt = self._gmpc.hvt
00053 self.histos = []
00054 self.qin = self._gmpc.hq
        self.log = self._gmpc.log

        # dispatch table : class name of received object -> booking method
        self.bookingDict = {}
00060 self.bookingDict['DataObject'] = self.bookDataObject
00061 self.bookingDict['NTuple::Directory'] = self.bookDataObject
00062 self.bookingDict['NTuple::File'] = self.bookDataObject
00063 self.bookingDict['TH1D'] = self.bookTH1D
00064 self.bookingDict['TH2D'] = self.bookTH2D
00065 self.bookingDict['TH3D'] = self.bookTH3D
00066 self.bookingDict['TProfile'] = self.bookTProfile
00067 self.bookingDict['TProfile2D'] = self.bookTProfile2D
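
        # design note : booking dispatches on the received object's class
        # name, so supporting a new type only needs one entry here plus a
        # matching book method below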
00068
    def register(self, tup):
        '''Store a (workerID, histDict) bundle for later rebuilding.'''
        assert isinstance(tup, tuple)
        self.histos.append(tup)
00073
    def Receive(self):
        '''Collect histogram bundles until every sender has signalled completion.'''
        # expect one 'HISTOS_SENT' sentinel from each of the nWorkers+1 senders
        hstatus = self._gmpc.nWorkers + 1
00076 while True :
00077 tup = self.qin.get()
00078 if tup == 'HISTOS_SENT' :
00079 self.log.debug('received HISTOS_SENT message')
00080 hstatus -= 1
00081 if not hstatus : break
00082 else :
00083 self.register( tup )
00084 self._gmpc.sEvent.set()
00085 self.log.info('Writer received all histo bundles and set sync event')
00086 return SUCCESS
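
    # Protocol sketch (the sender side lives elsewhere in GaudiMP; the path
    # below is hypothetical) : each of the nWorkers+1 senders is expected to
    # put its bundle(s) and then the sentinel on the queue, e.g.
    #
    #   hq.put((workerID, {'/stat/somedir/myhisto': myTH1D}))
    #   hq.put('HISTOS_SENT')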
00087
00088
00089 def RebuildHistoStore( self ) :
00090 '''
00091 Rebuild the Histogram Store from the histos received by Receive()
00092 If we have a histo which is not in the store,
00093 book and fill it according to self.bookingDict
00094 If we have a histo with a matching histo in the store,
00095 add the two histos, remembering that aida2root must be used on
00096 the Stored histo for compatibility.
00097 '''
        errors = 0
        for tup in self.histos:
            workerID, histDict = tup
            added = 0
            booked = 0
            for n, o in histDict.items():
                obj = self.hvt.retrieve(n)
                if obj:
                    # a matching histo is already in the store : add to it
                    try:
                        aida2root(obj).Add(o)
                    except Exception:
                        self.log.warning('FAILED TO ADD : %s' % (str(obj)))
                        errors += 1
                    added += 1
                else:
                    # not in the store yet : book it via the dispatch dict
                    if o.__class__.__name__ in self.bookingDict:
                        try:
                            self.bookingDict[o.__class__.__name__](n, o)
                        except Exception:
                            self.log.warning('FAILED TO REGISTER : %s\tto %s'
                                             % (o.__class__.__name__, n))
                            errors += 1
                    else:
                        self.log.warning('No booking method for : %s\t%s\t%s'
                                         % (n, type(o), o.__class__.__name__))
                        errors += 1
                    booked += 1
00125 hs = self.hvt.getHistoNames()
00126 self.log.info( 'Histo Store Rebuilt : ' )
00127 self.log.info( ' Contains %i objects.'%(len(hs)) )
00128 self.log.info( ' Errors in Rebuilding : %i'%(errors) )
00129 return SUCCESS
00130
00131
    def bookDataObject(self, n, o):
        '''
        Register a DataObject to the Histo Store
        '''
        self.hvt.registerObject(n, o)
00137
00138 def bookTH1D( self, n, o ) :
00139 '''
        Register a ROOT 1D histogram (TH1D) to the Histo Store
00141 '''
00142 obj = self.hvt._ihs.book( n, o.GetTitle(),\
00143 o.GetXaxis().GetNbins(),\
00144 o.GetXaxis().GetXmin(),\
00145 o.GetXaxis().GetXmax() )
00146 aida2root(obj).Add(o)
00147
00148 def bookTH2D( self, n, o ) :
00149 '''
        Register a ROOT 2D histogram (TH2D) to the Histo Store
00151 '''
00152 obj = self.hvt._ihs.book( n, o.GetTitle(),\
00153 o.GetXaxis().GetNbins(),\
00154 o.GetXaxis().GetXmin(),\
00155 o.GetXaxis().GetXmax(),\
00156 o.GetYaxis().GetNbins(),\
00157 o.GetYaxis().GetXmin(),\
00158 o.GetYaxis().GetXmax() )
00159 aida2root(obj).Add(o)
00160
    def bookTH3D(self, n, o):
        '''
        Register a ROOT 3D histogram (TH3D) to the Histo Store
        '''
        # note : GetNbins(), not GetXbins(); GetXbins() returns the TArrayD
        # of bin edges, not the bin count
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax(),
                                 o.GetZaxis().GetNbins(),
                                 o.GetZaxis().GetXmin(),
                                 o.GetZaxis().GetXmax())
        aida2root(obj).Add(o)
00176
00177 def bookTProfile( self, n, o ) :
00178 '''
00179 Register a ROOT TProfile to the Histo Store
00180 '''
00181 obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
00182 o.GetXaxis().GetNbins(),\
00183 o.GetXaxis().GetXmin(),\
00184 o.GetXaxis().GetXmax(),\
00185 o.GetOption() )
00186 aida2root(obj).Add(o)
00187
00188 def bookTProfile2D( self, n, o ) :
00189 '''
00190 Register a ROOT TProfile2D to the Histo Store
00191 '''
00192 obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
00193 o.GetXaxis().GetNbins(),\
00194 o.GetXaxis().GetXmin(),\
00195 o.GetXaxis().GetXmax(),\
00196 o.GetYaxis().GetNbins(),\
00197 o.GetYaxis().GetXmin(),\
00198 o.GetYaxis().GetXmax() )
00199 aida2root(obj).Add(o)
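
# A minimal usage sketch (hypothetical : 'gmpc' stands for any GMP component
# exposing the attributes HistoAgent expects : hvt, hq, log, nWorkers, sEvent):
#
#   agent = HistoAgent(gmpc)
#   agent.Receive()             # drain histogram bundles from the queue
#   agent.RebuildHistoStore()   # merge them into the local histogram store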
00200
00201
00202
class FileRecordsAgent(object):
00204 def __init__( self, gmpComponent ) :
00205 self._gmpc = gmpComponent
00206 self.fsr = self._gmpc.fsr
00207 self.q = self._gmpc.fq
00208 self.log = self._gmpc.log
00209 self.objectsIn = []
00210 self.objectsOut = []
00211
    def localCmp(self, tupA, tupB):
        '''Compare two FSR tuples by their first element (source node ID).'''
        valA = tupA[0]
        valB = tupB[0]
        if valA < valB:
            return -1
        elif valA > valB:
            return 1
        else:
            return 0
00221
00222
    def SendFileRecords(self):
        '''Pickle the local FSR objects and send them to the Writer.'''
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # nothing to send : tell the Writer we are done
        if not lst:
00240 self.log.info('No FileRecords Data to send to Writer.')
00241 self.q.put( 'END_FSR' )
00242 return SUCCESS
00243
00244
        # the store root itself is not shipped
        if '/FileRecords' in lst:
            lst.remove('/FileRecords')
00246
00247 for l in lst :
00248 o = self.fsr.retrieveObject( l )
00249 if hasattr(o, "configureDirectAccess") :
00250 o.configureDirectAccess()
00251
            if self._gmpc.nodeID == 0:
                # node 0 sends everything it has
                self.objectsOut.append((0, l, pickle.dumps(o)))
            else:
                # other nodes always send the EventCountFSR...
                if l == '/FileRecords/EventCountFSR':
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append(tup)
                else:
                    # ...and KeyedContainers only when they are non-empty
                    assert "KeyedContainer" in o.__class__.__name__
00263 nObjects = o.numberOfObjects()
00264 if nObjects :
00265 self.log.debug("Keyed Container %s with %i objects"\
00266 %(l, nObjects))
00267 tup = (self._gmpc.nodeID, l, pickle.dumps(o))
00268 self.objectsOut.append( tup )
        self.log.debug('Done with FSR store; sending to Writer.')
00270
00271 if self.objectsOut :
00272 self.log.debug('%i FSR objects to Writer'%(len(self.objectsOut)))
00273 for ob in self.objectsOut :
00274 self.log.debug('\t%s'%(ob[0]))
00275 self.q.put( self.objectsOut )
00276 else :
00277 self.log.info('Valid FSR Store, but no data to send to Writer')
00278 self.log.info('SendFSR complete')
00279 self.q.put( 'END_FSR' )
00280 return SUCCESS
00281
    def Receive(self):
        '''Collect the pickled FSR objects sent by all workers.'''
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == 'END_FSR':
                nc -= 1
                continue
            # a list of (sourceNode, path, pickledObject) tuples
            for o in objects:
                self.objectsIn.append(o)

        # sort by source node ID so node 0 is registered before the others
        # are merged in Rebuild()
        self.objectsIn.sort(cmp=self.localCmp)
00299 self.log.info('All FSR data received')
00300 return SUCCESS
00301
    def Rebuild(self):
        '''Rebuild the FSR store : register node 0 objects, merge the rest.'''
00304 for sourceNode, path, serialob in self.objectsIn :
00305 self.log.debug('Working with %s'%(path))
00306 ob = pickle.loads(serialob)
00307 if hasattr( ob, 'update' ) :
00308 ob.update()
00309 if hasattr( ob, 'numberOfObjects' ) :
00310 nCont = ob.numberOfObjects()
00311 self.log.debug( '\t %s has containedObjects : %i'%(type(ob).__name__, nCont) )
00312 if sourceNode == 0 :
00313 self.log.debug('Registering Object to : %s'%(path))
00314 self.fsr.registerObject( path, ob )
00315 else :
00316 self.log.debug('Merging Object to : %s'%(path))
                self.MergeFSRobject(sourceNode, path, ob)

        # correct the EventCountFSR : set its output to the number of events
        # seen on input by this (supervisor) process
        self.log.info('FSR Store Rebuilt. Correcting EventCountFSR')
00323 if bool( self.fsr._idp ) :
00324 ecount = '/FileRecords/EventCountFSR'
00325 if self.fsr[ecount] :
00326 self.fsr[ecount].setOutput( self._gmpc.nIn )
00327 self.log.info( 'Event Counter Output set : %s : %i'\
00328 %(ecount, self.fsr[ecount].output()) )
00329
00330 self.log.debug('FSR store reconstructed!')
00331 lst = self.fsr.getHistoNames()
00332 if lst :
00333 for l in lst :
00334 ob = self.fsr.retrieveObject(l)
00335 if hasattr( ob, 'configureDirectAccess' ) :
00336 ob.configureDirectAccess()
00337 if hasattr( ob, 'containedObjects' ) :
00338
00339 self.log.debug('\t%s (cont. objects : %i)'\
00340 %(l, ob.numberOfObjects()))
00341 else :
00342 self.log.debug('\t%s'%(l))
00343 self.log.info('FSR Store fully rebuilt.')
00344 return SUCCESS
00345
    def MergeFSRobject(self, sourceNode, path, ob):
        '''Dispatch an incoming FSR object to the appropriate merge method.'''
        if path == '/FileRecords/TimeSpanFSR':
            # TimeSpanFSR : update earliest/latest times
            self.ProcessTimeSpanFSR(path, ob)
        elif path == '/FileRecords/EventCountFSR':
            # EventCountFSR : sum the input counters
            self.ProcessEventCountFSR(path, ob)
        elif "KeyedContainer" in ob.__class__.__name__:
            # keyed containers of LumiFSRs are merged run by run; other
            # keyed containers are skipped
            if "LumiFSR" in ob.__class__.__name__:
                self.MergeLumiFSR(path, ob)
            else:
                self.log.info("Skipping Merge of Keyed Container %s for %s"
                              % (ob.__class__.__name__, path))
00363
    def ProcessTimeSpanFSR(self, path, ob):
        '''Add a fresh TimeSpanFSR per contained object to the stored one.'''
        sz = ob.containedObjects().size()
        for j in xrange(sz):
            cob = ob.containedObjects()[j]
            self.log.debug('Adding TimeSpanFSR')
            # build a new TimeSpanFSR carrying the earliest/latest times
            # and add it to the one already in the store
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest(cob.earliest())
            tsfsr.setLatest(cob.latest())
            self.fsr[path].add(tsfsr)
00376
    def ProcessEventCountFSR(self, path, ob):
        '''Accumulate the incoming EventCountFSR's input counter.'''
        self.log.debug('Event Count Input Addition')
00379 self.fsr[path].setInput( self.fsr[path].input()+ob.input() )
00380
    def MergeLumiFSR(self, path, keyedC):
        '''Merge an incoming keyed container of LumiFSRs into the stored one.'''
        from ROOT import string

        # fetch the stored LumiFSR; merging assumes exactly one is present
        keyedContainer = self.fsr.retrieveObject(path)
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR(l)

        # merge in every LumiFSR of the incoming container
        nCont = keyedC.numberOfObjects()
        for i in xrange(nCont):
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR(obj)
            baseLumi.merge(nextLumi)

        # build a new LHCb::LumiFSR from the merged result
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)

        # replace the stored container's content with the merged LumiFSR
        self.fsr[path].clear()
        self.fsr[path].add(newLumi)
        return SUCCESS
00409
00410
00411
class LumiFSR(object):
    def __init__(self, lumi):
        '''
        Pickle-friendly mirror of an LHCb::LumiFSR.
        Extracts run numbers, file IDs and the (key, increment, integral)
        info records, the latter parsed from the object's string repr.
        '''
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []

        # run numbers
        for r in lumi.runNumbers():
            self.runs.append(r)
        # file IDs
        for f in lumi.fileIDs():
            self.files.append(f)

        # info records : parsed from the printed representation, which ends
        # with 'info (key/incr/integral) : k i t / k i t / ...'
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa:
            k, i, t = rec.split()
            self.info[int(k)] = (int(i), int(t))
        self.keys = self.info.keys()
    def merge(self, otherLumi):
        '''Merge another LumiFSR into this one.'''
        assert otherLumi.__class__.__name__ == "LumiFSR"

        # run numbers : union, kept sorted
        for r in otherLumi.runs:
            if r not in self.runs:
                self.runs.append(r)
        self.runs.sort()

        # file IDs : union, kept sorted
        for f in otherLumi.files:
            if f not in self.files:
                self.files.append(f)
        self.files.sort()

        # info records : sum increments and integrals key by key
        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]
            if k in self.keys:
                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment, myIntegral + integral)
            else:
                self.info[k] = (increment, integral)
        self.keys = self.info.keys()

    def __repr__(self):
00470 s = "LumiFSR Python class\n"
00471 s += "\tRuns : \n"
00472 for r in self.runs :
00473 s += "\t\t%i\n"%(r)
00474 s += "\tFiles : \n"
00475 for f in self.files :
00476 s += "\t\t%s\n"%(f)
00477 s += "\tInfo : \n"
00478 for k in self.keys :
00479 increment, integral = self.info[k]
00480 s += "\t\t%i\t%i\t%i\n"%(k,increment,integral)
00481 return s
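
# Merge sketch : given two LHCb::LumiFSR objects a and b (e.g. unpickled from
# worker payloads), the Python-side merge is
#
#   la, lb = LumiFSR(a), LumiFSR(b)
#   la.merge(lb)   # la now holds the union of runs/files and the summed
#                  # (increment, integral) info records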
00482
00483
00484
class PackedCaloHypo(object):
    '''Pickle-friendly snapshot of an LHCb::PackedCaloHypo.'''

    def __init__(self, o):
        cl = 'LHCb::PackedCaloHypo'
        assert o.__class__.__name__ == cl
00489 self.centX = o.centX
00490 self.centY = o.centY
00491 self.cerr = (o.cerr00,o.cerr10,o.cerr11)
00492 self.cov = (o.cov00,o.cov10,o.cov11,o.cov20,o.cov21,o.cov22)
00493 self.firstCluster = o.firstCluster
00494 self.firstDigit = o.firstDigit
00495 self.firstHypo = o.firstHypo
00496 self.hypothesis = o.hypothesis
00497 self.key = o.key
00498 self.lastCluster = o.lastCluster
00499 self.lastDigit = o.lastDigit
00500 self.lastHypo = o.lastHypo
00501 self.lh = o.lh
00502 self.pos = (o.posE, o.posX, o.posY)
00503 self.z = o.z

    def __repr__(self):
00505 s = "PackedCaloHypo : \n"
00506 s += "\tcentX : %s\n"%( str(self.centX) )
00507 s += "\tcentY : %s\n"%( str(self.centY) )
00508 s += "\tcerr : %s\n"%( str(self.cerr ) )
00509 s += "\tcov : %s\n"%( str(self.cov ) )
00510 s += "\tfirstCluster : %s\n"%( str(self.firstCluster) )
00511 s += "\tfirstDigit : %s\n"%( str(self.firstDigit) )
00512 s += "\tfirstHypo : %s\n"%( str(self.firstHypo) )
00513 s += "\thypothesis : %s\n"%( str(self.hypothesis) )
00514 s += "\tkey : %s\n"%( str(self.key) )
00515 s += "\tlastCluster : %s\n"%( str(self.lastCluster) )
00516 s += "\tlastDigit : %s\n"%( str(self.lastDigit) )
00517 s += "\tlastHypo : %s\n"%( str(self.lastHypo) )
00518 s += "\tlh : %s\n"%( str(self.lh ) )
00519 s += "\tpos : %s\n"%( str(self.pos ) )
00520 s += "\tz : %s\n"%( str(self.z ) )
00521 s += "---------------------------------------\n"
00522 return s
00523
00524
00525
class SyncMini(object):
    def __init__(self, event, lastEvent=None):
        self.event = event            # set when the current step is done
        self.t = 0.0                  # time of the last reset
        self.lastEvent = lastEvent    # set when the final event is processed
00533 def check( self ) :
00534 return self.event.is_set()
00535 def checkLast( self ) :
00536 return self.lastEvent.is_set()
00537 def reset( self ) :
00538 self.event.clear()
00539 self.t = time.time()
00540 def getTime( self ) :
00541 return self.t
00542 def set( self ) :
00543 self.event.set()
00544 def __repr__( self ) :
00545 s = "---------- SyncMini --------------\n"
00546 s += " Status : %s\n"%(self.event.is_set())
00547 s += " t : %5.2f\n"%(self.t)
00548 if self.lastEvent :
00549 s += "Last Event : %s\n"%(self.lastEvent.is_set())
00550 s += "----------------------------------\n"
00551 return s
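
# Minimal SyncMini usage sketch, with multiprocessing.Event objects :
#
#   sm = SyncMini(Event(), lastEvent=Event())
#   sm.set()              # worker : current step done
#   if sm.check():        # supervisor : poll the flag...
#       sm.reset()        #   ...and re-arm it for the next step
#   sm.lastEvent.set()    # worker : final event processed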
00552
00553
00554
class Syncer(object):
    def __init__(self, nWorkers, log, manyEvents=False,
                 limit=None, step=None, firstEvent=None):
        '''Synchronise the GMP processes via one SyncMini per node.'''
        self.limit = limit
        self.step = step
        self.d = {}
        self.manyEvents = manyEvents
        # one SyncMini per process : keys -2 and -1 cover the two non-worker
        # (reader/writer) nodes, 0..nWorkers-1 the workers
        for i in xrange(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        if self.manyEvents:
            self.limitFirst = firstEvent
        self.keys = self.d.keys()
        self.nWorkers = nWorkers
        self.log = log
00570
    def syncAll(self, step="Not specified"):
        '''Wait (up to self.limit seconds) until all processes report done.'''
        # in many-events mode, delegate to the rolling synchroniser
        if self.manyEvents:
            sc = self.syncAllRolling()
            return sc

        # poll every self.step seconds, up to the self.limit timeout
        for i in xrange(0, self.limit, self.step):
00581 if self.checkAll( ) :
00582 self.log.info('%s : All procs done @ %i s'%(step,i))
00583 break
00584 else :
00585 time.sleep(self.step)
00586
00587
00588 if self.checkAll() :
00589 self.log.info("All processes : %s ok."%(step))
00590 return SUCCESS
00591 else :
00592 self.log.critical('Some process is hanging on : %s'%(step))
00593 for k in self.keys :
                hangString = "%s : Proc/Stat : %i/%s" % (step, k, self.d[k].check())
00595 self.log.critical( hangString )
00596 return FAILURE
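
    # Usage sketch (hedged; the real driver lives in the GMP component code):
    #
    #   syncer = Syncer(nWorkers, log, limit=60, step=2)
    #   # ...each process signals via its SyncMini in syncer.d[nodeID]...
    #   syncer.syncAll(step='Initialise')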
00597
    def syncAllRolling(self):
        '''
        Keep the event loop synchronised across processes.
        Each process's SyncMini event is checked against a rolling timeout :
        self.limitFirst for its first event, self.limit afterwards.
        '''
        begin = time.time()
        firstEv = {}
        timers = {}
        for k in self.keys:
            self.d[k].reset()
            firstEv[k] = False
            timers[k] = 0.0

        # work on a copy : nodes are removed as they finish their last event
        active = list(self.keys)
        while True:
            # check the status of each active process; iterate over a
            # snapshot since finished nodes are removed on the fly
            for k in active[:]:
                sMini = self.d[k]

                if sMini.check():
                    if sMini.checkLast():
                        # this node has finished its final event : retire it
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info("Audit : Node %i alive for %5.2f"
                                      % (k, alive))
                    else:
                        sMini.reset()
                else:
                    # no new event yet : how long have we been waiting?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if not firstEv[k]:
                        # the first event gets a longer grace period
                        cond = wait > self.limitFirst
                        firstEv[k] = True
                    if cond:
                        # timeout exceeded : report the hang and bail out
                        self.log.critical('Single event wait : %5.2f' % (wait))
                        self.processHang()
                        return FAILURE
00642
00643
            # termination condition : all nodes have signalled their last event
            if self.checkLastEvents():
                self.log.info('TC met for event loop')
                break
            else:
                # wait before polling again
                time.sleep(self.step)
00650
00651 self.log.info("All processes Completed all Events ok")
00652 return SUCCESS
00653
00654 def processHang( self ) :
00655 self.log.critical('Some proc is hanging during Event processing!')
00656 for k in self.keys :
00657 self.log.critical( "Proc/Stat : %i / %s"%(k,self.d[k].check()) )
00658 return
00659
    def checkAll(self):
        '''True if all processes have set their current-step event.'''
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

    def checkLastEvents(self):
        '''True if all processes have set their last-event flag.'''
        stat = [sMini.checkLast() for sMini in self.d.values()]
        return all(stat)
00670
00671
00672
def getEventNumber(evt):
    '''Return the current event number from the event store, or None.'''
    n = None
    # try the Generator and Reconstruction headers first
    lst = ['/Event/Gen/Header',
           '/Event/Rec/Header']
    for l in lst:
        try:
            n = evt[l].evtNumber()
            return n
        except Exception:
            # this path may not exist for this event type : try the next
            continue

    # last resort : read word 4 of the first raw bank of type 16 (the ODIN
    # bank) from the raw event
    try:
        n = evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        return n
    except Exception:
        pass

    # all attempts failed
    return n
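
# Example : n = getEventNumber(evt), where 'evt' is the GaudiPython event
# data service; returns None when no known header or raw bank is available.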