from multiprocessing import Event

from GaudiPython import FAILURE, SUCCESS, gbl

aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
 
        assert tup.__class__.__name__ == "tuple"

        hstatus = self._gmpc.nWorkers + 1

            if tup == "HISTOS_SENT":
                self.log.debug("received HISTOS_SENT message")

        self._gmpc.sEvent.set()
        self.log.info("Writer received all histo bundles and set sync event")
 
        Rebuild the Histogram Store from the histos received by Receive()
        If we have a histo which is not in the store,
        book and fill it according to self.bookingDict
        If we have a histo with a matching histo in the store,
        add the two histos, remembering that aida2root must be used on
        the Stored histo for compatibility.

            workerID, histDict = tup
 
            for n in histDict.keys():

                obj = self.hvt.retrieve(n)

                        self.log.warning("FAILED TO ADD : %s" % (str(obj)))

                                "FAILED TO REGISTER : %s\tto%s"
                                % (o.__class__.__name__, n)

                            "No booking method for: %s\t%s\t%s"
                            % (n, type(o), o.__class__.__name__)
 
        hs = self.hvt.getHistoNames()
        self.log.info("Histo Store Rebuilt : ")
        self.log.info("  Contains %i objects." % (len(hs)))
        self.log.info("  Errors in Rebuilding : %i" % (errors))
 
        Register a DataObject to the Histo Store

        self._gmpc.hvt.registerObject(n, o)

        Register a ROOT 1D THisto to the Histo Store

        obj = self.hvt._ihs.book(

            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
 
        Register a ROOT 2D THisto to the Histo Store

        obj = self.hvt._ihs.book(

            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
 
        Register a ROOT 3D THisto to the Histo Store

        obj = self.hvt._ihs.book(

            # GetNbins() (the bin count) is what book() needs; GetXbins() would
            # return the array of bin edges
            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
            o.GetZaxis().GetNbins(),
            o.GetZaxis().GetXmin(),
            o.GetZaxis().GetXmax(),
 
        Register a ROOT TProfile to the Histo Store

        obj = self.hvt._ihs.bookProf(

            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),

        Register a ROOT TProfile2D to the Histo Store

        obj = self.hvt._ihs.bookProf(

            o.GetXaxis().GetNbins(),
            o.GetXaxis().GetXmin(),
            o.GetXaxis().GetXmax(),
            o.GetYaxis().GetNbins(),
            o.GetYaxis().GetXmin(),
            o.GetYaxis().GetXmax(),
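
# Hedged sketch (not necessarily the verbatim GaudiMP code): the book*() helpers
# above are naturally dispatched through a dict keyed by the ROOT class name,
# which is how self.bookingDict is used in RebuildHistoStore().  `agent` is a
# hypothetical stand-in for the histogram agent instance.
def make_booking_dict(agent):
    return {
        "DataObject": agent.bookDataObject,
        "TH1D": agent.bookTH1D,
        "TH2D": agent.bookTH2D,
        "TH3D": agent.bookTH3D,
        "TProfile": agent.bookTProfile,
        "TProfile2D": agent.bookTProfile2D,
    }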
 
        self.log.info("Sending FileRecords...")
        lst = self.fsr.getHistoNames()

            self.log.info("No FileRecords Data to send to Writer.")
            self.q.put("END_FSR")

        if "/FileRecords" in lst:
            lst.remove("/FileRecords")

            o = self.fsr.retrieveObject(l)
            if hasattr(o, "configureDirectAccess"):
                o.configureDirectAccess()

            if self._gmpc.nodeID == 0:
                self.objectsOut.append((0, l, pickle.dumps(o)))

                if l == "/FileRecords/EventCountFSR":
                    tup = (self._gmpc.nodeID, l, pickle.dumps(o))

                elif "KeyedContainer" in o.__class__.__name__:

                    nObjects = o.numberOfObjects()

                            "Keyed Container %s with %i objects" % (l, nObjects)

                        tup = (self._gmpc.nodeID, l, pickle.dumps(o))

                    self.log.info("Ignoring %s in FSR" % o.__class__.__name__)
 
        self.log.debug("Done with FSR store, just to send to Writer.")

            self.log.debug("%i FSR objects to Writer" % (len(self.objectsOut)))

                self.log.debug("\t%s" % (ob[0]))

            self.log.info("Valid FSR Store, but no data to send to Writer")

        self.log.info("SendFSR complete")
        self.q.put("END_FSR")
 
        self.log.info("Receiving FSR store data...")
        nc = self._gmpc.nWorkers

            objects = self.q.get()
            if objects == "END_FSR":

        self.log.info("All FSR data received")
 
        for sourceNode, path, serialob in self.objectsIn:
            self.log.debug("Working with %s" % (path))
            ob = pickle.loads(serialob)
            if hasattr(ob, "update"):

            if hasattr(ob, "numberOfObjects"):
                nCont = ob.numberOfObjects()

                    "\t %s has containedObjects : %i" % (type(ob).__name__, nCont)

                self.log.debug("Registering Object to : %s" % (path))
                self.fsr.registerObject(path, ob)

                self.log.debug("Merging Object to : %s" % (path))
 
        self.log.info("FSR Store Rebuilt.  Correcting EventCountFSR")
        if bool(self.fsr._idp):
            ecount = "/FileRecords/EventCountFSR"

                self.fsr[ecount].setOutput(self._gmpc.nIn)

                    "Event Counter Output set : %s : %i"

            self.log.debug("FSR store reconstructed!")
            lst = self.fsr.getHistoNames()

                    ob = self.fsr.retrieveObject(l)
                    if hasattr(ob, "configureDirectAccess"):
                        ob.configureDirectAccess()
                    if hasattr(ob, "containedObjects"):

                            "\t%s (cont. objects : %i)" % (l, ob.numberOfObjects())

                        self.log.debug("\t%s" % (l))
        self.log.info("FSR Store fully rebuilt.")
 
        if path == "/FileRecords/TimeSpanFSR":

        elif path == "/FileRecords/EventCountFSR":

            "KeyedContainer" in ob.__class__.__name__
            and "LumiFSR" in ob.__class__.__name__

            self.log.info("Skipping Merge of %s at %s" % (ob.__class__.__name__, path))
 
        ob2 = self.fsr.retrieveObject(path)
        if ob.containedObjects().size():
            sz = ob.containedObjects().size()
            cob = ob2.containedObjects()[0]

                cob = ob.containedObjects()[j]
                self.log.debug("Adding TimeSpanFSR")
                if cob.earliest() < min:

                if cob.latest() > max:

            tsfsr = gbl.LHCb.TimeSpanFSR()
            tsfsr.setEarliest(min)

            self.fsr[path].clear()
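
# Hedged worked example of the reduction above: scan every contained
# TimeSpanFSR, keep the overall earliest/latest, then rebuild a single record
# from them.  The (earliest, latest) pairs below are invented.
spans = [(100, 200), (90, 150), (120, 260)]
earliest = min(t0 for t0, _ in spans)
latest = max(t1 for _, t1 in spans)
# earliest == 90, latest == 260 ; these would go into a fresh LHCb.TimeSpanFSR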
 
        self.log.debug("Event Count Input Addition")
        self.fsr[path].setInput(self.fsr[path].input() + ob.input())
 
        from ROOT import string

        keyedContainer = self.fsr.retrieveObject(path)

        assert keyedContainer.numberOfObjects() == 1
        l = keyedContainer.containedObject(0)

        nCont = keyedC.numberOfObjects()
        for i in range(nCont):
            obj = keyedC.containedObject(i)

            baseLumi.merge(nextLumi)

        newLumi = gbl.LHCb.LumiFSR()
        for r in baseLumi.runs:
            newLumi.addRunNumber(r)
        for f in baseLumi.files:
            newLumi.addFileID(string(f))
        for k in baseLumi.keys:
            increment, integral = baseLumi.info[k]
            newLumi.addInfo(k, increment, integral)

        self.fsr[path].clear()

        self.fsr[path].add(newLumi)
 
        for r in lumi.runNumbers():

        for f in lumi.fileIDs():

        sa = s.split("info (key/incr/integral) : ")[-1]
        sa = sa.split("/")[:-1]

            k, i, t = rec.split()

            self.info[k] = (i, t)
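
# Hedged worked example of the string parsing above, on a made-up LumiFSR
# printout (the real printout format may differ in detail).
s = "runs : 69857 69858 files : foo.raw info (key/incr/integral) : 0 8 0 / 1 8 259 / "
info = {}
for rec in s.split("info (key/incr/integral) : ")[-1].split("/")[:-1]:
    k, i, t = rec.split()
    info[int(k)] = (int(i), int(t))
# info == {0: (8, 0), 1: (8, 259)}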
 
        assert otherLumi.__class__.__name__ == "LumiFSR"

        for r in otherLumi.runs:

        for f in otherLumi.files:

        for k in otherLumi.keys:
            increment, integral = otherLumi.info[k]

                myIncrement, myIntegral = self.info[k]
                self.info[k] = (myIncrement + increment, myIntegral + integral)

                self.info[k] = (increment, integral)
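
# Hedged illustration of the merge rule above: runs and file IDs are unioned,
# while per-key counters are summed component-wise.  Values are invented.
a = {0: (8, 0), 1: (8, 259)}      # this FSR's info
b = {1: (8, 100), 2: (4, 76)}     # the other FSR's info
merged = dict(a)
for k, (inc, integral) in b.items():
    if k in merged:
        myInc, myInt = merged[k]
        merged[k] = (myInc + inc, myInt + integral)
    else:
        merged[k] = (inc, integral)
# merged == {0: (8, 0), 1: (16, 359), 2: (4, 76)}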
 
        s = "LumiFSR Python class\n"

            s += "\t\t%i\n" % (r)

            s += "\t\t%s\n" % (f)

            increment, integral = self.info[k]
            s += "\t\t%i\t%i\t%i\n" % (k, increment, integral)
 
        cl = "LHCb::PackedCaloHypo"
        assert o.__class__.__name__ == cl

        self.cerr = (o.cerr00, o.cerr10, o.cerr11)
        self.cov = (o.cov00, o.cov10, o.cov11, o.cov20, o.cov21, o.cov22)

        self.pos = (o.posE, o.posX, o.posY)
 
        s = "PackedCaloHypo : \n"
        s += "\tcentX        : %s\n" % (str(self.centX))
        s += "\tcentY        : %s\n" % (str(self.centY))
        s += "\tcerr         : %s\n" % (str(self.cerr))
        s += "\tcov          : %s\n" % (str(self.cov))

        s += "\tfirstDigit   : %s\n" % (str(self.firstDigit))
        s += "\tfirstHypo    : %s\n" % (str(self.firstHypo))
        s += "\thypothesis   : %s\n" % (str(self.hypothesis))
        s += "\tkey          : %s\n" % (str(self.key))
        s += "\tlastCluster  : %s\n" % (str(self.lastCluster))
        s += "\tlastDigit    : %s\n" % (str(self.lastDigit))
        s += "\tlastHypo     : %s\n" % (str(self.lastHypo))
        s += "\tlh           : %s\n" % (str(self.lh))
        s += "\tpos          : %s\n" % (str(self.pos))
        s += "\tz            : %s\n" % (str(self.z))
        s += "---------------------------------------\n"

        return self.event.is_set()
 
        s = "---------- SyncMini --------------\n"
        s += "    Status : %s\n" % (self.event.is_set())
        s += "         t : %5.2f\n" % (self.t)

            s += "Last Event : %s\n" % (self.lastEvent.is_set())
        s += "----------------------------------\n"
        self, nWorkers, log, manyEvents=False, limit=None, step=None, firstEvent=None

        for i in range(-2, nWorkers):

                self.log.info("%s : All procs done @ %i s" % (step, i))

                time.sleep(self.step)

            self.log.info("All processes : %s ok." % (step))

            self.log.critical("Some process is hanging on : %s" % (step))

                hangString = "%s : Proc/Stat : %i/%s" % (step, k, self.d[k].check())
                self.log.critical(hangString)
 
                if sMini.check() or sMini.checkLast():
                    if sMini.checkLast() and sMini.check():

                        alive = time.time() - begin
                        self.log.info("Audit : Node %i alive for %5.2f" % (k, alive))

                    wait = time.time() - sMini.getTime()
                    cond = wait > self.limit

                        self.log.critical("Single event wait : %5.2f" % (wait))

                self.log.info("TC met for event loop")

                time.sleep(self.step)

        self.log.info("All processes Completed all Events ok")

        self.log.critical("Some proc is hanging during Event processing!")

            self.log.critical("Proc/Stat : %i / %s" % (k, self.d[k].check()))
 
        currentStatus = [mini.check() for mini in self.d.values()]
        return all(currentStatus)

        stat = [sMini.checkLast() for sMini in self.d.values()]
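
# Hedged sketch of the polling pattern used by syncAll(): check every worker's
# status at fixed intervals until all report, or the time limit runs out.
# `minis`, `limit`, and `step` are illustrative parameters.
import time

def wait_for_all(minis, limit, step):
    for _elapsed in range(0, limit, step):
        if all(m.check() for m in minis):
            return True                       # every process reported in time
        time.sleep(step)
    return all(m.check() for m in minis)      # final check once the limit is up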
 
    lst = ["/Event/Gen/Header", "/Event/Rec/Header"]

            n = evt[l].evtNumber()

        n = evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]