13 from multiprocessing
import Event
15 from GaudiPython
import FAILURE, SUCCESS, gbl
56 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
83 assert tup.__class__.__name__ ==
"tuple"
87 hstatus = self.
_gmpc.nWorkers + 1
90 if tup ==
"HISTOS_SENT":
91 self.
log.debug(
"received HISTOS_SENT message")
97 self.
_gmpc.sEvent.set()
98 self.
log.info(
"Writer received all histo bundles and set sync event")
103 Rebuild the Histogram Store from the histos received by Receive()
104 If we have a histo which is not in the store,
105 book and fill it according to self.bookingDict
106 If we have a histo with a matching histo in the store,
107 add the two histos, remembering that aida2root must be used on
108 the Stored histo for compatibility.
112 workerID, histDict = tup
116 for n
in histDict.keys():
118 obj = self.
hvt.retrieve(n)
124 self.
log.warning(
"FAILED TO ADD : %s" % (str(obj)))
133 "FAILED TO REGISTER : %s\tto%s"
134 % (o.__class__.__name__, n)
139 "No booking method for: %s\t%s\t%s"
140 % (n,
type(o), o.__class__.__name__)
144 hs = self.
hvt.getHistoNames()
145 self.
log.info(
"Histo Store Rebuilt : ")
146 self.
log.info(
" Contains %i objects." % (len(hs)))
147 self.
log.info(
" Errors in Rebuilding : %i" % (errors))
152 Register a DataObject to the Histo Store
154 self.
_gmpc.hvt.registerObject(n, o)
158 Register a ROOT 1D THisto to the Histo Store
160 obj = self.
hvt._ihs.book(
163 o.GetXaxis().GetNbins(),
164 o.GetXaxis().GetXmin(),
165 o.GetXaxis().GetXmax(),
171 Register a ROOT 2D THisto to the Histo Store
173 obj = self.
hvt._ihs.book(
176 o.GetXaxis().GetNbins(),
177 o.GetXaxis().GetXmin(),
178 o.GetXaxis().GetXmax(),
179 o.GetYaxis().GetNbins(),
180 o.GetYaxis().GetXmin(),
181 o.GetYaxis().GetXmax(),
187 Register a ROOT 3D THisto to the Histo Store
189 obj = self.
hvt._ihs.book(
192 o.GetXaxis().GetXbins(),
193 o.GetXaxis().GetXmin(),
194 o.GetXaxis().GetXmax(),
195 o.GetYaxis().GetXbins(),
196 o.GetYaxis().GetXmin(),
197 o.GetYaxis().GetXmax(),
198 o.GetZaxis().GetXbins(),
199 o.GetZaxis().GetXmin(),
200 o.GetZaxis().GetXmax(),
206 Register a ROOT TProfile to the Histo Store
208 obj = self.
hvt._ihs.bookProf(
211 o.GetXaxis().GetNbins(),
212 o.GetXaxis().GetXmin(),
213 o.GetXaxis().GetXmax(),
220 Register a ROOT TProfile2D to the Histo Store
222 obj = self.
hvt._ihs.bookProf(
225 o.GetXaxis().GetNbins(),
226 o.GetXaxis().GetXmin(),
227 o.GetXaxis().GetXmax(),
228 o.GetYaxis().GetNbins(),
229 o.GetYaxis().GetXmin(),
230 o.GetYaxis().GetXmax(),
272 self.
log.info(
"Sending FileRecords...")
273 lst = self.
fsr.getHistoNames()
277 self.
log.info(
"No FileRecords Data to send to Writer.")
278 self.
q.
put(
"END_FSR")
282 if "/FileRecords" in lst:
283 lst.remove(
"/FileRecords")
286 o = self.
fsr.retrieveObject(l)
287 if hasattr(o,
"configureDirectAccess"):
288 o.configureDirectAccess()
290 if self.
_gmpc.nodeID == 0:
291 self.
objectsOut.append((0, l, pickle.dumps(o)))
295 if l ==
"/FileRecords/EventCountFSR":
296 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
298 elif "KeyedContainer" in o.__class__.__name__:
300 nObjects = o.numberOfObjects()
303 "Keyed Container %s with %i objects" % (l, nObjects)
305 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
308 self.
log.info(
"Ignoring %s in FSR" % o.__class__.__name__)
310 self.
log.debug(
"Done with FSR store, just to send to Writer.")
313 self.
log.debug(
"%i FSR objects to Writer" % (len(self.
objectsOut)))
315 self.
log.debug(
"\t%s" % (ob[0]))
318 self.
log.info(
"Valid FSR Store, but no data to send to Writer")
319 self.
log.info(
"SendFSR complete")
320 self.
q.
put(
"END_FSR")
325 self.
log.info(
"Receiving FSR store data...")
326 nc = self.
_gmpc.nWorkers
328 objects = self.
q.
get()
329 if objects ==
"END_FSR":
340 self.
log.info(
"All FSR data received")
345 for sourceNode, path, serialob
in self.
objectsIn:
346 self.
log.debug(
"Working with %s" % (path))
347 ob = pickle.loads(serialob)
348 if hasattr(ob,
"update"):
350 if hasattr(ob,
"numberOfObjects"):
351 nCont = ob.numberOfObjects()
353 "\t %s has containedObjects : %i" % (
type(ob).__name__, nCont)
356 self.
log.debug(
"Registering Object to : %s" % (path))
357 self.
fsr.registerObject(path, ob)
359 self.
log.debug(
"Merging Object to : %s" % (path))
365 self.
log.info(
"FSR Store Rebuilt. Correcting EventCountFSR")
366 if bool(self.
fsr._idp):
367 ecount =
"/FileRecords/EventCountFSR"
369 self.
fsr[ecount].setOutput(self.
_gmpc.nIn)
371 "Event Counter Output set : %s : %i"
375 self.
log.debug(
"FSR store reconstructed!")
376 lst = self.
fsr.getHistoNames()
379 ob = self.
fsr.retrieveObject(l)
380 if hasattr(ob,
"configureDirectAccess"):
381 ob.configureDirectAccess()
382 if hasattr(ob,
"containedObjects"):
385 "\t%s (cont. objects : %i)" % (l, ob.numberOfObjects())
388 self.
log.debug(
"\t%s" % (l))
389 self.
log.info(
"FSR Store fully rebuilt.")
394 if path ==
"/FileRecords/TimeSpanFSR":
397 elif path ==
"/FileRecords/EventCountFSR":
402 "KeyedContainer" in ob.__class__.__name__
403 and "LumiFSR" in ob.__class__.__name__
408 self.
log.info(
"Skipping Merge of %s at %s" % (ob.__class__.__name__, path))
411 ob2 = self.
fsr.retrieveObject(path)
412 if ob.containedObjects().
size():
413 sz = ob.containedObjects().
size()
414 cob = ob2.containedObjects()[0]
418 cob = ob.containedObjects()[j]
419 self.
log.debug(
"Adding TimeSpanFSR")
420 if cob.earliest() < min:
422 if cob.latest() > max:
426 tsfsr = gbl.LHCb.TimeSpanFSR()
427 tsfsr.setEarliest(min)
429 self.
fsr[path].clear()
433 self.
log.debug(
"Event Count Input Addition")
434 self.
fsr[path].setInput(self.
fsr[path].input() + ob.input())
437 from ROOT
import string
440 keyedContainer = self.
fsr.retrieveObject(path)
442 assert keyedContainer.numberOfObjects() == 1
443 l = keyedContainer.containedObject(0)
446 nCont = keyedC.numberOfObjects()
447 for i
in range(nCont):
448 obj = keyedC.containedObject(i)
450 baseLumi.merge(nextLumi)
452 newLumi = gbl.LHCb.LumiFSR()
453 for r
in baseLumi.runs:
454 newLumi.addRunNumber(r)
455 for f
in baseLumi.files:
456 newLumi.addFileID(string(f))
457 for k
in baseLumi.keys:
458 increment, integral = baseLumi.info[k]
459 newLumi.addInfo(k, increment, integral)
461 self.
fsr[path].clear()
463 self.
fsr[path].
add(newLumi)
484 for r
in lumi.runNumbers():
487 for f
in lumi.fileIDs():
492 sa = s.split(
"info (key/incr/integral) : ")[-1]
493 sa = sa.split(
"/")[:-1]
495 k, i, t = rec.split()
499 self.
info[k] = (i, t)
503 assert otherLumi.__class__.__name__ ==
"LumiFSR"
505 for r
in otherLumi.runs:
512 for f
in otherLumi.files:
519 for k
in otherLumi.keys:
520 increment, integral = otherLumi.info[k]
522 myIncrement, myIntegral = self.
info[k]
523 self.
info[k] = (myIncrement + increment, myIntegral + integral)
525 self.
info[k] = (increment, integral)
530 s =
"LumiFSR Python class\n"
533 s +=
"\t\t%i\n" % (r)
536 s +=
"\t\t%s\n" % (f)
539 increment, integral = self.
info[k]
540 s +=
"\t\t%i\t%i\t%i\n" % (k, increment, integral)
549 cl =
"LHCb::PackedCaloHypo"
550 assert o.__class__.__name__ == cl
553 self.
cerr = (o.cerr00, o.cerr10, o.cerr11)
554 self.
cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
564 self.
pos = (o.posE, o.posX, o.posY)
568 s =
"PackedCaloHypo : \n"
569 s +=
"\tcentX : %s\n" % (str(self.
centX))
570 s +=
"\tcentY : %s\n" % (str(self.
centY))
571 s +=
"\tcerr : %s\n" % (str(self.
cerr))
572 s +=
"\tcov : %s\n" % (str(self.
cov))
574 s +=
"\tfirstDigit : %s\n" % (str(self.
firstDigit))
575 s +=
"\tfirstHypo : %s\n" % (str(self.
firstHypo))
576 s +=
"\thypothesis : %s\n" % (str(self.
hypothesis))
577 s +=
"\tkey : %s\n" % (str(self.
key))
578 s +=
"\tlastCluster : %s\n" % (str(self.
lastCluster))
579 s +=
"\tlastDigit : %s\n" % (str(self.
lastDigit))
580 s +=
"\tlastHypo : %s\n" % (str(self.
lastHypo))
581 s +=
"\tlh : %s\n" % (str(self.
lh))
582 s +=
"\tpos : %s\n" % (str(self.
pos))
583 s +=
"\tz : %s\n" % (str(self.
z))
584 s +=
"---------------------------------------\n"
600 return self.
event.is_set()
616 s =
"---------- SyncMini --------------\n"
617 s +=
" Status : %s\n" % (self.
event.is_set())
618 s +=
" t : %5.2f\n" % (self.
t)
620 s +=
"Last Event : %s\n" % (self.
lastEvent.is_set())
621 s +=
"----------------------------------\n"
630 self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None
638 for i
in range(-2, nWorkers):
658 self.
log.info(
"%s : All procs done @ %i s" % (step, i))
661 time.sleep(self.
step)
665 self.
log.info(
"All processes : %s ok." % (step))
668 self.
log.critical(
"Some process is hanging on : %s" % (step))
670 hangString =
"%s : Proc/Stat : %i/%s" % (step, k, self.
d[k].
check())
671 self.
log.critical(hangString)
696 if sMini.check()
or sMini.checkLast():
697 if sMini.checkLast()
and sMini.check():
700 alive = time.time() - begin
701 self.
log.info(
"Audit : Node %i alive for %5.2f" % (k, alive))
707 wait = time.time() - sMini.getTime()
708 cond = wait > self.
limit
714 self.
log.critical(
"Single event wait : %5.2f" % (wait))
720 self.
log.info(
"TC met for event loop")
724 time.sleep(self.
step)
726 self.
log.info(
"All processes Completed all Events ok")
730 self.
log.critical(
"Some proc is hanging during Event processing!")
732 self.
log.critical(
"Proc/Stat : %i / %s" % (k, self.
d[k].
check()))
738 currentStatus = [mini.check()
for mini
in self.
d.values()]
739 return all(currentStatus)
743 stat = [sMini.checkLast()
for sMini
in self.
d.values()]
755 lst = [
"/Event/Gen/Header",
"/Event/Rec/Header"]
758 n = evt[l].evtNumber()
767 n = evt[
"/Event/DAQ/RawEvent"].banks(16)[0].data()[4]