11 from GaudiPython
import gbl, SUCCESS, FAILURE
12 from multiprocessing
import Event
55 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
82 assert tup.__class__.__name__ ==
'tuple'
86 hstatus = self.
_gmpc.nWorkers + 1
89 if tup ==
'HISTOS_SENT':
90 self.
log.debug(
'received HISTOS_SENT message')
96 self.
_gmpc.sEvent.set()
97 self.
log.info(
'Writer received all histo bundles and set sync event')
102 Rebuild the Histogram Store from the histos received by Receive().
103 If we have a histo which is not in the store,
104 book and fill it according to self.bookingDict.
105 If we have a histo with a matching histo in the store,
106 add the two histos, remembering that aida2root must be used on
107 the stored histo for compatibility.
111 workerID, histDict = tup
116 for n
in histDict.keys():
118 obj = self.
hvt.retrieve(n)
124 self.
log.warning(
'FAILED TO ADD : %s' % (str(obj)))
133 self.
log.warning(
'FAILED TO REGISTER : %s\tto%s' %
134 (o.__class__.__name__, n))
137 self.
log.warning(
'No booking method for: %s\t%s\t%s' %
138 (n,
type(o), o.__class__.__name__))
141 hs = self.
hvt.getHistoNames()
142 self.
log.info(
'Histo Store Rebuilt : ')
143 self.
log.info(
' Contains %i objects.' % (len(hs)))
144 self.
log.info(
' Errors in Rebuilding : %i' % (errors))
149 Register a DataObject to the Histo Store
151 self.
_gmpc.hvt.registerObject(n, o)
155 Register a ROOT 1D THisto to the Histo Store
157 obj = self.
hvt._ihs.book(n, o.GetTitle(),
158 o.GetXaxis().GetNbins(),
159 o.GetXaxis().GetXmin(),
160 o.GetXaxis().GetXmax())
165 Register a ROOT 2D THisto to the Histo Store
167 obj = self.
hvt._ihs.book(n, o.GetTitle(),
168 o.GetXaxis().GetNbins(),
169 o.GetXaxis().GetXmin(),
170 o.GetXaxis().GetXmax(),
171 o.GetYaxis().GetNbins(),
172 o.GetYaxis().GetXmin(),
173 o.GetYaxis().GetXmax())
178 Register a ROOT 3D THisto to the Histo Store
180 obj = self.
hvt._ihs.book(n, o.GetTitle(),
181 o.GetXaxis().GetXbins(),
182 o.GetXaxis().GetXmin(),
183 o.GetXaxis().GetXmax(),
184 o.GetYaxis().GetXbins(),
185 o.GetYaxis().GetXmin(),
186 o.GetYaxis().GetXmax(),
187 o.GetZaxis().GetXbins(),
188 o.GetZaxis().GetXmin(),
189 o.GetZaxis().GetXmax())
194 Register a ROOT TProfile to the Histo Store
196 obj = self.
hvt._ihs.bookProf(n, o.GetTitle(),
197 o.GetXaxis().GetNbins(),
198 o.GetXaxis().GetXmin(),
199 o.GetXaxis().GetXmax(), o.GetOption())
204 Register a ROOT TProfile2D to the Histo Store
206 obj = self.
hvt._ihs.bookProf(n, o.GetTitle(),
207 o.GetXaxis().GetNbins(),
208 o.GetXaxis().GetXmin(),
209 o.GetXaxis().GetXmax(),
210 o.GetYaxis().GetNbins(),
211 o.GetYaxis().GetXmin(),
212 o.GetYaxis().GetXmax())
253 self.
log.info(
'Sending FileRecords...')
254 lst = self.
fsr.getHistoNames()
258 self.
log.info(
'No FileRecords Data to send to Writer.')
259 self.
q.
put(
'END_FSR')
263 if '/FileRecords' in lst:
264 lst.remove(
'/FileRecords')
267 o = self.
fsr.retrieveObject(l)
268 if hasattr(o,
"configureDirectAccess"):
269 o.configureDirectAccess()
271 if self.
_gmpc.nodeID == 0:
272 self.
objectsOut.append((0, l, pickle.dumps(o)))
276 if l ==
'/FileRecords/EventCountFSR':
277 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
279 elif "KeyedContainer" in o.__class__.__name__:
281 nObjects = o.numberOfObjects()
283 self.
log.debug(
"Keyed Container %s with %i objects" %
285 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
288 self.
log.info(
'Ignoring %s in FSR' % o.__class__.__name__)
290 self.
log.debug(
'Done with FSR store, just to send to Writer.')
293 self.
log.debug(
'%i FSR objects to Writer' % (len(self.
objectsOut)))
295 self.
log.debug(
'\t%s' % (ob[0]))
298 self.
log.info(
'Valid FSR Store, but no data to send to Writer')
299 self.
log.info(
'SendFSR complete')
300 self.
q.
put(
'END_FSR')
305 self.
log.info(
'Receiving FSR store data...')
306 nc = self.
_gmpc.nWorkers
308 objects = self.
q.
get()
309 if objects ==
'END_FSR':
320 self.
log.info(
'All FSR data received')
325 for sourceNode, path, serialob
in self.
objectsIn:
326 self.
log.debug(
'Working with %s' % (path))
327 ob = pickle.loads(serialob)
328 if hasattr(ob,
'update'):
330 if hasattr(ob,
'numberOfObjects'):
331 nCont = ob.numberOfObjects()
332 self.
log.debug(
'\t %s has containedObjects : %i' %
333 (
type(ob).__name__, nCont))
335 self.
log.debug(
'Registering Object to : %s' % (path))
336 self.
fsr.registerObject(path, ob)
338 self.
log.debug(
'Merging Object to : %s' % (path))
344 self.
log.info(
'FSR Store Rebuilt. Correcting EventCountFSR')
345 if bool(self.
fsr._idp):
346 ecount =
'/FileRecords/EventCountFSR'
348 self.
fsr[ecount].setOutput(self.
_gmpc.nIn)
349 self.
log.info(
'Event Counter Output set : %s : %i' %
352 self.
log.debug(
'FSR store reconstructed!')
353 lst = self.
fsr.getHistoNames()
356 ob = self.
fsr.retrieveObject(l)
357 if hasattr(ob,
'configureDirectAccess'):
358 ob.configureDirectAccess()
359 if hasattr(ob,
'containedObjects'):
361 self.
log.debug(
'\t%s (cont. objects : %i)' %
362 (l, ob.numberOfObjects()))
364 self.
log.debug(
'\t%s' % (l))
365 self.
log.info(
'FSR Store fully rebuilt.')
370 if path ==
'/FileRecords/TimeSpanFSR':
373 elif path ==
'/FileRecords/EventCountFSR':
377 elif "KeyedContainer" in ob.__class__.__name__
and "LumiFSR" in ob.__class__.__name__:
382 "Skipping Merge of %s at %s" % (ob.__class__.__name__, path))
385 ob2 = self.
fsr.retrieveObject(path)
386 if ob.containedObjects().
size():
387 sz = ob.containedObjects().
size()
388 cob = ob2.containedObjects()[0]
392 cob = ob.containedObjects()[j]
393 self.
log.debug(
'Adding TimeSpanFSR')
394 if cob.earliest() < min:
396 if cob.latest() > max:
400 tsfsr = gbl.LHCb.TimeSpanFSR()
401 tsfsr.setEarliest(min)
403 self.
fsr[path].clear()
407 self.
log.debug(
'Event Count Input Addition')
408 self.
fsr[path].setInput(self.
fsr[path].input() + ob.input())
411 from ROOT
import string
413 keyedContainer = self.
fsr.retrieveObject(path)
415 assert keyedContainer.numberOfObjects() == 1
416 l = keyedContainer.containedObject(0)
419 nCont = keyedC.numberOfObjects()
420 for i
in range(nCont):
421 obj = keyedC.containedObject(i)
423 baseLumi.merge(nextLumi)
425 newLumi = gbl.LHCb.LumiFSR()
426 for r
in baseLumi.runs:
427 newLumi.addRunNumber(r)
428 for f
in baseLumi.files:
429 newLumi.addFileID(string(f))
430 for k
in baseLumi.keys:
431 increment, integral = baseLumi.info[k]
432 newLumi.addInfo(k, increment, integral)
434 self.
fsr[path].clear()
436 self.
fsr[path].
add(newLumi)
457 for r
in lumi.runNumbers():
460 for f
in lumi.fileIDs():
465 sa = s.split(
"info (key/incr/integral) : ")[-1]
466 sa = sa.split(
'/')[:-1]
468 k, i, t = rec.split()
472 self.
info[k] = (i, t)
476 assert otherLumi.__class__.__name__ ==
"LumiFSR"
478 for r
in otherLumi.runs:
485 for f
in otherLumi.files:
492 for k
in otherLumi.keys:
493 increment, integral = otherLumi.info[k]
495 myIncrement, myIntegral = self.
info[k]
496 self.
info[k] = (myIncrement + increment, myIntegral + integral)
498 self.
info[k] = (increment, integral)
503 s =
"LumiFSR Python class\n"
506 s +=
"\t\t%i\n" % (r)
509 s +=
"\t\t%s\n" % (f)
512 increment, integral = self.
info[k]
513 s +=
"\t\t%i\t%i\t%i\n" % (k, increment, integral)
522 cl =
'LHCb::PackedCaloHypo'
523 assert o.__class__.__name__ == cl
526 self.
cerr = (o.cerr00, o.cerr10, o.cerr11)
527 self.
cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
537 self.
pos = (o.posE, o.posX, o.posY)
541 s =
"PackedCaloHypo : \n"
542 s +=
"\tcentX : %s\n" % (str(self.
centX))
543 s +=
"\tcentY : %s\n" % (str(self.
centY))
544 s +=
"\tcerr : %s\n" % (str(self.
cerr))
545 s +=
"\tcov : %s\n" % (str(self.
cov))
547 s +=
"\tfirstDigit : %s\n" % (str(self.
firstDigit))
548 s +=
"\tfirstHypo : %s\n" % (str(self.
firstHypo))
549 s +=
"\thypothesis : %s\n" % (str(self.
hypothesis))
550 s +=
"\tkey : %s\n" % (str(self.
key))
551 s +=
"\tlastCluster : %s\n" % (str(self.
lastCluster))
552 s +=
"\tlastDigit : %s\n" % (str(self.
lastDigit))
553 s +=
"\tlastHypo : %s\n" % (str(self.
lastHypo))
554 s +=
"\tlh : %s\n" % (str(self.
lh))
555 s +=
"\tpos : %s\n" % (str(self.
pos))
556 s +=
"\tz : %s\n" % (str(self.
z))
557 s +=
"---------------------------------------\n"
573 return self.
event.is_set()
589 s =
"---------- SyncMini --------------\n"
590 s +=
" Status : %s\n" % (self.
event.is_set())
591 s +=
" t : %5.2f\n" % (self.
t)
593 s +=
"Last Event : %s\n" % (self.
lastEvent.is_set())
594 s +=
"----------------------------------\n"
635 self.
log.info(
'%s : All procs done @ %i s' % (step, i))
638 time.sleep(self.
step)
642 self.
log.info(
"All processes : %s ok." % (step))
645 self.
log.critical(
'Some process is hanging on : %s' % (step))
647 hangString =
"%s : Proc/Stat : %i/%s" % (step, k,
649 self.
log.critical(hangString)
674 if sMini.check()
or sMini.checkLast():
675 if sMini.checkLast()
and sMini.check():
678 alive = time.time() - begin
680 "Audit : Node %i alive for %5.2f" % (k, alive))
686 wait = time.time() - sMini.getTime()
687 cond = wait > self.
limit
693 self.
log.critical(
'Single event wait : %5.2f' % (wait))
699 self.
log.info(
'TC met for event loop')
703 time.sleep(self.
step)
705 self.
log.info(
"All processes Completed all Events ok")
709 self.
log.critical(
'Some proc is hanging during Event processing!')
711 self.
log.critical(
"Proc/Stat : %i / %s" % (k, self.
d[k].
check()))
717 currentStatus = [mini.check()
for mini
in self.
d.values()]
718 return all(currentStatus)
722 stat = [sMini.checkLast()
for sMini
in self.
d.values()]
734 lst = [
'/Event/Gen/Header',
'/Event/Rec/Header']
737 n = evt[l].evtNumber()
746 n = evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]