The Gaudi Framework  v36r7 (7f57a304)
GMPBase.py
Go to the documentation of this file.
1 
11 from __future__ import print_function
12 
13 import multiprocessing
14 import os
15 import sys
16 import time
17 from multiprocessing import (
18  Event,
19  JoinableQueue,
20  Process,
21  Queue,
22  cpu_count,
23  current_process,
24 )
25 from multiprocessing.queues import Empty
26 
27 from Gaudi.Configuration import *
28 from GaudiMP.pTools import *
29 from ROOT import TBuffer, TBufferFile, TParallelMergingFile
30 
31 from GaudiPython import (
32  FAILURE,
33  SUCCESS,
34  AppMgr,
35  InterfaceCast,
36  PyAlgorithm,
37  gbl,
38  setOwnership,
39 )
40 
41 # This script contains the bases for the Gaudi MultiProcessing (GMP)
42 # classes
43 
44 # There are three classes :
45 # Reader
46 # Worker
47 # Writer
48 
49 # Each class needs to perform communication with the others
50 # For this, we need a means of communication, which will be based on
51 # the python multiprocessing package
52 # This is provided in SPI pytools package
53 # cmt line : use pytools v1.1 LCG_Interfaces
54 # The PYTHONPATH env variable may need to be modified, as this might
55 # still point to 1.0_python2.5
56 
57 # Each class will need Queues, and a defined method for using these
58 # queues.
59 # For example, as long as there is something in the Queue, both ends
60 # of the queue must be open
61 # Also, there needs to be proper Termination flags and criteria
62 # The System should be error proof.
63 
# Constants -------------------------------------------------------------------
# Short sleep (seconds) used when polling for a queue's feeder buffer to drain
NAP = 0.001
# Number of bytes in a megabyte (float, so size reports divide exactly)
MB = 1024.0 * 1024.0

# Upper bounds on how long to wait, in seconds, to guard against hanging
WAIT_INITIALISE = 5 * 60
WAIT_FIRST_EVENT = 3 * 60
WAIT_SINGLE_EVENT = 6 * 60
WAIT_FINALISE = 2 * 60
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 10

# Switch for directly enabling/disabling the Smaps algorithm in the
# GaudiPython AppMgr
SMAPS = False
78 
79 # -----------------------------------------------------------------------------
80 
# definitions
# -----------
# Converter used to turn stored histograms (AIDA format) into ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# AIDA histogram classes, used to check which kind of histogram we hold,
# i.e. if type(currentHisto) in aidatypes : convert it
aidatypes = tuple(
    getattr(gbl.AIDA, aidaName)
    for aidaName in (
        "IHistogram",
        "IHistogram1D",
        "IHistogram2D",
        "IHistogram3D",
        "IProfile1D",
        "IProfile2D",
        "IBaseHistogram",  # extra?
    )
)

# ROOT counterpart of aidatypes
thtypes = tuple(
    getattr(gbl, rootName)
    for rootName in ("TH1D", "TH2D", "TH3D", "TProfile", "TProfile2D")
)
100 
# Types of OutputStream in Gaudi, mapped to the category of data they write
# ("events", "records" or "tuples")
WRITERTYPES = dict(
    EvtCollectionStream="tuples",
    InputCopyStream="events",
    OutputStream="events",
    RecordStream="records",
    RunRecordStream="records",
    SequentialOutputStream="events",
    TagCollectionStream="tuples",
)
111 
112 # =============================================================================
113 
114 
class MiniWriter(object):
    """
    A class to represent a writer in the GaudiPython configuration
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring
    """

    def __init__(self, writer, wType, config):
        """
        Inspect a writer and record where its output name is configured.

        @param writer : the configured writer; must provide getName() and
                        either an Output attribute or an EvtDataSvc attribute
        @param wType  : category of the writer (stored as given)
        @param config : mapping from component name to configured component,
                        used to look up the writer's data service
        """
        self.w = writer
        self.wType = wType
        # set parameters for directly accessing the correct
        # part of the configuration, so that later, we can do
        # config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr(self.w, "Output"):
            self.woutput = self.w.Output
            self.getItemLists(config)
            self.set(self.wName, self.w.Output)
            return
        # if there is no output file, get it via the datasvc
        # (every writer always has a datasvc property)
        self.datasvcName = self.w.EvtDataSvc
        datasvc = config[self.datasvcName]
        if hasattr(datasvc, "Output"):
            # remember the service-level output so that __repr__ can show it
            self.svcOutput = datasvc.Output
            self.getItemLists(config)
            self.set(self.datasvcName, datasvc.Output)

    def getNewName(self, replaceThis, withThis, extra=""):
        """
        Replace one pattern in the output name with another and return the
        new output name. If the stored output is a list, only its first
        element is used and the result is returned as a one-element list.

        @param replaceThis : substring to look for (str)
        @param withThis    : replacement substring (str)
        @param extra       : suffix appended to the new name; might be needed
                             to add ROOT flags, i.e.: OPT='RECREATE', or such
        """
        assert isinstance(replaceThis, str)
        assert isinstance(withThis, str)
        wasList = isinstance(self.output, list)
        old = self.output[0] if wasList else self.output
        new = old.replace(replaceThis, withThis) + extra
        return [new] if wasList else new

    def getItemLists(self, config):
        """
        Fill ItemList and OptItemList from the writer if it has them,
        otherwise from the writer's data service if that has them.
        """
        # the item list
        if hasattr(self.w, "ItemList"):
            self.ItemList = self.w.ItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "ItemList"):
                self.ItemList = datasvc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr(self.w, "OptItemList"):
            self.OptItemList = self.w.OptItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "OptItemList"):
                self.OptItemList = datasvc.OptItemList

    def set(self, key, output):
        """Record the configuration key and the output name found under it."""
        self.key = key
        self.output = output

    def __repr__(self):
        line = "-" * 80
        rows = (
            line,
            "Writer : %s" % (self.wName),
            "Writer Type : %s" % (self.wType),
            # output as configured on the writer itself (None if absent)
            "Writer Output : %s" % (self.woutput),
            "DataSvc : %s" % (self.datasvcName),
            "DataSvc Output : %s" % (self.svcOutput),
            "",
            "Key for config : %s" % (self.key),
            "Output File : %s" % (self.output),
            "ItemList : %s" % (self.ItemList),
            "OptItemList : %s" % (self.OptItemList),
            line,
        )
        return "\n".join(rows) + "\n"
214 
215 
216 # =============================================================================
217 
218 
220  """
221  GaudiPython algorithm used to clean up histos on the Reader and Workers
222  Only has a finalize method()
223  This retrieves a dictionary of path:histo objects and sends it to the
224  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
225  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
226  at the writer end, there will be an error.
227  """
228 
229  def __init__(self, gmpcomponent):
230  PyAlgorithm.__init__(self)
231  self._gmpc = gmpcomponent
232  self.log = self._gmpc.log
233  return None
234 
235  def execute(self):
236  return SUCCESS
237 
238  def finalize(self):
239  self.log.info("CollectHistograms Finalise (%s)" % (self._gmpc.nodeType))
240  self._gmpc.hDict = self._gmpc.dumpHistograms()
241  ks = self._gmpc.hDict.keys()
242  self.log.info("%i Objects in Histogram Store" % (len(ks)))
243 
244  # crashes occurred due to Memory Error during the sending of hundreds
245  # histos on slc5 machines, so instead, break into chunks
246  # send 100 at a time
247  chunk = 100
248  reps = len(ks) / chunk + 1
249  for i in range(reps):
250  someKeys = ks[i * chunk : (i + 1) * chunk]
251  smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
252  self._gmpc.hq.put((self._gmpc.nodeID, smalld))
253  # "finished" Notification
254  self.log.debug("Signalling end of histos to Writer")
255  self._gmpc.hq.put("HISTOS_SENT")
256  self.log.debug("Waiting on Sync Event")
257  self._gmpc.sEvent.wait()
258  self.log.debug("Histo Sync Event set, clearing and returning")
259  self._gmpc.hvt.clearStore()
260  root = gbl.DataObject()
261  setOwnership(root, False)
262  self._gmpc.hvt.setRoot("/stat", root)
263  return SUCCESS
264 
265 
266 # =============================================================================
267 
268 
class EventCommunicator(object):
    """
    This class is responsible for communicating Gaudi Events via Queues
    Events are communicated as TBufferFiles, filled either by the
    TESSerializer, or the GaudiSvc, "IPCSvc"
    """

    def __init__(self, GMPComponent, qin, qout):
        """
        @param GMPComponent : owning node; its log, nodeType, nodeID and
                              nWorkers attributes are used
        @param qin  : queue that items are received from
        @param qout : queue that items are sent to
        """
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0  # counter : items sent
        self.nRecv = 0  # counter : items received
        self.sizeSent = 0  # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0  # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0  # time : receiving from self.qin
        self.qoutTime = 0  # time : sending on qout

    def send(self, item):
        """
        Put an event on the outgoing queue and update the send statistics.

        @param item : tuple ( evtNumber, TBufferFile )
        @return SUCCESS
        """
        assert isinstance(item, tuple)
        startTransmission = time.time()
        self.qout.put(item)
        # allow the background thread to feed the Queue; not 100% guaranteed to
        # finish before next line
        while self.qout._buffer:
            time.sleep(NAP)
        self.qoutTime += time.time() - startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive(self, timeout=None):
        """
        Take one item from the incoming queue.

        Only tuples are counted as received events; anything else (e.g. a
        flag string) is passed back without touching the counters.

        @param timeout : passed on to the queue's get()
        @return the item, or None if the queue raised Empty
        """
        startWait = time.time()
        try:
            itemIn = self.qin.get(timeout=timeout)
        except Empty:
            return None
        self.qinTime += time.time() - startWait
        if isinstance(itemIn, tuple):
            self.nRecv += 1
            self.sizeRecv += itemIn[1].Length()
        try:
            self.qin.task_done()
        except Exception:
            # best effort : keep going, but do not swallow
            # KeyboardInterrupt / SystemExit as a bare except would
            self._gmpc.log.warning(
                "TASK_DONE called too often by : %s" % (self._gmpc.nodeType)
            )
        return itemIn

    def finalize(self):
        """
        Send the "FINISHED" flag(s) downstream, wait for the outgoing queue
        to be fully processed (except on the Writer) and log the statistics.
        """
        self.log.info(
            "Finalize Event Communicator : %s %s" % (self._gmpc, self._gmpc.nodeType)
        )
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        nodeType = self._gmpc.nodeType
        if nodeType == "Reader":
            downstream = self._gmpc.nWorkers
        elif nodeType == "Writer":
            downstream = 0
        elif nodeType == "Worker":
            downstream = 1

        for _ in range(downstream):
            self.qout.put("FINISHED")
        if nodeType != "Writer":
            self.qout.join()
        # Now some reporting...
        self.statistics()

    def statistics(self):
        """Log the transfer counters under an "Audit" logger name."""
        self.log.name = "%s-%i Audit " % (self._gmpc.nodeType, self._gmpc.nodeID)
        self.log.info("Items Sent : %i" % (self.nSent))
        self.log.info("Items Received : %i" % (self.nRecv))
        self.log.info("Data Sent : %i" % (self.sizeSent))
        self.log.info("Data Received : %i" % (self.sizeRecv))
        self.log.info("Q-out Time : %5.2f" % (self.qoutTime))
        self.log.info("Q-in Time : %5.2f" % (self.qinTime))
360 
361 # =============================================================================
362 
363 
class TESSerializer(object):
    """
    Python wrapper around the Gaudi TES serializer that keeps statistics
    (counts, buffer sizes, timings) on every buffer loaded or dumped.
    """

    def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
        """
        @param gaudiTESSerializer : object providing loadBuffer / dumpBuffer
        @param evtDataSvc : event data service; setRoot() is called on it
        @param nodeType, nodeID : used to build the logger name in Report()
        @param log : logger with a settable name and an info() method
        """
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        self.buffersIn = []  # tbuf.Length() of every buffer loaded
        self.buffersOut = []  # tbuf.Length() of every buffer dumped
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log

    def Load(self, tbuf):
        """Reset the "/Event" root and load the given buffer into the store."""
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot("/Event", root)
        t = time.time()
        self.T.loadBuffer(tbuf)
        self.tLoad += time.time() - t
        self.nIn += 1
        self.buffersIn.append(tbuf.Length())

    def Dump(self):
        """Dump the store into a new write-mode TBufferFile and return it."""
        t = time.time()
        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += time.time() - t
        self.nOut += 1
        self.buffersOut.append(tb.Length())
        return tb

    def Report(self):
        """Log the load/dump statistics, then restore the plain logger name."""
        din = sum(self.buffersIn)
        dout = sum(self.buffersOut)

        if self.nIn:
            avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
            maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)
        else:
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"

        if self.nOut:
            # average over the bytes DUMPED (dout), not the bytes loaded
            avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
            maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)
        else:
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"

        lines = (
            "Events Loaded : %i" % (self.nIn),
            "Events Dumped : %i" % (self.nOut),
            "Data Loaded : %i" % (din),
            "Data Loaded (MB) : %5.2f Mb" % (din / MB),
            avgIn,
            maxIn,
            "Data Dumped : %i" % (dout),
            "Data Dumped (MB) : %5.2f Mb" % (dout / MB),
            avgOut,
            maxOut,
            "Total Dump Time : %5.2f" % (self.tDump),
            "Total Load Time : %5.2f" % (self.tLoad),
        )
        self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
        for line in lines:
            self.log.info(line)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
441 
442 # =============================================================================
443 
444 
class GMPComponent(object):
    """
    This class will be the template for Reader, Worker and Writer
    containing all common components
    nodeId will be a numerical identifier for the node
    -1 for reader
    -2 for writer
    0,...,nWorkers-1 for the Workers
    """

    def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
        """
        Declare a Gaudi MultiProcessing Node.

        @param nodeType : one of Reader, Worker, Writer
        @param nodeID   : numerical identifier of the node
        @param queues   : (qPair, histogram queue, file-records queue);
                          qPair is ( qin, qout ), or for the Writer a
                          container whose first element is the list of
                          incoming queues
        @param events   : (initEvent, (eventLoopSyncer, lastEvent), finalEvent)
        @param params   : (nWorkers, sEvent, config, log)
        @param subworkers : stored as given
        """
        self.nodeType = nodeType
        current_process().name = nodeType

        # Synchronisation Event() objects for keeping track of the system
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer  # unpack tuple

        # necessary for knowledge of the system
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.subworkers = subworkers
        self.nodeID = nodeID
        self.msgFormat = self.config["MessageSvc"].Format

        # describe the state of the node by the current Event Number
        self.currentEvent = None

        # Unpack the Queues : (events, histos, filerecords)
        self.queues = queues
        self.num = 0

        # Identify the application from the configuration; if several of
        # the known names are configured, the last one in this list wins
        configured = self.config.keys()
        self.app = None
        for appName in ("Brunel", "DaVinci", "Boole", "Gauss"):
            if appName in configured:
                self.app = appName

    def Start(self):
        """Set up the communicators and counters, then start the process."""
        qPair, histq, fq = self.queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == "Reader" or self.nodeType == "Worker":
            # Reader or Worker Node
            qin, qout = qPair
            self.evcom = EventCommunicator(self, qin, qout)
        else:
            # Writer : many queues in, no queue out
            assert self.nodeType == "Writer"
            self.evcoms = [EventCommunicator(self, q, None) for q in qPair[0]]
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        self.proc = Process(target=self.Engine)
        # Fork and start the separate process
        self.proc.start()

    def Engine(self):
        # This will be the method carried out by the Node
        # Different for all
        pass

    def processConfiguration(self):
        # Different for all ; customize Configuration for multicore
        # NOTE(review): the "def" line of this hook was missing from the
        # listing; the name is taken from the call in Initialize() - confirm
        pass

    def SetupGaudiPython(self):
        """
        Initialize the GaudiPython tools : the AppMgr, the services used by
        the node and the TES serializer.
        """
        self.a = AppMgr()
        if SMAPS:
            from AlgSmapShot import SmapShot

            smapsLog = self.nodeType + "-" + str(self.nodeID) + ".smp"
            ss = SmapShot(logname=smapsLog)
            self.a.addAlgorithm(ss)
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
        self.pers = self.a.service("EventPersistencySvc", "IAddressCreator")
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
        self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
        return SUCCESS

    def StartGaudiPython(self):
        """Initialize and start the AppMgr."""
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES(self, tbufferfile):
        """Reset the "/Event" root and load the buffer into the event store."""
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot("/Event", root)
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber(self):
        """
        Return the number of the current event.

        For Gauss, an internal counter is returned (incremented first when
        nodeID is -1). Otherwise the event store is searched for a header
        or raw-event bank; -1 is returned if nothing is found.
        """
        if self.app == "Gauss":
            if self.nodeID == -1:
                self.num = self.num + 1
            return self.num

        # Using getList or getHistoNames can result in the EventSelector
        # re-initialising connection to RootDBase, which costs a lot of
        # time... try to build a set of Header paths??

        # First Attempt : Unpacked Event Data
        for path in ("/Event/Gen/Header", "/Event/Rec/Header"):
            try:
                return self.evt[path].evtNumber()
            except Exception:
                # No evt number at this path
                continue

        # second attempt : try DAQ/RawEvent data
        # The Evt Number is in bank type 16, bank 0, data pt 4
        try:
            return self.evt["/Event/DAQ/RawEvent"].banks(16)[0].data()[4]
        except Exception:
            pass

        # Default Action : only warn while no event has been handled yet
        if not (self.nIn > 0 or self.nOut > 0):
            self.log.warning("Could not determine Event Number")
        return -1

    def IdentifyWriters(self):
        """
        Identify Writers in the Configuration.

        @return dict with keys "events", "records", "tuples" (lists of
                MiniWriter) and "histos" (the histogram output file, if a
                HistogramPersistencySvc is configured)
        """
        d = dict((k, []) for k in ("events", "records", "tuples", "histos"))

        # Identify Writers and Classify
        for v in self.config.values():
            className = v.__class__.__name__
            if className in WRITERTYPES:
                writerType = WRITERTYPES[className]
                d[writerType].append(MiniWriter(v, writerType, self.config))
                if self.nodeID == 0:
                    self.log.info("Writer Found : %s" % (v.name()))

        # Now Check for the Histogram Service
        if "HistogramPersistencySvc" in self.config.keys():
            hfile = self.config["HistogramPersistencySvc"].getProp("OutputFile")
            d["histos"].append(hfile)
        return d

    def dumpHistograms(self):
        """
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        """
        nlist = self.hvt.getHistoNames()
        histDict = {}
        if nlist:
            for n in nlist:
                o = self.hvt[n]
                if type(o) in aidatypes:
                    # stored in AIDA format : convert to ROOT
                    o = aida2root(o)
                histDict[n] = o
        else:
            print("WARNING : no histograms to recover?")
        return histDict

    def Initialize(self):
        """
        Process the configuration, set up and start GaudiPython, and for
        Gauss fetch the event counter tool. Records the time taken.
        """
        start = time.time()
        self.processConfiguration()
        self.SetupGaudiPython()
        self.initEvent.set()
        self.StartGaudiPython()

        if self.app == "Gauss":
            tool = self.a.tool("ToolSvc.EvtCounter")
            self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
        else:
            self.cntr = None

        self.iTime = time.time() - start

    def Finalize(self):
        """Stop and finalize the AppMgr, then set the final Event."""
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info("%s-%i Finalized" % (self.nodeType, self.nodeID))
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report(self):
        """Log the node timings, then the TESSerializer statistics."""
        self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
        timings = (
            "Alive Time : %5.2f" % (self.tTime),
            "Init Time : %5.2f" % (self.iTime),
            "1st Event Time : %5.2f" % (self.firstEvTime),
            "Run Time : %5.2f" % (self.rTime),
            "Finalise Time : %5.2f" % (self.fTime),
        )
        for t in timings:
            self.log.info(t)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
        # and report from the TESSerializer
        self.TS.Report()
697 
698 # =============================================================================
699 
700 
702  def __init__(self, queues, events, params, subworkers):
703  GMPComponent.__init__(self, "Reader", -1, queues, events, params, subworkers)
704 
706  # Reader :
707  # No algorithms
708  # No output
709  # No histos
710  self.config["ApplicationMgr"].TopAlg = []
711  self.config["ApplicationMgr"].OutStream = []
712  if "HistogramPersistencySvc" in self.config.keys():
713  self.config["HistogramPersistencySvc"].OutputFile = ""
714  self.config["MessageSvc"].Format = "%-13s " % "[Reader]" + self.msgFormat
715  self.evtMax = self.config["ApplicationMgr"].EvtMax
716 
717  def DumpEvent(self):
718  tb = TBufferFile(TBuffer.kWrite)
719  # print '----Reader dumping Buffer!!!'
720  self.ts.dumpBuffer(tb)
721  # print '\tBuffer Dumped, size : %i'%( tb.Length() )
722  return tb
723 
724  def DoFirstEvent(self):
725  # Do First Event ------------------------------------------------------
726  # Check Termination Criteria
727  startFirst = time.time()
728  self.log.info("Reader : First Event")
729  if self.nOut == self.evtMax:
730  self.log.info("evtMax( %i ) reached" % (self.evtMax))
731  self.lastEvent.set()
732  return SUCCESS
733  else:
734  # Continue to read, dump and send event
735  self.a.run(1)
736  if not bool(self.evt["/Event"]):
737  self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
738  self.lastEvent.set()
739  return SUCCESS
740  else:
741  # Popluate TESSerializer list and send Event
742  if self.app == "Gauss":
743  lst = self.evt.getHistoNames()
744  else:
745  try:
746  lst = self.evt.getList()
747  if self.app == "DaVinci":
748  daqnode = self.evt.retrieveObject("/Event/DAQ").registry()
749  setOwnership(daqnode, False)
750  self.evt.getList(daqnode, lst, daqnode.address().par())
751  except:
752  self.log.critical("Reader could not acquire TES List!")
753  self.lastEvent.set()
754  return FAILURE
755  self.log.info("Reader : TES List : %i items" % (len(lst)))
756  for l in lst:
757  self.ts.addItem(l)
759  tb = self.TS.Dump()
760  self.log.info("First Event Sent")
761  self.evcom.send((self.currentEvent, tb))
762  self.nOut += 1
763  self.eventLoopSyncer.set()
764  self.evt.clearStore()
765  self.firstEvTime = time.time() - startFirst
766  return SUCCESS
767 
768  def Engine(self):
769  # rename process
770  import ctypes
771  import os
772 
773  libc = ctypes.CDLL("libc.so.6")
774  name = str(self.nodeType) + str(self.nodeID) + "\0"
775  libc.prctl(15, name, 0, 0, 0)
776 
777  startEngine = time.time()
778  self.log.name = "Reader"
779  self.log.info("Reader Process starting")
780 
781  self.Initialize()
782 
783  # add the Histogram Collection Algorithm
784  self.a.addAlgorithm(CollectHistograms(self))
785 
786  self.log.info("Reader Beginning Distribution")
787  sc = self.DoFirstEvent()
788  if sc.isSuccess():
789  self.log.info("Reader First Event OK")
790  else:
791  self.log.critical("Reader Failed on First Event")
792  self.stat = FAILURE
793 
794  # Do All Others -------------------------------------------------------
795  while True:
796  # Check Termination Criteria
797  if self.nOut == self.evtMax:
798  self.log.info("evtMax( %i ) reached" % (self.evtMax))
799  break
800  # Check Health
801  if not self.stat.isSuccess():
802  self.log.critical("Reader is Damaged!")
803  break
804  # Continue to read, dump and send event
805  t = time.time()
806  self.a.run(1)
807  self.rTime += time.time() - t
808  if not bool(self.evt["/Event"]):
809  self.log.warning("No More Events! (So Far : %i)" % (self.nOut))
810  break
811  self.currentEvent = self.getEventNumber()
812  tb = self.TS.Dump()
813  self.evcom.send((self.currentEvent, tb))
814  # clean up
815  self.nOut += 1
816  self.eventLoopSyncer.set()
817  self.evt.clearStore()
818  self.log.info("Setting <Last> Event")
819  self.lastEvent.set()
820 
821  # Finalize
822  self.log.info("Reader : Event Distribution complete.")
823  self.evcom.finalize()
824  self.Finalize()
825  self.tTime = time.time() - startEngine
826  self.Report()
827 
828 
829 # =============================================================================
830 
831 
833  def __init__(self, workerID, queues, events, params, subworkers):
834  GMPComponent.__init__(
835  self, "Worker", workerID, queues, events, params, subworkers
836  )
837  # Identify the writer streams
839  # Identify the accept/veto checks for each event
840  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
841  self.log.info("Subworker-%i Created OK" % (self.nodeID))
842  self.eventOutput = True
843 
844  def Engine(self):
845  # rename process
846  import ctypes
847  import os
848 
849  libc = ctypes.CDLL("libc.so.6")
850  name = str(self.nodeType) + str(self.nodeID) + "\0"
851  libc.prctl(15, name, 0, 0, 0)
852 
853  self.initEvent.set()
854  startEngine = time.time()
855  msg = self.a.service("MessageSvc")
856  msg.Format = "%-13s " % ("[" + self.log.name + "]") + self.msgFormat
857 
858  self.log.name = "Worker-%i" % (self.nodeID)
859  self.log.info("Subworker %i starting Engine" % (self.nodeID))
861 
862  # populate the TESSerializer itemlist
863  self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
864 
865  nEventWriters = len(self.writerDict["events"])
866  self.a.addAlgorithm(CollectHistograms(self))
867 
868  # Begin processing
869  Go = True
870  while Go:
871  packet = self.evcom.receive()
872  if packet:
873  pass
874  else:
875  continue
876  if packet == "FINISHED":
877  break
878  evtNumber, tbin = packet # unpack
879  if self.cntr != None:
880 
881  self.cntr.setEventCounter(evtNumber)
882 
883  self.nIn += 1
884  self.TS.Load(tbin)
885 
886  t = time.time()
887  sc = self.a.executeEvent()
888  if self.nIn == 1:
889  self.firstEvTime = time.time() - t
890  else:
891  self.rTime += time.time() - t
892  if sc.isSuccess():
893  pass
894  else:
895  self.log.name = "Worker-%i" % (self.nodeID)
896  self.log.warning("Did not Execute Event")
897  self.evt.clearStore()
898  continue
899  if self.isEventPassed():
900  pass
901  else:
902  self.log.name = "Worker-%i" % (self.nodeID)
903  self.log.warning("Event did not pass : %i" % (evtNumber))
904  self.evt.clearStore()
905  continue
906  if self.eventOutput:
907  # It may be the case of generating Event Tags; hence
908  # no event output
910  tb = self.TS.Dump()
911  self.evcom.send((self.currentEvent, tb))
912  self.nOut += 1
913  self.inc.fireIncident(gbl.Incident("Subworker", "EndEvent"))
914  self.eventLoopSyncer.set()
915  self.evt.clearStore()
916  self.log.name = "Worker-%i" % (self.nodeID)
917  self.log.info("Setting <Last> Event %s" % (self.nodeID))
918  self.lastEvent.set()
919 
920  self.evcom.finalize()
921  # Now send the FileRecords and stop/finalize the appMgr
922  self.filerecordsAgent.SendFileRecords()
923  self.tTime = time.time() - startEngine
924  self.Finalize()
925  self.Report()
926  # self.finalEvent.set()
927 
928  def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
929  self.a = a
930  self.evt = evt
931  self.hvt = hvt
932  self.fsr = fsr
933  # self.inc = inc
934  self.inc = self.a.service("IncidentSvc", "IIncidentSvc")
935  self.pers = pers
936  self.ts = ts
937  self.cntr = cntr
938  self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID, self.log)
939 
940  def getCheckAlgs(self):
941  """
942  For some output writers, a check is performed to see if the event has
943  executed certain algorithms.
944  These reside in the AcceptAlgs property for those writers
945  """
946  acc = []
947  req = []
948  vet = []
949  for m in self.writerDict["events"]:
950  if hasattr(m.w, "AcceptAlgs"):
951  acc += m.w.AcceptAlgs
952  if hasattr(m.w, "RequireAlgs"):
953  req += m.w.RequireAlgs
954  if hasattr(m.w, "VetoAlgs"):
955  vet += m.w.VetoAlgs
956  return (acc, req, vet)
957 
958  def checkExecutedPassed(self, algName):
959  if (
960  self.a.algorithm(algName)._ialg.isExecuted()
961  and self.a.algorithm(algName)._ialg.filterPassed()
962  ):
963  return True
964  else:
965  return False
966 
967  def isEventPassed(self):
968  """
969  Check the algorithm status for an event.
970  Depending on output writer settings, the event
971  may be declined based on various criteria.
972  This is a transcript of the check that occurs in GaudiSvc::OutputStream
973  """
974  passed = False
975 
976  self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
977  if self.acceptAlgs:
978  for name in self.acceptAlgs:
979  if self.checkExecutedPassed(name):
980  passed = True
981  break
982  else:
983  passed = True
984 
985  self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
986  for name in self.requireAlgs:
987  if self.checkExecutedPassed(name):
988  pass
989  else:
990  self.log.info("Evt declined (requireAlgs) : %s" % (name))
991  passed = False
992 
993  self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
994  for name in self.vetoAlgs:
995  if self.checkExecutedPassed(name):
996  pass
997  else:
998  self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
999  passed = False
1000  return passed
1001 
1002 
1003 # =============================================================================
1004 
1005 
1007  def __init__(self, workerID, queues, events, params, subworkers):
1008  GMPComponent.__init__(
1009  self, "Worker", workerID, queues, events, params, subworkers
1010  )
1011  # Identify the writer streams
1013  # Identify the accept/veto checks for each event
1014  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
1015  self.log.name = "Worker-%i" % (self.nodeID)
1016  self.log.info("Worker-%i Created OK" % (self.nodeID))
1017  self.eventOutput = True
1018 
1020 
1021  # Worker :
1022  # No input
1023  # No output
1024  # No Histos
1025  self.config["EventSelector"].Input = []
1026  self.config["ApplicationMgr"].OutStream = []
1027  if "HistogramPersistencySvc" in self.config.keys():
1028  self.config["HistogramPersistencySvc"].OutputFile = ""
1029  formatHead = "[Worker-%i]" % (self.nodeID)
1030  self.config["MessageSvc"].Format = "%-13s " % formatHead + self.msgFormat
1031 
1032  for key, lst in self.writerDict.iteritems():
1033  self.log.info("Writer Type : %s\t : %i" % (key, len(lst)))
1034 
1035  for m in self.writerDict["tuples"]:
1036  # rename Tuple output file with an appendix
1037  # based on worker id, for merging later
1038  newName = m.getNewName(".", ".w%i." % (self.nodeID))
1039  self.config[m.key].Output = newName
1040 
1041  # Suppress INFO Output for all but Worker-0
1042  # if self.nodeID == 0 :
1043  # pass
1044  # else :
1045  # self.config[ 'MessageSvc' ].OutputLevel = ERROR
1046 
1047  if self.app == "Gauss":
1048  try:
1049  if "ToolSvc.EvtCounter" not in self.config:
1050  from Configurables import EvtCounter
1051 
1052  counter = EvtCounter()
1053  else:
1054  counter = self.config["ToolSvc.EvtCounter"]
1055  counter.UseIncident = False
1056  except:
1057  # ignore errors when trying to change the configuration of the EvtCounter
1058  self.log.warning("Cannot configure EvtCounter")
1059 
1060  def Engine(self):
1061 
1062  # rename process
1063  import ctypes
1064  import os
1065 
1066  libc = ctypes.CDLL("libc.so.6")
1067  name = str(self.nodeType) + str(self.nodeID) + "\0"
1068  libc.prctl(15, name, 0, 0, 0)
1069 
1070  startEngine = time.time()
1071  self.log.info("Worker %i starting Engine" % (self.nodeID))
1072  self.Initialize()
1074 
1075  # populate the TESSerializer itemlist
1076  self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
1077 
1078  nEventWriters = len(self.writerDict["events"])
1079  if nEventWriters:
1080  itemList = set()
1081  optItemList = set()
1082  for m in self.writerDict["events"]:
1083  for item in m.ItemList:
1084  hsh = item.find("#")
1085  if hsh != -1:
1086  item = item[:hsh]
1087  itemList.add(item)
1088  for item in m.OptItemList:
1089  hsh = item.find("#")
1090  if hsh != -1:
1091  item = item[:hsh]
1092  optItemList.add(item)
1093  # If an item is mandatory and optional, keep it only in the optional list
1094  itemList -= optItemList
1095  for item in sorted(itemList):
1096  self.log.info(" adding ItemList Item to ts : %s" % (item))
1097  self.ts.addItem(item)
1098  for item in sorted(optItemList):
1099  self.log.info(" adding Optional Item to ts : %s" % (item))
1100  self.ts.addOptItem(item)
1101  else:
1102  self.log.info("There is no Event Output for this app")
1103  self.eventOutput = False
1104 
1105  # Begin processing
1106  Go = True
1107  while Go:
1108  packet = self.evcom.receive()
1109  if packet:
1110  pass
1111  else:
1112  continue
1113  if packet == "FINISHED":
1114  break
1115  evtNumber, tbin = packet # unpack
1116  if self.cntr != None:
1117  self.cntr.setEventCounter(evtNumber)
1118 
1119  # subworkers are forked before the first event is processed
1120  if self.nIn == 0:
1121  self.log.info("Fork new subworkers")
1122 
1123  # Fork subworkers and share services
1124  for k in self.subworkers:
1125  k.SetServices(
1126  self.a,
1127  self.evt,
1128  self.hvt,
1129  self.fsr,
1130  self.inc,
1131  self.pers,
1132  self.ts,
1133  self.cntr,
1134  )
1135  k.Start()
1136  self.a.addAlgorithm(CollectHistograms(self))
1137  self.nIn += 1
1138  self.TS.Load(tbin)
1139 
1140  t = time.time()
1141  sc = self.a.executeEvent()
1142  if self.nIn == 1:
1143  self.firstEvTime = time.time() - t
1144  else:
1145  self.rTime += time.time() - t
1146  if sc.isSuccess():
1147  pass
1148  else:
1149  self.log.warning("Did not Execute Event")
1150  self.evt.clearStore()
1151  continue
1152  if self.isEventPassed():
1153  pass
1154  else:
1155  self.log.warning("Event did not pass : %i" % (evtNumber))
1156  self.evt.clearStore()
1157  continue
1158  if self.eventOutput:
1159  # It may be the case of generating Event Tags; hence
1160  # no event output
1162  tb = self.TS.Dump()
1163  self.evcom.send((self.currentEvent, tb))
1164  self.nOut += 1
1165  self.inc.fireIncident(gbl.Incident("Worker", "EndEvent"))
1166  self.eventLoopSyncer.set()
1167  self.evt.clearStore()
1168  self.log.info("Setting <Last> Event")
1169  self.lastEvent.set()
1170 
1171  self.evcom.finalize()
1172  self.log.info("Worker-%i Finished Processing Events" % (self.nodeID))
1173  # Now send the FileRecords and stop/finalize the appMgr
1174  self.filerecordsAgent.SendFileRecords()
1175  self.Finalize()
1176  self.tTime = time.time() - startEngine
1177  self.Report()
1178 
1179  for k in self.subworkers:
1180  self.log.info("Join subworkers")
1181  k.proc.join()
1182 
1183  def getCheckAlgs(self):
1184  """
1185  For some output writers, a check is performed to see if the event has
1186  executed certain algorithms.
1187  These reside in the AcceptAlgs property for those writers
1188  """
1189  acc = []
1190  req = []
1191  vet = []
1192  for m in self.writerDict["events"]:
1193  if hasattr(m.w, "AcceptAlgs"):
1194  acc += m.w.AcceptAlgs
1195  if hasattr(m.w, "RequireAlgs"):
1196  req += m.w.RequireAlgs
1197  if hasattr(m.w, "VetoAlgs"):
1198  vet += m.w.VetoAlgs
1199  return (acc, req, vet)
1200 
1201  def checkExecutedPassed(self, algName):
1202  if (
1203  self.a.algorithm(algName)._ialg.isExecuted()
1204  and self.a.algorithm(algName)._ialg.filterPassed()
1205  ):
1206  return True
1207  else:
1208  return False
1209 
1210  def isEventPassed(self):
1211  """
1212  Check the algorithm status for an event.
1213  Depending on output writer settings, the event
1214  may be declined based on various criteria.
1215  This is a transcript of the check that occurs in GaudiSvc::OutputStream
1216  """
1217  passed = False
1218 
1219  self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
1220  if self.acceptAlgs:
1221  for name in self.acceptAlgs:
1222  if self.checkExecutedPassed(name):
1223  passed = True
1224  break
1225  else:
1226  passed = True
1227 
1228  self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
1229  for name in self.requireAlgs:
1230  if self.checkExecutedPassed(name):
1231  pass
1232  else:
1233  self.log.info("Evt declined (requireAlgs) : %s" % (name))
1234  passed = False
1235 
1236  self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
1237  for name in self.vetoAlgs:
1238  if self.checkExecutedPassed(name):
1239  pass
1240  else:
1241  self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
1242  passed = False
1243  return passed
1244 
1245 
1246 # =============================================================================
1247 
1248 
1250  def __init__(self, queues, events, params, subworkers):
1251  GMPComponent.__init__(self, "Writer", -2, queues, events, params, subworkers)
1252  # Identify the writer streams
1254  # This keeps track of workers as they finish
1255  self.status = [False] * self.nWorkers
1256  self.log.name = "Writer--2"
1257 
1259  # Writer :
1260  # No input
1261  # No Algs
1262  self.config["ApplicationMgr"].TopAlg = []
1263  self.config["EventSelector"].Input = []
1264 
1265  self.config["MessageSvc"].Format = "%-13s " % "[Writer]" + self.msgFormat
1266 
1267  def Engine(self):
1268  # rename process
1269  import ctypes
1270  import os
1271 
1272  libc = ctypes.CDLL("libc.so.6")
1273  name = str(self.nodeType) + str(self.nodeID) + "\0"
1274  libc.prctl(15, name, 0, 0, 0)
1275 
1276  startEngine = time.time()
1277  self.Initialize()
1278  self.histoAgent = HistoAgent(self)
1280 
1281  # Begin processing
1282  Go = True
1283  current = -1
1284  stopCriteria = self.nWorkers
1285  while Go:
1286  current = (current + 1) % self.nWorkers
1287  packet = self.evcoms[current].receive(timeout=0.01)
1288  if packet == None:
1289  continue
1290  if packet == "FINISHED":
1291  self.log.info("Writer got FINISHED flag : Worker %i" % (current))
1292 
1293  self.status[current] = True
1294  if all(self.status):
1295  self.log.info("FINISHED recd from all workers, break loop")
1296  break
1297  continue
1298  # otherwise, continue as normal
1299  self.nIn += 1 # update central count (maybe needed by FSR store)
1300  evtNumber, tbin = packet # unpack
1301  self.TS.Load(tbin)
1302  t = time.time()
1303  self.a.executeEvent()
1304  self.rTime += time.time() - t
1306  self.evt.clearStore()
1307  self.eventLoopSyncer.set()
1308  self.log.name = "Writer--2"
1309  self.log.info("Setting <Last> Event")
1310  self.lastEvent.set()
1311 
1312  # finalisation steps
1313  [e.finalize() for e in self.evcoms]
1314  # Now do Histograms
1315  sc = self.histoAgent.Receive()
1316  sc = self.histoAgent.RebuildHistoStore()
1317  if sc.isSuccess():
1318  self.log.info("Histo Store rebuilt ok")
1319  else:
1320  self.log.warning("Histo Store Error in Rebuild")
1321 
1322  # Now do FileRecords
1323  sc = self.filerecordsAgent.Receive()
1324  self.filerecordsAgent.Rebuild()
1325  self.Finalize()
1326  # self.rTime = time.time()-startEngine
1327  self.Report()
1328 
1329 
1330 # =============================================================================
1331 
1332 # =============================================================================
1333 
1334 
class Coord(object):
    """
    Co-ordinator of the parallel job.

    It creates the queues, the Syncer objects for the three phases
    (Initialise, Run, Finalise) and the Reader, Worker, Subworker and Writer
    components; Go() starts them and steers them through the phases.
    """

    def __init__(self, nWorkers, config, log):
        """
        nWorkers : number of workers; -1 requests one per cpu of the machine
        config   : configuration shared with every component
        log      : logger shared with every component
        """
        self.log = log
        self.config = config
        # set up Logging
        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("GaudiPython Parallel Process Co-ordinator beginning")

        # -1 : the user has requested all available cpus in the machine
        self.nWorkers = cpu_count() if nWorkers == -1 else nWorkers

        self.qs = self.SetupQueues()  # a dictionary of queues (for Events)
        self.hq = JoinableQueue()  # for Histogram data
        self.fq = JoinableQueue()  # for FileRecords data

        # Make a Syncer for Initialise, Run, and Finalise
        self.sInit = Syncer(
            self.nWorkers, self.log, limit=WAIT_INITIALISE, step=STEP_INITIALISE
        )
        self.sRun = Syncer(
            self.nWorkers,
            self.log,
            manyEvents=True,
            limit=WAIT_SINGLE_EVENT,
            step=STEP_EVENT,
            firstEvent=WAIT_FIRST_EVENT,
        )
        self.sFin = Syncer(
            self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE
        )
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Declare SubProcesses!
        # Subworkers take the IDs 1 .. nWorkers-1; ID 0 is the main Worker.
        self.subworkers = []
        for subID in range(1, self.nWorkers):
            self.subworkers.append(
                Subworker(
                    subID,
                    self.getQueues(subID),
                    self.getSyncEvents(subID),
                    params,
                    self.subworkers,
                )
            )
        self.reader = Reader(
            self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers
        )
        self.workers = []  # not filled in this class
        mainWorker = Worker(
            0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers
        )
        self.writer = Writer(
            self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers
        )

        # launch order used by Go(); Stop() joins in the reverse order
        self.system = [self.writer, mainWorker, self.reader]

    def getSyncEvents(self, nodeID):
        """
        Return the synchronisation events of node nodeID as the tuple
        (init, run, fin), where run is itself the pair (event, lastEvent).
        """
        runEntry = self.sRun.d[nodeID]
        return (
            self.sInit.d[nodeID].event,
            (runEntry.event, runEntry.lastEvent),
            self.sFin.d[nodeID].event,
        )

    def getQueues(self, nodeID):
        """
        Return (event queues of node nodeID, histogram queue, FileRecords queue).
        """
        return (self.qs[nodeID], self.hq, self.fq)

    def _syncOrTerminate(self, syncer, step):
        """
        Wait for all processes in the given step.  On anything but SUCCESS
        the child processes are terminated.  Returns True on SUCCESS.
        """
        if syncer.syncAll(step=step) == SUCCESS:
            return True
        self.Terminate()
        return False

    def Go(self):
        """
        Start all components and take them through Initialise, Run and
        Finalise.  Returns FAILURE (after terminating the children) as soon
        as one step does not report SUCCESS; otherwise joins all processes
        and returns SUCCESS.
        """
        # Initialise
        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("INITIALISING SYSTEM")

        # Start reader, writer and main worker
        for component in self.system:
            component.Start()

        if not self._syncOrTerminate(self.sInit, "Initialise"):
            return FAILURE

        # Run
        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("RUNNING SYSTEM")
        if not self._syncOrTerminate(self.sRun, "Run"):
            return FAILURE

        # Finalise
        self.log.name = "GaudiPython-Parallel-Logger"
        self.log.info("FINALISING SYSTEM")
        if not self._syncOrTerminate(self.sFin, "Finalise"):
            return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info("Cleanly join all Processes")
        self.Stop()
        self.log.info("Report Total Success to Main.py")
        return SUCCESS

    def Terminate(self):
        """
        Brutally kill all sub-processes that are still active.
        """
        for child in multiprocessing.active_children():
            child.terminate()

        # self.writer.proc.terminate()
        # [ w.proc.terminate() for w in self.workers]
        # self.reader.proc.terminate()

    def Stop(self):
        """
        Join the processes of all components and return SUCCESS.
        Note that self.system is reversed in place by this call.
        """
        # procs should be joined in reverse order to launch
        self.system.reverse()
        for component in self.system:
            component.proc.join()
        return SUCCESS

    def SetupQueues(self):
        """
        Set up the network of event queues and return it as a dictionary
        keyed by node ID; every value is a pair (queue in, queue out).

          N Queues = nWorkers + 1
          Each Worker has a Queue in, and a Queue out
          Reader (-1) has Queue out only
          Writer (-2) has nWorkers Queues in
        """
        # one queue from Reader-Workers
        readerToWorkers = JoinableQueue()
        # one queue from each worker to writer
        workersToWriter = [JoinableQueue() for _ in range(self.nWorkers)]
        network = {
            -1: (None, readerToWorkers),  # Reader
            -2: (workersToWriter, None),  # Writer
        }
        for workerID, toWriter in enumerate(workersToWriter):
            network[workerID] = (readerToWorkers, toWriter)
        return network
1487 
1488 
1489 # ============================= EOF ===========================================
GaudiMP.GMPBase.CollectHistograms
Definition: GMPBase.py:219
GaudiMP.GMPBase.Coord.workers
workers
Definition: GMPBase.py:1385
AlgSequencer.all
all
Definition: AlgSequencer.py:61
GaudiMP.GMPBase.GMPComponent.num
num
Definition: GMPBase.py:480
GaudiMP.GMPBase.TESSerializer.nOut
nOut
Definition: GMPBase.py:371
GaudiMP.GMPBase.TESSerializer.__init__
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:365
GaudiMP.GMPBase.Coord.fq
fq
Definition: GMPBase.py:1352
GaudiMP.GMPBase.EventCommunicator.receive
def receive(self, timeout=None)
Definition: GMPBase.py:309
GaudiMP.GMPBase.GMPComponent.nodeID
nodeID
Definition: GMPBase.py:472
GaudiMP.GMPBase.GMPComponent.__init__
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:452
GaudiMP.GMPBase.MiniWriter.OptItemList
OptItemList
Definition: GMPBase.py:133
GaudiMP.GMPBase.EventCommunicator.log
log
Definition: GMPBase.py:275
GaudiMP.GMPBase.GMPComponent.tTime
tTime
Definition: GMPBase.py:528
GaudiMP.GMPBase.Coord.SetupQueues
def SetupQueues(self)
Definition: GMPBase.py:1470
GaudiMP.pTools.FileRecordsAgent
Definition: pTools.py:240
GaudiMP.GMPBase.GMPComponent.getEventNumber
def getEventNumber(self)
Definition: GMPBase.py:573
GaudiMP.GMPBase.Writer.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:1258
GaudiMP.GMPBase.GMPComponent.IdentifyWriters
def IdentifyWriters(self)
Definition: GMPBase.py:612
GaudiMP.GMPBase.TESSerializer.tLoad
tLoad
Definition: GMPBase.py:373
GaudiPython::PyAlgorithm::execute
StatusCode execute() override
Definition: Algorithm.cpp:110
GaudiMP.GMPBase.EventCommunicator.send
def send(self, item)
Definition: GMPBase.py:294
GaudiMP.GMPBase.Worker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1073
GaudiMP.GMPBase.Coord.sRun
sRun
Definition: GMPBase.py:1358
GaudiMP.GMPBase.TESSerializer.buffersIn
buffersIn
Definition: GMPBase.py:368
GaudiMP.GMPBase.GMPComponent.TS
TS
Definition: GMPBase.py:559
GaudiMP.GMPBase.GMPComponent.stat
stat
Definition: GMPBase.py:517
GaudiMP.GMPBase.Subworker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:860
GaudiMP.GMPBase.GMPComponent.dumpHistograms
def dumpHistograms(self)
Definition: GMPBase.py:636
GaudiMP.GMPBase.EventCommunicator.sizeSent
sizeSent
Definition: GMPBase.py:289
reverse
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:59
GaudiMP.GMPBase.TESSerializer
Definition: GMPBase.py:364
GaudiMP.GMPBase.GMPComponent.nIn
nIn
Definition: GMPBase.py:513
GaudiMP.GMPBase.MiniWriter.wType
wType
Definition: GMPBase.py:126
GaudiMP.GMPBase.GMPComponent
Definition: GMPBase.py:445
GaudiMP.GMPBase.Worker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:1014
GaudiMP.GMPBase.Reader.evtMax
evtMax
Definition: GMPBase.py:715
GaudiMP.GMPBase.MiniWriter.ItemList
ItemList
Definition: GMPBase.py:132
GaudiMP.GMPBase.EventCommunicator.maxsize
maxsize
Definition: GMPBase.py:277
GaudiMP.GMPBase.EventCommunicator.statistics
def statistics(self)
Definition: GMPBase.py:351
GaudiMP.GMPBase.MiniWriter.getNewName
def getNewName(self, replaceThis, withThis, extra="")
Definition: GMPBase.py:155
GaudiMP.GMPBase.Worker.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:1019
GaudiMP.GMPBase.Coord.qs
qs
Definition: GMPBase.py:1350
GaudiMP.GMPBase.GMPComponent.Initialize
def Initialize(self)
Definition: GMPBase.py:659
GaudiMP.GMPBase.Subworker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:940
GaudiMP.GMPBase.Coord.nWorkers
nWorkers
Definition: GMPBase.py:1346
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:873
GaudiMP.GMPBase.EventCommunicator.__init__
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:273
GaudiMP.GMPBase.GMPComponent.fTime
fTime
Definition: GMPBase.py:526
GaudiMP.GMPBase.Writer
Definition: GMPBase.py:1249
max
EventIDBase max(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:225
GaudiMP.GMPBase.Coord
Definition: GMPBase.py:1335
GaudiPython::PyAlgorithm
Definition: Algorithm.h:40
GaudiMP.GMPBase.Subworker
Definition: GMPBase.py:832
GaudiMP.GMPBase.GMPComponent.hq
hq
Definition: GMPBase.py:507
GaudiMP.GMPBase.Coord.hq
hq
Definition: GMPBase.py:1351
GaudiPython.Bindings.setOwnership
setOwnership
Definition: Bindings.py:136
GaudiMP.GMPBase.Reader.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:705
GaudiMP.GMPBase.Coord.Terminate
def Terminate(self)
Definition: GMPBase.py:1453
GaudiMP.GMPBase.Worker.Engine
def Engine(self)
Definition: GMPBase.py:1060
GaudiMP.GMPBase.Worker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:1007
GaudiMP.GMPBase.Writer.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1250
GaudiMP.GMPBase.Subworker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:833
GaudiMP.GMPBase.Subworker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:840
GaudiMP.GMPBase.GMPComponent.pers
pers
Definition: GMPBase.py:557
GaudiMP.GMPBase.GMPComponent.hvt
hvt
Definition: GMPBase.py:554
GaudiMP.GMPBase.TESSerializer.T
T
Definition: GMPBase.py:366
GaudiMP.GMPBase.TESSerializer.log
log
Definition: GMPBase.py:377
GaudiMP.GMPBase.Worker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:1210
GaudiMP.GMPBase.GMPComponent.iTime
iTime
Definition: GMPBase.py:524
GaudiMP.GMPBase.TESSerializer.nodeID
nodeID
Definition: GMPBase.py:376
GaudiMP.GMPBase.GMPComponent.currentEvent
currentEvent
Definition: GMPBase.py:476
GaudiMP.GMPBase.GMPComponent.app
app
Definition: GMPBase.py:483
compareOutputFiles.par
string par
Definition: compareOutputFiles.py:483
GaudiMP.GMPBase.Writer.histoAgent
histoAgent
Definition: GMPBase.py:1278
GaudiMP.GMPBase.GMPComponent.firstEvTime
firstEvTime
Definition: GMPBase.py:527
GaudiMP.GMPBase.EventCommunicator.qin
qin
Definition: GMPBase.py:279
GaudiMP.GMPBase.Reader.DumpEvent
def DumpEvent(self)
Definition: GMPBase.py:717
GaudiMP.GMPBase.TESSerializer.buffersOut
buffersOut
Definition: GMPBase.py:369
GaudiPluginService.cpluginsvc.registry
def registry()
Definition: cpluginsvc.py:84
GaudiMP.GMPBase.MiniWriter.woutput
woutput
Definition: GMPBase.py:137
GaudiMP.GMPBase.GMPComponent.proc
proc
Definition: GMPBase.py:530
GaudiMP.GMPBase.Coord.config
config
Definition: GMPBase.py:1339
bug_34121.tool
tool
Definition: bug_34121.py:17
GaudiMP.GMPBase.GMPComponent.inc
inc
Definition: GMPBase.py:556
Gaudi::Functional::details::get
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
Definition: FunctionalDetails.h:444
GaudiMP.GMPBase.MiniWriter.getItemLists
def getItemLists(self, config)
Definition: GMPBase.py:176
GaudiMP.GMPBase.GMPComponent.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:539
GaudiMP.pTools.Syncer
Definition: pTools.py:630
GaudiMP.GMPBase.Reader.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:702
GaudiMP.GMPBase.MiniWriter.w
w
Definition: GMPBase.py:125
GaudiMP.GMPBase.Writer.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1279
GaudiMP.GMPBase.Coord.sFin
sFin
Definition: GMPBase.py:1366
GaudiMP.GMPBase.Coord.reader
reader
Definition: GMPBase.py:1382
GaudiMP.GMPBase.Subworker.eventOutput
eventOutput
Definition: GMPBase.py:842
GaudiMP.GMPBase.GMPComponent.lastEvent
lastEvent
Definition: GMPBase.py:467
GaudiMP.GMPBase.GMPComponent.nOut
nOut
Definition: GMPBase.py:514
GaudiMP.GMPBase.Worker.eventOutput
eventOutput
Definition: GMPBase.py:1017
GaudiMP.GMPBase.GMPComponent.finalEvent
finalEvent
Definition: GMPBase.py:466
GaudiMP.GMPBase.MiniWriter.key
key
Definition: GMPBase.py:130
GaudiPython.Pythonizations.executeEvent
executeEvent
Helpers for re-entrant interfaces.
Definition: Pythonizations.py:584
Gaudi.Configuration
Definition: Configuration.py:1
GaudiMP.GMPBase.EventCommunicator.allrecv
allrecv
Definition: GMPBase.py:284
GaudiMP.GMPBase.EventCommunicator.qoutTime
qoutTime
Definition: GMPBase.py:292
GaudiMP.GMPBase.EventCommunicator.finalize
def finalize(self)
Definition: GMPBase.py:330
GaudiMP.GMPBase.CollectHistograms.__init__
def __init__(self, gmpcomponent)
Definition: GMPBase.py:229
GaudiMP.GMPBase.TESSerializer.Dump
def Dump(self)
Definition: GMPBase.py:389
GaudiMP.GMPBase.TESSerializer.tDump
tDump
Definition: GMPBase.py:372
GaudiMP.GMPBase.Worker.writerDict
writerDict
Definition: GMPBase.py:1012
GaudiMP.GMPBase.TESSerializer.nIn
nIn
Definition: GMPBase.py:370
GaudiMP.GMPBase.EventCommunicator.qout
qout
Definition: GMPBase.py:280
GaudiMP.GMPBase.GMPComponent.Engine
def Engine(self)
Definition: GMPBase.py:534
GaudiMP.GMPBase.Coord.Stop
def Stop(self)
Definition: GMPBase.py:1463
GaudiMP.GMPBase.MiniWriter.set
def set(self, key, output)
Definition: GMPBase.py:193
GaudiMP.GMPBase.EventCommunicator.qinTime
qinTime
Definition: GMPBase.py:291
GaudiMP.GMPBase.Reader.DoFirstEvent
def DoFirstEvent(self)
Definition: GMPBase.py:724
GaudiMP.GMPBase.Coord.__init__
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1336
GaudiMP.pTools.HistoAgent
Definition: pTools.py:61
GaudiMP.GMPBase.Writer.status
status
Definition: GMPBase.py:1255
GaudiMP.GMPBase.GMPComponent.evcoms
evcoms
Definition: GMPBase.py:501
GaudiMP.GMPBase.TESSerializer.Report
def Report(self)
Definition: GMPBase.py:398
GaudiMP.GMPBase.Worker
Definition: GMPBase.py:1006
GaudiMP.GMPBase.GMPComponent.fq
fq
Definition: GMPBase.py:509
GaudiPython.Pythonizations.iteritems
iteritems
Definition: Pythonizations.py:545
GaudiMP.GMPBase.TESSerializer.nodeType
nodeType
Definition: GMPBase.py:375
GaudiMP.GMPBase.MiniWriter.datasvcName
datasvcName
Definition: GMPBase.py:138
GaudiMP.GMPBase.GMPComponent.SetupGaudiPython
def SetupGaudiPython(self)
Definition: GMPBase.py:543
GaudiMP.GMPBase.EventCommunicator._gmpc
_gmpc
Definition: GMPBase.py:274
GaudiMP.GMPBase.GMPComponent.queues
queues
Definition: GMPBase.py:479
GaudiMP.GMPBase.Coord.histSyncEvent
histSyncEvent
Definition: GMPBase.py:1370
GaudiMP.GMPBase.Subworker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:967
GaudiMP.GMPBase.EventCommunicator.allsent
allsent
Definition: GMPBase.py:283
GaudiMP.GMPBase.MiniWriter.__init__
def __init__(self, writer, wType, config)
Definition: GMPBase.py:124
GaudiMP.GMPBase.Writer.Engine
def Engine(self)
Definition: GMPBase.py:1267
GaudiMP.GMPBase.GMPComponent.fsr
fsr
Definition: GMPBase.py:555
GaudiMP.GMPBase.EventCommunicator.nSent
nSent
Definition: GMPBase.py:287
gaudirun.type
type
Definition: gaudirun.py:160
GaudiMP.GMPBase.CollectHistograms.log
log
Definition: GMPBase.py:232
GaudiMP.GMPBase.aida2root
aida2root
Definition: GMPBase.py:84
GaudiMP.GMPBase.GMPComponent.ts
ts
Definition: GMPBase.py:558
GaudiMP.GMPBase.Coord.subworkers
subworkers
Definition: GMPBase.py:1375
GaudiMP.GMPBase.EventCommunicator
Definition: GMPBase.py:269
GaudiMP.GMPBase.Subworker.SetServices
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:928
GaudiMP.GMPBase.Coord.getQueues
def getQueues(self, nodeID)
Definition: GMPBase.py:1404
GaudiMP.GMPBase.Coord.sInit
sInit
Definition: GMPBase.py:1355
GaudiMP.GMPBase.GMPComponent.rTime
rTime
Definition: GMPBase.py:525
GaudiMP.GMPBase.MiniWriter.wName
wName
Definition: GMPBase.py:135
GaudiMP.GMPBase.Coord.getSyncEvents
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1398
GaudiMP.GMPBase.Reader.Engine
def Engine(self)
Definition: GMPBase.py:768
GaudiMP.GMPBase.GMPComponent.subworkers
subworkers
Definition: GMPBase.py:471
GaudiMP.GMPBase.Coord.Go
def Go(self)
Definition: GMPBase.py:1410
GaudiMP.GMPBase.Reader
Definition: GMPBase.py:701
GaudiPython.Bindings.InterfaceCast
Definition: Bindings.py:146
GaudiMP.GMPBase.MiniWriter.__repr__
def __repr__(self)
Definition: GMPBase.py:198
GaudiMP.GMPBase.Coord.log
log
Definition: GMPBase.py:1338
GaudiMP.GMPBase.GMPComponent.nodeType
nodeType
Definition: GMPBase.py:462
GaudiMP.GMPBase.GMPComponent.evcom
evcom
Definition: GMPBase.py:497
GaudiMP.GMPBase.Writer.writerDict
writerDict
Definition: GMPBase.py:1253
GaudiMP.GMPBase.Coord.writer
writer
Definition: GMPBase.py:1389
GaudiMP.GMPBase.TESSerializer.Load
def Load(self, tbuf)
Definition: GMPBase.py:379
GaudiMP.GMPBase.GMPComponent.Report
def Report(self)
Definition: GMPBase.py:683
GaudiMP.GMPBase.Worker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1201
GaudiPython::PyAlgorithm::finalize
StatusCode finalize() override
Definition: Algorithm.cpp:114
GaudiMP.GMPBase.EventCommunicator.sizeRecv
sizeRecv
Definition: GMPBase.py:290
GaudiMP.GMPBase.MiniWriter
Definition: GMPBase.py:115
GaudiMP.GMPBase.MiniWriter.svcOutput
svcOutput
Definition: GMPBase.py:139
GaudiMP.GMPBase.GMPComponent.Finalize
def Finalize(self)
Definition: GMPBase.py:675
GaudiMP.GMPBase.GMPComponent.a
a
Definition: GMPBase.py:546
GaudiMP.pTools
Definition: pTools.py:1
GaudiMP.GMPBase.Coord.system
system
Definition: GMPBase.py:1393
GaudiMP.GMPBase.Subworker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:958
GaudiMP.GMPBase.Worker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:1183
GaudiMP.GMPBase.Subworker.writerDict
writerDict
Definition: GMPBase.py:838
GaudiMP.GMPBase.GMPComponent.msgFormat
msgFormat
Definition: GMPBase.py:473
IOTest.start
def start
Definition: IOTest.py:113
GaudiMP.GMPBase.GMPComponent.evt
evt
Definition: GMPBase.py:553
GaudiMP.GMPBase.EventCommunicator.nRecv
nRecv
Definition: GMPBase.py:288
GaudiMP.GMPBase.CollectHistograms._gmpc
_gmpc
Definition: GMPBase.py:231
GaudiMP.GMPBase.MiniWriter.output
output
Definition: GMPBase.py:131
GaudiMP.GMPBase.GMPComponent.log
log
Definition: GMPBase.py:470
GaudiMP.GMPBase.Subworker.Engine
def Engine(self)
Definition: GMPBase.py:844
GaudiMP.GMPBase.GMPComponent.cntr
cntr
Definition: GMPBase.py:669
GaudiMP.GMPBase.GMPComponent.StartGaudiPython
def StartGaudiPython(self)
Definition: GMPBase.py:562
Gaudi::Functional::details::put
auto put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
Definition: FunctionalDetails.h:173
GaudiMP.GMPBase.TESSerializer.evt
evt
Definition: GMPBase.py:367
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:102
GaudiMP.GMPBase.GMPComponent.Start
def Start(self)
Definition: GMPBase.py:489
StringKeyEx.keys
list keys
Definition: StringKeyEx.py:67
GaudiMP.GMPBase.GMPComponent.LoadTES
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:567