1 from __future__
import print_function
3 from GaudiPython
import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
4 from ROOT
import TBufferFile, TBuffer
6 from multiprocessing
import Process, Queue, JoinableQueue, Event
7 from multiprocessing
import cpu_count, current_process
8 from multiprocessing.queues
import Empty
13 from ROOT
import TParallelMergingFile
41 WAIT_INITIALISE = 60 * 5
42 WAIT_FIRST_EVENT = 60 * 3
43 WAIT_SINGLE_EVENT = 60 * 6
44 WAIT_FINALISE = 60 * 2
57 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
61 aidatypes = (gbl.AIDA.IHistogram, gbl.AIDA.IHistogram1D, gbl.AIDA.IHistogram2D,
62 gbl.AIDA.IHistogram3D, gbl.AIDA.IProfile1D, gbl.AIDA.IProfile2D,
63 gbl.AIDA.IBaseHistogram)
66 thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
70 'EvtCollectionStream':
"tuples",
71 'InputCopyStream':
"events",
72 'OutputStream':
"events",
73 'RecordStream':
"records",
74 'RunRecordStream':
"records",
75 'SequentialOutputStream':
"events",
76 'TagCollectionStream':
"tuples" 84 A class to represent a writer in the GaudiPython configuration 85 It can be non-trivial to access the name of the output file; it may be 86 specified in the DataSvc, or just on the writer, may be a list, or string 87 Also, there are three different types of writer (events, records, tuples) 88 so this bootstrap class provides easy access to this info while configuring 107 if hasattr(self.
w,
"Output"):
117 if hasattr(datasvc,
"Output"):
129 assert replaceThis.__class__.__name__ ==
'str' 130 assert withThis.__class__.__name__ ==
'str' 133 if old.__class__.__name__ ==
'list':
136 new = old.replace(replaceThis, withThis)
145 if hasattr(self.
w,
"ItemList"):
148 datasvc = config[self.
w.EvtDataSvc]
149 if hasattr(datasvc,
"ItemList"):
152 if hasattr(self.
w,
"OptItemList"):
155 datasvc = config[self.
w.EvtDataSvc]
156 if hasattr(datasvc,
"OptItemList"):
160 def set(self, key, output):
169 s +=
"Writer : %s\n" % (self.
wName)
170 s +=
"Writer Type : %s\n" % (self.
wType)
171 s +=
"Writer Output : %s\n" % (self.
output)
173 s +=
"DataSvc Output : %s\n" % (self.
svcOutput)
175 s +=
"Key for config : %s\n" % (self.
key)
176 s +=
"Output File : %s\n" % (self.
output)
177 s +=
"ItemList : %s\n" % (self.
ItemList)
188 GaudiPython algorithm used to clean up histos on the Reader and Workers 189 Only has a finalize method() 190 This retrieves a dictionary of path:histo objects and sends it to the 191 writer. It then waits for a None flag : THIS IS IMPORTANT, as if 192 the algorithm returns before ALL histos have been COMPLETELY RECEIVED 193 at the writer end, there will be an error. 197 PyAlgorithm.__init__(self)
207 'CollectHistograms Finalise (%s)' % (self.
_gmpc.nodeType))
208 self.
_gmpc.hDict = self.
_gmpc.dumpHistograms()
209 ks = self.
_gmpc.hDict.keys()
210 self.
log.
info(
'%i Objects in Histogram Store' % (len(ks)))
216 reps = len(ks) / chunk + 1
217 for i
in range(reps):
218 someKeys = ks[i * chunk:(i + 1) * chunk]
219 smalld = dict([(key, self.
_gmpc.hDict[key])
for key
in someKeys])
220 self.
_gmpc.hq.put((self.
_gmpc.nodeID, smalld))
222 self.
log.
debug(
'Signalling end of histos to Writer')
223 self.
_gmpc.hq.put(
'HISTOS_SENT')
224 self.
log.
debug(
'Waiting on Sync Event')
225 self.
_gmpc.sEvent.wait()
226 self.
log.
debug(
'Histo Sync Event set, clearing and returning')
227 self.
_gmpc.hvt.clearStore()
228 root = gbl.DataObject()
230 self.
_gmpc.hvt.setRoot(
'/stat', root)
265 assert item.__class__.__name__ ==
'tuple' 266 startTransmission = time.time()
270 while self.
qout._buffer:
272 self.
qoutTime += time.time() - startTransmission
279 startWait = time.time()
281 itemIn = self.
qin.
get(timeout=timeout)
284 self.
qinTime += time.time() - startWait
286 if itemIn.__class__.__name__ ==
'tuple':
293 self.
_gmpc.log.warning(
294 'TASK_DONE called too often by : %s' % (self.
_gmpc.nodeType))
298 self.
log.info(
'Finalize Event Communicator : %s %s' %
303 if self.
_gmpc.nodeType ==
'Reader':
304 downstream = self.
_gmpc.nWorkers
305 elif self.
_gmpc.nodeType ==
'Writer':
307 elif self.
_gmpc.nodeType ==
'Worker':
310 for i
in range(downstream):
312 if self.
_gmpc.nodeType !=
'Writer':
318 self.
log.name =
'%s-%i Audit ' % (self.
_gmpc.nodeType,
320 self.
log.info(
'Items Sent : %i' % (self.
nSent))
321 self.
log.info(
'Items Received : %i' % (self.
nRecv))
323 self.
log.info(
'Data Received : %i' % (self.
sizeRecv))
324 self.
log.info(
'Q-out Time : %5.2f' % (self.
qoutTime))
325 self.
log.info(
'Q-in Time : %5.2f' % (self.
qinTime))
332 def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
333 self.
T = gaudiTESSerializer
347 root = gbl.DataObject()
349 self.
evt.setRoot(
'/Event', root)
351 self.
T.loadBuffer(tbuf)
352 self.
tLoad += (time.time() - t)
358 tb = TBufferFile(TBuffer.kWrite)
359 self.
T.dumpBuffer(tb)
360 self.
tDump += (time.time() - t)
366 evIn =
"Events Loaded : %i" % (self.
nIn)
367 evOut =
"Events Dumped : %i" % (self.
nOut)
369 dataIn =
"Data Loaded : %i" % (din)
370 dataInMb =
"Data Loaded (MB) : %5.2f Mb" % (din / MB)
372 avgIn =
"Avg Buf Loaded : %5.2f Mb"\
373 % (din / (self.
nIn * MB))
374 maxIn =
"Max Buf Loaded : %5.2f Mb"\
377 avgIn =
"Avg Buf Loaded : N/A" 378 maxIn =
"Max Buf Loaded : N/A" 380 dataOut =
"Data Dumped : %i" % (dout)
381 dataOutMb =
"Data Dumped (MB) : %5.2f Mb" % (dout / MB)
383 avgOut =
"Avg Buf Dumped : %5.2f Mb"\
384 % (din / (self.
nOut * MB))
385 maxOut =
"Max Buf Dumped : %5.2f Mb"\
388 avgOut =
"Avg Buf Dumped : N/A" 389 maxOut =
"Max Buf Dumped : N/A" 390 dumpTime =
"Total Dump Time : %5.2f" % (self.
tDump)
391 loadTime =
"Total Load Time : %5.2f" % (self.
tLoad)
421 def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
432 current_process().name = nodeType
439 self.nWorkers, self.sEvent, self.config, self.
log = params
451 ks = self.config.keys()
453 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
460 qPair, histq, fq = self.
queues 517 from AlgSmapShot
import SmapShot
519 ss = SmapShot(logname=smapsLog)
520 self.
a.addAlgorithm(ss)
523 self.
fsr = self.
a.filerecordsvc()
524 self.
inc = self.
a.service(
'IncidentSvc',
'IIncidentSvc')
525 self.
pers = self.
a.service(
'EventPersistencySvc',
'IAddressCreator')
526 self.
ts = gbl.GaudiMP.TESSerializer(self.
evt._idp, self.
pers)
537 root = gbl.DataObject()
539 self.
evt.setRoot(
'/Event', root)
540 self.
ts.loadBuffer(tbufferfile)
543 if self.
app !=
'Gauss':
549 lst = [
'/Event/Gen/Header',
'/Event/Rec/Header']
553 n = self.
evt[path].evtNumber()
563 n = self.
evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
570 if self.
nIn > 0
or self.
nOut > 0:
573 self.
log.warning(
'Could not determine Event Number')
586 keys = [
"events",
"records",
"tuples",
"histos"]
591 wkeys = WRITERTYPES.keys()
592 for v
in self.config.values():
593 if v.__class__.__name__
in wkeys:
594 writerType = WRITERTYPES[v.__class__.__name__]
595 d[writerType].append(
MiniWriter(v, writerType, self.config))
597 self.
log.info(
'Writer Found : %s' % (v.name()))
600 if 'HistogramPersistencySvc' in self.config.keys():
601 hfile = self.config[
'HistogramPersistencySvc'].getProp(
603 d[
"histos"].append(hfile)
608 Method used by the GaudiPython algorithm CollectHistos 609 to obtain a dictionary of form { path : object } 610 representing the Histogram Store 612 nlist = self.
hvt.getHistoNames()
619 if type(o)
in aidatypes:
626 print(
'WARNING : no histograms to recover?')
636 if self.
app ==
'Gauss':
638 tool = self.
a.tool(
"ToolSvc.EvtCounter")
643 self.
iTime = time.time() - start
651 self.
fTime = time.time() - start
655 allTime =
"Alive Time : %5.2f" % (self.
tTime)
656 initTime =
"Init Time : %5.2f" % (self.
iTime)
657 frstTime =
"1st Event Time : %5.2f" % (self.
firstEvTime)
658 runTime =
"Run Time : %5.2f" % (self.
rTime)
659 finTime =
"Finalise Time : %5.2f" % (self.
fTime)
660 tup = (allTime, initTime, frstTime, runTime, finTime)
672 def __init__(self, queues, events, params, subworkers):
673 GMPComponent.__init__(self,
'Reader', -1, queues, events, params,
681 self.config[
'ApplicationMgr'].TopAlg = []
682 self.config[
'ApplicationMgr'].OutStream = []
683 if "HistogramPersistencySvc" in self.config.keys():
684 self.config[
'HistogramPersistencySvc'].OutputFile =
'' 685 self.config[
'MessageSvc'].Format =
'%-13s ' %
'[Reader]' + \
687 self.
evtMax = self.config[
'ApplicationMgr'].EvtMax
690 tb = TBufferFile(TBuffer.kWrite)
692 self.
ts.dumpBuffer(tb)
699 startFirst = time.time()
700 self.
log.info(
'Reader : First Event')
702 self.
log.info(
'evtMax( %i ) reached' % (self.
evtMax))
708 if not bool(self.
evt[
'/Event']):
709 self.
log.warning(
'No More Events! (So Far : %i)' % (self.
nOut))
715 lst = self.
evt.getHistoNames()
718 lst = self.
evt.getList()
719 if self.
app ==
"DaVinci":
720 daqnode = self.
evt.retrieveObject(
723 self.
evt.getList(daqnode, lst,
724 daqnode.address().
par())
726 self.
log.critical(
'Reader could not acquire TES List!')
729 self.
log.info(
'Reader : TES List : %i items' % (len(lst)))
734 self.
log.info(
'First Event Sent')
737 self.eventLoopSyncer.set()
738 self.
evt.clearStore()
746 libc = ctypes.CDLL(
'libc.so.6')
748 libc.prctl(15, name, 0, 0, 0)
750 startEngine = time.time()
751 self.
log.name =
'Reader' 752 self.
log.info(
'Reader Process starting')
759 self.
log.info(
'Reader Beginning Distribution')
762 self.
log.info(
'Reader First Event OK')
764 self.
log.critical(
'Reader Failed on First Event')
771 self.
log.info(
'evtMax( %i ) reached' % (self.
evtMax))
774 if not self.
stat.isSuccess():
775 self.
log.critical(
'Reader is Damaged!')
780 self.
rTime += (time.time() - t)
781 if not bool(self.
evt[
'/Event']):
782 self.
log.warning(
'No More Events! (So Far : %i)' % (self.
nOut))
789 self.eventLoopSyncer.set()
790 self.
evt.clearStore()
791 self.
log.info(
'Setting <Last> Event')
795 self.
log.info(
'Reader : Event Distribution complete.')
796 self.
evcom.finalize()
798 self.
tTime = time.time() - startEngine
806 def __init__(self, workerID, queues, events, params, subworkers):
807 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params,
813 self.
log.info(
"Subworker-%i Created OK" % (self.
nodeID))
820 libc = ctypes.CDLL(
'libc.so.6')
822 libc.prctl(15, name, 0, 0, 0)
825 startEngine = time.time()
826 msg = self.
a.service(
'MessageSvc')
827 msg.Format =
'%-13s ' % (
'[' + self.
log.name +
']') + self.
msgFormat 829 self.
log.name =
"Worker-%i" % (self.
nodeID)
830 self.
log.info(
"Subworker %i starting Engine" % (self.
nodeID))
835 'EVT WRITERS ON WORKER : %i' % (len(self.
writerDict[
'events'])))
837 nEventWriters = len(self.
writerDict[
"events"])
843 packet = self.
evcom.receive()
848 if packet ==
'FINISHED':
850 evtNumber, tbin = packet
851 if self.
cntr !=
None:
853 self.
cntr.setEventCounter(evtNumber)
863 self.
rTime += (time.time() - t)
867 self.
log.name =
"Worker-%i" % (self.
nodeID)
868 self.
log.warning(
'Did not Execute Event')
869 self.
evt.clearStore()
874 self.
log.name =
"Worker-%i" % (self.
nodeID)
875 self.
log.warning(
'Event did not pass : %i' % (evtNumber))
876 self.
evt.clearStore()
885 self.
inc.fireIncident(gbl.Incident(
'Subworker',
'EndEvent'))
886 self.eventLoopSyncer.set()
887 self.
evt.clearStore()
888 self.
log.name =
"Worker-%i" % (self.
nodeID)
889 self.
log.info(
'Setting <Last> Event %s' % (self.
nodeID))
892 self.
evcom.finalize()
895 self.
tTime = time.time() - startEngine
906 self.
inc = self.
a.service(
'IncidentSvc',
'IIncidentSvc')
915 For some output writers, a check is performed to see if the event has 916 executed certain algorithms. 917 These reside in the AcceptAlgs property for those writers 923 if hasattr(m.w,
'AcceptAlgs'):
924 acc += m.w.AcceptAlgs
925 if hasattr(m.w,
'RequireAlgs'):
926 req += m.w.RequireAlgs
927 if hasattr(m.w,
'VetoAlgs'):
929 return (acc, req, vet)
932 if self.
a.algorithm(algName)._ialg.isExecuted()\
933 and self.
a.algorithm(algName)._ialg.filterPassed():
940 Check the algorithm status for an event. 941 Depending on output writer settings, the event 942 may be declined based on various criteria. 943 This is a transcript of the check that occurs in GaudiSvc::OutputStream 947 self.
log.debug(
'self.acceptAlgs is %s' % (str(self.acceptAlgs)))
949 for name
in self.acceptAlgs:
956 self.
log.debug(
'self.requireAlgs is %s' % (str(self.requireAlgs)))
957 for name
in self.requireAlgs:
961 self.
log.info(
'Evt declined (requireAlgs) : %s' % (name))
964 self.
log.debug(
'self.vetoAlgs is %s' % (str(self.
vetoAlgs)))
969 self.
log.info(
'Evt declined : (vetoAlgs) : %s' % (name))
978 def __init__(self, workerID, queues, events, params, subworkers):
979 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params,
985 self.
log.name =
"Worker-%i" % (self.
nodeID)
986 self.
log.info(
"Worker-%i Created OK" % (self.
nodeID))
995 self.config[
'EventSelector'].Input = []
996 self.config[
'ApplicationMgr'].OutStream = []
997 if "HistogramPersistencySvc" in self.config.keys():
998 self.config[
'HistogramPersistencySvc'].OutputFile =
'' 999 formatHead =
'[Worker-%i]' % (self.
nodeID)
1000 self.config[
'MessageSvc'].Format =
'%-13s ' % formatHead + \
1004 self.
log.info(
'Writer Type : %s\t : %i' % (key, len(lst)))
1009 newName = m.getNewName(
'.',
'.w%i.' % (self.
nodeID))
1010 self.config[m.key].Output = newName
1020 if "ToolSvc.EvtCounter" not in self.config:
1021 from Configurables
import EvtCounter
1022 counter = EvtCounter()
1024 counter = self.config[
"ToolSvc.EvtCounter"]
1025 counter.UseIncident =
False 1028 self.
log.warning(
'Cannot configure EvtCounter')
1035 libc = ctypes.CDLL(
'libc.so.6')
1037 libc.prctl(15, name, 0, 0, 0)
1039 startEngine = time.time()
1040 self.
log.info(
"Worker %i starting Engine" % (self.
nodeID))
1046 'EVT WRITERS ON WORKER : %i' % (len(self.
writerDict[
'events'])))
1048 nEventWriters = len(self.
writerDict[
"events"])
1053 for item
in m.ItemList:
1054 hsh = item.find(
'#')
1058 for item
in m.OptItemList:
1059 hsh = item.find(
'#')
1062 optItemList.add(item)
1064 itemList -= optItemList
1065 for item
in sorted(itemList):
1066 self.
log.info(
' adding ItemList Item to ts : %s' % (item))
1067 self.
ts.addItem(item)
1068 for item
in sorted(optItemList):
1069 self.
log.info(
' adding Optional Item to ts : %s' % (item))
1070 self.
ts.addOptItem(item)
1072 self.
log.info(
'There is no Event Output for this app')
1078 packet = self.
evcom.receive()
1083 if packet ==
'FINISHED':
1085 evtNumber, tbin = packet
1086 if self.
cntr !=
None:
1087 self.
cntr.setEventCounter(evtNumber)
1091 self.
log.info(
"Fork new subworkers")
1095 k.SetServices(self.
a, self.
evt, self.
hvt, self.
fsr,
1107 self.
rTime += (time.time() - t)
1111 self.
log.warning(
'Did not Execute Event')
1112 self.
evt.clearStore()
1117 self.
log.warning(
'Event did not pass : %i' % (evtNumber))
1118 self.
evt.clearStore()
1127 self.
inc.fireIncident(gbl.Incident(
'Worker',
'EndEvent'))
1128 self.eventLoopSyncer.set()
1129 self.
evt.clearStore()
1130 self.
log.info(
'Setting <Last> Event')
1133 self.
evcom.finalize()
1134 self.
log.info(
'Worker-%i Finished Processing Events' % (self.
nodeID))
1142 self.
log.info(
'Join subworkers')
1147 For some output writers, a check is performed to see if the event has 1148 executed certain algorithms. 1149 These reside in the AcceptAlgs property for those writers 1155 if hasattr(m.w,
'AcceptAlgs'):
1156 acc += m.w.AcceptAlgs
1157 if hasattr(m.w,
'RequireAlgs'):
1158 req += m.w.RequireAlgs
1159 if hasattr(m.w,
'VetoAlgs'):
1161 return (acc, req, vet)
1164 if self.
a.algorithm(algName)._ialg.isExecuted()\
1165 and self.
a.algorithm(algName)._ialg.filterPassed():
1172 Check the algorithm status for an event. 1173 Depending on output writer settings, the event 1174 may be declined based on various criteria. 1175 This is a transcript of the check that occurs in GaudiSvc::OutputStream 1179 self.
log.debug(
'self.acceptAlgs is %s' % (str(self.acceptAlgs)))
1181 for name
in self.acceptAlgs:
1188 self.
log.debug(
'self.requireAlgs is %s' % (str(self.requireAlgs)))
1189 for name
in self.requireAlgs:
1193 self.
log.info(
'Evt declined (requireAlgs) : %s' % (name))
1196 self.
log.debug(
'self.vetoAlgs is %s' % (str(self.
vetoAlgs)))
1201 self.
log.info(
'Evt declined : (vetoAlgs) : %s' % (name))
1210 def __init__(self, queues, events, params, subworkers):
1211 GMPComponent.__init__(self,
'Writer', -2, queues, events, params,
1217 self.
log.name =
"Writer--2" 1223 self.config[
'ApplicationMgr'].TopAlg = []
1224 self.config[
'EventSelector'].Input = []
1226 self.config[
'MessageSvc'].Format =
'%-13s ' %
'[Writer]' + \
1231 self.
log.info(
'Writer Type : %s\t : %i' % (key, len(lst)))
1238 self.
log.debug(
'Processing Event Writer : %s' % (m))
1239 newName = m.getNewName(
'.',
'.p%i.' % self.nWorkers)
1240 self.config[m.key].Output = newName
1250 self.
log.debug(
'Processing FileRecords Writer: %s' % (m))
1251 newName = m.getNewName(
1252 '.',
'.p%i.' % self.nWorkers, extra=
" OPT='RECREATE'")
1253 self.config[m.key].Output = newName
1256 hs =
"HistogramPersistencySvc" 1258 if hs
in self.config.keys():
1259 n = self.config[hs].OutputFile
1261 newName = self.config[hs].OutputFile.replace(
1262 '.',
'.p%i.' % (self.nWorkers))
1263 self.config[hs].OutputFile = newName
1269 libc = ctypes.CDLL(
'libc.so.6')
1271 libc.prctl(15, name, 0, 0, 0)
1273 startEngine = time.time()
1281 stopCriteria = self.nWorkers
1283 current = (current + 1) % self.nWorkers
1284 packet = self.
evcoms[current].receive(timeout=0.01)
1287 if packet ==
'FINISHED':
1289 'Writer got FINISHED flag : Worker %i' % (current))
1291 self.
status[current] =
True 1293 self.
log.info(
'FINISHED recd from all workers, break loop')
1298 evtNumber, tbin = packet
1302 self.
rTime += (time.time() - t)
1304 self.
evt.clearStore()
1305 self.eventLoopSyncer.set()
1306 self.
log.name =
"Writer--2" 1307 self.
log.info(
'Setting <Last> Event')
1311 [e.finalize()
for e
in self.
evcoms]
1316 self.
log.info(
'Histo Store rebuilt ok')
1318 self.
log.warning(
'Histo Store Error in Rebuild')
1339 self.
log.name =
'GaudiPython-Parallel-Logger' 1340 self.
log.info(
'GaudiPython Parallel Process Co-ordinator beginning')
1349 self.
hq = JoinableQueue()
1350 self.
fq = JoinableQueue()
1356 limit=WAIT_INITIALISE,
1357 step=STEP_INITIALISE)
1362 limit=WAIT_SINGLE_EVENT,
1364 firstEvent=WAIT_FIRST_EVENT)
1366 self.
nWorkers, self.
log, limit=WAIT_FINALISE, step=STEP_FINALISE)
1395 init = self.
sInit.d[nodeID].event
1396 run = (self.
sRun.d[nodeID].event, self.
sRun.d[nodeID].lastEvent)
1397 fin = self.
sFin.d[nodeID].event
1398 return (init, run, fin)
1401 eventQ = self.
qs[nodeID]
1404 return (eventQ, histQ, fsrQ)
1409 self.
log.name =
'GaudiPython-Parallel-Logger' 1410 self.
log.info(
'INITIALISING SYSTEM')
1416 sc = self.
sInit.syncAll(step=
"Initialise")
1424 self.
log.name =
'GaudiPython-Parallel-Logger' 1425 self.
log.info(
'RUNNING SYSTEM')
1426 sc = self.
sRun.syncAll(step=
"Run")
1434 self.
log.name =
'GaudiPython-Parallel-Logger' 1435 self.
log.info(
'FINALISING SYSTEM')
1436 sc = self.
sFin.syncAll(step=
"Finalise")
1444 self.
log.info(
"Cleanly join all Processes")
1446 self.
log.info(
"Report Total Success to Main.py")
1451 children = multiprocessing.active_children()
1474 rwk = JoinableQueue()
1476 workersWriter = [JoinableQueue()
for i
in range(self.
nWorkers)]
1479 d[-2] = (workersWriter,
None)
1481 d[i] = (rwk, workersWriter[i])
Out1 * put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
def __init__(self, queues, events, params, subworkers)
executeEvent
Helpers for re-entrant interfaces.
def getNewName(self, replaceThis, withThis, extra='')
StatusCode finalize() override
StatusCode execute() override
def receive(self, timeout=None)
def __init__(self, GMPComponent, qin, qout)
def __init__(self, gmpcomponent)
::details::reverse_wrapper< T > reverse(T &&iterable)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
def IdentifyWriters(self)
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
def __init__(self, workerID, queues, events, params, subworkers)
EventIDBase max(const EventIDBase &lhs, const EventIDBase &rhs)
def checkExecutedPassed(self, algName)
def processConfiguration(self)
def LoadTES(self, tbufferfile)
def getItemLists(self, config)
def __init__(self, queues, events, params, subworkers)
def set(self, key, output)
def __init__(self, nWorkers, config, log)
def StartGaudiPython(self)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
def __init__(self, writer, wType, config)
def processConfiguration(self)
def getSyncEvents(self, nodeID)
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
def __init__(self, workerID, queues, events, params, subworkers)
Python Algorithm base class.
def processConfiguration(self)
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
def SetupGaudiPython(self)
def getQueues(self, nodeID)
def checkExecutedPassed(self, algName)
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
def processConfiguration(self)