1 from GaudiPython
import gbl, SUCCESS, FAILURE
2 from multiprocessing
import Event
# Shortcut to the GaudiPython AIDA -> ROOT histogram converter; used when
# merging a received histogram into one already present in the store.
45 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
# NOTE(review): fragment of an agent constructor (the enclosing `class` /
# `def __init__` lines are not visible in this chunk). It wires references
# to the parent GMP component: the histogram service (hvt), the histogram
# queue (hq) and the logger.
51 self._gmpc = gmpComponent
52 self.hvt = self._gmpc.hvt
54 self.qin = self._gmpc.hq
55 self.log = self._gmpc.log
# Dispatch table: maps the class name of a received object to the bound
# method that knows how to (re)book it in the local Histogram Store.
60 self.bookingDict[
'DataObject'] = self.bookDataObject
61 self.bookingDict[
'NTuple::Directory'] = self.bookDataObject
62 self.bookingDict[
'NTuple::File'] = self.bookDataObject
63 self.bookingDict[
'TH1D'] = self.bookTH1D
64 self.bookingDict[
'TH2D'] = self.bookTH2D
65 self.bookingDict[
'TH3D'] = self.bookTH3D
66 self.bookingDict[
'TProfile'] = self.bookTProfile
67 self.bookingDict[
'TProfile2D'] = self.bookTProfile2D
def register(self, tup):
    """Queue one received histogram bundle for later store rebuilding.

    Parameters
    ----------
    tup : tuple
        A (workerID, histDict) pair received from a worker process.

    Raises
    ------
    AssertionError
        If *tup* is not a tuple.
    """
    # isinstance() is the robust type check; the original compared
    # tup.__class__.__name__ against the string 'tuple', which is fragile
    # and rejects tuple subclasses.
    assert isinstance(tup, tuple)
    self.histos.append(tup)
# NOTE(review): fragment of the histogram receive loop (the `def Receive`
# line and several body lines are missing from this chunk). Visible logic:
# hstatus starts at nWorkers+1 sentinels to wait for; each 'HISTOS_SENT'
# message is logged; when hstatus reaches zero the loop breaks and the
# synchronisation event on the parent component is set.
75 hstatus = self._gmpc.nWorkers+1
78 if tup ==
'HISTOS_SENT' :
79 self.log.debug(
'received HISTOS_SENT message')
# NOTE(review): the decrement of hstatus is presumably in the missing
# lines — confirm against the full source.
81 if not hstatus :
break
84 self._gmpc.sEvent.set()
85 self.log.info(
'Writer received all histo bundles and set sync event')
# Rebuild the Histogram Store from the bundles queued by register()/Receive().
# For each (workerID, histDict) tuple: if a histogram already exists at the
# same path it is retrieved and the received one added to it (aida2root is
# needed on the stored AIDA histo — see docstring below); otherwise the
# object is booked via self.bookingDict. Counters (added/registered/booked,
# errors) are accumulated and summarised at the end.
# NOTE(review): many body lines (the add/except branches, error counting,
# loop over `o`) are missing from this chunk; comments describe only what
# is visible.
89 def RebuildHistoStore( self ) :
91 Rebuild the Histogram Store from the histos received by Receive()
92 If we have a histo which is not in the store,
93 book and fill it according to self.bookingDict
94 If we have a histo with a matching histo in the store,
95 add the two histos, remembering that aida2root must be used on
96 the Stored histo for compatibility.
99 for tup
in self.histos :
100 workerID, histDict = tup
101 added = 0 ; registered = 0; booked = 0
103 for n
in histDict.keys() :
105 obj = self.hvt.retrieve( n )
111 self.log.warning(
'FAILED TO ADD : %s'%(str(obj)))
# Fall back to booking a fresh object when the path is not yet in the store.
116 if o.__class__.__name__
in self.bookingDict.keys() :
118 self.bookingDict[o.__class__.__name__](n, o)
120 self.log.warning(
'FAILED TO REGISTER : %s\tto%s'\
121 %(o.__class__.__name__, n))
124 self.log.warning(
'No booking method for: %s\t%s\t%s'\
125 %(n,
type(o),o.__class__.__name__) )
# Summary of the rebuilt store.
128 hs = self.hvt.getHistoNames()
129 self.log.info(
'Histo Store Rebuilt : ' )
130 self.log.info(
' Contains %i objects.'%(len(hs)) )
131 self.log.info(
' Errors in Rebuilding : %i'%(errors) )
def bookDataObject(self, n, o):
    """Register a plain DataObject (also used for NTuple directories/files) at path *n*."""
    store = self._gmpc.hvt
    store.registerObject(n, o)
# Book a ROOT TH1D at path *n* through the IHistogramSvc, copying the
# x-axis binning of the received histogram *o*.
141 def bookTH1D( self, n, o ) :
143 Register a ROOT 1D THisto to the Histo Store
145 obj = self.hvt._ihs.book( n, o.GetTitle(),\
146 o.GetXaxis().GetNbins(),\
147 o.GetXaxis().GetXmin(),\
148 o.GetXaxis().GetXmax() )
# NOTE(review): `obj` is unused in the visible lines; the step that fills
# the booked histogram (presumably aida2root(obj).Add(o)) appears to be in
# lines missing from this chunk — confirm against the full source.
# Book a ROOT TH2D at path *n* through the IHistogramSvc, copying the
# x- and y-axis binning of the received histogram *o*.
151 def bookTH2D( self, n, o ) :
153 Register a ROOT 2D THisto to the Histo Store
155 obj = self.hvt._ihs.book( n, o.GetTitle(),\
156 o.GetXaxis().GetNbins(),\
157 o.GetXaxis().GetXmin(),\
158 o.GetXaxis().GetXmax(),\
159 o.GetYaxis().GetNbins(),\
160 o.GetYaxis().GetXmin(),\
161 o.GetYaxis().GetXmax() )
# NOTE(review): `obj` is unused in the visible lines; the fill/Add step is
# presumably in lines missing from this chunk — confirm against the full
# source.
def bookTH3D(self, n, o):
    """Book a ROOT TH3D at path *n* through the IHistogramSvc.

    The booking copies the binning of the three axes of the received
    histogram *o*.

    Bug fix: the bin counts are now obtained with TAxis.GetNbins().
    The previous code called GetXbins(), which returns the TArrayD of
    variable bin edges (empty for fixed binning), not the number of
    bins — the sibling bookTH1D/bookTH2D/bookTProfile2D methods all
    use GetNbins().
    """
    obj = self.hvt._ihs.book(n, o.GetTitle(),
                             o.GetXaxis().GetNbins(),
                             o.GetXaxis().GetXmin(),
                             o.GetXaxis().GetXmax(),
                             o.GetYaxis().GetNbins(),
                             o.GetYaxis().GetXmin(),
                             o.GetYaxis().GetXmax(),
                             o.GetZaxis().GetNbins(),
                             o.GetZaxis().GetXmin(),
                             o.GetZaxis().GetXmax())
    # NOTE(review): the booked `obj` is not used in the visible code; the
    # fill/Add step (as for the other book* methods) appears to be in lines
    # missing from this chunk — confirm against the full source.
# Book a ROOT TProfile at path *n* through the IHistogramSvc (bookProf),
# copying the x-axis binning of the received profile *o*.
180 def bookTProfile( self, n, o ) :
182 Register a ROOT TProfile to the Histo Store
184 obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
185 o.GetXaxis().GetNbins(),\
186 o.GetXaxis().GetXmin(),\
187 o.GetXaxis().GetXmax(),\
# NOTE(review): the bookProf call is truncated here (trailing comma); the
# remaining argument(s) and the fill/Add step are in lines missing from
# this chunk — confirm against the full source.
# Book a ROOT TProfile2D at path *n* through the IHistogramSvc (bookProf),
# copying the x- and y-axis binning of the received profile *o*.
191 def bookTProfile2D( self, n, o ) :
193 Register a ROOT TProfile2D to the Histo Store
195 obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
196 o.GetXaxis().GetNbins(),\
197 o.GetXaxis().GetXmin(),\
198 o.GetXaxis().GetXmax(),\
199 o.GetYaxis().GetNbins(),\
200 o.GetYaxis().GetXmin(),\
201 o.GetYaxis().GetXmax() )
# NOTE(review): `obj` is unused in the visible lines; the fill/Add step is
# presumably in lines missing from this chunk — confirm against the full
# source.
# Agent that ships the FileRecords (FSR) store between worker processes
# and the writer: wires references to the parent GMP component's FSR data
# service, the file-records queue and the logger.
206 class FileRecordsAgent( ) :
207 def __init__( self, gmpComponent ) :
208 self._gmpc = gmpComponent
209 self.fsr = self._gmpc.fsr
210 self.q = self._gmpc.fq
211 self.log = self._gmpc.log
# NOTE(review): later methods use self.objectsOut / self.objectsIn; their
# initialisation is presumably in lines missing from this chunk.
# Python-2 style comparator (for list.sort(cmp=...)) ordering two FSR
# tuples; returns -1 / 1 based on comparison values valA / valB.
215 def localCmp( self, tupA, tupB ) :
# NOTE(review): the extraction of valA/valB from the tuples and the
# equal-case `return 0` are in lines missing from this chunk. Also note
# that the cmp= keyword used with this comparator exists only in Python 2.
221 if valA<valB :
return -1
222 elif valA>valB :
return 1
# Serialise the local FileRecords store and send it to the Writer process:
# each object is shipped as a (nodeID, path, pickled-object) tuple via the
# file-records queue, terminated by an 'END_FSR' sentinel. Node 0 sends
# everything; workers send only selected records (EventCountFSR, keyed
# containers). NOTE(review): many control-flow lines (the early-return for
# an empty store, the loop over paths `l`, else branches) are missing from
# this chunk; comments describe only what is visible.
226 def SendFileRecords( self ) :
238 self.log.info(
'Sending FileRecords...')
239 lst = self.fsr.getHistoNames()
# Empty store: tell the Writer there is nothing to merge.
243 self.log.info(
'No FileRecords Data to send to Writer.')
244 self.q.put(
'END_FSR' )
# The root node itself is not an object to ship.
248 if '/FileRecords' in lst : lst.remove(
'/FileRecords')
251 o = self.fsr.retrieveObject( l )
252 if hasattr(o,
"configureDirectAccess") :
253 o.configureDirectAccess()
# The Writer (nodeID 0 path) ships every object.
255 if self._gmpc.nodeID == 0 :
256 self.objectsOut.append( (0, l, pickle.dumps(o)) )
# Workers ship the EventCountFSR ...
260 if l ==
'/FileRecords/EventCountFSR' :
261 tup = (self._gmpc.nodeID, l, pickle.dumps(o))
262 self.objectsOut.append( tup )
# ... and KeyedContainers (e.g. LumiFSR containers).
265 assert "KeyedContainer" in o.__class__.__name__
266 nObjects = o.numberOfObjects()
268 self.log.debug(
"Keyed Container %s with %i objects"\
270 tup = (self._gmpc.nodeID, l, pickle.dumps(o))
271 self.objectsOut.append( tup )
272 self.log.debug(
'Done with FSR store, just to send to Writer.')
275 self.log.debug(
'%i FSR objects to Writer'%(len(self.objectsOut)))
276 for ob
in self.objectsOut :
277 self.log.debug(
'\t%s'%(ob[0]))
278 self.q.put( self.objectsOut )
280 self.log.info(
'Valid FSR Store, but no data to send to Writer')
281 self.log.info(
'SendFSR complete')
# Sentinel telling the receiver this node is done.
282 self.q.put(
'END_FSR' )
# Collect the FSR bundles sent by all workers: drain the queue until one
# 'END_FSR' sentinel per worker has been seen, accumulate the received
# (nodeID, path, pickled-object) tuples in self.objectsIn, then sort them
# with localCmp so merging is deterministic. NOTE(review): the loop
# structure and the decrement of `nc` are in lines missing from this chunk.
285 def Receive( self ) :
287 self.log.info(
'Receiving FSR store data...')
288 nc = self._gmpc.nWorkers
290 objects = self.q.get( )
291 if objects ==
'END_FSR' :
298 self.objectsIn.append(o)
# Python-2-only sort signature (cmp= keyword).
301 self.objectsIn.sort(cmp=self.localCmp)
302 self.log.info(
'All FSR data received')
# Rebuild the FileRecords store from the received bundles: unpickle each
# object; register it if its path is not yet in the store, otherwise merge
# it via MergeFSRobject. Afterwards correct the EventCountFSR output count
# to the total number of events seen by this job, and log a summary of the
# rebuilt store. NOTE(review): several branch/else lines are missing from
# this chunk; comments describe only what is visible.
305 def Rebuild( self ) :
307 for sourceNode, path, serialob
in self.objectsIn :
308 self.log.debug(
'Working with %s'%(path))
309 ob = pickle.loads(serialob)
310 if hasattr( ob,
'update' ) :
312 if hasattr( ob,
'numberOfObjects' ) :
313 nCont = ob.numberOfObjects()
314 self.log.debug(
'\t %s has containedObjects : %i'%(
type(ob).__name__, nCont) )
# First occurrence of this path: plain registration.
316 self.log.debug(
'Registering Object to : %s'%(path))
317 self.fsr.registerObject( path, ob )
# Path already present: merge the incoming object into the stored one.
319 self.log.debug(
'Merging Object to : %s'%(path))
320 self.MergeFSRobject( sourceNode, path, ob )
325 self.log.info(
'FSR Store Rebuilt. Correcting EventCountFSR')
# Fix up the output event count on the merged EventCountFSR.
326 if bool( self.fsr._idp ) :
327 ecount =
'/FileRecords/EventCountFSR'
328 if self.fsr[ecount] :
329 self.fsr[ecount].setOutput( self._gmpc.nIn )
330 self.log.info(
'Event Counter Output set : %s : %i'\
331 %(ecount, self.fsr[ecount].output()) )
# Summary dump of the rebuilt store.
333 self.log.debug(
'FSR store reconstructed!')
334 lst = self.fsr.getHistoNames()
337 ob = self.fsr.retrieveObject(l)
338 if hasattr( ob,
'configureDirectAccess' ) :
339 ob.configureDirectAccess()
340 if hasattr( ob,
'containedObjects' ) :
342 self.log.debug(
'\t%s (cont. objects : %i)'\
343 %(l, ob.numberOfObjects()))
345 self.log.debug(
'\t%s'%(l))
346 self.log.info(
'FSR Store fully rebuilt.')
# Dispatch the merge of one incoming FSR object by its path/type:
# TimeSpanFSR and EventCountFSR have dedicated handlers; KeyedContainers
# holding LumiFSRs are merged via MergeLumiFSR; any other keyed container
# is skipped with a log message. NOTE(review): the `else :` line guarding
# the final log call is missing from this chunk.
349 def MergeFSRobject( self, sourceNode, path, ob ) :
351 if path ==
'/FileRecords/TimeSpanFSR' :
353 self.ProcessTimeSpanFSR( path, ob )
354 elif path ==
'/FileRecords/EventCountFSR' :
356 self.ProcessEventCountFSR( path, ob )
358 elif "KeyedContainer" in ob.__class__.__name__ :
361 if "LumiFSR" in ob.__class__.__name__ :
362 self.MergeLumiFSR( path, ob )
364 self.log.info(
"Skipping Merge of Keyed Container %s for %s"\
365 %(ob.__class__.__name__,path))
# Merge an incoming TimeSpanFSR with the stored one: scan the contained
# objects for the overall earliest/latest timestamps, then replace the
# stored container's contents with a single fresh TimeSpanFSR spanning
# that interval. NOTE(review): the initialisation of `min`/`max` (which
# shadow the Python builtins) and the assignments inside the two `if`
# branches are in lines missing from this chunk.
367 def ProcessTimeSpanFSR( self, path, ob ) :
368 ob2 = self.fsr.retrieveObject( path )
369 if ob.containedObjects().size() :
370 sz = ob.containedObjects().size()
371 cob = ob2.containedObjects()[0]
374 for j
in xrange( sz ) :
375 cob = ob.containedObjects()[j]
376 self.log.debug(
'Adding TimeSpanFSR' )
377 if cob.earliest() < min:
379 if cob.latest() > max:
# Rebuild the stored record with the widened [min, max] span.
383 tsfsr = gbl.LHCb.TimeSpanFSR()
384 tsfsr.setEarliest( min )
385 tsfsr.setLatest( max )
386 self.fsr[path].clear()
387 self.fsr[path].
add( tsfsr )
def ProcessEventCountFSR(self, path, ob):
    """Merge an incoming EventCountFSR into the stored one by summing the input counts."""
    self.log.debug('Event Count Input Addition')
    stored = self.fsr[path]
    stored.setInput(stored.input() + ob.input())
# Merge an incoming KeyedContainer of LumiFSRs into the stored one: wrap
# the single stored LumiFSR and each received one in the Python LumiFSR
# helper, merge run numbers / file IDs / info counters, then replace the
# stored container's contents with one fresh LHCb::LumiFSR built from the
# merged result. NOTE(review): construction of `baseLumi` / `nextLumi`
# (from the stored and the received objects) is in lines missing from this
# chunk — confirm against the full source.
393 def MergeLumiFSR( self, path, keyedC ) :
# Nonstandard import: `string` here is ROOT's std::string binding, needed
# to pass file IDs to addFileID.
394 from ROOT
import string
396 keyedContainer = self.fsr.retrieveObject(path)
# The stored container is expected to hold exactly one merged LumiFSR.
398 assert keyedContainer.numberOfObjects() == 1
399 l = keyedContainer.containedObject(0)
402 nCont = keyedC.numberOfObjects()
403 for i
in xrange(nCont) :
404 obj = keyedC.containedObject(i)
406 baseLumi.merge( nextLumi )
# Build the new C++ LumiFSR from the merged Python-side helper.
408 newLumi = gbl.LHCb.LumiFSR()
409 for r
in baseLumi.runs :
410 newLumi.addRunNumber( r )
411 for f
in baseLumi.files :
412 newLumi.addFileID( string(f) )
413 for k
in baseLumi.keys :
414 increment, integral = baseLumi.info[k]
415 newLumi.addInfo(k, increment, integral)
# Replace the stored record with the merged one.
417 self.fsr[path].clear()
419 self.fsr[path].
add(newLumi)
# NOTE(review): fragment of the Python LumiFSR helper's constructor (the
# `def __init__` line and most of the body are missing from this chunk).
# Visible logic: copy run numbers and file IDs from the wrapped C++ lumi
# object, and parse its string representation to recover the
# (key, increment, integral) info entries.
438 for r
in lumi.runNumbers() :
441 for f
in lumi.fileIDs() :
# Parse the "info (key/incr/integral) : k/i/g/..." tail of str(lumi).
446 sa = s.split(
"info (key/incr/integral) : ")[-1]
447 sa = sa.split(
'/')[:-1]
# Cache the key list for iteration in merge()/__repr__().
454 self.keys = self.info.keys()
# Merge another LumiFSR helper into this one: union run numbers and file
# IDs (appending only unseen entries), and sum the (increment, integral)
# pairs per info key. NOTE(review): the membership tests / else lines and
# the sorting of runs/files are in lines missing from this chunk.
455 def merge( self, otherLumi ) :
# Fragile name-based type check (cf. isinstance) — kept as-is.
456 assert otherLumi.__class__.__name__ ==
"LumiFSR"
458 for r
in otherLumi.runs :
462 self.runs.append( r )
465 for f
in otherLumi.files :
469 self.files.append( f )
472 for k
in otherLumi.keys :
473 increment, integral = otherLumi.info[k]
# Known key: accumulate; unknown key: adopt the other side's counters.
475 myIncrement, myIntegral = self.info[k]
476 self.info[k] = ( myIncrement+increment, myIntegral+integral )
478 self.info[k] = ( increment, integral )
480 self.keys = self.info.keys()
# Human-readable dump of the LumiFSR helper: runs, files, and per-key
# (increment, integral) info. NOTE(review): the loops over runs and keys
# and the final `return s` are in lines missing from this chunk.
481 def __repr__( self ) :
482 s =
"LumiFSR Python class\n"
487 for f
in self.files :
491 increment, integral = self.info[k]
492 s +=
"\t\t%i\t%i\t%i\n"%(k,increment,integral)
# Pickle-friendly Python mirror of LHCb::PackedCaloHypo: copies the POD
# fields of the C++ object into plain attributes/tuples.
# NOTE(review): the `def __init__( self, o ) :` line and a few attribute
# copies (centX/centY, key, lh, z) are missing from this chunk but are
# referenced by __repr__ below.
497 class PackedCaloHypo() :
499 cl =
'LHCb::PackedCaloHypo'
# Fragile name-based type check (cf. isinstance) — kept as-is.
500 assert o.__class__.__name__ == cl
503 self.cerr = (o.cerr00,o.cerr10,o.cerr11)
504 self.cov = (o.cov00,o.cov10,o.cov11,o.cov20,o.cov21,o.cov22)
505 self.firstCluster = o.firstCluster
506 self.firstDigit = o.firstDigit
507 self.firstHypo = o.firstHypo
508 self.hypothesis = o.hypothesis
510 self.lastCluster = o.lastCluster
511 self.lastDigit = o.lastDigit
512 self.lastHypo = o.lastHypo
514 self.pos = (o.posE, o.posX, o.posY)
# Human-readable dump of all copied PackedCaloHypo fields.
# NOTE(review): the final `return s` is in lines missing from this chunk.
516 def __repr__( self ) :
517 s =
"PackedCaloHypo : \n"
518 s +=
"\tcentX : %s\n"%( str(self.centX) )
519 s +=
"\tcentY : %s\n"%( str(self.centY) )
520 s +=
"\tcerr : %s\n"%( str(self.cerr ) )
521 s +=
"\tcov : %s\n"%( str(self.cov ) )
522 s +=
"\tfirstCluster : %s\n"%( str(self.firstCluster) )
523 s +=
"\tfirstDigit : %s\n"%( str(self.firstDigit) )
524 s +=
"\tfirstHypo : %s\n"%( str(self.firstHypo) )
525 s +=
"\thypothesis : %s\n"%( str(self.hypothesis) )
526 s +=
"\tkey : %s\n"%( str(self.key) )
527 s +=
"\tlastCluster : %s\n"%( str(self.lastCluster) )
528 s +=
"\tlastDigit : %s\n"%( str(self.lastDigit) )
529 s +=
"\tlastHypo : %s\n"%( str(self.lastHypo) )
530 s +=
"\tlh : %s\n"%( str(self.lh ) )
531 s +=
"\tpos : %s\n"%( str(self.pos ) )
532 s +=
"\tz : %s\n"%( str(self.z ) )
533 s +=
"---------------------------------------\n"
# Small wrapper pairing a multiprocessing.Event (per-step sync) with an
# optional lastEvent (end-of-event-loop marker) for one worker.
# NOTE(review): parts of __init__ and the `def check` line are missing
# from this chunk; `check()` evidently returns self.event.is_set().
538 class SyncMini( object ) :
539 def __init__( self, event, lastEvent=None ) :
542 self.lastEvent =
None
544 self.lastEvent = lastEvent
546 return self.event.is_set()
547 def checkLast( self ) :
548 return self.lastEvent.is_set()
# getTime presumably returns the stored timestamp self.t (body missing
# from this chunk); __repr__ dumps the sync status, timestamp and
# last-event flag. NOTE(review): the `return s` of __repr__ is also in
# missing lines.
552 def getTime( self ) :
556 def __repr__( self ) :
557 s =
"---------- SyncMini --------------\n"
558 s +=
" Status : %s\n"%(self.event.is_set())
559 s +=
" t : %5.2f\n"%(self.t)
561 s +=
"Last Event : %s\n"%(self.lastEvent.is_set())
562 s +=
"----------------------------------\n"
# Synchronises the GMP processes: builds one SyncMini (step event +
# last-event event) per participant. Indices run from -2 to nWorkers-1,
# covering the Reader/Writer special nodes as well as the workers.
# NOTE(review): assignments of self.limit / self.step / self.d / self.log
# are in lines missing from this chunk.
567 class Syncer( object ) :
568 def __init__( self, nWorkers, log, manyEvents=False,
569 limit=
None, step=
None, firstEvent=
None ) :
574 self.manyEvents = manyEvents
576 for i
in xrange(-2, nWorkers) :
577 self.d[ i ] = SyncMini(
Event(), lastEvent=
Event() )
579 self.limitFirst = firstEvent
581 self.keys = self.d.keys()
582 self.nWorkers = nWorkers
# Wait (up to self.limit seconds, polling every self.step) for all
# processes to reach the named step; delegates to syncAllRolling() in the
# many-events mode. On timeout, logs which process is hanging and its
# SyncMini status. NOTE(review): the success/failure returns and the loop
# over `k` in the hang report are in lines missing from this chunk.
585 def syncAll( self, step="Not specified" ) :
590 sc = self.syncAllRolling( )
594 for i
in xrange( 0, self.limit, self.step ) :
595 if self.checkAll( ) :
596 self.log.info(
'%s : All procs done @ %i s'%(step,i))
599 time.sleep(self.step)
603 self.log.info(
"All processes : %s ok."%(step))
606 self.log.critical(
'Some process is hanging on : %s'%(step))
608 hangString=
"%s : Proc/Stat : %i/%s"%(step,k,self.d[k].
check())
609 self.log.critical( hangString )
# Rolling synchronisation for the event loop: repeatedly audits each
# worker's SyncMini, treating a process as hung when the wait since its
# last event exceeds self.limit (or self.limitFirst for the first event),
# and finishing when all last-event flags are set. NOTE(review): this
# fragment is missing most of its control flow (loop headers, status
# resets, returns) — comments describe only the visible lines.
612 def syncAllRolling( self ) :
634 if sMini.check()
or sMini.checkLast():
635 if sMini.checkLast()
and sMini.check() :
638 alive = time.time()-begin
639 self.log.info(
"Audit : Node %i alive for %5.2f"\
# Per-event timeout check.
646 wait = time.time()-sMini.getTime()
647 cond = wait > self.limit
649 cond = wait > self.limitFirst
653 self.log.critical(
'Single event wait : %5.2f'%(wait))
# Termination condition: every process has flagged its last event.
658 if self.checkLastEvents() :
659 self.log.info(
'TC met for event loop')
663 time.sleep(self.step)
665 self.log.info(
"All processes Completed all Events ok")
# Report a hang during event processing: logs each process's SyncMini
# status. NOTE(review): the loop over `k` and the failure return are in
# lines missing from this chunk.
668 def processHang( self ) :
669 self.log.critical(
'Some proc is hanging during Event processing!')
671 self.log.critical(
"Proc/Stat : %i / %s"%(k,self.d[k].
check()) )
def checkAll(self):
    """Return True iff every registered SyncMini reports its event as set."""
    for worker in self.d.values():
        if not worker.check():
            return False
    return True
# True when every worker's lastEvent flag is set (end of event loop).
# NOTE(review): the `return all(stat)`-style line is missing from this
# chunk.
680 def checkLastEvents( self ) :
682 stat = [ sMini.checkLast()
for sMini
in self.d.values() ]
# NOTE(review): fragment of a module-level event-number helper (its `def`
# line is missing from this chunk). Visible logic: try the Gen/Rec event
# headers first; otherwise fall back to decoding the event number from the
# ODIN raw bank (bank type 16, word 4) of /Event/DAQ/RawEvent.
692 lst = [
'/Event/Gen/Header',
693 '/Event/Rec/Header' ]
696 n = evt[l].evtNumber()
705 n = evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
Essential information of the event, as used in the examples. It can be identified by "/Event".
def __init__(self, hostname)