2 from GaudiPython
import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
3 from ROOT
import TBufferFile, TBuffer
5 from multiprocessing
import Process, Queue, JoinableQueue, Event
6 from multiprocessing
import cpu_count, current_process
7 from multiprocessing.queues
import Empty
12 from ROOT
import TParallelMergingFile
41 WAIT_INITIALISE = 60 * 5
42 WAIT_FIRST_EVENT = 60 * 3
43 WAIT_SINGLE_EVENT = 60 * 6
44 WAIT_FINALISE = 60 * 2
57 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
61 aidatypes = (gbl.AIDA.IHistogram,
62 gbl.AIDA.IHistogram1D,
63 gbl.AIDA.IHistogram2D,
64 gbl.AIDA.IHistogram3D,
67 gbl.AIDA.IBaseHistogram)
70 thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
73 WRITERTYPES = {
'EvtCollectionStream':
"tuples",
74 'InputCopyStream':
"events",
75 'OutputStream':
"events",
76 'RecordStream':
"records",
77 'RunRecordStream':
"records",
78 'SequentialOutputStream':
"events",
79 'TagCollectionStream':
"tuples"}
86 A class to represent a writer in the GaudiPython configuration.
87 It can be non-trivial to access the name of the output file; it may be
88 specified in the DataSvc, or just on the writer, and may be a list or a string.
89 Also, there are three different types of writer (events, records, tuples),
90 so this bootstrap class provides easy access to this info while configuring.
109 if hasattr(self.
w,
"Output"):
119 if hasattr(datasvc,
"Output"):
131 assert replaceThis.__class__.__name__ ==
'str' 132 assert withThis.__class__.__name__ ==
'str' 135 if old.__class__.__name__ ==
'list':
138 new = old.replace(replaceThis, withThis)
147 if hasattr(self.
w,
"ItemList"):
150 datasvc = config[self.w.EvtDataSvc]
151 if hasattr(datasvc,
"ItemList"):
154 if hasattr(self.
w,
"OptItemList"):
157 datasvc = config[self.w.EvtDataSvc]
158 if hasattr(datasvc,
"OptItemList"):
162 def set(self, key, output):
171 s +=
"Writer : %s\n" % (self.
wName)
172 s +=
"Writer Type : %s\n" % (self.
wType)
173 s +=
"Writer Output : %s\n" % (self.
output)
175 s +=
"DataSvc Output : %s\n" % (self.
svcOutput)
177 s +=
"Key for config : %s\n" % (self.
key)
178 s +=
"Output File : %s\n" % (self.
output)
179 s +=
"ItemList : %s\n" % (self.
ItemList)
189 GaudiPython algorithm used to clean up histos on the Reader and Workers.
190 It only has a finalize() method.
191 This retrieves a dictionary of path:histo objects and sends it to the
192 writer. It then waits for a None flag: THIS IS IMPORTANT, because if
193 the algorithm returns before ALL histos have been COMPLETELY RECEIVED
194 at the writer end, there will be an error.
198 PyAlgorithm.__init__(self)
207 self.log.info(
'CollectHistograms Finalise (%s)' %
208 (self._gmpc.nodeType))
209 self._gmpc.hDict = self._gmpc.dumpHistograms()
210 ks = self._gmpc.hDict.keys()
211 self.log.info(
'%i Objects in Histogram Store' % (len(ks)))
217 reps = len(ks) / chunk + 1
218 for i
in xrange(reps):
219 someKeys = ks[i * chunk: (i + 1) * chunk]
220 smalld = dict([(key, self._gmpc.hDict[key])
for key
in someKeys])
221 self._gmpc.hq.put((self._gmpc.nodeID, smalld))
223 self.log.debug(
'Signalling end of histos to Writer')
224 self._gmpc.hq.put(
'HISTOS_SENT')
225 self.log.debug(
'Waiting on Sync Event')
226 self._gmpc.sEvent.wait()
227 self.log.debug(
'Histo Sync Event set, clearing and returning')
228 self._gmpc.hvt.clearStore()
229 root = gbl.DataObject()
231 self._gmpc.hvt.setRoot(
'/stat', root)
265 assert item.__class__.__name__ ==
'tuple' 266 startTransmission = time.time()
270 while self.qout._buffer:
272 self.
qoutTime += time.time() - startTransmission
279 startWait = time.time()
281 itemIn = self.qin.get(timeout=timeout)
284 self.
qinTime += time.time() - startWait
286 if itemIn.__class__.__name__ ==
'tuple':
293 self._gmpc.log.warning(
'TASK_DONE called too often by : %s' 294 % (self._gmpc.nodeType))
298 self.log.info(
'Finalize Event Communicator : %s %s' %
299 (self.
_gmpc, self._gmpc.nodeType))
303 if self._gmpc.nodeType ==
'Reader':
304 downstream = self._gmpc.nWorkers
305 elif self._gmpc.nodeType ==
'Writer':
307 elif self._gmpc.nodeType ==
'Worker':
310 for i
in xrange(downstream):
311 self.qout.put(
'FINISHED')
312 if self._gmpc.nodeType !=
'Writer':
318 self.log.name =
'%s-%i Audit ' % (self._gmpc.nodeType,
320 self.log.info(
'Items Sent : %i' % (self.
nSent))
321 self.log.info(
'Items Received : %i' % (self.
nRecv))
322 self.log.info(
'Data Sent : %i' % (self.
sizeSent))
323 self.log.info(
'Data Received : %i' % (self.
sizeRecv))
324 self.log.info(
'Q-out Time : %5.2f' % (self.
qoutTime))
325 self.log.info(
'Q-in Time : %5.2f' % (self.
qinTime))
331 def __init__(self, gaudiTESSerializer, evtDataSvc,
332 nodeType, nodeID, log):
333 self.
T = gaudiTESSerializer
347 root = gbl.DataObject()
349 self.evt.setRoot(
'/Event', root)
351 self.T.loadBuffer(tbuf)
352 self.
tLoad += (time.time() - t)
354 self.buffersIn.append(tbuf.Length())
358 tb = TBufferFile(TBuffer.kWrite)
359 self.T.dumpBuffer(tb)
360 self.
tDump += (time.time() - t)
362 self.buffersOut.append(tb.Length())
366 evIn =
"Events Loaded : %i" % (self.
nIn)
367 evOut =
"Events Dumped : %i" % (self.
nOut)
369 dataIn =
"Data Loaded : %i" % (din)
370 dataInMb =
"Data Loaded (MB) : %5.2f Mb" % (din / MB)
372 avgIn =
"Avg Buf Loaded : %5.2f Mb"\
373 % (din / (self.
nIn * MB))
374 maxIn =
"Max Buf Loaded : %5.2f Mb"\
377 avgIn =
"Avg Buf Loaded : N/A" 378 maxIn =
"Max Buf Loaded : N/A" 380 dataOut =
"Data Dumped : %i" % (dout)
381 dataOutMb =
"Data Dumped (MB) : %5.2f Mb" % (dout / MB)
383 avgOut =
"Avg Buf Dumped : %5.2f Mb"\
384 % (din / (self.
nOut * MB))
385 maxOut =
"Max Buf Dumped : %5.2f Mb"\
388 avgOut =
"Avg Buf Dumped : N/A" 389 maxOut =
"Max Buf Dumped : N/A" 390 dumpTime =
"Total Dump Time : %5.2f" % (self.
tDump)
391 loadTime =
"Total Load Time : %5.2f" % (self.
tLoad)
405 self.log.name =
"%s-%i TESSerializer" % (self.
nodeType, self.
nodeID)
420 def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
431 current_process().name = nodeType
438 self.nWorkers, self.sEvent, self.config, self.
log = params
449 ks = self.config.keys()
451 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
458 qPair, histq, fq = self.
queues 472 self.evcoms.append(ec)
515 from AlgSmapShot
import SmapShot
517 ss = SmapShot(logname=smapsLog)
518 self.a.addAlgorithm(ss)
519 self.
evt = self.a.evtsvc()
520 self.
hvt = self.a.histsvc()
521 self.
fsr = self.a.filerecordsvc()
522 self.
inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
523 self.
pers = self.a.service(
'EventPersistencySvc',
'IAddressCreator')
524 self.
ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.
pers)
535 root = gbl.DataObject()
537 self.evt.setRoot(
'/Event', root)
538 self.ts.loadBuffer(tbufferfile)
541 if self.
app !=
'Gauss':
547 lst = [
'/Event/Gen/Header',
552 n = self.
evt[path].evtNumber()
562 n = self.
evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
569 if self.
nIn > 0
or self.
nOut > 0:
572 self.log.warning(
'Could not determine Event Number')
585 keys = [
"events",
"records",
"tuples",
"histos"]
590 wkeys = WRITERTYPES.keys()
591 for v
in self.config.values():
592 if v.__class__.__name__
in wkeys:
593 writerType = WRITERTYPES[v.__class__.__name__]
594 d[writerType].append(
MiniWriter(v, writerType, self.config))
596 self.log.info(
'Writer Found : %s' % (v.name()))
599 if 'HistogramPersistencySvc' in self.config.keys():
600 hfile = self.config[
'HistogramPersistencySvc'].getProp(
602 d[
"histos"].append(hfile)
607 Method used by the GaudiPython algorithm CollectHistos
608 to obtain a dictionary of the form { path : object }
609 representing the Histogram Store.
611 nlist = self.hvt.getHistoNames()
618 if type(o)
in aidatypes:
625 print 'WARNING : no histograms to recover?' 635 if self.
app ==
'Gauss':
637 tool = self.a.tool(
"ToolSvc.EvtCounter")
638 self.
cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
642 self.
iTime = time.time() - start
649 self.finalEvent.set()
650 self.
fTime = time.time() - start
654 allTime =
"Alive Time : %5.2f" % (self.
tTime)
655 initTime =
"Init Time : %5.2f" % (self.
iTime)
656 frstTime =
"1st Event Time : %5.2f" % (self.
firstEvTime)
657 runTime =
"Run Time : %5.2f" % (self.
rTime)
658 finTime =
"Finalise Time : %5.2f" % (self.
fTime)
659 tup = (allTime, initTime, frstTime, runTime, finTime)
670 def __init__(self, queues, events, params, subworkers):
671 GMPComponent.__init__(self,
'Reader', -1, queues,
672 events, params, subworkers)
679 self.config[
'ApplicationMgr'].TopAlg = []
680 self.config[
'ApplicationMgr'].OutStream = []
681 if "HistogramPersistencySvc" in self.config.keys():
682 self.config[
'HistogramPersistencySvc'].OutputFile =
'' 683 self.config[
'MessageSvc'].Format =
'[Reader]% F%18W%S%7W%R%T %0W%M' 684 self.
evtMax = self.config[
'ApplicationMgr'].EvtMax
687 tb = TBufferFile(TBuffer.kWrite)
689 self.ts.dumpBuffer(tb)
696 startFirst = time.time()
697 self.log.info(
'Reader : First Event')
699 self.log.info(
'evtMax( %i ) reached' % (self.
evtMax))
705 if not bool(self.
evt[
'/Event']):
706 self.log.warning(
'No More Events! (So Far : %i)' % (self.
nOut))
712 lst = self.evt.getHistoNames()
715 lst = self.evt.getList()
716 if self.
app ==
"DaVinci":
717 daqnode = self.evt.retrieveObject(
721 daqnode, lst, daqnode.address().
par())
723 self.log.critical(
'Reader could not acquire TES List!')
726 self.log.info(
'Reader : TES List : %i items' % (len(lst)))
731 self.log.info(
'First Event Sent')
734 self.eventLoopSyncer.set()
735 self.evt.clearStore()
743 libc = ctypes.CDLL(
'libc.so.6')
745 libc.prctl(15, name, 0, 0, 0)
747 startEngine = time.time()
748 self.log.name =
'Reader' 749 self.log.info(
'Reader Process starting')
756 self.log.info(
'Reader Beginning Distribution')
759 self.log.info(
'Reader First Event OK')
761 self.log.critical(
'Reader Failed on First Event')
768 self.log.info(
'evtMax( %i ) reached' % (self.
evtMax))
771 if not self.stat.isSuccess():
772 self.log.critical(
'Reader is Damaged!')
777 self.
rTime += (time.time() - t)
778 if not bool(self.
evt[
'/Event']):
779 self.log.warning(
'No More Events! (So Far : %i)' % (self.
nOut))
786 self.eventLoopSyncer.set()
787 self.evt.clearStore()
788 self.log.info(
'Setting <Last> Event')
792 self.log.info(
'Reader : Event Distribution complete.')
793 self.evcom.finalize()
795 self.
tTime = time.time() - startEngine
802 def __init__(self, workerID, queues, events, params, subworkers):
803 GMPComponent.__init__(self,
'Worker', workerID,
804 queues, events, params, subworkers)
809 self.log.info(
"Subworker-%i Created OK" % (self.
nodeID))
816 libc = ctypes.CDLL(
'libc.so.6')
818 libc.prctl(15, name, 0, 0, 0)
821 startEngine = time.time()
822 msg = self.a.service(
'MessageSvc')
823 msg.Format =
'[' + self.log.name +
'] % F%18W%S%7W%R%T %0W%M' 825 self.log.name =
"Worker-%i" % (self.
nodeID)
826 self.log.info(
"Subworker %i starting Engine" % (self.
nodeID))
830 self.log.info(
'EVT WRITERS ON WORKER : %i' 833 nEventWriters = len(self.
writerDict[
"events"])
839 packet = self.evcom.receive()
844 if packet ==
'FINISHED':
846 evtNumber, tbin = packet
847 if self.
cntr !=
None:
849 self.cntr.setEventCounter(evtNumber)
855 sc = self.a.executeEvent()
859 self.
rTime += (time.time() - t)
863 self.log.name =
"Worker-%i" % (self.
nodeID)
864 self.log.warning(
'Did not Execute Event')
865 self.evt.clearStore()
870 self.log.name =
"Worker-%i" % (self.
nodeID)
871 self.log.warning(
'Event did not pass : %i' % (evtNumber))
872 self.evt.clearStore()
881 self.inc.fireIncident(gbl.Incident(
'Subworker',
'EndEvent'))
882 self.eventLoopSyncer.set()
883 self.evt.clearStore()
884 self.log.name =
"Worker-%i" % (self.
nodeID)
885 self.log.info(
'Setting <Last> Event %s' % (self.
nodeID))
888 self.evcom.finalize()
890 self.filerecordsAgent.SendFileRecords()
891 self.
tTime = time.time() - startEngine
902 self.
inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
911 For some output writers, a check is performed to see if the event has
912 executed certain algorithms.
913 These reside in the AcceptAlgs property for those writers.
919 if hasattr(m.w,
'AcceptAlgs'):
920 acc += m.w.AcceptAlgs
921 if hasattr(m.w,
'RequireAlgs'):
922 req += m.w.RequireAlgs
923 if hasattr(m.w,
'VetoAlgs'):
925 return (acc, req, vet)
928 if self.a.algorithm(algName)._ialg.isExecuted()\
929 and self.a.algorithm(algName)._ialg.filterPassed():
936 Check the algorithm status for an event.
937 Depending on output writer settings, the event
938 may be declined based on various criteria.
939 This is a transcript of the check that occurs in GaudiSvc::OutputStream.
943 self.log.debug(
'self.acceptAlgs is %s' % (str(self.acceptAlgs)))
945 for name
in self.acceptAlgs:
952 self.log.debug(
'self.requireAlgs is %s' % (str(self.requireAlgs)))
953 for name
in self.requireAlgs:
957 self.log.info(
'Evt declined (requireAlgs) : %s' % (name))
960 self.log.debug(
'self.vetoAlgs is %s' % (str(self.
vetoAlgs)))
965 self.log.info(
'Evt declined : (vetoAlgs) : %s' % (name))
973 def __init__(self, workerID, queues, events, params, subworkers):
974 GMPComponent.__init__(self,
'Worker', workerID,
975 queues, events, params, subworkers)
980 self.log.name =
"Worker-%i" % (self.
nodeID)
981 self.log.info(
"Worker-%i Created OK" % (self.
nodeID))
990 self.config[
'EventSelector'].Input = []
991 self.config[
'ApplicationMgr'].OutStream = []
992 if "HistogramPersistencySvc" in self.config.keys():
993 self.config[
'HistogramPersistencySvc'].OutputFile =
'' 994 formatHead =
'[Worker-%i] ' % (self.
nodeID)
995 self.config[
'MessageSvc'].Format = formatHead + \
996 '% F%18W%S%7W%R%T %0W%M' 998 for key, lst
in self.writerDict.iteritems():
999 self.log.info(
'Writer Type : %s\t : %i' % (key, len(lst)))
1004 newName = m.getNewName(
'.',
'.w%i.' % (self.
nodeID))
1005 self.config[m.key].Output = newName
1015 if "ToolSvc.EvtCounter" not in self.config:
1016 from Configurables
import EvtCounter
1017 counter = EvtCounter()
1019 counter = self.config[
"ToolSvc.EvtCounter"]
1020 counter.UseIncident =
False 1023 self.log.warning(
'Cannot configure EvtCounter')
1030 libc = ctypes.CDLL(
'libc.so.6')
1032 libc.prctl(15, name, 0, 0, 0)
1034 startEngine = time.time()
1035 self.log.info(
"Worker %i starting Engine" % (self.
nodeID))
1040 self.log.info(
'EVT WRITERS ON WORKER : %i' 1043 nEventWriters = len(self.
writerDict[
"events"])
1048 for item
in m.ItemList:
1049 hsh = item.find(
'#')
1053 for item
in m.OptItemList:
1054 hsh = item.find(
'#')
1057 optItemList.add(item)
1059 itemList -= optItemList
1060 for item
in sorted(itemList):
1061 self.log.info(
' adding ItemList Item to ts : %s' % (item))
1062 self.ts.addItem(item)
1063 for item
in sorted(optItemList):
1064 self.log.info(
' adding Optional Item to ts : %s' % (item))
1065 self.ts.addOptItem(item)
1067 self.log.info(
'There is no Event Output for this app')
1073 packet = self.evcom.receive()
1078 if packet ==
'FINISHED':
1080 evtNumber, tbin = packet
1081 if self.
cntr !=
None:
1082 self.cntr.setEventCounter(evtNumber)
1089 self.log.info(
"Fork new subworkers and disconnect from CondDB")
1090 condDB = self.a.service(
'CondDBCnvSvc', gbl.ICondDBReader)
1095 k.SetServices(self.
a, self.
evt, self.
hvt, self.
fsr,
1103 sc = self.a.executeEvent()
1107 self.
rTime += (time.time() - t)
1111 self.log.warning(
'Did not Execute Event')
1112 self.evt.clearStore()
1117 self.log.warning(
'Event did not pass : %i' % (evtNumber))
1118 self.evt.clearStore()
1127 self.inc.fireIncident(gbl.Incident(
'Worker',
'EndEvent'))
1128 self.eventLoopSyncer.set()
1129 self.evt.clearStore()
1130 self.log.info(
'Setting <Last> Event')
1131 self.lastEvent.set()
1133 self.evcom.finalize()
1134 self.log.info(
'Worker-%i Finished Processing Events' % (self.
nodeID))
1136 self.filerecordsAgent.SendFileRecords()
1142 self.log.info(
'Join subworkers')
1147 For some output writers, a check is performed to see if the event has
1148 executed certain algorithms.
1149 These reside in the AcceptAlgs property for those writers.
1155 if hasattr(m.w,
'AcceptAlgs'):
1156 acc += m.w.AcceptAlgs
1157 if hasattr(m.w,
'RequireAlgs'):
1158 req += m.w.RequireAlgs
1159 if hasattr(m.w,
'VetoAlgs'):
1161 return (acc, req, vet)
1164 if self.a.algorithm(algName)._ialg.isExecuted()\
1165 and self.a.algorithm(algName)._ialg.filterPassed():
1172 Check the algorithm status for an event.
1173 Depending on output writer settings, the event
1174 may be declined based on various criteria.
1175 This is a transcript of the check that occurs in GaudiSvc::OutputStream.
1179 self.log.debug(
'self.acceptAlgs is %s' % (str(self.acceptAlgs)))
1181 for name
in self.acceptAlgs:
1188 self.log.debug(
'self.requireAlgs is %s' % (str(self.requireAlgs)))
1189 for name
in self.requireAlgs:
1193 self.log.info(
'Evt declined (requireAlgs) : %s' % (name))
1196 self.log.debug(
'self.vetoAlgs is %s' % (str(self.
vetoAlgs)))
1201 self.log.info(
'Evt declined : (vetoAlgs) : %s' % (name))
1209 def __init__(self, queues, events, params, subworkers):
1210 GMPComponent.__init__(self,
'Writer', -2, queues,
1211 events, params, subworkers)
1216 self.log.name =
"Writer--2" 1222 self.config[
'ApplicationMgr'].TopAlg = []
1223 self.config[
'EventSelector'].Input = []
1225 self.config[
'MessageSvc'].Format =
'[Writer] % F%18W%S%7W%R%T %0W%M' 1228 for key, lst
in self.writerDict.iteritems():
1229 self.log.info(
'Writer Type : %s\t : %i' % (key, len(lst)))
1236 self.log.debug(
'Processing Event Writer : %s' % (m))
1237 newName = m.getNewName(
'.',
'.p%i.' % self.nWorkers)
1238 self.config[m.key].Output = newName
1248 self.log.debug(
'Processing FileRecords Writer: %s' % (m))
1249 newName = m.getNewName(
'.',
'.p%i.' % self.nWorkers,
1250 extra=
" OPT='RECREATE'")
1251 self.config[m.key].Output = newName
1254 hs =
"HistogramPersistencySvc" 1256 if hs
in self.config.keys():
1257 n = self.config[hs].OutputFile
1259 newName = self.config[hs].OutputFile.replace(
'.',
1260 '.p%i.' % (self.nWorkers))
1261 self.config[hs].OutputFile = newName
1267 libc = ctypes.CDLL(
'libc.so.6')
1269 libc.prctl(15, name, 0, 0, 0)
1271 startEngine = time.time()
1279 stopCriteria = self.nWorkers
1281 current = (current + 1) % self.nWorkers
1282 packet = self.
evcoms[current].receive(timeout=0.01)
1285 if packet ==
'FINISHED':
1287 'Writer got FINISHED flag : Worker %i' % (current))
1289 self.
status[current] =
True 1291 self.log.info(
'FINISHED recd from all workers, break loop')
1296 evtNumber, tbin = packet
1299 self.a.executeEvent()
1300 self.
rTime += (time.time() - t)
1302 self.evt.clearStore()
1303 self.eventLoopSyncer.set()
1304 self.log.name =
"Writer--2" 1305 self.log.info(
'Setting <Last> Event')
1306 self.lastEvent.set()
1309 [e.finalize()
for e
in self.
evcoms]
1311 sc = self.histoAgent.Receive()
1312 sc = self.histoAgent.RebuildHistoStore()
1314 self.log.info(
'Histo Store rebuilt ok')
1316 self.log.warning(
'Histo Store Error in Rebuild')
1319 sc = self.filerecordsAgent.Receive()
1320 self.filerecordsAgent.Rebuild()
1336 self.log.name =
'GaudiPython-Parallel-Logger' 1337 self.log.info(
'GaudiPython Parallel Process Co-ordinator beginning')
1346 self.
hq = JoinableQueue()
1347 self.
fq = JoinableQueue()
1351 limit=WAIT_INITIALISE,
1352 step=STEP_INITIALISE)
1355 limit=WAIT_SINGLE_EVENT,
1357 firstEvent=WAIT_FIRST_EVENT)
1359 limit=WAIT_FINALISE,
1372 self.subworkers.append(sub)
1382 self.system.append(self.
writer)
1383 self.system.append(wk)
1384 self.system.append(self.
reader)
1387 init = self.sInit.d[nodeID].event
1388 run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
1389 fin = self.sFin.d[nodeID].event
1390 return (init, run, fin)
1393 eventQ = self.
qs[nodeID]
1396 return (eventQ, histQ, fsrQ)
1401 self.log.name =
'GaudiPython-Parallel-Logger' 1402 self.log.info(
'INITIALISING SYSTEM')
1408 sc = self.sInit.syncAll(step=
"Initialise")
1416 self.log.name =
'GaudiPython-Parallel-Logger' 1417 self.log.info(
'RUNNING SYSTEM')
1418 sc = self.sRun.syncAll(step=
"Run")
1426 self.log.name =
'GaudiPython-Parallel-Logger' 1427 self.log.info(
'FINALISING SYSTEM')
1428 sc = self.sFin.syncAll(step=
"Finalise")
1436 self.log.info(
"Cleanly join all Processes")
1438 self.log.info(
"Report Total Success to Main.py")
1443 children = multiprocessing.active_children()
1453 self.system.reverse()
1466 rwk = JoinableQueue()
1468 workersWriter = [JoinableQueue()
for i
in xrange(self.
nWorkers)]
1471 d[-2] = (workersWriter,
None)
1473 d[i] = (rwk, workersWriter[i])
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
def __init__(self, queues, events, params, subworkers)
def getNewName(self, replaceThis, withThis, extra='')
StatusCode finalize() override
StatusCode execute() override
def receive(self, timeout=None)
def __init__(self, GMPComponent, qin, qout)
double sum(double x, double y, double z)
def __init__(self, gmpcomponent)
def IdentifyWriters(self)
def __init__(self, workerID, queues, events, params, subworkers)
def checkExecutedPassed(self, algName)
def processConfiguration(self)
def LoadTES(self, tbufferfile)
def getItemLists(self, config)
def __init__(self, queues, events, params, subworkers)
def set(self, key, output)
def __init__(self, nWorkers, config, log)
def StartGaudiPython(self)
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
def __init__(self, writer, wType, config)
def processConfiguration(self)
def getSyncEvents(self, nodeID)
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
def __init__(self, workerID, queues, events, params, subworkers)
Python Algorithm base class.
def processConfiguration(self)
def SetupGaudiPython(self)
def getQueues(self, nodeID)
def checkExecutedPassed(self, algName)
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
def processConfiguration(self)