13 from multiprocessing
import Event
15 from GaudiPython
import FAILURE, SUCCESS, gbl
56 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
83 assert tup.__class__.__name__ ==
"tuple"
87 hstatus = self.
_gmpc.nWorkers + 1
90 if tup ==
"HISTOS_SENT":
91 self.
log.debug(
"received HISTOS_SENT message")
97 self.
_gmpc.sEvent.set()
98 self.
log.info(
"Writer received all histo bundles and set sync event")
103 Rebuild the Histogram Store from the histos received by Receive()
104 If we have a histo which is not in the store,
105 book and fill it according to self.bookingDict
106 If we have a histo with a matching histo in the store,
107 add the two histos, remembering that aida2root must be used on
108 the Stored histo for compatibility.
112 workerID, histDict = tup
117 for n
in histDict.keys():
119 obj = self.
hvt.retrieve(n)
125 self.
log.warning(
"FAILED TO ADD : %s" % (str(obj)))
135 "FAILED TO REGISTER : %s\tto%s"
136 % (o.__class__.__name__, n)
141 "No booking method for: %s\t%s\t%s"
142 % (n,
type(o), o.__class__.__name__)
146 hs = self.
hvt.getHistoNames()
147 self.
log.info(
"Histo Store Rebuilt : ")
148 self.
log.info(
" Contains %i objects." % (len(hs)))
149 self.
log.info(
" Errors in Rebuilding : %i" % (errors))
154 Register a DataObject to the Histo Store
156 self.
_gmpc.hvt.registerObject(n, o)
160 Register a ROOT 1D THisto to the Histo Store
162 obj = self.
hvt._ihs.book(
165 o.GetXaxis().GetNbins(),
166 o.GetXaxis().GetXmin(),
167 o.GetXaxis().GetXmax(),
173 Register a ROOT 2D THisto to the Histo Store
175 obj = self.
hvt._ihs.book(
178 o.GetXaxis().GetNbins(),
179 o.GetXaxis().GetXmin(),
180 o.GetXaxis().GetXmax(),
181 o.GetYaxis().GetNbins(),
182 o.GetYaxis().GetXmin(),
183 o.GetYaxis().GetXmax(),
189 Register a ROOT 3D THisto to the Histo Store
191 obj = self.
hvt._ihs.book(
194 o.GetXaxis().GetXbins(),
195 o.GetXaxis().GetXmin(),
196 o.GetXaxis().GetXmax(),
197 o.GetYaxis().GetXbins(),
198 o.GetYaxis().GetXmin(),
199 o.GetYaxis().GetXmax(),
200 o.GetZaxis().GetXbins(),
201 o.GetZaxis().GetXmin(),
202 o.GetZaxis().GetXmax(),
208 Register a ROOT TProfile to the Histo Store
210 obj = self.
hvt._ihs.bookProf(
213 o.GetXaxis().GetNbins(),
214 o.GetXaxis().GetXmin(),
215 o.GetXaxis().GetXmax(),
222 Register a ROOT TProfile2D to the Histo Store
224 obj = self.
hvt._ihs.bookProf(
227 o.GetXaxis().GetNbins(),
228 o.GetXaxis().GetXmin(),
229 o.GetXaxis().GetXmax(),
230 o.GetYaxis().GetNbins(),
231 o.GetYaxis().GetXmin(),
232 o.GetYaxis().GetXmax(),
274 self.
log.info(
"Sending FileRecords...")
275 lst = self.
fsr.getHistoNames()
279 self.
log.info(
"No FileRecords Data to send to Writer.")
280 self.
q.
put(
"END_FSR")
284 if "/FileRecords" in lst:
285 lst.remove(
"/FileRecords")
288 o = self.
fsr.retrieveObject(l)
289 if hasattr(o,
"configureDirectAccess"):
290 o.configureDirectAccess()
292 if self.
_gmpc.nodeID == 0:
293 self.
objectsOut.append((0, l, pickle.dumps(o)))
297 if l ==
"/FileRecords/EventCountFSR":
298 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
300 elif "KeyedContainer" in o.__class__.__name__:
302 nObjects = o.numberOfObjects()
305 "Keyed Container %s with %i objects" % (l, nObjects)
307 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
310 self.
log.info(
"Ignoring %s in FSR" % o.__class__.__name__)
312 self.
log.debug(
"Done with FSR store, just to send to Writer.")
315 self.
log.debug(
"%i FSR objects to Writer" % (len(self.
objectsOut)))
317 self.
log.debug(
"\t%s" % (ob[0]))
320 self.
log.info(
"Valid FSR Store, but no data to send to Writer")
321 self.
log.info(
"SendFSR complete")
322 self.
q.
put(
"END_FSR")
327 self.
log.info(
"Receiving FSR store data...")
328 nc = self.
_gmpc.nWorkers
330 objects = self.
q.
get()
331 if objects ==
"END_FSR":
342 self.
log.info(
"All FSR data received")
347 for sourceNode, path, serialob
in self.
objectsIn:
348 self.
log.debug(
"Working with %s" % (path))
349 ob = pickle.loads(serialob)
350 if hasattr(ob,
"update"):
352 if hasattr(ob,
"numberOfObjects"):
353 nCont = ob.numberOfObjects()
355 "\t %s has containedObjects : %i" % (
type(ob).__name__, nCont)
358 self.
log.debug(
"Registering Object to : %s" % (path))
359 self.
fsr.registerObject(path, ob)
361 self.
log.debug(
"Merging Object to : %s" % (path))
367 self.
log.info(
"FSR Store Rebuilt. Correcting EventCountFSR")
368 if bool(self.
fsr._idp):
369 ecount =
"/FileRecords/EventCountFSR"
371 self.
fsr[ecount].setOutput(self.
_gmpc.nIn)
373 "Event Counter Output set : %s : %i"
377 self.
log.debug(
"FSR store reconstructed!")
378 lst = self.
fsr.getHistoNames()
381 ob = self.
fsr.retrieveObject(l)
382 if hasattr(ob,
"configureDirectAccess"):
383 ob.configureDirectAccess()
384 if hasattr(ob,
"containedObjects"):
387 "\t%s (cont. objects : %i)" % (l, ob.numberOfObjects())
390 self.
log.debug(
"\t%s" % (l))
391 self.
log.info(
"FSR Store fully rebuilt.")
396 if path ==
"/FileRecords/TimeSpanFSR":
399 elif path ==
"/FileRecords/EventCountFSR":
404 "KeyedContainer" in ob.__class__.__name__
405 and "LumiFSR" in ob.__class__.__name__
410 self.
log.info(
"Skipping Merge of %s at %s" % (ob.__class__.__name__, path))
413 ob2 = self.
fsr.retrieveObject(path)
414 if ob.containedObjects().
size():
415 sz = ob.containedObjects().
size()
416 cob = ob2.containedObjects()[0]
420 cob = ob.containedObjects()[j]
421 self.
log.debug(
"Adding TimeSpanFSR")
422 if cob.earliest() < min:
424 if cob.latest() > max:
428 tsfsr = gbl.LHCb.TimeSpanFSR()
429 tsfsr.setEarliest(min)
431 self.
fsr[path].clear()
435 self.
log.debug(
"Event Count Input Addition")
436 self.
fsr[path].setInput(self.
fsr[path].input() + ob.input())
439 from ROOT
import string
442 keyedContainer = self.
fsr.retrieveObject(path)
444 assert keyedContainer.numberOfObjects() == 1
445 l = keyedContainer.containedObject(0)
448 nCont = keyedC.numberOfObjects()
449 for i
in range(nCont):
450 obj = keyedC.containedObject(i)
452 baseLumi.merge(nextLumi)
454 newLumi = gbl.LHCb.LumiFSR()
455 for r
in baseLumi.runs:
456 newLumi.addRunNumber(r)
457 for f
in baseLumi.files:
458 newLumi.addFileID(string(f))
459 for k
in baseLumi.keys:
460 increment, integral = baseLumi.info[k]
461 newLumi.addInfo(k, increment, integral)
463 self.
fsr[path].clear()
465 self.
fsr[path].
add(newLumi)
486 for r
in lumi.runNumbers():
489 for f
in lumi.fileIDs():
494 sa = s.split(
"info (key/incr/integral) : ")[-1]
495 sa = sa.split(
"/")[:-1]
497 k, i, t = rec.split()
501 self.
info[k] = (i, t)
505 assert otherLumi.__class__.__name__ ==
"LumiFSR"
507 for r
in otherLumi.runs:
514 for f
in otherLumi.files:
521 for k
in otherLumi.keys:
522 increment, integral = otherLumi.info[k]
524 myIncrement, myIntegral = self.
info[k]
525 self.
info[k] = (myIncrement + increment, myIntegral + integral)
527 self.
info[k] = (increment, integral)
532 s =
"LumiFSR Python class\n"
535 s +=
"\t\t%i\n" % (r)
538 s +=
"\t\t%s\n" % (f)
541 increment, integral = self.
info[k]
542 s +=
"\t\t%i\t%i\t%i\n" % (k, increment, integral)
551 cl =
"LHCb::PackedCaloHypo"
552 assert o.__class__.__name__ == cl
555 self.
cerr = (o.cerr00, o.cerr10, o.cerr11)
556 self.
cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
566 self.
pos = (o.posE, o.posX, o.posY)
570 s =
"PackedCaloHypo : \n"
571 s +=
"\tcentX : %s\n" % (str(self.
centX))
572 s +=
"\tcentY : %s\n" % (str(self.
centY))
573 s +=
"\tcerr : %s\n" % (str(self.
cerr))
574 s +=
"\tcov : %s\n" % (str(self.
cov))
576 s +=
"\tfirstDigit : %s\n" % (str(self.
firstDigit))
577 s +=
"\tfirstHypo : %s\n" % (str(self.
firstHypo))
578 s +=
"\thypothesis : %s\n" % (str(self.
hypothesis))
579 s +=
"\tkey : %s\n" % (str(self.
key))
580 s +=
"\tlastCluster : %s\n" % (str(self.
lastCluster))
581 s +=
"\tlastDigit : %s\n" % (str(self.
lastDigit))
582 s +=
"\tlastHypo : %s\n" % (str(self.
lastHypo))
583 s +=
"\tlh : %s\n" % (str(self.
lh))
584 s +=
"\tpos : %s\n" % (str(self.
pos))
585 s +=
"\tz : %s\n" % (str(self.
z))
586 s +=
"---------------------------------------\n"
602 return self.
event.is_set()
618 s =
"---------- SyncMini --------------\n"
619 s +=
" Status : %s\n" % (self.
event.is_set())
620 s +=
" t : %5.2f\n" % (self.
t)
622 s +=
"Last Event : %s\n" % (self.
lastEvent.is_set())
623 s +=
"----------------------------------\n"
632 self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None
640 for i
in range(-2, nWorkers):
660 self.
log.info(
"%s : All procs done @ %i s" % (step, i))
663 time.sleep(self.
step)
667 self.
log.info(
"All processes : %s ok." % (step))
670 self.
log.critical(
"Some process is hanging on : %s" % (step))
672 hangString =
"%s : Proc/Stat : %i/%s" % (step, k, self.
d[k].
check())
673 self.
log.critical(hangString)
698 if sMini.check()
or sMini.checkLast():
699 if sMini.checkLast()
and sMini.check():
702 alive = time.time() - begin
703 self.
log.info(
"Audit : Node %i alive for %5.2f" % (k, alive))
709 wait = time.time() - sMini.getTime()
710 cond = wait > self.
limit
716 self.
log.critical(
"Single event wait : %5.2f" % (wait))
722 self.
log.info(
"TC met for event loop")
726 time.sleep(self.
step)
728 self.
log.info(
"All processes Completed all Events ok")
732 self.
log.critical(
"Some proc is hanging during Event processing!")
734 self.
log.critical(
"Proc/Stat : %i / %s" % (k, self.
d[k].
check()))
740 currentStatus = [mini.check()
for mini
in self.
d.values()]
741 return all(currentStatus)
745 stat = [sMini.checkLast()
for sMini
in self.
d.values()]
757 lst = [
"/Event/Gen/Header",
"/Event/Rec/Header"]
760 n = evt[l].evtNumber()
769 n = evt[
"/Event/DAQ/RawEvent"].banks(16)[0].data()[4]