The Gaudi Framework  v37r1 (a7f61348)
GMPBase.py
Go to the documentation of this file.
1 
11 from __future__ import print_function
12 
13 import multiprocessing
14 import time
15 import warnings
16 from multiprocessing import Event, JoinableQueue, Process, cpu_count, current_process
17 from multiprocessing.queues import Empty
18 
19 from GaudiMP.pTools import FileRecordsAgent, HistoAgent, Syncer
20 from ROOT import TBuffer, TBufferFile
21 
22 from GaudiPython import (
23  FAILURE,
24  SUCCESS,
25  AppMgr,
26  InterfaceCast,
27  PyAlgorithm,
28  gbl,
29  setOwnership,
30 )
31 
32 # Workaround for ROOT-10769
33 with warnings.catch_warnings():
34  warnings.simplefilter("ignore")
35  import cppyy
36 
37 # This script contains the bases for the Gaudi MultiProcessing (GMP)
38 # classes
39 
40 # There are three classes :
41 # Reader
42 # Worker
43 # Writer
44 
45 # Each class needs to perform communication with the others
46 # For this, we need a means of communication, which will be based on
47 # the python multiprocessing package
48 # This is provided in SPI pytools package
49 # cmt line : use pytools v1.1 LCG_Interfaces
50 # The PYTHONPATH env variable may need to be modified, as this might
51 # still point to 1.0_python2.5
52 
53 # Each class will need Queues, and a defined method for using these
54 # queues.
55 # For example, as long as there is something in the Queue, both ends
56 # of the queue must be open
57 # Also, there needs to be proper Termination flags and criteria
58 # The System should be error proof.
59 
60 # Constants -------------------------------------------------------------------
61 NAP = 0.001
62 MB = 1024.0 * 1024.0
63 # waits to guard against hanging, in seconds
64 WAIT_INITIALISE = 60 * 10
65 WAIT_FIRST_EVENT = 60 * 3
66 WAIT_SINGLE_EVENT = 60 * 6
67 WAIT_FINALISE = 60 * 2
68 STEP_INITIALISE = 10
69 STEP_EVENT = 2
70 STEP_FINALISE = 10
71 
72 # My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
73 SMAPS = False
74 
75 # -----------------------------------------------------------------------------
76 
77 # definitions
78 # ----------
79 # used to convert stored histos (in AIDA format) to ROOT format
80 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
81 
82 # used to check which type of histo we are dealing with
83 # i.e. if currentHisto in aidatypes : pass
84 aidatypes = (
85  gbl.AIDA.IHistogram,
86  gbl.AIDA.IHistogram1D,
87  gbl.AIDA.IHistogram2D,
88  gbl.AIDA.IHistogram3D,
89  gbl.AIDA.IProfile1D,
90  gbl.AIDA.IProfile2D,
91  gbl.AIDA.IBaseHistogram,
92 ) # extra?
93 
94 # similar to aidatypes
95 thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
96 
97 # Types of OutputStream in Gaudi
98 WRITERTYPES = {
99  "EvtCollectionStream": "tuples",
100  "InputCopyStream": "events",
101  "OutputStream": "events",
102  "RecordStream": "records",
103  "RunRecordStream": "records",
104  "SequentialOutputStream": "events",
105  "TagCollectionStream": "tuples",
106 }
107 
108 # =============================================================================
109 
110 
111 class MiniWriter(object):
112  """
113  A class to represent a writer in the GaudiPython configuration
114  It can be non-trivial to access the name of the output file; it may be
115  specified in the DataSvc, or just on the writer, may be a list, or string
116  Also, there are three different types of writer (events, records, tuples)
117  so this bootstrap class provides easy access to this info while configuring
118  """
119 
120  def __init__(self, writer, wType, config):
121  self.w = writer
122  self.wType = wType
123  # set parameters for directly accessing the correct
124  # part of the configuration, so that later, we can do
125  # config[ key ].Output = modified(output)
126  self.key = None
127  self.output = None
128  self.ItemList = None
129  self.OptItemList = None
130  #
131  self.wName = writer.getName()
132  # Now process the Writer, find where the output is named
133  self.woutput = None
134  self.datasvcName = None
135  self.svcOutput = None
136  if hasattr(self.w, "Output"):
137  self.woutput = self.w.Output
138  self.getItemLists(config)
139  self.set(self.wName, self.w.Output)
140  return
141  else:
142  # if there is no output file, get it via the datasvc
143  # (every writer always has a datasvc property)
144  self.datasvcName = self.w.EvtDataSvc
145  datasvc = config[self.datasvcName]
146  if hasattr(datasvc, "Output"):
147  self.getItemLists(config)
148  self.set(self.datasvcName, datasvc.Output)
149  return
150 
151  def getNewName(self, replaceThis, withThis, extra=""):
152  # replace one pattern in the output name string
153  # with another, and return the Output name
154  # It *might* be in a list, so check for this
155  #
156  # @param extra : might need to add ROOT flags
157  # i.e.: OPT='RECREATE', or such
158  assert replaceThis.__class__.__name__ == "str"
159  assert withThis.__class__.__name__ == "str"
160  old = self.output
161  lst = False
162  if old.__class__.__name__ == "list":
163  old = self.output[0]
164  lst = True
165  new = old.replace(replaceThis, withThis)
166  new += extra
167  if lst:
168  return [new]
169  else:
170  return new
171 
172  def getItemLists(self, config):
173  # the item list
174  if hasattr(self.w, "ItemList"):
175  self.ItemList = self.w.ItemList
176  else:
177  datasvc = config[self.w.EvtDataSvc]
178  if hasattr(datasvc, "ItemList"):
179  self.ItemList = datasvc.ItemList
180  # The option item list; possibly not a valid variable
181  if hasattr(self.w, "OptItemList"):
182  self.OptItemList = self.w.OptItemList
183  else:
184  datasvc = config[self.w.EvtDataSvc]
185  if hasattr(datasvc, "OptItemList"):
186  self.OptItemList = datasvc.OptItemList
187  return
188 
189  def set(self, key, output):
190  self.key = key
191  self.output = output
192  return
193 
194  def __repr__(self):
195  s = ""
196  line = "-" * 80
197  s += line + "\n"
198  s += "Writer : %s\n" % (self.wName)
199  s += "Writer Type : %s\n" % (self.wType)
200  s += "Writer Output : %s\n" % (self.output)
201  s += "DataSvc : %s\n" % (self.datasvcName)
202  s += "DataSvc Output : %s\n" % (self.svcOutput)
203  s += "\n"
204  s += "Key for config : %s\n" % (self.key)
205  s += "Output File : %s\n" % (self.output)
206  s += "ItemList : %s\n" % (self.ItemList)
207  s += "OptItemList : %s\n" % (self.OptItemList)
208  s += line + "\n"
209  return s
210 
211 
212 # =============================================================================
213 
214 
216  """
217  GaudiPython algorithm used to clean up histos on the Reader and Workers
218  Only has a finalize method()
219  This retrieves a dictionary of path:histo objects and sends it to the
220  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
221  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
222  at the writer end, there will be an error.
223  """
224 
225  def __init__(self, gmpcomponent):
226  PyAlgorithm.__init__(self)
227  self._gmpc = gmpcomponent
228  self.log = self._gmpc.log
229  return None
230 
231  def execute(self):
232  return SUCCESS
233 
234  def finalize(self):
235  self.log.info("CollectHistograms Finalise (%s)" % (self._gmpc.nodeType))
236  self._gmpc.hDict = self._gmpc.dumpHistograms()
237  ks = list(self._gmpc.hDict.keys())
238  self.log.info("%i Objects in Histogram Store" % (len(ks)))
239 
240  # crashes occurred due to Memory Error during the sending of hundreds
241  # histos on slc5 machines, so instead, break into chunks
242  # send 100 at a time
243  chunk = 100
244  reps = int(len(ks) / chunk + 1)
245  for i in range(reps):
246  someKeys = ks[i * chunk : (i + 1) * chunk]
247  smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
248  self._gmpc.hq.put((self._gmpc.nodeID, smalld))
249  # "finished" Notification
250  self.log.debug("Signalling end of histos to Writer")
251  self._gmpc.hq.put("HISTOS_SENT")
252  self.log.debug("Waiting on Sync Event")
253  self._gmpc.sEvent.wait()
254  self.log.debug("Histo Sync Event set, clearing and returning")
255  self._gmpc.hvt.clearStore()
256  root = gbl.DataObject()
257  setOwnership(root, False)
258  self._gmpc.hvt.setRoot("/stat", root)
259  return SUCCESS
260 
261 
262 # =============================================================================
263 
264 
265 class EventCommunicator(object):
266  # This class is responsible for communicating Gaudi Events via Queues
267  # Events are communicated as TBufferFiles, filled either by the
268  # TESSerializer, or the GaudiSvc, "IPCSvc"
269  def __init__(self, GMPComponent, qin, qout):
270  self._gmpc = GMPComponent
271  self.log = self._gmpc.log
272  # maximum capacity of Queues
273  self.maxsize = 50
274  # central variables : Queues
275  self.qin = qin
276  self.qout = qout
277 
278  # flags
279  self.allsent = False
280  self.allrecv = False
281 
282  # statistics storage
283  self.nSent = 0 # counter : items sent
284  self.nRecv = 0 # counter : items received
285  self.sizeSent = 0 # measure : size of events sent ( tbuf.Length() )
286  self.sizeRecv = 0 # measure : size of events in ( tbuf.Length() )
287  self.qinTime = 0 # time : receiving from self.qin
288  self.qoutTime = 0 # time : sending on qout
289 
290  def send(self, item):
291  # This class manages the sending of a TBufferFile Event to a Queue
292  # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
293  assert item.__class__.__name__ == "tuple"
294  startTransmission = time.time()
295  self.qout.put(item)
296  # allow the background thread to feed the Queue; not 100% guaranteed to
297  # finish before next line
298  while self.qout._buffer:
299  time.sleep(NAP)
300  self.qoutTime += time.time() - startTransmission
301  self.sizeSent += item[1].Length()
302  self.nSent += 1
303  return SUCCESS
304 
305  def receive(self, timeout=None):
306  # Receive items from self.qin
307  startWait = time.time()
308  try:
309  itemIn = self.qin.get(timeout=timeout)
310  except Empty:
311  return None
312  self.qinTime += time.time() - startWait
313  self.nRecv += 1
314  if itemIn.__class__.__name__ == "tuple":
315  self.sizeRecv += itemIn[1].Length()
316  else:
317  self.nRecv -= 1
318  try:
319  self.qin.task_done()
320  except Exception:
321  self._gmpc.log.warning(
322  "TASK_DONE called too often by : %s" % (self._gmpc.nodeType)
323  )
324  return itemIn
325 
326  def finalize(self):
327  self.log.info(
328  "Finalize Event Communicator : %s %s" % (self._gmpc, self._gmpc.nodeType)
329  )
330  # Reader sends one flag for each worker
331  # Workers send one flag each
332  # Writer sends nothing (it's at the end of the chain)
333  if self._gmpc.nodeType == "Reader":
334  downstream = self._gmpc.nWorkers
335  elif self._gmpc.nodeType == "Writer":
336  downstream = 0
337  elif self._gmpc.nodeType == "Worker":
338  downstream = 1
339 
340  for i in range(downstream):
341  self.qout.put("FINISHED")
342  if self._gmpc.nodeType != "Writer":
343  self.qout.join()
344  # Now some reporting...
345  self.statistics()
346 
347  def statistics(self):
348  self.log.name = "%s-%i Audit " % (self._gmpc.nodeType, self._gmpc.nodeID)
349  self.log.info("Items Sent : %i" % (self.nSent))
350  self.log.info("Items Received : %i" % (self.nRecv))
351  self.log.info("Data Sent : %i" % (self.sizeSent))
352  self.log.info("Data Received : %i" % (self.sizeRecv))
353  self.log.info("Q-out Time : %5.2f" % (self.qoutTime))
354  self.log.info("Q-in Time : %5.2f" % (self.qinTime))
355 
356 
357 # =============================================================================
358 
359 
360 class TESSerializer(object):
361  def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
362  self.T = gaudiTESSerializer
363  self.evt = evtDataSvc
364  self.buffersIn = []
365  self.buffersOut = []
366  self.nIn = 0
367  self.nOut = 0
368  self.tDump = 0.0
369  self.tLoad = 0.0
370  # logging
371  self.nodeType = nodeType
372  self.nodeID = nodeID
373  self.log = log
374 
375  def Load(self, tbuf):
376  root = gbl.DataObject()
377  setOwnership(root, False)
378  self.evt.setRoot("/Event", root)
379  t = time.time()
380  self.T.loadBuffer(tbuf)
381  self.tLoad += time.time() - t
382  self.nIn += 1
383  self.buffersIn.append(tbuf.Length())
384 
385  def Dump(self):
386  t = time.time()
387  tb = TBufferFile(TBuffer.kWrite)
388  self.T.dumpBuffer(tb)
389  self.tDump += time.time() - t
390  self.nOut += 1
391  self.buffersOut.append(tb.Length())
392  return tb
393 
394  def Report(self):
395  evIn = "Events Loaded : %i" % (self.nIn)
396  evOut = "Events Dumped : %i" % (self.nOut)
397  din = sum(self.buffersIn)
398  dataIn = "Data Loaded : %i" % (din)
399  dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
400  if self.nIn:
401  avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
402  maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)
403  else:
404  avgIn = "Avg Buf Loaded : N/A"
405  maxIn = "Max Buf Loaded : N/A"
406  dout = sum(self.buffersOut)
407  dataOut = "Data Dumped : %i" % (dout)
408  dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
409  if self.nOut:
410  avgOut = "Avg Buf Dumped : %5.2f Mb" % (din / (self.nOut * MB))
411  maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)
412  else:
413  avgOut = "Avg Buf Dumped : N/A"
414  maxOut = "Max Buf Dumped : N/A"
415  dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
416  loadTime = "Total Load Time : %5.2f" % (self.tLoad)
417 
418  lines = (
419  evIn,
420  evOut,
421  dataIn,
422  dataInMb,
423  avgIn,
424  maxIn,
425  dataOut,
426  dataOutMb,
427  avgOut,
428  maxOut,
429  dumpTime,
430  loadTime,
431  )
432  self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
433  for line in lines:
434  self.log.info(line)
435  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
436 
437 
438 # =============================================================================
439 
440 
441 class GMPComponent(object):
442  # This class will be the template for Reader, Worker and Writer
443  # containing all common components
444  # nodeId will be a numerical identifier for the node
445  # -1 for reader
446  # -2 for writer
447  # 0,...,nWorkers-1 for the Workers
448  def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
449  # declare a Gaudi MultiProcessing Node
450  # the nodeType is going to be one of Reader, Worker, Writer
451  # qPair is going to be a tuple of ( qin, qout )
452  # for sending and receiving
453  # if nodeType is "Writer", it will be a list of qPairs,
454  # as there's one queue-in from each Worker
455  #
456  # params is a tuple of (nWorkers, config, log)
457 
458  self.nodeType = nodeType
459  current_process().name = nodeType
460 
461  # Synchronisation Event() objects for keeping track of the system
462  self.initEvent, eventLoopSyncer, self.finalEvent = events
463  self.eventLoopSyncer, self.lastEvent = eventLoopSyncer # unpack tuple
464 
465  # necessary for knowledge of the system
466  self.nWorkers, self.sEvent, self.config, self.log = params
467  self.subworkers = subworkers
468  self.nodeID = nodeID
469  self.msgFormat = self.config["MessageSvc"].Format
470 
471  # describe the state of the node by the current Event Number
472  self.currentEvent = None
473 
474  # Unpack the Queues : (events, histos, filerecords)
475  self.queues = queues
476  self.num = 0
477 
478  ks = self.config.keys()
479  self.app = None
480  list = ["Brunel", "DaVinci", "Boole", "Gauss"]
481  for k in list:
482  if k in ks:
483  self.app = k
484 
485  def Start(self):
486  # define the separate process
487  qPair, histq, fq = self.queues
488 
489  # Set up the Queue Mechanisms ( Event Communicators )
490  if self.nodeType == "Reader" or self.nodeType == "Worker":
491  # Reader or Worker Node
492  qin, qout = qPair
493  self.evcom = EventCommunicator(self, qin, qout)
494  else:
495  # Writer : many queues in, no queue out
496  assert self.nodeType == "Writer"
497  self.evcoms = []
498  qsin = qPair[0]
499  for q in qsin:
500  ec = EventCommunicator(self, q, None)
501  self.evcoms.append(ec)
502  # Histogram Queue
503  self.hq = histq
504  # FileRecords Queue
505  self.fq = fq
506 
507  # Universal Counters (available to all nodes)
508  # Use sensibly!!!
509  self.nIn = 0
510  self.nOut = 0
511 
512  # Status Flag (possibly remove later)
513  self.stat = SUCCESS
514 
515  # Set logger name
516  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
517 
518  # Heuristic variables
519  # time for init, run, final, firstEventTime, totalTime
520  self.iTime = 0.0
521  self.rTime = 0.0
522  self.fTime = 0.0
523  self.firstEvTime = 0.0
524  self.tTime = 0.0
525 
526  self.proc = Process(target=self.Engine)
527  # Fork and start the separate process
528  self.proc.start()
529 
530  def Engine(self):
531  # This will be the method carried out by the Node
532  # Different for all
533  pass
534 
536  # Different for all ; customize Configuration for multicore
537  pass
538 
539  def SetupGaudiPython(self):
540  # This method will initialize the GaudiPython Tools
541  # such as the AppMgr and so on
542  self.a = AppMgr()
543  if SMAPS:
544  from AlgSmapShot import SmapShot
545 
546  smapsLog = self.nodeType + "-" + str(self.nodeID) + ".smp"
547  ss = SmapShot(logname=smapsLog)
548  self.a.addAlgorithm(ss)
549  self.evt = self.a.evtsvc()
550  self.hvt = self.a.histsvc()
551  self.fsr = self.a.filerecordsvc()
552  self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
553  self.pers = self.a.service("EventPersistencySvc", "IAddressCreator")
554  self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
555  self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
556  return SUCCESS
557 
558  def StartGaudiPython(self):
559  self.a.initialize()
560  self.a.start()
561  return SUCCESS
562 
563  def LoadTES(self, tbufferfile):
564  root = gbl.DataObject()
565  setOwnership(root, False)
566  self.evt.setRoot("/Event", root)
567  self.ts.loadBuffer(tbufferfile)
568 
569  def getEventNumber(self):
570  if self.app != "Gauss":
571  # Using getList or getHistoNames can result in the EventSelector
572  # re-initialising connection to RootDBase, which costs a lot of
573  # time... try to build a set of Header paths??
574 
575  # First Attempt : Unpacked Event Data
576  lst = ["/Event/Gen/Header", "/Event/Rec/Header"]
577  for l in lst:
578  path = l
579  try:
580  n = self.evt[path].evtNumber()
581 
582  return n
583  except Exception:
584  # No evt number at this path
585  continue
586 
587  # second attepmt : try DAQ/RawEvent data
588  # The Evt Number is in bank type 16, bank 0, data pt 4
589  try:
590  n = self.evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
591 
592  return n
593  except Exception:
594  pass
595 
596  # Default Action
597  if self.nIn > 0 or self.nOut > 0:
598  pass
599  else:
600  self.log.warning("Could not determine Event Number")
601  return -1
602  else:
603  if self.nodeID == -1:
604  self.num = self.num + 1
605 
606  return self.num
607 
608  def IdentifyWriters(self):
609  #
610  # Identify Writers in the Configuration
611  #
612  d = {}
613  keys = ["events", "records", "tuples", "histos"]
614  for k in keys:
615  d[k] = []
616 
617  # Identify Writers and Classify
618  wkeys = WRITERTYPES.keys()
619  for v in self.config.values():
620  if v.__class__.__name__ in wkeys:
621  writerType = WRITERTYPES[v.__class__.__name__]
622  d[writerType].append(MiniWriter(v, writerType, self.config))
623  if self.nodeID == 0:
624  self.log.info("Writer Found : %s" % (v.name()))
625 
626  # Now Check for the Histogram Service
627  if "HistogramPersistencySvc" in self.config.keys():
628  hfile = self.config["HistogramPersistencySvc"].getProp("OutputFile")
629  d["histos"].append(hfile)
630  return d
631 
632  def dumpHistograms(self):
633  """
634  Method used by the GaudiPython algorithm CollectHistos
635  to obtain a dictionary of form { path : object }
636  representing the Histogram Store
637  """
638  nlist = self.hvt.getHistoNames()
639  histDict = {}
640  objects = 0
641  histos = 0
642  if nlist:
643  for n in nlist:
644  o = self.hvt[n]
645  if type(o) in aidatypes:
646  o = aida2root(o)
647  histos += 1
648  else:
649  objects += 1
650  histDict[n] = o
651  else:
652  print("WARNING : no histograms to recover?")
653  return histDict
654 
655  def Initialize(self):
656  start = time.time()
657  self.processConfiguration()
658  self.SetupGaudiPython()
659  self.initEvent.set()
660  self.StartGaudiPython()
661 
662  if self.app == "Gauss":
663  tool = self.a.tool("ToolSvc.EvtCounter")
664  self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
665  else:
666  self.cntr = cppyy.nullptr
667 
668  self.iTime = time.time() - start
669 
670  def Finalize(self):
671  start = time.time()
672  self.a.stop()
673  self.a.finalize()
674  self.log.info("%s-%i Finalized" % (self.nodeType, self.nodeID))
675  self.finalEvent.set()
676  self.fTime = time.time() - start
677 
678  def Report(self):
679  self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
680  allTime = "Alive Time : %5.2f" % (self.tTime)
681  initTime = "Init Time : %5.2f" % (self.iTime)
682  frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
683  runTime = "Run Time : %5.2f" % (self.rTime)
684  finTime = "Finalise Time : %5.2f" % (self.fTime)
685  tup = (allTime, initTime, frstTime, runTime, finTime)
686  for t in tup:
687  self.log.info(t)
688  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
689  # and report from the TESSerializer
690  self.TS.Report()
691 
692 
693 # =============================================================================
694 
695 
697  def __init__(self, queues, events, params, subworkers):
698  GMPComponent.__init__(self, "Reader", -1, queues, events, params, subworkers)
699 
701  # Reader :
702  # No algorithms
703  # No output
704  # No histos
705  self.config["ApplicationMgr"].TopAlg = []
706  self.config["ApplicationMgr"].OutStream = []
707  if "HistogramPersistencySvc" in self.config.keys():
708  self.config["HistogramPersistencySvc"].OutputFile = ""
709  self.config["MessageSvc"].Format = "%-13s " % "[Reader]" + self.msgFormat
710  self.evtMax = self.config["ApplicationMgr"].EvtMax
711 
712  def DumpEvent(self):
713  tb = TBufferFile(TBuffer.kWrite)
714  # print '----Reader dumping Buffer!!!'
715  self.ts.dumpBuffer(tb)
716  # print '\tBuffer Dumped, size : %i'%( tb.Length() )
717  return tb
718 
719  def DoFirstEvent(self):
720  # Do First Event ------------------------------------------------------
721  # Check Termination Criteria
722  startFirst = time.time()
723  self.log.info("Reader : First Event")
724  if self.nOut == self.evtMax:
725  self.log.info("evtMax( %i ) reached" % (self.evtMax))
726  self.lastEvent.set()
727  return SUCCESS
728  else:
729  # Continue to read, dump and send event
730  self.a.run(1)
731  if not bool(self.evt["/Event"]):
732  self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
733  self.lastEvent.set()
734  return SUCCESS
735  else:
736  # Popluate TESSerializer list and send Event
737  if self.app == "Gauss":
738  lst = self.evt.getHistoNames()
739  else:
740  try:
741  lst = self.evt.getList()
742  if self.app == "DaVinci":
743  daqnode = self.evt.retrieveObject("/Event/DAQ").registry()
744  setOwnership(daqnode, False)
745  self.evt.getList(daqnode, lst, daqnode.address().par())
746  except Exception:
747  self.log.critical("Reader could not acquire TES List!")
748  self.lastEvent.set()
749  return FAILURE
750  self.log.info("Reader : TES List : %i items" % (len(lst)))
751  for l in lst:
752  self.ts.addItem(l)
754  tb = self.TS.Dump()
755  self.log.info("First Event Sent")
756  self.evcom.send((self.currentEvent, tb))
757  self.nOut += 1
758  self.eventLoopSyncer.set()
759  self.evt.clearStore()
760  self.firstEvTime = time.time() - startFirst
761  return SUCCESS
762 
763  def Engine(self):
764  # rename process
765  import ctypes
766 
767  libc = ctypes.CDLL("libc.so.6")
768  name = str(self.nodeType) + str(self.nodeID) + "\0"
769  libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
770 
771  startEngine = time.time()
772  self.log.name = "Reader"
773  self.log.info("Reader Process starting")
774 
775  self.Initialize()
776 
777  # add the Histogram Collection Algorithm
778  self.a.addAlgorithm(CollectHistograms(self))
779 
780  self.log.info("Reader Beginning Distribution")
781  sc = self.DoFirstEvent()
782  if sc.isSuccess():
783  self.log.info("Reader First Event OK")
784  else:
785  self.log.critical("Reader Failed on First Event")
786  self.stat = FAILURE
787 
788  # Do All Others -------------------------------------------------------
789  while True:
790  # Check Termination Criteria
791  if self.nOut == self.evtMax:
792  self.log.info("evtMax( %i ) reached" % (self.evtMax))
793  break
794  # Check Health
795  if not self.stat.isSuccess():
796  self.log.critical("Reader is Damaged!")
797  break
798  # Continue to read, dump and send event
799  t = time.time()
800  self.a.run(1)
801  self.rTime += time.time() - t
802  if not bool(self.evt["/Event"]):
803  self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
804  break
805  self.currentEvent = self.getEventNumber()
806  tb = self.TS.Dump()
807  self.evcom.send((self.currentEvent, tb))
808  # clean up
809  self.nOut += 1
810  self.eventLoopSyncer.set()
811  self.evt.clearStore()
812  self.log.info("Setting <Last> Event")
813  self.lastEvent.set()
814 
815  # Finalize
816  self.log.info("Reader : Event Distribution complete.")
817  self.evcom.finalize()
818  self.Finalize()
819  self.tTime = time.time() - startEngine
820  self.Report()
821 
822 
823 # =============================================================================
824 
825 
827  def __init__(self, workerID, queues, events, params, subworkers):
828  GMPComponent.__init__(
829  self, "Worker", workerID, queues, events, params, subworkers
830  )
831  # Identify the writer streams
833  # Identify the accept/veto checks for each event
834  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
835  self.log.info("Subworker-%i Created OK" % (self.nodeID))
836  self.eventOutput = True
837 
838  def Engine(self):
839  # rename process
840  import ctypes
841 
842  libc = ctypes.CDLL("libc.so.6")
843  name = str(self.nodeType) + str(self.nodeID) + "\0"
844  libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
845 
846  self.initEvent.set()
847  startEngine = time.time()
848  msg = self.a.service("MessageSvc")
849  msg.Format = "%-13s " % ("[" + self.log.name + "]") + self.msgFormat
850 
851  self.log.name = "Worker-%i" % (self.nodeID)
852  self.log.info("Subworker %i starting Engine" % (self.nodeID))
854 
855  # populate the TESSerializer itemlist
856  self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
857 
858  self.a.addAlgorithm(CollectHistograms(self))
859 
860  # Begin processing
861  Go = True
862  while Go:
863  packet = self.evcom.receive()
864  if packet:
865  pass
866  else:
867  continue
868  if packet == "FINISHED":
869  break
870  evtNumber, tbin = packet # unpack
871  if self.cntr != cppyy.nullptr:
872  self.cntr.setEventCounter(evtNumber)
873 
874  self.nIn += 1
875  self.TS.Load(tbin)
876 
877  t = time.time()
878  sc = self.a.executeEvent()
879  if self.nIn == 1:
880  self.firstEvTime = time.time() - t
881  else:
882  self.rTime += time.time() - t
883  if sc.isSuccess():
884  pass
885  else:
886  self.log.name = "Worker-%i" % (self.nodeID)
887  self.log.warning("Did not Execute Event")
888  self.evt.clearStore()
889  continue
890  if self.isEventPassed():
891  pass
892  else:
893  self.log.name = "Worker-%i" % (self.nodeID)
894  self.log.warning("Event did not pass : %i" % (evtNumber))
895  self.evt.clearStore()
896  continue
897  if self.eventOutput:
898  # It may be the case of generating Event Tags; hence
899  # no event output
901  tb = self.TS.Dump()
902  self.evcom.send((self.currentEvent, tb))
903  self.nOut += 1
904  self.inc.fireIncident(gbl.Incident("Subworker", "EndEvent"))
905  self.eventLoopSyncer.set()
906  self.evt.clearStore()
907  self.log.name = "Worker-%i" % (self.nodeID)
908  self.log.info("Setting <Last> Event %s" % (self.nodeID))
909  self.lastEvent.set()
910 
911  self.evcom.finalize()
912  # Now send the FileRecords and stop/finalize the appMgr
913  self.filerecordsAgent.SendFileRecords()
914  self.tTime = time.time() - startEngine
915  self.Finalize()
916  self.Report()
917  # self.finalEvent.set()
918 
919  def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
920  self.a = a
921  self.evt = evt
922  self.hvt = hvt
923  self.fsr = fsr
924  # self.inc = inc
925  self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
926  self.pers = pers
927  self.ts = ts
928  self.cntr = cntr
929  self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
930 
931  def getCheckAlgs(self):
932  """
933  For some output writers, a check is performed to see if the event has
934  executed certain algorithms.
935  These reside in the AcceptAlgs property for those writers
936  """
937  acc = []
938  req = []
939  vet = []
940  for m in self.writerDict["events"]:
941  if hasattr(m.w, "AcceptAlgs"):
942  acc += m.w.AcceptAlgs
943  if hasattr(m.w, "RequireAlgs"):
944  req += m.w.RequireAlgs
945  if hasattr(m.w, "VetoAlgs"):
946  vet += m.w.VetoAlgs
947  return (acc, req, vet)
948 
949  def checkExecutedPassed(self, algName):
950  if (
951  self.a.algorithm(algName)._ialg.isExecuted()
952  and self.a.algorithm(algName)._ialg.filterPassed()
953  ):
954  return True
955  else:
956  return False
957 
958  def isEventPassed(self):
959  """
960  Check the algorithm status for an event.
961  Depending on output writer settings, the event
962  may be declined based on various criteria.
963  This is a transcript of the check that occurs in GaudiSvc::OutputStream
964  """
965  passed = False
966 
967  self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
968  if self.acceptAlgs:
969  for name in self.acceptAlgs:
970  if self.checkExecutedPassed(name):
971  passed = True
972  break
973  else:
974  passed = True
975 
976  self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
977  for name in self.requireAlgs:
978  if self.checkExecutedPassed(name):
979  pass
980  else:
981  self.log.info("Evt declined (requireAlgs) : %s" % (name))
982  passed = False
983 
984  self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
985  for name in self.vetoAlgs:
986  if self.checkExecutedPassed(name):
987  pass
988  else:
989  self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
990  passed = False
991  return passed
992 
993 
994 # =============================================================================
995 
996 
998  def __init__(self, workerID, queues, events, params, subworkers):
999  GMPComponent.__init__(
1000  self, "Worker", workerID, queues, events, params, subworkers
1001  )
1002  # Identify the writer streams
1004  # Identify the accept/veto checks for each event
1005  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
1006  self.log.name = "Worker-%i" % (self.nodeID)
1007  self.log.info("Worker-%i Created OK" % (self.nodeID))
1008  self.eventOutput = True
1009 
1011  # Worker :
1012  # No input
1013  # No output
1014  # No Histos
1015  self.config["EventSelector"].Input = []
1016  self.config["ApplicationMgr"].OutStream = []
1017  if "HistogramPersistencySvc" in self.config.keys():
1018  self.config["HistogramPersistencySvc"].OutputFile = ""
1019  formatHead = "[Worker-%i]" % (self.nodeID)
1020  self.config["MessageSvc"].Format = "%-13s " % formatHead + self.msgFormat
1021 
1022  for key, lst in self.writerDict.items():
1023  self.log.info("Writer Type : %s\t : %i" % (key, len(lst)))
1024 
1025  for m in self.writerDict["tuples"]:
1026  # rename Tuple output file with an appendix
1027  # based on worker id, for merging later
1028  newName = m.getNewName(".", ".w%i." % (self.nodeID))
1029  self.config[m.key].Output = newName
1030 
1031  # Suppress INFO Output for all but Worker-0
1032  # if self.nodeID == 0 :
1033  # pass
1034  # else :
1035  # self.config[ 'MessageSvc' ].OutputLevel = ERROR
1036 
1037  if self.app == "Gauss":
1038  try:
1039  if "ToolSvc.EvtCounter" not in self.config:
1040  from Configurables import EvtCounter
1041 
1042  counter = EvtCounter()
1043  else:
1044  counter = self.config["ToolSvc.EvtCounter"]
1045  counter.UseIncident = False
1046  except Exception:
1047  # ignore errors when trying to change the configuration of the EvtCounter
1048  self.log.warning("Cannot configure EvtCounter")
1049 
1050  def Engine(self):
1051  # rename process
1052  import ctypes
1053 
1054  libc = ctypes.CDLL("libc.so.6")
1055  name = str(self.nodeType) + str(self.nodeID) + "\0"
1056  libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
1057 
1058  startEngine = time.time()
1059  self.log.info("Worker %i starting Engine" % (self.nodeID))
1060  self.Initialize()
1062 
1063  # populate the TESSerializer itemlist
1064  self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
1065 
1066  nEventWriters = len(self.writerDict["events"])
1067  if nEventWriters:
1068  itemList = set()
1069  optItemList = set()
1070  for m in self.writerDict["events"]:
1071  for item in m.ItemList:
1072  hsh = item.find("#")
1073  if hsh != -1:
1074  item = item[:hsh]
1075  itemList.add(item)
1076  for item in m.OptItemList:
1077  hsh = item.find("#")
1078  if hsh != -1:
1079  item = item[:hsh]
1080  optItemList.add(item)
1081  # If an item is mandatory and optional, keep it only in the optional list
1082  itemList -= optItemList
1083  for item in sorted(itemList):
1084  self.log.info(" adding ItemList Item to ts : %s" % (item))
1085  self.ts.addItem(item)
1086  for item in sorted(optItemList):
1087  self.log.info(" adding Optional Item to ts : %s" % (item))
1088  self.ts.addOptItem(item)
1089  else:
1090  self.log.info("There is no Event Output for this app")
1091  self.eventOutput = False
1092 
1093  # Begin processing
1094  Go = True
1095  while Go:
1096  packet = self.evcom.receive()
1097  if packet:
1098  pass
1099  else:
1100  continue
1101  if packet == "FINISHED":
1102  break
1103  evtNumber, tbin = packet # unpack
1104  if self.cntr != cppyy.nullptr:
1105  self.cntr.setEventCounter(evtNumber)
1106 
1107  # subworkers are forked before the first event is processed
1108  if self.nIn == 0:
1109  self.log.info("Fork new subworkers")
1110 
1111  # Fork subworkers and share services
1112  for k in self.subworkers:
1113  k.SetServices(
1114  self.a,
1115  self.evt,
1116  self.hvt,
1117  self.fsr,
1118  self.inc,
1119  self.pers,
1120  self.ts,
1121  self.cntr,
1122  )
1123  k.Start()
1124  self.a.addAlgorithm(CollectHistograms(self))
1125  self.nIn += 1
1126  self.TS.Load(tbin)
1127 
1128  t = time.time()
1129  sc = self.a.executeEvent()
1130  if self.nIn == 1:
1131  self.firstEvTime = time.time() - t
1132  else:
1133  self.rTime += time.time() - t
1134  if sc.isSuccess():
1135  pass
1136  else:
1137  self.log.warning("Did not Execute Event")
1138  self.evt.clearStore()
1139  continue
1140  if self.isEventPassed():
1141  pass
1142  else:
1143  self.log.warning("Event did not pass : %i" % (evtNumber))
1144  self.evt.clearStore()
1145  continue
1146  if self.eventOutput:
1147  # It may be the case of generating Event Tags; hence
1148  # no event output
1150  tb = self.TS.Dump()
1151  self.evcom.send((self.currentEvent, tb))
1152  self.nOut += 1
1153  self.inc.fireIncident(gbl.Incident("Worker", "EndEvent"))
1154  self.eventLoopSyncer.set()
1155  self.evt.clearStore()
1156  self.log.info("Setting <Last> Event")
1157  self.lastEvent.set()
1158 
1159  self.evcom.finalize()
1160  self.log.info("Worker-%i Finished Processing Events" % (self.nodeID))
1161  # Now send the FileRecords and stop/finalize the appMgr
1162  self.filerecordsAgent.SendFileRecords()
1163  self.Finalize()
1164  self.tTime = time.time() - startEngine
1165  self.Report()
1166 
1167  for k in self.subworkers:
1168  self.log.info("Join subworkers")
1169  k.proc.join()
1170 
1171  def getCheckAlgs(self):
1172  """
1173  For some output writers, a check is performed to see if the event has
1174  executed certain algorithms.
1175  These reside in the AcceptAlgs property for those writers
1176  """
1177  acc = []
1178  req = []
1179  vet = []
1180  for m in self.writerDict["events"]:
1181  if hasattr(m.w, "AcceptAlgs"):
1182  acc += m.w.AcceptAlgs
1183  if hasattr(m.w, "RequireAlgs"):
1184  req += m.w.RequireAlgs
1185  if hasattr(m.w, "VetoAlgs"):
1186  vet += m.w.VetoAlgs
1187  return (acc, req, vet)
1188 
1189  def checkExecutedPassed(self, algName):
1190  if (
1191  self.a.algorithm(algName)._ialg.isExecuted()
1192  and self.a.algorithm(algName)._ialg.filterPassed()
1193  ):
1194  return True
1195  else:
1196  return False
1197 
1198  def isEventPassed(self):
1199  """
1200  Check the algorithm status for an event.
1201  Depending on output writer settings, the event
1202  may be declined based on various criteria.
1203  This is a transcript of the check that occurs in GaudiSvc::OutputStream
1204  """
1205  passed = False
1206 
1207  self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
1208  if self.acceptAlgs:
1209  for name in self.acceptAlgs:
1210  if self.checkExecutedPassed(name):
1211  passed = True
1212  break
1213  else:
1214  passed = True
1215 
1216  self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
1217  for name in self.requireAlgs:
1218  if self.checkExecutedPassed(name):
1219  pass
1220  else:
1221  self.log.info("Evt declined (requireAlgs) : %s" % (name))
1222  passed = False
1223 
1224  self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
1225  for name in self.vetoAlgs:
1226  if self.checkExecutedPassed(name):
1227  pass
1228  else:
1229  self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
1230  passed = False
1231  return passed
1232 
1233 
1234 # =============================================================================
1235 
1236 
1238  def __init__(self, queues, events, params, subworkers):
1239  GMPComponent.__init__(self, "Writer", -2, queues, events, params, subworkers)
1240  # Identify the writer streams
1242  # This keeps track of workers as they finish
1243  self.status = [False] * self.nWorkers
1244  self.log.name = "Writer--2"
1245 
1247  # Writer :
1248  # No input
1249  # No Algs
1250  self.config["ApplicationMgr"].TopAlg = []
1251  self.config["EventSelector"].Input = []
1252 
1253  self.config["MessageSvc"].Format = "%-13s " % "[Writer]" + self.msgFormat
1254 
1255  def Engine(self):
1256  # rename process
1257  import ctypes
1258 
1259  libc = ctypes.CDLL("libc.so.6")
1260  name = str(self.nodeType) + str(self.nodeID) + "\0"
1261  libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
1262 
1263  self.Initialize()
1264  self.histoAgent = HistoAgent(self)
1266 
1267  # Begin processing
1268  Go = True
1269  current = -1
1270  while Go:
1271  current = (current + 1) % self.nWorkers
1272  packet = self.evcoms[current].receive(timeout=0.01)
1273  if packet is None:
1274  continue
1275  if packet == "FINISHED":
1276  self.log.info("Writer got FINISHED flag : Worker %i" % (current))
1277 
1278  self.status[current] = True
1279  if all(self.status):
1280  self.log.info("FINISHED recd from all workers, break loop")
1281  break
1282  continue
1283  # otherwise, continue as normal
1284  self.nIn += 1 # update central count (maybe needed by FSR store)
1285  evtNumber, tbin = packet # unpack
1286  self.TS.Load(tbin)
1287  t = time.time()
1288  self.a.executeEvent()
1289  self.rTime += time.time() - t
1291  self.evt.clearStore()
1292  self.eventLoopSyncer.set()
1293  self.log.name = "Writer--2"
1294  self.log.info("Setting <Last> Event")
1295  self.lastEvent.set()
1296 
1297  # finalisation steps
1298  [e.finalize() for e in self.evcoms]
1299  # Now do Histograms
1300  sc = self.histoAgent.Receive()
1301  sc = self.histoAgent.RebuildHistoStore()
1302  if sc.isSuccess():
1303  self.log.info("Histo Store rebuilt ok")
1304  else:
1305  self.log.warning("Histo Store Error in Rebuild")
1306 
1307  # Now do FileRecords
1308  sc = self.filerecordsAgent.Receive()
1309  self.filerecordsAgent.Rebuild()
1310  self.Finalize()
1311  # self.rTime = time.time()-startEngine
1312  self.Report()
1313 
1314 
1315 # =============================================================================
1316 
1317 # =============================================================================
1318 
1319 
1320 class Coord(object):
1321  def __init__(self, nWorkers, config, log):
1322  self.log = log
1323  self.config = config
1324  # set up Logging
1325  self.log.name = "GaudiPython-Parallel-Logger"
1326  self.log.info("GaudiPython Parallel Process Co-ordinator beginning")
1327 
1328  if nWorkers == -1:
1329  # The user has requested all available cpus in the machine
1330  self.nWorkers = cpu_count()
1331  else:
1332  self.nWorkers = nWorkers
1333 
1334  self.qs = self.SetupQueues() # a dictionary of queues (for Events)
1335  self.hq = JoinableQueue() # for Histogram data
1336  self.fq = JoinableQueue() # for FileRecords data
1337 
1338  # Make a Syncer for Initalise, Run, and Finalise
1339  self.sInit = Syncer(
1340  self.nWorkers, self.log, limit=WAIT_INITIALISE, step=STEP_INITIALISE
1341  )
1342  self.sRun = Syncer(
1343  self.nWorkers,
1344  self.log,
1345  manyEvents=True,
1346  limit=WAIT_SINGLE_EVENT,
1347  step=STEP_EVENT,
1348  firstEvent=WAIT_FIRST_EVENT,
1349  )
1350  self.sFin = Syncer(
1351  self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE
1352  )
1353  # and one final one for Histogram Transfer
1354  self.histSyncEvent = Event()
1355 
1356  # params are common to al subprocesses
1357  params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
1358 
1359  self.subworkers = []
1360  # Declare SubProcesses!
1361  for i in range(1, self.nWorkers):
1362  sub = Subworker(
1363  i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers
1364  )
1365  self.subworkers.append(sub)
1366  self.reader = Reader(
1367  self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers
1368  )
1369  self.workers = []
1370  wk = Worker(
1371  0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers
1372  )
1373  self.writer = Writer(
1374  self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers
1375  )
1376 
1377  self.system = []
1378  self.system.append(self.writer)
1379  self.system.append(wk)
1380  self.system.append(self.reader)
1381 
1382  def getSyncEvents(self, nodeID):
1383  init = self.sInit.d[nodeID].event
1384  run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
1385  fin = self.sFin.d[nodeID].event
1386  return (init, run, fin)
1387 
1388  def getQueues(self, nodeID):
1389  eventQ = self.qs[nodeID]
1390  histQ = self.hq
1391  fsrQ = self.fq
1392  return (eventQ, histQ, fsrQ)
1393 
1394  def Go(self):
1395  # Initialise
1396  self.log.name = "GaudiPython-Parallel-Logger"
1397  self.log.info("INITIALISING SYSTEM")
1398 
1399  # Start reader, writer and main worker
1400  for p in self.system:
1401  p.Start()
1402 
1403  sc = self.sInit.syncAll(step="Initialise")
1404  if sc == SUCCESS:
1405  pass
1406  else:
1407  self.Terminate()
1408  return FAILURE
1409 
1410  # Run
1411  self.log.name = "GaudiPython-Parallel-Logger"
1412  self.log.info("RUNNING SYSTEM")
1413  sc = self.sRun.syncAll(step="Run")
1414  if sc == SUCCESS:
1415  pass
1416  else:
1417  self.Terminate()
1418  return FAILURE
1419 
1420  # Finalise
1421  self.log.name = "GaudiPython-Parallel-Logger"
1422  self.log.info("FINALISING SYSTEM")
1423  sc = self.sFin.syncAll(step="Finalise")
1424  if sc == SUCCESS:
1425  pass
1426  else:
1427  self.Terminate()
1428  return FAILURE
1429 
1430  # if we've got this far, finally report SUCCESS
1431  self.log.info("Cleanly join all Processes")
1432  self.Stop()
1433  self.log.info("Report Total Success to Main.py")
1434  return SUCCESS
1435 
1436  def Terminate(self):
1437  # Brutally kill sub-processes
1438  children = multiprocessing.active_children()
1439  for i in children:
1440  i.terminate()
1441 
1442  # self.writer.proc.terminate()
1443  # [ w.proc.terminate() for w in self.workers]
1444  # self.reader.proc.terminate()
1445 
1446  def Stop(self):
1447  # procs should be joined in reverse order to launch
1448  self.system.reverse()
1449  for s in self.system:
1450  s.proc.join()
1451  return SUCCESS
1452 
1453  def SetupQueues(self):
1454  # This method will set up the network of Queues needed
1455  # N Queues = nWorkers + 1
1456  # Each Worker has a Queue in, and a Queue out
1457  # Reader has Queue out only
1458  # Writer has nWorkers Queues in
1459 
1460  # one queue from Reader-Workers
1461  rwk = JoinableQueue()
1462  # one queue from each worker to writer
1463  workersWriter = [JoinableQueue() for i in range(self.nWorkers)]
1464  d = {}
1465  d[-1] = (None, rwk) # Reader
1466  d[-2] = (workersWriter, None) # Writer
1467  for i in range(self.nWorkers):
1468  d[i] = (rwk, workersWriter[i])
1469  return d
1470 
1471 
1472 # ============================= EOF ===========================================
GaudiMP.GMPBase.CollectHistograms
Definition: GMPBase.py:215
GaudiMP.GMPBase.Coord.workers
workers
Definition: GMPBase.py:1369
AlgSequencer.all
all
Definition: AlgSequencer.py:56
GaudiMP.GMPBase.GMPComponent.num
num
Definition: GMPBase.py:476
GaudiMP.GMPBase.TESSerializer.nOut
nOut
Definition: GMPBase.py:367
GaudiMP.GMPBase.TESSerializer.__init__
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:361
GaudiMP.GMPBase.Coord.fq
fq
Definition: GMPBase.py:1336
GaudiMP.GMPBase.EventCommunicator.receive
def receive(self, timeout=None)
Definition: GMPBase.py:305
GaudiMP.GMPBase.GMPComponent.nodeID
nodeID
Definition: GMPBase.py:468
GaudiMP.GMPBase.GMPComponent.__init__
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:448
GaudiMP.GMPBase.MiniWriter.OptItemList
OptItemList
Definition: GMPBase.py:129
GaudiMP.GMPBase.EventCommunicator.log
log
Definition: GMPBase.py:271
GaudiMP.GMPBase.GMPComponent.tTime
tTime
Definition: GMPBase.py:524
GaudiMP.GMPBase.Coord.SetupQueues
def SetupQueues(self)
Definition: GMPBase.py:1453
GaudiMP.pTools.FileRecordsAgent
Definition: pTools.py:238
GaudiMP.GMPBase.GMPComponent.getEventNumber
def getEventNumber(self)
Definition: GMPBase.py:569
GaudiMP.GMPBase.Writer.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:1246
GaudiMP.GMPBase.GMPComponent.IdentifyWriters
def IdentifyWriters(self)
Definition: GMPBase.py:608
GaudiMP.GMPBase.TESSerializer.tLoad
tLoad
Definition: GMPBase.py:369
GaudiPython::PyAlgorithm::execute
StatusCode execute() override
Definition: Algorithm.cpp:110
GaudiMP.GMPBase.EventCommunicator.send
def send(self, item)
Definition: GMPBase.py:290
GaudiMP.GMPBase.Worker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1061
GaudiMP.GMPBase.Coord.sRun
sRun
Definition: GMPBase.py:1342
GaudiMP.GMPBase.TESSerializer.buffersIn
buffersIn
Definition: GMPBase.py:364
GaudiMP.GMPBase.GMPComponent.TS
TS
Definition: GMPBase.py:555
GaudiMP.GMPBase.GMPComponent.stat
stat
Definition: GMPBase.py:513
GaudiMP.GMPBase.Subworker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:853
GaudiMP.GMPBase.GMPComponent.dumpHistograms
def dumpHistograms(self)
Definition: GMPBase.py:632
GaudiMP.GMPBase.EventCommunicator.sizeSent
sizeSent
Definition: GMPBase.py:285
reverse
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:59
GaudiMP.GMPBase.TESSerializer
Definition: GMPBase.py:360
GaudiMP.GMPBase.GMPComponent.nIn
nIn
Definition: GMPBase.py:509
GaudiMP.GMPBase.MiniWriter.wType
wType
Definition: GMPBase.py:122
GaudiMP.GMPBase.GMPComponent
Definition: GMPBase.py:441
GaudiMP.GMPBase.Worker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:1005
GaudiMP.GMPBase.Reader.evtMax
evtMax
Definition: GMPBase.py:710
GaudiMP.GMPBase.MiniWriter.ItemList
ItemList
Definition: GMPBase.py:128
GaudiMP.GMPBase.EventCommunicator.maxsize
maxsize
Definition: GMPBase.py:273
GaudiMP.GMPBase.EventCommunicator.statistics
def statistics(self)
Definition: GMPBase.py:347
GaudiMP.GMPBase.MiniWriter.getNewName
def getNewName(self, replaceThis, withThis, extra="")
Definition: GMPBase.py:151
GaudiMP.GMPBase.Worker.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:1010
GaudiMP.GMPBase.Coord.qs
qs
Definition: GMPBase.py:1334
GaudiMP.GMPBase.GMPComponent.Initialize
def Initialize(self)
Definition: GMPBase.py:655
GaudiMP.GMPBase.Subworker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:931
GaudiMP.GMPBase.Coord.nWorkers
nWorkers
Definition: GMPBase.py:1330
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:863
GaudiMP.GMPBase.EventCommunicator.__init__
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:269
GaudiMP.GMPBase.GMPComponent.fTime
fTime
Definition: GMPBase.py:522
GaudiMP.GMPBase.Writer
Definition: GMPBase.py:1237
max
EventIDBase max(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:225
GaudiMP.GMPBase.Coord
Definition: GMPBase.py:1320
GaudiPython::PyAlgorithm
Definition: Algorithm.h:40
GaudiMP.GMPBase.Subworker
Definition: GMPBase.py:826
GaudiMP.GMPBase.GMPComponent.hq
hq
Definition: GMPBase.py:503
GaudiMP.GMPBase.Coord.hq
hq
Definition: GMPBase.py:1335
GaudiPython.Bindings.setOwnership
setOwnership
Definition: Bindings.py:131
GaudiMP.GMPBase.Reader.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:700
GaudiMP.GMPBase.Coord.Terminate
def Terminate(self)
Definition: GMPBase.py:1436
GaudiMP.GMPBase.Worker.Engine
def Engine(self)
Definition: GMPBase.py:1050
GaudiMP.GMPBase.Worker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:998
GaudiMP.GMPBase.Writer.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1238
IOTest.start
start
Definition: IOTest.py:108
GaudiMP.GMPBase.Subworker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:827
GaudiMP.GMPBase.Subworker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:834
GaudiMP.GMPBase.GMPComponent.pers
pers
Definition: GMPBase.py:553
GaudiMP.GMPBase.GMPComponent.hvt
hvt
Definition: GMPBase.py:550
GaudiMP.GMPBase.TESSerializer.T
T
Definition: GMPBase.py:362
GaudiMP.GMPBase.TESSerializer.log
log
Definition: GMPBase.py:373
GaudiMP.GMPBase.Worker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:1198
GaudiMP.GMPBase.GMPComponent.iTime
iTime
Definition: GMPBase.py:520
GaudiMP.GMPBase.TESSerializer.nodeID
nodeID
Definition: GMPBase.py:372
GaudiMP.GMPBase.GMPComponent.currentEvent
currentEvent
Definition: GMPBase.py:472
GaudiMP.GMPBase.GMPComponent.app
app
Definition: GMPBase.py:479
compareOutputFiles.par
par
Definition: compareOutputFiles.py:477
GaudiMP.GMPBase.Writer.histoAgent
histoAgent
Definition: GMPBase.py:1264
GaudiMP.GMPBase.GMPComponent.firstEvTime
firstEvTime
Definition: GMPBase.py:523
GaudiMP.GMPBase.EventCommunicator.qin
qin
Definition: GMPBase.py:275
GaudiMP.GMPBase.Reader.DumpEvent
def DumpEvent(self)
Definition: GMPBase.py:712
GaudiMP.GMPBase.TESSerializer.buffersOut
buffersOut
Definition: GMPBase.py:365
GaudiPluginService.cpluginsvc.registry
def registry()
Definition: cpluginsvc.py:84
GaudiMP.GMPBase.MiniWriter.woutput
woutput
Definition: GMPBase.py:133
GaudiMP.GMPBase.GMPComponent.proc
proc
Definition: GMPBase.py:526
GaudiMP.GMPBase.Coord.config
config
Definition: GMPBase.py:1323
bug_34121.tool
tool
Definition: bug_34121.py:17
GaudiMP.GMPBase.GMPComponent.inc
inc
Definition: GMPBase.py:552
Gaudi::Functional::details::get
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
Definition: details.h:440
GaudiMP.GMPBase.MiniWriter.getItemLists
def getItemLists(self, config)
Definition: GMPBase.py:172
GaudiMP.GMPBase.GMPComponent.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:535
GaudiMP.pTools.Syncer
Definition: pTools.py:628
GaudiMP.GMPBase.Reader.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:697
GaudiMP.GMPBase.MiniWriter.w
w
Definition: GMPBase.py:121
GaudiMP.GMPBase.Writer.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1265
GaudiMP.GMPBase.Coord.sFin
sFin
Definition: GMPBase.py:1350
GaudiMP.GMPBase.Coord.reader
reader
Definition: GMPBase.py:1366
GaudiMP.GMPBase.Subworker.eventOutput
eventOutput
Definition: GMPBase.py:836
GaudiMP.GMPBase.GMPComponent.lastEvent
lastEvent
Definition: GMPBase.py:463
GaudiMP.GMPBase.GMPComponent.nOut
nOut
Definition: GMPBase.py:510
GaudiMP.GMPBase.Worker.eventOutput
eventOutput
Definition: GMPBase.py:1008
GaudiMP.GMPBase.GMPComponent.finalEvent
finalEvent
Definition: GMPBase.py:462
GaudiMP.GMPBase.MiniWriter.key
key
Definition: GMPBase.py:126
GaudiPython.Pythonizations.executeEvent
executeEvent
Helpers for re-entrant interfaces.
Definition: Pythonizations.py:584
GaudiMP.GMPBase.EventCommunicator.allrecv
allrecv
Definition: GMPBase.py:280
GaudiMP.GMPBase.EventCommunicator.qoutTime
qoutTime
Definition: GMPBase.py:288
GaudiMP.GMPBase.EventCommunicator.finalize
def finalize(self)
Definition: GMPBase.py:326
GaudiMP.GMPBase.CollectHistograms.__init__
def __init__(self, gmpcomponent)
Definition: GMPBase.py:225
GaudiMP.GMPBase.TESSerializer.Dump
def Dump(self)
Definition: GMPBase.py:385
GaudiMP.GMPBase.TESSerializer.tDump
tDump
Definition: GMPBase.py:368
GaudiMP.GMPBase.Worker.writerDict
writerDict
Definition: GMPBase.py:1003
GaudiMP.GMPBase.TESSerializer.nIn
nIn
Definition: GMPBase.py:366
GaudiMP.GMPBase.EventCommunicator.qout
qout
Definition: GMPBase.py:276
GaudiMP.GMPBase.GMPComponent.Engine
def Engine(self)
Definition: GMPBase.py:530
GaudiMP.GMPBase.Coord.Stop
def Stop(self)
Definition: GMPBase.py:1446
GaudiMP.GMPBase.MiniWriter.set
def set(self, key, output)
Definition: GMPBase.py:189
GaudiMP.GMPBase.EventCommunicator.qinTime
qinTime
Definition: GMPBase.py:287
GaudiMP.GMPBase.Reader.DoFirstEvent
def DoFirstEvent(self)
Definition: GMPBase.py:719
GaudiMP.GMPBase.Coord.__init__
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1321
GaudiMP.pTools.HistoAgent
Definition: pTools.py:61
GaudiMP.GMPBase.Writer.status
status
Definition: GMPBase.py:1243
GaudiMP.GMPBase.GMPComponent.evcoms
evcoms
Definition: GMPBase.py:497
GaudiMP.GMPBase.TESSerializer.Report
def Report(self)
Definition: GMPBase.py:394
GaudiMP.GMPBase.Worker
Definition: GMPBase.py:997
GaudiMP.GMPBase.GMPComponent.fq
fq
Definition: GMPBase.py:505
GaudiMP.GMPBase.TESSerializer.nodeType
nodeType
Definition: GMPBase.py:371
GaudiMP.GMPBase.MiniWriter.datasvcName
datasvcName
Definition: GMPBase.py:134
GaudiMP.GMPBase.GMPComponent.SetupGaudiPython
def SetupGaudiPython(self)
Definition: GMPBase.py:539
GaudiMP.GMPBase.EventCommunicator._gmpc
_gmpc
Definition: GMPBase.py:270
GaudiMP.GMPBase.GMPComponent.queues
queues
Definition: GMPBase.py:475
GaudiMP.GMPBase.Coord.histSyncEvent
histSyncEvent
Definition: GMPBase.py:1354
DataOnDemand.Dump
bool Dump
Definition: DataOnDemand.py:35
GaudiMP.GMPBase.Subworker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:958
GaudiMP.GMPBase.EventCommunicator.allsent
allsent
Definition: GMPBase.py:279
GaudiMP.GMPBase.MiniWriter.__init__
def __init__(self, writer, wType, config)
Definition: GMPBase.py:120
GaudiMP.GMPBase.Writer.Engine
def Engine(self)
Definition: GMPBase.py:1255
GaudiMP.GMPBase.GMPComponent.fsr
fsr
Definition: GMPBase.py:551
GaudiMP.GMPBase.EventCommunicator.nSent
nSent
Definition: GMPBase.py:283
gaudirun.type
type
Definition: gaudirun.py:162
GaudiMP.GMPBase.CollectHistograms.log
log
Definition: GMPBase.py:228
GaudiMP.GMPBase.aida2root
aida2root
Definition: GMPBase.py:80
GaudiMP.GMPBase.GMPComponent.ts
ts
Definition: GMPBase.py:554
GaudiMP.GMPBase.Coord.subworkers
subworkers
Definition: GMPBase.py:1359
GaudiMP.GMPBase.EventCommunicator
Definition: GMPBase.py:265
GaudiMP.GMPBase.Subworker.SetServices
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:919
GaudiMP.GMPBase.Coord.getQueues
def getQueues(self, nodeID)
Definition: GMPBase.py:1388
GaudiMP.GMPBase.Coord.sInit
sInit
Definition: GMPBase.py:1339
GaudiMP.GMPBase.GMPComponent.rTime
rTime
Definition: GMPBase.py:521
GaudiMP.GMPBase.MiniWriter.wName
wName
Definition: GMPBase.py:131
GaudiMP.GMPBase.Coord.getSyncEvents
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1382
GaudiMP.GMPBase.Reader.Engine
def Engine(self)
Definition: GMPBase.py:763
GaudiMP.GMPBase.GMPComponent.subworkers
subworkers
Definition: GMPBase.py:467
GaudiMP.GMPBase.Coord.Go
def Go(self)
Definition: GMPBase.py:1394
StringKeyEx.keys
keys
Definition: StringKeyEx.py:63
GaudiMP.GMPBase.Reader
Definition: GMPBase.py:696
GaudiPython.Bindings.InterfaceCast
Definition: Bindings.py:141
GaudiMP.GMPBase.MiniWriter.__repr__
def __repr__(self)
Definition: GMPBase.py:194
GaudiMP.GMPBase.Coord.log
log
Definition: GMPBase.py:1322
GaudiMP.GMPBase.GMPComponent.nodeType
nodeType
Definition: GMPBase.py:458
GaudiMP.GMPBase.GMPComponent.evcom
evcom
Definition: GMPBase.py:493
GaudiMP.GMPBase.Writer.writerDict
writerDict
Definition: GMPBase.py:1241
GaudiMP.GMPBase.Coord.writer
writer
Definition: GMPBase.py:1373
GaudiMP.GMPBase.TESSerializer.Load
def Load(self, tbuf)
Definition: GMPBase.py:375
GaudiMP.GMPBase.GMPComponent.Report
def Report(self)
Definition: GMPBase.py:678
GaudiMP.GMPBase.Worker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1189
GaudiPython::PyAlgorithm::finalize
StatusCode finalize() override
Definition: Algorithm.cpp:114
GaudiMP.GMPBase.EventCommunicator.sizeRecv
sizeRecv
Definition: GMPBase.py:286
GaudiMP.GMPBase.MiniWriter
Definition: GMPBase.py:111
GaudiMP.GMPBase.MiniWriter.svcOutput
svcOutput
Definition: GMPBase.py:135
GaudiMP.GMPBase.GMPComponent.Finalize
def Finalize(self)
Definition: GMPBase.py:670
GaudiMP.GMPBase.GMPComponent.a
a
Definition: GMPBase.py:542
GaudiMP.pTools
Definition: pTools.py:1
GaudiMP.GMPBase.Coord.system
system
Definition: GMPBase.py:1377
GaudiMP.GMPBase.Subworker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:949
GaudiMP.GMPBase.Worker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:1171
GaudiMP.GMPBase.Subworker.writerDict
writerDict
Definition: GMPBase.py:832
GaudiMP.GMPBase.GMPComponent.msgFormat
msgFormat
Definition: GMPBase.py:469
GaudiMP.GMPBase.GMPComponent.evt
evt
Definition: GMPBase.py:549
GaudiMP.GMPBase.EventCommunicator.nRecv
nRecv
Definition: GMPBase.py:284
GaudiMP.GMPBase.CollectHistograms._gmpc
_gmpc
Definition: GMPBase.py:227
GaudiMP.GMPBase.MiniWriter.output
output
Definition: GMPBase.py:127
GaudiMP.GMPBase.GMPComponent.log
log
Definition: GMPBase.py:466
GaudiMP.GMPBase.Subworker.Engine
def Engine(self)
Definition: GMPBase.py:838
GaudiMP.GMPBase.GMPComponent.cntr
cntr
Definition: GMPBase.py:664
GaudiMP.GMPBase.GMPComponent.StartGaudiPython
def StartGaudiPython(self)
Definition: GMPBase.py:558
Gaudi::Functional::details::put
auto put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
Definition: details.h:169
GaudiMP.GMPBase.TESSerializer.evt
evt
Definition: GMPBase.py:363
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: details.h:98
GaudiMP.GMPBase.GMPComponent.Start
def Start(self)
Definition: GMPBase.py:485
GaudiPython.Pythonizations.items
items
Definition: Pythonizations.py:546
GaudiMP.GMPBase.GMPComponent.LoadTES
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:563