from __future__ import print_function
import multiprocessing
from multiprocessing import Event, JoinableQueue, Process, cpu_count, current_process
from multiprocessing.queues import Empty
from ROOT import TBuffer, TBufferFile
from GaudiPython import (
WAIT_INITIALISE = 60 * 5
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
WAIT_FINALISE = 60 * 2
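# Descriptive note: these limits are passed as the "limit" arguments of the
# Syncer objects created by the co-ordinator below, where they act as timeouts
# (in seconds) for the initialise, first-event, per-event and finalise
# synchronisation steps (e.g. initialisation is allowed up to 5 minutes).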
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

aidatypes = (
    gbl.AIDA.IHistogram1D,
    gbl.AIDA.IHistogram2D,
    gbl.AIDA.IHistogram3D,
    gbl.AIDA.IBaseHistogram,
)

thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
93 "EvtCollectionStream":
"tuples",
94 "InputCopyStream":
"events",
95 "OutputStream":
"events",
96 "RecordStream":
"records",
97 "RunRecordStream":
"records",
98 "SequentialOutputStream":
"events",
99 "TagCollectionStream":
"tuples",
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file: it may be
    specified in the DataSvc or just on the writer, and it may be a list or a string.
    Also, there are three different types of writer (events, records, tuples),
    so this bootstrap class provides easy access to this info while configuring.
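
    A sketch of the intended use (illustrative only; config is the parsed
    configuration dictionary, w is an output-stream configurable found in it,
    and workerID is a hypothetical worker index):

        mw = MiniWriter(w, WRITERTYPES[w.__class__.__name__], config)
        # give each worker its own output file and re-point the writer at it
        newName = mw.getNewName(".", ".w%i." % workerID)
        config[mw.key].Output = newName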
        if hasattr(self.w, "Output"):
        if hasattr(datasvc, "Output"):

        assert replaceThis.__class__.__name__ == "str"
        assert withThis.__class__.__name__ == "str"
        if old.__class__.__name__ == "list":
        new = old.replace(replaceThis, withThis)

        if hasattr(self.w, "ItemList"):
        datasvc = config[self.w.EvtDataSvc]
        if hasattr(datasvc, "ItemList"):
        if hasattr(self.w, "OptItemList"):
        datasvc = config[self.w.EvtDataSvc]
        if hasattr(datasvc, "OptItemList"):

    def set(self, key, output):

        s += "Writer : %s\n" % (self.wName)
        s += "Writer Type : %s\n" % (self.wType)
        s += "Writer Output : %s\n" % (self.output)
        s += "DataSvc Output : %s\n" % (self.svcOutput)
        s += "Key for config : %s\n" % (self.key)
        s += "Output File : %s\n" % (self.output)
        s += "ItemList : %s\n" % (self.ItemList)
    GaudiPython algorithm used to clean up histos on the Reader and Workers.
    Only has a finalize() method.
    This retrieves a dictionary of path:histo objects and sends it to the
    writer. It then waits for a None flag: THIS IS IMPORTANT, as if
    the algorithm returns before ALL histos have been COMPLETELY RECEIVED
    at the writer end, there will be an error.
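
    The hand-shake implemented in finalize() below is roughly:

        hq.put((nodeID, {path: histo, ...}))   # one or more chunks of histograms
        hq.put("HISTOS_SENT")                  # end-of-histos marker
        sEvent.wait()                          # block until the Writer signals
                                               # that everything has arrived

    (Illustrative summary only; hq and sEvent are the histogram queue and the
    sync Event owned by the parent GMP component, self._gmpc.)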
        PyAlgorithm.__init__(self)

        self.log.info("CollectHistograms Finalise (%s)" % (self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms()
        # materialise the keys so they can be sliced into chunks below
        ks = list(self._gmpc.hDict.keys())
        self.log.info("%i Objects in Histogram Store" % (len(ks)))
        reps = len(ks) // chunk + 1
        for i in range(reps):
            someKeys = ks[i * chunk : (i + 1) * chunk]
            smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
            self._gmpc.hq.put((self._gmpc.nodeID, smalld))
        self.log.debug("Signalling end of histos to Writer")
        self._gmpc.hq.put("HISTOS_SENT")
        self.log.debug("Waiting on Sync Event")
        self._gmpc.sEvent.wait()
        self.log.debug("Histo Sync Event set, clearing and returning")
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()
        self._gmpc.hvt.setRoot("/stat", root)
        assert item.__class__.__name__ == "tuple"
        startTransmission = time.time()
        while self.qout._buffer:
        self.qoutTime += time.time() - startTransmission

        startWait = time.time()
        itemIn = self.qin.get(timeout=timeout)
        self.qinTime += time.time() - startWait
        if itemIn.__class__.__name__ == "tuple":
        self._gmpc.log.warning("TASK_DONE called too often by : %s" % (self._gmpc.nodeType))

            "Finalize Event Communicator : %s %s" % (self._gmpc, self._gmpc.nodeType)
        if self._gmpc.nodeType == "Reader":
            downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == "Writer":
        elif self._gmpc.nodeType == "Worker":
        for i in range(downstream):
        if self._gmpc.nodeType != "Writer":
        self.log.name = "%s-%i Audit " % (self._gmpc.nodeType, self._gmpc.nodeID)
        self.log.info("Items Sent : %i" % (self.nSent))
        self.log.info("Items Received : %i" % (self.nRecv))
        self.log.info("Data Received : %i" % (self.sizeRecv))
        self.log.info("Q-out Time : %5.2f" % (self.qoutTime))
        self.log.info("Q-in Time : %5.2f" % (self.qinTime))
    def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
        self.T = gaudiTESSerializer

        root = gbl.DataObject()
        self.evt.setRoot("/Event", root)
        self.T.loadBuffer(tbuf)
        self.tLoad += time.time() - t

        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += time.time() - t

        evIn = "Events Loaded : %i" % (self.nIn)
        evOut = "Events Dumped : %i" % (self.nOut)
        dataIn = "Data Loaded : %i" % (din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
        avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
        maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)
        avgIn = "Avg Buf Loaded : N/A"
        maxIn = "Max Buf Loaded : N/A"
        dataOut = "Data Dumped : %i" % (dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
        avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
        maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)
        avgOut = "Avg Buf Dumped : N/A"
        maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
        loadTime = "Total Load Time : %5.2f" % (self.tLoad)
    def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
        current_process().name = nodeType

        self.nWorkers, self.sEvent, self.config, self.log = params

        ks = self.config.keys()
        list = ["Brunel", "DaVinci", "Boole", "Gauss"]

        qPair, histq, fq = self.queues

        from AlgSmapShot import SmapShot

        ss = SmapShot(logname=smapsLog)
        self.a.addAlgorithm(ss)

        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
        self.pers = self.a.service("EventPersistencySvc", "IAddressCreator")
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)

        root = gbl.DataObject()
        self.evt.setRoot("/Event", root)
        self.ts.loadBuffer(tbufferfile)

        if self.app != "Gauss":
        lst = ["/Event/Gen/Header", "/Event/Rec/Header"]
        n = self.evt[path].evtNumber()
        n = self.evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
        if self.nIn > 0 or self.nOut > 0:
        self.log.warning("Could not determine Event Number")

        keys = ["events", "records", "tuples", "histos"]
        wkeys = WRITERTYPES.keys()
        for v in self.config.values():
            if v.__class__.__name__ in wkeys:
                writerType = WRITERTYPES[v.__class__.__name__]
                d[writerType].append(MiniWriter(v, writerType, self.config))
                self.log.info("Writer Found : %s" % (v.name()))
        if "HistogramPersistencySvc" in self.config.keys():
            hfile = self.config["HistogramPersistencySvc"].getProp("OutputFile")
            d["histos"].append(hfile)
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of the form { path : object }
        representing the Histogram Store.
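
        For example (illustrative paths only), the returned dictionary might be

            { "/stat/MyAlg/1" : <TH1D>, "/stat/MyAlg/eff" : <TProfile> }

        built by walking self.hvt.getHistoNames() and, as the code below
        suggests, keeping only objects whose type appears in the aidatypes /
        thtypes tuples defined at module level.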
        nlist = self.hvt.getHistoNames()
        if type(o) in aidatypes:
        print("WARNING : no histograms to recover?")

        if self.app == "Gauss":
        tool = self.a.tool("ToolSvc.EvtCounter")

        self.iTime = time.time() - start
        self.fTime = time.time() - start

        allTime = "Alive Time : %5.2f" % (self.tTime)
        initTime = "Init Time : %5.2f" % (self.iTime)
        frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
        runTime = "Run Time : %5.2f" % (self.rTime)
        finTime = "Finalise Time : %5.2f" % (self.fTime)
        tup = (allTime, initTime, frstTime, runTime, finTime)

    def __init__(self, queues, events, params, subworkers):
        GMPComponent.__init__(self, "Reader", -1, queues, events, params, subworkers)

        self.config["ApplicationMgr"].TopAlg = []
        self.config["ApplicationMgr"].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config["HistogramPersistencySvc"].OutputFile = ""
        self.config["MessageSvc"].Format = "%-13s " % "[Reader]" + self.msgFormat
        self.evtMax = self.config["ApplicationMgr"].EvtMax

        tb = TBufferFile(TBuffer.kWrite)
        self.ts.dumpBuffer(tb)

        startFirst = time.time()
        self.log.info("Reader : First Event")
        self.log.info("evtMax( %i ) reached" % (self.evtMax))
        if not bool(self.evt["/Event"]):
            self.log.warning("No More Events! (So Far : %i)" % (self.nOut))

        lst = self.evt.getHistoNames()
        lst = self.evt.getList()
        if self.app == "DaVinci":
            daqnode = self.evt.retrieveObject("/Event/DAQ").registry()
            self.evt.getList(daqnode, lst, daqnode.address().par())
        self.log.critical("Reader could not acquire TES List!")
        self.log.info("Reader : TES List : %i items" % (len(lst)))
        self.log.info("First Event Sent")
        self.eventLoopSyncer.set()
        self.evt.clearStore()

        libc = ctypes.CDLL("libc.so.6")
        libc.prctl(15, name, 0, 0, 0)
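        # prctl request 15 is PR_SET_NAME on Linux, so this renames the current
        # process to "name" as it appears in ps/top; the same pattern is used
        # again for the Worker and Writer processes below.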
        startEngine = time.time()
        self.log.name = "Reader"
        self.log.info("Reader Process starting")

        self.log.info("Reader Beginning Distribution")
        self.log.info("Reader First Event OK")
        self.log.critical("Reader Failed on First Event")
        self.log.info("evtMax( %i ) reached" % (self.evtMax))
        if not self.stat.isSuccess():
            self.log.critical("Reader is Damaged!")
        self.rTime += time.time() - t
        if not bool(self.evt["/Event"]):
            self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
        self.log.info("Setting <Last> Event")

        self.log.info("Reader : Event Distribution complete.")
        self.evcom.finalize()
        self.tTime = time.time() - startEngine

    def __init__(self, workerID, queues, events, params, subworkers):
        GMPComponent.__init__(
            self, "Worker", workerID, queues, events, params, subworkers
        )
        self.log.info("Subworker-%i Created OK" % (self.nodeID))

        libc = ctypes.CDLL("libc.so.6")
        libc.prctl(15, name, 0, 0, 0)

        startEngine = time.time()
        msg = self.a.service("MessageSvc")
        msg.Format = "%-13s " % ("[" + self.log.name + "]") + self.msgFormat
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Subworker %i starting Engine" % (self.nodeID))
        self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))

        packet = self.evcom.receive()
        if packet == "FINISHED":
        evtNumber, tbin = packet
        if self.cntr is not None:
            self.cntr.setEventCounter(evtNumber)
        self.rTime += time.time() - t
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.warning("Did not Execute Event")
        self.evt.clearStore()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.warning("Event did not pass : %i" % (evtNumber))
        self.evt.clearStore()
        self.inc.fireIncident(gbl.Incident("Subworker", "EndEvent"))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Setting <Last> Event %s" % (self.nodeID))
        self.evcom.finalize()
        self.tTime = time.time() - startEngine

        self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
        For some output writers, a check is performed to see if the event has
        executed certain algorithms.
        These reside in the AcceptAlgs property for those writers.
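
        Roughly, the three returned lists are the concatenation of the
        corresponding optional properties (AcceptAlgs, RequireAlgs, VetoAlgs)
        of every configured event writer; they feed the per-event acceptance
        check implemented further below.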
        if hasattr(m.w, "AcceptAlgs"):
            acc += m.w.AcceptAlgs
        if hasattr(m.w, "RequireAlgs"):
            req += m.w.RequireAlgs
        if hasattr(m.w, "VetoAlgs"):
        return (acc, req, vet)

            self.a.algorithm(algName)._ialg.isExecuted()
            and self.a.algorithm(algName)._ialg.filterPassed()
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream.
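
        In outline (a paraphrase of the checks below, not additional logic):
        the event is kept only if at least one algorithm in self.acceptAlgs
        executed and passed (an empty list counts as accepted), every algorithm
        in self.requireAlgs executed and passed, and no algorithm in
        self.vetoAlgs executed and passed.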
        self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
        for name in self.acceptAlgs:
        self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
        for name in self.requireAlgs:
        self.log.info("Evt declined (requireAlgs) : %s" % (name))
        self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
        self.log.info("Evt declined : (vetoAlgs) : %s" % (name))

    def __init__(self, workerID, queues, events, params, subworkers):
        GMPComponent.__init__(
            self, "Worker", workerID, queues, events, params, subworkers
        )
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Worker-%i Created OK" % (self.nodeID))

        self.config["EventSelector"].Input = []
        self.config["ApplicationMgr"].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config["HistogramPersistencySvc"].OutputFile = ""
        formatHead = "[Worker-%i]" % (self.nodeID)
        self.config["MessageSvc"].Format = "%-13s " % formatHead + self.msgFormat

        self.log.info("Writer Type : %s\t : %i" % (key, len(lst)))
        newName = m.getNewName(".", ".w%i." % (self.nodeID))
        self.config[m.key].Output = newName

        if "ToolSvc.EvtCounter" not in self.config:
            from Configurables import EvtCounter

            counter = EvtCounter()
        counter = self.config["ToolSvc.EvtCounter"]
        counter.UseIncident = False
        self.log.warning("Cannot configure EvtCounter")

        libc = ctypes.CDLL("libc.so.6")
        libc.prctl(15, name, 0, 0, 0)

        startEngine = time.time()
        self.log.info("Worker %i starting Engine" % (self.nodeID))
        self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
        nEventWriters = len(self.writerDict["events"])

        for item in m.ItemList:
            hsh = item.find("#")
        for item in m.OptItemList:
            hsh = item.find("#")
            optItemList.add(item)
        itemList -= optItemList
        for item in sorted(itemList):
            self.log.info(" adding ItemList Item to ts : %s" % (item))
            self.ts.addItem(item)
        for item in sorted(optItemList):
            self.log.info(" adding Optional Item to ts : %s" % (item))
            self.ts.addOptItem(item)
        self.log.info("There is no Event Output for this app")

        packet = self.evcom.receive()
        if packet == "FINISHED":
        evtNumber, tbin = packet
        if self.cntr is not None:
            self.cntr.setEventCounter(evtNumber)
        self.log.info("Fork new subworkers")
        self.rTime += time.time() - t
        self.log.warning("Did not Execute Event")
        self.evt.clearStore()
        self.log.warning("Event did not pass : %i" % (evtNumber))
        self.evt.clearStore()
        self.inc.fireIncident(gbl.Incident("Worker", "EndEvent"))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
        self.log.info("Setting <Last> Event")
        self.evcom.finalize()
        self.log.info("Worker-%i Finished Processing Events" % (self.nodeID))

        self.log.info("Join subworkers")
        For some output writers, a check is performed to see if the event has
        executed certain algorithms.
        These reside in the AcceptAlgs property for those writers.
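
        (Same gathering of the AcceptAlgs / RequireAlgs / VetoAlgs lists as in
        the Subworker version of this method above.)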
        if hasattr(m.w, "AcceptAlgs"):
            acc += m.w.AcceptAlgs
        if hasattr(m.w, "RequireAlgs"):
            req += m.w.RequireAlgs
        if hasattr(m.w, "VetoAlgs"):
        return (acc, req, vet)

            self.a.algorithm(algName)._ialg.isExecuted()
            and self.a.algorithm(algName)._ialg.filterPassed()
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream.
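
        (Same accept / require / veto outline as given for the Subworker
        version of this method above.)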
        self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
        for name in self.acceptAlgs:
        self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
        for name in self.requireAlgs:
        self.log.info("Evt declined (requireAlgs) : %s" % (name))
        self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
        self.log.info("Evt declined : (vetoAlgs) : %s" % (name))

    def __init__(self, queues, events, params, subworkers):
        GMPComponent.__init__(self, "Writer", -2, queues, events, params, subworkers)
        self.log.name = "Writer--2"

        self.config["ApplicationMgr"].TopAlg = []
        self.config["EventSelector"].Input = []
        self.config["MessageSvc"].Format = "%-13s " % "[Writer]" + self.msgFormat

        libc = ctypes.CDLL("libc.so.6")
        libc.prctl(15, name, 0, 0, 0)

        current = (current + 1) % self.nWorkers
        packet = self.evcoms[current].receive(timeout=0.01)
        if packet == "FINISHED":
            self.log.info("Writer got FINISHED flag : Worker %i" % (current))
            self.status[current] = True
            self.log.info("FINISHED recd from all workers, break loop")
        evtNumber, tbin = packet
        self.rTime += time.time() - t
        self.evt.clearStore()
        self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info("Setting <Last> Event")
        [e.finalize() for e in self.evcoms]

        self.log.info("Histo Store rebuilt ok")
        self.log.warning("Histo Store Error in Rebuild")

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("GaudiPython Parallel Process Co-ordinator beginning")

        self.hq = JoinableQueue()
        self.fq = JoinableQueue()

            self.nWorkers, self.log, limit=WAIT_INITIALISE, step=STEP_INITIALISE
            limit=WAIT_SINGLE_EVENT,
            firstEvent=WAIT_FIRST_EVENT,
            self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE

        init = self.sInit.d[nodeID].event
        run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
        fin = self.sFin.d[nodeID].event
        return (init, run, fin)

        eventQ = self.qs[nodeID]
        return (eventQ, histQ, fsrQ)

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("INITIALISING SYSTEM")
        sc = self.sInit.syncAll(step="Initialise")

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("RUNNING SYSTEM")
        sc = self.sRun.syncAll(step="Run")

        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("FINALISING SYSTEM")
        sc = self.sFin.syncAll(step="Finalise")

        self.log.info("Cleanly join all Processes")
        self.log.info("Report Total Success to Main.py")

        children = multiprocessing.active_children()

        rwk = JoinableQueue()
        workersWriter = [JoinableQueue() for i in range(self.nWorkers)]
        d[-2] = (workersWriter, None)
        d[i] = (rwk, workersWriter[i])
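        # Queue wiring, as set up just above: all workers share a single
        # Reader-to-Workers queue (rwk) as their input, and each worker i owns a
        # dedicated worker-to-Writer queue (workersWriter[i]) as its output; the
        # Writer (nodeID -2) is handed the full list of worker queues and has no
        # output queue of its own:
        #
        #   d[-2] = (workersWriter, None)     # Writer : inputs from all workers
        #   d[i]  = (rwk, workersWriter[i])   # Worker i : shared input, own output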