1 from GaudiPython
import gbl, SUCCESS, FAILURE
2 from multiprocessing
import Event
45 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
72 assert tup.__class__.__name__ ==
'tuple' 76 hstatus = self.
_gmpc.nWorkers + 1
79 if tup ==
'HISTOS_SENT':
80 self.
log.debug(
'received HISTOS_SENT message')
86 self.
_gmpc.sEvent.set()
87 self.
log.info(
'Writer received all histo bundles and set sync event')
92 Rebuild the Histogram Store from the histos received by Receive() 93 If we have a histo which is not in the store, 94 book and fill it according to self.bookingDict 95 If we have a histo with a matching histo in the store, 96 add the two histos, remembering that aida2root must be used on 97 the Stored histo for compatibility. 101 workerID, histDict = tup
106 for n
in histDict.keys():
108 obj = self.
hvt.retrieve(n)
114 self.
log.warning(
'FAILED TO ADD : %s' % (str(obj)))
119 if o.__class__.__name__
in self.
bookingDict.keys():
123 self.
log.warning(
'FAILED TO REGISTER : %s\tto%s' %
124 (o.__class__.__name__, n))
127 self.
log.warning(
'No booking method for: %s\t%s\t%s' %
128 (n,
type(o), o.__class__.__name__))
131 hs = self.
hvt.getHistoNames()
132 self.
log.info(
'Histo Store Rebuilt : ')
133 self.
log.info(
' Contains %i objects.' % (len(hs)))
134 self.
log.info(
' Errors in Rebuilding : %i' % (errors))
139 Register a DataObject to the Histo Store 141 self.
_gmpc.hvt.registerObject(n, o)
145 Register a ROOT 1D THisto to the Histo Store 147 obj = self.
hvt._ihs.book(n, o.GetTitle(),
148 o.GetXaxis().GetNbins(),
149 o.GetXaxis().GetXmin(),
150 o.GetXaxis().GetXmax())
155 Register a ROOT 2D THisto to the Histo Store 157 obj = self.
hvt._ihs.book(n, o.GetTitle(),
158 o.GetXaxis().GetNbins(),
159 o.GetXaxis().GetXmin(),
160 o.GetXaxis().GetXmax(),
161 o.GetYaxis().GetNbins(),
162 o.GetYaxis().GetXmin(),
163 o.GetYaxis().GetXmax())
168 Register a ROOT 3D THisto to the Histo Store 170 obj = self.
hvt._ihs.book(n, o.GetTitle(),
171 o.GetXaxis().GetXbins(),
172 o.GetXaxis().GetXmin(),
173 o.GetXaxis().GetXmax(),
174 o.GetYaxis().GetXbins(),
175 o.GetYaxis().GetXmin(),
176 o.GetYaxis().GetXmax(),
177 o.GetZaxis().GetXbins(),
178 o.GetZaxis().GetXmin(),
179 o.GetZaxis().GetXmax())
184 Register a ROOT TProfile to the Histo Store 186 obj = self.
hvt._ihs.bookProf(n, o.GetTitle(),
187 o.GetXaxis().GetNbins(),
188 o.GetXaxis().GetXmin(),
189 o.GetXaxis().GetXmax(), o.GetOption())
194 Register a ROOT TProfile2D to the Histo Store 196 obj = self.
hvt._ihs.bookProf(n, o.GetTitle(),
197 o.GetXaxis().GetNbins(),
198 o.GetXaxis().GetXmin(),
199 o.GetXaxis().GetXmax(),
200 o.GetYaxis().GetNbins(),
201 o.GetYaxis().GetXmin(),
202 o.GetYaxis().GetXmax())
243 self.
log.info(
'Sending FileRecords...')
244 lst = self.
fsr.getHistoNames()
248 self.
log.info(
'No FileRecords Data to send to Writer.')
249 self.
q.
put(
'END_FSR')
253 if '/FileRecords' in lst:
254 lst.remove(
'/FileRecords')
257 o = self.
fsr.retrieveObject(l)
258 if hasattr(o,
"configureDirectAccess"):
259 o.configureDirectAccess()
261 if self.
_gmpc.nodeID == 0:
262 self.
objectsOut.append((0, l, pickle.dumps(o)))
266 if l ==
'/FileRecords/EventCountFSR':
267 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
271 assert "KeyedContainer" in o.__class__.__name__
272 nObjects = o.numberOfObjects()
274 self.
log.debug(
"Keyed Container %s with %i objects" %
276 tup = (self.
_gmpc.nodeID, l, pickle.dumps(o))
278 self.
log.debug(
'Done with FSR store, just to send to Writer.')
281 self.
log.debug(
'%i FSR objects to Writer' % (len(self.
objectsOut)))
283 self.
log.debug(
'\t%s' % (ob[0]))
286 self.
log.info(
'Valid FSR Store, but no data to send to Writer')
287 self.
log.info(
'SendFSR complete')
288 self.
q.
put(
'END_FSR')
293 self.
log.info(
'Receiving FSR store data...')
294 nc = self.
_gmpc.nWorkers
296 objects = self.
q.
get()
297 if objects ==
'END_FSR':
308 self.
log.info(
'All FSR data received')
313 for sourceNode, path, serialob
in self.
objectsIn:
314 self.
log.debug(
'Working with %s' % (path))
315 ob = pickle.loads(serialob)
316 if hasattr(ob,
'update'):
318 if hasattr(ob,
'numberOfObjects'):
319 nCont = ob.numberOfObjects()
320 self.
log.debug(
'\t %s has containedObjects : %i' %
321 (
type(ob).__name__, nCont))
323 self.
log.debug(
'Registering Object to : %s' % (path))
324 self.
fsr.registerObject(path, ob)
326 self.
log.debug(
'Merging Object to : %s' % (path))
332 self.
log.info(
'FSR Store Rebuilt. Correcting EventCountFSR')
333 if bool(self.
fsr._idp):
334 ecount =
'/FileRecords/EventCountFSR' 336 self.
fsr[ecount].setOutput(self.
_gmpc.nIn)
337 self.
log.info(
'Event Counter Output set : %s : %i' %
340 self.
log.debug(
'FSR store reconstructed!')
341 lst = self.
fsr.getHistoNames()
344 ob = self.
fsr.retrieveObject(l)
345 if hasattr(ob,
'configureDirectAccess'):
346 ob.configureDirectAccess()
347 if hasattr(ob,
'containedObjects'):
349 self.
log.debug(
'\t%s (cont. objects : %i)' %
350 (l, ob.numberOfObjects()))
352 self.
log.debug(
'\t%s' % (l))
353 self.
log.info(
'FSR Store fully rebuilt.')
358 if path ==
'/FileRecords/TimeSpanFSR':
361 elif path ==
'/FileRecords/EventCountFSR':
365 elif "KeyedContainer" in ob.__class__.__name__:
368 if "LumiFSR" in ob.__class__.__name__:
371 self.
log.info(
"Skipping Merge of Keyed Container %s for %s" %
372 (ob.__class__.__name__, path))
375 ob2 = self.
fsr.retrieveObject(path)
376 if ob.containedObjects().
size():
377 sz = ob.containedObjects().
size()
378 cob = ob2.containedObjects()[0]
382 cob = ob.containedObjects()[j]
383 self.
log.debug(
'Adding TimeSpanFSR')
384 if cob.earliest() < min:
386 if cob.latest() > max:
390 tsfsr = gbl.LHCb.TimeSpanFSR()
391 tsfsr.setEarliest(min)
393 self.
fsr[path].clear()
397 self.
log.debug(
'Event Count Input Addition')
398 self.
fsr[path].setInput(self.
fsr[path].input() + ob.input())
401 from ROOT
import string
403 keyedContainer = self.
fsr.retrieveObject(path)
405 assert keyedContainer.numberOfObjects() == 1
406 l = keyedContainer.containedObject(0)
409 nCont = keyedC.numberOfObjects()
410 for i
in xrange(nCont):
411 obj = keyedC.containedObject(i)
413 baseLumi.merge(nextLumi)
415 newLumi = gbl.LHCb.LumiFSR()
416 for r
in baseLumi.runs:
417 newLumi.addRunNumber(r)
418 for f
in baseLumi.files:
419 newLumi.addFileID(string(f))
420 for k
in baseLumi.keys:
421 increment, integral = baseLumi.info[k]
422 newLumi.addInfo(k, increment, integral)
424 self.
fsr[path].clear()
426 self.
fsr[path].
add(newLumi)
447 for r
in lumi.runNumbers():
450 for f
in lumi.fileIDs():
455 sa = s.split(
"info (key/incr/integral) : ")[-1]
456 sa = sa.split(
'/')[:-1]
458 k, i, t = rec.split()
462 self.
info[k] = (i, t)
466 assert otherLumi.__class__.__name__ ==
"LumiFSR" 468 for r
in otherLumi.runs:
475 for f
in otherLumi.files:
482 for k
in otherLumi.keys:
483 increment, integral = otherLumi.info[k]
485 myIncrement, myIntegral = self.
info[k]
486 self.
info[k] = (myIncrement + increment, myIntegral + integral)
488 self.
info[k] = (increment, integral)
493 s =
"LumiFSR Python class\n" 496 s +=
"\t\t%i\n" % (r)
499 s +=
"\t\t%s\n" % (f)
502 increment, integral = self.
info[k]
503 s +=
"\t\t%i\t%i\t%i\n" % (k, increment, integral)
512 cl =
'LHCb::PackedCaloHypo' 513 assert o.__class__.__name__ == cl
516 self.
cerr = (o.cerr00, o.cerr10, o.cerr11)
517 self.
cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)
527 self.
pos = (o.posE, o.posX, o.posY)
531 s =
"PackedCaloHypo : \n" 532 s +=
"\tcentX : %s\n" % (str(self.
centX))
533 s +=
"\tcentY : %s\n" % (str(self.
centY))
534 s +=
"\tcerr : %s\n" % (str(self.
cerr))
535 s +=
"\tcov : %s\n" % (str(self.
cov))
537 s +=
"\tfirstDigit : %s\n" % (str(self.
firstDigit))
538 s +=
"\tfirstHypo : %s\n" % (str(self.
firstHypo))
539 s +=
"\thypothesis : %s\n" % (str(self.
hypothesis))
540 s +=
"\tkey : %s\n" % (str(self.
key))
541 s +=
"\tlastCluster : %s\n" % (str(self.
lastCluster))
542 s +=
"\tlastDigit : %s\n" % (str(self.
lastDigit))
543 s +=
"\tlastHypo : %s\n" % (str(self.
lastHypo))
544 s +=
"\tlh : %s\n" % (str(self.
lh))
545 s +=
"\tpos : %s\n" % (str(self.
pos))
546 s +=
"\tz : %s\n" % (str(self.
z))
547 s +=
"---------------------------------------\n" 563 return self.
event.is_set()
579 s =
"---------- SyncMini --------------\n" 580 s +=
" Status : %s\n" % (self.
event.is_set())
581 s +=
" t : %5.2f\n" % (self.
t)
583 s +=
"Last Event : %s\n" % (self.
lastEvent.is_set())
584 s +=
"----------------------------------\n" 605 for i
in xrange(-2, nWorkers):
623 for i
in xrange(0, self.
limit, self.
step):
625 self.
log.info(
'%s : All procs done @ %i s' % (step, i))
628 time.sleep(self.
step)
632 self.
log.info(
"All processes : %s ok." % (step))
635 self.
log.critical(
'Some process is hanging on : %s' % (step))
637 hangString =
"%s : Proc/Stat : %i/%s" % (step, k,
639 self.
log.critical(hangString)
664 if sMini.check()
or sMini.checkLast():
665 if sMini.checkLast()
and sMini.check():
668 alive = time.time() - begin
670 "Audit : Node %i alive for %5.2f" % (k, alive))
676 wait = time.time() - sMini.getTime()
677 cond = wait > self.
limit 683 self.
log.critical(
'Single event wait : %5.2f' % (wait))
689 self.
log.info(
'TC met for event loop')
693 time.sleep(self.
step)
695 self.
log.info(
"All processes Completed all Events ok")
699 self.
log.critical(
'Some proc is hanging during Event processing!')
701 self.
log.critical(
"Proc/Stat : %i / %s" % (k, self.
d[k].check()))
707 currentStatus = [mini.check()
for mini
in self.
d.values()]
708 return all(currentStatus)
712 stat = [sMini.checkLast()
for sMini
in self.
d.values()]
724 lst = [
'/Event/Gen/Header',
'/Event/Rec/Header']
727 n = evt[l].evtNumber()
736 n = evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
constexpr auto size(const T &, Args &&...) noexcept
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
Out1 * put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)