from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
from ROOT import TBufferFile, TBuffer
from multiprocessing import Process, Queue, JoinableQueue, Event
from multiprocessing import cpu_count, current_process
from multiprocessing.queues import Empty
from ROOT import TParallelMergingFile
WAIT_INITIALISE = 60 * 5
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
WAIT_FINALISE = 60 * 2
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

aidatypes = (gbl.AIDA.IHistogram,
             gbl.AIDA.IHistogram1D,
             gbl.AIDA.IHistogram2D,
             gbl.AIDA.IHistogram3D,
             gbl.AIDA.IBaseHistogram)

thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
WRITERTYPES = {
    'EvtCollectionStream': "tuples",
    'InputCopyStream': "events",
    'OutputStream': "events",
    'RecordStream': "records",
    'RunRecordStream': "records",
    'SequentialOutputStream': "events",
    'TagCollectionStream': "tuples"
}
A class to represent a writer in the GaudiPython configuration.
It can be non-trivial to access the name of the output file: it may be
specified in the DataSvc or just on the writer, and it may be a list or a
string. Also, there are three different types of writer (events, records,
tuples), so this bootstrap class provides easy access to this information
while configuring.

if hasattr(self.w, "Output"):

if hasattr(datasvc, "Output"):
assert replaceThis.__class__.__name__ == 'str'
assert withThis.__class__.__name__ == 'str'

if old.__class__.__name__ == 'list':

new = old.replace(replaceThis, withThis)
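# A minimal stand-alone sketch of this replace-based renaming. It assumes
# `output` holds a plain file name (the real Output property may be a list
# or a full output specification); the name get_new_name and the example
# file name are illustrative, not part of GaudiMP.
def get_new_name(output, replaceThis, withThis, extra=''):
    if isinstance(output, list):
        # The Output property may be a list; take the first entry.
        output = output[0]
    assert isinstance(replaceThis, str)
    assert isinstance(withThis, str)
    # Plain string substitution, optionally with an extra option appended.
    return output.replace(replaceThis, withThis) + extra

# Give worker 3 its own output file: 'SomeOutput.dst' -> 'SomeOutput.w3.dst'
print(get_new_name('SomeOutput.dst', '.', '.w3.'))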
if hasattr(self.w, "ItemList"):

datasvc = config[self.w.EvtDataSvc]
if hasattr(datasvc, "ItemList"):

if hasattr(self.w, "OptItemList"):

datasvc = config[self.w.EvtDataSvc]
if hasattr(datasvc, "OptItemList"):

def set(self, key, output):
s += "Writer : %s\n" % (self.wName)
s += "Writer Type : %s\n" % (self.wType)
s += "Writer Output : %s\n" % (self.output)
s += "DataSvc Output : %s\n" % (self.svcOutput)
s += "Key for config : %s\n" % (self.key)
s += "Output File : %s\n" % (self.output)
s += "ItemList : %s\n" % (self.ItemList)
GaudiPython algorithm used to clean up histos on the Reader and Workers.
It only has a finalize() method. This retrieves a dictionary of path:histo
objects and sends it to the writer. It then waits for a None flag: THIS IS
IMPORTANT, as if the algorithm returns before ALL histos have been
COMPLETELY RECEIVED at the writer end, there will be an error.

PyAlgorithm.__init__(self)
self.log.info('CollectHistograms Finalise (%s)' % (self._gmpc.nodeType))
self._gmpc.hDict = self._gmpc.dumpHistograms()
ks = self._gmpc.hDict.keys()
self.log.info('%i Objects in Histogram Store' % (len(ks)))

reps = len(ks) / chunk + 1
for i in xrange(reps):
    someKeys = ks[i * chunk: (i + 1) * chunk]
    smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
    self._gmpc.hq.put((self._gmpc.nodeID, smalld))

self.log.debug('Signalling end of histos to Writer')
self._gmpc.hq.put('HISTOS_SENT')
self.log.debug('Waiting on Sync Event')
self._gmpc.sEvent.wait()
self.log.debug('Histo Sync Event set, clearing and returning')
self._gmpc.hvt.clearStore()
root = gbl.DataObject()

self._gmpc.hvt.setRoot('/stat', root)
assert item.__class__.__name__ == 'tuple'
startTransmission = time.time()
while self.qout._buffer:
self.qoutTime += time.time() - startTransmission

startWait = time.time()
itemIn = self.qin.get(timeout=timeout)
self.qinTime += time.time() - startWait
if itemIn.__class__.__name__ == 'tuple':

self._gmpc.log.warning('TASK_DONE called too often by : %s'
                       % (self._gmpc.nodeType))

self.log.info('Finalize Event Communicator : %s %s' %
              (self._gmpc, self._gmpc.nodeType))
if self._gmpc.nodeType == 'Reader':
    downstream = self._gmpc.nWorkers
elif self._gmpc.nodeType == 'Writer':
elif self._gmpc.nodeType == 'Worker':

for i in xrange(downstream):
    self.qout.put('FINISHED')
if self._gmpc.nodeType != 'Writer':
self.log.name = '%s-%i Audit ' % (self._gmpc.nodeType, self._gmpc.nodeID)
self.log.info('Items Sent : %i' % (self.nSent))
self.log.info('Items Received : %i' % (self.nRecv))
self.log.info('Data Sent : %i' % (self.sizeSent))
self.log.info('Data Received : %i' % (self.sizeRecv))
self.log.info('Q-out Time : %5.2f' % (self.qoutTime))
self.log.info('Q-in Time : %5.2f' % (self.qinTime))
def __init__(self, gaudiTESSerializer, evtDataSvc,
             nodeType, nodeID, log):
    self.T = gaudiTESSerializer

root = gbl.DataObject()
self.evt.setRoot('/Event', root)
self.T.loadBuffer(tbuf)
self.tLoad += (time.time() - t)
self.buffersIn.append(tbuf.Length())

tb = TBufferFile(TBuffer.kWrite)
self.T.dumpBuffer(tb)
self.tDump += (time.time() - t)
self.buffersOut.append(tb.Length())
evIn = "Events Loaded : %i" % (self.nIn)
evOut = "Events Dumped : %i" % (self.nOut)

dataIn = "Data Loaded : %i" % (din)
dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)

avgIn = "Avg Buf Loaded : %5.2f Mb"\
        % (din / (self.nIn * MB))
maxIn = "Max Buf Loaded : %5.2f Mb"\

avgIn = "Avg Buf Loaded : N/A"
maxIn = "Max Buf Loaded : N/A"

dataOut = "Data Dumped : %i" % (dout)
dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)

avgOut = "Avg Buf Dumped : %5.2f Mb"\
         % (dout / (self.nOut * MB))
maxOut = "Max Buf Dumped : %5.2f Mb"\

avgOut = "Avg Buf Dumped : N/A"
maxOut = "Max Buf Dumped : N/A"

dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
loadTime = "Total Load Time : %5.2f" % (self.tLoad)

self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
def __init__(self, nodeType, nodeID, queues, events, params, subworkers):

current_process().name = nodeType

self.nWorkers, self.sEvent, self.config, self.log = params

ks = self.config.keys()

list = ["Brunel", "DaVinci", "Boole", "Gauss"]

qPair, histq, fq = self.queues

self.evcoms.append(ec)
from AlgSmapShot import SmapShot

ss = SmapShot(logname=smapsLog)
self.a.addAlgorithm(ss)

self.evt = self.a.evtsvc()
self.hvt = self.a.histsvc()
self.fsr = self.a.filerecordsvc()
self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
self.pers = self.a.service('EventPersistencySvc', 'IAddressCreator')
self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
root = gbl.DataObject()
self.evt.setRoot('/Event', root)
self.ts.loadBuffer(tbufferfile)

if self.app != 'Gauss':

lst = ['/Event/Gen/Header',

n = self.evt[path].evtNumber()

n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]

if self.nIn > 0 or self.nOut > 0:

self.log.warning('Could not determine Event Number')
keys = ["events", "records", "tuples", "histos"]

wkeys = WRITERTYPES.keys()
for v in self.config.values():
    if v.__class__.__name__ in wkeys:
        writerType = WRITERTYPES[v.__class__.__name__]
        d[writerType].append(MiniWriter(v, writerType, self.config))
        self.log.info('Writer Found : %s' % (v.name()))

if 'HistogramPersistencySvc' in self.config.keys():
    hfile = self.config['HistogramPersistencySvc'].getProp('OutputFile')
    d["histos"].append(hfile)
Method used by the GaudiPython algorithm CollectHistograms to obtain a
dictionary of the form { path : object } representing the Histogram Store.

nlist = self.hvt.getHistoNames()

if type(o) in aidatypes:

print 'WARNING : no histograms to recover?'
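# A rough sketch of what such a dump produces: a { path : object }
# dictionary built from the histogram service, with AIDA histograms
# converted to their ROOT counterparts via aida2root. Indexing the
# service by path (hvt[path]) is an assumption here; only getHistoNames,
# aidatypes and aida2root appear in the fragments above.
def dump_histograms(hvt):
    hdict = {}
    names = hvt.getHistoNames() or []
    for path in names:
        o = hvt[path]
        if type(o) in aidatypes:
            o = aida2root(o)     # AIDA interface -> ROOT TH*/TProfile
        hdict[path] = o
    return hdict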
if self.app == 'Gauss':

    tool = self.a.tool("ToolSvc.EvtCounter")
    self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())

self.iTime = time.time() - start

self.finalEvent.set()
self.fTime = time.time() - start
allTime = "Alive Time : %5.2f" % (self.tTime)
initTime = "Init Time : %5.2f" % (self.iTime)
frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
runTime = "Run Time : %5.2f" % (self.rTime)
finTime = "Finalise Time : %5.2f" % (self.fTime)
tup = (allTime, initTime, frstTime, runTime, finTime)
def __init__(self, queues, events, params, subworkers):
    GMPComponent.__init__(self, 'Reader', -1, queues,
                          events, params, subworkers)

self.config['ApplicationMgr'].TopAlg = []
self.config['ApplicationMgr'].OutStream = []
if "HistogramPersistencySvc" in self.config.keys():
    self.config['HistogramPersistencySvc'].OutputFile = ''
self.config['MessageSvc'].Format = '%-13s ' % '[Reader]' + \
    self.msgFormat

self.evtMax = self.config['ApplicationMgr'].EvtMax

tb = TBufferFile(TBuffer.kWrite)

self.ts.dumpBuffer(tb)
startFirst = time.time()
self.log.info('Reader : First Event')

self.log.info('evtMax( %i ) reached' % (self.evtMax))

if not bool(self.evt['/Event']):
    self.log.warning('No More Events! (So Far : %i)' % (self.nOut))

lst = self.evt.getHistoNames()

lst = self.evt.getList()
if self.app == "DaVinci":
    daqnode = self.evt.retrieveObject(

daqnode, lst, daqnode.address().par())

self.log.critical('Reader could not acquire TES List!')

self.log.info('Reader : TES List : %i items' % (len(lst)))

self.log.info('First Event Sent')

self.eventLoopSyncer.set()
self.evt.clearStore()

libc = ctypes.CDLL('libc.so.6')
libc.prctl(15, name, 0, 0, 0)
startEngine = time.time()
self.log.name = 'Reader'
self.log.info('Reader Process starting')

self.log.info('Reader Beginning Distribution')

self.log.info('Reader First Event OK')

self.log.critical('Reader Failed on First Event')

self.log.info('evtMax( %i ) reached' % (self.evtMax))

if not self.stat.isSuccess():
    self.log.critical('Reader is Damaged!')

self.rTime += (time.time() - t)
if not bool(self.evt['/Event']):
    self.log.warning('No More Events! (So Far : %i)' % (self.nOut))

self.eventLoopSyncer.set()
self.evt.clearStore()
self.log.info('Setting <Last> Event')

self.log.info('Reader : Event Distribution complete.')
self.evcom.finalize()

self.tTime = time.time() - startEngine
def __init__(self, workerID, queues, events, params, subworkers):
    GMPComponent.__init__(self, 'Worker', workerID,
                          queues, events, params, subworkers)

self.log.info("Subworker-%i Created OK" % (self.nodeID))

libc = ctypes.CDLL('libc.so.6')
libc.prctl(15, name, 0, 0, 0)

startEngine = time.time()
msg = self.a.service('MessageSvc')
msg.Format = '%-13s ' % ('[' + self.log.name + ']') + self.msgFormat
self.log.name = "Worker-%i" % (self.nodeID)
self.log.info("Subworker %i starting Engine" % (self.nodeID))

self.log.info('EVT WRITERS ON WORKER : %i'
nEventWriters = len(self.writerDict["events"])
packet = self.evcom.receive()

if packet == 'FINISHED':

evtNumber, tbin = packet
if self.cntr != None:
    self.cntr.setEventCounter(evtNumber)

sc = self.a.executeEvent()

self.rTime += (time.time() - t)

self.log.name = "Worker-%i" % (self.nodeID)
self.log.warning('Did not Execute Event')
self.evt.clearStore()

self.log.name = "Worker-%i" % (self.nodeID)
self.log.warning('Event did not pass : %i' % (evtNumber))
self.evt.clearStore()

self.inc.fireIncident(gbl.Incident('Subworker', 'EndEvent'))
self.eventLoopSyncer.set()
self.evt.clearStore()
self.log.name = "Worker-%i" % (self.nodeID)
self.log.info('Setting <Last> Event %s' % (self.nodeID))

self.evcom.finalize()

self.filerecordsAgent.SendFileRecords()
self.tTime = time.time() - startEngine
self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
For some output writers, a check is performed to see if the event has
executed certain algorithms. These reside in the AcceptAlgs property for
those writers.

if hasattr(m.w, 'AcceptAlgs'):
    acc += m.w.AcceptAlgs
if hasattr(m.w, 'RequireAlgs'):
    req += m.w.RequireAlgs
if hasattr(m.w, 'VetoAlgs'):

return (acc, req, vet)

if self.a.algorithm(algName)._ialg.isExecuted()\
        and self.a.algorithm(algName)._ialg.filterPassed():
Check the algorithm status for an event. Depending on output writer
settings, the event may be declined based on various criteria.
This is a transcript of the check that occurs in GaudiSvc::OutputStream.

self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
for name in self.acceptAlgs:

self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
for name in self.requireAlgs:
    self.log.info('Evt declined (requireAlgs) : %s' % (name))

self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
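# A compact sketch (not the original method) of how the accept/require/veto
# lists combine, in the spirit of the GaudiSvc::OutputStream check described
# above. `passed(name)` is assumed to behave like checkExecutedPassed: True
# when the algorithm executed and its filter passed.
def is_event_passed(passed, acceptAlgs, requireAlgs, vetoAlgs):
    # If any accept algorithms are configured, at least one must have passed.
    ok = any(passed(n) for n in acceptAlgs) if acceptAlgs else True
    # Every require algorithm must have passed.
    for name in requireAlgs:
        if not passed(name):
            return False
    # No veto algorithm may have passed.
    for name in vetoAlgs:
        if passed(name):
            return False
    return ok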
def __init__(self, workerID, queues, events, params, subworkers):
    GMPComponent.__init__(self, 'Worker', workerID,
                          queues, events, params, subworkers)

self.log.name = "Worker-%i" % (self.nodeID)
self.log.info("Worker-%i Created OK" % (self.nodeID))

self.config['EventSelector'].Input = []
self.config['ApplicationMgr'].OutStream = []
if "HistogramPersistencySvc" in self.config.keys():
    self.config['HistogramPersistencySvc'].OutputFile = ''
formatHead = '[Worker-%i]' % (self.nodeID)
self.config['MessageSvc'].Format = '%-13s ' % formatHead + \
    self.msgFormat

for key, lst in self.writerDict.iteritems():
    self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

newName = m.getNewName('.', '.w%i.' % (self.nodeID))
self.config[m.key].Output = newName

if "ToolSvc.EvtCounter" not in self.config:
    from Configurables import EvtCounter
    counter = EvtCounter()
else:
    counter = self.config["ToolSvc.EvtCounter"]
counter.UseIncident = False

self.log.warning('Cannot configure EvtCounter')
libc = ctypes.CDLL('libc.so.6')
libc.prctl(15, name, 0, 0, 0)

startEngine = time.time()
self.log.info("Worker %i starting Engine" % (self.nodeID))

self.log.info('EVT WRITERS ON WORKER : %i'
nEventWriters = len(self.writerDict["events"])

for item in m.ItemList:
    hsh = item.find('#')

for item in m.OptItemList:
    hsh = item.find('#')
    optItemList.add(item)

itemList -= optItemList
for item in sorted(itemList):
    self.log.info(' adding ItemList Item to ts : %s' % (item))
    self.ts.addItem(item)
for item in sorted(optItemList):
    self.log.info(' adding Optional Item to ts : %s' % (item))
    self.ts.addOptItem(item)

self.log.info('There is no Event Output for this app')
packet = self.evcom.receive()

if packet == 'FINISHED':

evtNumber, tbin = packet
if self.cntr != None:
    self.cntr.setEventCounter(evtNumber)

self.log.info("Fork new subworkers and disconnect from CondDB")
condDB = self.a.service('CondDBCnvSvc', gbl.ICondDBReader)

k.SetServices(self.a, self.evt, self.hvt, self.fsr,

sc = self.a.executeEvent()

self.rTime += (time.time() - t)

self.log.warning('Did not Execute Event')
self.evt.clearStore()

self.log.warning('Event did not pass : %i' % (evtNumber))
self.evt.clearStore()

self.inc.fireIncident(gbl.Incident('Worker', 'EndEvent'))
self.eventLoopSyncer.set()
self.evt.clearStore()
self.log.info('Setting <Last> Event')
self.lastEvent.set()

self.evcom.finalize()
self.log.info('Worker-%i Finished Processing Events' % (self.nodeID))

self.filerecordsAgent.SendFileRecords()

self.log.info('Join subworkers')
For some output writers, a check is performed to see if the event has
executed certain algorithms. These reside in the AcceptAlgs property for
those writers.

if hasattr(m.w, 'AcceptAlgs'):
    acc += m.w.AcceptAlgs
if hasattr(m.w, 'RequireAlgs'):
    req += m.w.RequireAlgs
if hasattr(m.w, 'VetoAlgs'):

return (acc, req, vet)

if self.a.algorithm(algName)._ialg.isExecuted()\
        and self.a.algorithm(algName)._ialg.filterPassed():
Check the algorithm status for an event. Depending on output writer
settings, the event may be declined based on various criteria.
This is a transcript of the check that occurs in GaudiSvc::OutputStream.

self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
for name in self.acceptAlgs:

self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
for name in self.requireAlgs:
    self.log.info('Evt declined (requireAlgs) : %s' % (name))

self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
def __init__(self, queues, events, params, subworkers):
    GMPComponent.__init__(self, 'Writer', -2, queues,
                          events, params, subworkers)

self.log.name = "Writer--2"

self.config['ApplicationMgr'].TopAlg = []
self.config['EventSelector'].Input = []

self.config['MessageSvc'].Format = '%-13s ' % '[Writer]' + \
    self.msgFormat

for key, lst in self.writerDict.iteritems():
    self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

self.log.debug('Processing Event Writer : %s' % (m))
newName = m.getNewName('.', '.p%i.' % self.nWorkers)
self.config[m.key].Output = newName

self.log.debug('Processing FileRecords Writer: %s' % (m))
newName = m.getNewName('.', '.p%i.' % self.nWorkers,
                       extra=" OPT='RECREATE'")
self.config[m.key].Output = newName

hs = "HistogramPersistencySvc"
if hs in self.config.keys():
    n = self.config[hs].OutputFile

    newName = self.config[hs].OutputFile.replace(
        '.', '.p%i.' % (self.nWorkers))
    self.config[hs].OutputFile = newName
libc = ctypes.CDLL('libc.so.6')
libc.prctl(15, name, 0, 0, 0)

startEngine = time.time()

stopCriteria = self.nWorkers

current = (current + 1) % self.nWorkers
packet = self.evcoms[current].receive(timeout=0.01)

if packet == 'FINISHED':
    self.log.info(
        'Writer got FINISHED flag : Worker %i' % (current))
    self.status[current] = True

    self.log.info('FINISHED recd from all workers, break loop')

evtNumber, tbin = packet

self.a.executeEvent()
self.rTime += (time.time() - t)

self.evt.clearStore()
self.eventLoopSyncer.set()
self.log.name = "Writer--2"
self.log.info('Setting <Last> Event')
self.lastEvent.set()

[e.finalize() for e in self.evcoms]

sc = self.histoAgent.Receive()
sc = self.histoAgent.RebuildHistoStore()

self.log.info('Histo Store rebuilt ok')

self.log.warning('Histo Store Error in Rebuild')

sc = self.filerecordsAgent.Receive()
self.filerecordsAgent.Rebuild()
self.log.name = 'GaudiPython-Parallel-Logger'
self.log.info('GaudiPython Parallel Process Co-ordinator beginning')

self.hq = JoinableQueue()
self.fq = JoinableQueue()

limit=WAIT_INITIALISE,
step=STEP_INITIALISE)

limit=WAIT_SINGLE_EVENT,
firstEvent=WAIT_FIRST_EVENT)

limit=WAIT_FINALISE,

self.subworkers.append(sub)

self.system.append(self.writer)
self.system.append(wk)
self.system.append(self.reader)

init = self.sInit.d[nodeID].event
run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
fin = self.sFin.d[nodeID].event
return (init, run, fin)

eventQ = self.qs[nodeID]

return (eventQ, histQ, fsrQ)
self.log.name = 'GaudiPython-Parallel-Logger'
self.log.info('INITIALISING SYSTEM')

sc = self.sInit.syncAll(step="Initialise")

self.log.name = 'GaudiPython-Parallel-Logger'
self.log.info('RUNNING SYSTEM')
sc = self.sRun.syncAll(step="Run")

self.log.name = 'GaudiPython-Parallel-Logger'
self.log.info('FINALISING SYSTEM')
sc = self.sFin.syncAll(step="Finalise")

self.log.info("Cleanly join all Processes")

self.log.info("Report Total Success to Main.py")

children = multiprocessing.active_children()

self.system.reverse()
rwk = JoinableQueue()
workersWriter = [JoinableQueue() for i in xrange(self.nWorkers)]

d[-2] = (workersWriter, None)

d[i] = (rwk, workersWriter[i])
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
def __init__(self, queues, events, params, subworkers)
def getNewName(self, replaceThis, withThis, extra='')
def receive(self, timeout=None)
def __init__(self, GMPComponent, qin, qout)
def __init__(self, gmpcomponent)
def IdentifyWriters(self)
def __init__(self, workerID, queues, events, params, subworkers)
def checkExecutedPassed(self, algName)
def processConfiguration(self)
def LoadTES(self, tbufferfile)
def getItemLists(self, config)
def __init__(self, queues, events, params, subworkers)
def set(self, key, output)
def __init__(self, nWorkers, config, log)
def StartGaudiPython(self)
def __init__(self, writer, wType, config)
def processConfiguration(self)
def getSyncEvents(self, nodeID)
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
def __init__(self, workerID, queues, events, params, subworkers)
def processConfiguration(self)
def SetupGaudiPython(self)
def getQueues(self, nodeID)
def checkExecutedPassed(self, algName)
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
def processConfiguration(self)