from GaudiPython import gbl, SUCCESS, FAILURE
from multiprocessing import Event
import pickle
import time
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

class HistoAgent():
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.hvt = self._gmpc.hvt
        self.histos = []
        self.qin = self._gmpc.hq
        self.log = self._gmpc.log

    def register(self, tup):
        # collect a (workerID, histDict) bundle sent by a worker
        assert tup.__class__.__name__ == 'tuple'
        self.histos.append(tup)

    def Receive(self):
        # one sender per worker, plus one for the Reader
        hstatus = self._gmpc.nWorkers + 1
        while True:
            tup = self.qin.get()
            if tup == 'HISTOS_SENT':
                self.log.debug('received HISTOS_SENT message')
                hstatus -= 1
                if not hstatus:
                    break
            else:
                self.register(tup)
        self._gmpc.sEvent.set()
        self.log.info('Writer received all histo bundles and set sync event')
        return SUCCESS

    def RebuildHistoStore(self):
        '''
        Rebuild the Histogram Store from the histos received by Receive().
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict.
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the stored histo for compatibility.
        '''
        errors = 0
        for tup in self.histos:
            workerID, histDict = tup
            for n in histDict.keys():
                o = histDict[n]
                obj = self.hvt.retrieve(n)
                if obj:
                    # existing histo : merge in this worker's copy
                    try:
                        aida2root(obj).Add(o)
                    except Exception:
                        self.log.warning('FAILED TO ADD : %s' % (str(obj)))
                        errors += 1
                else:
                    # new histo : book it via the registered method
                    if o.__class__.__name__ in self.bookingDict.keys():
                        try:
                            self.bookingDict[o.__class__.__name__](n, o)
                        except Exception:
                            self.log.warning('FAILED TO REGISTER : %s\tto %s'
                                             % (o.__class__.__name__, n))
                            errors += 1
                    else:
                        self.log.warning('No booking method for: %s\t%s\t%s'
                                         % (n, type(o), o.__class__.__name__))
                        errors += 1
        hs = self.hvt.getHistoNames()
        self.log.info('Histo Store Rebuilt :')
        self.log.info('  Contains %i objects.' % (len(hs)))
        self.log.info('  Errors in Rebuilding : %i' % (errors))
        return SUCCESS
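
    # Illustration of the merge semantics used above, reduced to plain
    # ROOT with no Gaudi stores involved (names are illustrative only):
    #
    #   import ROOT
    #   master = ROOT.TH1D('h', 'demo', 10, 0., 1.)    # already in the store
    #   update = ROOT.TH1D('h_w1', 'demo', 10, 0., 1.)  # from a worker
    #   update.Fill(0.5)
    #   master.Add(update)  # same binning required, as the booking guarantees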

    def bookDataObject(self, n, o):
        '''Register a DataObject to the Histo Store'''
        self._gmpc.hvt.registerObject(n, o)

    def bookTH1D(self, n, o):
        '''Register a ROOT 1D THisto to the Histo Store'''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH2D(self, n, o):
        '''Register a ROOT 2D THisto to the Histo Store'''
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTH3D(self, n, o):
        '''Register a ROOT 3D THisto to the Histo Store'''
        # NB: the bin count is GetNbins(); GetXbins() returns the
        # bin-edge array and would be the wrong argument here
        obj = self.hvt._ihs.book(n, o.GetTitle(),
                                 o.GetXaxis().GetNbins(),
                                 o.GetXaxis().GetXmin(),
                                 o.GetXaxis().GetXmax(),
                                 o.GetYaxis().GetNbins(),
                                 o.GetYaxis().GetXmin(),
                                 o.GetYaxis().GetXmax(),
                                 o.GetZaxis().GetNbins(),
                                 o.GetZaxis().GetXmin(),
                                 o.GetZaxis().GetXmax())
        aida2root(obj).Add(o)

    def bookTProfile(self, n, o):
        '''Register a ROOT TProfile to the Histo Store'''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(),
                                     o.GetOption())
        aida2root(obj).Add(o)

    def bookTProfile2D(self, n, o):
        '''Register a ROOT TProfile2D to the Histo Store'''
        obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
                                     o.GetXaxis().GetNbins(),
                                     o.GetXaxis().GetXmin(),
                                     o.GetXaxis().GetXmax(),
                                     o.GetYaxis().GetNbins(),
                                     o.GetYaxis().GetXmin(),
                                     o.GetYaxis().GetXmax())
        aida2root(obj).Add(o)
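
    # Sketch of how the booking methods above are selected in
    # RebuildHistoStore : dispatch on the ROOT class name of the received
    # object, assuming self.bookingDict maps class names to the bound
    # methods above (n and o are hypothetical path/object arguments):
    #
    #   booker = self.bookingDict.get(o.__class__.__name__)
    #   if booker:
    #       booker(n, o)              # e.g. bookTH1D('/stat/h1', o)
    #   else:
    #       self.log.warning('No booking method for %s'
    #                        % (o.__class__.__name__))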

class FileRecordsAgent():
    def __init__(self, gmpComponent):
        self._gmpc = gmpComponent
        self.fsr = self._gmpc.fsr
        self.q = self._gmpc.fq
        self.log = self._gmpc.log
        self.objectsIn = []   # objects received from the workers
        self.objectsOut = []  # objects to be sent to the Writer

    def SendFileRecords(self):
        # send the FileRecords data as part of finalisation
        self.log.info('Sending FileRecords...')
        lst = self.fsr.getHistoNames()

        # check validity : there may be no FSR data at all
        if not lst:
            self.log.info('No FileRecords Data to send to Writer.')
            self.q.put('END_FSR')
            return SUCCESS

        # no need to send the root node
        if '/FileRecords' in lst:
            lst.remove('/FileRecords')
        for l in lst:
            o = self.fsr.retrieveObject(l)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()
            if self._gmpc.nodeID == 0:
                # the lead worker sends everything, as a completeness guarantee
                self.objectsOut.append((0, l, pickle.dumps(o)))
            else:
                # other workers send only the Event Counter and any
                # non-empty Keyed Containers
                if l == '/FileRecords/EventCountFSR':
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                    self.objectsOut.append(tup)
                else:
                    assert "KeyedContainer" in o.__class__.__name__
                    nObjects = o.numberOfObjects()
                    if nObjects:
                        self.log.debug("Keyed Container %s with %i objects"
                                       % (l, nObjects))
                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))
                        self.objectsOut.append(tup)
        self.log.debug('Done with FSR store, just to send to Writer.')
        if self.objectsOut:
            self.log.debug('%i FSR objects to Writer' % (len(self.objectsOut)))
            for ob in self.objectsOut:
                self.log.debug('\t%s' % (ob[0]))
            self.q.put(self.objectsOut)
        else:
            self.log.info('Valid FSR Store, but no data to send to Writer')
        self.log.info('SendFSR complete')
        self.q.put('END_FSR')
        return SUCCESS
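
    # Wire format sketch : each worker puts one list of
    # (nodeID, path, pickled DataObject) tuples on the shared queue,
    # then the 'END_FSR' sentinel. Standalone illustration, with a dict
    # standing in for a real FSR object:
    #
    #   import pickle
    #   from multiprocessing import Queue
    #   q = Queue()
    #   q.put([(1, '/FileRecords/EventCountFSR',
    #           pickle.dumps({'input': 42}))])
    #   q.put('END_FSR')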

    def localCmp(self, tupA, tupB):
        # order (nodeID, path, pickledObject) tuples by source nodeID
        return cmp(tupA[0], tupB[0])

    def Receive(self):
        # collect the workers' FSR store contents from the shared queue
        self.log.info('Receiving FSR store data...')
        nc = self._gmpc.nWorkers
        while nc > 0:
            objects = self.q.get()
            if objects == 'END_FSR':
                nc -= 1
                continue
            for o in objects:
                self.objectsIn.append(o)
        # sort the received tuples by the worker they came from
        self.objectsIn.sort(cmp=self.localCmp)
        self.log.info('All FSR data received')
        return SUCCESS

    def Rebuild(self):
        # objectsIn is a sorted list of (nodeID, path, pickledObject) tuples
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug('Working with %s' % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, 'update'):
                ob.update()
            if hasattr(ob, 'numberOfObjects'):
                nCont = ob.numberOfObjects()
                self.log.debug('\t %s has containedObjects : %i'
                               % (type(ob).__name__, nCont))
            if sourceNode == 0:
                # the lead worker's objects are registered directly
                self.log.debug('Registering Object to : %s' % (path))
                self.fsr.registerObject(path, ob)
            else:
                # everything else is merged into what is already there
                self.log.debug('Merging Object to : %s' % (path))
                self.MergeFSRobject(sourceNode, path, ob)
        # RecordStream is split into Worker and Writer parts, so the output
        # count is wrong; fix it here, as every event received by the
        # Writer is written
        self.log.info('FSR Store Rebuilt. Correcting EventCountFSR')
        if bool(self.fsr._idp):  # there may be no FSR stream (e.g. Gauss)
            ecount = '/FileRecords/EventCountFSR'
            if self.fsr[ecount]:
                self.fsr[ecount].setOutput(self._gmpc.nIn)
                self.log.info('Event Counter Output set : %s : %i'
                              % (ecount, self.fsr[ecount].output()))
        self.log.debug('FSR store reconstructed!')
        lst = self.fsr.getHistoNames()
        if lst:
            for l in lst:
                ob = self.fsr.retrieveObject(l)
                if hasattr(ob, 'configureDirectAccess'):
                    ob.configureDirectAccess()
                if hasattr(ob, 'containedObjects'):
                    self.log.debug('\t%s (cont. objects : %i)'
                                   % (l, ob.numberOfObjects()))
                else:
                    self.log.debug('\t%s' % (l))
        self.log.info('FSR Store fully rebuilt.')
        return SUCCESS

    def MergeFSRobject(self, sourceNode, path, ob):
        # merge a non-empty Keyed Container from a worker with nodeID > 0
        if path == '/FileRecords/TimeSpanFSR':
            # TimeSpanFSR is a straightforward earliest/latest union
            self.ProcessTimeSpanFSR(path, ob)
        elif path == '/FileRecords/EventCountFSR':
            # the Event Counter is a simple addition
            self.ProcessEventCountFSR(path, ob)
        elif "KeyedContainer" in ob.__class__.__name__:
            # Keyed Containers of LumiFSRs are extracted and re-registered
            if "LumiFSR" in ob.__class__.__name__:
                self.MergeLumiFSR(path, ob)
            else:
                self.log.info("Skipping Merge of Keyed Container %s for %s"
                              % (ob.__class__.__name__, path))

    def ProcessTimeSpanFSR(self, path, ob):
        ob2 = self.fsr.retrieveObject(path)
        if ob.containedObjects().size():
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]
            min = cob.earliest()
            max = cob.latest()
            for j in xrange(sz):
                cob = ob.containedObjects()[j]
                self.log.debug('Adding TimeSpanFSR')
                if cob.earliest() < min:
                    min = cob.earliest()
                if cob.latest() > max:
                    max = cob.latest()
            # the merged TimeSpanFSR has to be rebuilt (without a key)
            # and re-added
            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest(min)
            tsfsr.setLatest(max)
            self.fsr[path].clear()
            self.fsr[path].add(tsfsr)
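
    # The span-union rule above in isolation : the merged TimeSpanFSR
    # keeps the earliest start and the latest stop over all workers.
    # Plain-integer sketch:
    #
    #   spans = [(100, 200), (150, 250), (90, 120)]  # (earliest, latest)
    #   merged = (min(t0 for t0, t1 in spans), max(t1 for t0, t1 in spans))
    #   assert merged == (90, 250)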

    def ProcessEventCountFSR(self, path, ob):
        self.log.debug('Event Count Input Addition')
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())

    def MergeLumiFSR(self, path, keyedC):
        from ROOT import string
        # fetch the LumiFSR already in the store
        keyedContainer = self.fsr.retrieveObject(path)
        # the LumiFSR KeyedContainers only have one object
        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)
        baseLumi = LumiFSR(l)
        # merge in every LumiFSR of the incoming Keyed Container
        nCont = keyedC.numberOfObjects()
        for i in xrange(nCont):
            obj = keyedC.containedObject(i)
            nextLumi = LumiFSR(obj)
            baseLumi.merge(nextLumi)
        # rebuild a fresh LumiFSR from the merged result and re-register it
        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)
        # clear the existing Keyed Container and add the merged LumiFSR
        self.fsr[path].clear()
        self.fsr[path].add(newLumi)
        return SUCCESS


class LumiFSR():
    def __init__(self, lumi):
        # pure-Python mirror of an LHCb::LumiFSR
        self.runs = []
        self.files = []
        self.info = {}
        self.keys = []
        # run numbers and file IDs are directly accessible
        for r in lumi.runNumbers():
            self.runs.append(r)
        for f in lumi.fileIDs():
            self.files.append(f)
        # the (key/increment/integral) records are only reachable
        # through the string representation
        s = str(lumi)
        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split('/')[:-1]
        for rec in sa:
            k, i, t = rec.split()
            self.info[int(k)] = (int(i), int(t))
        self.keys = self.info.keys()
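
    # Behaviour of the string scrape above on a representative dump
    # (the sample text is illustrative, shaped like a LumiFSR printout):
    #
    #   s = "runs : 69857 info (key/incr/integral) : 0 8 0 / 1 8 259 /"
    #   sa = s.split("info (key/incr/integral) : ")[-1].split('/')[:-1]
    #   info = {}
    #   for rec in sa:
    #       k, i, t = rec.split()
    #       info[int(k)] = (int(i), int(t))
    #   # info == {0: (8, 0), 1: (8, 259)}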

    def merge(self, otherLumi):
        assert otherLumi.__class__.__name__ == "LumiFSR"
        # add any new runs, keeping the list sorted
        for r in otherLumi.runs:
            if r not in self.runs:
                self.runs.append(r)
        self.runs.sort()
        # add any new file IDs, keeping the list sorted
        for f in otherLumi.files:
            if f not in self.files:
                self.files.append(f)
        self.files.sort()
        # accumulate the (increment, integral) records key by key
        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]
            if k in self.keys:
                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment,
                                myIntegral + integral)
            else:
                self.info[k] = (increment, integral)
        # don't forget to refresh the key list
        self.keys = self.info.keys()
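
    # The accumulation rule above, standalone : increments and integrals
    # are summed key by key (plain dicts, no Gaudi objects involved):
    #
    #   a = {0: (8, 0), 1: (8, 259)}
    #   b = {0: (8, 0), 2: (4, 76)}
    #   for k, (incr, integ) in b.items():
    #       old = a.get(k, (0, 0))
    #       a[k] = (old[0] + incr, old[1] + integ)
    #   # a == {0: (16, 0), 1: (8, 259), 2: (4, 76)}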

    def __repr__(self):
        s = "LumiFSR Python class\n"
        s += "\tRuns :\n"
        for r in self.runs:
            s += "\t\t%i\n" % (r)
        s += "\tFiles :\n"
        for f in self.files:
            s += "\t\t%s\n" % (f)
        s += "\tInfo :\n"
        for k in self.keys:
            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n" % (k, increment, integral)
        return s


class PackedCaloHypo():
    def __init__(self, o):
        assert o.__class__.__name__ == 'LHCb::PackedCaloHypo'
        self.centX, self.centY = o.centX, o.centY
        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
        self.firstCluster, self.lastCluster = o.firstCluster, o.lastCluster
        self.firstDigit, self.lastDigit = o.firstDigit, o.lastDigit
        self.firstHypo, self.lastHypo = o.firstHypo, o.lastHypo
        self.hypothesis, self.key, self.lh = o.hypothesis, o.key, o.lh
        self.pos = (o.posE, o.posX, o.posY)
        self.z = o.z

    def __repr__(self):
        s = "PackedCaloHypo :\n"
        s += "\tcentX : %s\n" % (str(self.centX))
        s += "\tcentY : %s\n" % (str(self.centY))
        s += "\tcerr : %s\n" % (str(self.cerr))
        s += "\tcov : %s\n" % (str(self.cov))
        s += "\tfirstCluster : %s\n" % (str(self.firstCluster))
        s += "\tfirstDigit : %s\n" % (str(self.firstDigit))
        s += "\tfirstHypo : %s\n" % (str(self.firstHypo))
        s += "\thypothesis : %s\n" % (str(self.hypothesis))
        s += "\tkey : %s\n" % (str(self.key))
        s += "\tlastCluster : %s\n" % (str(self.lastCluster))
        s += "\tlastDigit : %s\n" % (str(self.lastDigit))
        s += "\tlastHypo : %s\n" % (str(self.lastHypo))
        s += "\tlh : %s\n" % (str(self.lh))
        s += "\tpos : %s\n" % (str(self.pos))
        s += "\tz : %s\n" % (str(self.z))
        s += "---------------------------------------\n"
        return s


class SyncMini(object):
    def __init__(self, event, lastEvent=None):
        self.event = event
        self.t = 0.0
        self.lastEvent = lastEvent

    def check(self):
        return self.event.is_set()

    def checkLast(self):
        return self.lastEvent.is_set()

    def reset(self):
        self.event.clear()
        self.t = time.time()

    def getTime(self):
        return self.t

    def set(self):
        self.event.set()

    def __repr__(self):
        s = "---------- SyncMini --------------\n"
        s += "    Status : %s\n" % (self.event.is_set())
        s += "         t : %5.2f\n" % (self.t)
        if self.lastEvent:
            s += "Last Event : %s\n" % (self.lastEvent.is_set())
        s += "----------------------------------\n"
        return s


class Syncer(object):
    def __init__(self, nWorkers, log, manyEvents=False,
                 limit=None, step=None, firstEvent=None):
        # one SyncMini each for the Reader, the Writer and every Worker
        self.limit = limit
        self.step = step
        self.manyEvents = manyEvents
        if self.manyEvents:
            self.limitFirst = firstEvent
        self.d = {}
        for i in xrange(-2, nWorkers):
            self.d[i] = SyncMini(Event(), lastEvent=Event())
        self.keys = self.d.keys()
        self.nWorkers = nWorkers
        self.log = log

    def syncAll(self, step="Not specified"):
        # the rolling version handles event-loop synchronisation
        if self.manyEvents:
            return self.syncAllRolling()

        # regular version : poll until all procs report, or time runs out
        for i in xrange(0, self.limit, self.step):
            if self.checkAll():
                self.log.info('%s : All procs done @ %i s' % (step, i))
                break
            else:
                time.sleep(self.step)

        # the time limit is up : check the status one final time
        if self.checkAll():
            self.log.info("All processes : %s ok." % (step))
            return SUCCESS
        else:
            self.log.critical('Some process is hanging on : %s' % (step))
            for k in self.keys:
                hangString = "%s : Proc/Stat : %i/%s" % (step, k,
                                                         self.d[k].check())
                self.log.critical(hangString)
            return FAILURE

    def syncAllRolling(self):
        # each process syncs after every event, so keep clearing the sync
        # Event and re-checking; too long a wait between events is a hang
        begin = time.time()
        for k in self.keys:
            self.d[k].reset()
        active = list(self.keys)
        while True:
            # iterate over a copy, as finished nodes are removed in-loop
            for k in list(active):
                sMini = self.d[k]
                if sMini.check() or sMini.checkLast():
                    if sMini.checkLast() and sMini.check():
                        # last event flagged : this node's loop is done
                        active.remove(k)
                        alive = time.time() - begin
                        self.log.info("Audit : Node %i alive for %5.2f"
                                      % (k, alive))
                    else:
                        sMini.reset()
                else:
                    # no sync event yet : hanging, or just a long event?
                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit
                    if cond:
                        # it is hanging!
                        self.log.critical('Single event wait : %5.2f'
                                          % (wait))
                        self.processHang()
                        return FAILURE
            # termination criterion : all nodes flagged their last event
            if self.checkLastEvents():
                self.log.info('TC met for event loop')
                break
            else:
                time.sleep(self.step)

        self.log.info("All processes Completed all Events ok")
        return SUCCESS

    def processHang(self):
        self.log.critical('Some proc is hanging during Event processing!')
        for k in self.keys:
            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))

    def checkAll(self):
        # True only if every node's sync event is currently set
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

    def checkLastEvents(self):
        # True only if every node has flagged its last event
        stat = [sMini.checkLast() for sMini in self.d.values()]
        return all(stat)
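

# The handshake at the heart of SyncMini/Syncer, reduced to a single
# multiprocessing.Event (illustrative names; run under a __main__ guard):
#
#   from multiprocessing import Event, Process
#
#   def work(ev):
#       ev.set()                     # "finished this step"
#
#   ev = Event()
#   p = Process(target=work, args=(ev,))
#   p.start()
#   ev.wait(timeout=10)              # supervisor blocks, with a hang limit
#   assert ev.is_set()
#   p.join()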


def getEventNumber(evt):
    # first attempt : the unpacked event headers
    lst = ['/Event/Gen/Header',
           '/Event/Rec/Header']
    for l in lst:
        try:
            return evt[l].evtNumber()
        except Exception:
            continue
    # fallback : word 4 of the first bank of RawBank type 16 (ODIN)
    return evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
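
# Hedged usage note : evt is assumed to be a GaudiPython event-store
# proxy (e.g. a component's evt attribute); the helper returns the event
# number, falling back from the unpacked headers to the raw ODIN bank:
#
#   n = getEventNumber(evt)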