11 from __future__
import print_function
13 from GaudiPython
import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
14 from ROOT
import TBufferFile, TBuffer
15 import multiprocessing
16 from multiprocessing
import Process, Queue, JoinableQueue, Event
17 from multiprocessing
import cpu_count, current_process
18 from multiprocessing.queues
import Empty
23 from ROOT
import TParallelMergingFile
51 WAIT_INITIALISE = 60 * 5
52 WAIT_FIRST_EVENT = 60 * 3
53 WAIT_SINGLE_EVENT = 60 * 6
54 WAIT_FINALISE = 60 * 2
67 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
71 aidatypes = (gbl.AIDA.IHistogram, gbl.AIDA.IHistogram1D, gbl.AIDA.IHistogram2D,
72 gbl.AIDA.IHistogram3D, gbl.AIDA.IProfile1D, gbl.AIDA.IProfile2D,
73 gbl.AIDA.IBaseHistogram)
76 thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
80 'EvtCollectionStream':
"tuples",
81 'InputCopyStream':
"events",
82 'OutputStream':
"events",
83 'RecordStream':
"records",
84 'RunRecordStream':
"records",
85 'SequentialOutputStream':
"events",
86 'TagCollectionStream':
"tuples" 94 A class to represent a writer in the GaudiPython configuration 95 It can be non-trivial to access the name of the output file; it may be 96 specified in the DataSvc, or just on the writer, may be a list, or string 97 Also, there are three different types of writer (events, records, tuples) 98 so this bootstrap class provides easy access to this info while configuring 117 if hasattr(self.
w,
"Output"):
127 if hasattr(datasvc,
"Output"):
139 assert replaceThis.__class__.__name__ ==
'str' 140 assert withThis.__class__.__name__ ==
'str' 143 if old.__class__.__name__ ==
'list':
146 new = old.replace(replaceThis, withThis)
155 if hasattr(self.
w,
"ItemList"):
158 datasvc = config[self.
w.EvtDataSvc]
159 if hasattr(datasvc,
"ItemList"):
162 if hasattr(self.
w,
"OptItemList"):
165 datasvc = config[self.
w.EvtDataSvc]
166 if hasattr(datasvc,
"OptItemList"):
170 def set(self, key, output):
179 s +=
"Writer : %s\n" % (self.
wName)
180 s +=
"Writer Type : %s\n" % (self.
wType)
181 s +=
"Writer Output : %s\n" % (self.
output)
183 s +=
"DataSvc Output : %s\n" % (self.
svcOutput)
185 s +=
"Key for config : %s\n" % (self.
key)
186 s +=
"Output File : %s\n" % (self.
output)
187 s +=
"ItemList : %s\n" % (self.
ItemList)
198 GaudiPython algorithm used to clean up histos on the Reader and Workers 199 Only has a finalize method() 200 This retrieves a dictionary of path:histo objects and sends it to the 201 writer. It then waits for a None flag : THIS IS IMPORTANT, as if 202 the algorithm returns before ALL histos have been COMPLETELY RECEIVED 203 at the writer end, there will be an error. 207 PyAlgorithm.__init__(self)
217 'CollectHistograms Finalise (%s)' % (self.
_gmpc.nodeType))
218 self.
_gmpc.hDict = self.
_gmpc.dumpHistograms()
219 ks = self.
_gmpc.hDict.keys()
220 self.
log.
info(
'%i Objects in Histogram Store' % (len(ks)))
226 reps = len(ks) / chunk + 1
227 for i
in range(reps):
228 someKeys = ks[i * chunk:(i + 1) * chunk]
229 smalld = dict([(key, self.
_gmpc.hDict[key])
for key
in someKeys])
230 self.
_gmpc.hq.put((self.
_gmpc.nodeID, smalld))
232 self.
log.
debug(
'Signalling end of histos to Writer')
233 self.
_gmpc.hq.put(
'HISTOS_SENT')
234 self.
log.
debug(
'Waiting on Sync Event')
235 self.
_gmpc.sEvent.wait()
236 self.
log.
debug(
'Histo Sync Event set, clearing and returning')
237 self.
_gmpc.hvt.clearStore()
238 root = gbl.DataObject()
240 self.
_gmpc.hvt.setRoot(
'/stat', root)
275 assert item.__class__.__name__ ==
'tuple' 276 startTransmission = time.time()
280 while self.
qout._buffer:
282 self.
qoutTime += time.time() - startTransmission
289 startWait = time.time()
291 itemIn = self.
qin.
get(timeout=timeout)
294 self.
qinTime += time.time() - startWait
296 if itemIn.__class__.__name__ ==
'tuple':
303 self.
_gmpc.log.warning(
304 'TASK_DONE called too often by : %s' % (self.
_gmpc.nodeType))
308 self.
log.info(
'Finalize Event Communicator : %s %s' %
313 if self.
_gmpc.nodeType ==
'Reader':
314 downstream = self.
_gmpc.nWorkers
315 elif self.
_gmpc.nodeType ==
'Writer':
317 elif self.
_gmpc.nodeType ==
'Worker':
320 for i
in range(downstream):
322 if self.
_gmpc.nodeType !=
'Writer':
328 self.
log.name =
'%s-%i Audit ' % (self.
_gmpc.nodeType,
330 self.
log.info(
'Items Sent : %i' % (self.
nSent))
331 self.
log.info(
'Items Received : %i' % (self.
nRecv))
333 self.
log.info(
'Data Received : %i' % (self.
sizeRecv))
334 self.
log.info(
'Q-out Time : %5.2f' % (self.
qoutTime))
335 self.
log.info(
'Q-in Time : %5.2f' % (self.
qinTime))
342 def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
343 self.
T = gaudiTESSerializer
357 root = gbl.DataObject()
359 self.
evt.setRoot(
'/Event', root)
361 self.
T.loadBuffer(tbuf)
362 self.
tLoad += (time.time() - t)
368 tb = TBufferFile(TBuffer.kWrite)
369 self.
T.dumpBuffer(tb)
370 self.
tDump += (time.time() - t)
376 evIn =
"Events Loaded : %i" % (self.
nIn)
377 evOut =
"Events Dumped : %i" % (self.
nOut)
379 dataIn =
"Data Loaded : %i" % (din)
380 dataInMb =
"Data Loaded (MB) : %5.2f Mb" % (din / MB)
382 avgIn =
"Avg Buf Loaded : %5.2f Mb"\
383 % (din / (self.
nIn * MB))
384 maxIn =
"Max Buf Loaded : %5.2f Mb"\
387 avgIn =
"Avg Buf Loaded : N/A" 388 maxIn =
"Max Buf Loaded : N/A" 390 dataOut =
"Data Dumped : %i" % (dout)
391 dataOutMb =
"Data Dumped (MB) : %5.2f Mb" % (dout / MB)
393 avgOut =
"Avg Buf Dumped : %5.2f Mb"\
394 % (din / (self.
nOut * MB))
395 maxOut =
"Max Buf Dumped : %5.2f Mb"\
398 avgOut =
"Avg Buf Dumped : N/A" 399 maxOut =
"Max Buf Dumped : N/A" 400 dumpTime =
"Total Dump Time : %5.2f" % (self.
tDump)
401 loadTime =
"Total Load Time : %5.2f" % (self.
tLoad)
431 def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
442 current_process().name = nodeType
449 self.nWorkers, self.sEvent, self.config, self.
log = params
461 ks = self.config.keys()
463 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
470 qPair, histq, fq = self.
queues 527 from AlgSmapShot
import SmapShot
529 ss = SmapShot(logname=smapsLog)
530 self.
a.addAlgorithm(ss)
533 self.
fsr = self.
a.filerecordsvc()
534 self.
inc = self.
a.service(
'IncidentSvc',
'IIncidentSvc')
535 self.
pers = self.
a.service(
'EventPersistencySvc',
'IAddressCreator')
536 self.
ts = gbl.GaudiMP.TESSerializer(self.
evt._idp, self.
pers)
547 root = gbl.DataObject()
549 self.
evt.setRoot(
'/Event', root)
550 self.
ts.loadBuffer(tbufferfile)
553 if self.
app !=
'Gauss':
559 lst = [
'/Event/Gen/Header',
'/Event/Rec/Header']
563 n = self.
evt[path].evtNumber()
573 n = self.
evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
580 if self.
nIn > 0
or self.
nOut > 0:
583 self.
log.warning(
'Could not determine Event Number')
596 keys = [
"events",
"records",
"tuples",
"histos"]
601 wkeys = WRITERTYPES.keys()
602 for v
in self.config.values():
603 if v.__class__.__name__
in wkeys:
604 writerType = WRITERTYPES[v.__class__.__name__]
605 d[writerType].append(
MiniWriter(v, writerType, self.config))
607 self.
log.info(
'Writer Found : %s' % (v.name()))
610 if 'HistogramPersistencySvc' in self.config.keys():
611 hfile = self.config[
'HistogramPersistencySvc'].getProp(
613 d[
"histos"].append(hfile)
618 Method used by the GaudiPython algorithm CollectHistos 619 to obtain a dictionary of form { path : object } 620 representing the Histogram Store 622 nlist = self.
hvt.getHistoNames()
629 if type(o)
in aidatypes:
636 print(
'WARNING : no histograms to recover?')
646 if self.
app ==
'Gauss':
648 tool = self.
a.tool(
"ToolSvc.EvtCounter")
653 self.
iTime = time.time() - start
661 self.
fTime = time.time() - start
665 allTime =
"Alive Time : %5.2f" % (self.
tTime)
666 initTime =
"Init Time : %5.2f" % (self.
iTime)
667 frstTime =
"1st Event Time : %5.2f" % (self.
firstEvTime)
668 runTime =
"Run Time : %5.2f" % (self.
rTime)
669 finTime =
"Finalise Time : %5.2f" % (self.
fTime)
670 tup = (allTime, initTime, frstTime, runTime, finTime)
682 def __init__(self, queues, events, params, subworkers):
683 GMPComponent.__init__(self,
'Reader', -1, queues, events, params,
691 self.config[
'ApplicationMgr'].TopAlg = []
692 self.config[
'ApplicationMgr'].OutStream = []
693 if "HistogramPersistencySvc" in self.config.keys():
694 self.config[
'HistogramPersistencySvc'].OutputFile =
'' 695 self.config[
'MessageSvc'].Format =
'%-13s ' %
'[Reader]' + \
697 self.
evtMax = self.config[
'ApplicationMgr'].EvtMax
700 tb = TBufferFile(TBuffer.kWrite)
702 self.
ts.dumpBuffer(tb)
709 startFirst = time.time()
710 self.
log.info(
'Reader : First Event')
712 self.
log.info(
'evtMax( %i ) reached' % (self.
evtMax))
718 if not bool(self.
evt[
'/Event']):
719 self.
log.warning(
'No More Events! (So Far : %i)' % (self.
nOut))
725 lst = self.
evt.getHistoNames()
728 lst = self.
evt.getList()
729 if self.
app ==
"DaVinci":
730 daqnode = self.
evt.retrieveObject(
733 self.
evt.getList(daqnode, lst,
734 daqnode.address().
par())
736 self.
log.critical(
'Reader could not acquire TES List!')
739 self.
log.info(
'Reader : TES List : %i items' % (len(lst)))
744 self.
log.info(
'First Event Sent')
747 self.eventLoopSyncer.set()
748 self.
evt.clearStore()
756 libc = ctypes.CDLL(
'libc.so.6')
758 libc.prctl(15, name, 0, 0, 0)
760 startEngine = time.time()
761 self.
log.name =
'Reader' 762 self.
log.info(
'Reader Process starting')
769 self.
log.info(
'Reader Beginning Distribution')
772 self.
log.info(
'Reader First Event OK')
774 self.
log.critical(
'Reader Failed on First Event')
781 self.
log.info(
'evtMax( %i ) reached' % (self.
evtMax))
784 if not self.
stat.isSuccess():
785 self.
log.critical(
'Reader is Damaged!')
790 self.
rTime += (time.time() - t)
791 if not bool(self.
evt[
'/Event']):
792 self.
log.warning(
'No More Events! (So Far : %i)' % (self.
nOut))
799 self.eventLoopSyncer.set()
800 self.
evt.clearStore()
801 self.
log.info(
'Setting <Last> Event')
805 self.
log.info(
'Reader : Event Distribution complete.')
806 self.
evcom.finalize()
808 self.
tTime = time.time() - startEngine
816 def __init__(self, workerID, queues, events, params, subworkers):
817 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params,
823 self.
log.info(
"Subworker-%i Created OK" % (self.
nodeID))
830 libc = ctypes.CDLL(
'libc.so.6')
832 libc.prctl(15, name, 0, 0, 0)
835 startEngine = time.time()
836 msg = self.
a.service(
'MessageSvc')
837 msg.Format =
'%-13s ' % (
'[' + self.
log.name +
']') + self.
msgFormat 839 self.
log.name =
"Worker-%i" % (self.
nodeID)
840 self.
log.info(
"Subworker %i starting Engine" % (self.
nodeID))
845 'EVT WRITERS ON WORKER : %i' % (len(self.
writerDict[
'events'])))
847 nEventWriters = len(self.
writerDict[
"events"])
853 packet = self.
evcom.receive()
858 if packet ==
'FINISHED':
860 evtNumber, tbin = packet
861 if self.
cntr !=
None:
863 self.
cntr.setEventCounter(evtNumber)
873 self.
rTime += (time.time() - t)
877 self.
log.name =
"Worker-%i" % (self.
nodeID)
878 self.
log.warning(
'Did not Execute Event')
879 self.
evt.clearStore()
884 self.
log.name =
"Worker-%i" % (self.
nodeID)
885 self.
log.warning(
'Event did not pass : %i' % (evtNumber))
886 self.
evt.clearStore()
895 self.
inc.fireIncident(gbl.Incident(
'Subworker',
'EndEvent'))
896 self.eventLoopSyncer.set()
897 self.
evt.clearStore()
898 self.
log.name =
"Worker-%i" % (self.
nodeID)
899 self.
log.info(
'Setting <Last> Event %s' % (self.
nodeID))
902 self.
evcom.finalize()
905 self.
tTime = time.time() - startEngine
916 self.
inc = self.
a.service(
'IncidentSvc',
'IIncidentSvc')
925 For some output writers, a check is performed to see if the event has 926 executed certain algorithms. 927 These reside in the AcceptAlgs property for those writers 933 if hasattr(m.w,
'AcceptAlgs'):
934 acc += m.w.AcceptAlgs
935 if hasattr(m.w,
'RequireAlgs'):
936 req += m.w.RequireAlgs
937 if hasattr(m.w,
'VetoAlgs'):
939 return (acc, req, vet)
942 if self.
a.algorithm(algName)._ialg.isExecuted()\
943 and self.
a.algorithm(algName)._ialg.filterPassed():
950 Check the algorithm status for an event. 951 Depending on output writer settings, the event 952 may be declined based on various criteria. 953 This is a transcript of the check that occurs in GaudiSvc::OutputStream 957 self.
log.debug(
'self.acceptAlgs is %s' % (str(self.acceptAlgs)))
959 for name
in self.acceptAlgs:
966 self.
log.debug(
'self.requireAlgs is %s' % (str(self.requireAlgs)))
967 for name
in self.requireAlgs:
971 self.
log.info(
'Evt declined (requireAlgs) : %s' % (name))
974 self.
log.debug(
'self.vetoAlgs is %s' % (str(self.
vetoAlgs)))
979 self.
log.info(
'Evt declined : (vetoAlgs) : %s' % (name))
988 def __init__(self, workerID, queues, events, params, subworkers):
989 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params,
995 self.
log.name =
"Worker-%i" % (self.
nodeID)
996 self.
log.info(
"Worker-%i Created OK" % (self.
nodeID))
1005 self.config[
'EventSelector'].Input = []
1006 self.config[
'ApplicationMgr'].OutStream = []
1007 if "HistogramPersistencySvc" in self.config.keys():
1008 self.config[
'HistogramPersistencySvc'].OutputFile =
'' 1009 formatHead =
'[Worker-%i]' % (self.
nodeID)
1010 self.config[
'MessageSvc'].Format =
'%-13s ' % formatHead + \
1014 self.
log.info(
'Writer Type : %s\t : %i' % (key, len(lst)))
1019 newName = m.getNewName(
'.',
'.w%i.' % (self.
nodeID))
1020 self.config[m.key].Output = newName
1030 if "ToolSvc.EvtCounter" not in self.config:
1031 from Configurables
import EvtCounter
1032 counter = EvtCounter()
1034 counter = self.config[
"ToolSvc.EvtCounter"]
1035 counter.UseIncident =
False 1038 self.
log.warning(
'Cannot configure EvtCounter')
1045 libc = ctypes.CDLL(
'libc.so.6')
1047 libc.prctl(15, name, 0, 0, 0)
1049 startEngine = time.time()
1050 self.
log.info(
"Worker %i starting Engine" % (self.
nodeID))
1056 'EVT WRITERS ON WORKER : %i' % (len(self.
writerDict[
'events'])))
1058 nEventWriters = len(self.
writerDict[
"events"])
1063 for item
in m.ItemList:
1064 hsh = item.find(
'#')
1068 for item
in m.OptItemList:
1069 hsh = item.find(
'#')
1072 optItemList.add(item)
1074 itemList -= optItemList
1075 for item
in sorted(itemList):
1076 self.
log.info(
' adding ItemList Item to ts : %s' % (item))
1077 self.
ts.addItem(item)
1078 for item
in sorted(optItemList):
1079 self.
log.info(
' adding Optional Item to ts : %s' % (item))
1080 self.
ts.addOptItem(item)
1082 self.
log.info(
'There is no Event Output for this app')
1088 packet = self.
evcom.receive()
1093 if packet ==
'FINISHED':
1095 evtNumber, tbin = packet
1096 if self.
cntr !=
None:
1097 self.
cntr.setEventCounter(evtNumber)
1101 self.
log.info(
"Fork new subworkers")
1105 k.SetServices(self.
a, self.
evt, self.
hvt, self.
fsr,
1117 self.
rTime += (time.time() - t)
1121 self.
log.warning(
'Did not Execute Event')
1122 self.
evt.clearStore()
1127 self.
log.warning(
'Event did not pass : %i' % (evtNumber))
1128 self.
evt.clearStore()
1137 self.
inc.fireIncident(gbl.Incident(
'Worker',
'EndEvent'))
1138 self.eventLoopSyncer.set()
1139 self.
evt.clearStore()
1140 self.
log.info(
'Setting <Last> Event')
1143 self.
evcom.finalize()
1144 self.
log.info(
'Worker-%i Finished Processing Events' % (self.
nodeID))
1152 self.
log.info(
'Join subworkers')
1157 For some output writers, a check is performed to see if the event has 1158 executed certain algorithms. 1159 These reside in the AcceptAlgs property for those writers 1165 if hasattr(m.w,
'AcceptAlgs'):
1166 acc += m.w.AcceptAlgs
1167 if hasattr(m.w,
'RequireAlgs'):
1168 req += m.w.RequireAlgs
1169 if hasattr(m.w,
'VetoAlgs'):
1171 return (acc, req, vet)
1174 if self.
a.algorithm(algName)._ialg.isExecuted()\
1175 and self.
a.algorithm(algName)._ialg.filterPassed():
1182 Check the algorithm status for an event. 1183 Depending on output writer settings, the event 1184 may be declined based on various criteria. 1185 This is a transcript of the check that occurs in GaudiSvc::OutputStream 1189 self.
log.debug(
'self.acceptAlgs is %s' % (str(self.acceptAlgs)))
1191 for name
in self.acceptAlgs:
1198 self.
log.debug(
'self.requireAlgs is %s' % (str(self.requireAlgs)))
1199 for name
in self.requireAlgs:
1203 self.
log.info(
'Evt declined (requireAlgs) : %s' % (name))
1206 self.
log.debug(
'self.vetoAlgs is %s' % (str(self.
vetoAlgs)))
1211 self.
log.info(
'Evt declined : (vetoAlgs) : %s' % (name))
1220 def __init__(self, queues, events, params, subworkers):
1221 GMPComponent.__init__(self,
'Writer', -2, queues, events, params,
1227 self.
log.name =
"Writer--2" 1233 self.config[
'ApplicationMgr'].TopAlg = []
1234 self.config[
'EventSelector'].Input = []
1236 self.config[
'MessageSvc'].Format =
'%-13s ' %
'[Writer]' + \
1241 self.
log.info(
'Writer Type : %s\t : %i' % (key, len(lst)))
1248 self.
log.debug(
'Processing Event Writer : %s' % (m))
1249 newName = m.getNewName(
'.',
'.p%i.' % self.nWorkers)
1250 self.config[m.key].Output = newName
1260 self.
log.debug(
'Processing FileRecords Writer: %s' % (m))
1261 newName = m.getNewName(
1262 '.',
'.p%i.' % self.nWorkers, extra=
" OPT='RECREATE'")
1263 self.config[m.key].Output = newName
1266 hs =
"HistogramPersistencySvc" 1268 if hs
in self.config.keys():
1269 n = self.config[hs].OutputFile
1271 newName = self.config[hs].OutputFile.replace(
1272 '.',
'.p%i.' % (self.nWorkers))
1273 self.config[hs].OutputFile = newName
1279 libc = ctypes.CDLL(
'libc.so.6')
1281 libc.prctl(15, name, 0, 0, 0)
1283 startEngine = time.time()
1291 stopCriteria = self.nWorkers
1293 current = (current + 1) % self.nWorkers
1294 packet = self.
evcoms[current].receive(timeout=0.01)
1297 if packet ==
'FINISHED':
1299 'Writer got FINISHED flag : Worker %i' % (current))
1301 self.
status[current] =
True 1303 self.
log.info(
'FINISHED recd from all workers, break loop')
1308 evtNumber, tbin = packet
1312 self.
rTime += (time.time() - t)
1314 self.
evt.clearStore()
1315 self.eventLoopSyncer.set()
1316 self.
log.name =
"Writer--2" 1317 self.
log.info(
'Setting <Last> Event')
1321 [e.finalize()
for e
in self.
evcoms]
1326 self.
log.info(
'Histo Store rebuilt ok')
1328 self.
log.warning(
'Histo Store Error in Rebuild')
1349 self.
log.name =
'GaudiPython-Parallel-Logger' 1350 self.
log.info(
'GaudiPython Parallel Process Co-ordinator beginning')
1359 self.
hq = JoinableQueue()
1360 self.
fq = JoinableQueue()
1366 limit=WAIT_INITIALISE,
1367 step=STEP_INITIALISE)
1372 limit=WAIT_SINGLE_EVENT,
1374 firstEvent=WAIT_FIRST_EVENT)
1376 self.
nWorkers, self.
log, limit=WAIT_FINALISE, step=STEP_FINALISE)
1405 init = self.
sInit.d[nodeID].event
1406 run = (self.
sRun.d[nodeID].event, self.
sRun.d[nodeID].lastEvent)
1407 fin = self.
sFin.d[nodeID].event
1408 return (init, run, fin)
1411 eventQ = self.
qs[nodeID]
1414 return (eventQ, histQ, fsrQ)
1419 self.
log.name =
'GaudiPython-Parallel-Logger' 1420 self.
log.info(
'INITIALISING SYSTEM')
1426 sc = self.
sInit.syncAll(step=
"Initialise")
1434 self.
log.name =
'GaudiPython-Parallel-Logger' 1435 self.
log.info(
'RUNNING SYSTEM')
1436 sc = self.
sRun.syncAll(step=
"Run")
1444 self.
log.name =
'GaudiPython-Parallel-Logger' 1445 self.
log.info(
'FINALISING SYSTEM')
1446 sc = self.
sFin.syncAll(step=
"Finalise")
1454 self.
log.info(
"Cleanly join all Processes")
1456 self.
log.info(
"Report Total Success to Main.py")
1461 children = multiprocessing.active_children()
1484 rwk = JoinableQueue()
1486 workersWriter = [JoinableQueue()
for i
in range(self.
nWorkers)]
1489 d[-2] = (workersWriter,
None)
1491 d[i] = (rwk, workersWriter[i])
Out1 * put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
def __init__(self, queues, events, params, subworkers)
executeEvent
Helpers for re-entrant interfaces.
def getNewName(self, replaceThis, withThis, extra='')
StatusCode finalize() override
StatusCode execute() override
def receive(self, timeout=None)
def __init__(self, GMPComponent, qin, qout)
def __init__(self, gmpcomponent)
::details::reverse_wrapper< T > reverse(T &&iterable)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
def IdentifyWriters(self)
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
def __init__(self, workerID, queues, events, params, subworkers)
EventIDBase max(const EventIDBase &lhs, const EventIDBase &rhs)
def checkExecutedPassed(self, algName)
def processConfiguration(self)
def LoadTES(self, tbufferfile)
def getItemLists(self, config)
def __init__(self, queues, events, params, subworkers)
def set(self, key, output)
def __init__(self, nWorkers, config, log)
def StartGaudiPython(self)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
def __init__(self, writer, wType, config)
def processConfiguration(self)
def getSyncEvents(self, nodeID)
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
def __init__(self, workerID, queues, events, params, subworkers)
Python Algorithm base class.
def processConfiguration(self)
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
def SetupGaudiPython(self)
def getQueues(self, nodeID)
def checkExecutedPassed(self, algName)
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
def processConfiguration(self)