The Gaudi Framework  master (b9786168)
Loading...
Searching...
No Matches
GMPBase.py
Go to the documentation of this file.
11
12import multiprocessing
13import time
14import warnings
15from multiprocessing import Event, JoinableQueue, Process, cpu_count, current_process
16from multiprocessing.queues import Empty
17
18from ROOT import TBuffer, TBufferFile
19
20from GaudiMP.pTools import FileRecordsAgent, HistoAgent, Syncer
21from GaudiPython import (
22 FAILURE,
23 SUCCESS,
24 AppMgr,
25 InterfaceCast,
26 PyAlgorithm,
27 gbl,
28 setOwnership,
29)
30
31# Workaround for ROOT-10769
32with warnings.catch_warnings():
33 warnings.simplefilter("ignore")
34 import cppyy
35
36# This script contains the bases for the Gaudi MultiProcessing (GMP)
37# classes
38
39# There are three classes :
40# Reader
41# Worker
42# Writer
43
44# Each class needs to perform communication with the others
45# For this, we need a means of communication, which will be based on
46# the python multiprocessing package
47# This is provided in SPI pytools package
48# cmt line : use pytools v1.1 LCG_Interfaces
49# The PYTHONPATH env variable may need to be modified, as this might
50# still point to 1.0_python2.5
51
52# Each class will need Queues, and a defined method for using these
53# queues.
54# For example, as long as there is something in the Queue, both ends
55# of the queue must be open
56# Also, there needs to be proper Termination flags and criteria
57# The System should be error proof.
58
# Constants -------------------------------------------------------------------
# polling interval (seconds) used in busy-wait loops
NAP = 0.001
# bytes per megabyte (float), used for size reporting
MB = 1024.0 * 1024.0
# waits to guard against hanging, in seconds
WAIT_INITIALISE = 60 * 10
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
WAIT_FINALISE = 60 * 2
# step sizes (seconds) paired with the waits above
# (presumably consumed by the Syncer from GaudiMP.pTools — TODO confirm)
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 10

# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
SMAPS = False
73
74# -----------------------------------------------------------------------------
75
76# definitions
77# ----------
# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# used to check which type of histo we are dealing with
# i.e. if currentHisto in aidatypes : pass
aidatypes = (
    gbl.AIDA.IHistogram,
    gbl.AIDA.IHistogram1D,
    gbl.AIDA.IHistogram2D,
    gbl.AIDA.IHistogram3D,
    gbl.AIDA.IProfile1D,
    gbl.AIDA.IProfile2D,
    gbl.AIDA.IBaseHistogram,
) # extra?

# similar to aidatypes, but for native ROOT histogram classes
thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)

# Types of OutputStream in Gaudi
# maps a writer's C++ class name to the category of output it produces
# ("events", "records" or "tuples"), used when classifying configured streams
WRITERTYPES = {
    "EvtCollectionStream": "tuples",
    "InputCopyStream": "events",
    "OutputStream": "events",
    "RecordStream": "records",
    "RunRecordStream": "records",
    "SequentialOutputStream": "events",
    "TagCollectionStream": "tuples",
}
106
107# =============================================================================
108
109
class MiniWriter(object):
    """
    A class to represent a writer in the GaudiPython configuration
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring
    """

    def __init__(self, writer, wType, config):
        """Inspect *writer* (of category *wType*) and locate its output file.

        Sets self.key / self.output so that callers can later do
        config[self.key].Output = modified(output).
        """
        self.w = writer
        self.wType = wType
        # set parameters for directly accessing the correct
        # part of the configuration, so that later, we can do
        # config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr(self.w, "Output"):
            # output file is named directly on the writer
            self.woutput = self.w.Output
            self.getItemLists(config)
            self.set(self.wName, self.w.Output)
            return
        # if there is no output file, get it via the datasvc
        # (every writer always has a datasvc property)
        self.datasvcName = self.w.EvtDataSvc
        datasvc = config[self.datasvcName]
        if hasattr(datasvc, "Output"):
            self.getItemLists(config)
            self.set(self.datasvcName, datasvc.Output)

    def getNewName(self, replaceThis, withThis, extra=""):
        """Return self.output with *replaceThis* replaced by *withThis*.

        The stored output may be a plain string or a single-element list;
        the same container shape is returned.  *extra* is appended verbatim,
        e.g. ROOT flags such as OPT='RECREATE'.
        """
        assert isinstance(replaceThis, str)
        assert isinstance(withThis, str)
        # It *might* be in a list, so check for this
        wasList = isinstance(self.output, list)
        old = self.output[0] if wasList else self.output
        new = old.replace(replaceThis, withThis) + extra
        return [new] if wasList else new

    def getItemLists(self, config):
        """Fetch ItemList / OptItemList from the writer, falling back to its DataSvc."""
        # the item list
        if hasattr(self.w, "ItemList"):
            self.ItemList = self.w.ItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "ItemList"):
                self.ItemList = datasvc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr(self.w, "OptItemList"):
            self.OptItemList = self.w.OptItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "OptItemList"):
                self.OptItemList = datasvc.OptItemList

    def set(self, key, output):
        """Record the config key owning the output, and the output itself."""
        self.key = key
        self.output = output

    def __repr__(self):
        """Multi-line, human-readable summary of this writer's configuration."""
        s = ""
        line = "-" * 80
        s += line + "\n"
        s += "Writer : %s\n" % (self.wName)
        s += "Writer Type : %s\n" % (self.wType)
        s += "Writer Output : %s\n" % (self.output)
        s += "DataSvc : %s\n" % (self.datasvcName)
        s += "DataSvc Output : %s\n" % (self.svcOutput)
        s += "\n"
        s += "Key for config : %s\n" % (self.key)
        s += "Output File : %s\n" % (self.output)
        s += "ItemList : %s\n" % (self.ItemList)
        s += "OptItemList : %s\n" % (self.OptItemList)
        s += line + "\n"
        return s
209
210
211# =============================================================================
212
213
215 """
216 GaudiPython algorithm used to clean up histos on the Reader and Workers
217 Only has a finalize method()
218 This retrieves a dictionary of path:histo objects and sends it to the
219 writer. It then waits for a None flag : THIS IS IMPORTANT, as if
220 the algorithm returns before ALL histos have been COMPLETELY RECEIVED
221 at the writer end, there will be an error.
222 """
223
    def __init__(self, gmpcomponent):
        """Keep a reference to the owning GMP node and reuse its logger."""
        PyAlgorithm.__init__(self)
        self._gmpc = gmpcomponent
        self.log = self._gmpc.log
        return None
229
    def execute(self):
        # per-event no-op : all histogram collection happens in finalize()
        return SUCCESS
232
233 def finalize(self):
234 self.log.info("CollectHistograms Finalise (%s)" % (self._gmpc.nodeType))
235 self._gmpc.hDict = self._gmpc.dumpHistograms()
236 ks = list(self._gmpc.hDict.keys())
237 self.log.info("%i Objects in Histogram Store" % (len(ks)))
238
239 # crashes occurred due to Memory Error during the sending of hundreds
240 # histos on slc5 machines, so instead, break into chunks
241 # send 100 at a time
242 chunk = 100
243 reps = int(len(ks) / chunk + 1)
244 for i in range(reps):
245 someKeys = ks[i * chunk : (i + 1) * chunk]
246 smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
247 self._gmpc.hq.put((self._gmpc.nodeID, smalld))
248 # "finished" Notification
249 self.log.debug("Signalling end of histos to Writer")
250 self._gmpc.hq.put("HISTOS_SENT")
251 self.log.debug("Waiting on Sync Event")
252 self._gmpc.sEvent.wait()
253 self.log.debug("Histo Sync Event set, clearing and returning")
254 self._gmpc.hvt.clearStore()
255 root = gbl.DataObject()
256 setOwnership(root, False)
257 self._gmpc.hvt.setRoot("/stat", root)
258 return SUCCESS
259
260
261# =============================================================================
262
263
class EventCommunicator(object):
    """Ship serialized Gaudi events between processes over queues.

    Events are communicated as TBufferFiles, filled either by the
    TESSerializer, or the GaudiSvc, "IPCSvc".  The payload is a tuple
    (evtNumber, TBufferFile); bare strings act as control flags
    (e.g. "FINISHED").
    """

    def __init__(self, GMPComponent, qin, qout):
        # the node (Reader/Worker/Writer) this communicator belongs to
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0  # counter : items sent
        self.nRecv = 0  # counter : items received
        self.sizeSent = 0  # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0  # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0  # time : receiving from self.qin
        self.qoutTime = 0  # time : sending on qout

    def send(self, item):
        """Put one (evtNumber, TBufferFile) tuple on the outgoing queue.

        Blocks until the queue's feeder thread has drained its buffer, so
        the item is really on its way before we account for it.
        """
        assert isinstance(item, tuple)
        startTransmission = time.time()
        self.qout.put(item)
        # allow the background thread to feed the Queue; not 100% guaranteed to
        # finish before next line
        while self.qout._buffer:
            time.sleep(NAP)
        self.qoutTime += time.time() - startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive(self, timeout=None):
        """Get the next item from the incoming queue.

        Returns the item (event tuple or control string), or None if the
        queue stayed empty for *timeout* seconds.
        """
        startWait = time.time()
        try:
            itemIn = self.qin.get(timeout=timeout)
        except Empty:
            return None
        self.qinTime += time.time() - startWait
        self.nRecv += 1
        if isinstance(itemIn, tuple):
            self.sizeRecv += itemIn[1].Length()
        else:
            # control flags (plain strings) are not counted as events
            self.nRecv -= 1
        try:
            self.qin.task_done()
        except Exception:
            self._gmpc.log.warning(
                "TASK_DONE called too often by : %s" % (self._gmpc.nodeType)
            )
        return itemIn

    def finalize(self):
        """Send "FINISHED" flags downstream and drain the outgoing queue.

        Reader sends one flag for each worker, Workers send one flag each,
        the Writer (end of the chain) sends nothing.
        """
        self.log.info(
            "Finalize Event Communicator : %s %s" % (self._gmpc, self._gmpc.nodeType)
        )
        # number of downstream consumers expecting a "FINISHED" flag
        if self._gmpc.nodeType == "Reader":
            downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == "Writer":
            downstream = 0
        elif self._gmpc.nodeType == "Worker":
            downstream = 1
        else:
            # previously this fell through to a NameError; fail loudly instead
            raise ValueError("Unknown nodeType : %s" % (self._gmpc.nodeType))

        for _ in range(downstream):
            self.qout.put("FINISHED")
        if self._gmpc.nodeType != "Writer":
            self.qout.join()
        # Now some reporting...
        self.statistics()

    def statistics(self):
        """Log the send/receive counters and timings for this communicator."""
        self.log.name = "%s-%i Audit " % (self._gmpc.nodeType, self._gmpc.nodeID)
        self.log.info("Items Sent : %i" % (self.nSent))
        self.log.info("Items Received : %i" % (self.nRecv))
        self.log.info("Data Sent : %i" % (self.sizeSent))
        self.log.info("Data Received : %i" % (self.sizeRecv))
        self.log.info("Q-out Time : %5.2f" % (self.qoutTime))
        self.log.info("Q-in Time : %5.2f" % (self.qinTime))
354
355
356# =============================================================================
357
358
class TESSerializer(object):
    """Bookkeeping wrapper around the C++ GaudiMP TESSerializer.

    Counts and times every Load (deserialize a buffer into the TES) and
    Dump (serialize the TES into a TBufferFile), and logs a summary via
    Report().
    """

    def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
        # the underlying (C++) serializer and the event data service
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        # per-buffer sizes, for the summary statistics
        self.buffersIn = []
        self.buffersOut = []
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log

    def Load(self, tbuf):
        """Reset the TES root and deserialize *tbuf* into it."""
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot("/Event", root)
        t = time.time()
        self.T.loadBuffer(tbuf)
        self.tLoad += time.time() - t
        self.nIn += 1
        self.buffersIn.append(tbuf.Length())

    def Dump(self):
        """Serialize the current TES into a new TBufferFile and return it."""
        t = time.time()
        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += time.time() - t
        self.nOut += 1
        self.buffersOut.append(tb.Length())
        return tb

    def Report(self):
        """Log a summary of load/dump counts, sizes and timings."""
        evIn = "Events Loaded : %i" % (self.nIn)
        evOut = "Events Dumped : %i" % (self.nOut)
        din = sum(self.buffersIn)
        dataIn = "Data Loaded : %i" % (din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
        if self.nIn:
            avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
            maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)
        else:
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"
        dout = sum(self.buffersOut)
        dataOut = "Data Dumped : %i" % (dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
        if self.nOut:
            # bug fix: average dumped size must use dout (bytes dumped),
            # not din (bytes loaded)
            avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
            maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)
        else:
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
        loadTime = "Total Load Time : %5.2f" % (self.tLoad)

        lines = (
            evIn,
            evOut,
            dataIn,
            dataInMb,
            avgIn,
            maxIn,
            dataOut,
            dataOutMb,
            avgOut,
            maxOut,
            dumpTime,
            loadTime,
        )
        self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
        for line in lines:
            self.log.info(line)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
435
436
437# =============================================================================
438
439
440class GMPComponent(object):
441 # This class will be the template for Reader, Worker and Writer
442 # containing all common components
443 # nodeId will be a numerical identifier for the node
444 # -1 for reader
445 # -2 for writer
446 # 0,...,nWorkers-1 for the Workers
447 def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
448 # declare a Gaudi MultiProcessing Node
449 # the nodeType is going to be one of Reader, Worker, Writer
450 # qPair is going to be a tuple of ( qin, qout )
451 # for sending and receiving
452 # if nodeType is "Writer", it will be a list of qPairs,
453 # as there's one queue-in from each Worker
454 #
455 # params is a tuple of (nWorkers, config, log)
456
457 self.nodeType = nodeType
458 current_process().name = nodeType
459
460 # Synchronisation Event() objects for keeping track of the system
461 self.initEvent, eventLoopSyncer, self.finalEvent = events
462 self.eventLoopSyncer, self.lastEvent = eventLoopSyncer # unpack tuple
463
464 # necessary for knowledge of the system
465 self.nWorkers, self.sEvent, self.config, self.log = params
466 self.subworkers = subworkers
467 self.nodeID = nodeID
468 self.msgFormat = self.config["MessageSvc"].Format
469
470 # describe the state of the node by the current Event Number
471 self.currentEvent = None
472
473 # Unpack the Queues : (events, histos, filerecords)
474 self.queues = queues
475 self.num = 0
476
477 ks = self.config.keys()
478 self.app = None
479 list = ["Brunel", "DaVinci", "Boole", "Gauss"]
480 for k in list:
481 if k in ks:
482 self.app = k
483
    def Start(self):
        """Wire up queues/counters for this node and fork its process.

        Creates the EventCommunicator(s) appropriate to the node type
        (Reader/Worker: one in/out pair; Writer: one communicator per
        incoming Worker queue), initialises counters and timers, then
        launches self.Engine in a separate multiprocessing.Process.
        """
        # define the separate process
        qPair, histq, fq = self.queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == "Reader" or self.nodeType == "Worker":
            # Reader or Worker Node
            qin, qout = qPair
            self.evcom = EventCommunicator(self, qin, qout)
        else:
            # Writer : many queues in, no queue out
            assert self.nodeType == "Writer"
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin:
                ec = EventCommunicator(self, q, None)
                self.evcoms.append(ec)
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        self.proc = Process(target=self.Engine)
        # Fork and start the separate process
        self.proc.start()
528
    def Engine(self):
        # This will be the method carried out by the Node
        # Different for all : overridden by the Reader/Worker/Writer subclasses
        pass
533
535 # Different for all ; customize Configuration for multicore
536 pass
537
539 # This method will initialize the GaudiPython Tools
540 # such as the AppMgr and so on
541 self.a = AppMgr()
542 if SMAPS:
543 from AlgSmapShot import SmapShot
544
545 smapsLog = self.nodeType + "-" + str(self.nodeID) + ".smp"
546 ss = SmapShot(logname=smapsLog)
547 self.a.addAlgorithm(ss)
548 self.evt = self.a.evtsvc()
549 self.hvt = self.a.histsvc()
550 self.fsr = self.a.filerecordsvc()
551 self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
552 self.pers = self.a.service("EventPersistencySvc", "IAddressCreator")
553 self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
554 self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
555 return SUCCESS
556
558 self.a.initialize()
559 self.a.start()
560 return SUCCESS
561
562 def LoadTES(self, tbufferfile):
563 root = gbl.DataObject()
564 setOwnership(root, False)
565 self.evt.setRoot("/Event", root)
566 self.ts.loadBuffer(tbufferfile)
567
568 def getEventNumber(self):
569 if self.app != "Gauss":
570 # Using getList or getHistoNames can result in the EventSelector
571 # re-initialising connection to RootDBase, which costs a lot of
572 # time... try to build a set of Header paths??
573
574 # First Attempt : Unpacked Event Data
575 lst = ["/Event/Gen/Header", "/Event/Rec/Header"]
576 for l in lst:
577 path = l
578 try:
579 n = self.evt[path].evtNumber()
580
581 return n
582 except Exception:
583 # No evt number at this path
584 continue
585
586 # second attepmt : try DAQ/RawEvent data
587 # The Evt Number is in bank type 16, bank 0, data pt 4
588 try:
589 n = self.evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
590
591 return n
592 except Exception:
593 pass
594
595 # Default Action
596 if self.nIn > 0 or self.nOut > 0:
597 pass
598 else:
599 self.log.warning("Could not determine Event Number")
600 return -1
601 else:
602 if self.nodeID == -1:
603 self.num = self.num + 1
604
605 return self.num
606
608 #
609 # Identify Writers in the Configuration
610 #
611 d = {}
612 keys = ["events", "records", "tuples", "histos"]
613 for k in keys:
614 d[k] = []
615
616 # Identify Writers and Classify
617 wkeys = WRITERTYPES.keys()
618 for v in self.config.values():
619 if v.__class__.__name__ in wkeys:
620 writerType = WRITERTYPES[v.__class__.__name__]
621 d[writerType].append(MiniWriter(v, writerType, self.config))
622 if self.nodeID == 0:
623 self.log.info("Writer Found : %s" % (v.name()))
624
625 # Now Check for the Histogram Service
626 if "HistogramPersistencySvc" in self.config.keys():
627 hfile = self.config["HistogramPersistencySvc"].getProp("OutputFile")
628 d["histos"].append(hfile)
629 return d
630
631 def dumpHistograms(self):
632 """
633 Method used by the GaudiPython algorithm CollectHistos
634 to obtain a dictionary of form { path : object }
635 representing the Histogram Store
636 """
637 nlist = self.hvt.getHistoNames()
638 histDict = {}
639 objects = 0
640 histos = 0
641 if nlist:
642 for n in nlist:
643 o = self.hvt[n]
644 if type(o) in aidatypes:
645 o = aida2root(o)
646 histos += 1
647 else:
648 objects += 1
649 histDict[n] = o
650 else:
651 print("WARNING : no histograms to recover?")
652 return histDict
653
    def Initialize(self):
        """Bring up GaudiPython for this node and record the init time.

        Sets the initEvent flag once setup is done, then starts the AppMgr.
        For Gauss, also acquires the EvtCounter tool used to keep event
        numbers in step across processes.
        """
        start = time.time()
        # NOTE(review): a per-node configuration step appears to have been
        # dropped from this listing by the documentation generator (a call
        # such as self.processConfiguration()) — confirm against VCS.
        self.SetupGaudiPython()
        self.initEvent.set()
        self.StartGaudiPython()

        if self.app == "Gauss":
            # acquire the event-counter tool so event numbers can be set
            tool = self.a.tool("ToolSvc.EvtCounter")
            self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
        else:
            self.cntr = cppyy.nullptr

        self.iTime = time.time() - start
668
669 def Finalize(self):
670 start = time.time()
671 self.a.stop()
672 self.a.finalize()
673 self.log.info("%s-%i Finalized" % (self.nodeType, self.nodeID))
674 self.finalEvent.set()
675 self.fTime = time.time() - start
676
677 def Report(self):
678 self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
679 allTime = "Alive Time : %5.2f" % (self.tTime)
680 initTime = "Init Time : %5.2f" % (self.iTime)
681 frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
682 runTime = "Run Time : %5.2f" % (self.rTime)
683 finTime = "Finalise Time : %5.2f" % (self.fTime)
684 tup = (allTime, initTime, frstTime, runTime, finTime)
685 for t in tup:
686 self.log.info(t)
687 self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
688 # and report from the TESSerializer
689 self.TS.Report()
690
691
692# =============================================================================
693
694
    def __init__(self, queues, events, params, subworkers):
        """Create the Reader node (fixed nodeID -1) on top of GMPComponent."""
        GMPComponent.__init__(self, "Reader", -1, queues, events, params, subworkers)
698
700 # Reader :
701 # No algorithms
702 # No output
703 # No histos
704 self.config["ApplicationMgr"].TopAlg = []
705 self.config["ApplicationMgr"].OutStream = []
706 if "HistogramPersistencySvc" in self.config.keys():
707 self.config["HistogramPersistencySvc"].OutputFile = ""
708 self.config["MessageSvc"].Format = "%-13s " % "[Reader]" + self.msgFormat
709 self.evtMax = self.config["ApplicationMgr"].EvtMax
710
711 def DumpEvent(self):
712 tb = TBufferFile(TBuffer.kWrite)
713 # print '----Reader dumping Buffer!!!'
714 self.ts.dumpBuffer(tb)
715 # print '\tBuffer Dumped, size : %i'%( tb.Length() )
716 return tb
717
    def DoFirstEvent(self):
        """Read, serialize and send the very first event.

        The first event also primes the TESSerializer item list from the
        current TES contents.  Returns SUCCESS (also on a clean end of
        input / evtMax already reached) or FAILURE if the TES list could
        not be acquired.
        """
        # Do First Event ------------------------------------------------------
        # Check Termination Criteria
        startFirst = time.time()
        self.log.info("Reader : First Event")
        if self.nOut == self.evtMax:
            self.log.info("evtMax( %i ) reached" % (self.evtMax))
            self.lastEvent.set()
            return SUCCESS
        else:
            # Continue to read, dump and send event
            self.a.run(1)
            if not bool(self.evt["/Event"]):
                self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
                self.lastEvent.set()
                return SUCCESS
            else:
                # Populate TESSerializer list and send Event
                if self.app == "Gauss":
                    lst = self.evt.getHistoNames()
                else:
                    try:
                        lst = self.evt.getList()
                        if self.app == "DaVinci":
                            # DaVinci : the DAQ branch must be added explicitly
                            daqnode = self.evt.retrieveObject("/Event/DAQ").registry()
                            setOwnership(daqnode, False)
                            self.evt.getList(daqnode, lst, daqnode.address().par())
                    except Exception:
                        self.log.critical("Reader could not acquire TES List!")
                        self.lastEvent.set()
                        return FAILURE
                self.log.info("Reader : TES List : %i items" % (len(lst)))
                for l in lst:
                    self.ts.addItem(l)
                # NOTE(review): self.currentEvent is sent below but not visibly
                # assigned in this listing — presumably set via getEventNumber()
                # on a line dropped by the doc generator; confirm against VCS.
                tb = self.TS.Dump()
                self.log.info("First Event Sent")
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
                self.eventLoopSyncer.set()
                self.evt.clearStore()
                self.firstEvTime = time.time() - startFirst
                return SUCCESS
761
    def Engine(self):
        """Reader main loop (runs in its own forked process).

        Initializes GaudiPython, then repeatedly reads one event, dumps the
        TES into a TBufferFile and sends it to the Workers, until evtMax is
        reached or the input is exhausted.  Finally flushes "FINISHED"
        flags downstream and reports statistics.
        """
        # rename process (prctl option 15 == PR_SET_NAME) for ps/top
        import ctypes

        libc = ctypes.CDLL("libc.so.6")
        name = str(self.nodeType) + str(self.nodeID) + "\0"
        # NOTE(review): prctl(PR_SET_NAME) expects a *byte* string;
        # create_unicode_buffer passes wide characters — confirm the intended
        # name actually appears (create_string_buffer may be what's wanted).
        libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)

        startEngine = time.time()
        self.log.name = "Reader"
        self.log.info("Reader Process starting")

        self.Initialize()

        # add the Histogram Collection Algorithm
        self.a.addAlgorithm(CollectHistograms(self))

        self.log.info("Reader Beginning Distribution")
        sc = self.DoFirstEvent()
        if sc.isSuccess():
            self.log.info("Reader First Event OK")
        else:
            self.log.critical("Reader Failed on First Event")
            self.stat = FAILURE

        # Do All Others -------------------------------------------------------
        while True:
            # Check Termination Criteria
            if self.nOut == self.evtMax:
                self.log.info("evtMax( %i ) reached" % (self.evtMax))
                break
            # Check Health
            if not self.stat.isSuccess():
                self.log.critical("Reader is Damaged!")
                break
            # Continue to read, dump and send event
            t = time.time()
            self.a.run(1)
            self.rTime += time.time() - t
            if not bool(self.evt["/Event"]):
                self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
                break
            self.currentEvent = self.getEventNumber()
            tb = self.TS.Dump()
            self.evcom.send((self.currentEvent, tb))
            # clean up
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.info("Setting <Last> Event")
        self.lastEvent.set()

        # Finalize
        self.log.info("Reader : Event Distribution complete.")
        self.evcom.finalize()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()
820
821
822# =============================================================================
823
824
    def __init__(self, workerID, queues, events, params, subworkers):
        """Create a Subworker node (nodeID = workerID) on top of GMPComponent."""
        GMPComponent.__init__(
            self, "Worker", workerID, queues, events, params, subworkers
        )
        # Identify the writer streams
        # NOTE(review): the assignment populating self.writerDict appears to
        # have been dropped from this listing by the documentation generator
        # (it is used below and in Engine) — confirm against VCS.
        # Identify the accept/veto checks for each event
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.info("Subworker-%i Created OK" % (self.nodeID))
        self.eventOutput = True
836
    def Engine(self):
        """Subworker main loop (runs in a process forked from its Worker).

        Receives serialized events from the queue, runs the event loop on
        them, applies the accept/require/veto checks and (if event output
        is configured) sends the processed events on to the Writer.
        """
        # rename process (prctl option 15 == PR_SET_NAME) for ps/top
        import ctypes

        libc = ctypes.CDLL("libc.so.6")
        name = str(self.nodeType) + str(self.nodeID) + "\0"
        libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)

        self.initEvent.set()
        startEngine = time.time()
        msg = self.a.service("MessageSvc")
        msg.Format = "%-13s " % ("[" + self.log.name + "]") + self.msgFormat

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Subworker %i starting Engine" % (self.nodeID))
        # NOTE(review): self.filerecordsAgent is used at the bottom of this
        # method but not visibly assigned in this listing — presumably created
        # here on a line dropped by the doc generator; confirm against VCS.

        # populate the TESSerializer itemlist
        self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))

        self.a.addAlgorithm(CollectHistograms(self))

        # Begin processing
        Go = True
        while Go:
            packet = self.evcom.receive()
            if packet:
                pass
            else:
                continue
            if packet == "FINISHED":
                break
            evtNumber, tbin = packet  # unpack
            if self.cntr != cppyy.nullptr:
                # keep the (Gauss) event counter in step with the real number
                self.cntr.setEventCounter(evtNumber)

            self.nIn += 1
            self.TS.Load(tbin)

            t = time.time()
            sc = self.a.executeEvent()
            # the first event carries the warm-up cost; book it separately
            if self.nIn == 1:
                self.firstEvTime = time.time() - t
            else:
                self.rTime += time.time() - t
            if sc.isSuccess():
                pass
            else:
                self.log.name = "Worker-%i" % (self.nodeID)
                self.log.warning("Did not Execute Event")
                self.evt.clearStore()
                continue
            if self.isEventPassed():
                pass
            else:
                self.log.name = "Worker-%i" % (self.nodeID)
                self.log.warning("Event did not pass : %i" % (evtNumber))
                self.evt.clearStore()
                continue
            if self.eventOutput:
                # It may be the case of generating Event Tags; hence
                # no event output
                tb = self.TS.Dump()
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
            self.inc.fireIncident(gbl.Incident("Subworker", "EndEvent"))
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Setting <Last> Event %s" % (self.nodeID))
        self.lastEvent.set()

        self.evcom.finalize()
        # Now send the FileRecords and stop/finalize the appMgr
        self.filerecordsAgent.SendFileRecords()
        self.tTime = time.time() - startEngine
        self.Finalize()
        self.Report()
        # self.finalEvent.set()
917
918 def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
919 self.a = a
920 self.evt = evt
921 self.hvt = hvt
922 self.fsr = fsr
923 # self.inc = inc
924 self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
925 self.pers = pers
926 self.ts = ts
927 self.cntr = cntr
928 self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
929
930 def getCheckAlgs(self):
931 """
932 For some output writers, a check is performed to see if the event has
933 executed certain algorithms.
934 These reside in the AcceptAlgs property for those writers
935 """
936 acc = []
937 req = []
938 vet = []
939 for m in self.writerDict["events"]:
940 if hasattr(m.w, "AcceptAlgs"):
941 acc += m.w.AcceptAlgs
942 if hasattr(m.w, "RequireAlgs"):
943 req += m.w.RequireAlgs
944 if hasattr(m.w, "VetoAlgs"):
945 vet += m.w.VetoAlgs
946 return (acc, req, vet)
947
948 def checkExecutedPassed(self, algName):
949 if (
950 self.a.algorithm(algName)._ialg.isExecuted()
951 and self.a.algorithm(algName)._ialg.filterPassed()
952 ):
953 return True
954 else:
955 return False
956
957 def isEventPassed(self):
958 """
959 Check the algorithm status for an event.
960 Depending on output writer settings, the event
961 may be declined based on various criteria.
962 This is a transcript of the check that occurs in GaudiSvc::OutputStream
963 """
964 passed = False
965
966 self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
967 if self.acceptAlgs:
968 for name in self.acceptAlgs:
969 if self.checkExecutedPassed(name):
970 passed = True
971 break
972 else:
973 passed = True
974
975 self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
976 for name in self.requireAlgs:
977 if self.checkExecutedPassed(name):
978 pass
979 else:
980 self.log.info("Evt declined (requireAlgs) : %s" % (name))
981 passed = False
982
983 self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
984 for name in self.vetoAlgs:
985 if self.checkExecutedPassed(name):
986 pass
987 else:
988 self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
989 passed = False
990 return passed
991
992
993# =============================================================================
994
995
    def __init__(self, workerID, queues, events, params, subworkers):
        """Create a Worker node (nodeID = workerID) on top of GMPComponent."""
        GMPComponent.__init__(
            self, "Worker", workerID, queues, events, params, subworkers
        )
        # Identify the writer streams
        # Identify the accept/veto checks for each event
        # NOTE(review): the assignments populating self.writerDict and the
        # accept/require/veto lists appear to have been dropped from this
        # listing by the documentation generator (they are used later in
        # processConfiguration/Engine) — confirm against VCS.
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Worker-%i Created OK" % (self.nodeID))
        self.eventOutput = True
1008
1010 # Worker :
1011 # No input
1012 # No output
1013 # No Histos
1014 self.config["EventSelector"].Input = []
1015 self.config["ApplicationMgr"].OutStream = []
1016 if "HistogramPersistencySvc" in self.config.keys():
1017 self.config["HistogramPersistencySvc"].OutputFile = ""
1018 formatHead = "[Worker-%i]" % (self.nodeID)
1019 self.config["MessageSvc"].Format = "%-13s " % formatHead + self.msgFormat
1020
1021 for key, lst in self.writerDict.items():
1022 self.log.info("Writer Type : %s\t : %i" % (key, len(lst)))
1023
1024 for m in self.writerDict["tuples"]:
1025 # rename Tuple output file with an appendix
1026 # based on worker id, for merging later
1027 newName = m.getNewName(".", ".w%i." % (self.nodeID))
1028 self.config[m.key].Output = newName
1029
1030 # Suppress INFO Output for all but Worker-0
1031 # if self.nodeID == 0 :
1032 # pass
1033 # else :
1034 # self.config[ 'MessageSvc' ].OutputLevel = ERROR
1035
1036 if self.app == "Gauss":
1037 try:
1038 if "ToolSvc.EvtCounter" not in self.config:
1039 from Configurables import EvtCounter
1040
1041 counter = EvtCounter()
1042 else:
1043 counter = self.config["ToolSvc.EvtCounter"]
1044 counter.UseIncident = False
1045 except Exception:
1046 # ignore errors when trying to change the configuration of the EvtCounter
1047 self.log.warning("Cannot configure EvtCounter")
1048
# Worker.Engine: the worker process main loop.
# NOTE(review): doxygen scrape dropped file lines 1060 and 1148, so this body
# is not complete as shown -- confirm against the real file before editing.
# Flow: rename the OS process, initialise the AppMgr, fill the TES-serializer
# item lists from the event writers, then consume (evtNumber, TBufferFile)
# packets from the Reader until the "FINISHED" sentinel, executing one event
# per packet and forwarding serialized output to the Writer.
1049 def Engine(self):
1050 # rename process
1051 import ctypes
1052
# NOTE(review): PR_SET_NAME (prctl option 15) expects a char* byte string of
# at most 16 bytes; create_unicode_buffer produces wchar_t data, so the
# kernel likely sees garbage here -- create_string_buffer(name.encode())
# looks intended. Confirm before changing.
1053 libc = ctypes.CDLL("libc.so.6")
1054 name = str(self.nodeType) + str(self.nodeID) + "\0"
1055 libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
1056
1057 startEngine = time.time()
1058 self.log.info("Worker %i starting Engine" % (self.nodeID))
1059 self.Initialize()
1061
1062 # populate the TESSerializer itemlist
1063 self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
1064
# Collect the (mandatory) ItemList and (optional) OptItemList entries of all
# event writers; the "#" suffix (depth specifier) is stripped before adding.
1065 nEventWriters = len(self.writerDict["events"])
1066 if nEventWriters:
1067 itemList = set()
1068 optItemList = set()
1069 for m in self.writerDict["events"]:
1070 for item in m.ItemList:
1071 hsh = item.find("#")
1072 if hsh != -1:
1073 item = item[:hsh]
1074 itemList.add(item)
1075 for item in m.OptItemList:
1076 hsh = item.find("#")
1077 if hsh != -1:
1078 item = item[:hsh]
1079 optItemList.add(item)
1080 # If an item is mandatory and optional, keep it only in the optional list
1081 itemList -= optItemList
1082 for item in sorted(itemList):
1083 self.log.info(" adding ItemList Item to ts : %s" % (item))
1084 self.ts.addItem(item)
1085 for item in sorted(optItemList):
1086 self.log.info(" adding Optional Item to ts : %s" % (item))
1087 self.ts.addOptItem(item)
1088 else:
1089 self.log.info("There is no Event Output for this app")
1090 self.eventOutput = False
1091
1092 # Begin processing
1093 Go = True
# Blocking receive from the shared Reader queue; falsy packets are skipped,
# the "FINISHED" sentinel terminates the loop.
1094 while Go:
1095 packet = self.evcom.receive()
1096 if packet:
1097 pass
1098 else:
1099 continue
1100 if packet == "FINISHED":
1101 break
1102 evtNumber, tbin = packet # unpack
1103 if self.cntr != cppyy.nullptr:
1104 self.cntr.setEventCounter(evtNumber)
1105
1106 # subworkers are forked before the first event is processed
1107 if self.nIn == 0:
1108 self.log.info("Fork new subworkers")
1109
1110 # Fork subworkers and share services
1111 for k in self.subworkers:
1112 k.SetServices(
1113 self.a,
1114 self.evt,
1115 self.hvt,
1116 self.fsr,
1117 self.inc,
1118 self.pers,
1119 self.ts,
1120 self.cntr,
1121 )
1122 k.Start()
1123 self.a.addAlgorithm(CollectHistograms(self))
# Deserialize the incoming event into the TES, then execute it; the first
# event's wall time is recorded separately (initialisation overhead).
1124 self.nIn += 1
1125 self.TS.Load(tbin)
1126
1127 t = time.time()
1128 sc = self.a.executeEvent()
1129 if self.nIn == 1:
1130 self.firstEvTime = time.time() - t
1131 else:
1132 self.rTime += time.time() - t
1133 if sc.isSuccess():
1134 pass
1135 else:
1136 self.log.warning("Did not Execute Event")
1137 self.evt.clearStore()
1138 continue
1139 if self.isEventPassed():
1140 pass
1141 else:
1142 self.log.warning("Event did not pass : %i" % (evtNumber))
1143 self.evt.clearStore()
1144 continue
# Serialize the accepted event and ship it to the Writer.
# NOTE(review): file line 1148 was dropped by the scrape here.
1145 if self.eventOutput:
1146 # It may be the case of generating Event Tags; hence
1147 # no event output
1149 tb = self.TS.Dump()
1150 self.evcom.send((self.currentEvent, tb))
1151 self.nOut += 1
1152 self.inc.fireIncident(gbl.Incident("Worker", "EndEvent"))
1153 self.eventLoopSyncer.set()
1154 self.evt.clearStore()
1155 self.log.info("Setting <Last> Event")
1156 self.lastEvent.set()
1157
# Shut down: close the queues, forward FileRecords, finalize and report,
# then join the subworker processes.
1158 self.evcom.finalize()
1159 self.log.info("Worker-%i Finished Processing Events" % (self.nodeID))
1160 # Now send the FileRecords and stop/finalize the appMgr
1161 self.filerecordsAgent.SendFileRecords()
1162 self.Finalize()
1163 self.tTime = time.time() - startEngine
1164 self.Report()
1165
1166 for k in self.subworkers:
1167 self.log.info("Join subworkers")
1168 k.proc.join()
1169
def getCheckAlgs(self):
    """
    Collect the accept/require/veto algorithm names of all event writers.

    Some output writers only write an event if certain algorithms were
    executed (and passed); those algorithm names live in the writers'
    ``AcceptAlgs`` / ``RequireAlgs`` / ``VetoAlgs`` properties.

    Returns a tuple ``(acceptAlgs, requireAlgs, vetoAlgs)`` of name lists.
    """
    accept, require, veto = [], [], []
    for writer in self.writerDict["events"]:
        stream = writer.w
        # Each property is optional on a given writer type.
        for prop, bucket in (
            ("AcceptAlgs", accept),
            ("RequireAlgs", require),
            ("VetoAlgs", veto),
        ):
            bucket.extend(getattr(stream, prop, []))
    return (accept, require, veto)
1187
def checkExecutedPassed(self, algName):
    """
    Return True if the algorithm *algName* was executed for the current
    event AND passed its filter, False otherwise.

    algName : string name of an algorithm known to the AppMgr (self.a).

    Fix: the original resolved ``self.a.algorithm(algName)._ialg`` twice
    (two AppMgr lookups per call) and used a verbose if/else to return
    True/False; resolve once and return the boolean directly.
    """
    ialg = self.a.algorithm(algName)._ialg
    return bool(ialg.isExecuted() and ialg.filterPassed())
1196
def isEventPassed(self):
    """
    Decide whether the current event satisfies the output-writer criteria.

    This is a transcript of the check that occurs in
    GaudiSvc::OutputStream:
      * acceptAlgs : if any are configured, at least one must have
        executed and passed; with none configured the event is accepted
        by default;
      * requireAlgs: every one must have executed and passed;
      * vetoAlgs   : the event is declined when a veto algorithm did NOT
        execute-and-pass.
        NOTE(review): OutputStream conventionally vetoes when the veto
        algorithm DID pass; this looks inverted -- confirm upstream
        before changing (behavior kept identical here).

    Returns True if the event should be written out, False otherwise.
    """
    self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
    if self.acceptAlgs:
        # One passing accept-algorithm is enough.
        passed = any(self.checkExecutedPassed(name) for name in self.acceptAlgs)
    else:
        passed = True

    self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
    for name in self.requireAlgs:
        if not self.checkExecutedPassed(name):
            self.log.info("Evt declined (requireAlgs) : %s" % (name))
            passed = False

    self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
    for name in self.vetoAlgs:
        if not self.checkExecutedPassed(name):
            self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
            passed = False
    return passed
1231
1232
1233# =============================================================================
1234
1235
# Writer.__init__: nodeID is fixed at -2; one Writer collects the output of
# all workers. NOTE(review): doxygen scrape dropped file line 1236 here
# (presumably the 'class Writer(GMPComponent):' header) and file line 1240
# (presumably the writer-stream identification assignment referenced by the
# comment below) -- confirm against the real file.
1237 def __init__(self, queues, events, params, subworkers):
1238 GMPComponent.__init__(self, "Writer", -2, queues, events, params, subworkers)
1239 # Identify the writer streams
1241 # This keeps track of workers as they finish
1242 self.status = [False] * self.nWorkers
1243 self.log.name = "Writer--2"
# Body of Writer.processConfiguration (its 'def' line, file line 1245, was
# dropped by the scrape): the Writer runs no algorithms and reads no input;
# events arrive pre-processed from the workers via the GMP queues.
1246 # Writer :
1247 # No input
1248 # No Algs
1249 self.config["ApplicationMgr"].TopAlg = []
1250 self.config["EventSelector"].Input = []
1251
1252 self.config["MessageSvc"].Format = "%-13s " % "[Writer]" + self.msgFormat
1253
# Writer.Engine: the writer process main loop. Round-robin polls one event
# queue per worker; events are deserialized into the TES and written out by
# executing the (writer-only) event loop. Terminates once every worker has
# sent the "FINISHED" sentinel, then rebuilds histograms and FileRecords.
# NOTE(review): doxygen scrape dropped file lines 1263-1264 (presumably the
# HistoAgent/FileRecordsAgent setup) and 1289 (presumably 'self.nOut += 1')
# -- confirm against the real file.
1254 def Engine(self):
1255 # rename process
1256 import ctypes
1257
# NOTE(review): same PR_SET_NAME issue as Worker.Engine --
# create_unicode_buffer yields wchar_t data where prctl expects char*.
1258 libc = ctypes.CDLL("libc.so.6")
1259 name = str(self.nodeType) + str(self.nodeID) + "\0"
1260 libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
1261
1262 self.Initialize()
1265
1266 # Begin processing
1267 Go = True
1268 current = -1
# Cycle through the per-worker queues; the short timeout keeps the loop
# responsive when a worker is idle.
1269 while Go:
1270 current = (current + 1) % self.nWorkers
1271 packet = self.evcoms[current].receive(timeout=0.01)
1272 if packet is None:
1273 continue
1274 if packet == "FINISHED":
1275 self.log.info("Writer got FINISHED flag : Worker %i" % (current))
1276
1277 self.status[current] = True
1278 if all(self.status):
1279 self.log.info("FINISHED recd from all workers, break loop")
1280 break
1281 continue
1282 # otherwise, continue as normal
1283 self.nIn += 1 # update central count (maybe needed by FSR store)
1284 evtNumber, tbin = packet # unpack
1285 self.TS.Load(tbin)
1286 t = time.time()
1287 self.a.executeEvent()
1288 self.rTime += time.time() - t
1290 self.evt.clearStore()
1291 self.eventLoopSyncer.set()
1292 self.log.name = "Writer--2"
1293 self.log.info("Setting <Last> Event")
1294 self.lastEvent.set()
1295
# Close all worker queues, then rebuild the histogram store and the
# FileRecords store from the data the workers shipped over.
1296 # finalisation steps
1297 [e.finalize() for e in self.evcoms]
1298 # Now do Histograms
1299 sc = self.histoAgent.Receive()
1300 sc = self.histoAgent.RebuildHistoStore()
1301 if sc.isSuccess():
1302 self.log.info("Histo Store rebuilt ok")
1303 else:
1304 self.log.warning("Histo Store Error in Rebuild")
1305
1306 # Now do FileRecords
1307 sc = self.filerecordsAgent.Receive()
1308 self.filerecordsAgent.Rebuild()
1309 self.Finalize()
1310 # self.rTime = time.time()-startEngine
1311 self.Report()
1312
1312
1313
1314# =============================================================================
1315
1316# =============================================================================
1317
1318
1319class Coord(object):
# Coord.__init__: builds the whole GMP process network -- queues, syncers,
# Reader/Workers/Writer -- from the requested worker count, the job
# configuration and a logger. nWorkers == -1 means "use all CPUs".
# NOTE(review): doxygen scrape dropped file lines 1338, 1365 and 1372 here;
# from the surviving call arguments these are presumably
# 'self.sInit = Syncer(', 'self.reader = Reader(' and 'self.writer = Writer('
# respectively -- confirm against the real file.
1320 def __init__(self, nWorkers, config, log):
1321 self.log = log
1322 self.config = config
1323 # set up Logging
1324 self.log.name = "GaudiPython-Parallel-Logger"
1325 self.log.info("GaudiPython Parallel Process Co-ordinator beginning")
1326
1327 if nWorkers == -1:
1328 # The user has requested all available cpus in the machine
1329 self.nWorkers = cpu_count()
1330 else:
1331 self.nWorkers = nWorkers
1332
1333 self.qs = self.SetupQueues() # a dictionary of queues (for Events)
1334 self.hq = JoinableQueue() # for Histogram data
1335 self.fq = JoinableQueue() # for FileRecords data
1336
# Per-phase syncers guard against hung subprocesses (limits/steps are the
# module-level WAIT_*/STEP_* constants).
1337 # Make a Syncer for Initialise, Run, and Finalise
1339 self.nWorkers, self.log, limit=WAIT_INITIALISE, step=STEP_INITIALISE
1340 )
1341 self.sRun = Syncer(
1342 self.nWorkers,
1343 self.log,
1344 manyEvents=True,
1345 limit=WAIT_SINGLE_EVENT,
1346 step=STEP_EVENT,
1347 firstEvent=WAIT_FIRST_EVENT,
1348 )
1349 self.sFin = Syncer(
1350 self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE
1351 )
1352 # and one final one for Histogram Transfer
1353 self.histSyncEvent = Event()
1354
1355 # params are common to all subprocesses
1356 params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
1357
# Subworkers get IDs 1..nWorkers-1; worker 0 is the "main" Worker which
# forks them (see Worker.Engine). Reader uses nodeID -1, Writer -2.
1358 self.subworkers = []
1359 # Declare SubProcesses!
1360 for i in range(1, self.nWorkers):
1361 sub = Subworker(
1362 i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers
1363 )
1364 self.subworkers.append(sub)
1366 self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers
1367 )
1368 self.workers = []
1369 wk = Worker(
1370 0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers
1371 )
1373 self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers
1374 )
1375
# Launch order is Writer, Worker, Reader (consumers before producer);
# Stop() joins them in reverse.
1376 self.system = []
1377 self.system.append(self.writer)
1378 self.system.append(wk)
1379 self.system.append(self.reader)
1380
def getSyncEvents(self, nodeID):
    """
    Return the synchronisation events for node *nodeID* as a tuple
    ``(initEvent, (runEvent, lastRunEvent), finaliseEvent)``, taken from
    the Initialise/Run/Finalise syncers.
    """
    runEntry = self.sRun.d[nodeID]
    return (
        self.sInit.d[nodeID].event,
        (runEntry.event, runEntry.lastEvent),
        self.sFin.d[nodeID].event,
    )
1386
def getQueues(self, nodeID):
    """
    Return the queue triplet ``(eventQueue, histogramQueue, fileRecordsQueue)``
    for node *nodeID*: the event queue pair is node-specific, while the
    histogram and FileRecords queues are shared by all nodes.
    """
    return (self.qs[nodeID], self.hq, self.fq)
1392
def Go(self):
    """
    Drive the whole parallel job through its three phases.

    Starts every subprocess, then synchronises Initialise, Run and
    Finalise in turn; if any phase fails to sync within its limit all
    children are terminated and FAILURE is returned. On success the
    processes are joined cleanly and SUCCESS is returned.
    """
    # Initialise
    self.log.name = "GaudiPython-Parallel-Logger"
    self.log.info("INITIALISING SYSTEM")

    # Start reader, writer and main worker
    for node in self.system:
        node.Start()

    if not (self.sInit.syncAll(step="Initialise") == SUCCESS):
        self.Terminate()
        return FAILURE

    # Run
    self.log.name = "GaudiPython-Parallel-Logger"
    self.log.info("RUNNING SYSTEM")
    if not (self.sRun.syncAll(step="Run") == SUCCESS):
        self.Terminate()
        return FAILURE

    # Finalise
    self.log.name = "GaudiPython-Parallel-Logger"
    self.log.info("FINALISING SYSTEM")
    if not (self.sFin.syncAll(step="Finalise") == SUCCESS):
        self.Terminate()
        return FAILURE

    # if we've got this far, finally report SUCCESS
    self.log.info("Cleanly join all Processes")
    self.Stop()
    self.log.info("Report Total Success to Main.py")
    return SUCCESS
1434
def Terminate(self):
    """
    Brutally kill all live child processes of this process (Reader,
    Workers, Subworkers, Writer) via multiprocessing.
    """
    for child in multiprocessing.active_children():
        child.terminate()
1444
def Stop(self):
    """
    Join all subprocesses in reverse launch order and return SUCCESS.

    Note: ``self.system`` is reversed in place (as in the original), so
    after this call its order is permanently inverted.
    """
    # procs should be joined in reverse order to launch
    self.system.reverse()
    for node in self.system:
        node.proc.join()
    return SUCCESS
1451
def SetupQueues(self):
    """
    Build the network of JoinableQueues linking Reader, Workers and Writer.

    N queues = nWorkers + 1: one shared queue from the Reader to all
    workers, plus one private queue from each worker to the Writer.

    Returns a dict keyed by nodeID mapping to an (inQueue, outQueue) pair:
      -1 (Reader) -> (None, readerToWorkers)
      -2 (Writer) -> (list of all worker->writer queues, None)
       i (Worker) -> (readerToWorkers, workerToWriter[i])
    """
    # one queue from Reader-Workers
    readerToWorkers = JoinableQueue()
    # one queue from each worker to writer
    workerToWriter = [JoinableQueue() for _ in range(self.nWorkers)]
    queueMap = {
        -1: (None, readerToWorkers),  # Reader
        -2: (workerToWriter, None),  # Writer
    }
    for wid, outQueue in enumerate(workerToWriter):
        queueMap[wid] = (readerToWorkers, outQueue)
    return queueMap
1469
1470
1471# ============================= EOF ===========================================
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
__init__(self, gmpcomponent)
Definition GMPBase.py:224
getSyncEvents(self, nodeID)
Definition GMPBase.py:1381
__init__(self, nWorkers, config, log)
Definition GMPBase.py:1320
getQueues(self, nodeID)
Definition GMPBase.py:1387
__init__(self, GMPComponent, qin, qout)
Definition GMPBase.py:268
receive(self, timeout=None)
Definition GMPBase.py:304
__init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition GMPBase.py:447
LoadTES(self, tbufferfile)
Definition GMPBase.py:562
set(self, key, output)
Definition GMPBase.py:188
getNewName(self, replaceThis, withThis, extra="")
Definition GMPBase.py:150
__init__(self, writer, wType, config)
Definition GMPBase.py:119
getItemLists(self, config)
Definition GMPBase.py:171
__init__(self, queues, events, params, subworkers)
Definition GMPBase.py:696
checkExecutedPassed(self, algName)
Definition GMPBase.py:948
SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition GMPBase.py:918
__init__(self, workerID, queues, events, params, subworkers)
Definition GMPBase.py:826
__init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition GMPBase.py:360
checkExecutedPassed(self, algName)
Definition GMPBase.py:1188
__init__(self, workerID, queues, events, params, subworkers)
Definition GMPBase.py:997
__init__(self, queues, events, params, subworkers)
Definition GMPBase.py:1237
Python Algorithm base class.
Definition Algorithm.h:39
StatusCode execute() override
StatusCode finalize() override
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition reverse.h:58