The Gaudi Framework  master (37c0b60a)
GMPBase.py
Go to the documentation of this file.
1 
11 
12 import multiprocessing
13 import time
14 import warnings
15 from multiprocessing import Event, JoinableQueue, Process, cpu_count, current_process
16 from multiprocessing.queues import Empty
17 
18 from ROOT import TBuffer, TBufferFile
19 
20 from GaudiMP.pTools import FileRecordsAgent, HistoAgent, Syncer
21 from GaudiPython import (
22  FAILURE,
23  SUCCESS,
24  AppMgr,
25  InterfaceCast,
26  PyAlgorithm,
27  gbl,
28  setOwnership,
29 )
30 
31 # Workaround for ROOT-10769
32 with warnings.catch_warnings():
33  warnings.simplefilter("ignore")
34  import cppyy
35 
36 # This script contains the bases for the Gaudi MultiProcessing (GMP)
37 # classes
38 
39 # There are three classes :
40 # Reader
41 # Worker
42 # Writer
43 
44 # Each class needs to perform communication with the others
45 # For this, we need a means of communication, which will be based on
46 # the python multiprocessing package
47 # This is provided in SPI pytools package
48 # cmt line : use pytools v1.1 LCG_Interfaces
49 # The PYTHONPATH env variable may need to be modified, as this might
50 # still point to 1.0_python2.5
51 
52 # Each class will need Queues, and a defined method for using these
53 # queues.
54 # For example, as long as there is something in the Queue, both ends
55 # of the queue must be open
56 # Also, there needs to be proper Termination flags and criteria
57 # The System should be error proof.
58 
59 # Constants -------------------------------------------------------------------
60 NAP = 0.001
61 MB = 1024.0 * 1024.0
62 # waits to guard against hanging, in seconds
63 WAIT_INITIALISE = 60 * 10
64 WAIT_FIRST_EVENT = 60 * 3
65 WAIT_SINGLE_EVENT = 60 * 6
66 WAIT_FINALISE = 60 * 2
67 STEP_INITIALISE = 10
68 STEP_EVENT = 2
69 STEP_FINALISE = 10
70 
71 # My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
72 SMAPS = False
73 
74 # -----------------------------------------------------------------------------
75 
76 # definitions
77 # ----------
78 # used to convert stored histos (in AIDA format) to ROOT format
79 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
80 
81 # used to check which type of histo we are dealing with
82 # i.e. if currentHisto in aidatypes : pass
83 aidatypes = (
84  gbl.AIDA.IHistogram,
85  gbl.AIDA.IHistogram1D,
86  gbl.AIDA.IHistogram2D,
87  gbl.AIDA.IHistogram3D,
88  gbl.AIDA.IProfile1D,
89  gbl.AIDA.IProfile2D,
90  gbl.AIDA.IBaseHistogram,
91 ) # extra?
92 
93 # similar to aidatypes
94 thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
95 
96 # Types of OutputStream in Gaudi
97 WRITERTYPES = {
98  "EvtCollectionStream": "tuples",
99  "InputCopyStream": "events",
100  "OutputStream": "events",
101  "RecordStream": "records",
102  "RunRecordStream": "records",
103  "SequentialOutputStream": "events",
104  "TagCollectionStream": "tuples",
105 }
106 
107 # =============================================================================
108 
109 
110 class MiniWriter(object):
111  """
112  A class to represent a writer in the GaudiPython configuration
113  It can be non-trivial to access the name of the output file; it may be
114  specified in the DataSvc, or just on the writer, may be a list, or string
115  Also, there are three different types of writer (events, records, tuples)
116  so this bootstrap class provides easy access to this info while configuring
117  """
118 
119  def __init__(self, writer, wType, config):
120  self.w = writer
121  self.wType = wType
122  # set parameters for directly accessing the correct
123  # part of the configuration, so that later, we can do
124  # config[ key ].Output = modified(output)
125  self.key = None
126  self.output = None
127  self.ItemList = None
128  self.OptItemList = None
129  #
130  self.wName = writer.getName()
131  # Now process the Writer, find where the output is named
132  self.woutput = None
133  self.datasvcName = None
134  self.svcOutput = None
135  if hasattr(self.w, "Output"):
136  self.woutput = self.w.Output
137  self.getItemLists(config)
138  self.set(self.wName, self.w.Output)
139  return
140  else:
141  # if there is no output file, get it via the datasvc
142  # (every writer always has a datasvc property)
143  self.datasvcName = self.w.EvtDataSvc
144  datasvc = config[self.datasvcName]
145  if hasattr(datasvc, "Output"):
146  self.getItemLists(config)
147  self.set(self.datasvcName, datasvc.Output)
148  return
149 
150  def getNewName(self, replaceThis, withThis, extra=""):
151  # replace one pattern in the output name string
152  # with another, and return the Output name
153  # It *might* be in a list, so check for this
154  #
155  # @param extra : might need to add ROOT flags
156  # i.e.: OPT='RECREATE', or such
157  assert replaceThis.__class__.__name__ == "str"
158  assert withThis.__class__.__name__ == "str"
159  old = self.output
160  lst = False
161  if old.__class__.__name__ == "list":
162  old = self.output[0]
163  lst = True
164  new = old.replace(replaceThis, withThis)
165  new += extra
166  if lst:
167  return [new]
168  else:
169  return new
170 
171  def getItemLists(self, config):
172  # the item list
173  if hasattr(self.w, "ItemList"):
174  self.ItemList = self.w.ItemList
175  else:
176  datasvc = config[self.w.EvtDataSvc]
177  if hasattr(datasvc, "ItemList"):
178  self.ItemList = datasvc.ItemList
179  # The option item list; possibly not a valid variable
180  if hasattr(self.w, "OptItemList"):
181  self.OptItemList = self.w.OptItemList
182  else:
183  datasvc = config[self.w.EvtDataSvc]
184  if hasattr(datasvc, "OptItemList"):
185  self.OptItemList = datasvc.OptItemList
186  return
187 
188  def set(self, key, output):
189  self.key = key
190  self.output = output
191  return
192 
193  def __repr__(self):
194  s = ""
195  line = "-" * 80
196  s += line + "\n"
197  s += "Writer : %s\n" % (self.wName)
198  s += "Writer Type : %s\n" % (self.wType)
199  s += "Writer Output : %s\n" % (self.output)
200  s += "DataSvc : %s\n" % (self.datasvcName)
201  s += "DataSvc Output : %s\n" % (self.svcOutput)
202  s += "\n"
203  s += "Key for config : %s\n" % (self.key)
204  s += "Output File : %s\n" % (self.output)
205  s += "ItemList : %s\n" % (self.ItemList)
206  s += "OptItemList : %s\n" % (self.OptItemList)
207  s += line + "\n"
208  return s
209 
210 
211 # =============================================================================
212 
213 
215  """
216  GaudiPython algorithm used to clean up histos on the Reader and Workers
217  Only has a finalize method()
218  This retrieves a dictionary of path:histo objects and sends it to the
219  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
220  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
221  at the writer end, there will be an error.
222  """
223 
224  def __init__(self, gmpcomponent):
225  PyAlgorithm.__init__(self)
226  self._gmpc = gmpcomponent
227  self.log = self._gmpc.log
228  return None
229 
230  def execute(self):
231  return SUCCESS
232 
233  def finalize(self):
234  self.log.info("CollectHistograms Finalise (%s)" % (self._gmpc.nodeType))
235  self._gmpc.hDict = self._gmpc.dumpHistograms()
236  ks = list(self._gmpc.hDict.keys())
237  self.log.info("%i Objects in Histogram Store" % (len(ks)))
238 
239  # crashes occurred due to Memory Error during the sending of hundreds
240  # histos on slc5 machines, so instead, break into chunks
241  # send 100 at a time
242  chunk = 100
243  reps = int(len(ks) / chunk + 1)
244  for i in range(reps):
245  someKeys = ks[i * chunk : (i + 1) * chunk]
246  smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
247  self._gmpc.hq.put((self._gmpc.nodeID, smalld))
248  # "finished" Notification
249  self.log.debug("Signalling end of histos to Writer")
250  self._gmpc.hq.put("HISTOS_SENT")
251  self.log.debug("Waiting on Sync Event")
252  self._gmpc.sEvent.wait()
253  self.log.debug("Histo Sync Event set, clearing and returning")
254  self._gmpc.hvt.clearStore()
255  root = gbl.DataObject()
256  setOwnership(root, False)
257  self._gmpc.hvt.setRoot("/stat", root)
258  return SUCCESS
259 
260 
261 # =============================================================================
262 
263 
264 class EventCommunicator(object):
265  # This class is responsible for communicating Gaudi Events via Queues
266  # Events are communicated as TBufferFiles, filled either by the
267  # TESSerializer, or the GaudiSvc, "IPCSvc"
268  def __init__(self, GMPComponent, qin, qout):
269  self._gmpc = GMPComponent
270  self.log = self._gmpc.log
271  # maximum capacity of Queues
272  self.maxsize = 50
273  # central variables : Queues
274  self.qin = qin
275  self.qout = qout
276 
277  # flags
278  self.allsent = False
279  self.allrecv = False
280 
281  # statistics storage
282  self.nSent = 0 # counter : items sent
283  self.nRecv = 0 # counter : items received
284  self.sizeSent = 0 # measure : size of events sent ( tbuf.Length() )
285  self.sizeRecv = 0 # measure : size of events in ( tbuf.Length() )
286  self.qinTime = 0 # time : receiving from self.qin
287  self.qoutTime = 0 # time : sending on qout
288 
289  def send(self, item):
290  # This class manages the sending of a TBufferFile Event to a Queue
291  # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
292  assert item.__class__.__name__ == "tuple"
293  startTransmission = time.time()
294  self.qout.put(item)
295  # allow the background thread to feed the Queue; not 100% guaranteed to
296  # finish before next line
297  while self.qout._buffer:
298  time.sleep(NAP)
299  self.qoutTime += time.time() - startTransmission
300  self.sizeSent += item[1].Length()
301  self.nSent += 1
302  return SUCCESS
303 
304  def receive(self, timeout=None):
305  # Receive items from self.qin
306  startWait = time.time()
307  try:
308  itemIn = self.qin.get(timeout=timeout)
309  except Empty:
310  return None
311  self.qinTime += time.time() - startWait
312  self.nRecv += 1
313  if itemIn.__class__.__name__ == "tuple":
314  self.sizeRecv += itemIn[1].Length()
315  else:
316  self.nRecv -= 1
317  try:
318  self.qin.task_done()
319  except Exception:
320  self._gmpc.log.warning(
321  "TASK_DONE called too often by : %s" % (self._gmpc.nodeType)
322  )
323  return itemIn
324 
325  def finalize(self):
326  self.log.info(
327  "Finalize Event Communicator : %s %s" % (self._gmpc, self._gmpc.nodeType)
328  )
329  # Reader sends one flag for each worker
330  # Workers send one flag each
331  # Writer sends nothing (it's at the end of the chain)
332  if self._gmpc.nodeType == "Reader":
333  downstream = self._gmpc.nWorkers
334  elif self._gmpc.nodeType == "Writer":
335  downstream = 0
336  elif self._gmpc.nodeType == "Worker":
337  downstream = 1
338 
339  for i in range(downstream):
340  self.qout.put("FINISHED")
341  if self._gmpc.nodeType != "Writer":
342  self.qout.join()
343  # Now some reporting...
344  self.statistics()
345 
346  def statistics(self):
347  self.log.name = "%s-%i Audit " % (self._gmpc.nodeType, self._gmpc.nodeID)
348  self.log.info("Items Sent : %i" % (self.nSent))
349  self.log.info("Items Received : %i" % (self.nRecv))
350  self.log.info("Data Sent : %i" % (self.sizeSent))
351  self.log.info("Data Received : %i" % (self.sizeRecv))
352  self.log.info("Q-out Time : %5.2f" % (self.qoutTime))
353  self.log.info("Q-in Time : %5.2f" % (self.qinTime))
354 
355 
356 # =============================================================================
357 
358 
359 class TESSerializer(object):
360  def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
361  self.T = gaudiTESSerializer
362  self.evt = evtDataSvc
363  self.buffersIn = []
364  self.buffersOut = []
365  self.nIn = 0
366  self.nOut = 0
367  self.tDump = 0.0
368  self.tLoad = 0.0
369  # logging
370  self.nodeType = nodeType
371  self.nodeID = nodeID
372  self.log = log
373 
374  def Load(self, tbuf):
375  root = gbl.DataObject()
376  setOwnership(root, False)
377  self.evt.setRoot("/Event", root)
378  t = time.time()
379  self.T.loadBuffer(tbuf)
380  self.tLoad += time.time() - t
381  self.nIn += 1
382  self.buffersIn.append(tbuf.Length())
383 
384  def Dump(self):
385  t = time.time()
386  tb = TBufferFile(TBuffer.kWrite)
387  self.T.dumpBuffer(tb)
388  self.tDump += time.time() - t
389  self.nOut += 1
390  self.buffersOut.append(tb.Length())
391  return tb
392 
393  def Report(self):
394  evIn = "Events Loaded : %i" % (self.nIn)
395  evOut = "Events Dumped : %i" % (self.nOut)
396  din = sum(self.buffersIn)
397  dataIn = "Data Loaded : %i" % (din)
398  dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
399  if self.nIn:
400  avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
401  maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)
402  else:
403  avgIn = "Avg Buf Loaded : N/A"
404  maxIn = "Max Buf Loaded : N/A"
405  dout = sum(self.buffersOut)
406  dataOut = "Data Dumped : %i" % (dout)
407  dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
408  if self.nOut:
409  avgOut = "Avg Buf Dumped : %5.2f Mb" % (din / (self.nOut * MB))
410  maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)
411  else:
412  avgOut = "Avg Buf Dumped : N/A"
413  maxOut = "Max Buf Dumped : N/A"
414  dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
415  loadTime = "Total Load Time : %5.2f" % (self.tLoad)
416 
417  lines = (
418  evIn,
419  evOut,
420  dataIn,
421  dataInMb,
422  avgIn,
423  maxIn,
424  dataOut,
425  dataOutMb,
426  avgOut,
427  maxOut,
428  dumpTime,
429  loadTime,
430  )
431  self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
432  for line in lines:
433  self.log.info(line)
434  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
435 
436 
437 # =============================================================================
438 
439 
440 class GMPComponent(object):
441  # This class will be the template for Reader, Worker and Writer
442  # containing all common components
443  # nodeId will be a numerical identifier for the node
444  # -1 for reader
445  # -2 for writer
446  # 0,...,nWorkers-1 for the Workers
447  def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
448  # declare a Gaudi MultiProcessing Node
449  # the nodeType is going to be one of Reader, Worker, Writer
450  # qPair is going to be a tuple of ( qin, qout )
451  # for sending and receiving
452  # if nodeType is "Writer", it will be a list of qPairs,
453  # as there's one queue-in from each Worker
454  #
455  # params is a tuple of (nWorkers, config, log)
456 
457  self.nodeType = nodeType
458  current_process().name = nodeType
459 
460  # Synchronisation Event() objects for keeping track of the system
461  self.initEvent, eventLoopSyncer, self.finalEvent = events
462  self.eventLoopSyncer, self.lastEvent = eventLoopSyncer # unpack tuple
463 
464  # necessary for knowledge of the system
465  self.nWorkers, self.sEvent, self.config, self.log = params
466  self.subworkers = subworkers
467  self.nodeID = nodeID
468  self.msgFormat = self.config["MessageSvc"].Format
469 
470  # describe the state of the node by the current Event Number
471  self.currentEvent = None
472 
473  # Unpack the Queues : (events, histos, filerecords)
474  self.queues = queues
475  self.num = 0
476 
477  ks = self.config.keys()
478  self.app = None
479  list = ["Brunel", "DaVinci", "Boole", "Gauss"]
480  for k in list:
481  if k in ks:
482  self.app = k
483 
484  def Start(self):
485  # define the separate process
486  qPair, histq, fq = self.queues
487 
488  # Set up the Queue Mechanisms ( Event Communicators )
489  if self.nodeType == "Reader" or self.nodeType == "Worker":
490  # Reader or Worker Node
491  qin, qout = qPair
492  self.evcom = EventCommunicator(self, qin, qout)
493  else:
494  # Writer : many queues in, no queue out
495  assert self.nodeType == "Writer"
496  self.evcoms = []
497  qsin = qPair[0]
498  for q in qsin:
499  ec = EventCommunicator(self, q, None)
500  self.evcoms.append(ec)
501  # Histogram Queue
502  self.hq = histq
503  # FileRecords Queue
504  self.fq = fq
505 
506  # Universal Counters (available to all nodes)
507  # Use sensibly!!!
508  self.nIn = 0
509  self.nOut = 0
510 
511  # Status Flag (possibly remove later)
512  self.stat = SUCCESS
513 
514  # Set logger name
515  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
516 
517  # Heuristic variables
518  # time for init, run, final, firstEventTime, totalTime
519  self.iTime = 0.0
520  self.rTime = 0.0
521  self.fTime = 0.0
522  self.firstEvTime = 0.0
523  self.tTime = 0.0
524 
525  self.proc = Process(target=self.Engine)
526  # Fork and start the separate process
527  self.proc.start()
528 
529  def Engine(self):
530  # This will be the method carried out by the Node
531  # Different for all
532  pass
533 
535  # Different for all ; customize Configuration for multicore
536  pass
537 
538  def SetupGaudiPython(self):
539  # This method will initialize the GaudiPython Tools
540  # such as the AppMgr and so on
541  self.a = AppMgr()
542  if SMAPS:
543  from AlgSmapShot import SmapShot
544 
545  smapsLog = self.nodeType + "-" + str(self.nodeID) + ".smp"
546  ss = SmapShot(logname=smapsLog)
547  self.a.addAlgorithm(ss)
548  self.evt = self.a.evtsvc()
549  self.hvt = self.a.histsvc()
550  self.fsr = self.a.filerecordsvc()
551  self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
552  self.pers = self.a.service("EventPersistencySvc", "IAddressCreator")
553  self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
554  self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
555  return SUCCESS
556 
557  def StartGaudiPython(self):
558  self.a.initialize()
559  self.a.start()
560  return SUCCESS
561 
562  def LoadTES(self, tbufferfile):
563  root = gbl.DataObject()
564  setOwnership(root, False)
565  self.evt.setRoot("/Event", root)
566  self.ts.loadBuffer(tbufferfile)
567 
568  def getEventNumber(self):
569  if self.app != "Gauss":
570  # Using getList or getHistoNames can result in the EventSelector
571  # re-initialising connection to RootDBase, which costs a lot of
572  # time... try to build a set of Header paths??
573 
574  # First Attempt : Unpacked Event Data
575  lst = ["/Event/Gen/Header", "/Event/Rec/Header"]
576  for l in lst:
577  path = l
578  try:
579  n = self.evt[path].evtNumber()
580 
581  return n
582  except Exception:
583  # No evt number at this path
584  continue
585 
586  # second attepmt : try DAQ/RawEvent data
587  # The Evt Number is in bank type 16, bank 0, data pt 4
588  try:
589  n = self.evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
590 
591  return n
592  except Exception:
593  pass
594 
595  # Default Action
596  if self.nIn > 0 or self.nOut > 0:
597  pass
598  else:
599  self.log.warning("Could not determine Event Number")
600  return -1
601  else:
602  if self.nodeID == -1:
603  self.num = self.num + 1
604 
605  return self.num
606 
607  def IdentifyWriters(self):
608  #
609  # Identify Writers in the Configuration
610  #
611  d = {}
612  keys = ["events", "records", "tuples", "histos"]
613  for k in keys:
614  d[k] = []
615 
616  # Identify Writers and Classify
617  wkeys = WRITERTYPES.keys()
618  for v in self.config.values():
619  if v.__class__.__name__ in wkeys:
620  writerType = WRITERTYPES[v.__class__.__name__]
621  d[writerType].append(MiniWriter(v, writerType, self.config))
622  if self.nodeID == 0:
623  self.log.info("Writer Found : %s" % (v.name()))
624 
625  # Now Check for the Histogram Service
626  if "HistogramPersistencySvc" in self.config.keys():
627  hfile = self.config["HistogramPersistencySvc"].getProp("OutputFile")
628  d["histos"].append(hfile)
629  return d
630 
631  def dumpHistograms(self):
632  """
633  Method used by the GaudiPython algorithm CollectHistos
634  to obtain a dictionary of form { path : object }
635  representing the Histogram Store
636  """
637  nlist = self.hvt.getHistoNames()
638  histDict = {}
639  objects = 0
640  histos = 0
641  if nlist:
642  for n in nlist:
643  o = self.hvt[n]
644  if type(o) in aidatypes:
645  o = aida2root(o)
646  histos += 1
647  else:
648  objects += 1
649  histDict[n] = o
650  else:
651  print("WARNING : no histograms to recover?")
652  return histDict
653 
654  def Initialize(self):
655  start = time.time()
656  self.processConfiguration()
657  self.SetupGaudiPython()
658  self.initEvent.set()
659  self.StartGaudiPython()
660 
661  if self.app == "Gauss":
662  tool = self.a.tool("ToolSvc.EvtCounter")
663  self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
664  else:
665  self.cntr = cppyy.nullptr
666 
667  self.iTime = time.time() - start
668 
669  def Finalize(self):
670  start = time.time()
671  self.a.stop()
672  self.a.finalize()
673  self.log.info("%s-%i Finalized" % (self.nodeType, self.nodeID))
674  self.finalEvent.set()
675  self.fTime = time.time() - start
676 
677  def Report(self):
678  self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
679  allTime = "Alive Time : %5.2f" % (self.tTime)
680  initTime = "Init Time : %5.2f" % (self.iTime)
681  frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
682  runTime = "Run Time : %5.2f" % (self.rTime)
683  finTime = "Finalise Time : %5.2f" % (self.fTime)
684  tup = (allTime, initTime, frstTime, runTime, finTime)
685  for t in tup:
686  self.log.info(t)
687  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
688  # and report from the TESSerializer
689  self.TS.Report()
690 
691 
692 # =============================================================================
693 
694 
696  def __init__(self, queues, events, params, subworkers):
697  GMPComponent.__init__(self, "Reader", -1, queues, events, params, subworkers)
698 
700  # Reader :
701  # No algorithms
702  # No output
703  # No histos
704  self.config["ApplicationMgr"].TopAlg = []
705  self.config["ApplicationMgr"].OutStream = []
706  if "HistogramPersistencySvc" in self.config.keys():
707  self.config["HistogramPersistencySvc"].OutputFile = ""
708  self.config["MessageSvc"].Format = "%-13s " % "[Reader]" + self.msgFormat
709  self.evtMax = self.config["ApplicationMgr"].EvtMax
710 
711  def DumpEvent(self):
712  tb = TBufferFile(TBuffer.kWrite)
713  # print '----Reader dumping Buffer!!!'
714  self.ts.dumpBuffer(tb)
715  # print '\tBuffer Dumped, size : %i'%( tb.Length() )
716  return tb
717 
718  def DoFirstEvent(self):
719  # Do First Event ------------------------------------------------------
720  # Check Termination Criteria
721  startFirst = time.time()
722  self.log.info("Reader : First Event")
723  if self.nOut == self.evtMax:
724  self.log.info("evtMax( %i ) reached" % (self.evtMax))
725  self.lastEvent.set()
726  return SUCCESS
727  else:
728  # Continue to read, dump and send event
729  self.a.run(1)
730  if not bool(self.evt["/Event"]):
731  self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
732  self.lastEvent.set()
733  return SUCCESS
734  else:
735  # Popluate TESSerializer list and send Event
736  if self.app == "Gauss":
737  lst = self.evt.getHistoNames()
738  else:
739  try:
740  lst = self.evt.getList()
741  if self.app == "DaVinci":
742  daqnode = self.evt.retrieveObject("/Event/DAQ").registry()
743  setOwnership(daqnode, False)
744  self.evt.getList(daqnode, lst, daqnode.address().par())
745  except Exception:
746  self.log.critical("Reader could not acquire TES List!")
747  self.lastEvent.set()
748  return FAILURE
749  self.log.info("Reader : TES List : %i items" % (len(lst)))
750  for l in lst:
751  self.ts.addItem(l)
753  tb = self.TS.Dump()
754  self.log.info("First Event Sent")
755  self.evcom.send((self.currentEvent, tb))
756  self.nOut += 1
757  self.eventLoopSyncer.set()
758  self.evt.clearStore()
759  self.firstEvTime = time.time() - startFirst
760  return SUCCESS
761 
762  def Engine(self):
763  # rename process
764  import ctypes
765 
766  libc = ctypes.CDLL("libc.so.6")
767  name = str(self.nodeType) + str(self.nodeID) + "\0"
768  libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
769 
770  startEngine = time.time()
771  self.log.name = "Reader"
772  self.log.info("Reader Process starting")
773 
774  self.Initialize()
775 
776  # add the Histogram Collection Algorithm
777  self.a.addAlgorithm(CollectHistograms(self))
778 
779  self.log.info("Reader Beginning Distribution")
780  sc = self.DoFirstEvent()
781  if sc.isSuccess():
782  self.log.info("Reader First Event OK")
783  else:
784  self.log.critical("Reader Failed on First Event")
785  self.stat = FAILURE
786 
787  # Do All Others -------------------------------------------------------
788  while True:
789  # Check Termination Criteria
790  if self.nOut == self.evtMax:
791  self.log.info("evtMax( %i ) reached" % (self.evtMax))
792  break
793  # Check Health
794  if not self.stat.isSuccess():
795  self.log.critical("Reader is Damaged!")
796  break
797  # Continue to read, dump and send event
798  t = time.time()
799  self.a.run(1)
800  self.rTime += time.time() - t
801  if not bool(self.evt["/Event"]):
802  self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
803  break
804  self.currentEvent = self.getEventNumber()
805  tb = self.TS.Dump()
806  self.evcom.send((self.currentEvent, tb))
807  # clean up
808  self.nOut += 1
809  self.eventLoopSyncer.set()
810  self.evt.clearStore()
811  self.log.info("Setting <Last> Event")
812  self.lastEvent.set()
813 
814  # Finalize
815  self.log.info("Reader : Event Distribution complete.")
816  self.evcom.finalize()
817  self.Finalize()
818  self.tTime = time.time() - startEngine
819  self.Report()
820 
821 
822 # =============================================================================
823 
824 
826  def __init__(self, workerID, queues, events, params, subworkers):
827  GMPComponent.__init__(
828  self, "Worker", workerID, queues, events, params, subworkers
829  )
830  # Identify the writer streams
832  # Identify the accept/veto checks for each event
833  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
834  self.log.info("Subworker-%i Created OK" % (self.nodeID))
835  self.eventOutput = True
836 
837  def Engine(self):
838  # rename process
839  import ctypes
840 
841  libc = ctypes.CDLL("libc.so.6")
842  name = str(self.nodeType) + str(self.nodeID) + "\0"
843  libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
844 
845  self.initEvent.set()
846  startEngine = time.time()
847  msg = self.a.service("MessageSvc")
848  msg.Format = "%-13s " % ("[" + self.log.name + "]") + self.msgFormat
849 
850  self.log.name = "Worker-%i" % (self.nodeID)
851  self.log.info("Subworker %i starting Engine" % (self.nodeID))
853 
854  # populate the TESSerializer itemlist
855  self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
856 
857  self.a.addAlgorithm(CollectHistograms(self))
858 
859  # Begin processing
860  Go = True
861  while Go:
862  packet = self.evcom.receive()
863  if packet:
864  pass
865  else:
866  continue
867  if packet == "FINISHED":
868  break
869  evtNumber, tbin = packet # unpack
870  if self.cntr != cppyy.nullptr:
871  self.cntr.setEventCounter(evtNumber)
872 
873  self.nIn += 1
874  self.TS.Load(tbin)
875 
876  t = time.time()
877  sc = self.a.executeEvent()
878  if self.nIn == 1:
879  self.firstEvTime = time.time() - t
880  else:
881  self.rTime += time.time() - t
882  if sc.isSuccess():
883  pass
884  else:
885  self.log.name = "Worker-%i" % (self.nodeID)
886  self.log.warning("Did not Execute Event")
887  self.evt.clearStore()
888  continue
889  if self.isEventPassed():
890  pass
891  else:
892  self.log.name = "Worker-%i" % (self.nodeID)
893  self.log.warning("Event did not pass : %i" % (evtNumber))
894  self.evt.clearStore()
895  continue
896  if self.eventOutput:
897  # It may be the case of generating Event Tags; hence
898  # no event output
900  tb = self.TS.Dump()
901  self.evcom.send((self.currentEvent, tb))
902  self.nOut += 1
903  self.inc.fireIncident(gbl.Incident("Subworker", "EndEvent"))
904  self.eventLoopSyncer.set()
905  self.evt.clearStore()
906  self.log.name = "Worker-%i" % (self.nodeID)
907  self.log.info("Setting <Last> Event %s" % (self.nodeID))
908  self.lastEvent.set()
909 
910  self.evcom.finalize()
911  # Now send the FileRecords and stop/finalize the appMgr
912  self.filerecordsAgent.SendFileRecords()
913  self.tTime = time.time() - startEngine
914  self.Finalize()
915  self.Report()
916  # self.finalEvent.set()
917 
918  def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
919  self.a = a
920  self.evt = evt
921  self.hvt = hvt
922  self.fsr = fsr
923  # self.inc = inc
924  self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
925  self.pers = pers
926  self.ts = ts
927  self.cntr = cntr
928  self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
929 
930  def getCheckAlgs(self):
931  """
932  For some output writers, a check is performed to see if the event has
933  executed certain algorithms.
934  These reside in the AcceptAlgs property for those writers
935  """
936  acc = []
937  req = []
938  vet = []
939  for m in self.writerDict["events"]:
940  if hasattr(m.w, "AcceptAlgs"):
941  acc += m.w.AcceptAlgs
942  if hasattr(m.w, "RequireAlgs"):
943  req += m.w.RequireAlgs
944  if hasattr(m.w, "VetoAlgs"):
945  vet += m.w.VetoAlgs
946  return (acc, req, vet)
947 
948  def checkExecutedPassed(self, algName):
949  if (
950  self.a.algorithm(algName)._ialg.isExecuted()
951  and self.a.algorithm(algName)._ialg.filterPassed()
952  ):
953  return True
954  else:
955  return False
956 
957  def isEventPassed(self):
958  """
959  Check the algorithm status for an event.
960  Depending on output writer settings, the event
961  may be declined based on various criteria.
962  This is a transcript of the check that occurs in GaudiSvc::OutputStream
963  """
964  passed = False
965 
966  self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
967  if self.acceptAlgs:
968  for name in self.acceptAlgs:
969  if self.checkExecutedPassed(name):
970  passed = True
971  break
972  else:
973  passed = True
974 
975  self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
976  for name in self.requireAlgs:
977  if self.checkExecutedPassed(name):
978  pass
979  else:
980  self.log.info("Evt declined (requireAlgs) : %s" % (name))
981  passed = False
982 
983  self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
984  for name in self.vetoAlgs:
985  if self.checkExecutedPassed(name):
986  pass
987  else:
988  self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
989  passed = False
990  return passed
991 
992 
993 # =============================================================================
994 
995 
997  def __init__(self, workerID, queues, events, params, subworkers):
998  GMPComponent.__init__(
999  self, "Worker", workerID, queues, events, params, subworkers
1000  )
1001  # Identify the writer streams
1003  # Identify the accept/veto checks for each event
1004  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
1005  self.log.name = "Worker-%i" % (self.nodeID)
1006  self.log.info("Worker-%i Created OK" % (self.nodeID))
1007  self.eventOutput = True
1008 
1010  # Worker :
1011  # No input
1012  # No output
1013  # No Histos
1014  self.config["EventSelector"].Input = []
1015  self.config["ApplicationMgr"].OutStream = []
1016  if "HistogramPersistencySvc" in self.config.keys():
1017  self.config["HistogramPersistencySvc"].OutputFile = ""
1018  formatHead = "[Worker-%i]" % (self.nodeID)
1019  self.config["MessageSvc"].Format = "%-13s " % formatHead + self.msgFormat
1020 
1021  for key, lst in self.writerDict.items():
1022  self.log.info("Writer Type : %s\t : %i" % (key, len(lst)))
1023 
1024  for m in self.writerDict["tuples"]:
1025  # rename Tuple output file with an appendix
1026  # based on worker id, for merging later
1027  newName = m.getNewName(".", ".w%i." % (self.nodeID))
1028  self.config[m.key].Output = newName
1029 
1030  # Suppress INFO Output for all but Worker-0
1031  # if self.nodeID == 0 :
1032  # pass
1033  # else :
1034  # self.config[ 'MessageSvc' ].OutputLevel = ERROR
1035 
1036  if self.app == "Gauss":
1037  try:
1038  if "ToolSvc.EvtCounter" not in self.config:
1039  from Configurables import EvtCounter
1040 
1041  counter = EvtCounter()
1042  else:
1043  counter = self.config["ToolSvc.EvtCounter"]
1044  counter.UseIncident = False
1045  except Exception:
1046  # ignore errors when trying to change the configuration of the EvtCounter
1047  self.log.warning("Cannot configure EvtCounter")
1048 
1049  def Engine(self):
1050  # rename process
1051  import ctypes
1052 
1053  libc = ctypes.CDLL("libc.so.6")
1054  name = str(self.nodeType) + str(self.nodeID) + "\0"
1055  libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
1056 
1057  startEngine = time.time()
1058  self.log.info("Worker %i starting Engine" % (self.nodeID))
1059  self.Initialize()
1061 
1062  # populate the TESSerializer itemlist
1063  self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
1064 
1065  nEventWriters = len(self.writerDict["events"])
1066  if nEventWriters:
1067  itemList = set()
1068  optItemList = set()
1069  for m in self.writerDict["events"]:
1070  for item in m.ItemList:
1071  hsh = item.find("#")
1072  if hsh != -1:
1073  item = item[:hsh]
1074  itemList.add(item)
1075  for item in m.OptItemList:
1076  hsh = item.find("#")
1077  if hsh != -1:
1078  item = item[:hsh]
1079  optItemList.add(item)
1080  # If an item is mandatory and optional, keep it only in the optional list
1081  itemList -= optItemList
1082  for item in sorted(itemList):
1083  self.log.info(" adding ItemList Item to ts : %s" % (item))
1084  self.ts.addItem(item)
1085  for item in sorted(optItemList):
1086  self.log.info(" adding Optional Item to ts : %s" % (item))
1087  self.ts.addOptItem(item)
1088  else:
1089  self.log.info("There is no Event Output for this app")
1090  self.eventOutput = False
1091 
1092  # Begin processing
1093  Go = True
1094  while Go:
1095  packet = self.evcom.receive()
1096  if packet:
1097  pass
1098  else:
1099  continue
1100  if packet == "FINISHED":
1101  break
1102  evtNumber, tbin = packet # unpack
1103  if self.cntr != cppyy.nullptr:
1104  self.cntr.setEventCounter(evtNumber)
1105 
1106  # subworkers are forked before the first event is processed
1107  if self.nIn == 0:
1108  self.log.info("Fork new subworkers")
1109 
1110  # Fork subworkers and share services
1111  for k in self.subworkers:
1112  k.SetServices(
1113  self.a,
1114  self.evt,
1115  self.hvt,
1116  self.fsr,
1117  self.inc,
1118  self.pers,
1119  self.ts,
1120  self.cntr,
1121  )
1122  k.Start()
1123  self.a.addAlgorithm(CollectHistograms(self))
1124  self.nIn += 1
1125  self.TS.Load(tbin)
1126 
1127  t = time.time()
1128  sc = self.a.executeEvent()
1129  if self.nIn == 1:
1130  self.firstEvTime = time.time() - t
1131  else:
1132  self.rTime += time.time() - t
1133  if sc.isSuccess():
1134  pass
1135  else:
1136  self.log.warning("Did not Execute Event")
1137  self.evt.clearStore()
1138  continue
1139  if self.isEventPassed():
1140  pass
1141  else:
1142  self.log.warning("Event did not pass : %i" % (evtNumber))
1143  self.evt.clearStore()
1144  continue
1145  if self.eventOutput:
1146  # It may be the case of generating Event Tags; hence
1147  # no event output
1149  tb = self.TS.Dump()
1150  self.evcom.send((self.currentEvent, tb))
1151  self.nOut += 1
1152  self.inc.fireIncident(gbl.Incident("Worker", "EndEvent"))
1153  self.eventLoopSyncer.set()
1154  self.evt.clearStore()
1155  self.log.info("Setting <Last> Event")
1156  self.lastEvent.set()
1157 
1158  self.evcom.finalize()
1159  self.log.info("Worker-%i Finished Processing Events" % (self.nodeID))
1160  # Now send the FileRecords and stop/finalize the appMgr
1161  self.filerecordsAgent.SendFileRecords()
1162  self.Finalize()
1163  self.tTime = time.time() - startEngine
1164  self.Report()
1165 
1166  for k in self.subworkers:
1167  self.log.info("Join subworkers")
1168  k.proc.join()
1169 
1170  def getCheckAlgs(self):
1171  """
1172  For some output writers, a check is performed to see if the event has
1173  executed certain algorithms.
1174  These reside in the AcceptAlgs property for those writers
1175  """
1176  acc = []
1177  req = []
1178  vet = []
1179  for m in self.writerDict["events"]:
1180  if hasattr(m.w, "AcceptAlgs"):
1181  acc += m.w.AcceptAlgs
1182  if hasattr(m.w, "RequireAlgs"):
1183  req += m.w.RequireAlgs
1184  if hasattr(m.w, "VetoAlgs"):
1185  vet += m.w.VetoAlgs
1186  return (acc, req, vet)
1187 
1188  def checkExecutedPassed(self, algName):
1189  if (
1190  self.a.algorithm(algName)._ialg.isExecuted()
1191  and self.a.algorithm(algName)._ialg.filterPassed()
1192  ):
1193  return True
1194  else:
1195  return False
1196 
1197  def isEventPassed(self):
1198  """
1199  Check the algorithm status for an event.
1200  Depending on output writer settings, the event
1201  may be declined based on various criteria.
1202  This is a transcript of the check that occurs in GaudiSvc::OutputStream
1203  """
1204  passed = False
1205 
1206  self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
1207  if self.acceptAlgs:
1208  for name in self.acceptAlgs:
1209  if self.checkExecutedPassed(name):
1210  passed = True
1211  break
1212  else:
1213  passed = True
1214 
1215  self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
1216  for name in self.requireAlgs:
1217  if self.checkExecutedPassed(name):
1218  pass
1219  else:
1220  self.log.info("Evt declined (requireAlgs) : %s" % (name))
1221  passed = False
1222 
1223  self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
1224  for name in self.vetoAlgs:
1225  if self.checkExecutedPassed(name):
1226  pass
1227  else:
1228  self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
1229  passed = False
1230  return passed
1231 
1232 
1233 # =============================================================================
1234 
1235 
1237  def __init__(self, queues, events, params, subworkers):
1238  GMPComponent.__init__(self, "Writer", -2, queues, events, params, subworkers)
1239  # Identify the writer streams
1241  # This keeps track of workers as they finish
1242  self.status = [False] * self.nWorkers
1243  self.log.name = "Writer--2"
1244 
1246  # Writer :
1247  # No input
1248  # No Algs
1249  self.config["ApplicationMgr"].TopAlg = []
1250  self.config["EventSelector"].Input = []
1251 
1252  self.config["MessageSvc"].Format = "%-13s " % "[Writer]" + self.msgFormat
1253 
1254  def Engine(self):
1255  # rename process
1256  import ctypes
1257 
1258  libc = ctypes.CDLL("libc.so.6")
1259  name = str(self.nodeType) + str(self.nodeID) + "\0"
1260  libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
1261 
1262  self.Initialize()
1263  self.histoAgent = HistoAgent(self)
1265 
1266  # Begin processing
1267  Go = True
1268  current = -1
1269  while Go:
1270  current = (current + 1) % self.nWorkers
1271  packet = self.evcoms[current].receive(timeout=0.01)
1272  if packet is None:
1273  continue
1274  if packet == "FINISHED":
1275  self.log.info("Writer got FINISHED flag : Worker %i" % (current))
1276 
1277  self.status[current] = True
1278  if all(self.status):
1279  self.log.info("FINISHED recd from all workers, break loop")
1280  break
1281  continue
1282  # otherwise, continue as normal
1283  self.nIn += 1 # update central count (maybe needed by FSR store)
1284  evtNumber, tbin = packet # unpack
1285  self.TS.Load(tbin)
1286  t = time.time()
1287  self.a.executeEvent()
1288  self.rTime += time.time() - t
1290  self.evt.clearStore()
1291  self.eventLoopSyncer.set()
1292  self.log.name = "Writer--2"
1293  self.log.info("Setting <Last> Event")
1294  self.lastEvent.set()
1295 
1296  # finalisation steps
1297  [e.finalize() for e in self.evcoms]
1298  # Now do Histograms
1299  sc = self.histoAgent.Receive()
1300  sc = self.histoAgent.RebuildHistoStore()
1301  if sc.isSuccess():
1302  self.log.info("Histo Store rebuilt ok")
1303  else:
1304  self.log.warning("Histo Store Error in Rebuild")
1305 
1306  # Now do FileRecords
1307  sc = self.filerecordsAgent.Receive()
1308  self.filerecordsAgent.Rebuild()
1309  self.Finalize()
1310  # self.rTime = time.time()-startEngine
1311  self.Report()
1312 
1313 
1314 # =============================================================================
1315 
1316 # =============================================================================
1317 
1318 
1319 class Coord(object):
1320  def __init__(self, nWorkers, config, log):
1321  self.log = log
1322  self.config = config
1323  # set up Logging
1324  self.log.name = "GaudiPython-Parallel-Logger"
1325  self.log.info("GaudiPython Parallel Process Co-ordinator beginning")
1326 
1327  if nWorkers == -1:
1328  # The user has requested all available cpus in the machine
1329  self.nWorkers = cpu_count()
1330  else:
1331  self.nWorkers = nWorkers
1332 
1333  self.qs = self.SetupQueues() # a dictionary of queues (for Events)
1334  self.hq = JoinableQueue() # for Histogram data
1335  self.fq = JoinableQueue() # for FileRecords data
1336 
1337  # Make a Syncer for Initalise, Run, and Finalise
1338  self.sInit = Syncer(
1339  self.nWorkers, self.log, limit=WAIT_INITIALISE, step=STEP_INITIALISE
1340  )
1341  self.sRun = Syncer(
1342  self.nWorkers,
1343  self.log,
1344  manyEvents=True,
1345  limit=WAIT_SINGLE_EVENT,
1346  step=STEP_EVENT,
1347  firstEvent=WAIT_FIRST_EVENT,
1348  )
1349  self.sFin = Syncer(
1350  self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE
1351  )
1352  # and one final one for Histogram Transfer
1353  self.histSyncEvent = Event()
1354 
1355  # params are common to al subprocesses
1356  params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
1357 
1358  self.subworkers = []
1359  # Declare SubProcesses!
1360  for i in range(1, self.nWorkers):
1361  sub = Subworker(
1362  i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers
1363  )
1364  self.subworkers.append(sub)
1365  self.reader = Reader(
1366  self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers
1367  )
1368  self.workers = []
1369  wk = Worker(
1370  0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers
1371  )
1372  self.writer = Writer(
1373  self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers
1374  )
1375 
1376  self.system = []
1377  self.system.append(self.writer)
1378  self.system.append(wk)
1379  self.system.append(self.reader)
1380 
1381  def getSyncEvents(self, nodeID):
1382  init = self.sInit.d[nodeID].event
1383  run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
1384  fin = self.sFin.d[nodeID].event
1385  return (init, run, fin)
1386 
1387  def getQueues(self, nodeID):
1388  eventQ = self.qs[nodeID]
1389  histQ = self.hq
1390  fsrQ = self.fq
1391  return (eventQ, histQ, fsrQ)
1392 
1393  def Go(self):
1394  # Initialise
1395  self.log.name = "GaudiPython-Parallel-Logger"
1396  self.log.info("INITIALISING SYSTEM")
1397 
1398  # Start reader, writer and main worker
1399  for p in self.system:
1400  p.Start()
1401 
1402  sc = self.sInit.syncAll(step="Initialise")
1403  if sc == SUCCESS:
1404  pass
1405  else:
1406  self.Terminate()
1407  return FAILURE
1408 
1409  # Run
1410  self.log.name = "GaudiPython-Parallel-Logger"
1411  self.log.info("RUNNING SYSTEM")
1412  sc = self.sRun.syncAll(step="Run")
1413  if sc == SUCCESS:
1414  pass
1415  else:
1416  self.Terminate()
1417  return FAILURE
1418 
1419  # Finalise
1420  self.log.name = "GaudiPython-Parallel-Logger"
1421  self.log.info("FINALISING SYSTEM")
1422  sc = self.sFin.syncAll(step="Finalise")
1423  if sc == SUCCESS:
1424  pass
1425  else:
1426  self.Terminate()
1427  return FAILURE
1428 
1429  # if we've got this far, finally report SUCCESS
1430  self.log.info("Cleanly join all Processes")
1431  self.Stop()
1432  self.log.info("Report Total Success to Main.py")
1433  return SUCCESS
1434 
1435  def Terminate(self):
1436  # Brutally kill sub-processes
1437  children = multiprocessing.active_children()
1438  for i in children:
1439  i.terminate()
1440 
1441  # self.writer.proc.terminate()
1442  # [ w.proc.terminate() for w in self.workers]
1443  # self.reader.proc.terminate()
1444 
1445  def Stop(self):
1446  # procs should be joined in reverse order to launch
1447  self.system.reverse()
1448  for s in self.system:
1449  s.proc.join()
1450  return SUCCESS
1451 
1452  def SetupQueues(self):
1453  # This method will set up the network of Queues needed
1454  # N Queues = nWorkers + 1
1455  # Each Worker has a Queue in, and a Queue out
1456  # Reader has Queue out only
1457  # Writer has nWorkers Queues in
1458 
1459  # one queue from Reader-Workers
1460  rwk = JoinableQueue()
1461  # one queue from each worker to writer
1462  workersWriter = [JoinableQueue() for i in range(self.nWorkers)]
1463  d = {}
1464  d[-1] = (None, rwk) # Reader
1465  d[-2] = (workersWriter, None) # Writer
1466  for i in range(self.nWorkers):
1467  d[i] = (rwk, workersWriter[i])
1468  return d
1469 
1470 
1471 # ============================= EOF ===========================================
GaudiMP.GMPBase.CollectHistograms
Definition: GMPBase.py:214
GaudiMP.GMPBase.Coord.workers
workers
Definition: GMPBase.py:1368
GaudiMP.GMPBase.GMPComponent.num
num
Definition: GMPBase.py:475
GaudiMP.GMPBase.TESSerializer.nOut
nOut
Definition: GMPBase.py:366
GaudiMP.GMPBase.TESSerializer.__init__
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:360
GaudiMP.GMPBase.Coord.fq
fq
Definition: GMPBase.py:1335
GaudiMP.GMPBase.EventCommunicator.receive
def receive(self, timeout=None)
Definition: GMPBase.py:304
GaudiMP.GMPBase.GMPComponent.nodeID
nodeID
Definition: GMPBase.py:467
GaudiMP.GMPBase.GMPComponent.__init__
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:447
GaudiMP.GMPBase.MiniWriter.OptItemList
OptItemList
Definition: GMPBase.py:128
GaudiMP.GMPBase.EventCommunicator.log
log
Definition: GMPBase.py:270
GaudiMP.GMPBase.GMPComponent.tTime
tTime
Definition: GMPBase.py:523
GaudiMP.GMPBase.Coord.SetupQueues
def SetupQueues(self)
Definition: GMPBase.py:1452
GaudiMP.pTools.FileRecordsAgent
Definition: pTools.py:238
GaudiMP.GMPBase.GMPComponent.getEventNumber
def getEventNumber(self)
Definition: GMPBase.py:568
GaudiMP.GMPBase.Writer.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:1245
GaudiMP.GMPBase.GMPComponent.IdentifyWriters
def IdentifyWriters(self)
Definition: GMPBase.py:607
GaudiMP.GMPBase.TESSerializer.tLoad
tLoad
Definition: GMPBase.py:368
GaudiPython::PyAlgorithm::execute
StatusCode execute() override
Definition: Algorithm.cpp:110
GaudiMP.GMPBase.EventCommunicator.send
def send(self, item)
Definition: GMPBase.py:289
GaudiMP.GMPBase.Worker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1060
cpluginsvc.registry
def registry()
Definition: cpluginsvc.py:83
GaudiMP.GMPBase.Coord.sRun
sRun
Definition: GMPBase.py:1341
GaudiMP.GMPBase.TESSerializer.buffersIn
buffersIn
Definition: GMPBase.py:363
GaudiMP.GMPBase.GMPComponent.TS
TS
Definition: GMPBase.py:554
GaudiMP.GMPBase.GMPComponent.stat
stat
Definition: GMPBase.py:512
GaudiMP.GMPBase.Subworker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:852
GaudiMP.GMPBase.GMPComponent.dumpHistograms
def dumpHistograms(self)
Definition: GMPBase.py:631
GaudiMP.GMPBase.EventCommunicator.sizeSent
sizeSent
Definition: GMPBase.py:284
reverse
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:59
GaudiMP.GMPBase.TESSerializer
Definition: GMPBase.py:359
GaudiMP.GMPBase.GMPComponent.nIn
nIn
Definition: GMPBase.py:508
GaudiMP.GMPBase.MiniWriter.wType
wType
Definition: GMPBase.py:121
GaudiMP.GMPBase.GMPComponent
Definition: GMPBase.py:440
GaudiMP.GMPBase.Worker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:1004
DataOnDemand.Dump
Dump
Definition: DataOnDemand.py:35
GaudiMP.GMPBase.Reader.evtMax
evtMax
Definition: GMPBase.py:709
GaudiMP.GMPBase.MiniWriter.ItemList
ItemList
Definition: GMPBase.py:127
GaudiMP.GMPBase.EventCommunicator.maxsize
maxsize
Definition: GMPBase.py:272
GaudiMP.GMPBase.EventCommunicator.statistics
def statistics(self)
Definition: GMPBase.py:346
GaudiPartProp.decorators.get
get
decorate the vector of properties
Definition: decorators.py:283
GaudiMP.GMPBase.MiniWriter.getNewName
def getNewName(self, replaceThis, withThis, extra="")
Definition: GMPBase.py:150
GaudiMP.GMPBase.Worker.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:1009
GaudiMP.GMPBase.Coord.qs
qs
Definition: GMPBase.py:1333
GaudiMP.GMPBase.GMPComponent.Initialize
def Initialize(self)
Definition: GMPBase.py:654
GaudiMP.GMPBase.Subworker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:930
GaudiMP.GMPBase.Coord.nWorkers
nWorkers
Definition: GMPBase.py:1329
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:887
GaudiMP.GMPBase.EventCommunicator.__init__
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:268
GaudiMP.GMPBase.GMPComponent.fTime
fTime
Definition: GMPBase.py:521
GaudiMP.GMPBase.Writer
Definition: GMPBase.py:1236
GaudiMP.GMPBase.Coord
Definition: GMPBase.py:1319
GaudiPython::PyAlgorithm
Definition: Algorithm.h:40
GaudiMP.GMPBase.Subworker
Definition: GMPBase.py:825
GaudiMP.GMPBase.GMPComponent.hq
hq
Definition: GMPBase.py:502
GaudiMP.GMPBase.Coord.hq
hq
Definition: GMPBase.py:1334
GaudiPython.Bindings.setOwnership
setOwnership
Definition: Bindings.py:130
GaudiMP.GMPBase.Reader.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:699
GaudiMP.GMPBase.Coord.Terminate
def Terminate(self)
Definition: GMPBase.py:1435
GaudiMP.GMPBase.Worker.Engine
def Engine(self)
Definition: GMPBase.py:1049
GaudiMP.GMPBase.Worker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:997
GaudiMP.GMPBase.Writer.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1237
IOTest.start
start
Definition: IOTest.py:110
GaudiMP.GMPBase.Subworker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:826
GaudiMP.GMPBase.Subworker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:833
GaudiMP.GMPBase.GMPComponent.pers
pers
Definition: GMPBase.py:552
GaudiMP.GMPBase.GMPComponent.hvt
hvt
Definition: GMPBase.py:549
GaudiMP.GMPBase.TESSerializer.T
T
Definition: GMPBase.py:361
GaudiMP.GMPBase.TESSerializer.log
log
Definition: GMPBase.py:372
GaudiMP.GMPBase.Worker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:1197
GaudiMP.GMPBase.GMPComponent.iTime
iTime
Definition: GMPBase.py:519
GaudiMP.GMPBase.TESSerializer.nodeID
nodeID
Definition: GMPBase.py:371
GaudiPartProp.decorators.all
all
decorate service
Definition: decorators.py:53
GaudiMP.GMPBase.GMPComponent.currentEvent
currentEvent
Definition: GMPBase.py:471
GaudiMP.GMPBase.GMPComponent.app
app
Definition: GMPBase.py:478
compareOutputFiles.par
par
Definition: compareOutputFiles.py:477
GaudiMP.GMPBase.Writer.histoAgent
histoAgent
Definition: GMPBase.py:1263
GaudiMP.GMPBase.GMPComponent.firstEvTime
firstEvTime
Definition: GMPBase.py:522
GaudiMP.GMPBase.EventCommunicator.qin
qin
Definition: GMPBase.py:274
GaudiMP.GMPBase.Reader.DumpEvent
def DumpEvent(self)
Definition: GMPBase.py:711
GaudiMP.GMPBase.TESSerializer.buffersOut
buffersOut
Definition: GMPBase.py:364
GaudiMP.GMPBase.MiniWriter.woutput
woutput
Definition: GMPBase.py:132
GaudiMP.GMPBase.GMPComponent.proc
proc
Definition: GMPBase.py:525
GaudiMP.GMPBase.Coord.config
config
Definition: GMPBase.py:1322
bug_34121.tool
tool
Definition: bug_34121.py:18
GaudiMP.GMPBase.GMPComponent.inc
inc
Definition: GMPBase.py:551
GaudiMP.GMPBase.MiniWriter.getItemLists
def getItemLists(self, config)
Definition: GMPBase.py:171
GaudiMP.GMPBase.GMPComponent.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:534
GaudiMP.pTools.Syncer
Definition: pTools.py:628
GaudiMP.GMPBase.Reader.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:696
GaudiMP.GMPBase.MiniWriter.w
w
Definition: GMPBase.py:120
GaudiMP.GMPBase.Writer.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1264
GaudiMP.GMPBase.Coord.sFin
sFin
Definition: GMPBase.py:1349
GaudiMP.GMPBase.Coord.reader
reader
Definition: GMPBase.py:1365
GaudiMP.GMPBase.Subworker.eventOutput
eventOutput
Definition: GMPBase.py:835
GaudiMP.GMPBase.GMPComponent.lastEvent
lastEvent
Definition: GMPBase.py:462
GaudiMP.GMPBase.GMPComponent.nOut
nOut
Definition: GMPBase.py:509
GaudiMP.GMPBase.Worker.eventOutput
eventOutput
Definition: GMPBase.py:1007
GaudiMP.GMPBase.GMPComponent.finalEvent
finalEvent
Definition: GMPBase.py:461
GaudiMP.GMPBase.MiniWriter.key
key
Definition: GMPBase.py:125
GaudiPython.Pythonizations.executeEvent
executeEvent
Helpers for re-entrant interfaces.
Definition: Pythonizations.py:574
GaudiMP.GMPBase.EventCommunicator.allrecv
allrecv
Definition: GMPBase.py:279
GaudiMP.GMPBase.EventCommunicator.qoutTime
qoutTime
Definition: GMPBase.py:287
GaudiMP.GMPBase.EventCommunicator.finalize
def finalize(self)
Definition: GMPBase.py:325
GaudiMP.GMPBase.CollectHistograms.__init__
def __init__(self, gmpcomponent)
Definition: GMPBase.py:224
GaudiMP.GMPBase.TESSerializer.Dump
def Dump(self)
Definition: GMPBase.py:384
GaudiMP.GMPBase.TESSerializer.tDump
tDump
Definition: GMPBase.py:367
GaudiMP.GMPBase.Worker.writerDict
writerDict
Definition: GMPBase.py:1002
GaudiMP.GMPBase.TESSerializer.nIn
nIn
Definition: GMPBase.py:365
GaudiMP.GMPBase.EventCommunicator.qout
qout
Definition: GMPBase.py:275
GaudiMP.GMPBase.GMPComponent.Engine
def Engine(self)
Definition: GMPBase.py:529
GaudiMP.GMPBase.Coord.Stop
def Stop(self)
Definition: GMPBase.py:1445
GaudiMP.GMPBase.MiniWriter.set
def set(self, key, output)
Definition: GMPBase.py:188
GaudiMP.GMPBase.EventCommunicator.qinTime
qinTime
Definition: GMPBase.py:286
GaudiMP.GMPBase.Reader.DoFirstEvent
def DoFirstEvent(self)
Definition: GMPBase.py:718
GaudiMP.GMPBase.Coord.__init__
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1320
GaudiMP.pTools.HistoAgent
Definition: pTools.py:61
GaudiMP.GMPBase.Writer.status
status
Definition: GMPBase.py:1242
GaudiMP.GMPBase.GMPComponent.evcoms
evcoms
Definition: GMPBase.py:496
GaudiMP.GMPBase.TESSerializer.Report
def Report(self)
Definition: GMPBase.py:393
GaudiMP.GMPBase.Worker
Definition: GMPBase.py:996
GaudiMP.GMPBase.GMPComponent.fq
fq
Definition: GMPBase.py:504
GaudiMP.GMPBase.TESSerializer.nodeType
nodeType
Definition: GMPBase.py:370
GaudiMP.GMPBase.MiniWriter.datasvcName
datasvcName
Definition: GMPBase.py:133
GaudiMP.GMPBase.GMPComponent.SetupGaudiPython
def SetupGaudiPython(self)
Definition: GMPBase.py:538
GaudiMP.GMPBase.EventCommunicator._gmpc
_gmpc
Definition: GMPBase.py:269
GaudiMP.GMPBase.GMPComponent.queues
queues
Definition: GMPBase.py:474
GaudiMP.GMPBase.Coord.histSyncEvent
histSyncEvent
Definition: GMPBase.py:1353
GaudiMP.GMPBase.Subworker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:957
GaudiMP.GMPBase.EventCommunicator.allsent
allsent
Definition: GMPBase.py:278
GaudiMP.GMPBase.MiniWriter.__init__
def __init__(self, writer, wType, config)
Definition: GMPBase.py:119
GaudiMP.GMPBase.Writer.Engine
def Engine(self)
Definition: GMPBase.py:1254
GaudiMP.GMPBase.GMPComponent.fsr
fsr
Definition: GMPBase.py:550
GaudiMP.GMPBase.EventCommunicator.nSent
nSent
Definition: GMPBase.py:282
gaudirun.type
type
Definition: gaudirun.py:160
GaudiMP.GMPBase.CollectHistograms.log
log
Definition: GMPBase.py:227
GaudiMP.GMPBase.aida2root
aida2root
Definition: GMPBase.py:79
GaudiMP.GMPBase.GMPComponent.ts
ts
Definition: GMPBase.py:553
GaudiMP.GMPBase.Coord.subworkers
subworkers
Definition: GMPBase.py:1358
GaudiMP.GMPBase.EventCommunicator
Definition: GMPBase.py:264
GaudiMP.GMPBase.Subworker.SetServices
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:918
GaudiMP.GMPBase.Coord.getQueues
def getQueues(self, nodeID)
Definition: GMPBase.py:1387
GaudiMP.GMPBase.Coord.sInit
sInit
Definition: GMPBase.py:1338
GaudiMP.GMPBase.GMPComponent.rTime
rTime
Definition: GMPBase.py:520
GaudiMP.GMPBase.MiniWriter.wName
wName
Definition: GMPBase.py:130
GaudiMP.GMPBase.Coord.getSyncEvents
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1381
GaudiMP.GMPBase.Reader.Engine
def Engine(self)
Definition: GMPBase.py:762
GaudiMP.GMPBase.GMPComponent.subworkers
subworkers
Definition: GMPBase.py:466
GaudiMP.GMPBase.Coord.Go
def Go(self)
Definition: GMPBase.py:1393
StringKeyEx.keys
keys
Definition: StringKeyEx.py:64
GaudiMP.GMPBase.Reader
Definition: GMPBase.py:695
GaudiPython.Bindings.InterfaceCast
Definition: Bindings.py:140
GaudiMP.GMPBase.MiniWriter.__repr__
def __repr__(self)
Definition: GMPBase.py:193
GaudiMP.GMPBase.Coord.log
log
Definition: GMPBase.py:1321
GaudiMP.GMPBase.GMPComponent.nodeType
nodeType
Definition: GMPBase.py:457
GaudiMP.GMPBase.GMPComponent.evcom
evcom
Definition: GMPBase.py:492
GaudiMP.GMPBase.Writer.writerDict
writerDict
Definition: GMPBase.py:1240
GaudiMP.GMPBase.Coord.writer
writer
Definition: GMPBase.py:1372
GaudiMP.GMPBase.TESSerializer.Load
def Load(self, tbuf)
Definition: GMPBase.py:374
GaudiMP.GMPBase.GMPComponent.Report
def Report(self)
Definition: GMPBase.py:677
GaudiMP.GMPBase.Worker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1188
GaudiPython::PyAlgorithm::finalize
StatusCode finalize() override
Definition: Algorithm.cpp:114
GaudiMP.GMPBase.EventCommunicator.sizeRecv
sizeRecv
Definition: GMPBase.py:285
GaudiMP.GMPBase.MiniWriter
Definition: GMPBase.py:110
GaudiMP.GMPBase.MiniWriter.svcOutput
svcOutput
Definition: GMPBase.py:134
GaudiMP.GMPBase.GMPComponent.Finalize
def Finalize(self)
Definition: GMPBase.py:669
GaudiMP.GMPBase.GMPComponent.a
a
Definition: GMPBase.py:541
GaudiMP.pTools
Definition: pTools.py:1
GaudiMP.GMPBase.Coord.system
system
Definition: GMPBase.py:1376
GaudiMP.GMPBase.Subworker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:948
GaudiMP.GMPBase.Worker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:1170
GaudiMP.GMPBase.Subworker.writerDict
writerDict
Definition: GMPBase.py:831
GaudiMP.GMPBase.GMPComponent.msgFormat
msgFormat
Definition: GMPBase.py:468
GaudiMP.GMPBase.GMPComponent.evt
evt
Definition: GMPBase.py:548
GaudiMP.GMPBase.EventCommunicator.nRecv
nRecv
Definition: GMPBase.py:283
GaudiMP.GMPBase.CollectHistograms._gmpc
_gmpc
Definition: GMPBase.py:226
GaudiMP.GMPBase.MiniWriter.output
output
Definition: GMPBase.py:126
GaudiMP.GMPBase.GMPComponent.log
log
Definition: GMPBase.py:465
GaudiMP.GMPBase.Subworker.Engine
def Engine(self)
Definition: GMPBase.py:837
GaudiMP.GMPBase.GMPComponent.cntr
cntr
Definition: GMPBase.py:663
GaudiMP.GMPBase.GMPComponent.StartGaudiPython
def StartGaudiPython(self)
Definition: GMPBase.py:557
Gaudi::Functional::details::put
auto put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
Definition: details.h:168
GaudiMP.GMPBase.TESSerializer.evt
evt
Definition: GMPBase.py:362
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: details.h:97
GaudiMP.GMPBase.GMPComponent.Start
def Start(self)
Definition: GMPBase.py:484
GaudiMP.GMPBase.GMPComponent.LoadTES
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:562