11 from __future__
import print_function
13 import multiprocessing
17 from multiprocessing
import (
25 from multiprocessing.queues
import Empty
29 from ROOT
import TBuffer, TBufferFile, TParallelMergingFile
31 from GaudiPython
import (
68 WAIT_INITIALISE = 60 * 5
69 WAIT_FIRST_EVENT = 60 * 3
70 WAIT_SINGLE_EVENT = 60 * 6
71 WAIT_FINALISE = 60 * 2
84 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
90 gbl.AIDA.IHistogram1D,
91 gbl.AIDA.IHistogram2D,
92 gbl.AIDA.IHistogram3D,
95 gbl.AIDA.IBaseHistogram,
99 thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
103 "EvtCollectionStream":
"tuples",
104 "InputCopyStream":
"events",
105 "OutputStream":
"events",
106 "RecordStream":
"records",
107 "RunRecordStream":
"records",
108 "SequentialOutputStream":
"events",
109 "TagCollectionStream":
"tuples",
117 A class to represent a writer in the GaudiPython configuration
118 It can be non-trivial to access the name of the output file; it may be
119 specified in the DataSvc, or just on the writer, may be a list, or string
120 Also, there are three different types of writer (events, records, tuples)
121 so this bootstrap class provides easy access to this info while configuring
140 if hasattr(self.
w,
"Output"):
150 if hasattr(datasvc,
"Output"):
162 assert replaceThis.__class__.__name__ ==
"str"
163 assert withThis.__class__.__name__ ==
"str"
166 if old.__class__.__name__ ==
"list":
169 new = old.replace(replaceThis, withThis)
178 if hasattr(self.
w,
"ItemList"):
181 datasvc = config[self.
w.EvtDataSvc]
182 if hasattr(datasvc,
"ItemList"):
185 if hasattr(self.
w,
"OptItemList"):
188 datasvc = config[self.
w.EvtDataSvc]
189 if hasattr(datasvc,
"OptItemList"):
193 def set(self, key, output):
202 s +=
"Writer : %s\n" % (self.
wName)
203 s +=
"Writer Type : %s\n" % (self.
wType)
204 s +=
"Writer Output : %s\n" % (self.
output)
206 s +=
"DataSvc Output : %s\n" % (self.
svcOutput)
208 s +=
"Key for config : %s\n" % (self.
key)
209 s +=
"Output File : %s\n" % (self.
output)
210 s +=
"ItemList : %s\n" % (self.
ItemList)
221 GaudiPython algorithm used to clean up histos on the Reader and Workers
222 Only has a finalize method()
223 This retrieves a dictionary of path:histo objects and sends it to the
224 writer. It then waits for a None flag : THIS IS IMPORTANT, as if
225 the algorithm returns before ALL histos have been COMPLETELY RECEIVED
226 at the writer end, there will be an error.
230 PyAlgorithm.__init__(self)
239 self.
log.info(
"CollectHistograms Finalise (%s)" % (self.
_gmpc.nodeType))
240 self.
_gmpc.hDict = self.
_gmpc.dumpHistograms()
241 ks = self.
_gmpc.hDict.keys()
242 self.
log.info(
"%i Objects in Histogram Store" % (len(ks)))
248 reps = len(ks) / chunk + 1
249 for i
in range(reps):
250 someKeys = ks[i * chunk : (i + 1) * chunk]
251 smalld = dict([(key, self.
_gmpc.hDict[key])
for key
in someKeys])
252 self.
_gmpc.hq.put((self.
_gmpc.nodeID, smalld))
254 self.
log.debug(
"Signalling end of histos to Writer")
255 self.
_gmpc.hq.put(
"HISTOS_SENT")
256 self.
log.debug(
"Waiting on Sync Event")
257 self.
_gmpc.sEvent.wait()
258 self.
log.debug(
"Histo Sync Event set, clearing and returning")
259 self.
_gmpc.hvt.clearStore()
260 root = gbl.DataObject()
262 self.
_gmpc.hvt.setRoot(
"/stat", root)
297 assert item.__class__.__name__ ==
"tuple"
298 startTransmission = time.time()
302 while self.
qout._buffer:
304 self.
qoutTime += time.time() - startTransmission
311 startWait = time.time()
313 itemIn = self.
qin.
get(timeout=timeout)
316 self.
qinTime += time.time() - startWait
318 if itemIn.__class__.__name__ ==
"tuple":
325 self.
_gmpc.log.warning(
326 "TASK_DONE called too often by : %s" % (self.
_gmpc.nodeType)
332 "Finalize Event Communicator : %s %s" % (self.
_gmpc, self.
_gmpc.nodeType)
337 if self.
_gmpc.nodeType ==
"Reader":
338 downstream = self.
_gmpc.nWorkers
339 elif self.
_gmpc.nodeType ==
"Writer":
341 elif self.
_gmpc.nodeType ==
"Worker":
344 for i
in range(downstream):
346 if self.
_gmpc.nodeType !=
"Writer":
352 self.
log.name =
"%s-%i Audit " % (self.
_gmpc.nodeType, self.
_gmpc.nodeID)
353 self.
log.info(
"Items Sent : %i" % (self.
nSent))
354 self.
log.info(
"Items Received : %i" % (self.
nRecv))
356 self.
log.info(
"Data Received : %i" % (self.
sizeRecv))
357 self.
log.info(
"Q-out Time : %5.2f" % (self.
qoutTime))
358 self.
log.info(
"Q-in Time : %5.2f" % (self.
qinTime))
365 def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
366 self.
T = gaudiTESSerializer
380 root = gbl.DataObject()
382 self.
evt.setRoot(
"/Event", root)
384 self.
T.loadBuffer(tbuf)
385 self.
tLoad += time.time() - t
391 tb = TBufferFile(TBuffer.kWrite)
392 self.
T.dumpBuffer(tb)
393 self.
tDump += time.time() - t
399 evIn =
"Events Loaded : %i" % (self.
nIn)
400 evOut =
"Events Dumped : %i" % (self.
nOut)
402 dataIn =
"Data Loaded : %i" % (din)
403 dataInMb =
"Data Loaded (MB) : %5.2f Mb" % (din / MB)
405 avgIn =
"Avg Buf Loaded : %5.2f Mb" % (din / (self.
nIn * MB))
406 maxIn =
"Max Buf Loaded : %5.2f Mb" % (
max(self.
buffersIn) / MB)
408 avgIn =
"Avg Buf Loaded : N/A"
409 maxIn =
"Max Buf Loaded : N/A"
411 dataOut =
"Data Dumped : %i" % (dout)
412 dataOutMb =
"Data Dumped (MB) : %5.2f Mb" % (dout / MB)
414 avgOut =
"Avg Buf Dumped : %5.2f Mb" % (din / (self.
nOut * MB))
415 maxOut =
"Max Buf Dumped : %5.2f Mb" % (
max(self.
buffersOut) / MB)
417 avgOut =
"Avg Buf Dumped : N/A"
418 maxOut =
"Max Buf Dumped : N/A"
419 dumpTime =
"Total Dump Time : %5.2f" % (self.
tDump)
420 loadTime =
"Total Load Time : %5.2f" % (self.
tLoad)
452 def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
463 current_process().name = nodeType
470 self.nWorkers, self.sEvent, self.config, self.
log = params
482 ks = self.config.
keys()
484 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
491 qPair, histq, fq = self.
queues
548 from AlgSmapShot
import SmapShot
551 ss = SmapShot(logname=smapsLog)
552 self.
a.addAlgorithm(ss)
555 self.
fsr = self.
a.filerecordsvc()
556 self.
inc = self.
a.service(
"IncidentSvc",
"IIncidentSvc")
557 self.
pers = self.
a.service(
"EventPersistencySvc",
"IAddressCreator")
558 self.
ts = gbl.GaudiMP.TESSerializer(self.
evt._idp, self.
pers)
568 root = gbl.DataObject()
570 self.
evt.setRoot(
"/Event", root)
571 self.
ts.loadBuffer(tbufferfile)
574 if self.
app !=
"Gauss":
580 lst = [
"/Event/Gen/Header",
"/Event/Rec/Header"]
584 n = self.
evt[path].evtNumber()
594 n = self.
evt[
"/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
601 if self.
nIn > 0
or self.
nOut > 0:
604 self.
log.warning(
"Could not determine Event Number")
617 keys = [
"events",
"records",
"tuples",
"histos"]
622 wkeys = WRITERTYPES.keys()
623 for v
in self.config.values():
624 if v.__class__.__name__
in wkeys:
625 writerType = WRITERTYPES[v.__class__.__name__]
626 d[writerType].append(
MiniWriter(v, writerType, self.config))
628 self.
log.info(
"Writer Found : %s" % (v.name()))
631 if "HistogramPersistencySvc" in self.config.
keys():
632 hfile = self.config[
"HistogramPersistencySvc"].getProp(
"OutputFile")
633 d[
"histos"].append(hfile)
638 Method used by the GaudiPython algorithm CollectHistos
639 to obtain a dictionary of form { path : object }
640 representing the Histogram Store
642 nlist = self.
hvt.getHistoNames()
649 if type(o)
in aidatypes:
656 print(
"WARNING : no histograms to recover?")
666 if self.
app ==
"Gauss":
668 tool = self.
a.
tool(
"ToolSvc.EvtCounter")
673 self.
iTime = time.time() - start
681 self.
fTime = time.time() - start
685 allTime =
"Alive Time : %5.2f" % (self.
tTime)
686 initTime =
"Init Time : %5.2f" % (self.
iTime)
687 frstTime =
"1st Event Time : %5.2f" % (self.
firstEvTime)
688 runTime =
"Run Time : %5.2f" % (self.
rTime)
689 finTime =
"Finalise Time : %5.2f" % (self.
fTime)
690 tup = (allTime, initTime, frstTime, runTime, finTime)
702 def __init__(self, queues, events, params, subworkers):
703 GMPComponent.__init__(self,
"Reader", -1, queues, events, params, subworkers)
710 self.config[
"ApplicationMgr"].TopAlg = []
711 self.config[
"ApplicationMgr"].OutStream = []
712 if "HistogramPersistencySvc" in self.config.
keys():
713 self.config[
"HistogramPersistencySvc"].OutputFile =
""
714 self.config[
"MessageSvc"].Format =
"%-13s " %
"[Reader]" + self.
msgFormat
715 self.
evtMax = self.config[
"ApplicationMgr"].EvtMax
718 tb = TBufferFile(TBuffer.kWrite)
720 self.
ts.dumpBuffer(tb)
727 startFirst = time.time()
728 self.
log.info(
"Reader : First Event")
730 self.
log.info(
"evtMax( %i ) reached" % (self.
evtMax))
736 if not bool(self.
evt[
"/Event"]):
737 self.
log.warning(
"No More Events! (So Far : %i)" % (self.
nOut))
743 lst = self.
evt.getHistoNames()
746 lst = self.
evt.getList()
747 if self.
app ==
"DaVinci":
748 daqnode = self.
evt.retrieveObject(
"/Event/DAQ").
registry()
750 self.
evt.getList(daqnode, lst, daqnode.address().
par())
752 self.
log.critical(
"Reader could not acquire TES List!")
755 self.
log.info(
"Reader : TES List : %i items" % (len(lst)))
760 self.
log.info(
"First Event Sent")
763 self.eventLoopSyncer.set()
764 self.
evt.clearStore()
773 libc = ctypes.CDLL(
"libc.so.6")
775 libc.prctl(15, name, 0, 0, 0)
777 startEngine = time.time()
778 self.
log.name =
"Reader"
779 self.
log.info(
"Reader Process starting")
786 self.
log.info(
"Reader Beginning Distribution")
789 self.
log.info(
"Reader First Event OK")
791 self.
log.critical(
"Reader Failed on First Event")
798 self.
log.info(
"evtMax( %i ) reached" % (self.
evtMax))
801 if not self.
stat.isSuccess():
802 self.
log.critical(
"Reader is Damaged!")
807 self.
rTime += time.time() - t
808 if not bool(self.
evt[
"/Event"]):
809 self.
log.warning(
"No More Events! (So Far : %i)" % (self.
nOut))
816 self.eventLoopSyncer.set()
817 self.
evt.clearStore()
818 self.
log.info(
"Setting <Last> Event")
822 self.
log.info(
"Reader : Event Distribution complete.")
823 self.
evcom.finalize()
825 self.
tTime = time.time() - startEngine
833 def __init__(self, workerID, queues, events, params, subworkers):
834 GMPComponent.__init__(
835 self,
"Worker", workerID, queues, events, params, subworkers
841 self.
log.info(
"Subworker-%i Created OK" % (self.
nodeID))
849 libc = ctypes.CDLL(
"libc.so.6")
851 libc.prctl(15, name, 0, 0, 0)
854 startEngine = time.time()
855 msg = self.
a.service(
"MessageSvc")
856 msg.Format =
"%-13s " % (
"[" + self.
log.name +
"]") + self.
msgFormat
858 self.
log.name =
"Worker-%i" % (self.
nodeID)
859 self.
log.info(
"Subworker %i starting Engine" % (self.
nodeID))
863 self.
log.info(
"EVT WRITERS ON WORKER : %i" % (len(self.
writerDict[
"events"])))
865 nEventWriters = len(self.
writerDict[
"events"])
871 packet = self.
evcom.receive()
876 if packet ==
"FINISHED":
878 evtNumber, tbin = packet
879 if self.
cntr !=
None:
881 self.
cntr.setEventCounter(evtNumber)
891 self.
rTime += time.time() - t
895 self.
log.name =
"Worker-%i" % (self.
nodeID)
896 self.
log.warning(
"Did not Execute Event")
897 self.
evt.clearStore()
902 self.
log.name =
"Worker-%i" % (self.
nodeID)
903 self.
log.warning(
"Event did not pass : %i" % (evtNumber))
904 self.
evt.clearStore()
913 self.
inc.fireIncident(gbl.Incident(
"Subworker",
"EndEvent"))
914 self.eventLoopSyncer.set()
915 self.
evt.clearStore()
916 self.
log.name =
"Worker-%i" % (self.
nodeID)
917 self.
log.info(
"Setting <Last> Event %s" % (self.
nodeID))
920 self.
evcom.finalize()
923 self.
tTime = time.time() - startEngine
934 self.
inc = self.
a.service(
"IncidentSvc",
"IIncidentSvc")
942 For some output writers, a check is performed to see if the event has
943 executed certain algorithms.
944 These reside in the AcceptAlgs property for those writers
950 if hasattr(m.w,
"AcceptAlgs"):
951 acc += m.w.AcceptAlgs
952 if hasattr(m.w,
"RequireAlgs"):
953 req += m.w.RequireAlgs
954 if hasattr(m.w,
"VetoAlgs"):
956 return (acc, req, vet)
960 self.
a.algorithm(algName)._ialg.isExecuted()
961 and self.
a.algorithm(algName)._ialg.filterPassed()
969 Check the algorithm status for an event.
970 Depending on output writer settings, the event
971 may be declined based on various criteria.
972 This is a transcript of the check that occurs in GaudiSvc::OutputStream
976 self.
log.debug(
"self.acceptAlgs is %s" % (str(self.acceptAlgs)))
978 for name
in self.acceptAlgs:
985 self.
log.debug(
"self.requireAlgs is %s" % (str(self.requireAlgs)))
986 for name
in self.requireAlgs:
990 self.
log.info(
"Evt declined (requireAlgs) : %s" % (name))
993 self.
log.debug(
"self.vetoAlgs is %s" % (str(self.
vetoAlgs)))
998 self.
log.info(
"Evt declined : (vetoAlgs) : %s" % (name))
1007 def __init__(self, workerID, queues, events, params, subworkers):
1008 GMPComponent.__init__(
1009 self,
"Worker", workerID, queues, events, params, subworkers
1015 self.
log.name =
"Worker-%i" % (self.
nodeID)
1016 self.
log.info(
"Worker-%i Created OK" % (self.
nodeID))
1025 self.config[
"EventSelector"].Input = []
1026 self.config[
"ApplicationMgr"].OutStream = []
1027 if "HistogramPersistencySvc" in self.config.
keys():
1028 self.config[
"HistogramPersistencySvc"].OutputFile =
""
1029 formatHead =
"[Worker-%i]" % (self.
nodeID)
1030 self.config[
"MessageSvc"].Format =
"%-13s " % formatHead + self.
msgFormat
1033 self.
log.info(
"Writer Type : %s\t : %i" % (key, len(lst)))
1038 newName = m.getNewName(
".",
".w%i." % (self.
nodeID))
1039 self.config[m.key].Output = newName
1049 if "ToolSvc.EvtCounter" not in self.config:
1050 from Configurables
import EvtCounter
1052 counter = EvtCounter()
1054 counter = self.config[
"ToolSvc.EvtCounter"]
1055 counter.UseIncident =
False
1058 self.
log.warning(
"Cannot configure EvtCounter")
1066 libc = ctypes.CDLL(
"libc.so.6")
1068 libc.prctl(15, name, 0, 0, 0)
1070 startEngine = time.time()
1071 self.
log.info(
"Worker %i starting Engine" % (self.
nodeID))
1076 self.
log.info(
"EVT WRITERS ON WORKER : %i" % (len(self.
writerDict[
"events"])))
1078 nEventWriters = len(self.
writerDict[
"events"])
1083 for item
in m.ItemList:
1084 hsh = item.find(
"#")
1088 for item
in m.OptItemList:
1089 hsh = item.find(
"#")
1092 optItemList.add(item)
1094 itemList -= optItemList
1095 for item
in sorted(itemList):
1096 self.
log.info(
" adding ItemList Item to ts : %s" % (item))
1097 self.
ts.addItem(item)
1098 for item
in sorted(optItemList):
1099 self.
log.info(
" adding Optional Item to ts : %s" % (item))
1100 self.
ts.addOptItem(item)
1102 self.
log.info(
"There is no Event Output for this app")
1108 packet = self.
evcom.receive()
1113 if packet ==
"FINISHED":
1115 evtNumber, tbin = packet
1116 if self.
cntr !=
None:
1117 self.
cntr.setEventCounter(evtNumber)
1121 self.
log.info(
"Fork new subworkers")
1145 self.
rTime += time.time() - t
1149 self.
log.warning(
"Did not Execute Event")
1150 self.
evt.clearStore()
1155 self.
log.warning(
"Event did not pass : %i" % (evtNumber))
1156 self.
evt.clearStore()
1165 self.
inc.fireIncident(gbl.Incident(
"Worker",
"EndEvent"))
1166 self.eventLoopSyncer.set()
1167 self.
evt.clearStore()
1168 self.
log.info(
"Setting <Last> Event")
1171 self.
evcom.finalize()
1172 self.
log.info(
"Worker-%i Finished Processing Events" % (self.
nodeID))
1180 self.
log.info(
"Join subworkers")
1185 For some output writers, a check is performed to see if the event has
1186 executed certain algorithms.
1187 These reside in the AcceptAlgs property for those writers
1193 if hasattr(m.w,
"AcceptAlgs"):
1194 acc += m.w.AcceptAlgs
1195 if hasattr(m.w,
"RequireAlgs"):
1196 req += m.w.RequireAlgs
1197 if hasattr(m.w,
"VetoAlgs"):
1199 return (acc, req, vet)
1203 self.
a.algorithm(algName)._ialg.isExecuted()
1204 and self.
a.algorithm(algName)._ialg.filterPassed()
1212 Check the algorithm status for an event.
1213 Depending on output writer settings, the event
1214 may be declined based on various criteria.
1215 This is a transcript of the check that occurs in GaudiSvc::OutputStream
1219 self.
log.debug(
"self.acceptAlgs is %s" % (str(self.acceptAlgs)))
1221 for name
in self.acceptAlgs:
1228 self.
log.debug(
"self.requireAlgs is %s" % (str(self.requireAlgs)))
1229 for name
in self.requireAlgs:
1233 self.
log.info(
"Evt declined (requireAlgs) : %s" % (name))
1236 self.
log.debug(
"self.vetoAlgs is %s" % (str(self.
vetoAlgs)))
1241 self.
log.info(
"Evt declined : (vetoAlgs) : %s" % (name))
1250 def __init__(self, queues, events, params, subworkers):
1251 GMPComponent.__init__(self,
"Writer", -2, queues, events, params, subworkers)
1256 self.
log.name =
"Writer--2"
1262 self.config[
"ApplicationMgr"].TopAlg = []
1263 self.config[
"EventSelector"].Input = []
1265 self.config[
"MessageSvc"].Format =
"%-13s " %
"[Writer]" + self.
msgFormat
1272 libc = ctypes.CDLL(
"libc.so.6")
1274 libc.prctl(15, name, 0, 0, 0)
1276 startEngine = time.time()
1284 stopCriteria = self.nWorkers
1286 current = (current + 1) % self.nWorkers
1287 packet = self.
evcoms[current].receive(timeout=0.01)
1290 if packet ==
"FINISHED":
1291 self.
log.info(
"Writer got FINISHED flag : Worker %i" % (current))
1293 self.
status[current] =
True
1295 self.
log.info(
"FINISHED recd from all workers, break loop")
1300 evtNumber, tbin = packet
1304 self.
rTime += time.time() - t
1306 self.
evt.clearStore()
1307 self.eventLoopSyncer.set()
1308 self.
log.name =
"Writer--2"
1309 self.
log.info(
"Setting <Last> Event")
1313 [e.finalize()
for e
in self.
evcoms]
1318 self.
log.info(
"Histo Store rebuilt ok")
1320 self.
log.warning(
"Histo Store Error in Rebuild")
1341 self.
log.name =
"GaudiPython-Parallel-Logger"
1342 self.
log.info(
"GaudiPython Parallel Process Co-ordinator beginning")
1351 self.
hq = JoinableQueue()
1352 self.
fq = JoinableQueue()
1356 self.
nWorkers, self.
log, limit=WAIT_INITIALISE, step=STEP_INITIALISE
1362 limit=WAIT_SINGLE_EVENT,
1364 firstEvent=WAIT_FIRST_EVENT,
1367 self.
nWorkers, self.
log, limit=WAIT_FINALISE, step=STEP_FINALISE
1399 init = self.
sInit.d[nodeID].event
1400 run = (self.
sRun.d[nodeID].event, self.
sRun.d[nodeID].lastEvent)
1401 fin = self.
sFin.d[nodeID].event
1402 return (init, run, fin)
1405 eventQ = self.
qs[nodeID]
1408 return (eventQ, histQ, fsrQ)
1413 self.
log.name =
"GaudiPython-Parallel-Logger"
1414 self.
log.info(
"INITIALISING SYSTEM")
1420 sc = self.
sInit.syncAll(step=
"Initialise")
1428 self.
log.name =
"GaudiPython-Parallel-Logger"
1429 self.
log.info(
"RUNNING SYSTEM")
1430 sc = self.
sRun.syncAll(step=
"Run")
1438 self.
log.name =
"GaudiPython-Parallel-Logger"
1439 self.
log.info(
"FINALISING SYSTEM")
1440 sc = self.
sFin.syncAll(step=
"Finalise")
1448 self.
log.info(
"Cleanly join all Processes")
1450 self.
log.info(
"Report Total Success to Main.py")
1455 children = multiprocessing.active_children()
1478 rwk = JoinableQueue()
1480 workersWriter = [JoinableQueue()
for i
in range(self.
nWorkers)]
1483 d[-2] = (workersWriter,
None)
1485 d[i] = (rwk, workersWriter[i])