2 from GaudiPython
import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
3 from ROOT
import TBufferFile, TBuffer
5 from multiprocessing
import Process, Queue, JoinableQueue, Event
6 from multiprocessing
import cpu_count, current_process
7 from multiprocessing.queues
import Empty
12 from ROOT
import TParallelMergingFile
40 WAIT_INITIALISE = 60 * 5
41 WAIT_FIRST_EVENT = 60 * 3
42 WAIT_SINGLE_EVENT = 60 * 6
43 WAIT_FINALISE = 60 * 2
56 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
60 aidatypes = (gbl.AIDA.IHistogram, gbl.AIDA.IHistogram1D, gbl.AIDA.IHistogram2D,
61 gbl.AIDA.IHistogram3D, gbl.AIDA.IProfile1D, gbl.AIDA.IProfile2D,
62 gbl.AIDA.IBaseHistogram)
65 thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
69 'EvtCollectionStream':
"tuples",
70 'InputCopyStream':
"events",
71 'OutputStream':
"events",
72 'RecordStream':
"records",
73 'RunRecordStream':
"records",
74 'SequentialOutputStream':
"events",
75 'TagCollectionStream':
"tuples" 83 A class to represent a writer in the GaudiPython configuration 84 It can be non-trivial to access the name of the output file; it may be 85 specified in the DataSvc, or just on the writer, may be a list, or string 86 Also, there are three different types of writer (events, records, tuples) 87 so this bootstrap class provides easy access to this info while configuring 106 if hasattr(self.
w,
"Output"):
116 if hasattr(datasvc,
"Output"):
128 assert replaceThis.__class__.__name__ ==
'str' 129 assert withThis.__class__.__name__ ==
'str' 132 if old.__class__.__name__ ==
'list':
135 new = old.replace(replaceThis, withThis)
144 if hasattr(self.
w,
"ItemList"):
147 datasvc = config[self.w.EvtDataSvc]
148 if hasattr(datasvc,
"ItemList"):
151 if hasattr(self.
w,
"OptItemList"):
154 datasvc = config[self.w.EvtDataSvc]
155 if hasattr(datasvc,
"OptItemList"):
159 def set(self, key, output):
168 s +=
"Writer : %s\n" % (self.
wName)
169 s +=
"Writer Type : %s\n" % (self.
wType)
170 s +=
"Writer Output : %s\n" % (self.
output)
172 s +=
"DataSvc Output : %s\n" % (self.
svcOutput)
174 s +=
"Key for config : %s\n" % (self.
key)
175 s +=
"Output File : %s\n" % (self.
output)
176 s +=
"ItemList : %s\n" % (self.
ItemList)
187 GaudiPython algorithm used to clean up histos on the Reader and Workers 188 Only has a finalize method() 189 This retrieves a dictionary of path:histo objects and sends it to the 190 writer. It then waits for a None flag : THIS IS IMPORTANT, as if 191 the algorithm returns before ALL histos have been COMPLETELY RECEIVED 192 at the writer end, there will be an error. 196 PyAlgorithm.__init__(self)
206 'CollectHistograms Finalise (%s)' % (self._gmpc.nodeType))
207 self._gmpc.hDict = self._gmpc.dumpHistograms()
208 ks = self._gmpc.hDict.keys()
209 self.log.info(
'%i Objects in Histogram Store' % (len(ks)))
215 reps = len(ks) / chunk + 1
216 for i
in xrange(reps):
217 someKeys = ks[i * chunk:(i + 1) * chunk]
218 smalld = dict([(key, self._gmpc.hDict[key])
for key
in someKeys])
219 self._gmpc.hq.put((self._gmpc.nodeID, smalld))
221 self.log.debug(
'Signalling end of histos to Writer')
222 self._gmpc.hq.put(
'HISTOS_SENT')
223 self.log.debug(
'Waiting on Sync Event')
224 self._gmpc.sEvent.wait()
225 self.log.debug(
'Histo Sync Event set, clearing and returning')
226 self._gmpc.hvt.clearStore()
227 root = gbl.DataObject()
229 self._gmpc.hvt.setRoot(
'/stat', root)
264 assert item.__class__.__name__ ==
'tuple' 265 startTransmission = time.time()
269 while self.qout._buffer:
271 self.
qoutTime += time.time() - startTransmission
278 startWait = time.time()
280 itemIn = self.qin.get(timeout=timeout)
283 self.
qinTime += time.time() - startWait
285 if itemIn.__class__.__name__ ==
'tuple':
292 self._gmpc.log.warning(
293 'TASK_DONE called too often by : %s' % (self._gmpc.nodeType))
297 self.log.info(
'Finalize Event Communicator : %s %s' %
298 (self.
_gmpc, self._gmpc.nodeType))
302 if self._gmpc.nodeType ==
'Reader':
303 downstream = self._gmpc.nWorkers
304 elif self._gmpc.nodeType ==
'Writer':
306 elif self._gmpc.nodeType ==
'Worker':
309 for i
in xrange(downstream):
310 self.qout.put(
'FINISHED')
311 if self._gmpc.nodeType !=
'Writer':
317 self.log.name =
'%s-%i Audit ' % (self._gmpc.nodeType,
319 self.log.info(
'Items Sent : %i' % (self.
nSent))
320 self.log.info(
'Items Received : %i' % (self.
nRecv))
321 self.log.info(
'Data Sent : %i' % (self.
sizeSent))
322 self.log.info(
'Data Received : %i' % (self.
sizeRecv))
323 self.log.info(
'Q-out Time : %5.2f' % (self.
qoutTime))
324 self.log.info(
'Q-in Time : %5.2f' % (self.
qinTime))
331 def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
332 self.
T = gaudiTESSerializer
346 root = gbl.DataObject()
348 self.evt.setRoot(
'/Event', root)
350 self.T.loadBuffer(tbuf)
351 self.
tLoad += (time.time() - t)
353 self.buffersIn.append(tbuf.Length())
357 tb = TBufferFile(TBuffer.kWrite)
358 self.T.dumpBuffer(tb)
359 self.
tDump += (time.time() - t)
361 self.buffersOut.append(tb.Length())
365 evIn =
"Events Loaded : %i" % (self.
nIn)
366 evOut =
"Events Dumped : %i" % (self.
nOut)
368 dataIn =
"Data Loaded : %i" % (din)
369 dataInMb =
"Data Loaded (MB) : %5.2f Mb" % (din / MB)
371 avgIn =
"Avg Buf Loaded : %5.2f Mb"\
372 % (din / (self.
nIn * MB))
373 maxIn =
"Max Buf Loaded : %5.2f Mb"\
376 avgIn =
"Avg Buf Loaded : N/A" 377 maxIn =
"Max Buf Loaded : N/A" 379 dataOut =
"Data Dumped : %i" % (dout)
380 dataOutMb =
"Data Dumped (MB) : %5.2f Mb" % (dout / MB)
382 avgOut =
"Avg Buf Dumped : %5.2f Mb"\
383 % (din / (self.
nOut * MB))
384 maxOut =
"Max Buf Dumped : %5.2f Mb"\
387 avgOut =
"Avg Buf Dumped : N/A" 388 maxOut =
"Max Buf Dumped : N/A" 389 dumpTime =
"Total Dump Time : %5.2f" % (self.
tDump)
390 loadTime =
"Total Load Time : %5.2f" % (self.
tLoad)
404 self.log.name =
"%s-%i TESSerializer" % (self.
nodeType, self.
nodeID)
420 def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
431 current_process().name = nodeType
438 self.nWorkers, self.sEvent, self.config, self.
log = params
450 ks = self.config.keys()
452 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
459 qPair, histq, fq = self.
queues 473 self.evcoms.append(ec)
516 from AlgSmapShot
import SmapShot
518 ss = SmapShot(logname=smapsLog)
519 self.a.addAlgorithm(ss)
520 self.
evt = self.a.evtsvc()
521 self.
hvt = self.a.histsvc()
522 self.
fsr = self.a.filerecordsvc()
523 self.
inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
524 self.
pers = self.a.service(
'EventPersistencySvc',
'IAddressCreator')
525 self.
ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.
pers)
536 root = gbl.DataObject()
538 self.evt.setRoot(
'/Event', root)
539 self.ts.loadBuffer(tbufferfile)
542 if self.
app !=
'Gauss':
548 lst = [
'/Event/Gen/Header',
'/Event/Rec/Header']
552 n = self.
evt[path].evtNumber()
562 n = self.
evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
569 if self.
nIn > 0
or self.
nOut > 0:
572 self.log.warning(
'Could not determine Event Number')
585 keys = [
"events",
"records",
"tuples",
"histos"]
590 wkeys = WRITERTYPES.keys()
591 for v
in self.config.values():
592 if v.__class__.__name__
in wkeys:
593 writerType = WRITERTYPES[v.__class__.__name__]
594 d[writerType].append(
MiniWriter(v, writerType, self.config))
596 self.log.info(
'Writer Found : %s' % (v.name()))
599 if 'HistogramPersistencySvc' in self.config.keys():
600 hfile = self.config[
'HistogramPersistencySvc'].getProp(
602 d[
"histos"].append(hfile)
607 Method used by the GaudiPython algorithm CollectHistos 608 to obtain a dictionary of form { path : object } 609 representing the Histogram Store 611 nlist = self.hvt.getHistoNames()
618 if type(o)
in aidatypes:
625 print 'WARNING : no histograms to recover?' 635 if self.
app ==
'Gauss':
637 tool = self.a.tool(
"ToolSvc.EvtCounter")
638 self.
cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
642 self.
iTime = time.time() - start
649 self.finalEvent.set()
650 self.
fTime = time.time() - start
654 allTime =
"Alive Time : %5.2f" % (self.
tTime)
655 initTime =
"Init Time : %5.2f" % (self.
iTime)
656 frstTime =
"1st Event Time : %5.2f" % (self.
firstEvTime)
657 runTime =
"Run Time : %5.2f" % (self.
rTime)
658 finTime =
"Finalise Time : %5.2f" % (self.
fTime)
659 tup = (allTime, initTime, frstTime, runTime, finTime)
671 def __init__(self, queues, events, params, subworkers):
672 GMPComponent.__init__(self,
'Reader', -1, queues, events, params,
680 self.config[
'ApplicationMgr'].TopAlg = []
681 self.config[
'ApplicationMgr'].OutStream = []
682 if "HistogramPersistencySvc" in self.config.keys():
683 self.config[
'HistogramPersistencySvc'].OutputFile =
'' 684 self.config[
'MessageSvc'].Format =
'%-13s ' %
'[Reader]' + \
686 self.
evtMax = self.config[
'ApplicationMgr'].EvtMax
689 tb = TBufferFile(TBuffer.kWrite)
691 self.ts.dumpBuffer(tb)
698 startFirst = time.time()
699 self.log.info(
'Reader : First Event')
701 self.log.info(
'evtMax( %i ) reached' % (self.
evtMax))
707 if not bool(self.
evt[
'/Event']):
708 self.log.warning(
'No More Events! (So Far : %i)' % (self.
nOut))
714 lst = self.evt.getHistoNames()
717 lst = self.evt.getList()
718 if self.
app ==
"DaVinci":
719 daqnode = self.evt.retrieveObject(
722 self.evt.getList(daqnode, lst,
723 daqnode.address().
par())
725 self.log.critical(
'Reader could not acquire TES List!')
728 self.log.info(
'Reader : TES List : %i items' % (len(lst)))
733 self.log.info(
'First Event Sent')
736 self.eventLoopSyncer.set()
737 self.evt.clearStore()
745 libc = ctypes.CDLL(
'libc.so.6')
747 libc.prctl(15, name, 0, 0, 0)
749 startEngine = time.time()
750 self.log.name =
'Reader' 751 self.log.info(
'Reader Process starting')
758 self.log.info(
'Reader Beginning Distribution')
761 self.log.info(
'Reader First Event OK')
763 self.log.critical(
'Reader Failed on First Event')
770 self.log.info(
'evtMax( %i ) reached' % (self.
evtMax))
773 if not self.stat.isSuccess():
774 self.log.critical(
'Reader is Damaged!')
779 self.
rTime += (time.time() - t)
780 if not bool(self.
evt[
'/Event']):
781 self.log.warning(
'No More Events! (So Far : %i)' % (self.
nOut))
788 self.eventLoopSyncer.set()
789 self.evt.clearStore()
790 self.log.info(
'Setting <Last> Event')
794 self.log.info(
'Reader : Event Distribution complete.')
795 self.evcom.finalize()
797 self.
tTime = time.time() - startEngine
805 def __init__(self, workerID, queues, events, params, subworkers):
806 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params,
812 self.log.info(
"Subworker-%i Created OK" % (self.
nodeID))
819 libc = ctypes.CDLL(
'libc.so.6')
821 libc.prctl(15, name, 0, 0, 0)
824 startEngine = time.time()
825 msg = self.a.service(
'MessageSvc')
826 msg.Format =
'%-13s ' % (
'[' + self.log.name +
']') + self.
msgFormat 828 self.log.name =
"Worker-%i" % (self.
nodeID)
829 self.log.info(
"Subworker %i starting Engine" % (self.
nodeID))
834 'EVT WRITERS ON WORKER : %i' % (len(self.
writerDict[
'events'])))
836 nEventWriters = len(self.
writerDict[
"events"])
842 packet = self.evcom.receive()
847 if packet ==
'FINISHED':
849 evtNumber, tbin = packet
850 if self.
cntr !=
None:
852 self.cntr.setEventCounter(evtNumber)
858 sc = self.a.executeEvent()
862 self.
rTime += (time.time() - t)
866 self.log.name =
"Worker-%i" % (self.
nodeID)
867 self.log.warning(
'Did not Execute Event')
868 self.evt.clearStore()
873 self.log.name =
"Worker-%i" % (self.
nodeID)
874 self.log.warning(
'Event did not pass : %i' % (evtNumber))
875 self.evt.clearStore()
884 self.inc.fireIncident(gbl.Incident(
'Subworker',
'EndEvent'))
885 self.eventLoopSyncer.set()
886 self.evt.clearStore()
887 self.log.name =
"Worker-%i" % (self.
nodeID)
888 self.log.info(
'Setting <Last> Event %s' % (self.
nodeID))
891 self.evcom.finalize()
893 self.filerecordsAgent.SendFileRecords()
894 self.
tTime = time.time() - startEngine
905 self.
inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
914 For some output writers, a check is performed to see if the event has 915 executed certain algorithms. 916 These reside in the AcceptAlgs property for those writers 922 if hasattr(m.w,
'AcceptAlgs'):
923 acc += m.w.AcceptAlgs
924 if hasattr(m.w,
'RequireAlgs'):
925 req += m.w.RequireAlgs
926 if hasattr(m.w,
'VetoAlgs'):
928 return (acc, req, vet)
931 if self.a.algorithm(algName)._ialg.isExecuted()\
932 and self.a.algorithm(algName)._ialg.filterPassed():
939 Check the algorithm status for an event. 940 Depending on output writer settings, the event 941 may be declined based on various criteria. 942 This is a transcript of the check that occurs in GaudiSvc::OutputStream 946 self.log.debug(
'self.acceptAlgs is %s' % (str(self.acceptAlgs)))
948 for name
in self.acceptAlgs:
955 self.log.debug(
'self.requireAlgs is %s' % (str(self.requireAlgs)))
956 for name
in self.requireAlgs:
960 self.log.info(
'Evt declined (requireAlgs) : %s' % (name))
963 self.log.debug(
'self.vetoAlgs is %s' % (str(self.
vetoAlgs)))
968 self.log.info(
'Evt declined : (vetoAlgs) : %s' % (name))
977 def __init__(self, workerID, queues, events, params, subworkers):
978 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params,
984 self.log.name =
"Worker-%i" % (self.
nodeID)
985 self.log.info(
"Worker-%i Created OK" % (self.
nodeID))
994 self.config[
'EventSelector'].Input = []
995 self.config[
'ApplicationMgr'].OutStream = []
996 if "HistogramPersistencySvc" in self.config.keys():
997 self.config[
'HistogramPersistencySvc'].OutputFile =
'' 998 formatHead =
'[Worker-%i]' % (self.
nodeID)
999 self.config[
'MessageSvc'].Format =
'%-13s ' % formatHead + \
1002 for key, lst
in self.writerDict.iteritems():
1003 self.log.info(
'Writer Type : %s\t : %i' % (key, len(lst)))
1008 newName = m.getNewName(
'.',
'.w%i.' % (self.
nodeID))
1009 self.config[m.key].Output = newName
1019 if "ToolSvc.EvtCounter" not in self.config:
1020 from Configurables
import EvtCounter
1021 counter = EvtCounter()
1023 counter = self.config[
"ToolSvc.EvtCounter"]
1024 counter.UseIncident =
False 1027 self.log.warning(
'Cannot configure EvtCounter')
1034 libc = ctypes.CDLL(
'libc.so.6')
1036 libc.prctl(15, name, 0, 0, 0)
1038 startEngine = time.time()
1039 self.log.info(
"Worker %i starting Engine" % (self.
nodeID))
1045 'EVT WRITERS ON WORKER : %i' % (len(self.
writerDict[
'events'])))
1047 nEventWriters = len(self.
writerDict[
"events"])
1052 for item
in m.ItemList:
1053 hsh = item.find(
'#')
1057 for item
in m.OptItemList:
1058 hsh = item.find(
'#')
1061 optItemList.add(item)
1063 itemList -= optItemList
1064 for item
in sorted(itemList):
1065 self.log.info(
' adding ItemList Item to ts : %s' % (item))
1066 self.ts.addItem(item)
1067 for item
in sorted(optItemList):
1068 self.log.info(
' adding Optional Item to ts : %s' % (item))
1069 self.ts.addOptItem(item)
1071 self.log.info(
'There is no Event Output for this app')
1077 packet = self.evcom.receive()
1082 if packet ==
'FINISHED':
1084 evtNumber, tbin = packet
1085 if self.
cntr !=
None:
1086 self.cntr.setEventCounter(evtNumber)
1093 self.log.info(
"Fork new subworkers and disconnect from CondDB")
1094 condDB = self.a.service(
'CondDBCnvSvc', gbl.ICondDBReader)
1099 k.SetServices(self.
a, self.
evt, self.
hvt, self.
fsr,
1107 sc = self.a.executeEvent()
1111 self.
rTime += (time.time() - t)
1115 self.log.warning(
'Did not Execute Event')
1116 self.evt.clearStore()
1121 self.log.warning(
'Event did not pass : %i' % (evtNumber))
1122 self.evt.clearStore()
1131 self.inc.fireIncident(gbl.Incident(
'Worker',
'EndEvent'))
1132 self.eventLoopSyncer.set()
1133 self.evt.clearStore()
1134 self.log.info(
'Setting <Last> Event')
1135 self.lastEvent.set()
1137 self.evcom.finalize()
1138 self.log.info(
'Worker-%i Finished Processing Events' % (self.
nodeID))
1140 self.filerecordsAgent.SendFileRecords()
1146 self.log.info(
'Join subworkers')
1151 For some output writers, a check is performed to see if the event has 1152 executed certain algorithms. 1153 These reside in the AcceptAlgs property for those writers 1159 if hasattr(m.w,
'AcceptAlgs'):
1160 acc += m.w.AcceptAlgs
1161 if hasattr(m.w,
'RequireAlgs'):
1162 req += m.w.RequireAlgs
1163 if hasattr(m.w,
'VetoAlgs'):
1165 return (acc, req, vet)
1168 if self.a.algorithm(algName)._ialg.isExecuted()\
1169 and self.a.algorithm(algName)._ialg.filterPassed():
1176 Check the algorithm status for an event. 1177 Depending on output writer settings, the event 1178 may be declined based on various criteria. 1179 This is a transcript of the check that occurs in GaudiSvc::OutputStream 1183 self.log.debug(
'self.acceptAlgs is %s' % (str(self.acceptAlgs)))
1185 for name
in self.acceptAlgs:
1192 self.log.debug(
'self.requireAlgs is %s' % (str(self.requireAlgs)))
1193 for name
in self.requireAlgs:
1197 self.log.info(
'Evt declined (requireAlgs) : %s' % (name))
1200 self.log.debug(
'self.vetoAlgs is %s' % (str(self.
vetoAlgs)))
1205 self.log.info(
'Evt declined : (vetoAlgs) : %s' % (name))
1214 def __init__(self, queues, events, params, subworkers):
1215 GMPComponent.__init__(self,
'Writer', -2, queues, events, params,
1221 self.log.name =
"Writer--2" 1227 self.config[
'ApplicationMgr'].TopAlg = []
1228 self.config[
'EventSelector'].Input = []
1230 self.config[
'MessageSvc'].Format =
'%-13s ' %
'[Writer]' + \
1234 for key, lst
in self.writerDict.iteritems():
1235 self.log.info(
'Writer Type : %s\t : %i' % (key, len(lst)))
1242 self.log.debug(
'Processing Event Writer : %s' % (m))
1243 newName = m.getNewName(
'.',
'.p%i.' % self.nWorkers)
1244 self.config[m.key].Output = newName
1254 self.log.debug(
'Processing FileRecords Writer: %s' % (m))
1255 newName = m.getNewName(
1256 '.',
'.p%i.' % self.nWorkers, extra=
" OPT='RECREATE'")
1257 self.config[m.key].Output = newName
1260 hs =
"HistogramPersistencySvc" 1262 if hs
in self.config.keys():
1263 n = self.config[hs].OutputFile
1265 newName = self.config[hs].OutputFile.replace(
1266 '.',
'.p%i.' % (self.nWorkers))
1267 self.config[hs].OutputFile = newName
1273 libc = ctypes.CDLL(
'libc.so.6')
1275 libc.prctl(15, name, 0, 0, 0)
1277 startEngine = time.time()
1285 stopCriteria = self.nWorkers
1287 current = (current + 1) % self.nWorkers
1288 packet = self.
evcoms[current].receive(timeout=0.01)
1291 if packet ==
'FINISHED':
1293 'Writer got FINISHED flag : Worker %i' % (current))
1295 self.
status[current] =
True 1297 self.log.info(
'FINISHED recd from all workers, break loop')
1302 evtNumber, tbin = packet
1305 self.a.executeEvent()
1306 self.
rTime += (time.time() - t)
1308 self.evt.clearStore()
1309 self.eventLoopSyncer.set()
1310 self.log.name =
"Writer--2" 1311 self.log.info(
'Setting <Last> Event')
1312 self.lastEvent.set()
1315 [e.finalize()
for e
in self.
evcoms]
1317 sc = self.histoAgent.Receive()
1318 sc = self.histoAgent.RebuildHistoStore()
1320 self.log.info(
'Histo Store rebuilt ok')
1322 self.log.warning(
'Histo Store Error in Rebuild')
1325 sc = self.filerecordsAgent.Receive()
1326 self.filerecordsAgent.Rebuild()
1343 self.log.name =
'GaudiPython-Parallel-Logger' 1344 self.log.info(
'GaudiPython Parallel Process Co-ordinator beginning')
1353 self.
hq = JoinableQueue()
1354 self.
fq = JoinableQueue()
1360 limit=WAIT_INITIALISE,
1361 step=STEP_INITIALISE)
1366 limit=WAIT_SINGLE_EVENT,
1368 firstEvent=WAIT_FIRST_EVENT)
1370 self.
nWorkers, self.
log, limit=WAIT_FINALISE, step=STEP_FINALISE)
1382 self.subworkers.append(sub)
1394 self.system.append(self.
writer)
1395 self.system.append(wk)
1396 self.system.append(self.
reader)
1399 init = self.sInit.d[nodeID].event
1400 run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
1401 fin = self.sFin.d[nodeID].event
1402 return (init, run, fin)
1405 eventQ = self.
qs[nodeID]
1408 return (eventQ, histQ, fsrQ)
1413 self.log.name =
'GaudiPython-Parallel-Logger' 1414 self.log.info(
'INITIALISING SYSTEM')
1420 sc = self.sInit.syncAll(step=
"Initialise")
1428 self.log.name =
'GaudiPython-Parallel-Logger' 1429 self.log.info(
'RUNNING SYSTEM')
1430 sc = self.sRun.syncAll(step=
"Run")
1438 self.log.name =
'GaudiPython-Parallel-Logger' 1439 self.log.info(
'FINALISING SYSTEM')
1440 sc = self.sFin.syncAll(step=
"Finalise")
1448 self.log.info(
"Cleanly join all Processes")
1450 self.log.info(
"Report Total Success to Main.py")
1455 children = multiprocessing.active_children()
1465 self.system.reverse()
1478 rwk = JoinableQueue()
1480 workersWriter = [JoinableQueue()
for i
in xrange(self.
nWorkers)]
1483 d[-2] = (workersWriter,
None)
1485 d[i] = (rwk, workersWriter[i])
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
def __init__(self, queues, events, params, subworkers)
def getNewName(self, replaceThis, withThis, extra='')
StatusCode finalize() override
StatusCode execute() override
def receive(self, timeout=None)
def __init__(self, GMPComponent, qin, qout)
double sum(double x, double y, double z)
def __init__(self, gmpcomponent)
def IdentifyWriters(self)
def __init__(self, workerID, queues, events, params, subworkers)
EventIDBase max(const EventIDBase &lhs, const EventIDBase &rhs)
def checkExecutedPassed(self, algName)
def processConfiguration(self)
def LoadTES(self, tbufferfile)
def getItemLists(self, config)
def __init__(self, queues, events, params, subworkers)
def set(self, key, output)
def __init__(self, nWorkers, config, log)
def StartGaudiPython(self)
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
def __init__(self, writer, wType, config)
def processConfiguration(self)
def getSyncEvents(self, nodeID)
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
def __init__(self, workerID, queues, events, params, subworkers)
Python Algorithm base class.
def processConfiguration(self)
def SetupGaudiPython(self)
def getQueues(self, nodeID)
def checkExecutedPassed(self, algName)
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
def processConfiguration(self)