1 from GaudiPython
import gbl, SUCCESS, FAILURE
2 from multiprocessing
import Event
45 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
72 assert tup.__class__.__name__ ==
'tuple' 76 hstatus = self.
_gmpc.nWorkers + 1
79 if tup ==
'HISTOS_SENT':
80 self.
log.debug(
'received HISTOS_SENT message')
86 self.
_gmpc.sEvent.set()
87 self.
log.info(
'Writer received all histo bundles and set sync event')
92 Rebuild the Histogram Store from the histos received by Receive() 93 If we have a histo which is not in the store, 94 book and fill it according to self.bookingDict 95 If we have a histo with a matching histo in the store, 96 add the two histos, remembering that aida2root must be used on 97 the Stored histo for compatibility. 101 workerID, histDict = tup
106 for n
in histDict.keys():
108 obj = self.
hvt.retrieve(n)
114 self.
log.warning(
'FAILED TO ADD : %s' % (str(obj)))
119 if o.__class__.__name__
in self.
bookingDict.keys():
123 self.
log.warning(
'FAILED TO REGISTER : %s\tto%s' %
124 (o.__class__.__name__, n))
127 self.
log.warning(
'No booking method for: %s\t%s\t%s' %
128 (n,
type(o), o.__class__.__name__))
131 hs = self.
hvt.getHistoNames()
132 self.
log.info(
'Histo Store Rebuilt : ')
133 self.
log.info(
' Contains %i objects.' % (len(hs)))
134 self.
log.info(
' Errors in Rebuilding : %i' % (errors))
139 Register a DataObject to the Histo Store 141 self.
_gmpc.hvt.registerObject(n, o)
145 Register a ROOT 1D THisto to the Histo Store 147 obj = self.
hvt._ihs.book(n, o.GetTitle(),
148 o.GetXaxis().GetNbins(),
149 o.GetXaxis().GetXmin(),
150 o.GetXaxis().GetXmax())
155 Register a ROOT 2D THisto to the Histo Store 157 obj = self.
hvt._ihs.book(n, o.GetTitle(),
158 o.GetXaxis().GetNbins(),
159 o.GetXaxis().GetXmin(),
160 o.GetXaxis().GetXmax(),
161 o.GetYaxis().GetNbins(),
162 o.GetYaxis().GetXmin(),
163 o.GetYaxis().GetXmax())
168 Register a ROOT 3D THisto to the Histo Store 170 obj = self.
hvt._ihs.book(n, o.GetTitle(),
171 o.GetXaxis().GetXbins(),
172 o.GetXaxis().GetXmin(),
173 o.GetXaxis().GetXmax(),
174 o.GetYaxis().GetXbins(),
175 o.GetYaxis().GetXmin(),
176 o.GetYaxis().GetXmax(),
177 o.GetZaxis().GetXbins(),
178 o.GetZaxis().GetXmin(),
179 o.GetZaxis().GetXmax())
184 Register a ROOT TProfile to the Histo Store 186 obj = self.
hvt._ihs.bookProf(n, o.GetTitle(),
187 o.GetXaxis().GetNbins(),
188 o.GetXaxis().GetXmin(),
189 o.GetXaxis().GetXmax(), o.GetOption())
194 Register a ROOT TProfile2D to the Histo Store 196 obj = self.
hvt._ihs.bookProf(n, o.GetTitle(),
197 o.GetXaxis().GetNbins(),
198 o.GetXaxis().GetXmin(),
199 o.GetXaxis().GetXmax(),
200 o.GetYaxis().GetNbins(),
201 o.GetYaxis().GetXmin(),
202 o.GetYaxis().GetXmax())
243 self.
log.info(
'Sending FileRecords...')
244 lst = self.
fsr.getHistoNames()
248 self.
log.info(
'No FileRecords Data to send to Writer.')
249 self.
q.
put(
'END_FSR')
253 if '/FileRecords' in lst:
254 lst.remove(
'/FileRecords')
257 o = self.
fsr.retrieveObject(l)
258 if hasattr(o,
"configureDirectAccess"):
259 o.configureDirectAccess()
261 if self.
_gmpc.nodeID == 0:
262 self.
objectsOut.append((0, l, pickle.dumps(o)))
266 if l ==
'/FileRecords/EventCountFSR':
267 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
269 elif "KeyedContainer" in o.__class__.__name__:
271 nObjects = o.numberOfObjects()
273 self.
log.debug(
"Keyed Container %s with %i objects" %
275 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
278 self.
log.info(
'Ignoring %s in FSR' % o.__class__.__name__)
280 self.
log.debug(
'Done with FSR store, just to send to Writer.')
283 self.
log.debug(
'%i FSR objects to Writer' % (len(self.
objectsOut)))
285 self.
log.debug(
'\t%s' % (ob[0]))
288 self.
log.info(
'Valid FSR Store, but no data to send to Writer')
289 self.
log.info(
'SendFSR complete')
290 self.
q.
put(
'END_FSR')
295 self.
log.info(
'Receiving FSR store data...')
296 nc = self.
_gmpc.nWorkers
298 objects = self.
q.
get()
299 if objects ==
'END_FSR':
310 self.
log.info(
'All FSR data received')
315 for sourceNode, path, serialob
in self.
objectsIn:
316 self.
log.debug(
'Working with %s' % (path))
317 ob = pickle.loads(serialob)
318 if hasattr(ob,
'update'):
320 if hasattr(ob,
'numberOfObjects'):
321 nCont = ob.numberOfObjects()
322 self.
log.debug(
'\t %s has containedObjects : %i' %
323 (
type(ob).__name__, nCont))
325 self.
log.debug(
'Registering Object to : %s' % (path))
326 self.
fsr.registerObject(path, ob)
328 self.
log.debug(
'Merging Object to : %s' % (path))
334 self.
log.info(
'FSR Store Rebuilt. Correcting EventCountFSR')
335 if bool(self.
fsr._idp):
336 ecount =
'/FileRecords/EventCountFSR' 338 self.
fsr[ecount].setOutput(self.
_gmpc.nIn)
339 self.
log.info(
'Event Counter Output set : %s : %i' %
342 self.
log.debug(
'FSR store reconstructed!')
343 lst = self.
fsr.getHistoNames()
346 ob = self.
fsr.retrieveObject(l)
347 if hasattr(ob,
'configureDirectAccess'):
348 ob.configureDirectAccess()
349 if hasattr(ob,
'containedObjects'):
351 self.
log.debug(
'\t%s (cont. objects : %i)' %
352 (l, ob.numberOfObjects()))
354 self.
log.debug(
'\t%s' % (l))
355 self.
log.info(
'FSR Store fully rebuilt.')
360 if path ==
'/FileRecords/TimeSpanFSR':
363 elif path ==
'/FileRecords/EventCountFSR':
367 elif "KeyedContainer" in ob.__class__.__name__
and "LumiFSR" in ob.__class__.__name__:
372 "Skipping Merge of %s at %s" % (ob.__class__.__name__, path))
375 ob2 = self.
fsr.retrieveObject(path)
376 if ob.containedObjects().
size():
377 sz = ob.containedObjects().
size()
378 cob = ob2.containedObjects()[0]
382 cob = ob.containedObjects()[j]
383 self.
log.debug(
'Adding TimeSpanFSR')
384 if cob.earliest() < min:
386 if cob.latest() > max:
390 tsfsr = gbl.LHCb.TimeSpanFSR()
391 tsfsr.setEarliest(min)
393 self.
fsr[path].clear()
397 self.
log.debug(
'Event Count Input Addition')
398 self.
fsr[path].setInput(self.
fsr[path].input() + ob.input())
401 from ROOT
import string
403 keyedContainer = self.
fsr.retrieveObject(path)
405 assert keyedContainer.numberOfObjects() == 1
406 l = keyedContainer.containedObject(0)
409 nCont = keyedC.numberOfObjects()
410 for i
in range(nCont):
411 obj = keyedC.containedObject(i)
413 baseLumi.merge(nextLumi)
415 newLumi = gbl.LHCb.LumiFSR()
416 for r
in baseLumi.runs:
417 newLumi.addRunNumber(r)
418 for f
in baseLumi.files:
419 newLumi.addFileID(string(f))
420 for k
in baseLumi.keys:
421 increment, integral = baseLumi.info[k]
422 newLumi.addInfo(k, increment, integral)
424 self.
fsr[path].clear()
426 self.
fsr[path].
add(newLumi)
447 for r
in lumi.runNumbers():
450 for f
in lumi.fileIDs():
455 sa = s.split(
"info (key/incr/integral) : ")[-1]
456 sa = sa.split(
'/')[:-1]
458 k, i, t = rec.split()
462 self.
info[k] = (i, t)
466 assert otherLumi.__class__.__name__ ==
"LumiFSR" 468 for r
in otherLumi.runs:
475 for f
in otherLumi.files:
482 for k
in otherLumi.keys:
483 increment, integral = otherLumi.info[k]
485 myIncrement, myIntegral = self.
info[k]
486 self.
info[k] = (myIncrement + increment, myIntegral + integral)
488 self.
info[k] = (increment, integral)
493 s =
"LumiFSR Python class\n" 496 s +=
"\t\t%i\n" % (r)
499 s +=
"\t\t%s\n" % (f)
502 increment, integral = self.
info[k]
503 s +=
"\t\t%i\t%i\t%i\n" % (k, increment, integral)
512 cl =
'LHCb::PackedCaloHypo' 513 assert o.__class__.__name__ == cl
516 self.
cerr = (o.cerr00, o.cerr10, o.cerr11)
517 self.
cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
527 self.
pos = (o.posE, o.posX, o.posY)
531 s =
"PackedCaloHypo : \n" 532 s +=
"\tcentX : %s\n" % (str(self.
centX))
533 s +=
"\tcentY : %s\n" % (str(self.
centY))
534 s +=
"\tcerr : %s\n" % (str(self.
cerr))
535 s +=
"\tcov : %s\n" % (str(self.
cov))
537 s +=
"\tfirstDigit : %s\n" % (str(self.
firstDigit))
538 s +=
"\tfirstHypo : %s\n" % (str(self.
firstHypo))
539 s +=
"\thypothesis : %s\n" % (str(self.
hypothesis))
540 s +=
"\tkey : %s\n" % (str(self.
key))
541 s +=
"\tlastCluster : %s\n" % (str(self.
lastCluster))
542 s +=
"\tlastDigit : %s\n" % (str(self.
lastDigit))
543 s +=
"\tlastHypo : %s\n" % (str(self.
lastHypo))
544 s +=
"\tlh : %s\n" % (str(self.
lh))
545 s +=
"\tpos : %s\n" % (str(self.
pos))
546 s +=
"\tz : %s\n" % (str(self.
z))
547 s +=
"---------------------------------------\n" 563 return self.
event.is_set()
579 s =
"---------- SyncMini --------------\n" 580 s +=
" Status : %s\n" % (self.
event.is_set())
581 s +=
" t : %5.2f\n" % (self.
t)
583 s +=
"Last Event : %s\n" % (self.
lastEvent.is_set())
584 s +=
"----------------------------------\n" 625 self.
log.info(
'%s : All procs done @ %i s' % (step, i))
628 time.sleep(self.
step)
632 self.
log.info(
"All processes : %s ok." % (step))
635 self.
log.critical(
'Some process is hanging on : %s' % (step))
637 hangString =
"%s : Proc/Stat : %i/%s" % (step, k,
639 self.
log.critical(hangString)
664 if sMini.check()
or sMini.checkLast():
665 if sMini.checkLast()
and sMini.check():
668 alive = time.time() - begin
670 "Audit : Node %i alive for %5.2f" % (k, alive))
676 wait = time.time() - sMini.getTime()
677 cond = wait > self.
limit 683 self.
log.critical(
'Single event wait : %5.2f' % (wait))
689 self.
log.info(
'TC met for event loop')
693 time.sleep(self.
step)
695 self.
log.info(
"All processes Completed all Events ok")
699 self.
log.critical(
'Some proc is hanging during Event processing!')
701 self.
log.critical(
"Proc/Stat : %i / %s" % (k, self.
d[k].check()))
707 currentStatus = [mini.check()
for mini
in self.
d.values()]
708 return all(currentStatus)
712 stat = [sMini.checkLast()
for sMini
in self.
d.values()]
724 lst = [
'/Event/Gen/Header',
'/Event/Rec/Header']
727 n = evt[l].evtNumber()
736 n = evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
constexpr auto size(const T &, Args &&...) noexcept
Out1 * put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.