Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v36r16 (ea80daf8)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
GMPBase.py
Go to the documentation of this file.
1 
11 from __future__ import print_function
12 
13 import multiprocessing
14 import time
15 from multiprocessing import Event, JoinableQueue, Process, cpu_count, current_process
16 from multiprocessing.queues import Empty
17 
18 from GaudiMP.pTools import FileRecordsAgent, HistoAgent, Syncer
19 from ROOT import TBuffer, TBufferFile
20 
21 from GaudiPython import (
22  FAILURE,
23  SUCCESS,
24  AppMgr,
25  InterfaceCast,
26  PyAlgorithm,
27  gbl,
28  setOwnership,
29 )
30 
31 # This script contains the bases for the Gaudi MultiProcessing (GMP)
32 # classes
33 
34 # There are three classes :
35 # Reader
36 # Worker
37 # Writer
38 
39 # Each class needs to perform communication with the others
40 # For this, we need a means of communication, which will be based on
41 # the python multiprocessing package
42 # This is provided in SPI pytools package
43 # cmt line : use pytools v1.1 LCG_Interfaces
44 # The PYTHONPATH env variable may need to be modified, as this might
45 # still point to 1.0_python2.5
46 
47 # Each class will need Queues, and a defined method for using these
48 # queues.
49 # For example, as long as there is something in the Queue, both ends
50 # of the queue must be open
51 # Also, there needs to be proper Termination flags and criteria
52 # The System should be error proof.
53 
54 # Constants -------------------------------------------------------------------
55 NAP = 0.001
56 MB = 1024.0 * 1024.0
57 # waits to guard against hanging, in seconds
58 WAIT_INITIALISE = 60 * 5
59 WAIT_FIRST_EVENT = 60 * 3
60 WAIT_SINGLE_EVENT = 60 * 6
61 WAIT_FINALISE = 60 * 2
62 STEP_INITIALISE = 10
63 STEP_EVENT = 2
64 STEP_FINALISE = 10
65 
66 # My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
67 SMAPS = False
68 
69 # -----------------------------------------------------------------------------
70 
71 # definitions
72 # ----------
73 # used to convert stored histos (in AIDA format) to ROOT format
74 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
75 
76 # used to check which type of histo we are dealing with
77 # i.e. if currentHisto in aidatypes : pass
78 aidatypes = (
79  gbl.AIDA.IHistogram,
80  gbl.AIDA.IHistogram1D,
81  gbl.AIDA.IHistogram2D,
82  gbl.AIDA.IHistogram3D,
83  gbl.AIDA.IProfile1D,
84  gbl.AIDA.IProfile2D,
85  gbl.AIDA.IBaseHistogram,
86 ) # extra?
87 
88 # similar to aidatypes
89 thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
90 
91 # Types of OutputStream in Gaudi
92 WRITERTYPES = {
93  "EvtCollectionStream": "tuples",
94  "InputCopyStream": "events",
95  "OutputStream": "events",
96  "RecordStream": "records",
97  "RunRecordStream": "records",
98  "SequentialOutputStream": "events",
99  "TagCollectionStream": "tuples",
100 }
101 
102 # =============================================================================
103 
104 
class MiniWriter(object):
    """
    A class to represent a writer in the GaudiPython configuration.

    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, and may be a list or a
    string.  There are also three different types of writer (events, records,
    tuples), so this bootstrap class provides easy access to this information
    while configuring.

    Attributes set by the constructor:
      w, wType, wName : the writer object, its type string and its getName()
      key, output     : configuration key owning the output, and that output
      woutput         : the writer's own Output property (None if it has none)
      datasvcName     : name of the writer's EvtDataSvc (only looked up when
                        the writer has no Output property)
      svcOutput       : the data service's Output property (None if unused)
      ItemList, OptItemList : item lists taken from the writer or, failing
                        that, from its data service
    """

    def __init__(self, writer, wType, config):
        """
        writer : configurable exposing getName() and either an Output
                 property or an EvtDataSvc property
        wType  : writer category string, stored as given
        config : mapping from component name to configurable
        """
        self.w = writer
        self.wType = wType
        # key/output give direct access to the right part of the
        # configuration, so that later one can do
        #   config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        self.wName = writer.getName()
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None

        if hasattr(self.w, "Output"):
            # the output file is named on the writer itself
            self.woutput = self.w.Output
            self.getItemLists(config)
            self.set(self.wName, self.woutput)
            return

        # no output on the writer: look it up on the writer's data service
        self.datasvcName = self.w.EvtDataSvc
        datasvc = config[self.datasvcName]
        if hasattr(datasvc, "Output"):
            # remember the service output so that __repr__ can report it
            self.svcOutput = datasvc.Output
            self.getItemLists(config)
            self.set(self.datasvcName, self.svcOutput)

    def getNewName(self, replaceThis, withThis, extra=""):
        """
        Return the output name with one pattern replaced by another.

        replaceThis, withThis : strings handed to str.replace
        extra : text appended to the result (e.g. ROOT flags)

        If the stored output is a list, only its first element is used and
        the result is returned as a one-element list; otherwise a string is
        returned.
        """
        assert isinstance(replaceThis, str)
        assert isinstance(withThis, str)
        old = self.output
        wasList = isinstance(old, list)
        if wasList:
            old = old[0]
        new = old.replace(replaceThis, withThis) + extra
        return [new] if wasList else new

    def getItemLists(self, config):
        """
        Fill ItemList and OptItemList from the writer, falling back to the
        writer's data service when the writer lacks the property.  A list
        that is found in neither place is left untouched.
        """
        for attr in ("ItemList", "OptItemList"):
            if hasattr(self.w, attr):
                setattr(self, attr, getattr(self.w, attr))
            else:
                datasvc = config[self.w.EvtDataSvc]
                if hasattr(datasvc, attr):
                    setattr(self, attr, getattr(datasvc, attr))
        return

    def set(self, key, output):
        """Store the configuration key and the output it refers to."""
        self.key = key
        self.output = output
        return

    def __repr__(self):
        line = "-" * 80
        parts = [
            line + "\n",
            "Writer : %s\n" % (self.wName),
            "Writer Type : %s\n" % (self.wType),
            # report the writer's own Output here, not the resolved output
            "Writer Output : %s\n" % (self.woutput),
            "DataSvc : %s\n" % (self.datasvcName),
            "DataSvc Output : %s\n" % (self.svcOutput),
            "\n",
            "Key for config : %s\n" % (self.key),
            "Output File : %s\n" % (self.output),
            "ItemList : %s\n" % (self.ItemList),
            "OptItemList : %s\n" % (self.OptItemList),
            line + "\n",
        ]
        return "".join(parts)
204 
205 
206 # =============================================================================
207 
208 
210  """
211  GaudiPython algorithm used to clean up histos on the Reader and Workers
212  Only has a finalize method()
213  This retrieves a dictionary of path:histo objects and sends it to the
214  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
215  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
216  at the writer end, there will be an error.
217  """
218 
219  def __init__(self, gmpcomponent):
220  PyAlgorithm.__init__(self)
221  self._gmpc = gmpcomponent
222  self.log = self._gmpc.log
223  return None
224 
225  def execute(self):
226  return SUCCESS
227 
228  def finalize(self):
229  self.log.info("CollectHistograms Finalise (%s)" % (self._gmpc.nodeType))
230  self._gmpc.hDict = self._gmpc.dumpHistograms()
231  ks = self._gmpc.hDict.keys()
232  self.log.info("%i Objects in Histogram Store" % (len(ks)))
233 
234  # crashes occurred due to Memory Error during the sending of hundreds
235  # histos on slc5 machines, so instead, break into chunks
236  # send 100 at a time
237  chunk = 100
238  reps = len(ks) / chunk + 1
239  for i in range(reps):
240  someKeys = ks[i * chunk : (i + 1) * chunk]
241  smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
242  self._gmpc.hq.put((self._gmpc.nodeID, smalld))
243  # "finished" Notification
244  self.log.debug("Signalling end of histos to Writer")
245  self._gmpc.hq.put("HISTOS_SENT")
246  self.log.debug("Waiting on Sync Event")
247  self._gmpc.sEvent.wait()
248  self.log.debug("Histo Sync Event set, clearing and returning")
249  self._gmpc.hvt.clearStore()
250  root = gbl.DataObject()
251  setOwnership(root, False)
252  self._gmpc.hvt.setRoot("/stat", root)
253  return SUCCESS
254 
255 
256 # =============================================================================
257 
258 
class EventCommunicator(object):
    """
    Moves Gaudi events between nodes through queues and keeps transfer
    statistics.

    Events travel as tuples ( evtNumber, TBufferFile ); any other object
    received (for example the "FINISHED" string) is passed through to the
    caller but is not counted as an event.
    """

    def __init__(self, GMPComponent, qin, qout):
        """
        GMPComponent : owning node; its log, nodeType, nodeID and nWorkers
                       attributes are used
        qin, qout    : incoming and outgoing queues (either may be None if
                       the corresponding direction is never used)
        """
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0  # counter : items sent
        self.nRecv = 0  # counter : items received
        self.sizeSent = 0  # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0  # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0  # time : receiving from self.qin
        self.qoutTime = 0  # time : sending on qout

    def send(self, item):
        """
        Put one ( evtNumber, TBufferFile ) tuple on the outgoing queue and
        update the send statistics.  Returns SUCCESS.
        """
        assert isinstance(item, tuple)
        startTransmission = time.time()
        self.qout.put(item)
        # allow the background thread to feed the Queue; not 100% guaranteed
        # to finish before next line
        while self.qout._buffer:
            time.sleep(NAP)
        self.qoutTime += time.time() - startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive(self, timeout=None):
        """
        Fetch one item from the incoming queue.

        Returns the item, or None if the queue raised Empty within
        `timeout`.  Only tuple items are counted in nRecv / sizeRecv.
        task_done() is called on the queue for every item fetched.
        """
        startWait = time.time()
        try:
            itemIn = self.qin.get(timeout=timeout)
        except Empty:
            return None
        self.qinTime += time.time() - startWait
        if isinstance(itemIn, tuple):
            self.nRecv += 1
            self.sizeRecv += itemIn[1].Length()
        try:
            self.qin.task_done()
        except Exception:
            # best effort: an over-counted task_done must not stop the loop
            self._gmpc.log.warning(
                "TASK_DONE called too often by : %s" % (self._gmpc.nodeType)
            )
        return itemIn

    def finalize(self):
        """
        Send the end-of-stream flags downstream, wait for the outgoing queue
        to be drained (except on the Writer) and log the statistics.
        """
        self.log.info(
            "Finalize Event Communicator : %s %s" % (self._gmpc, self._gmpc.nodeType)
        )
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        nodeType = self._gmpc.nodeType
        if nodeType == "Reader":
            downstream = self._gmpc.nWorkers
        elif nodeType == "Worker":
            downstream = 1
        elif nodeType == "Writer":
            downstream = 0
        else:
            # previously this left `downstream` unbound and raised
            self.log.warning("Unknown node type, no flags sent : %s" % (nodeType))
            downstream = 0

        for _ in range(downstream):
            self.qout.put("FINISHED")
        if nodeType != "Writer":
            self.qout.join()
        # Now some reporting...
        self.statistics()

    def statistics(self):
        """Log the transfer counters; the logger name is changed to the
        node's audit name and left that way."""
        self.log.name = "%s-%i Audit " % (self._gmpc.nodeType, self._gmpc.nodeID)
        self.log.info("Items Sent : %i" % (self.nSent))
        self.log.info("Items Received : %i" % (self.nRecv))
        self.log.info("Data Sent : %i" % (self.sizeSent))
        self.log.info("Data Received : %i" % (self.sizeRecv))
        self.log.info("Q-out Time : %5.2f" % (self.qoutTime))
        self.log.info("Q-in Time : %5.2f" % (self.qinTime))
349 
350 
351 # =============================================================================
352 
353 
class TESSerializer(object):
    """
    Python-side wrapper around the Gaudi TES serializer that records how many
    buffers were loaded and dumped, their sizes and the time spent.
    """

    def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
        """
        gaudiTESSerializer : object providing loadBuffer / dumpBuffer
        evtDataSvc         : event data service, used to reset "/Event"
        nodeType, nodeID   : only used to build the logger name in Report()
        log                : logger with a writable `name` and an info()
        """
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        self.buffersIn = []  # Length() of every buffer loaded
        self.buffersOut = []  # Length() of every buffer dumped
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log

    def Load(self, tbuf):
        """Install a fresh "/Event" root and load `tbuf` into the store."""
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot("/Event", root)
        t = time.time()
        self.T.loadBuffer(tbuf)
        self.tLoad += time.time() - t
        self.nIn += 1
        self.buffersIn.append(tbuf.Length())

    def Dump(self):
        """Dump the store into a new write-mode TBufferFile and return it."""
        t = time.time()
        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += time.time() - t
        self.nOut += 1
        self.buffersOut.append(tb.Length())
        return tb

    def Report(self):
        """
        Log event counts, data volumes, average and maximum buffer sizes and
        the accumulated load/dump times, under a temporary
        "<nodeType>-<nodeID> TESSerializer" logger name.
        """
        evIn = "Events Loaded : %i" % (self.nIn)
        evOut = "Events Dumped : %i" % (self.nOut)

        din = sum(self.buffersIn)
        dataIn = "Data Loaded : %i" % (din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
        if self.nIn:
            avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
            maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)
        else:
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"

        dout = sum(self.buffersOut)
        dataOut = "Data Dumped : %i" % (dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
        if self.nOut:
            # average over the *dumped* volume (was computed from `din`)
            avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
            maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)
        else:
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"

        dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
        loadTime = "Total Load Time : %5.2f" % (self.tLoad)

        lines = (
            evIn,
            evOut,
            dataIn,
            dataInMb,
            avgIn,
            maxIn,
            dataOut,
            dataOutMb,
            avgOut,
            maxOut,
            dumpTime,
            loadTime,
        )
        self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
        for line in lines:
            self.log.info(line)
        # restore the plain node name on the shared logger
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
430 
431 
432 # =============================================================================
433 
434 
435 class GMPComponent(object):
436  # This class will be the template for Reader, Worker and Writer
437  # containing all common components
438  # nodeId will be a numerical identifier for the node
439  # -1 for reader
440  # -2 for writer
441  # 0,...,nWorkers-1 for the Workers
442  def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
443  # declare a Gaudi MultiProcessing Node
444  # the nodeType is going to be one of Reader, Worker, Writer
445  # qPair is going to be a tuple of ( qin, qout )
446  # for sending and receiving
447  # if nodeType is "Writer", it will be a list of qPairs,
448  # as there's one queue-in from each Worker
449  #
450  # params is a tuple of (nWorkers, config, log)
451 
452  self.nodeType = nodeType
453  current_process().name = nodeType
454 
455  # Synchronisation Event() objects for keeping track of the system
456  self.initEvent, eventLoopSyncer, self.finalEvent = events
457  self.eventLoopSyncer, self.lastEvent = eventLoopSyncer # unpack tuple
458 
459  # necessary for knowledge of the system
460  self.nWorkers, self.sEvent, self.config, self.log = params
461  self.subworkers = subworkers
462  self.nodeID = nodeID
463  self.msgFormat = self.config["MessageSvc"].Format
464 
465  # describe the state of the node by the current Event Number
466  self.currentEvent = None
467 
468  # Unpack the Queues : (events, histos, filerecords)
469  self.queues = queues
470  self.num = 0
471 
472  ks = self.config.keys()
473  self.app = None
474  list = ["Brunel", "DaVinci", "Boole", "Gauss"]
475  for k in list:
476  if k in ks:
477  self.app = k
478 
479  def Start(self):
480  # define the separate process
481  qPair, histq, fq = self.queues
482 
483  # Set up the Queue Mechanisms ( Event Communicators )
484  if self.nodeType == "Reader" or self.nodeType == "Worker":
485  # Reader or Worker Node
486  qin, qout = qPair
487  self.evcom = EventCommunicator(self, qin, qout)
488  else:
489  # Writer : many queues in, no queue out
490  assert self.nodeType == "Writer"
491  self.evcoms = []
492  qsin = qPair[0]
493  for q in qsin:
494  ec = EventCommunicator(self, q, None)
495  self.evcoms.append(ec)
496  # Histogram Queue
497  self.hq = histq
498  # FileRecords Queue
499  self.fq = fq
500 
501  # Universal Counters (available to all nodes)
502  # Use sensibly!!!
503  self.nIn = 0
504  self.nOut = 0
505 
506  # Status Flag (possibly remove later)
507  self.stat = SUCCESS
508 
509  # Set logger name
510  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
511 
512  # Heuristic variables
513  # time for init, run, final, firstEventTime, totalTime
514  self.iTime = 0.0
515  self.rTime = 0.0
516  self.fTime = 0.0
517  self.firstEvTime = 0.0
518  self.tTime = 0.0
519 
520  self.proc = Process(target=self.Engine)
521  # Fork and start the separate process
522  self.proc.start()
523 
524  def Engine(self):
525  # This will be the method carried out by the Node
526  # Different for all
527  pass
528 
530  # Different for all ; customize Configuration for multicore
531  pass
532 
533  def SetupGaudiPython(self):
534  # This method will initialize the GaudiPython Tools
535  # such as the AppMgr and so on
536  self.a = AppMgr()
537  if SMAPS:
538  from AlgSmapShot import SmapShot
539 
540  smapsLog = self.nodeType + "-" + str(self.nodeID) + ".smp"
541  ss = SmapShot(logname=smapsLog)
542  self.a.addAlgorithm(ss)
543  self.evt = self.a.evtsvc()
544  self.hvt = self.a.histsvc()
545  self.fsr = self.a.filerecordsvc()
546  self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
547  self.pers = self.a.service("EventPersistencySvc", "IAddressCreator")
548  self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
549  self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
550  return SUCCESS
551 
552  def StartGaudiPython(self):
553  self.a.initialize()
554  self.a.start()
555  return SUCCESS
556 
557  def LoadTES(self, tbufferfile):
558  root = gbl.DataObject()
559  setOwnership(root, False)
560  self.evt.setRoot("/Event", root)
561  self.ts.loadBuffer(tbufferfile)
562 
563  def getEventNumber(self):
564  if self.app != "Gauss":
565  # Using getList or getHistoNames can result in the EventSelector
566  # re-initialising connection to RootDBase, which costs a lot of
567  # time... try to build a set of Header paths??
568 
569  # First Attempt : Unpacked Event Data
570  lst = ["/Event/Gen/Header", "/Event/Rec/Header"]
571  for l in lst:
572  path = l
573  try:
574  n = self.evt[path].evtNumber()
575 
576  return n
577  except Exception:
578  # No evt number at this path
579  continue
580 
581  # second attepmt : try DAQ/RawEvent data
582  # The Evt Number is in bank type 16, bank 0, data pt 4
583  try:
584  n = self.evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
585 
586  return n
587  except Exception:
588  pass
589 
590  # Default Action
591  if self.nIn > 0 or self.nOut > 0:
592  pass
593  else:
594  self.log.warning("Could not determine Event Number")
595  return -1
596  else:
597  if self.nodeID == -1:
598  self.num = self.num + 1
599 
600  return self.num
601 
602  def IdentifyWriters(self):
603  #
604  # Identify Writers in the Configuration
605  #
606  d = {}
607  keys = ["events", "records", "tuples", "histos"]
608  for k in keys:
609  d[k] = []
610 
611  # Identify Writers and Classify
612  wkeys = WRITERTYPES.keys()
613  for v in self.config.values():
614  if v.__class__.__name__ in wkeys:
615  writerType = WRITERTYPES[v.__class__.__name__]
616  d[writerType].append(MiniWriter(v, writerType, self.config))
617  if self.nodeID == 0:
618  self.log.info("Writer Found : %s" % (v.name()))
619 
620  # Now Check for the Histogram Service
621  if "HistogramPersistencySvc" in self.config.keys():
622  hfile = self.config["HistogramPersistencySvc"].getProp("OutputFile")
623  d["histos"].append(hfile)
624  return d
625 
626  def dumpHistograms(self):
627  """
628  Method used by the GaudiPython algorithm CollectHistos
629  to obtain a dictionary of form { path : object }
630  representing the Histogram Store
631  """
632  nlist = self.hvt.getHistoNames()
633  histDict = {}
634  objects = 0
635  histos = 0
636  if nlist:
637  for n in nlist:
638  o = self.hvt[n]
639  if type(o) in aidatypes:
640  o = aida2root(o)
641  histos += 1
642  else:
643  objects += 1
644  histDict[n] = o
645  else:
646  print("WARNING : no histograms to recover?")
647  return histDict
648 
649  def Initialize(self):
650  start = time.time()
651  self.processConfiguration()
652  self.SetupGaudiPython()
653  self.initEvent.set()
654  self.StartGaudiPython()
655 
656  if self.app == "Gauss":
657 
658  tool = self.a.tool("ToolSvc.EvtCounter")
659  self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
660  else:
661  self.cntr = None
662 
663  self.iTime = time.time() - start
664 
665  def Finalize(self):
666  start = time.time()
667  self.a.stop()
668  self.a.finalize()
669  self.log.info("%s-%i Finalized" % (self.nodeType, self.nodeID))
670  self.finalEvent.set()
671  self.fTime = time.time() - start
672 
673  def Report(self):
674  self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
675  allTime = "Alive Time : %5.2f" % (self.tTime)
676  initTime = "Init Time : %5.2f" % (self.iTime)
677  frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
678  runTime = "Run Time : %5.2f" % (self.rTime)
679  finTime = "Finalise Time : %5.2f" % (self.fTime)
680  tup = (allTime, initTime, frstTime, runTime, finTime)
681  for t in tup:
682  self.log.info(t)
683  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
684  # and report from the TESSerializer
685  self.TS.Report()
686 
687 
688 # =============================================================================
689 
690 
692  def __init__(self, queues, events, params, subworkers):
693  GMPComponent.__init__(self, "Reader", -1, queues, events, params, subworkers)
694 
696  # Reader :
697  # No algorithms
698  # No output
699  # No histos
700  self.config["ApplicationMgr"].TopAlg = []
701  self.config["ApplicationMgr"].OutStream = []
702  if "HistogramPersistencySvc" in self.config.keys():
703  self.config["HistogramPersistencySvc"].OutputFile = ""
704  self.config["MessageSvc"].Format = "%-13s " % "[Reader]" + self.msgFormat
705  self.evtMax = self.config["ApplicationMgr"].EvtMax
706 
707  def DumpEvent(self):
708  tb = TBufferFile(TBuffer.kWrite)
709  # print '----Reader dumping Buffer!!!'
710  self.ts.dumpBuffer(tb)
711  # print '\tBuffer Dumped, size : %i'%( tb.Length() )
712  return tb
713 
714  def DoFirstEvent(self):
715  # Do First Event ------------------------------------------------------
716  # Check Termination Criteria
717  startFirst = time.time()
718  self.log.info("Reader : First Event")
719  if self.nOut == self.evtMax:
720  self.log.info("evtMax( %i ) reached" % (self.evtMax))
721  self.lastEvent.set()
722  return SUCCESS
723  else:
724  # Continue to read, dump and send event
725  self.a.run(1)
726  if not bool(self.evt["/Event"]):
727  self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
728  self.lastEvent.set()
729  return SUCCESS
730  else:
731  # Populate TESSerializer list and send Event
732  if self.app == "Gauss":
733  lst = self.evt.getHistoNames()
734  else:
735  try:
736  lst = self.evt.getList()
737  if self.app == "DaVinci":
738  daqnode = self.evt.retrieveObject("/Event/DAQ").registry()
739  setOwnership(daqnode, False)
740  self.evt.getList(daqnode, lst, daqnode.address().par())
741  except Exception:
742  self.log.critical("Reader could not acquire TES List!")
743  self.lastEvent.set()
744  return FAILURE
745  self.log.info("Reader : TES List : %i items" % (len(lst)))
746  for l in lst:
747  self.ts.addItem(l)
749  tb = self.TS.Dump()
750  self.log.info("First Event Sent")
751  self.evcom.send((self.currentEvent, tb))
752  self.nOut += 1
753  self.eventLoopSyncer.set()
754  self.evt.clearStore()
755  self.firstEvTime = time.time() - startFirst
756  return SUCCESS
757 
758  def Engine(self):
759  # rename process
760  import ctypes
761 
762  libc = ctypes.CDLL("libc.so.6")
763  name = str(self.nodeType) + str(self.nodeID) + "\0"
764  libc.prctl(15, name, 0, 0, 0)
765 
766  startEngine = time.time()
767  self.log.name = "Reader"
768  self.log.info("Reader Process starting")
769 
770  self.Initialize()
771 
772  # add the Histogram Collection Algorithm
773  self.a.addAlgorithm(CollectHistograms(self))
774 
775  self.log.info("Reader Beginning Distribution")
776  sc = self.DoFirstEvent()
777  if sc.isSuccess():
778  self.log.info("Reader First Event OK")
779  else:
780  self.log.critical("Reader Failed on First Event")
781  self.stat = FAILURE
782 
783  # Do All Others -------------------------------------------------------
784  while True:
785  # Check Termination Criteria
786  if self.nOut == self.evtMax:
787  self.log.info("evtMax( %i ) reached" % (self.evtMax))
788  break
789  # Check Health
790  if not self.stat.isSuccess():
791  self.log.critical("Reader is Damaged!")
792  break
793  # Continue to read, dump and send event
794  t = time.time()
795  self.a.run(1)
796  self.rTime += time.time() - t
797  if not bool(self.evt["/Event"]):
798  self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
799  break
800  self.currentEvent = self.getEventNumber()
801  tb = self.TS.Dump()
802  self.evcom.send((self.currentEvent, tb))
803  # clean up
804  self.nOut += 1
805  self.eventLoopSyncer.set()
806  self.evt.clearStore()
807  self.log.info("Setting <Last> Event")
808  self.lastEvent.set()
809 
810  # Finalize
811  self.log.info("Reader : Event Distribution complete.")
812  self.evcom.finalize()
813  self.Finalize()
814  self.tTime = time.time() - startEngine
815  self.Report()
816 
817 
818 # =============================================================================
819 
820 
822  def __init__(self, workerID, queues, events, params, subworkers):
823  GMPComponent.__init__(
824  self, "Worker", workerID, queues, events, params, subworkers
825  )
826  # Identify the writer streams
828  # Identify the accept/veto checks for each event
829  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
830  self.log.info("Subworker-%i Created OK" % (self.nodeID))
831  self.eventOutput = True
832 
833  def Engine(self):
834  # rename process
835  import ctypes
836 
837  libc = ctypes.CDLL("libc.so.6")
838  name = str(self.nodeType) + str(self.nodeID) + "\0"
839  libc.prctl(15, name, 0, 0, 0)
840 
841  self.initEvent.set()
842  startEngine = time.time()
843  msg = self.a.service("MessageSvc")
844  msg.Format = "%-13s " % ("[" + self.log.name + "]") + self.msgFormat
845 
846  self.log.name = "Worker-%i" % (self.nodeID)
847  self.log.info("Subworker %i starting Engine" % (self.nodeID))
849 
850  # populate the TESSerializer itemlist
851  self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
852 
853  self.a.addAlgorithm(CollectHistograms(self))
854 
855  # Begin processing
856  Go = True
857  while Go:
858  packet = self.evcom.receive()
859  if packet:
860  pass
861  else:
862  continue
863  if packet == "FINISHED":
864  break
865  evtNumber, tbin = packet # unpack
866  if self.cntr is not None:
867 
868  self.cntr.setEventCounter(evtNumber)
869 
870  self.nIn += 1
871  self.TS.Load(tbin)
872 
873  t = time.time()
874  sc = self.a.executeEvent()
875  if self.nIn == 1:
876  self.firstEvTime = time.time() - t
877  else:
878  self.rTime += time.time() - t
879  if sc.isSuccess():
880  pass
881  else:
882  self.log.name = "Worker-%i" % (self.nodeID)
883  self.log.warning("Did not Execute Event")
884  self.evt.clearStore()
885  continue
886  if self.isEventPassed():
887  pass
888  else:
889  self.log.name = "Worker-%i" % (self.nodeID)
890  self.log.warning("Event did not pass : %i" % (evtNumber))
891  self.evt.clearStore()
892  continue
893  if self.eventOutput:
894  # It may be the case of generating Event Tags; hence
895  # no event output
897  tb = self.TS.Dump()
898  self.evcom.send((self.currentEvent, tb))
899  self.nOut += 1
900  self.inc.fireIncident(gbl.Incident("Subworker", "EndEvent"))
901  self.eventLoopSyncer.set()
902  self.evt.clearStore()
903  self.log.name = "Worker-%i" % (self.nodeID)
904  self.log.info("Setting <Last> Event %s" % (self.nodeID))
905  self.lastEvent.set()
906 
907  self.evcom.finalize()
908  # Now send the FileRecords and stop/finalize the appMgr
909  self.filerecordsAgent.SendFileRecords()
910  self.tTime = time.time() - startEngine
911  self.Finalize()
912  self.Report()
913  # self.finalEvent.set()
914 
915  def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
916  self.a = a
917  self.evt = evt
918  self.hvt = hvt
919  self.fsr = fsr
920  # self.inc = inc
921  self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
922  self.pers = pers
923  self.ts = ts
924  self.cntr = cntr
925  self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
926 
927  def getCheckAlgs(self):
928  """
929  For some output writers, a check is performed to see if the event has
930  executed certain algorithms.
931  These reside in the AcceptAlgs property for those writers
932  """
933  acc = []
934  req = []
935  vet = []
936  for m in self.writerDict["events"]:
937  if hasattr(m.w, "AcceptAlgs"):
938  acc += m.w.AcceptAlgs
939  if hasattr(m.w, "RequireAlgs"):
940  req += m.w.RequireAlgs
941  if hasattr(m.w, "VetoAlgs"):
942  vet += m.w.VetoAlgs
943  return (acc, req, vet)
944 
945  def checkExecutedPassed(self, algName):
946  if (
947  self.a.algorithm(algName)._ialg.isExecuted()
948  and self.a.algorithm(algName)._ialg.filterPassed()
949  ):
950  return True
951  else:
952  return False
953 
954  def isEventPassed(self):
955  """
956  Check the algorithm status for an event.
957  Depending on output writer settings, the event
958  may be declined based on various criteria.
959  This is a transcript of the check that occurs in GaudiSvc::OutputStream
960  """
961  passed = False
962 
963  self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
964  if self.acceptAlgs:
965  for name in self.acceptAlgs:
966  if self.checkExecutedPassed(name):
967  passed = True
968  break
969  else:
970  passed = True
971 
972  self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
973  for name in self.requireAlgs:
974  if self.checkExecutedPassed(name):
975  pass
976  else:
977  self.log.info("Evt declined (requireAlgs) : %s" % (name))
978  passed = False
979 
980  self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
981  for name in self.vetoAlgs:
982  if self.checkExecutedPassed(name):
983  pass
984  else:
985  self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
986  passed = False
987  return passed
988 
989 
990 # =============================================================================
991 
992 
994  def __init__(self, workerID, queues, events, params, subworkers):
995  GMPComponent.__init__(
996  self, "Worker", workerID, queues, events, params, subworkers
997  )
998  # Identify the writer streams
1000  # Identify the accept/veto checks for each event
1001  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
1002  self.log.name = "Worker-%i" % (self.nodeID)
1003  self.log.info("Worker-%i Created OK" % (self.nodeID))
1004  self.eventOutput = True
1005 
1007 
1008  # Worker :
1009  # No input
1010  # No output
1011  # No Histos
1012  self.config["EventSelector"].Input = []
1013  self.config["ApplicationMgr"].OutStream = []
1014  if "HistogramPersistencySvc" in self.config.keys():
1015  self.config["HistogramPersistencySvc"].OutputFile = ""
1016  formatHead = "[Worker-%i]" % (self.nodeID)
1017  self.config["MessageSvc"].Format = "%-13s " % formatHead + self.msgFormat
1018 
1019  for key, lst in self.writerDict.iteritems():
1020  self.log.info("Writer Type : %s\t : %i" % (key, len(lst)))
1021 
1022  for m in self.writerDict["tuples"]:
1023  # rename Tuple output file with an appendix
1024  # based on worker id, for merging later
1025  newName = m.getNewName(".", ".w%i." % (self.nodeID))
1026  self.config[m.key].Output = newName
1027 
1028  # Suppress INFO Output for all but Worker-0
1029  # if self.nodeID == 0 :
1030  # pass
1031  # else :
1032  # self.config[ 'MessageSvc' ].OutputLevel = ERROR
1033 
1034  if self.app == "Gauss":
1035  try:
1036  if "ToolSvc.EvtCounter" not in self.config:
1037  from Configurables import EvtCounter
1038 
1039  counter = EvtCounter()
1040  else:
1041  counter = self.config["ToolSvc.EvtCounter"]
1042  counter.UseIncident = False
1043  except Exception:
1044  # ignore errors when trying to change the configuration of the EvtCounter
1045  self.log.warning("Cannot configure EvtCounter")
1046 
1047  def Engine(self):
1048 
1049  # rename process
1050  import ctypes
1051 
1052  libc = ctypes.CDLL("libc.so.6")
1053  name = str(self.nodeType) + str(self.nodeID) + "\0"
1054  libc.prctl(15, name, 0, 0, 0)
1055 
1056  startEngine = time.time()
1057  self.log.info("Worker %i starting Engine" % (self.nodeID))
1058  self.Initialize()
1060 
1061  # populate the TESSerializer itemlist
1062  self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
1063 
1064  nEventWriters = len(self.writerDict["events"])
1065  if nEventWriters:
1066  itemList = set()
1067  optItemList = set()
1068  for m in self.writerDict["events"]:
1069  for item in m.ItemList:
1070  hsh = item.find("#")
1071  if hsh != -1:
1072  item = item[:hsh]
1073  itemList.add(item)
1074  for item in m.OptItemList:
1075  hsh = item.find("#")
1076  if hsh != -1:
1077  item = item[:hsh]
1078  optItemList.add(item)
1079  # If an item is mandatory and optional, keep it only in the optional list
1080  itemList -= optItemList
1081  for item in sorted(itemList):
1082  self.log.info(" adding ItemList Item to ts : %s" % (item))
1083  self.ts.addItem(item)
1084  for item in sorted(optItemList):
1085  self.log.info(" adding Optional Item to ts : %s" % (item))
1086  self.ts.addOptItem(item)
1087  else:
1088  self.log.info("There is no Event Output for this app")
1089  self.eventOutput = False
1090 
1091  # Begin processing
1092  Go = True
1093  while Go:
1094  packet = self.evcom.receive()
1095  if packet:
1096  pass
1097  else:
1098  continue
1099  if packet == "FINISHED":
1100  break
1101  evtNumber, tbin = packet # unpack
1102  if self.cntr is not None:
1103  self.cntr.setEventCounter(evtNumber)
1104 
1105  # subworkers are forked before the first event is processed
1106  if self.nIn == 0:
1107  self.log.info("Fork new subworkers")
1108 
1109  # Fork subworkers and share services
1110  for k in self.subworkers:
1111  k.SetServices(
1112  self.a,
1113  self.evt,
1114  self.hvt,
1115  self.fsr,
1116  self.inc,
1117  self.pers,
1118  self.ts,
1119  self.cntr,
1120  )
1121  k.Start()
1122  self.a.addAlgorithm(CollectHistograms(self))
1123  self.nIn += 1
1124  self.TS.Load(tbin)
1125 
1126  t = time.time()
1127  sc = self.a.executeEvent()
1128  if self.nIn == 1:
1129  self.firstEvTime = time.time() - t
1130  else:
1131  self.rTime += time.time() - t
1132  if sc.isSuccess():
1133  pass
1134  else:
1135  self.log.warning("Did not Execute Event")
1136  self.evt.clearStore()
1137  continue
1138  if self.isEventPassed():
1139  pass
1140  else:
1141  self.log.warning("Event did not pass : %i" % (evtNumber))
1142  self.evt.clearStore()
1143  continue
1144  if self.eventOutput:
1145  # It may be the case of generating Event Tags; hence
1146  # no event output
1148  tb = self.TS.Dump()
1149  self.evcom.send((self.currentEvent, tb))
1150  self.nOut += 1
1151  self.inc.fireIncident(gbl.Incident("Worker", "EndEvent"))
1152  self.eventLoopSyncer.set()
1153  self.evt.clearStore()
1154  self.log.info("Setting <Last> Event")
1155  self.lastEvent.set()
1156 
1157  self.evcom.finalize()
1158  self.log.info("Worker-%i Finished Processing Events" % (self.nodeID))
1159  # Now send the FileRecords and stop/finalize the appMgr
1160  self.filerecordsAgent.SendFileRecords()
1161  self.Finalize()
1162  self.tTime = time.time() - startEngine
1163  self.Report()
1164 
1165  for k in self.subworkers:
1166  self.log.info("Join subworkers")
1167  k.proc.join()
1168 
1169  def getCheckAlgs(self):
1170  """
1171  For some output writers, a check is performed to see if the event has
1172  executed certain algorithms.
1173  These reside in the AcceptAlgs property for those writers
1174  """
1175  acc = []
1176  req = []
1177  vet = []
1178  for m in self.writerDict["events"]:
1179  if hasattr(m.w, "AcceptAlgs"):
1180  acc += m.w.AcceptAlgs
1181  if hasattr(m.w, "RequireAlgs"):
1182  req += m.w.RequireAlgs
1183  if hasattr(m.w, "VetoAlgs"):
1184  vet += m.w.VetoAlgs
1185  return (acc, req, vet)
1186 
1187  def checkExecutedPassed(self, algName):
1188  if (
1189  self.a.algorithm(algName)._ialg.isExecuted()
1190  and self.a.algorithm(algName)._ialg.filterPassed()
1191  ):
1192  return True
1193  else:
1194  return False
1195 
1196  def isEventPassed(self):
1197  """
1198  Check the algorithm status for an event.
1199  Depending on output writer settings, the event
1200  may be declined based on various criteria.
1201  This is a transcript of the check that occurs in GaudiSvc::OutputStream
1202  """
1203  passed = False
1204 
1205  self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
1206  if self.acceptAlgs:
1207  for name in self.acceptAlgs:
1208  if self.checkExecutedPassed(name):
1209  passed = True
1210  break
1211  else:
1212  passed = True
1213 
1214  self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
1215  for name in self.requireAlgs:
1216  if self.checkExecutedPassed(name):
1217  pass
1218  else:
1219  self.log.info("Evt declined (requireAlgs) : %s" % (name))
1220  passed = False
1221 
1222  self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
1223  for name in self.vetoAlgs:
1224  if self.checkExecutedPassed(name):
1225  pass
1226  else:
1227  self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
1228  passed = False
1229  return passed
1230 
1231 
1232 # =============================================================================
1233 
1234 
1236  def __init__(self, queues, events, params, subworkers):
1237  GMPComponent.__init__(self, "Writer", -2, queues, events, params, subworkers)
1238  # Identify the writer streams
1240  # This keeps track of workers as they finish
1241  self.status = [False] * self.nWorkers
1242  self.log.name = "Writer--2"
1243 
1245  # Writer :
1246  # No input
1247  # No Algs
1248  self.config["ApplicationMgr"].TopAlg = []
1249  self.config["EventSelector"].Input = []
1250 
1251  self.config["MessageSvc"].Format = "%-13s " % "[Writer]" + self.msgFormat
1252 
1253  def Engine(self):
1254  # rename process
1255  import ctypes
1256 
1257  libc = ctypes.CDLL("libc.so.6")
1258  name = str(self.nodeType) + str(self.nodeID) + "\0"
1259  libc.prctl(15, name, 0, 0, 0)
1260 
1261  self.Initialize()
1262  self.histoAgent = HistoAgent(self)
1264 
1265  # Begin processing
1266  Go = True
1267  current = -1
1268  while Go:
1269  current = (current + 1) % self.nWorkers
1270  packet = self.evcoms[current].receive(timeout=0.01)
1271  if packet is None:
1272  continue
1273  if packet == "FINISHED":
1274  self.log.info("Writer got FINISHED flag : Worker %i" % (current))
1275 
1276  self.status[current] = True
1277  if all(self.status):
1278  self.log.info("FINISHED recd from all workers, break loop")
1279  break
1280  continue
1281  # otherwise, continue as normal
1282  self.nIn += 1 # update central count (maybe needed by FSR store)
1283  evtNumber, tbin = packet # unpack
1284  self.TS.Load(tbin)
1285  t = time.time()
1286  self.a.executeEvent()
1287  self.rTime += time.time() - t
1289  self.evt.clearStore()
1290  self.eventLoopSyncer.set()
1291  self.log.name = "Writer--2"
1292  self.log.info("Setting <Last> Event")
1293  self.lastEvent.set()
1294 
1295  # finalisation steps
1296  [e.finalize() for e in self.evcoms]
1297  # Now do Histograms
1298  sc = self.histoAgent.Receive()
1299  sc = self.histoAgent.RebuildHistoStore()
1300  if sc.isSuccess():
1301  self.log.info("Histo Store rebuilt ok")
1302  else:
1303  self.log.warning("Histo Store Error in Rebuild")
1304 
1305  # Now do FileRecords
1306  sc = self.filerecordsAgent.Receive()
1307  self.filerecordsAgent.Rebuild()
1308  self.Finalize()
1309  # self.rTime = time.time()-startEngine
1310  self.Report()
1311 
1312 
1313 # =============================================================================
1314 
1315 # =============================================================================
1316 
1317 
class Coord(object):
    """
    Co-ordinator of the parallel job.

    Builds the queues, the synchronisation helpers and the Reader, Worker,
    Subworker and Writer components, then drives them through the
    Initialise, Run and Finalise phases.
    """

    def __init__(self, nWorkers, config, log):
        """
        nWorkers : number of workers; -1 selects cpu_count()
        config   : job configuration, handed on to every component
        log      : logger, handed on to every component
        """
        self.log = log
        self.config = config
        # set up Logging
        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("GaudiPython Parallel Process Co-ordinator beginning")

        # -1 : the user has requested all available cpus in the machine
        self.nWorkers = cpu_count() if nWorkers == -1 else nWorkers

        self.qs = self.SetupQueues()  # a dictionary of queues (for Events)
        self.hq = JoinableQueue()  # for Histogram data
        self.fq = JoinableQueue()  # for FileRecords data

        # One Syncer each for Initialise, Run, and Finalise
        self.sInit = Syncer(
            self.nWorkers, self.log, limit=WAIT_INITIALISE, step=STEP_INITIALISE
        )
        self.sRun = Syncer(
            self.nWorkers,
            self.log,
            manyEvents=True,
            limit=WAIT_SINGLE_EVENT,
            step=STEP_EVENT,
            firstEvent=WAIT_FIRST_EVENT,
        )
        self.sFin = Syncer(
            self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE
        )
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Declare SubProcesses!
        # Subworkers take ids 1 .. nWorkers-1; each one is handed the same,
        # still growing, list of subworkers.
        self.subworkers = []
        for nodeID in range(1, self.nWorkers):
            self.subworkers.append(
                Subworker(
                    nodeID,
                    self.getQueues(nodeID),
                    self.getSyncEvents(nodeID),
                    params,
                    self.subworkers,
                )
            )
        self.reader = Reader(
            self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers
        )
        self.workers = []
        mainWorker = Worker(
            0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers
        )
        self.writer = Writer(
            self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers
        )

        # launch order used by Go()
        self.system = [self.writer, mainWorker, self.reader]

    def getSyncEvents(self, nodeID):
        """
        Return (init, (run, lastEvent), fin): the synchronisation events of
        node nodeID taken from the three Syncers.
        """
        initEvent = self.sInit.d[nodeID].event
        runSlot = self.sRun.d[nodeID]
        runEvents = (runSlot.event, runSlot.lastEvent)
        finEvent = self.sFin.d[nodeID].event
        return (initEvent, runEvents, finEvent)

    def getQueues(self, nodeID):
        """
        Return (eventQ, histQ, fsrQ) for node nodeID; the histogram and
        FileRecords queues are the same objects for every node.
        """
        return (self.qs[nodeID], self.hq, self.fq)

    def Go(self):
        """
        Start the components and synchronise the three phases.

        Returns SUCCESS when every phase synchronised and the processes were
        joined; otherwise the child processes are terminated and FAILURE is
        returned.
        """
        loggerName = "GaudiPython-Parallel-Logger"

        # Initialise
        self.log.name = loggerName
        self.log.info("INITIALISING SYSTEM")

        # Start reader, writer and main worker
        for component in self.system:
            component.Start()

        if not (self.sInit.syncAll(step="Initialise") == SUCCESS):
            self.Terminate()
            return FAILURE

        # Run, then Finalise
        laterPhases = (
            ("RUNNING SYSTEM", self.sRun, "Run"),
            ("FINALISING SYSTEM", self.sFin, "Finalise"),
        )
        for banner, syncer, stepName in laterPhases:
            self.log.name = loggerName
            self.log.info(banner)
            if not (syncer.syncAll(step=stepName) == SUCCESS):
                self.Terminate()
                return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info("Cleanly join all Processes")
        self.Stop()
        self.log.info("Report Total Success to Main.py")
        return SUCCESS

    def Terminate(self):
        """Brutally kill all active child processes."""
        for child in multiprocessing.active_children():
            child.terminate()

    def Stop(self):
        """
        Join the component processes and return SUCCESS.

        self.system is reversed in place first, because the processes should
        be joined in reverse order to launch.
        """
        self.system.reverse()
        for component in self.system:
            component.proc.join()
        return SUCCESS

    def SetupQueues(self):
        """
        Set up the network of event queues, keyed by node id.

        Each entry is a pair (queue in, queue out):
          -1 (Reader)  : (None, reader->workers queue)
          -2 (Writer)  : (list of the nWorkers worker->writer queues, None)
          i  (worker i): (reader->workers queue, its own worker->writer queue)
        """
        # one queue from Reader-Workers
        readerToWorkers = JoinableQueue()
        # one queue from each worker to writer
        workerToWriter = [JoinableQueue() for _ in range(self.nWorkers)]
        queues = {
            -1: (None, readerToWorkers),  # Reader
            -2: (workerToWriter, None),  # Writer
        }
        for nodeID, outQueue in enumerate(workerToWriter):
            queues[nodeID] = (readerToWorkers, outQueue)
        return queues
1470 
1471 
1472 # ============================= EOF ===========================================
GaudiMP.GMPBase.CollectHistograms
Definition: GMPBase.py:209
GaudiMP.GMPBase.Coord.workers
workers
Definition: GMPBase.py:1368
AlgSequencer.all
all
Definition: AlgSequencer.py:61
GaudiMP.GMPBase.GMPComponent.num
num
Definition: GMPBase.py:470
GaudiMP.GMPBase.TESSerializer.nOut
nOut
Definition: GMPBase.py:361
GaudiMP.GMPBase.TESSerializer.__init__
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:355
GaudiMP.GMPBase.Coord.fq
fq
Definition: GMPBase.py:1335
GaudiMP.GMPBase.EventCommunicator.receive
def receive(self, timeout=None)
Definition: GMPBase.py:299
GaudiMP.GMPBase.GMPComponent.nodeID
nodeID
Definition: GMPBase.py:462
GaudiMP.GMPBase.GMPComponent.__init__
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:442
GaudiMP.GMPBase.MiniWriter.OptItemList
OptItemList
Definition: GMPBase.py:123
GaudiMP.GMPBase.EventCommunicator.log
log
Definition: GMPBase.py:265
GaudiMP.GMPBase.GMPComponent.tTime
tTime
Definition: GMPBase.py:518
GaudiMP.GMPBase.Coord.SetupQueues
def SetupQueues(self)
Definition: GMPBase.py:1453
GaudiMP.pTools.FileRecordsAgent
Definition: pTools.py:239
GaudiMP.GMPBase.GMPComponent.getEventNumber
def getEventNumber(self)
Definition: GMPBase.py:563
GaudiMP.GMPBase.Writer.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:1244
GaudiMP.GMPBase.GMPComponent.IdentifyWriters
def IdentifyWriters(self)
Definition: GMPBase.py:602
GaudiMP.GMPBase.TESSerializer.tLoad
tLoad
Definition: GMPBase.py:363
GaudiPython::PyAlgorithm::execute
StatusCode execute() override
Definition: Algorithm.cpp:110
GaudiMP.GMPBase.EventCommunicator.send
def send(self, item)
Definition: GMPBase.py:284
GaudiMP.GMPBase.Worker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1059
GaudiMP.GMPBase.Coord.sRun
sRun
Definition: GMPBase.py:1341
GaudiMP.GMPBase.TESSerializer.buffersIn
buffersIn
Definition: GMPBase.py:358
GaudiMP.GMPBase.GMPComponent.TS
TS
Definition: GMPBase.py:549
GaudiMP.GMPBase.GMPComponent.stat
stat
Definition: GMPBase.py:507
GaudiMP.GMPBase.Subworker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:848
GaudiMP.GMPBase.GMPComponent.dumpHistograms
def dumpHistograms(self)
Definition: GMPBase.py:626
GaudiMP.GMPBase.EventCommunicator.sizeSent
sizeSent
Definition: GMPBase.py:279
reverse
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:59
GaudiMP.GMPBase.TESSerializer
Definition: GMPBase.py:354
GaudiMP.GMPBase.GMPComponent.nIn
nIn
Definition: GMPBase.py:503
GaudiMP.GMPBase.MiniWriter.wType
wType
Definition: GMPBase.py:116
GaudiMP.GMPBase.GMPComponent
Definition: GMPBase.py:435
GaudiMP.GMPBase.Worker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:1001
GaudiMP.GMPBase.Reader.evtMax
evtMax
Definition: GMPBase.py:705
GaudiMP.GMPBase.MiniWriter.ItemList
ItemList
Definition: GMPBase.py:122
GaudiMP.GMPBase.EventCommunicator.maxsize
maxsize
Definition: GMPBase.py:267
GaudiMP.GMPBase.EventCommunicator.statistics
def statistics(self)
Definition: GMPBase.py:341
GaudiMP.GMPBase.MiniWriter.getNewName
def getNewName(self, replaceThis, withThis, extra="")
Definition: GMPBase.py:145
GaudiMP.GMPBase.Worker.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:1006
GaudiMP.GMPBase.Coord.qs
qs
Definition: GMPBase.py:1333
GaudiMP.GMPBase.GMPComponent.Initialize
def Initialize(self)
Definition: GMPBase.py:649
GaudiMP.GMPBase.Subworker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:927
GaudiMP.GMPBase.Coord.nWorkers
nWorkers
Definition: GMPBase.py:1329
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:869
GaudiMP.GMPBase.EventCommunicator.__init__
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:263
GaudiMP.GMPBase.GMPComponent.fTime
fTime
Definition: GMPBase.py:516
GaudiMP.GMPBase.Writer
Definition: GMPBase.py:1235
max
EventIDBase max(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:225
GaudiMP.GMPBase.Coord
Definition: GMPBase.py:1318
GaudiPython::PyAlgorithm
Definition: Algorithm.h:40
GaudiMP.GMPBase.Subworker
Definition: GMPBase.py:821
GaudiMP.GMPBase.GMPComponent.hq
hq
Definition: GMPBase.py:497
GaudiMP.GMPBase.Coord.hq
hq
Definition: GMPBase.py:1334
GaudiPython.Bindings.setOwnership
setOwnership
Definition: Bindings.py:135
GaudiMP.GMPBase.Reader.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:695
GaudiMP.GMPBase.Coord.Terminate
def Terminate(self)
Definition: GMPBase.py:1436
GaudiMP.GMPBase.Worker.Engine
def Engine(self)
Definition: GMPBase.py:1047
GaudiMP.GMPBase.Worker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:994
GaudiMP.GMPBase.Writer.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1236
IOTest.start
start
Definition: IOTest.py:108
GaudiMP.GMPBase.Subworker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:822
GaudiMP.GMPBase.Subworker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:829
GaudiMP.GMPBase.GMPComponent.pers
pers
Definition: GMPBase.py:547
GaudiMP.GMPBase.GMPComponent.hvt
hvt
Definition: GMPBase.py:544
GaudiMP.GMPBase.TESSerializer.T
T
Definition: GMPBase.py:356
GaudiMP.GMPBase.TESSerializer.log
log
Definition: GMPBase.py:367
GaudiMP.GMPBase.Worker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:1196
GaudiMP.GMPBase.GMPComponent.iTime
iTime
Definition: GMPBase.py:514
GaudiMP.GMPBase.TESSerializer.nodeID
nodeID
Definition: GMPBase.py:366
GaudiMP.GMPBase.GMPComponent.currentEvent
currentEvent
Definition: GMPBase.py:466
GaudiMP.GMPBase.GMPComponent.app
app
Definition: GMPBase.py:473
compareOutputFiles.par
par
Definition: compareOutputFiles.py:486
GaudiMP.GMPBase.Writer.histoAgent
histoAgent
Definition: GMPBase.py:1262
GaudiMP.GMPBase.GMPComponent.firstEvTime
firstEvTime
Definition: GMPBase.py:517
GaudiMP.GMPBase.EventCommunicator.qin
qin
Definition: GMPBase.py:269
GaudiMP.GMPBase.Reader.DumpEvent
def DumpEvent(self)
Definition: GMPBase.py:707
GaudiMP.GMPBase.TESSerializer.buffersOut
buffersOut
Definition: GMPBase.py:359
GaudiPluginService.cpluginsvc.registry
def registry()
Definition: cpluginsvc.py:84
GaudiMP.GMPBase.MiniWriter.woutput
woutput
Definition: GMPBase.py:127
GaudiMP.GMPBase.GMPComponent.proc
proc
Definition: GMPBase.py:520
GaudiMP.GMPBase.Coord.config
config
Definition: GMPBase.py:1322
bug_34121.tool
tool
Definition: bug_34121.py:17
GaudiMP.GMPBase.GMPComponent.inc
inc
Definition: GMPBase.py:546
Gaudi::Functional::details::get
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
Definition: FunctionalDetails.h:444
GaudiMP.GMPBase.MiniWriter.getItemLists
def getItemLists(self, config)
Definition: GMPBase.py:166
GaudiMP.GMPBase.GMPComponent.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:529
GaudiMP.pTools.Syncer
Definition: pTools.py:629
GaudiMP.GMPBase.Reader.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:692
GaudiMP.GMPBase.MiniWriter.w
w
Definition: GMPBase.py:115
GaudiMP.GMPBase.Writer.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1263
GaudiMP.GMPBase.Coord.sFin
sFin
Definition: GMPBase.py:1349
GaudiMP.GMPBase.Coord.reader
reader
Definition: GMPBase.py:1365
GaudiMP.GMPBase.Subworker.eventOutput
eventOutput
Definition: GMPBase.py:831
GaudiMP.GMPBase.GMPComponent.lastEvent
lastEvent
Definition: GMPBase.py:457
GaudiMP.GMPBase.GMPComponent.nOut
nOut
Definition: GMPBase.py:504
GaudiMP.GMPBase.Worker.eventOutput
eventOutput
Definition: GMPBase.py:1004
GaudiMP.GMPBase.GMPComponent.finalEvent
finalEvent
Definition: GMPBase.py:456
GaudiMP.GMPBase.MiniWriter.key
key
Definition: GMPBase.py:120
GaudiPython.Pythonizations.executeEvent
executeEvent
Helpers for re-entrant interfaces.
Definition: Pythonizations.py:584
GaudiMP.GMPBase.EventCommunicator.allrecv
allrecv
Definition: GMPBase.py:274
GaudiMP.GMPBase.EventCommunicator.qoutTime
qoutTime
Definition: GMPBase.py:282
GaudiMP.GMPBase.EventCommunicator.finalize
def finalize(self)
Definition: GMPBase.py:320
GaudiMP.GMPBase.CollectHistograms.__init__
def __init__(self, gmpcomponent)
Definition: GMPBase.py:219
GaudiMP.GMPBase.TESSerializer.Dump
def Dump(self)
Definition: GMPBase.py:379
GaudiMP.GMPBase.TESSerializer.tDump
tDump
Definition: GMPBase.py:362
GaudiMP.GMPBase.Worker.writerDict
writerDict
Definition: GMPBase.py:999
GaudiMP.GMPBase.TESSerializer.nIn
nIn
Definition: GMPBase.py:360
GaudiMP.GMPBase.EventCommunicator.qout
qout
Definition: GMPBase.py:270
GaudiMP.GMPBase.GMPComponent.Engine
def Engine(self)
Definition: GMPBase.py:524
GaudiMP.GMPBase.Coord.Stop
def Stop(self)
Definition: GMPBase.py:1446
GaudiMP.GMPBase.MiniWriter.set
def set(self, key, output)
Definition: GMPBase.py:183
GaudiMP.GMPBase.EventCommunicator.qinTime
qinTime
Definition: GMPBase.py:281
GaudiMP.GMPBase.Reader.DoFirstEvent
def DoFirstEvent(self)
Definition: GMPBase.py:714
GaudiMP.GMPBase.Coord.__init__
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1319
GaudiMP.pTools.HistoAgent
Definition: pTools.py:61
GaudiMP.GMPBase.Writer.status
status
Definition: GMPBase.py:1241
GaudiMP.GMPBase.GMPComponent.evcoms
evcoms
Definition: GMPBase.py:491
GaudiMP.GMPBase.TESSerializer.Report
def Report(self)
Definition: GMPBase.py:388
GaudiMP.GMPBase.Worker
Definition: GMPBase.py:993
GaudiMP.GMPBase.GMPComponent.fq
fq
Definition: GMPBase.py:499
GaudiPython.Pythonizations.iteritems
iteritems
Definition: Pythonizations.py:545
GaudiMP.GMPBase.TESSerializer.nodeType
nodeType
Definition: GMPBase.py:365
GaudiMP.GMPBase.MiniWriter.datasvcName
datasvcName
Definition: GMPBase.py:128
GaudiMP.GMPBase.GMPComponent.SetupGaudiPython
def SetupGaudiPython(self)
Definition: GMPBase.py:533
GaudiMP.GMPBase.EventCommunicator._gmpc
_gmpc
Definition: GMPBase.py:264
GaudiMP.GMPBase.GMPComponent.queues
queues
Definition: GMPBase.py:469
GaudiMP.GMPBase.Coord.histSyncEvent
histSyncEvent
Definition: GMPBase.py:1353
DataOnDemand.Dump
bool Dump
Definition: DataOnDemand.py:35
GaudiMP.GMPBase.Subworker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:954
GaudiMP.GMPBase.EventCommunicator.allsent
allsent
Definition: GMPBase.py:273
GaudiMP.GMPBase.MiniWriter.__init__
def __init__(self, writer, wType, config)
Definition: GMPBase.py:114
GaudiMP.GMPBase.Writer.Engine
def Engine(self)
Definition: GMPBase.py:1253
GaudiMP.GMPBase.GMPComponent.fsr
fsr
Definition: GMPBase.py:545
GaudiMP.GMPBase.EventCommunicator.nSent
nSent
Definition: GMPBase.py:277
gaudirun.type
type
Definition: gaudirun.py:162
GaudiMP.GMPBase.CollectHistograms.log
log
Definition: GMPBase.py:222
GaudiMP.GMPBase.aida2root
aida2root
Definition: GMPBase.py:74
GaudiMP.GMPBase.GMPComponent.ts
ts
Definition: GMPBase.py:548
GaudiMP.GMPBase.Coord.subworkers
subworkers
Definition: GMPBase.py:1358
GaudiMP.GMPBase.EventCommunicator
Definition: GMPBase.py:259
GaudiMP.GMPBase.Subworker.SetServices
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:915
GaudiMP.GMPBase.Coord.getQueues
def getQueues(self, nodeID)
Definition: GMPBase.py:1387
GaudiMP.GMPBase.Coord.sInit
sInit
Definition: GMPBase.py:1338
GaudiMP.GMPBase.GMPComponent.rTime
rTime
Definition: GMPBase.py:515
GaudiMP.GMPBase.MiniWriter.wName
wName
Definition: GMPBase.py:125
GaudiMP.GMPBase.Coord.getSyncEvents
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1381
GaudiMP.GMPBase.Reader.Engine
def Engine(self)
Definition: GMPBase.py:758
GaudiMP.GMPBase.GMPComponent.subworkers
subworkers
Definition: GMPBase.py:461
GaudiMP.GMPBase.Coord.Go
def Go(self)
Definition: GMPBase.py:1393
StringKeyEx.keys
keys
Definition: StringKeyEx.py:64
GaudiMP.GMPBase.Reader
Definition: GMPBase.py:691
GaudiPython.Bindings.InterfaceCast
Definition: Bindings.py:145
GaudiMP.GMPBase.MiniWriter.__repr__
def __repr__(self)
Definition: GMPBase.py:188
GaudiMP.GMPBase.Coord.log
log
Definition: GMPBase.py:1321
GaudiMP.GMPBase.GMPComponent.nodeType
nodeType
Definition: GMPBase.py:452
GaudiMP.GMPBase.GMPComponent.evcom
evcom
Definition: GMPBase.py:487
GaudiMP.GMPBase.Writer.writerDict
writerDict
Definition: GMPBase.py:1239
GaudiMP.GMPBase.Coord.writer
writer
Definition: GMPBase.py:1372
GaudiMP.GMPBase.TESSerializer.Load
def Load(self, tbuf)
Definition: GMPBase.py:369
GaudiMP.GMPBase.GMPComponent.Report
def Report(self)
Definition: GMPBase.py:673
GaudiMP.GMPBase.Worker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1187
GaudiPython::PyAlgorithm::finalize
StatusCode finalize() override
Definition: Algorithm.cpp:114
GaudiMP.GMPBase.EventCommunicator.sizeRecv
sizeRecv
Definition: GMPBase.py:280
GaudiMP.GMPBase.MiniWriter
Definition: GMPBase.py:105
GaudiMP.GMPBase.MiniWriter.svcOutput
svcOutput
Definition: GMPBase.py:129
GaudiMP.GMPBase.GMPComponent.Finalize
def Finalize(self)
Definition: GMPBase.py:665
GaudiMP.GMPBase.GMPComponent.a
a
Definition: GMPBase.py:536
GaudiMP.pTools
Definition: pTools.py:1
GaudiMP.GMPBase.Coord.system
system
Definition: GMPBase.py:1376
GaudiMP.GMPBase.Subworker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:945
GaudiMP.GMPBase.Worker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:1169
GaudiMP.GMPBase.Subworker.writerDict
writerDict
Definition: GMPBase.py:827
GaudiMP.GMPBase.GMPComponent.msgFormat
msgFormat
Definition: GMPBase.py:463
GaudiMP.GMPBase.GMPComponent.evt
evt
Definition: GMPBase.py:543
GaudiMP.GMPBase.EventCommunicator.nRecv
nRecv
Definition: GMPBase.py:278
GaudiMP.GMPBase.CollectHistograms._gmpc
_gmpc
Definition: GMPBase.py:221
GaudiMP.GMPBase.MiniWriter.output
output
Definition: GMPBase.py:121
GaudiMP.GMPBase.GMPComponent.log
log
Definition: GMPBase.py:460
GaudiMP.GMPBase.Subworker.Engine
def Engine(self)
Definition: GMPBase.py:833
GaudiMP.GMPBase.GMPComponent.cntr
cntr
Definition: GMPBase.py:659
GaudiMP.GMPBase.GMPComponent.StartGaudiPython
def StartGaudiPython(self)
Definition: GMPBase.py:552
Gaudi::Functional::details::put
auto put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
Definition: FunctionalDetails.h:173
GaudiMP.GMPBase.TESSerializer.evt
evt
Definition: GMPBase.py:357
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:102
GaudiMP.GMPBase.GMPComponent.Start
def Start(self)
Definition: GMPBase.py:479
GaudiMP.GMPBase.GMPComponent.LoadTES
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:557