1 from GaudiPython
import gbl, SUCCESS, FAILURE
2 from multiprocessing
import Event
45 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
52 self.
hvt = self._gmpc.hvt
54 self.
qin = self._gmpc.hq
55 self.
log = self._gmpc.log
71 assert tup.__class__.__name__ ==
'tuple'
72 self.histos.append( tup )
75 hstatus = self._gmpc.nWorkers+1
78 if tup ==
'HISTOS_SENT' :
79 self.log.debug(
'received HISTOS_SENT message')
81 if not hstatus :
break
84 self._gmpc.sEvent.set()
85 self.log.info(
'Writer received all histo bundles and set sync event')
91 Rebuild the Histogram Store from the histos received by Receive()
92 If we have a histo which is not in the store,
93 book and fill it according to self.bookingDict
94 If we have a histo with a matching histo in the store,
95 add the two histos, remembering that aida2root must be used on
96 the Stored histo for compatibility.
100 workerID, histDict = tup
101 added = 0 ; registered = 0; booked = 0
103 for n
in histDict.keys() :
105 obj = self.hvt.retrieve( n )
111 self.log.warning(
'FAILED TO ADD : %s'%(str(obj)))
116 if o.__class__.__name__
in self.bookingDict.keys() :
120 self.log.warning(
'FAILED TO REGISTER : %s\tto%s'\
121 %(o.__class__.__name__, n))
124 self.log.warning(
'No booking method for: %s\t%s\t%s'\
125 %(n,
type(o),o.__class__.__name__) )
128 hs = self.hvt.getHistoNames()
129 self.log.info(
'Histo Store Rebuilt : ' )
130 self.log.info(
' Contains %i objects.'%(len(hs)) )
131 self.log.info(
' Errors in Rebuilding : %i'%(errors) )
137 Register a DataObject to the Histo Store
139 self._gmpc.hvt.registerObject( n, o )
143 Register a ROOT 1D THisto to the Histo Store
145 obj = self.hvt._ihs.book( n, o.GetTitle(),\
146 o.GetXaxis().GetNbins(),\
147 o.GetXaxis().GetXmin(),\
148 o.GetXaxis().GetXmax() )
153 Register a ROOT 2D THisto to the Histo Store
155 obj = self.hvt._ihs.book( n, o.GetTitle(),\
156 o.GetXaxis().GetNbins(),\
157 o.GetXaxis().GetXmin(),\
158 o.GetXaxis().GetXmax(),\
159 o.GetYaxis().GetNbins(),\
160 o.GetYaxis().GetXmin(),\
161 o.GetYaxis().GetXmax() )
166 Register a ROOT 3D THisto to the Histo Store
168 obj = self.hvt._ihs.book( n, o.GetTitle(),\
169 o.GetXaxis().GetXbins(),\
170 o.GetXaxis().GetXmin(),\
171 o.GetXaxis().GetXmax(),\
172 o.GetYaxis().GetXbins(),\
173 o.GetYaxis().GetXmin(),\
174 o.GetYaxis().GetXmax(),\
175 o.GetZaxis().GetXbins(),\
176 o.GetZaxis().GetXmin(),\
177 o.GetZaxis().GetXmax() )
182 Register a ROOT TProfile to the Histo Store
184 obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
185 o.GetXaxis().GetNbins(),\
186 o.GetXaxis().GetXmin(),\
187 o.GetXaxis().GetXmax(),\
193 Register a ROOT TProfile2D to the Histo Store
195 obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
196 o.GetXaxis().GetNbins(),\
197 o.GetXaxis().GetXmin(),\
198 o.GetXaxis().GetXmax(),\
199 o.GetYaxis().GetNbins(),\
200 o.GetYaxis().GetXmin(),\
201 o.GetYaxis().GetXmax() )
210 self.
q = self._gmpc.fq
221 if valA<valB :
return -1
222 elif valA>valB :
return 1
238 self.log.info(
'Sending FileRecords...')
239 lst = self.fsr.getHistoNames()
243 self.log.info(
'No FileRecords Data to send to Writer.')
244 self.q.put(
'END_FSR' )
248 if '/FileRecords' in lst : lst.remove(
'/FileRecords')
251 o = self.fsr.retrieveObject( l )
252 if hasattr(o,
"configureDirectAccess") :
253 o.configureDirectAccess()
255 if self._gmpc.nodeID == 0 :
256 self.objectsOut.append( (0, l, pickle.dumps(o)) )
260 if l ==
'/FileRecords/EventCountFSR' :
261 tup = (self._gmpc.nodeID, l, pickle.dumps(o))
262 self.objectsOut.append( tup )
265 assert "KeyedContainer" in o.__class__.__name__
266 nObjects = o.numberOfObjects()
268 self.log.debug(
"Keyed Container %s with %i objects"\
270 tup = (self._gmpc.nodeID, l, pickle.dumps(o))
271 self.objectsOut.append( tup )
272 self.log.debug(
'Done with FSR store, just to send to Writer.')
275 self.log.debug(
'%i FSR objects to Writer'%(len(self.
objectsOut)))
277 self.log.debug(
'\t%s'%(ob[0]))
280 self.log.info(
'Valid FSR Store, but no data to send to Writer')
281 self.log.info(
'SendFSR complete')
282 self.q.put(
'END_FSR' )
287 self.log.info(
'Receiving FSR store data...')
288 nc = self._gmpc.nWorkers
290 objects = self.q.get( )
291 if objects ==
'END_FSR' :
298 self.objectsIn.append(o)
301 self.objectsIn.sort(cmp=self.
localCmp)
302 self.log.info(
'All FSR data received')
307 for sourceNode, path, serialob
in self.
objectsIn :
308 self.log.debug(
'Working with %s'%(path))
309 ob = pickle.loads(serialob)
310 if hasattr( ob,
'update' ) :
312 if hasattr( ob,
'numberOfObjects' ) :
313 nCont = ob.numberOfObjects()
314 self.log.debug(
'\t %s has containedObjects : %i'%(
type(ob).__name__, nCont) )
316 self.log.debug(
'Registering Object to : %s'%(path))
317 self.fsr.registerObject( path, ob )
319 self.log.debug(
'Merging Object to : %s'%(path))
325 self.log.info(
'FSR Store Rebuilt. Correcting EventCountFSR')
326 if bool( self.fsr._idp ) :
327 ecount =
'/FileRecords/EventCountFSR'
328 if self.
fsr[ecount] :
329 self.
fsr[ecount].setOutput( self._gmpc.nIn )
330 self.log.info(
'Event Counter Output set : %s : %i'\
331 %(ecount, self.
fsr[ecount].output()) )
333 self.log.debug(
'FSR store reconstructed!')
334 lst = self.fsr.getHistoNames()
337 ob = self.fsr.retrieveObject(l)
338 if hasattr( ob,
'configureDirectAccess' ) :
339 ob.configureDirectAccess()
340 if hasattr( ob,
'containedObjects' ) :
342 self.log.debug(
'\t%s (cont. objects : %i)'\
343 %(l, ob.numberOfObjects()))
345 self.log.debug(
'\t%s'%(l))
346 self.log.info(
'FSR Store fully rebuilt.')
351 if path ==
'/FileRecords/TimeSpanFSR' :
354 elif path ==
'/FileRecords/EventCountFSR' :
358 elif "KeyedContainer" in ob.__class__.__name__ :
361 if "LumiFSR" in ob.__class__.__name__ :
364 self.log.info(
"Skipping Merge of Keyed Container %s for %s"\
365 %(ob.__class__.__name__,path))
368 ob2 = self.fsr.retrieveObject( path )
369 if ob.containedObjects().size() :
370 sz = ob.containedObjects().size()
371 cob = ob2.containedObjects()[0]
374 for j
in xrange( sz ) :
375 cob = ob.containedObjects()[j]
376 self.log.debug(
'Adding TimeSpanFSR' )
377 if cob.earliest() < min:
379 if cob.latest() > max:
383 tsfsr = gbl.LHCb.TimeSpanFSR()
384 tsfsr.setEarliest( min )
385 tsfsr.setLatest( max )
386 self.
fsr[path].clear()
387 self.
fsr[path].
add( tsfsr )
390 self.log.debug(
'Event Count Input Addition')
391 self.
fsr[path].setInput( self.
fsr[path].input()+ob.input() )
394 from ROOT
import string
396 keyedContainer = self.fsr.retrieveObject(path)
398 assert keyedContainer.numberOfObjects() == 1
399 l = keyedContainer.containedObject(0)
402 nCont = keyedC.numberOfObjects()
403 for i
in xrange(nCont) :
404 obj = keyedC.containedObject(i)
406 baseLumi.merge( nextLumi )
408 newLumi = gbl.LHCb.LumiFSR()
409 for r
in baseLumi.runs :
410 newLumi.addRunNumber( r )
411 for f
in baseLumi.files :
412 newLumi.addFileID( string(f) )
413 for k
in baseLumi.keys :
414 increment, integral = baseLumi.info[k]
415 newLumi.addInfo(k, increment, integral)
417 self.
fsr[path].clear()
419 self.
fsr[path].
add(newLumi)
438 for r
in lumi.runNumbers() :
441 for f
in lumi.fileIDs() :
446 sa = s.split(
"info (key/incr/integral) : ")[-1]
447 sa = sa.split(
'/')[:-1]
454 self.
keys = self.info.keys()
456 assert otherLumi.__class__.__name__ ==
"LumiFSR"
458 for r
in otherLumi.runs :
462 self.runs.append( r )
465 for f
in otherLumi.files :
469 self.files.append( f )
472 for k
in otherLumi.keys :
473 increment, integral = otherLumi.info[k]
475 myIncrement, myIntegral = self.
info[k]
476 self.
info[k] = ( myIncrement+increment, myIntegral+integral )
478 self.
info[k] = ( increment, integral )
480 self.
keys = self.info.keys()
482 s =
"LumiFSR Python class\n"
487 for f
in self.
files :
491 increment, integral = self.
info[k]
492 s +=
"\t\t%i\t%i\t%i\n"%(k,increment,integral)
499 cl =
'LHCb::PackedCaloHypo'
500 assert o.__class__.__name__ == cl
503 self.
cerr = (o.cerr00,o.cerr10,o.cerr11)
504 self.
cov = (o.cov00,o.cov10,o.cov11,o.cov20,o.cov21,o.cov22)
514 self.
pos = (o.posE, o.posX, o.posY)
517 s =
"PackedCaloHypo : \n"
518 s +=
"\tcentX : %s\n"%( str(self.
centX) )
519 s +=
"\tcentY : %s\n"%( str(self.
centY) )
520 s +=
"\tcerr : %s\n"%( str(self.
cerr ) )
521 s +=
"\tcov : %s\n"%( str(self.
cov ) )
523 s +=
"\tfirstDigit : %s\n"%( str(self.
firstDigit) )
524 s +=
"\tfirstHypo : %s\n"%( str(self.
firstHypo) )
525 s +=
"\thypothesis : %s\n"%( str(self.
hypothesis) )
526 s +=
"\tkey : %s\n"%( str(self.
key) )
527 s +=
"\tlastCluster : %s\n"%( str(self.
lastCluster) )
528 s +=
"\tlastDigit : %s\n"%( str(self.
lastDigit) )
529 s +=
"\tlastHypo : %s\n"%( str(self.
lastHypo) )
530 s +=
"\tlh : %s\n"%( str(self.
lh ) )
531 s +=
"\tpos : %s\n"%( str(self.
pos ) )
532 s +=
"\tz : %s\n"%( str(self.
z ) )
533 s +=
"---------------------------------------\n"
546 return self.event.is_set()
548 return self.lastEvent.is_set()
557 s =
"---------- SyncMini --------------\n"
558 s +=
" Status : %s\n"%(self.event.is_set())
559 s +=
" t : %5.2f\n"%(self.
t)
561 s +=
"Last Event : %s\n"%(self.lastEvent.is_set())
562 s +=
"----------------------------------\n"
568 def __init__( self, nWorkers, log, manyEvents=False,
569 limit=
None, step=
None, firstEvent=
None ) :
576 for i
in xrange(-2, nWorkers) :
577 self.
d[ i ] =
SyncMini( Event(), lastEvent=Event() )
594 for i
in xrange( 0, self.
limit, self.
step ) :
596 self.log.info(
'%s : All procs done @ %i s'%(step,i))
599 time.sleep(self.
step)
603 self.log.info(
"All processes : %s ok."%(step))
606 self.log.critical(
'Some process is hanging on : %s'%(step))
608 hangString=
"%s : Proc/Stat : %i/%s"%(step,k,self.
d[k].
check())
609 self.log.critical( hangString )
634 if sMini.check()
or sMini.checkLast():
635 if sMini.checkLast()
and sMini.check() :
638 alive = time.time()-begin
639 self.log.info(
"Audit : Node %i alive for %5.2f"\
646 wait = time.time()-sMini.getTime()
647 cond = wait > self.
limit
653 self.log.critical(
'Single event wait : %5.2f'%(wait))
659 self.log.info(
'TC met for event loop')
663 time.sleep(self.
step)
665 self.log.info(
"All processes Completed all Events ok")
669 self.log.critical(
'Some proc is hanging during Event processing!')
671 self.log.critical(
"Proc/Stat : %i / %s"%(k,self.
d[k].
check()) )
677 currentStatus = [ mini.check()
for mini
in self.d.values() ]
678 return all( currentStatus )
682 stat = [ sMini.checkLast()
for sMini
in self.d.values() ]
692 lst = [
'/Event/Gen/Header',
693 '/Event/Rec/Header' ]
696 n = evt[l].evtNumber()
705 n = evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]