13 from multiprocessing
import Event
15 from GaudiPython
import FAILURE, SUCCESS, gbl
56 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
83 assert tup.__class__.__name__ ==
"tuple"
87 hstatus = self.
_gmpc.nWorkers + 1
90 if tup ==
"HISTOS_SENT":
91 self.
log.debug(
"received HISTOS_SENT message")
97 self.
_gmpc.sEvent.set()
98 self.
log.info(
"Writer received all histo bundles and set sync event")
103 Rebuild the Histogram Store from the histos received by Receive().
104 If we have a histo which is not in the store,
105 book and fill it according to self.bookingDict.
106 If we have a histo with a matching histo in the store,
107 add the two histos, remembering that aida2root must be used on
108 the stored histo for compatibility.
112 workerID, histDict = tup
116 for n
in histDict.keys():
118 obj = self.
hvt.retrieve(n)
124 self.
log.warning(
"FAILED TO ADD : %s" % (str(obj)))
134 "FAILED TO REGISTER : %s\tto%s"
135 % (o.__class__.__name__, n)
140 "No booking method for: %s\t%s\t%s"
141 % (n,
type(o), o.__class__.__name__)
145 hs = self.
hvt.getHistoNames()
146 self.
log.info(
"Histo Store Rebuilt : ")
147 self.
log.info(
" Contains %i objects." % (len(hs)))
148 self.
log.info(
" Errors in Rebuilding : %i" % (errors))
153 Register a DataObject to the Histo Store
155 self.
_gmpc.hvt.registerObject(n, o)
159 Register a ROOT 1D THisto to the Histo Store
161 obj = self.
hvt._ihs.book(
164 o.GetXaxis().GetNbins(),
165 o.GetXaxis().GetXmin(),
166 o.GetXaxis().GetXmax(),
172 Register a ROOT 2D THisto to the Histo Store
174 obj = self.
hvt._ihs.book(
177 o.GetXaxis().GetNbins(),
178 o.GetXaxis().GetXmin(),
179 o.GetXaxis().GetXmax(),
180 o.GetYaxis().GetNbins(),
181 o.GetYaxis().GetXmin(),
182 o.GetYaxis().GetXmax(),
188 Register a ROOT 3D THisto to the Histo Store
190 obj = self.
hvt._ihs.book(
193 o.GetXaxis().GetXbins(),
194 o.GetXaxis().GetXmin(),
195 o.GetXaxis().GetXmax(),
196 o.GetYaxis().GetXbins(),
197 o.GetYaxis().GetXmin(),
198 o.GetYaxis().GetXmax(),
199 o.GetZaxis().GetXbins(),
200 o.GetZaxis().GetXmin(),
201 o.GetZaxis().GetXmax(),
207 Register a ROOT TProfile to the Histo Store
209 obj = self.
hvt._ihs.bookProf(
212 o.GetXaxis().GetNbins(),
213 o.GetXaxis().GetXmin(),
214 o.GetXaxis().GetXmax(),
221 Register a ROOT TProfile2D to the Histo Store
223 obj = self.
hvt._ihs.bookProf(
226 o.GetXaxis().GetNbins(),
227 o.GetXaxis().GetXmin(),
228 o.GetXaxis().GetXmax(),
229 o.GetYaxis().GetNbins(),
230 o.GetYaxis().GetXmin(),
231 o.GetYaxis().GetXmax(),
273 self.
log.info(
"Sending FileRecords...")
274 lst = self.
fsr.getHistoNames()
278 self.
log.info(
"No FileRecords Data to send to Writer.")
279 self.
q.
put(
"END_FSR")
283 if "/FileRecords" in lst:
284 lst.remove(
"/FileRecords")
287 o = self.
fsr.retrieveObject(l)
288 if hasattr(o,
"configureDirectAccess"):
289 o.configureDirectAccess()
291 if self.
_gmpc.nodeID == 0:
292 self.
objectsOut.append((0, l, pickle.dumps(o)))
296 if l ==
"/FileRecords/EventCountFSR":
297 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
299 elif "KeyedContainer" in o.__class__.__name__:
301 nObjects = o.numberOfObjects()
304 "Keyed Container %s with %i objects" % (l, nObjects)
306 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
309 self.
log.info(
"Ignoring %s in FSR" % o.__class__.__name__)
311 self.
log.debug(
"Done with FSR store, just to send to Writer.")
314 self.
log.debug(
"%i FSR objects to Writer" % (len(self.
objectsOut)))
316 self.
log.debug(
"\t%s" % (ob[0]))
319 self.
log.info(
"Valid FSR Store, but no data to send to Writer")
320 self.
log.info(
"SendFSR complete")
321 self.
q.
put(
"END_FSR")
326 self.
log.info(
"Receiving FSR store data...")
327 nc = self.
_gmpc.nWorkers
329 objects = self.
q.
get()
330 if objects ==
"END_FSR":
341 self.
log.info(
"All FSR data received")
346 for sourceNode, path, serialob
in self.
objectsIn:
347 self.
log.debug(
"Working with %s" % (path))
348 ob = pickle.loads(serialob)
349 if hasattr(ob,
"update"):
351 if hasattr(ob,
"numberOfObjects"):
352 nCont = ob.numberOfObjects()
354 "\t %s has containedObjects : %i" % (
type(ob).__name__, nCont)
357 self.
log.debug(
"Registering Object to : %s" % (path))
358 self.
fsr.registerObject(path, ob)
360 self.
log.debug(
"Merging Object to : %s" % (path))
366 self.
log.info(
"FSR Store Rebuilt. Correcting EventCountFSR")
367 if bool(self.
fsr._idp):
368 ecount =
"/FileRecords/EventCountFSR"
370 self.
fsr[ecount].setOutput(self.
_gmpc.nIn)
372 "Event Counter Output set : %s : %i"
376 self.
log.debug(
"FSR store reconstructed!")
377 lst = self.
fsr.getHistoNames()
380 ob = self.
fsr.retrieveObject(l)
381 if hasattr(ob,
"configureDirectAccess"):
382 ob.configureDirectAccess()
383 if hasattr(ob,
"containedObjects"):
386 "\t%s (cont. objects : %i)" % (l, ob.numberOfObjects())
389 self.
log.debug(
"\t%s" % (l))
390 self.
log.info(
"FSR Store fully rebuilt.")
395 if path ==
"/FileRecords/TimeSpanFSR":
398 elif path ==
"/FileRecords/EventCountFSR":
403 "KeyedContainer" in ob.__class__.__name__
404 and "LumiFSR" in ob.__class__.__name__
409 self.
log.info(
"Skipping Merge of %s at %s" % (ob.__class__.__name__, path))
412 ob2 = self.
fsr.retrieveObject(path)
413 if ob.containedObjects().
size():
414 sz = ob.containedObjects().
size()
415 cob = ob2.containedObjects()[0]
419 cob = ob.containedObjects()[j]
420 self.
log.debug(
"Adding TimeSpanFSR")
421 if cob.earliest() < min:
423 if cob.latest() > max:
427 tsfsr = gbl.LHCb.TimeSpanFSR()
428 tsfsr.setEarliest(min)
430 self.
fsr[path].clear()
434 self.
log.debug(
"Event Count Input Addition")
435 self.
fsr[path].setInput(self.
fsr[path].input() + ob.input())
438 from ROOT
import string
441 keyedContainer = self.
fsr.retrieveObject(path)
443 assert keyedContainer.numberOfObjects() == 1
444 l = keyedContainer.containedObject(0)
447 nCont = keyedC.numberOfObjects()
448 for i
in range(nCont):
449 obj = keyedC.containedObject(i)
451 baseLumi.merge(nextLumi)
453 newLumi = gbl.LHCb.LumiFSR()
454 for r
in baseLumi.runs:
455 newLumi.addRunNumber(r)
456 for f
in baseLumi.files:
457 newLumi.addFileID(string(f))
458 for k
in baseLumi.keys:
459 increment, integral = baseLumi.info[k]
460 newLumi.addInfo(k, increment, integral)
462 self.
fsr[path].clear()
464 self.
fsr[path].
add(newLumi)
485 for r
in lumi.runNumbers():
488 for f
in lumi.fileIDs():
493 sa = s.split(
"info (key/incr/integral) : ")[-1]
494 sa = sa.split(
"/")[:-1]
496 k, i, t = rec.split()
500 self.
info[k] = (i, t)
504 assert otherLumi.__class__.__name__ ==
"LumiFSR"
506 for r
in otherLumi.runs:
513 for f
in otherLumi.files:
520 for k
in otherLumi.keys:
521 increment, integral = otherLumi.info[k]
523 myIncrement, myIntegral = self.
info[k]
524 self.
info[k] = (myIncrement + increment, myIntegral + integral)
526 self.
info[k] = (increment, integral)
531 s =
"LumiFSR Python class\n"
534 s +=
"\t\t%i\n" % (r)
537 s +=
"\t\t%s\n" % (f)
540 increment, integral = self.
info[k]
541 s +=
"\t\t%i\t%i\t%i\n" % (k, increment, integral)
550 cl =
"LHCb::PackedCaloHypo"
551 assert o.__class__.__name__ == cl
554 self.
cerr = (o.cerr00, o.cerr10, o.cerr11)
555 self.
cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
565 self.
pos = (o.posE, o.posX, o.posY)
569 s =
"PackedCaloHypo : \n"
570 s +=
"\tcentX : %s\n" % (str(self.
centX))
571 s +=
"\tcentY : %s\n" % (str(self.
centY))
572 s +=
"\tcerr : %s\n" % (str(self.
cerr))
573 s +=
"\tcov : %s\n" % (str(self.
cov))
575 s +=
"\tfirstDigit : %s\n" % (str(self.
firstDigit))
576 s +=
"\tfirstHypo : %s\n" % (str(self.
firstHypo))
577 s +=
"\thypothesis : %s\n" % (str(self.
hypothesis))
578 s +=
"\tkey : %s\n" % (str(self.
key))
579 s +=
"\tlastCluster : %s\n" % (str(self.
lastCluster))
580 s +=
"\tlastDigit : %s\n" % (str(self.
lastDigit))
581 s +=
"\tlastHypo : %s\n" % (str(self.
lastHypo))
582 s +=
"\tlh : %s\n" % (str(self.
lh))
583 s +=
"\tpos : %s\n" % (str(self.
pos))
584 s +=
"\tz : %s\n" % (str(self.
z))
585 s +=
"---------------------------------------\n"
601 return self.
event.is_set()
617 s =
"---------- SyncMini --------------\n"
618 s +=
" Status : %s\n" % (self.
event.is_set())
619 s +=
" t : %5.2f\n" % (self.
t)
621 s +=
"Last Event : %s\n" % (self.
lastEvent.is_set())
622 s +=
"----------------------------------\n"
631 self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None
639 for i
in range(-2, nWorkers):
659 self.
log.info(
"%s : All procs done @ %i s" % (step, i))
662 time.sleep(self.
step)
666 self.
log.info(
"All processes : %s ok." % (step))
669 self.
log.critical(
"Some process is hanging on : %s" % (step))
671 hangString =
"%s : Proc/Stat : %i/%s" % (step, k, self.
d[k].
check())
672 self.
log.critical(hangString)
697 if sMini.check()
or sMini.checkLast():
698 if sMini.checkLast()
and sMini.check():
701 alive = time.time() - begin
702 self.
log.info(
"Audit : Node %i alive for %5.2f" % (k, alive))
708 wait = time.time() - sMini.getTime()
709 cond = wait > self.
limit
715 self.
log.critical(
"Single event wait : %5.2f" % (wait))
721 self.
log.info(
"TC met for event loop")
725 time.sleep(self.
step)
727 self.
log.info(
"All processes Completed all Events ok")
731 self.
log.critical(
"Some proc is hanging during Event processing!")
733 self.
log.critical(
"Proc/Stat : %i / %s" % (k, self.
d[k].
check()))
739 currentStatus = [mini.check()
for mini
in self.
d.values()]
740 return all(currentStatus)
744 stat = [sMini.checkLast()
for sMini
in self.
d.values()]
756 lst = [
"/Event/Gen/Header",
"/Event/Rec/Header"]
759 n = evt[l].evtNumber()
768 n = evt[
"/Event/DAQ/RawEvent"].banks(16)[0].data()[4]