from __future__ import print_function

import multiprocessing

from multiprocessing import Event, JoinableQueue, Process, cpu_count, current_process
from multiprocessing.queues import Empty

from ROOT import TBuffer, TBufferFile

from GaudiPython import (
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
WAIT_INITIALISE = 60 * 10
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
WAIT_FINALISE = 60 * 2
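
# The hard-coded waits above are expressed in seconds: 10 minutes to initialise,
# 3 minutes for the first event, 6 minutes per subsequent event and 2 minutes to
# finalise. They are used as limits for the Syncer steps set up further below.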
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

aidatypes = (
    gbl.AIDA.IHistogram1D,
    gbl.AIDA.IHistogram2D,
    gbl.AIDA.IHistogram3D,
    gbl.AIDA.IBaseHistogram,
)

thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
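
# aidatypes collects the AIDA histogram interfaces recognised when scanning the
# histogram store (see dumpHistograms below); thtypes collects the equivalent
# ROOT histogram classes.

# WRITERTYPES maps the class name of each known output-stream configurable to
# the category of output it produces ("events", "records" or "tuples"); it is
# used when scanning the configuration for writers to wrap in MiniWriter objects.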
WRITERTYPES = {
    "EvtCollectionStream": "tuples",
    "InputCopyStream": "events",
    "OutputStream": "events",
    "RecordStream": "records",
    "RunRecordStream": "records",
    "SequentialOutputStream": "events",
    "TagCollectionStream": "tuples",
}
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file; it may be
    specified on the DataSvc or just on the writer, and may be a list or a string.
    Also, there are three different types of writer (events, records, tuples),
    so this bootstrap class provides easy access to this info while configuring.
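
    # Illustrative use, following the writer-discovery code further down: each
    # output-stream configurable ``v`` found in the job configuration is wrapped
    # as MiniWriter(v, WRITERTYPES[v.__class__.__name__], self.config), and each
    # worker later renames its output file with m.getNewName(".", ".w%i." % nodeID),
    # presumably so that every worker writes to its own copy of the file.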
        if hasattr(self.w, "Output"):

        if hasattr(datasvc, "Output"):

        assert replaceThis.__class__.__name__ == "str"
        assert withThis.__class__.__name__ == "str"

        if old.__class__.__name__ == "list":

        new = old.replace(replaceThis, withThis)

        if hasattr(self.w, "ItemList"):

            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "ItemList"):

        if hasattr(self.w, "OptItemList"):

            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "OptItemList"):

    def set(self, key, output):

        s += "Writer : %s\n" % (self.wName)
        s += "Writer Type : %s\n" % (self.wType)
        s += "Writer Output : %s\n" % (self.output)

        s += "DataSvc Output : %s\n" % (self.svcOutput)

        s += "Key for config : %s\n" % (self.key)
        s += "Output File : %s\n" % (self.output)
        s += "ItemList : %s\n" % (self.ItemList)
    GaudiPython algorithm used to clean up histos on the Reader and Workers.
    It only implements a finalize() method.
    This retrieves a dictionary of path:histo objects and sends it to the
    writer. It then waits for a None flag: THIS IS IMPORTANT, because if
    the algorithm returns before ALL histos have been COMPLETELY RECEIVED
    at the writer end, there will be an error.
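
    # Transfer protocol used by finalize() below: the histogram dictionary is
    # sent over self._gmpc.hq in chunks of at most ``chunk`` entries, each as a
    # (nodeID, {path: histo}) tuple, followed by the string "HISTOS_SENT"; the
    # algorithm then blocks on self._gmpc.sEvent until the Writer signals that
    # everything has been received.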
        PyAlgorithm.__init__(self)

        self.log.info("CollectHistograms Finalise (%s)" % (self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms()
        ks = list(self._gmpc.hDict.keys())
        self.log.info("%i Objects in Histogram Store" % (len(ks)))

        reps = int(len(ks) / chunk + 1)
        for i in range(reps):
            someKeys = ks[i * chunk : (i + 1) * chunk]
            smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
            self._gmpc.hq.put((self._gmpc.nodeID, smalld))

        self.log.debug("Signalling end of histos to Writer")
        self._gmpc.hq.put("HISTOS_SENT")
        self.log.debug("Waiting on Sync Event")
        self._gmpc.sEvent.wait()
        self.log.debug("Histo Sync Event set, clearing and returning")
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()

        self._gmpc.hvt.setRoot("/stat", root)
        assert item.__class__.__name__ == "tuple"
        startTransmission = time.time()

        while self.qout._buffer:

        self.qoutTime += time.time() - startTransmission

        startWait = time.time()

        itemIn = self.qin.get(timeout=timeout)

        self.qinTime += time.time() - startWait

        if itemIn.__class__.__name__ == "tuple":

        self._gmpc.log.warning("TASK_DONE called too often by : %s" % (self._gmpc.nodeType))

        "Finalize Event Communicator : %s %s" % (self._gmpc, self._gmpc.nodeType)

        if self._gmpc.nodeType == "Reader":
            downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == "Writer":

        elif self._gmpc.nodeType == "Worker":

        for i in range(downstream):

        if self._gmpc.nodeType != "Writer":

        self.log.name = "%s-%i Audit " % (self._gmpc.nodeType, self._gmpc.nodeID)
        self.log.info("Items Sent : %i" % (self.nSent))
        self.log.info("Items Received : %i" % (self.nRecv))

        self.log.info("Data Received : %i" % (self.sizeRecv))
        self.log.info("Q-out Time : %5.2f" % (self.qoutTime))
        self.log.info("Q-in Time : %5.2f" % (self.qinTime))
    def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
        self.T = gaudiTESSerializer

        root = gbl.DataObject()

        self.evt.setRoot("/Event", root)

        self.T.loadBuffer(tbuf)
        self.tLoad += time.time() - t

        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += time.time() - t
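
        # Events travel between processes as serialised ROOT buffers: the GaudiMP
        # TESSerializer dumps the transient event store into a TBufferFile (dump)
        # and rebuilds the store from such a buffer on the receiving side (load);
        # the tLoad/tDump counters above accumulate the time spent in each direction.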
        evIn = "Events Loaded : %i" % (self.nIn)
        evOut = "Events Dumped : %i" % (self.nOut)

        dataIn = "Data Loaded : %i" % (din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)

        avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
        maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)

        avgIn = "Avg Buf Loaded : N/A"
        maxIn = "Max Buf Loaded : N/A"

        dataOut = "Data Dumped : %i" % (dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
        avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
        maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)

        avgOut = "Avg Buf Dumped : N/A"
        maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
        loadTime = "Total Load Time : %5.2f" % (self.tLoad)
    def __init__(self, nodeType, nodeID, queues, events, params, subworkers):

        current_process().name = nodeType

        self.nWorkers, self.sEvent, self.config, self.log = params

        ks = self.config.keys()

        list = ["Brunel", "DaVinci", "Boole", "Gauss"]

        qPair, histq, fq = self.queues

        from AlgSmapShot import SmapShot

        ss = SmapShot(logname=smapsLog)
        self.a.addAlgorithm(ss)

        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
        self.pers = self.a.service("EventPersistencySvc", "IAddressCreator")
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)

        root = gbl.DataObject()

        self.evt.setRoot("/Event", root)
        self.ts.loadBuffer(tbufferfile)

        if self.app != "Gauss":

        lst = ["/Event/Gen/Header", "/Event/Rec/Header"]

        n = self.evt[path].evtNumber()

        n = self.evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]

        if self.nIn > 0 or self.nOut > 0:

        self.log.warning("Could not determine Event Number")

        keys = ["events", "records", "tuples", "histos"]

        wkeys = WRITERTYPES.keys()
        for v in self.config.values():
            if v.__class__.__name__ in wkeys:
                writerType = WRITERTYPES[v.__class__.__name__]
                d[writerType].append(MiniWriter(v, writerType, self.config))

                self.log.info("Writer Found : %s" % (v.name()))

        if "HistogramPersistencySvc" in self.config.keys():
            hfile = self.config["HistogramPersistencySvc"].getProp("OutputFile")
            d["histos"].append(hfile)
        Method used by the GaudiPython algorithm CollectHistograms
        to obtain a dictionary of the form { path : object }
        representing the Histogram Store.
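
        # The returned dictionary might look like this (hypothetical paths,
        # depending on the job's histogram definitions):
        #   { "/stat/MyAlg/1": <TH1D>, "/stat/MyAlg/eff": <TProfile> }
        # Only objects whose type appears in aidatypes are collected.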
        nlist = self.hvt.getHistoNames()

        if type(o) in aidatypes:

        print("WARNING : no histograms to recover?")

        if self.app == "Gauss":
            tool = self.a.tool("ToolSvc.EvtCounter")

        self.cntr = cppyy.nullptr

        self.iTime = time.time() - start

        self.fTime = time.time() - start

        allTime = "Alive Time : %5.2f" % (self.tTime)
        initTime = "Init Time : %5.2f" % (self.iTime)
        frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
        runTime = "Run Time : %5.2f" % (self.rTime)
        finTime = "Finalise Time : %5.2f" % (self.fTime)
        tup = (allTime, initTime, frstTime, runTime, finTime)
    def __init__(self, queues, events, params, subworkers):
        GMPComponent.__init__(self, "Reader", -1, queues, events, params, subworkers)

        self.config["ApplicationMgr"].TopAlg = []
        self.config["ApplicationMgr"].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config["HistogramPersistencySvc"].OutputFile = ""
        self.config["MessageSvc"].Format = "%-13s " % "[Reader]" + self.msgFormat
        self.evtMax = self.config["ApplicationMgr"].EvtMax

        tb = TBufferFile(TBuffer.kWrite)

        self.ts.dumpBuffer(tb)

        startFirst = time.time()
        self.log.info("Reader : First Event")

        self.log.info("evtMax( %i ) reached" % (self.evtMax))

        if not bool(self.evt["/Event"]):
            self.log.warning("No More Events! (So Far : %i)" % (self.nOut))

        lst = self.evt.getHistoNames()

        lst = self.evt.getList()
        if self.app == "DaVinci":
            daqnode = self.evt.retrieveObject("/Event/DAQ").registry()

            self.evt.getList(daqnode, lst, daqnode.address().par())

        self.log.critical("Reader could not acquire TES List!")

        self.log.info("Reader : TES List : %i items" % (len(lst)))

        self.log.info("First Event Sent")

        self.eventLoopSyncer.set()
        self.evt.clearStore()
        libc = ctypes.CDLL("libc.so.6")

        libc.prctl(15, ctypes.create_string_buffer(name.encode()), 0, 0, 0)
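        # The prctl call above uses option 15 (PR_SET_NAME) to rename the calling
        # process so it is identifiable (e.g. in ps/top output) among the GMP nodes.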
        startEngine = time.time()
        self.log.name = "Reader"
        self.log.info("Reader Process starting")

        self.log.info("Reader Beginning Distribution")

        self.log.info("Reader First Event OK")

        self.log.critical("Reader Failed on First Event")

        self.log.info("evtMax( %i ) reached" % (self.evtMax))

        if not self.stat.isSuccess():
            self.log.critical("Reader is Damaged!")

        self.rTime += time.time() - t
        if not bool(self.evt["/Event"]):
            self.log.warning("No More Events! (So Far : %i)" % (self.nOut))

        self.eventLoopSyncer.set()
        self.evt.clearStore()
        self.log.info("Setting <Last> Event")

        self.log.info("Reader : Event Distribution complete.")
        self.evcom.finalize()

        self.tTime = time.time() - startEngine
    def __init__(self, workerID, queues, events, params, subworkers):
        GMPComponent.__init__(self, "Worker", workerID, queues, events, params, subworkers)

        self.log.info("Subworker-%i Created OK" % (self.nodeID))
        libc = ctypes.CDLL("libc.so.6")

        libc.prctl(15, ctypes.create_string_buffer(name.encode()), 0, 0, 0)
        startEngine = time.time()
        msg = self.a.service("MessageSvc")
        msg.Format = "%-13s " % ("[" + self.log.name + "]") + self.msgFormat

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Subworker %i starting Engine" % (self.nodeID))

        self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))

        packet = self.evcom.receive()

        if packet == "FINISHED":

        evtNumber, tbin = packet
        if self.cntr != cppyy.nullptr:
            self.cntr.setEventCounter(evtNumber)

        self.rTime += time.time() - t

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.warning("Did not Execute Event")
        self.evt.clearStore()

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.warning("Event did not pass : %i" % (evtNumber))
        self.evt.clearStore()

        self.inc.fireIncident(gbl.Incident("Subworker", "EndEvent"))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Setting <Last> Event %s" % (self.nodeID))

        self.evcom.finalize()

        self.tTime = time.time() - startEngine

        self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
        For some output writers, a check is performed to see whether the event
        has executed certain algorithms.
        These lists reside in the AcceptAlgs, RequireAlgs and VetoAlgs
        properties of those writers.
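
        # The code below gathers three lists from the configured writers
        # (AcceptAlgs, RequireAlgs and VetoAlgs) and returns them as a tuple
        # (acc, req, vet); the event-acceptance check further down then uses
        # these lists to decide whether the current event should be written out.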
            if hasattr(m.w, "AcceptAlgs"):
                acc += m.w.AcceptAlgs
            if hasattr(m.w, "RequireAlgs"):
                req += m.w.RequireAlgs
            if hasattr(m.w, "VetoAlgs"):

        return (acc, req, vet)

            self.a.algorithm(algName)._ialg.isExecuted()
            and self.a.algorithm(algName)._ialg.filterPassed()
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream.
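
        # In rough pseudocode, the checks below implement:
        #   required = all algorithms in self.requireAlgs executed and passed
        #   vetoed   = any algorithm in self.vetoAlgs executed and passed
        # The event is declined if a required algorithm did not pass or if a
        # veto algorithm did pass (self.acceptAlgs is examined analogously).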
        self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))

        for name in self.acceptAlgs:

        self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
        for name in self.requireAlgs:

            self.log.info("Evt declined (requireAlgs) : %s" % (name))

        self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))

        self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
    def __init__(self, workerID, queues, events, params, subworkers):
        GMPComponent.__init__(self, "Worker", workerID, queues, events, params, subworkers)

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Worker-%i Created OK" % (self.nodeID))

        self.config["EventSelector"].Input = []
        self.config["ApplicationMgr"].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config["HistogramPersistencySvc"].OutputFile = ""
        formatHead = "[Worker-%i]" % (self.nodeID)
        self.config["MessageSvc"].Format = "%-13s " % formatHead + self.msgFormat

        self.log.info("Writer Type : %s\t : %i" % (key, len(lst)))

        newName = m.getNewName(".", ".w%i." % (self.nodeID))
        self.config[m.key].Output = newName

        if "ToolSvc.EvtCounter" not in self.config:
            from Configurables import EvtCounter

            counter = EvtCounter()

            counter = self.config["ToolSvc.EvtCounter"]
        counter.UseIncident = False

        self.log.warning("Cannot configure EvtCounter")
        libc = ctypes.CDLL("libc.so.6")

        libc.prctl(15, ctypes.create_string_buffer(name.encode()), 0, 0, 0)
        startEngine = time.time()
        self.log.info("Worker %i starting Engine" % (self.nodeID))

        self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))

        nEventWriters = len(self.writerDict["events"])

        for item in m.ItemList:
            hsh = item.find("#")

        for item in m.OptItemList:
            hsh = item.find("#")

            optItemList.add(item)

        itemList -= optItemList
        for item in sorted(itemList):
            self.log.info(" adding ItemList Item to ts : %s" % (item))
            self.ts.addItem(item)
        for item in sorted(optItemList):
            self.log.info(" adding Optional Item to ts : %s" % (item))
            self.ts.addOptItem(item)

        self.log.info("There is no Event Output for this app")

        packet = self.evcom.receive()

        if packet == "FINISHED":

        evtNumber, tbin = packet
        if self.cntr != cppyy.nullptr:
            self.cntr.setEventCounter(evtNumber)

        self.log.info("Fork new subworkers")

        self.rTime += time.time() - t

        self.log.warning("Did not Execute Event")
        self.evt.clearStore()

        self.log.warning("Event did not pass : %i" % (evtNumber))
        self.evt.clearStore()

        self.inc.fireIncident(gbl.Incident("Worker", "EndEvent"))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
        self.log.info("Setting <Last> Event")

        self.evcom.finalize()
        self.log.info("Worker-%i Finished Processing Events" % (self.nodeID))

        self.log.info("Join subworkers")
        For some output writers, a check is performed to see whether the event
        has executed certain algorithms.
        These lists reside in the AcceptAlgs, RequireAlgs and VetoAlgs
        properties of those writers.
            if hasattr(m.w, "AcceptAlgs"):
                acc += m.w.AcceptAlgs
            if hasattr(m.w, "RequireAlgs"):
                req += m.w.RequireAlgs
            if hasattr(m.w, "VetoAlgs"):

        return (acc, req, vet)

            self.a.algorithm(algName)._ialg.isExecuted()
            and self.a.algorithm(algName)._ialg.filterPassed()
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream.
        self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))

        for name in self.acceptAlgs:

        self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
        for name in self.requireAlgs:

            self.log.info("Evt declined (requireAlgs) : %s" % (name))

        self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))

        self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
    def __init__(self, queues, events, params, subworkers):
        GMPComponent.__init__(self, "Writer", -2, queues, events, params, subworkers)

        self.log.name = "Writer--2"

        self.config["ApplicationMgr"].TopAlg = []
        self.config["EventSelector"].Input = []

        self.config["MessageSvc"].Format = "%-13s " % "[Writer]" + self.msgFormat
        libc = ctypes.CDLL("libc.so.6")

        libc.prctl(15, ctypes.create_string_buffer(name.encode()), 0, 0, 0)
        current = (current + 1) % self.nWorkers
        packet = self.evcoms[current].receive(timeout=0.01)

        if packet == "FINISHED":
            self.log.info("Writer got FINISHED flag : Worker %i" % (current))

            self.status[current] = True

            self.log.info("FINISHED recd from all workers, break loop")

        evtNumber, tbin = packet

        self.rTime += time.time() - t

        self.evt.clearStore()
        self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info("Setting <Last> Event")

        [e.finalize() for e in self.evcoms]

        self.log.info("Histo Store rebuilt ok")

        self.log.warning("Histo Store Error in Rebuild")
        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("GaudiPython Parallel Process Co-ordinator beginning")

        self.hq = JoinableQueue()
        self.fq = JoinableQueue()

            self.nWorkers, self.log, limit=WAIT_INITIALISE, step=STEP_INITIALISE

            limit=WAIT_SINGLE_EVENT,
            firstEvent=WAIT_FIRST_EVENT,

            self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE

        init = self.sInit.d[nodeID].event
        run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
        fin = self.sFin.d[nodeID].event
        return (init, run, fin)

        eventQ = self.qs[nodeID]

        return (eventQ, histQ, fsrQ)

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("INITIALISING SYSTEM")

        sc = self.sInit.syncAll(step="Initialise")

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("RUNNING SYSTEM")
        sc = self.sRun.syncAll(step="Run")

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("FINALISING SYSTEM")
        sc = self.sFin.syncAll(step="Finalise")

        self.log.info("Cleanly join all Processes")

        self.log.info("Report Total Success to Main.py")

        children = multiprocessing.active_children()

        rwk = JoinableQueue()

        workersWriter = [JoinableQueue() for i in range(self.nWorkers)]

        d[-2] = (workersWriter, None)

        d[i] = (rwk, workersWriter[i])
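
        # Queue layout sketched above: all workers read events from a single
        # shared JoinableQueue (rwk) fed by the Reader, while each worker sends
        # its output to the Writer through its own queue (workersWriter[i]);
        # d appears to map nodeID -> (incoming, outgoing) queue pair, with the
        # Writer (-2) receiving from the whole workersWriter list.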