1 from GaudiPython
import gbl, SUCCESS, FAILURE
2 from multiprocessing
import Event
# NOTE(review): this chunk is a corrupted listing -- the original file's line
# numbers are fused into the text and many lines are missing, so only review
# comments are added; no code is altered.
# Shortcut to Gaudi's AIDA->ROOT converter; presumably used below so that
# received AIDA histograms can be merged via their ROOT representation --
# confirm against the full file.
45 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
# --- Histogram-agent fragments (receive loop) --------------------------------
# Cache handles from the parent GMP component: histogram data store (hvt),
# the histogram queue (hq), and the shared logger.
52 self.
hvt = self._gmpc.hvt
54 self.
qin = self._gmpc.hq
55 self.
log = self._gmpc.log
# Every received item must be a (workerID, histDict) tuple; collect them.
# (isinstance would be the idiomatic check, but the code is left untouched.)
71 assert tup.__class__.__name__ ==
'tuple'
72 self.histos.append( tup )
# Countdown sentinel: starts at nWorkers+1; presumably decremented (not
# visible in this listing) for each 'HISTOS_SENT' message until every
# producer has reported -- TODO confirm against the full file.
75 hstatus = self._gmpc.nWorkers+1
78 if tup ==
'HISTOS_SENT' :
79 self.log.debug(
'received HISTOS_SENT message')
81 if not hstatus :
break
# All bundles received: set the synchronisation event so the Writer proceeds.
84 self._gmpc.sEvent.set()
85 self.log.info(
'Writer received all histo bundles and set sync event')
# --- RebuildHistoStore fragments ---------------------------------------------
# Original docstring of the method (lines 91-96 of the source file):
91 Rebuild the Histogram Store from the histos received by Receive()
92 If we have a histo which is not in the store,
93 book and fill it according to self.bookingDict
94 If we have a histo with a matching histo in the store,
95 add the two histos, remembering that aida2root must be used on
96 the Stored histo for compatibility.
# Unpack one worker's bundle and walk its histogram paths.
# (`for n in histDict:` would avoid the .keys() call, but code is untouched.)
100 workerID, histDict = tup
101 added = 0 ; registered = 0; booked = 0
102 for n
in histDict.keys() :
# Existing entry in the store -> add; failure is logged, not raised.
104 obj = self.hvt.retrieve( n )
109 self.log.warning(
'FAILED TO ADD : %s'%(str(obj)))
# Unknown entry -> dispatch to a booking method keyed on the class name.
113 if o.__class__.__name__
in self.bookingDict.keys() :
# NOTE(review): format string '%s\tto%s' lacks a space before the second %s;
# cosmetic log defect, cannot be fixed in this comment-only pass.
117 self.log.warning(
'FAILED TO REGISTER : %s\tto%s'\
118 %(o.__class__.__name__, n))
121 self.log.warning(
'No booking method for: %s\t%s\t%s'\
122 %(n,
type(o),o.__class__.__name__) )
# Summary: report store size and accumulated error count.
125 hs = self.hvt.getHistoNames()
126 self.log.info(
'Histo Store Rebuilt : ' )
127 self.log.info(
' Contains %i objects.'%(len(hs)) )
128 self.log.info(
' Errors in Rebuilding : %i'%(errors) )
# --- Booking methods (targets of self.bookingDict) ---------------------------
# Plain DataObject: registered directly, no booking call needed.
134 Register a DataObject to the Histo Store
136 self._gmpc.hvt.registerObject( n, o )
# 1D histogram: book with (path, title, nbins, xmin, xmax).
140 Register a ROOT 1D THisto to the Histo Store
142 obj = self.hvt._ihs.book( n, o.GetTitle(),\
143 o.GetXaxis().GetNbins(),\
144 o.GetXaxis().GetXmin(),\
145 o.GetXaxis().GetXmax() )
# 2D histogram: same pattern extended with the Y axis.
150 Register a ROOT 2D THisto to the Histo Store
152 obj = self.hvt._ihs.book( n, o.GetTitle(),\
153 o.GetXaxis().GetNbins(),\
154 o.GetXaxis().GetXmin(),\
155 o.GetXaxis().GetXmax(),\
156 o.GetYaxis().GetNbins(),\
157 o.GetYaxis().GetXmin(),\
158 o.GetYaxis().GetXmax() )
# NOTE(review): the 3D booking calls GetXbins() on each axis, while the 1D/2D
# versions use GetNbins(). In ROOT, TAxis::GetXbins() returns the bin-edge
# array (TArrayD), not the bin count -- this looks like a bug; confirm and
# change to GetNbins() in a full edit of the real file.
163 Register a ROOT 3D THisto to the Histo Store
165 obj = self.hvt._ihs.book( n, o.GetTitle(),\
166 o.GetXaxis().GetXbins(),\
167 o.GetXaxis().GetXmin(),\
168 o.GetXaxis().GetXmax(),\
169 o.GetYaxis().GetXbins(),\
170 o.GetYaxis().GetXmin(),\
171 o.GetYaxis().GetXmax(),\
172 o.GetZaxis().GetXbins(),\
173 o.GetZaxis().GetXmin(),\
174 o.GetZaxis().GetXmax() )
# 1D profile; the trailing backslash/comma on line 184 implies continuation
# lines are missing from this listing.
179 Register a ROOT TProfile to the Histo Store
181 obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
182 o.GetXaxis().GetNbins(),\
183 o.GetXaxis().GetXmin(),\
184 o.GetXaxis().GetXmax(),\
# 2D profile: X and Y axis parameters.
190 Register a ROOT TProfile2D to the Histo Store
192 obj = self.hvt._ihs.bookProf( n, o.GetTitle(),\
193 o.GetXaxis().GetNbins(),\
194 o.GetXaxis().GetXmin(),\
195 o.GetXaxis().GetXmax(),\
196 o.GetYaxis().GetNbins(),\
197 o.GetYaxis().GetXmin(),\
198 o.GetYaxis().GetXmax() )
# --- FileRecords-agent fragments (send side) ---------------------------------
# Queue handle for FileRecords traffic, taken from the parent GMP component.
207 self.
q = self._gmpc.fq
# Fragments of an old-style three-way comparator (Python 2 `cmp` protocol);
# see the sort(cmp=...) call further down -- Python 3 would need
# functools.cmp_to_key.
218 if valA<valB :
return -1
219 elif valA>valB :
return 1
235 self.log.info(
'Sending FileRecords...')
# NOTE(review): getHistoNames() on the FileRecords store looks odd but is
# presumably the generic leaf-listing API of this store wrapper -- confirm.
236 lst = self.fsr.getHistoNames()
# Nothing to send: still emit the END_FSR sentinel so the reader terminates.
240 self.log.info(
'No FileRecords Data to send to Writer.')
241 self.q.put(
'END_FSR' )
# Skip the root node itself; only leaves are shipped.
245 if '/FileRecords' in lst : lst.remove(
'/FileRecords')
248 o = self.fsr.retrieveObject( l )
249 if hasattr(o,
"configureDirectAccess") :
250 o.configureDirectAccess()
# Node 0 (Reader, presumably) ships everything; other nodes appear to ship
# only selected paths such as EventCountFSR -- TODO confirm.
252 if self._gmpc.nodeID == 0 :
253 self.objectsOut.append( (0, l, pickle.dumps(o)) )
257 if l ==
'/FileRecords/EventCountFSR' :
258 tup = (self._gmpc.nodeID, l, pickle.dumps(o))
259 self.objectsOut.append( tup )
# Keyed containers are serialised whole; pickling is intra-job IPC here.
262 assert "KeyedContainer" in o.__class__.__name__
263 nObjects = o.numberOfObjects()
265 self.log.debug(
"Keyed Container %s with %i objects"\
267 tup = (self._gmpc.nodeID, l, pickle.dumps(o))
268 self.objectsOut.append( tup )
269 self.log.debug(
'Done with FSR store, just to send to Writer.')
272 self.log.debug(
'%i FSR objects to Writer'%(len(self.
objectsOut)))
274 self.log.debug(
'\t%s'%(ob[0]))
277 self.log.info(
'Valid FSR Store, but no data to send to Writer')
# Always finish with the END_FSR sentinel.
278 self.log.info(
'SendFSR complete')
279 self.q.put(
'END_FSR' )
# --- FileRecords-agent fragments (receive side) ------------------------------
# Drain the queue until each of the nWorkers producers has sent 'END_FSR'
# (countdown logic not visible in this listing -- TODO confirm).
284 self.log.info(
'Receiving FSR store data...')
285 nc = self._gmpc.nWorkers
287 objects = self.q.get( )
288 if objects ==
'END_FSR' :
295 self.objectsIn.append(o)
# NOTE(review): sort(cmp=...) is Python-2-only; a Python 3 port needs
# key=functools.cmp_to_key(self.localCmp).
298 self.objectsIn.sort(cmp=self.
localCmp)
299 self.log.info(
'All FSR data received')
# --- RebuildFSRStore fragments -----------------------------------------------
# Each entry is (sourceNode, path, pickled object). pickle.loads is safe only
# because the data comes from this job's own worker processes -- never feed
# this queue from an untrusted source.
304 for sourceNode, path, serialob
in self.
objectsIn :
305 self.log.debug(
'Working with %s'%(path))
306 ob = pickle.loads(serialob)
307 if hasattr( ob,
'update' ) :
309 if hasattr( ob,
'numberOfObjects' ) :
310 nCont = ob.numberOfObjects()
311 self.log.debug(
'\t %s has containedObjects : %i'%(
type(ob).__name__, nCont) )
# First occurrence of a path is registered; later ones are merged (the
# register/merge branch condition is not visible here -- TODO confirm).
313 self.log.debug(
'Registering Object to : %s'%(path))
314 self.fsr.registerObject( path, ob )
316 self.log.debug(
'Merging Object to : %s'%(path))
# After rebuilding, overwrite the EventCountFSR output with the true number
# of events seen by this process (self._gmpc.nIn).
322 self.log.info(
'FSR Store Rebuilt. Correcting EventCountFSR')
323 if bool( self.fsr._idp ) :
324 ecount =
'/FileRecords/EventCountFSR'
325 if self.
fsr[ecount] :
326 self.
fsr[ecount].setOutput( self._gmpc.nIn )
327 self.log.info(
'Event Counter Output set : %s : %i'\
328 %(ecount, self.
fsr[ecount].output()) )
# Final diagnostic dump of the reconstructed store.
330 self.log.debug(
'FSR store reconstructed!')
331 lst = self.fsr.getHistoNames()
334 ob = self.fsr.retrieveObject(l)
335 if hasattr( ob,
'configureDirectAccess' ) :
336 ob.configureDirectAccess()
337 if hasattr( ob,
'containedObjects' ) :
339 self.log.debug(
'\t%s (cont. objects : %i)'\
340 %(l, ob.numberOfObjects()))
342 self.log.debug(
'\t%s'%(l))
343 self.log.info(
'FSR Store fully rebuilt.')
# --- Merge dispatch fragments: per-path FSR merge strategies -----------------
# Dispatch on the FSR path / class name: TimeSpanFSR, EventCountFSR, then
# generic keyed containers (only LumiFSR containers are actually merged).
348 if path ==
'/FileRecords/TimeSpanFSR' :
351 elif path ==
'/FileRecords/EventCountFSR' :
355 elif "KeyedContainer" in ob.__class__.__name__ :
358 if "LumiFSR" in ob.__class__.__name__ :
361 self.log.info(
"Skipping Merge of Keyed Container %s for %s"\
362 %(ob.__class__.__name__,path))
# TimeSpanFSR merge: compute the overall earliest/latest over all contained
# objects, then replace the stored FSR with a single spanning one.
365 ob2 = self.fsr.retrieveObject( path )
366 if ob.containedObjects().size() :
367 sz = ob.containedObjects().size()
368 cob = ob2.containedObjects()[0]
# NOTE(review): `min`/`max` here are local accumulators that shadow the
# Python builtins (their initialisation is not visible in this listing);
# renaming them (e.g. tmin/tmax) would be safer -- xrange is Python 2.
371 for j
in xrange( sz ) :
372 cob = ob.containedObjects()[j]
373 self.log.debug(
'Adding TimeSpanFSR' )
374 if cob.earliest() < min:
376 if cob.latest() > max:
380 tsfsr = gbl.LHCb.TimeSpanFSR()
381 tsfsr.setEarliest( min )
382 tsfsr.setLatest( max )
383 self.
fsr[path].clear()
384 self.
fsr[path].
add( tsfsr )
# EventCountFSR merge: inputs are additive across workers.
387 self.log.debug(
'Event Count Input Addition')
388 self.
fsr[path].setInput( self.
fsr[path].input()+ob.input() )
# LumiFSR merge: accumulate all contained LumiFSRs into one, then rebuild a
# fresh C++ LumiFSR from the merged Python-side representation.
# NOTE(review): mid-function `from ROOT import string` -- std::string is
# needed for addFileID; moving the import to module top would be cleaner.
391 from ROOT
import string
393 keyedContainer = self.fsr.retrieveObject(path)
395 assert keyedContainer.numberOfObjects() == 1
396 l = keyedContainer.containedObject(0)
399 nCont = keyedC.numberOfObjects()
400 for i
in xrange(nCont) :
401 obj = keyedC.containedObject(i)
403 baseLumi.merge( nextLumi )
405 newLumi = gbl.LHCb.LumiFSR()
406 for r
in baseLumi.runs :
407 newLumi.addRunNumber( r )
408 for f
in baseLumi.files :
409 newLumi.addFileID(
string(f) )
410 for k
in baseLumi.keys :
411 increment, integral = baseLumi.info[k]
412 newLumi.addInfo(k, increment, integral)
# Replace the stored container's contents with the single merged LumiFSR.
414 self.
fsr[path].clear()
416 self.
fsr[path].
add(newLumi)
# --- LumiFSR Python helper-class fragments -----------------------------------
# Construction: copy run numbers and file IDs from the C++ LumiFSR, and parse
# the (key/incr/integral) triples out of its string representation.
435 for r
in lumi.runNumbers() :
438 for f
in lumi.fileIDs() :
443 sa = s.split(
"info (key/incr/integral) : ")[-1]
444 sa = sa.split(
'/')[:-1]
451 self.
keys = self.info.keys()
# merge(): union runs and files, and sum the (increment, integral) pairs
# per key; self.keys is refreshed at the end.
453 assert otherLumi.__class__.__name__ ==
"LumiFSR"
455 for r
in otherLumi.runs :
459 self.runs.append( r )
462 for f
in otherLumi.files :
466 self.files.append( f )
469 for k
in otherLumi.keys :
470 increment, integral = otherLumi.info[k]
472 myIncrement, myIntegral = self.
info[k]
473 self.
info[k] = ( myIncrement+increment, myIntegral+integral )
475 self.
info[k] = ( increment, integral )
477 self.
keys = self.info.keys()
# __str__: human-readable dump (built with += in a loop; fine for the small
# sizes involved here).
479 s =
"LumiFSR Python class\n"
484 for f
in self.
files :
488 increment, integral = self.
info[k]
489 s +=
"\t\t%i\t%i\t%i\n"%(k,increment,integral)
# --- PackedCaloHypo Python mirror-class fragments ----------------------------
# Constructor: asserts the wrapped object is an LHCb::PackedCaloHypo and
# copies its plain-data fields into Python attributes (error/covariance
# triples, position, etc.).
496 cl =
'LHCb::PackedCaloHypo'
497 assert o.__class__.__name__ == cl
500 self.
cerr = (o.cerr00,o.cerr10,o.cerr11)
501 self.
cov = (o.cov00,o.cov10,o.cov11,o.cov20,o.cov21,o.cov22)
511 self.
pos = (o.posE, o.posX, o.posY)
# __str__: field-by-field dump; purely diagnostic.
514 s =
"PackedCaloHypo : \n"
515 s +=
"\tcentX : %s\n"%( str(self.
centX) )
516 s +=
"\tcentY : %s\n"%( str(self.
centY) )
517 s +=
"\tcerr : %s\n"%( str(self.
cerr ) )
518 s +=
"\tcov : %s\n"%( str(self.
cov ) )
520 s +=
"\tfirstDigit : %s\n"%( str(self.
firstDigit) )
521 s +=
"\tfirstHypo : %s\n"%( str(self.
firstHypo) )
522 s +=
"\thypothesis : %s\n"%( str(self.
hypothesis) )
523 s +=
"\tkey : %s\n"%( str(self.
key) )
524 s +=
"\tlastCluster : %s\n"%( str(self.
lastCluster) )
525 s +=
"\tlastDigit : %s\n"%( str(self.
lastDigit) )
526 s +=
"\tlastHypo : %s\n"%( str(self.
lastHypo) )
527 s +=
"\tlh : %s\n"%( str(self.
lh ) )
528 s +=
"\tpos : %s\n"%( str(self.
pos ) )
529 s +=
"\tz : %s\n"%( str(self.
z ) )
530 s +=
"---------------------------------------\n"
# --- SyncMini fragments ------------------------------------------------------
# Thin wrapper over two multiprocessing.Event objects: `event` (step done)
# and `lastEvent` (final event seen); check()/checkLast() report their state.
543 return self.event.is_set()
545 return self.lastEvent.is_set()
# __str__: diagnostic dump of both flags and the stored timestamp t.
554 s =
"---------- SyncMini --------------\n"
555 s +=
" Status : %s\n"%(self.event.is_set())
556 s +=
" t : %5.2f\n"%(self.
t)
558 s +=
"Last Event : %s\n"%(self.lastEvent.is_set())
559 s +=
"----------------------------------\n"
# --- Syncer fragments --------------------------------------------------------
# Coordinates one SyncMini per process: indices -2 and -1 presumably stand
# for the Reader/Writer special nodes, 0..nWorkers-1 for workers -- confirm.
# limit = timeout in seconds, step = polling interval (xrange is Python 2).
565 def __init__( self, nWorkers, log, manyEvents=False,
566 limit=
None, step=
None, firstEvent=
None ) :
572 for i
in xrange(-2, nWorkers) :
573 self.
d[ i ] =
SyncMini( Event(), lastEvent=Event() )
# syncAll-style barrier: poll every `step` seconds up to `limit`; success
# logs completion, timeout logs which process is still hanging.
589 for i
in xrange( 0, self.
limit, self.
step ) :
591 self.log.info(
'%s : All procs done @ %i s'%(step,i))
594 time.sleep(self.
step)
598 self.log.info(
"All processes : %s ok."%(step))
601 self.log.critical(
'Some process is hanging on : %s'%(step))
603 hangString=
"%s : Proc/Stat : %i/%s"%(step,k,self.
d[k].
check())
604 self.log.critical( hangString )
# Event-loop synchronisation: track per-node liveness; a node waiting longer
# than `limit` on a single event is reported as hung.
629 if sMini.check()
or sMini.checkLast():
630 if sMini.checkLast()
and sMini.check() :
633 alive = time.time()-begin
634 self.log.info(
"Audit : Node %i alive for %5.2f"\
641 wait = time.time()-sMini.getTime()
642 cond = wait > self.
limit
648 self.log.critical(
'Single event wait : %5.2f'%(wait))
654 self.log.info(
'TC met for event loop')
658 time.sleep(self.
step)
660 self.log.info(
"All processes Completed all Events ok")
664 self.log.critical(
'Some proc is hanging during Event processing!')
666 self.log.critical(
"Proc/Stat : %i / %s"%(k,self.
d[k].
check()) )
# True iff every SyncMini's step event is set.
672 currentStatus = [ mini.check()
for mini
in self.d.values() ]
673 return all( currentStatus )
# Same pattern for the last-event flags.
677 stat = [ sMini.checkLast()
for sMini
in self.d.values() ]
# --- Event-number lookup fragments -------------------------------------------
# Try the standard header locations first; fall back to decoding the raw
# event. NOTE(review): banks(16) / data()[4] are magic numbers (bank type and
# word offset of the event number, presumably ODIN) -- confirm and name them.
687 lst = [
'/Event/Gen/Header',
688 '/Event/Rec/Header' ]
691 n = evt[l].evtNumber()
700 n = evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]