import multiprocessing
from multiprocessing import Event, JoinableQueue, Process, cpu_count, current_process
from multiprocessing.queues import Empty

from ROOT import TBuffer, TBufferFile

from GaudiPython import (

with warnings.catch_warnings():
    warnings.simplefilter("ignore")

# Time limits (in seconds) for the synchronisation steps
WAIT_INITIALISE = 60 * 10
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
WAIT_FINALISE = 60 * 2
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

aidatypes = (
    gbl.AIDA.IHistogram1D,
    gbl.AIDA.IHistogram2D,
    gbl.AIDA.IHistogram3D,
    gbl.AIDA.IBaseHistogram,
)

thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)

# Map the known output-stream classes to the kind of output they produce
WRITERTYPES = {
    "EvtCollectionStream": "tuples",
    "InputCopyStream": "events",
    "OutputStream": "events",
    "RecordStream": "records",
    "RunRecordStream": "records",
    "SequentialOutputStream": "events",
    "TagCollectionStream": "tuples",
}
    """
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc or just on the writer, and may be a list or a string.
    Also, there are three different types of writer (events, records, tuples),
    so this bootstrap class provides easy access to this info while configuring.
    """
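    # Usage sketch (not part of the original module; "config" stands for the
    # dict-like of configurables that the parallel components pass around, and
    # "OutputStream" / ".w0." are assumed example values):
    #
    #   ostream = config["OutputStream"]
    #   mw = MiniWriter(ostream, WRITERTYPES["OutputStream"], config)
    #   newName = mw.getNewName(".", ".w0.")    # e.g. "out.dst" -> "out.w0.dst"
    #   config[mw.key].Output = newName         # point this writer at its own file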
        if hasattr(self.w, "Output"):

        if hasattr(datasvc, "Output"):

        assert replaceThis.__class__.__name__ == "str"
        assert withThis.__class__.__name__ == "str"

        if old.__class__.__name__ == "list":

            new = old.replace(replaceThis, withThis)

        if hasattr(self.w, "ItemList"):

            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "ItemList"):

        if hasattr(self.w, "OptItemList"):

            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "OptItemList"):

    def set(self, key, output):

        s += "Writer : %s\n" % (self.wName)
        s += "Writer Type : %s\n" % (self.wType)
        s += "Writer Output : %s\n" % (self.output)

        s += "DataSvc Output : %s\n" % (self.svcOutput)

        s += "Key for config : %s\n" % (self.key)
        s += "Output File : %s\n" % (self.output)
        s += "ItemList : %s\n" % (self.ItemList)
    """
    GaudiPython algorithm used to clean up histograms on the Reader and Workers.
    It only has a finalize() method.
    This retrieves a dictionary of path:histo objects and sends it to the
    writer. It then waits for a None flag: THIS IS IMPORTANT, because if
    the algorithm returns before ALL histos have been COMPLETELY RECEIVED
    at the writer end, there will be an error.
    """
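    # Sketch (assumption, not part of the original module) of the hand-off
    # protocol that finalize() below implements on the sending side:
    #
    #   self._gmpc.hq.put((nodeID, {path: histo, ...}))   # repeated, in chunks
    #   self._gmpc.hq.put("HISTOS_SENT")                  # end-of-histos marker
    #   self._gmpc.sEvent.wait()                          # block until the Writer
    #                                                     # confirms full receipt
    #
    # A Writer-side consumer would keep calling hq.get() and merging the chunk
    # dictionaries until it has seen "HISTOS_SENT" from every sender, and only
    # then set sEvent; the exact Writer loop is not shown in this fragment.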
        PyAlgorithm.__init__(self)

        self.log.info("CollectHistograms Finalise (%s)" % (self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms()
        ks = list(self._gmpc.hDict.keys())
        self.log.info("%i Objects in Histogram Store" % (len(ks)))

        # send the histograms to the Writer in chunks to keep the queue packets small
        reps = int(len(ks) / chunk + 1)
        for i in range(reps):
            someKeys = ks[i * chunk : (i + 1) * chunk]
            smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
            self._gmpc.hq.put((self._gmpc.nodeID, smalld))

        self.log.debug("Signalling end of histos to Writer")
        self._gmpc.hq.put("HISTOS_SENT")
        self.log.debug("Waiting on Sync Event")
        self._gmpc.sEvent.wait()
        self.log.debug("Histo Sync Event set, clearing and returning")
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()

        self._gmpc.hvt.setRoot("/stat", root)
        assert item.__class__.__name__ == "tuple"
        startTransmission = time.time()

        while self.qout._buffer:

        self.qoutTime += time.time() - startTransmission

        startWait = time.time()

        itemIn = self.qin.get(timeout=timeout)

        self.qinTime += time.time() - startWait

        if itemIn.__class__.__name__ == "tuple":

            self._gmpc.log.warning(
                "TASK_DONE called too often by : %s" % (self._gmpc.nodeType)
            )

            "Finalize Event Communicator : %s %s" % (self._gmpc, self._gmpc.nodeType)

        if self._gmpc.nodeType == "Reader":
            downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == "Writer":

        elif self._gmpc.nodeType == "Worker":

        for i in range(downstream):

        if self._gmpc.nodeType != "Writer":

        self.log.name = "%s-%i Audit " % (self._gmpc.nodeType, self._gmpc.nodeID)
        self.log.info("Items Sent : %i" % (self.nSent))
        self.log.info("Items Received : %i" % (self.nRecv))

        self.log.info("Data Received : %i" % (self.sizeRecv))
        self.log.info("Q-out Time : %5.2f" % (self.qoutTime))
        self.log.info("Q-in Time : %5.2f" % (self.qinTime))
    def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
        self.T = gaudiTESSerializer

        root = gbl.DataObject()

        self.evt.setRoot("/Event", root)

        self.T.loadBuffer(tbuf)
        self.tLoad += time.time() - t

        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += time.time() - t

        evIn = "Events Loaded : %i" % (self.nIn)
        evOut = "Events Dumped : %i" % (self.nOut)

        dataIn = "Data Loaded : %i" % (din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)

        avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
        maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)

        avgIn = "Avg Buf Loaded : N/A"
        maxIn = "Max Buf Loaded : N/A"

        dataOut = "Data Dumped : %i" % (dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)

        avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
        maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)

        avgOut = "Avg Buf Dumped : N/A"
        maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
        loadTime = "Total Load Time : %5.2f" % (self.tLoad)
    def __init__(self, nodeType, nodeID, queues, events, params, subworkers):

        current_process().name = nodeType

        self.nWorkers, self.sEvent, self.config, self.log = params

        ks = self.config.keys()

        list = ["Brunel", "DaVinci", "Boole", "Gauss"]  # known application types, used to set self.app

        qPair, histq, fq = self.queues

        from AlgSmapShot import SmapShot

        ss = SmapShot(logname=smapsLog)
        self.a.addAlgorithm(ss)

        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
        self.pers = self.a.service("EventPersistencySvc", "IAddressCreator")
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)

        root = gbl.DataObject()

        self.evt.setRoot("/Event", root)
        self.ts.loadBuffer(tbufferfile)

        if self.app != "Gauss":

            lst = ["/Event/Gen/Header", "/Event/Rec/Header"]

            n = self.evt[path].evtNumber()

        n = self.evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]

        if self.nIn > 0 or self.nOut > 0:

        self.log.warning("Could not determine Event Number")

        keys = ["events", "records", "tuples", "histos"]

        wkeys = WRITERTYPES.keys()
        for v in self.config.values():
            if v.__class__.__name__ in wkeys:
                writerType = WRITERTYPES[v.__class__.__name__]
                d[writerType].append(MiniWriter(v, writerType, self.config))

                self.log.info("Writer Found : %s" % (v.name()))

        if "HistogramPersistencySvc" in self.config.keys():
            hfile = self.config["HistogramPersistencySvc"].getProp("OutputFile")
            d["histos"].append(hfile)
        """
        Method used by the GaudiPython algorithm CollectHistograms
        to obtain a dictionary of the form {path: object}
        representing the Histogram Store.
        """
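        # Minimal sketch (an assumption, based on the fragments below) of how the
        # {path: object} dictionary is assembled: AIDA histograms registered in
        # the histogram service (self.hvt) are converted to ROOT objects with
        # aida2root before being shipped to the Writer.
        #
        #   histDict = {}
        #   for path in self.hvt.getHistoNames():
        #       o = self.hvt[path]
        #       if type(o) in aidatypes:
        #           histDict[path] = aida2root(o)
        #
        # The real method also copes with an empty store (see the warning below).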
        nlist = self.hvt.getHistoNames()

            if type(o) in aidatypes:

            print("WARNING : no histograms to recover?")

        if self.app == "Gauss":
            tool = self.a.tool("ToolSvc.EvtCounter")

            self.cntr = cppyy.nullptr

        self.iTime = time.time() - start

        self.fTime = time.time() - start

        allTime = "Alive Time : %5.2f" % (self.tTime)
        initTime = "Init Time : %5.2f" % (self.iTime)
        frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
        runTime = "Run Time : %5.2f" % (self.rTime)
        finTime = "Finalise Time : %5.2f" % (self.fTime)
        tup = (allTime, initTime, frstTime, runTime, finTime)
    def __init__(self, queues, events, params, subworkers):
        GMPComponent.__init__(self, "Reader", -1, queues, events, params, subworkers)

        self.config["ApplicationMgr"].TopAlg = []
        self.config["ApplicationMgr"].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config["HistogramPersistencySvc"].OutputFile = ""
        self.config["MessageSvc"].Format = "%-13s " % "[Reader]" + self.msgFormat
        self.evtMax = self.config["ApplicationMgr"].EvtMax

        tb = TBufferFile(TBuffer.kWrite)

        self.ts.dumpBuffer(tb)

        startFirst = time.time()
        self.log.info("Reader : First Event")

        self.log.info("evtMax( %i ) reached" % (self.evtMax))

        if not bool(self.evt["/Event"]):
            self.log.warning("No More Events! (So Far : %i)" % (self.nOut))

        lst = self.evt.getHistoNames()

        lst = self.evt.getList()
        if self.app == "DaVinci":
            daqnode = self.evt.retrieveObject("/Event/DAQ").registry()

            self.evt.getList(daqnode, lst, daqnode.address().par())

        self.log.critical("Reader could not acquire TES List!")

        self.log.info("Reader : TES List : %i items" % (len(lst)))

        self.log.info("First Event Sent")

        self.eventLoopSyncer.set()
        self.evt.clearStore()

        libc = ctypes.CDLL("libc.so.6")

        libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)

        startEngine = time.time()
        self.log.name = "Reader"
        self.log.info("Reader Process starting")

        self.log.info("Reader Beginning Distribution")

        self.log.info("Reader First Event OK")

        self.log.critical("Reader Failed on First Event")

        self.log.info("evtMax( %i ) reached" % (self.evtMax))

        if not self.stat.isSuccess():
            self.log.critical("Reader is Damaged!")

        self.rTime += time.time() - t
        if not bool(self.evt["/Event"]):
            self.log.warning("No More Events! (So Far : %i)" % (self.nOut))

        self.eventLoopSyncer.set()
        self.evt.clearStore()
        self.log.info("Setting <Last> Event")

        self.log.info("Reader : Event Distribution complete.")
        self.evcom.finalize()

        self.tTime = time.time() - startEngine
    def __init__(self, workerID, queues, events, params, subworkers):
        GMPComponent.__init__(
            self, "Worker", workerID, queues, events, params, subworkers
        )

        self.log.info("Subworker-%i Created OK" % (self.nodeID))

        libc = ctypes.CDLL("libc.so.6")

        libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)

        startEngine = time.time()
        msg = self.a.service("MessageSvc")
        msg.Format = "%-13s " % ("[" + self.log.name + "]") + self.msgFormat

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Subworker %i starting Engine" % (self.nodeID))

        self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))

        packet = self.evcom.receive()

        if packet == "FINISHED":

        evtNumber, tbin = packet
        if self.cntr != cppyy.nullptr:
            self.cntr.setEventCounter(evtNumber)

        self.rTime += time.time() - t

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.warning("Did not Execute Event")
        self.evt.clearStore()

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.warning("Event did not pass : %i" % (evtNumber))
        self.evt.clearStore()

        self.inc.fireIncident(gbl.Incident("Subworker", "EndEvent"))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Setting <Last> Event %s" % (self.nodeID))

        self.evcom.finalize()

        self.tTime = time.time() - startEngine

        self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
        """
        For some output writers, a check is performed to see if the event has
        executed certain algorithms.
        These reside in the AcceptAlgs, RequireAlgs and VetoAlgs properties of those writers.
        """
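        # Hedged example (values assumed) of the tuple gathered below for a job
        # with one OutputStream whose AcceptAlgs = ["SelAlg"] and
        # VetoAlgs = ["BadAlg"]:
        #
        #   acc, req, vet = ["SelAlg"], [], ["BadAlg"]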
            if hasattr(m.w, "AcceptAlgs"):
                acc += m.w.AcceptAlgs
            if hasattr(m.w, "RequireAlgs"):
                req += m.w.RequireAlgs
            if hasattr(m.w, "VetoAlgs"):

        return (acc, req, vet)

            self.a.algorithm(algName)._ialg.isExecuted()
            and self.a.algorithm(algName)._ialg.filterPassed()
        """
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream.
        """
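        # Condensed sketch of the acceptance rule applied below (the branch
        # structure is only partially visible here, and the exact semantics are
        # an assumption based on GaudiSvc::OutputStream):
        #
        #   ok = lambda name: (self.a.algorithm(name)._ialg.isExecuted()
        #                      and self.a.algorithm(name)._ialg.filterPassed())
        #   passed = (not self.acceptAlgs) or any(ok(a) for a in self.acceptAlgs)
        #   passed = passed and all(ok(a) for a in self.requireAlgs)
        #   passed = passed and not any(ok(a) for a in self.vetoAlgs)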
        self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))

        for name in self.acceptAlgs:

        self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
        for name in self.requireAlgs:

            self.log.info("Evt declined (requireAlgs) : %s" % (name))

        self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))

            self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
    def __init__(self, workerID, queues, events, params, subworkers):
        GMPComponent.__init__(
            self, "Worker", workerID, queues, events, params, subworkers
        )

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Worker-%i Created OK" % (self.nodeID))

        self.config["EventSelector"].Input = []
        self.config["ApplicationMgr"].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config["HistogramPersistencySvc"].OutputFile = ""
        formatHead = "[Worker-%i]" % (self.nodeID)
        self.config["MessageSvc"].Format = "%-13s " % formatHead + self.msgFormat

        self.log.info("Writer Type : %s\t : %i" % (key, len(lst)))

        newName = m.getNewName(".", ".w%i." % (self.nodeID))
        self.config[m.key].Output = newName

        if "ToolSvc.EvtCounter" not in self.config:
            from Configurables import EvtCounter

            counter = EvtCounter()

            counter = self.config["ToolSvc.EvtCounter"]
        counter.UseIncident = False

        self.log.warning("Cannot configure EvtCounter")

        libc = ctypes.CDLL("libc.so.6")

        libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)

        startEngine = time.time()
        self.log.info("Worker %i starting Engine" % (self.nodeID))

        self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))

        nEventWriters = len(self.writerDict["events"])

            for item in m.ItemList:
                hsh = item.find("#")

            for item in m.OptItemList:
                hsh = item.find("#")

                optItemList.add(item)

            itemList -= optItemList
            for item in sorted(itemList):
                self.log.info(" adding ItemList Item to ts : %s" % (item))
                self.ts.addItem(item)
            for item in sorted(optItemList):
                self.log.info(" adding Optional Item to ts : %s" % (item))
                self.ts.addOptItem(item)

        self.log.info("There is no Event Output for this app")

        packet = self.evcom.receive()

        if packet == "FINISHED":

        evtNumber, tbin = packet
        if self.cntr != cppyy.nullptr:
            self.cntr.setEventCounter(evtNumber)

        self.log.info("Fork new subworkers")

        self.rTime += time.time() - t

        self.log.warning("Did not Execute Event")
        self.evt.clearStore()

        self.log.warning("Event did not pass : %i" % (evtNumber))
        self.evt.clearStore()

        self.inc.fireIncident(gbl.Incident("Worker", "EndEvent"))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
        self.log.info("Setting <Last> Event")

        self.evcom.finalize()
        self.log.info("Worker-%i Finished Processing Events" % (self.nodeID))

        self.log.info("Join subworkers")
        """
        For some output writers, a check is performed to see if the event has
        executed certain algorithms.
        These reside in the AcceptAlgs, RequireAlgs and VetoAlgs properties of those writers.
        """
            if hasattr(m.w, "AcceptAlgs"):
                acc += m.w.AcceptAlgs
            if hasattr(m.w, "RequireAlgs"):
                req += m.w.RequireAlgs
            if hasattr(m.w, "VetoAlgs"):

        return (acc, req, vet)

            self.a.algorithm(algName)._ialg.isExecuted()
            and self.a.algorithm(algName)._ialg.filterPassed()
        """
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream.
        """
        self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))

        for name in self.acceptAlgs:

        self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
        for name in self.requireAlgs:

            self.log.info("Evt declined (requireAlgs) : %s" % (name))

        self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))

            self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
    def __init__(self, queues, events, params, subworkers):
        GMPComponent.__init__(self, "Writer", -2, queues, events, params, subworkers)

        self.log.name = "Writer--2"

        self.config["ApplicationMgr"].TopAlg = []
        self.config["EventSelector"].Input = []

        self.config["MessageSvc"].Format = "%-13s " % "[Writer]" + self.msgFormat

        libc = ctypes.CDLL("libc.so.6")

        libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)

        current = (current + 1) % self.nWorkers
        packet = self.evcoms[current].receive(timeout=0.01)

        if packet == "FINISHED":
            self.log.info("Writer got FINISHED flag : Worker %i" % (current))

            self.status[current] = True

            self.log.info("FINISHED recd from all workers, break loop")

        evtNumber, tbin = packet

        self.rTime += time.time() - t

        self.evt.clearStore()
        self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info("Setting <Last> Event")

        [e.finalize() for e in self.evcoms]

        self.log.info("Histo Store rebuilt ok")

        self.log.warning("Histo Store Error in Rebuild")
        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("GaudiPython Parallel Process Co-ordinator beginning")

        self.hq = JoinableQueue()
        self.fq = JoinableQueue()

            self.nWorkers, self.log, limit=WAIT_INITIALISE, step=STEP_INITIALISE

            limit=WAIT_SINGLE_EVENT,

            firstEvent=WAIT_FIRST_EVENT,

            self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE

        init = self.sInit.d[nodeID].event
        run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
        fin = self.sFin.d[nodeID].event
        return (init, run, fin)

        eventQ = self.qs[nodeID]

        return (eventQ, histQ, fsrQ)

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("INITIALISING SYSTEM")

        sc = self.sInit.syncAll(step="Initialise")

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("RUNNING SYSTEM")
        sc = self.sRun.syncAll(step="Run")

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("FINALISING SYSTEM")
        sc = self.sFin.syncAll(step="Finalise")

        self.log.info("Cleanly join all Processes")

        self.log.info("Report Total Success to Main.py")

        children = multiprocessing.active_children()

        rwk = JoinableQueue()

        workersWriter = [JoinableQueue() for i in range(self.nWorkers)]

        d[-2] = (workersWriter, None)

            d[i] = (rwk, workersWriter[i])