1 from GaudiPython
import gbl, SUCCESS, FAILURE
2 from multiprocessing
import Event
45 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
53 self.
hvt = self._gmpc.hvt
55 self.
qin = self._gmpc.hq
56 self.
log = self._gmpc.log
72 assert tup.__class__.__name__ ==
'tuple' 73 self.histos.append(tup)
76 hstatus = self._gmpc.nWorkers + 1
79 if tup ==
'HISTOS_SENT':
80 self.log.debug(
'received HISTOS_SENT message')
86 self._gmpc.sEvent.set()
87 self.log.info(
'Writer received all histo bundles and set sync event')
92 Rebuild the Histogram Store from the histos received by Receive() 93 If we have a histo which is not in the store, 94 book and fill it according to self.bookingDict 95 If we have a histo with a matching histo in the store, 96 add the two histos, remembering that aida2root must be used on 97 the Stored histo for compatibility. 101 workerID, histDict = tup
106 for n
in histDict.keys():
108 obj = self.hvt.retrieve(n)
114 self.log.warning(
'FAILED TO ADD : %s' % (str(obj)))
119 if o.__class__.__name__
in self.bookingDict.keys():
123 self.log.warning(
'FAILED TO REGISTER : %s\tto%s' %
124 (o.__class__.__name__, n))
127 self.log.warning(
'No booking method for: %s\t%s\t%s' %
128 (n,
type(o), o.__class__.__name__))
131 hs = self.hvt.getHistoNames()
132 self.log.info(
'Histo Store Rebuilt : ')
133 self.log.info(
' Contains %i objects.' % (len(hs)))
134 self.log.info(
' Errors in Rebuilding : %i' % (errors))
139 Register a DataObject to the Histo Store 141 self._gmpc.hvt.registerObject(n, o)
145 Register a ROOT 1D THisto to the Histo Store 147 obj = self.hvt._ihs.book(n, o.GetTitle(),
148 o.GetXaxis().GetNbins(),
149 o.GetXaxis().GetXmin(),
150 o.GetXaxis().GetXmax())
155 Register a ROOT 2D THisto to the Histo Store 157 obj = self.hvt._ihs.book(n, o.GetTitle(),
158 o.GetXaxis().GetNbins(),
159 o.GetXaxis().GetXmin(),
160 o.GetXaxis().GetXmax(),
161 o.GetYaxis().GetNbins(),
162 o.GetYaxis().GetXmin(),
163 o.GetYaxis().GetXmax())
168 Register a ROOT 3D THisto to the Histo Store 170 obj = self.hvt._ihs.book(n, o.GetTitle(),
171 o.GetXaxis().GetXbins(),
172 o.GetXaxis().GetXmin(),
173 o.GetXaxis().GetXmax(),
174 o.GetYaxis().GetXbins(),
175 o.GetYaxis().GetXmin(),
176 o.GetYaxis().GetXmax(),
177 o.GetZaxis().GetXbins(),
178 o.GetZaxis().GetXmin(),
179 o.GetZaxis().GetXmax())
184 Register a ROOT TProfile to the Histo Store 186 obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
187 o.GetXaxis().GetNbins(),
188 o.GetXaxis().GetXmin(),
189 o.GetXaxis().GetXmax(), o.GetOption())
194 Register a ROOT TProfile2D to the Histo Store 196 obj = self.hvt._ihs.bookProf(n, o.GetTitle(),
197 o.GetXaxis().GetNbins(),
198 o.GetXaxis().GetXmin(),
199 o.GetXaxis().GetXmax(),
200 o.GetYaxis().GetNbins(),
201 o.GetYaxis().GetXmin(),
202 o.GetYaxis().GetXmax())
213 self.
q = self._gmpc.fq
243 self.log.info(
'Sending FileRecords...')
244 lst = self.fsr.getHistoNames()
248 self.log.info(
'No FileRecords Data to send to Writer.')
249 self.q.put(
'END_FSR')
253 if '/FileRecords' in lst:
254 lst.remove(
'/FileRecords')
257 o = self.fsr.retrieveObject(l)
258 if hasattr(o,
"configureDirectAccess"):
259 o.configureDirectAccess()
261 if self._gmpc.nodeID == 0:
262 self.objectsOut.append((0, l, pickle.dumps(o)))
266 if l ==
'/FileRecords/EventCountFSR':
267 tup = (self._gmpc.nodeID, l, pickle.dumps(o))
268 self.objectsOut.append(tup)
271 assert "KeyedContainer" in o.__class__.__name__
272 nObjects = o.numberOfObjects()
274 self.log.debug(
"Keyed Container %s with %i objects" %
276 tup = (self._gmpc.nodeID, l, pickle.dumps(o))
277 self.objectsOut.append(tup)
278 self.log.debug(
'Done with FSR store, just to send to Writer.')
281 self.log.debug(
'%i FSR objects to Writer' % (len(self.
objectsOut)))
283 self.log.debug(
'\t%s' % (ob[0]))
286 self.log.info(
'Valid FSR Store, but no data to send to Writer')
287 self.log.info(
'SendFSR complete')
288 self.q.put(
'END_FSR')
293 self.log.info(
'Receiving FSR store data...')
294 nc = self._gmpc.nWorkers
296 objects = self.q.get()
297 if objects ==
'END_FSR':
304 self.objectsIn.append(o)
307 self.objectsIn.sort(cmp=self.
localCmp)
308 self.log.info(
'All FSR data received')
313 for sourceNode, path, serialob
in self.
objectsIn:
314 self.log.debug(
'Working with %s' % (path))
315 ob = pickle.loads(serialob)
316 if hasattr(ob,
'update'):
318 if hasattr(ob,
'numberOfObjects'):
319 nCont = ob.numberOfObjects()
320 self.log.debug(
'\t %s has containedObjects : %i' %
321 (
type(ob).__name__, nCont))
323 self.log.debug(
'Registering Object to : %s' % (path))
324 self.fsr.registerObject(path, ob)
326 self.log.debug(
'Merging Object to : %s' % (path))
332 self.log.info(
'FSR Store Rebuilt. Correcting EventCountFSR')
333 if bool(self.fsr._idp):
334 ecount =
'/FileRecords/EventCountFSR' 336 self.
fsr[ecount].setOutput(self._gmpc.nIn)
337 self.log.info(
'Event Counter Output set : %s : %i' %
340 self.log.debug(
'FSR store reconstructed!')
341 lst = self.fsr.getHistoNames()
344 ob = self.fsr.retrieveObject(l)
345 if hasattr(ob,
'configureDirectAccess'):
346 ob.configureDirectAccess()
347 if hasattr(ob,
'containedObjects'):
349 self.log.debug(
'\t%s (cont. objects : %i)' %
350 (l, ob.numberOfObjects()))
352 self.log.debug(
'\t%s' % (l))
353 self.log.info(
'FSR Store fully rebuilt.')
358 if path ==
'/FileRecords/TimeSpanFSR':
361 elif path ==
'/FileRecords/EventCountFSR':
365 elif "KeyedContainer" in ob.__class__.__name__:
368 if "LumiFSR" in ob.__class__.__name__:
371 self.log.info(
"Skipping Merge of Keyed Container %s for %s" %
372 (ob.__class__.__name__, path))
375 ob2 = self.fsr.retrieveObject(path)
376 if ob.containedObjects().
size():
377 sz = ob.containedObjects().
size()
378 cob = ob2.containedObjects()[0]
382 cob = ob.containedObjects()[j]
383 self.log.debug(
'Adding TimeSpanFSR')
384 if cob.earliest() < min:
386 if cob.latest() > max:
390 tsfsr = gbl.LHCb.TimeSpanFSR()
391 tsfsr.setEarliest(min)
393 self.
fsr[path].clear()
397 self.log.debug(
'Event Count Input Addition')
398 self.
fsr[path].setInput(self.
fsr[path].input() + ob.input())
401 from ROOT
import string
403 keyedContainer = self.fsr.retrieveObject(path)
405 assert keyedContainer.numberOfObjects() == 1
406 l = keyedContainer.containedObject(0)
409 nCont = keyedC.numberOfObjects()
410 for i
in xrange(nCont):
411 obj = keyedC.containedObject(i)
413 baseLumi.merge(nextLumi)
415 newLumi = gbl.LHCb.LumiFSR()
416 for r
in baseLumi.runs:
417 newLumi.addRunNumber(r)
418 for f
in baseLumi.files:
419 newLumi.addFileID(string(f))
420 for k
in baseLumi.keys:
421 increment, integral = baseLumi.info[k]
422 newLumi.addInfo(k, increment, integral)
424 self.
fsr[path].clear()
426 self.
fsr[path].
add(newLumi)
447 for r
in lumi.runNumbers():
450 for f
in lumi.fileIDs():
455 sa = s.split(
"info (key/incr/integral) : ")[-1]
456 sa = sa.split(
'/')[:-1]
458 k, i, t = rec.split()
462 self.
info[k] = (i, t)
463 self.
keys = self.info.keys()
466 assert otherLumi.__class__.__name__ ==
"LumiFSR" 468 for r
in otherLumi.runs:
475 for f
in otherLumi.files:
482 for k
in otherLumi.keys:
483 increment, integral = otherLumi.info[k]
485 myIncrement, myIntegral = self.
info[k]
486 self.
info[k] = (myIncrement + increment, myIntegral + integral)
488 self.
info[k] = (increment, integral)
490 self.
keys = self.info.keys()
493 s =
"LumiFSR Python class\n" 496 s +=
"\t\t%i\n" % (r)
499 s +=
"\t\t%s\n" % (f)
502 increment, integral = self.
info[k]
503 s +=
"\t\t%i\t%i\t%i\n" % (k, increment, integral)
512 cl =
'LHCb::PackedCaloHypo' 513 assert o.__class__.__name__ == cl
516 self.
cerr = (o.cerr00, o.cerr10, o.cerr11)
517 self.
cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
527 self.
pos = (o.posE, o.posX, o.posY)
531 s =
"PackedCaloHypo : \n" 532 s +=
"\tcentX : %s\n" % (str(self.
centX))
533 s +=
"\tcentY : %s\n" % (str(self.
centY))
534 s +=
"\tcerr : %s\n" % (str(self.
cerr))
535 s +=
"\tcov : %s\n" % (str(self.
cov))
537 s +=
"\tfirstDigit : %s\n" % (str(self.
firstDigit))
538 s +=
"\tfirstHypo : %s\n" % (str(self.
firstHypo))
539 s +=
"\thypothesis : %s\n" % (str(self.
hypothesis))
540 s +=
"\tkey : %s\n" % (str(self.
key))
541 s +=
"\tlastCluster : %s\n" % (str(self.
lastCluster))
542 s +=
"\tlastDigit : %s\n" % (str(self.
lastDigit))
543 s +=
"\tlastHypo : %s\n" % (str(self.
lastHypo))
544 s +=
"\tlh : %s\n" % (str(self.
lh))
545 s +=
"\tpos : %s\n" % (str(self.
pos))
546 s +=
"\tz : %s\n" % (str(self.
z))
547 s +=
"---------------------------------------\n" 563 return self.event.is_set()
566 return self.lastEvent.is_set()
579 s =
"---------- SyncMini --------------\n" 580 s +=
" Status : %s\n" % (self.event.is_set())
581 s +=
" t : %5.2f\n" % (self.
t)
583 s +=
"Last Event : %s\n" % (self.lastEvent.is_set())
584 s +=
"----------------------------------\n" 605 for i
in xrange(-2, nWorkers):
606 self.
d[i] =
SyncMini(Event(), lastEvent=Event())
623 for i
in xrange(0, self.
limit, self.
step):
625 self.log.info(
'%s : All procs done @ %i s' % (step, i))
628 time.sleep(self.
step)
632 self.log.info(
"All processes : %s ok." % (step))
635 self.log.critical(
'Some process is hanging on : %s' % (step))
637 hangString =
"%s : Proc/Stat : %i/%s" % (step, k,
639 self.log.critical(hangString)
664 if sMini.check()
or sMini.checkLast():
665 if sMini.checkLast()
and sMini.check():
668 alive = time.time() - begin
670 "Audit : Node %i alive for %5.2f" % (k, alive))
676 wait = time.time() - sMini.getTime()
677 cond = wait > self.
limit 683 self.log.critical(
'Single event wait : %5.2f' % (wait))
689 self.log.info(
'TC met for event loop')
693 time.sleep(self.
step)
695 self.log.info(
"All processes Completed all Events ok")
699 self.log.critical(
'Some proc is hanging during Event processing!')
701 self.log.critical(
"Proc/Stat : %i / %s" % (k, self.
d[k].check()))
707 currentStatus = [mini.check()
for mini
in self.d.values()]
708 return all(currentStatus)
712 stat = [sMini.checkLast()
for sMini
in self.d.values()]
724 lst = [
'/Event/Gen/Header',
'/Event/Rec/Header']
727 n = evt[l].evtNumber()
736 n = evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
constexpr auto size(const C &c) noexcept(noexcept(c.size())) -> decltype(c.size())