from __future__ import print_function
from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
from ROOT import TBufferFile, TBuffer
import multiprocessing
from multiprocessing import Process, Queue, JoinableQueue, Event
from multiprocessing import cpu_count, current_process
from multiprocessing.queues import Empty

from ROOT import TParallelMergingFile
WAIT_INITIALISE = 60 * 5
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
WAIT_FINALISE = 60 * 2
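# Note (an assumption inferred from the Syncer construction further down, where
# these constants are passed as limit= / firstEvent=): these are synchronisation
# timeouts, apparently in seconds, i.e. 5 minutes to initialise, 3 minutes for
# the first event, 6 minutes per event and 2 minutes to finalise.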
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

aidatypes = (gbl.AIDA.IHistogram, gbl.AIDA.IHistogram1D, gbl.AIDA.IHistogram2D,
             gbl.AIDA.IHistogram3D, gbl.AIDA.IProfile1D, gbl.AIDA.IProfile2D,
             gbl.AIDA.IBaseHistogram)

thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
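# Sketch (assumption): aida2root converts the AIDA histogram interfaces listed
# in aidatypes into their native ROOT counterparts (thtypes), presumably so the
# objects collected by dumpHistograms below can be shipped through the
# multiprocessing queues and merged by the Writer, e.g.
#   if type(o) in aidatypes:
#       o = aida2root(o)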
WRITERTYPES = {
    'EvtCollectionStream': "tuples",
    'InputCopyStream': "events",
    'OutputStream': "events",
    'RecordStream': "records",
    'RunRecordStream': "records",
    'SequentialOutputStream': "events",
    'TagCollectionStream': "tuples"
}
A class to represent a writer in the GaudiPython configuration.
It can be non-trivial to access the name of the output file: it may be
specified in the DataSvc or just on the writer, and may be a list or a string.
Also, there are three different types of writer (events, records, tuples),
so this bootstrap class provides easy access to this information while configuring.
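# Usage sketch (assumption, based on the configuration scan and the Worker code
# further down): each output writer found in the configuration is wrapped as
#   m = MiniWriter(v, writerType, config)
# and each Worker later derives a per-worker file name from it, e.g.
#   newName = m.getNewName('.', '.w%i.' % workerID)   # 'out.dst' -> 'out.w2.dst'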
if hasattr(self.w, "Output"):

if hasattr(datasvc, "Output"):

assert replaceThis.__class__.__name__ == 'str'
assert withThis.__class__.__name__ == 'str'

if old.__class__.__name__ == 'list':

new = old.replace(replaceThis, withThis)

if hasattr(self.w, "ItemList"):

datasvc = config[self.w.EvtDataSvc]
if hasattr(datasvc, "ItemList"):

if hasattr(self.w, "OptItemList"):

datasvc = config[self.w.EvtDataSvc]
if hasattr(datasvc, "OptItemList"):

def set(self, key, output):

s += "Writer : %s\n" % (self.wName)
s += "Writer Type : %s\n" % (self.wType)
s += "Writer Output : %s\n" % (self.output)
s += "DataSvc Output : %s\n" % (self.svcOutput)
s += "Key for config : %s\n" % (self.key)
s += "Output File : %s\n" % (self.output)
s += "ItemList : %s\n" % (self.ItemList)
GaudiPython algorithm used to clean up histos on the Reader and Workers.
Only has a finalize() method.
This retrieves a dictionary of path:histo objects and sends it to the
writer. It then waits for a None flag: THIS IS IMPORTANT, as if
the algorithm returns before ALL histos have been COMPLETELY RECEIVED
at the writer end, there will be an error.
PyAlgorithm.__init__(self)

self.log.info('CollectHistograms Finalise (%s)' % (self._gmpc.nodeType))
self._gmpc.hDict = self._gmpc.dumpHistograms()
ks = list(self._gmpc.hDict.keys())
self.log.info('%i Objects in Histogram Store' % (len(ks)))

reps = len(ks) // chunk + 1
for i in range(reps):
    someKeys = ks[i * chunk:(i + 1) * chunk]
    smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
    self._gmpc.hq.put((self._gmpc.nodeID, smalld))

self.log.debug('Signalling end of histos to Writer')
self._gmpc.hq.put('HISTOS_SENT')
self.log.debug('Waiting on Sync Event')
self._gmpc.sEvent.wait()
self.log.debug('Histo Sync Event set, clearing and returning')
self._gmpc.hvt.clearStore()
root = gbl.DataObject()

self._gmpc.hvt.setRoot('/stat', root)
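# Protocol sketch (assumption, pieced together from the fragments above and the
# Writer-side 'Histo Store rebuilt ok' message below): each node pushes its
# histograms in chunks as (nodeID, {path: histo}) tuples on the hq queue,
# terminates with the 'HISTOS_SENT' marker, and then blocks on sEvent until the
# Writer has fully rebuilt its histogram store and sets the event, which is why
# returning early would lose histograms.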
assert item.__class__.__name__ == 'tuple'
startTransmission = time.time()

while self.qout._buffer:

self.qoutTime += time.time() - startTransmission

startWait = time.time()

itemIn = self.qin.get(timeout=timeout)

self.qinTime += time.time() - startWait

if itemIn.__class__.__name__ == 'tuple':

self._gmpc.log.warning('TASK_DONE called too often by : %s' % (self._gmpc.nodeType))

self.log.info('Finalize Event Communicator : %s %s' %
if self._gmpc.nodeType == 'Reader':
    downstream = self._gmpc.nWorkers
elif self._gmpc.nodeType == 'Writer':

elif self._gmpc.nodeType == 'Worker':

for i in range(downstream):

if self._gmpc.nodeType != 'Writer':

self.log.name = '%s-%i Audit ' % (self._gmpc.nodeType,

self.log.info('Items Sent : %i' % (self.nSent))
self.log.info('Items Received : %i' % (self.nRecv))

self.log.info('Data Received : %i' % (self.sizeRecv))
self.log.info('Q-out Time : %5.2f' % (self.qoutTime))
self.log.info('Q-in Time : %5.2f' % (self.qinTime))
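# Note (assumption, summarising the fragments above): the event communicator
# keeps simple audit counters (nSent, nRecv and the received volume sizeRecv)
# and accumulates the time spent blocked on its two queues: qoutTime while
# waiting for qout._buffer to drain after a send, qinTime inside
# qin.get(timeout=...); both are reported in the per-node audit block above.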
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
    self.T = gaudiTESSerializer

root = gbl.DataObject()

self.evt.setRoot('/Event', root)

self.T.loadBuffer(tbuf)
self.tLoad += (time.time() - t)

tb = TBufferFile(TBuffer.kWrite)
self.T.dumpBuffer(tb)
self.tDump += (time.time() - t)

evIn = "Events Loaded : %i" % (self.nIn)
evOut = "Events Dumped : %i" % (self.nOut)

dataIn = "Data Loaded : %i" % (din)
dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)

avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
maxIn = "Max Buf Loaded : %5.2f Mb"\

avgIn = "Avg Buf Loaded : N/A"
maxIn = "Max Buf Loaded : N/A"

dataOut = "Data Dumped : %i" % (dout)
dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)

avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
maxOut = "Max Buf Dumped : %5.2f Mb"\

avgOut = "Avg Buf Dumped : N/A"
maxOut = "Max Buf Dumped : N/A"
dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
loadTime = "Total Load Time : %5.2f" % (self.tLoad)
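# Usage sketch (assumption, based on the calls above and in Reader/Worker below):
# this thin wrapper times GaudiMP.TESSerializer traffic; the sending side dumps
# the transient event store into a ROOT buffer and the receiving side reloads it:
#   tb = TBufferFile(TBuffer.kWrite)
#   self.T.dumpBuffer(tb)      # serialise /Event into tb (time added to tDump)
#   ...
#   self.T.loadBuffer(tbuf)    # rebuild /Event from a received buffer (tLoad)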
def __init__(self, nodeType, nodeID, queues, events, params, subworkers):

current_process().name = nodeType

self.nWorkers, self.sEvent, self.config, self.log = params

ks = self.config.keys()

list = ["Brunel", "DaVinci", "Boole", "Gauss"]

qPair, histq, fq = self.queues

from AlgSmapShot import SmapShot

ss = SmapShot(logname=smapsLog)
self.a.addAlgorithm(ss)

self.fsr = self.a.filerecordsvc()
self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
self.pers = self.a.service('EventPersistencySvc', 'IAddressCreator')
self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)

root = gbl.DataObject()

self.evt.setRoot('/Event', root)
self.ts.loadBuffer(tbufferfile)

if self.app != 'Gauss':

lst = ['/Event/Gen/Header', '/Event/Rec/Header']

n = self.evt[path].evtNumber()

n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]

if self.nIn > 0 or self.nOut > 0:

self.log.warning('Could not determine Event Number')
keys = ["events", "records", "tuples", "histos"]

wkeys = WRITERTYPES.keys()
for v in self.config.values():
    if v.__class__.__name__ in wkeys:
        writerType = WRITERTYPES[v.__class__.__name__]
        d[writerType].append(MiniWriter(v, writerType, self.config))
        self.log.info('Writer Found : %s' % (v.name()))
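# Resulting shape (assumption): the writer dictionary built here, presumably
# exposed as self.writerDict and walked by the Worker configuration below, ends
# up roughly as
#   {'events': [MiniWriter, ...], 'records': [...], 'tuples': [...],
#    'histos': [<HistogramPersistencySvc OutputFile>]}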
if 'HistogramPersistencySvc' in self.config.keys():
    hfile = self.config['HistogramPersistencySvc'].getProp('OutputFile')
    d["histos"].append(hfile)
Method used by the GaudiPython algorithm CollectHistograms
to obtain a dictionary of the form { path : object }
representing the Histogram Store.
nlist = self.hvt.getHistoNames()

if type(o) in aidatypes:

print('WARNING : no histograms to recover?')

if self.app == 'Gauss':

tool = self.a.tool("ToolSvc.EvtCounter")

self.iTime = time.time() - start

self.fTime = time.time() - start

allTime = "Alive Time : %5.2f" % (self.tTime)
initTime = "Init Time : %5.2f" % (self.iTime)
frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
runTime = "Run Time : %5.2f" % (self.rTime)
finTime = "Finalise Time : %5.2f" % (self.fTime)
tup = (allTime, initTime, frstTime, runTime, finTime)
def __init__(self, queues, events, params, subworkers):
    GMPComponent.__init__(self, 'Reader', -1, queues, events, params,
                          subworkers)

self.config['ApplicationMgr'].TopAlg = []
self.config['ApplicationMgr'].OutStream = []
if "HistogramPersistencySvc" in self.config.keys():
    self.config['HistogramPersistencySvc'].OutputFile = ''
self.config['MessageSvc'].Format = '%-13s ' % '[Reader]' + \
    self.msgFormat
self.evtMax = self.config['ApplicationMgr'].EvtMax

tb = TBufferFile(TBuffer.kWrite)

self.ts.dumpBuffer(tb)
startFirst = time.time()
self.log.info('Reader : First Event')

self.log.info('evtMax( %i ) reached' % (self.evtMax))

if not bool(self.evt['/Event']):
    self.log.warning('No More Events! (So Far : %i)' % (self.nOut))

lst = self.evt.getHistoNames()

lst = self.evt.getList()
if self.app == "DaVinci":
    daqnode = self.evt.retrieveObject(

    self.evt.getList(daqnode, lst, daqnode.address().par())

self.log.critical('Reader could not acquire TES List!')

self.log.info('Reader : TES List : %i items' % (len(lst)))

self.log.info('First Event Sent')

self.eventLoopSyncer.set()
self.evt.clearStore()
libc = ctypes.CDLL('libc.so.6')

libc.prctl(15, name, 0, 0, 0)

startEngine = time.time()
self.log.name = 'Reader'
self.log.info('Reader Process starting')

self.log.info('Reader Beginning Distribution')

self.log.info('Reader First Event OK')

self.log.critical('Reader Failed on First Event')

self.log.info('evtMax( %i ) reached' % (self.evtMax))

if not self.stat.isSuccess():
    self.log.critical('Reader is Damaged!')

self.rTime += (time.time() - t)
if not bool(self.evt['/Event']):
    self.log.warning('No More Events! (So Far : %i)' % (self.nOut))

self.eventLoopSyncer.set()
self.evt.clearStore()
self.log.info('Setting <Last> Event')

self.log.info('Reader : Event Distribution complete.')
self.evcom.finalize()

self.tTime = time.time() - startEngine
def __init__(self, workerID, queues, events, params, subworkers):
    GMPComponent.__init__(self, 'Worker', workerID, queues, events, params,
                          subworkers)

self.log.info("Subworker-%i Created OK" % (self.nodeID))

libc = ctypes.CDLL('libc.so.6')

libc.prctl(15, name, 0, 0, 0)

startEngine = time.time()
msg = self.a.service('MessageSvc')
msg.Format = '%-13s ' % ('[' + self.log.name + ']') + self.msgFormat

self.log.name = "Worker-%i" % (self.nodeID)
self.log.info("Subworker %i starting Engine" % (self.nodeID))

self.log.info('EVT WRITERS ON WORKER : %i' % (len(self.writerDict['events'])))

nEventWriters = len(self.writerDict["events"])

packet = self.evcom.receive()

if packet == 'FINISHED':

evtNumber, tbin = packet
if self.cntr is not None:
    self.cntr.setEventCounter(evtNumber)

self.rTime += (time.time() - t)

self.log.name = "Worker-%i" % (self.nodeID)
self.log.warning('Did not Execute Event')
self.evt.clearStore()

self.log.name = "Worker-%i" % (self.nodeID)
self.log.warning('Event did not pass : %i' % (evtNumber))
self.evt.clearStore()

self.inc.fireIncident(gbl.Incident('Subworker', 'EndEvent'))
self.eventLoopSyncer.set()
self.evt.clearStore()
self.log.name = "Worker-%i" % (self.nodeID)
self.log.info('Setting <Last> Event %s' % (self.nodeID))

self.evcom.finalize()

self.tTime = time.time() - startEngine
self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
For some output writers, a check is performed to see if the event has
executed certain algorithms.
These reside in the AcceptAlgs, RequireAlgs and VetoAlgs properties of those writers.
if hasattr(m.w, 'AcceptAlgs'):
    acc += m.w.AcceptAlgs
if hasattr(m.w, 'RequireAlgs'):
    req += m.w.RequireAlgs
if hasattr(m.w, 'VetoAlgs'):

return (acc, req, vet)

if self.a.algorithm(algName)._ialg.isExecuted()\
        and self.a.algorithm(algName)._ialg.filterPassed():
Check the algorithm status for an event.
Depending on output writer settings, the event
may be declined based on various criteria.
This is a transcript of the check that occurs in GaudiSvc::OutputStream.
self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
for name in self.acceptAlgs:

self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
for name in self.requireAlgs:

self.log.info('Evt declined (requireAlgs) : %s' % (name))

self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))

self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
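# Decision sketch (assumption, paraphrasing the GaudiSvc::OutputStream logic the
# docstring refers to): an event is kept only if
#   * acceptAlgs is empty or at least one of its algorithms executed and passed,
#   * every algorithm in requireAlgs executed and passed, and
#   * no algorithm in vetoAlgs executed and passed;
# otherwise one of the 'Evt declined' messages above is emitted and the event
# is not written.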
def __init__(self, workerID, queues, events, params, subworkers):
    GMPComponent.__init__(self, 'Worker', workerID, queues, events, params,
                          subworkers)

self.log.name = "Worker-%i" % (self.nodeID)
self.log.info("Worker-%i Created OK" % (self.nodeID))

self.config['EventSelector'].Input = []
self.config['ApplicationMgr'].OutStream = []
if "HistogramPersistencySvc" in self.config.keys():
    self.config['HistogramPersistencySvc'].OutputFile = ''
formatHead = '[Worker-%i]' % (self.nodeID)
self.config['MessageSvc'].Format = '%-13s ' % formatHead + \
    self.msgFormat
self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

newName = m.getNewName('.', '.w%i.' % (self.nodeID))
self.config[m.key].Output = newName

if "ToolSvc.EvtCounter" not in self.config:
    from Configurables import EvtCounter
    counter = EvtCounter()

counter = self.config["ToolSvc.EvtCounter"]
counter.UseIncident = False

self.log.warning('Cannot configure EvtCounter')

libc = ctypes.CDLL('libc.so.6')

libc.prctl(15, name, 0, 0, 0)

startEngine = time.time()
self.log.info("Worker %i starting Engine" % (self.nodeID))
self.log.info('EVT WRITERS ON WORKER : %i' % (len(self.writerDict['events'])))

nEventWriters = len(self.writerDict["events"])

for item in m.ItemList:
    hsh = item.find('#')

for item in m.OptItemList:
    hsh = item.find('#')

    optItemList.add(item)

itemList -= optItemList
for item in sorted(itemList):
    self.log.info(' adding ItemList Item to ts : %s' % (item))
    self.ts.addItem(item)
for item in sorted(optItemList):
    self.log.info(' adding Optional Item to ts : %s' % (item))
    self.ts.addOptItem(item)
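# Note (assumption): OutputStream ItemList / OptItemList entries normally carry
# a depth suffix after '#' (e.g. '/Event/Rec/Header#1'); the item.find('#')
# calls above presumably locate that suffix before the paths are handed to the
# TESSerializer via addItem / addOptItem.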
self.log.info('There is no Event Output for this app')
packet = self.evcom.receive()

if packet == 'FINISHED':

evtNumber, tbin = packet
if self.cntr is not None:
    self.cntr.setEventCounter(evtNumber)

self.log.info("Fork new subworkers")

k.SetServices(self.a, self.evt, self.hvt, self.fsr,

self.rTime += (time.time() - t)

self.log.warning('Did not Execute Event')
self.evt.clearStore()

self.log.warning('Event did not pass : %i' % (evtNumber))
self.evt.clearStore()

self.inc.fireIncident(gbl.Incident('Worker', 'EndEvent'))
self.eventLoopSyncer.set()
self.evt.clearStore()
self.log.info('Setting <Last> Event')

self.evcom.finalize()
self.log.info('Worker-%i Finished Processing Events' % (self.nodeID))

self.log.info('Join subworkers')
For some output writers, a check is performed to see if the event has
executed certain algorithms.
These reside in the AcceptAlgs, RequireAlgs and VetoAlgs properties of those writers.
if hasattr(m.w, 'AcceptAlgs'):
    acc += m.w.AcceptAlgs
if hasattr(m.w, 'RequireAlgs'):
    req += m.w.RequireAlgs
if hasattr(m.w, 'VetoAlgs'):

return (acc, req, vet)

if self.a.algorithm(algName)._ialg.isExecuted()\
        and self.a.algorithm(algName)._ialg.filterPassed():
Check the algorithm status for an event.
Depending on output writer settings, the event
may be declined based on various criteria.
This is a transcript of the check that occurs in GaudiSvc::OutputStream.
self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
for name in self.acceptAlgs:

self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
for name in self.requireAlgs:

self.log.info('Evt declined (requireAlgs) : %s' % (name))

self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))

self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
def __init__(self, queues, events, params, subworkers):
    GMPComponent.__init__(self, 'Writer', -2, queues, events, params,
                          subworkers)

self.log.name = "Writer--2"

self.config['ApplicationMgr'].TopAlg = []
self.config['EventSelector'].Input = []
self.config['MessageSvc'].Format = '%-13s ' % '[Writer]' + \
    self.msgFormat
libc = ctypes.CDLL('libc.so.6')

libc.prctl(15, name, 0, 0, 0)

startEngine = time.time()

stopCriteria = self.nWorkers

current = (current + 1) % self.nWorkers
packet = self.evcoms[current].receive(timeout=0.01)

if packet == 'FINISHED':
    self.log.info(
        'Writer got FINISHED flag : Worker %i' % (current))

    self.status[current] = True

    self.log.info('FINISHED recd from all workers, break loop')

evtNumber, tbin = packet

self.rTime += (time.time() - t)

self.evt.clearStore()
self.eventLoopSyncer.set()
self.log.name = "Writer--2"
self.log.info('Setting <Last> Event')

[e.finalize() for e in self.evcoms]

self.log.info('Histo Store rebuilt ok')

self.log.warning('Histo Store Error in Rebuild')
self.log.name = 'GaudiPython-Parallel-Logger'
self.log.info('GaudiPython Parallel Process Co-ordinator beginning')

self.hq = JoinableQueue()
self.fq = JoinableQueue()

limit=WAIT_INITIALISE,
step=STEP_INITIALISE)

limit=WAIT_SINGLE_EVENT,
firstEvent=WAIT_FIRST_EVENT)

self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE)
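# Note (assumption, from the fragments above and the syncAll() calls below):
# three syncer objects (sInit, sRun, sFin) guard the Initialise / Run / Finalise
# phases, each bounded by the corresponding WAIT_* timeout, with a separate
# firstEvent=WAIT_FIRST_EVENT allowance for the first event during the Run phase.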
init = self.sInit.d[nodeID].event
run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
fin = self.sFin.d[nodeID].event
return (init, run, fin)

eventQ = self.qs[nodeID]

return (eventQ, histQ, fsrQ)

self.log.name = 'GaudiPython-Parallel-Logger'
self.log.info('INITIALISING SYSTEM')

sc = self.sInit.syncAll(step="Initialise")

self.log.name = 'GaudiPython-Parallel-Logger'
self.log.info('RUNNING SYSTEM')
sc = self.sRun.syncAll(step="Run")

self.log.name = 'GaudiPython-Parallel-Logger'
self.log.info('FINALISING SYSTEM')
sc = self.sFin.syncAll(step="Finalise")

self.log.info("Cleanly join all Processes")

self.log.info("Report Total Success to Main.py")

children = multiprocessing.active_children()

rwk = JoinableQueue()

workersWriter = [JoinableQueue() for i in range(self.nWorkers)]

d[-2] = (workersWriter, None)

d[i] = (rwk, workersWriter[i])
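# Queue topology sketch (assumption, from the wiring above): a single
# JoinableQueue (rwk) carries events from the Reader to all Workers, each
# Worker i gets its own queue to the Writer (workersWriter[i]), and the Writer
# (nodeID -2) receives the whole workersWriter list so it can poll every Worker
# in turn, as its receive(timeout=0.01) loop above does.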