The Gaudi Framework  v36r1 (3e2fb5a8)
GMPBase.py
Go to the documentation of this file.
1 
11 from __future__ import print_function
12 from Gaudi.Configuration import *
13 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
14 from ROOT import TBufferFile, TBuffer
15 import multiprocessing
16 from multiprocessing import Process, Queue, JoinableQueue, Event
17 from multiprocessing import cpu_count, current_process
18 from multiprocessing.queues import Empty
19 from GaudiMP.pTools import *
20 import time
21 import sys
22 import os
23 from ROOT import TParallelMergingFile
24 # This script contains the bases for the Gaudi MultiProcessing (GMP)
25 # classes
26 
27 # There are three classes :
28 # Reader
29 # Worker
30 # Writer
31 
32 # Each class needs to perform communication with the others
33 # For this, we need a means of communication, which will be based on
34 # the python multiprocessing package
35 # This is provided in SPI pytools package
36 # cmt line : use pytools v1.1 LCG_Interfaces
37 # The PYTHONPATH env variable may need to be modified, as this might
38 # still point to 1.0_python2.5
39 
40 # Each class will need Queues, and a defined method for using these
41 # queues.
42 # For example, as long as there is something in the Queue, both ends
43 # of the queue must be open
44 # Also, there needs to be proper Termination flags and criteria
45 # The System should be error proof.
46 
# Constants -------------------------------------------------------------------
# Sleep interval, in seconds, used while polling a queue's feeder buffer
NAP = 1.0e-3
# Number of bytes in a megabyte (kept as a float so divisions stay floating)
MB = 1024.0 ** 2
# waits to guard against hanging, in seconds
WAIT_INITIALISE = 5 * 60
WAIT_FIRST_EVENT = 3 * 60
WAIT_SINGLE_EVENT = 6 * 60
WAIT_FINALISE = 2 * 60
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 10

# Switch for directly turning the Smaps algorithm on/off in the GaudiPython
# AppMgr
SMAPS = False
61 
62 # -----------------------------------------------------------------------------
63 
# definitions
# ----------
# converter from stored histograms (AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# AIDA histogram classes, used to check which type of histogram is at hand
# i.e. if currentHisto in aidatypes : pass
aidatypes = tuple(
    getattr(gbl.AIDA, aidaName)
    for aidaName in ('IHistogram', 'IHistogram1D', 'IHistogram2D',
                     'IHistogram3D', 'IProfile1D', 'IProfile2D',
                     'IBaseHistogram'))  # extra?

# ROOT histogram classes; same purpose as aidatypes
thtypes = tuple(
    getattr(gbl, rootName)
    for rootName in ('TH1D', 'TH2D', 'TH3D', 'TProfile', 'TProfile2D'))
77 
# Types of OutputStream in Gaudi : maps the stream class name to the kind of
# data it writes ("events", "records" or "tuples")
WRITERTYPES = dict(
    # event streams
    InputCopyStream="events",
    OutputStream="events",
    SequentialOutputStream="events",
    # record streams
    RecordStream="records",
    RunRecordStream="records",
    # tuple / tag collection streams
    EvtCollectionStream="tuples",
    TagCollectionStream="tuples",
)
88 
89 # =============================================================================
90 
91 
class MiniWriter(object):
    '''
    A class to represent a writer in the GaudiPython configuration
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring
    '''

    def __init__(self, writer, wType, config):
        '''
        @param writer : the configured writer; must provide getName() and
                        either an Output property or an EvtDataSvc property
        @param wType  : the writer category, stored as given
        @param config : mapping from component name to configured component,
                        used to look up the writer's data service
        '''
        self.w = writer
        self.wType = wType
        # set parameters for directly accessing the correct
        # part of the configuration, so that later, we can do
        # config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None      # Output as set on the writer itself
        self.datasvcName = None  # name of the data service, when consulted
        self.svcOutput = None    # Output as set on that data service
        if hasattr(self.w, "Output"):
            self.woutput = self.w.Output
            self.getItemLists(config)
            self.set(self.wName, self.woutput)
            return
        # if there is no output file, get it via the datasvc
        # (every writer always has a datasvc property)
        self.datasvcName = self.w.EvtDataSvc
        datasvc = config[self.datasvcName]
        if hasattr(datasvc, "Output"):
            # remember where the name came from, so __repr__ can report it
            self.svcOutput = datasvc.Output
            self.getItemLists(config)
            self.set(self.datasvcName, self.svcOutput)

    def getNewName(self, replaceThis, withThis, extra=''):
        '''
        Replace one pattern in the output name string with another, and
        return the new Output name. The stored output is not modified.

        If the output is a list, only its first entry is used and the result
        is returned as a one-element list.

        @param replaceThis : substring to be replaced (str)
        @param withThis    : replacement substring (str)
        @param extra       : appended to the new name; might be needed to add
                             ROOT flags, i.e.: OPT='RECREATE', or such
        '''
        assert isinstance(replaceThis, str)
        assert isinstance(withThis, str)
        old = self.output
        isList = isinstance(old, list)
        if isList:
            old = old[0]
        new = old.replace(replaceThis, withThis) + extra
        return [new] if isList else new

    def getItemLists(self, config):
        '''
        Fill ItemList and OptItemList from the writer, falling back to the
        writer's data service (looked up in config) for each list that the
        writer does not carry. A list found in neither place stays None.
        '''
        # the item list
        if hasattr(self.w, "ItemList"):
            self.ItemList = self.w.ItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "ItemList"):
                self.ItemList = datasvc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr(self.w, "OptItemList"):
            self.OptItemList = self.w.OptItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "OptItemList"):
                self.OptItemList = datasvc.OptItemList

    def set(self, key, output):
        '''Record the config key that owns the output, and the output.'''
        self.key = key
        self.output = output

    def __repr__(self):
        line = '-' * 80
        rows = [
            line,
            "Writer : %s" % (self.wName),
            "Writer Type : %s" % (self.wType),
            # the writer's own Output (None when it came from the datasvc)
            "Writer Output : %s" % (self.woutput),
            "DataSvc : %s" % (self.datasvcName),
            "DataSvc Output : %s" % (self.svcOutput),
            "",
            "Key for config : %s" % (self.key),
            "Output File : %s" % (self.output),
            "ItemList : %s" % (self.ItemList),
            "OptItemList : %s" % (self.OptItemList),
            line,
        ]
        return '\n'.join(rows) + '\n'
191 
192 
193 # =============================================================================
194 
195 
197  '''
198  GaudiPython algorithm used to clean up histos on the Reader and Workers
199  Only has a finalize method()
200  This retrieves a dictionary of path:histo objects and sends it to the
201  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
202  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
203  at the writer end, there will be an error.
204  '''
205 
def __init__(self, gmpcomponent):
    """Attach the algorithm to the GMP component it serves.

    gmpcomponent : the owning node; kept as self._gmpc, and its logger is
                   reused as self.log
    """
    PyAlgorithm.__init__(self)
    self._gmpc = gmpcomponent
    self.log = gmpcomponent.log
211 
def execute(self):
    """Per-event hook; does no work and always reports SUCCESS."""
    status = SUCCESS
    return status
214 
def finalize(self):
    """Send the histogram store to the writer, then reset the store.

    The dictionary of path:histo objects is obtained from the component's
    dumpHistograms() and put on the histogram queue in chunks, followed by
    the 'HISTOS_SENT' flag. The method then blocks on the component's sync
    event before clearing the store, so that it does not return while the
    histograms are still in flight.

    Returns SUCCESS.
    """
    gmpc = self._gmpc
    self.log.info('CollectHistograms Finalise (%s)' % (gmpc.nodeType))
    gmpc.hDict = gmpc.dumpHistograms()
    # a list is required: a Python 3 dict view cannot be sliced
    ks = list(gmpc.hDict.keys())
    self.log.info('%i Objects in Histogram Store' % (len(ks)))

    # crashes occurred due to Memory Error during the sending of hundreds
    # histos on slc5 machines, so instead, break into chunks
    # send 100 at a time
    chunk = 100
    # floor division: range() needs an int, and '/' yields a float on
    # Python 3. The '+ 1' keeps the original message count (at least one
    # dictionary is always sent, possibly empty).
    reps = len(ks) // chunk + 1
    for i in range(reps):
        someKeys = ks[i * chunk:(i + 1) * chunk]
        smalld = {key: gmpc.hDict[key] for key in someKeys}
        gmpc.hq.put((gmpc.nodeID, smalld))
    # "finished" Notification
    self.log.debug('Signalling end of histos to Writer')
    gmpc.hq.put('HISTOS_SENT')
    self.log.debug('Waiting on Sync Event')
    gmpc.sEvent.wait()
    self.log.debug('Histo Sync Event set, clearing and returning')
    gmpc.hvt.clearStore()
    # install a fresh root object, not owned by Python, at '/stat'
    root = gbl.DataObject()
    setOwnership(root, False)
    gmpc.hvt.setRoot('/stat', root)
    return SUCCESS
242 
243 
244 # =============================================================================
245 
246 
class EventCommunicator(object):
    """Communicate Gaudi events between nodes via queues.

    Events are communicated as TBufferFiles, filled either by the
    TESSerializer, or the GaudiSvc, "IPCSvc".
    Counters and timers describing the traffic are kept on the instance and
    logged by statistics().
    """

    def __init__(self, GMPComponent, qin, qout):
        """
        GMPComponent : the owning node (provides log, nodeType, nodeID and
                       nWorkers)
        qin          : queue to receive from
        qout         : queue to send on (None is passed for the Writer)
        """
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0  # counter : items sent
        self.nRecv = 0  # counter : items received
        self.sizeSent = 0  # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0  # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0  # time : receiving from self.qin
        self.qoutTime = 0  # time : sending on qout

    def send(self, item):
        """Put one event on the outgoing queue and update the statistics.

        item : a tuple ( evtNumber, TBufferFile ); item[1] must provide
               Length()
        Returns SUCCESS.
        """
        assert isinstance(item, tuple)
        startTransmission = time.time()
        self.qout.put(item)
        # allow the background thread to feed the Queue; not 100% guaranteed to
        # finish before next line
        # NOTE(review): _buffer is a private attribute of the queue object
        while self.qout._buffer:
            time.sleep(NAP)
        self.qoutTime += time.time() - startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive(self, timeout=None):
        """Take one item from the incoming queue.

        timeout : passed to the queue's get(); None blocks until an item
                  arrives
        Returns the item, or None when the queue stayed empty for the whole
        timeout. Only tuple items are counted as received events; any other
        item (e.g. a flag string) is returned uncounted.
        """
        startWait = time.time()
        try:
            itemIn = self.qin.get(timeout=timeout)
        except Empty:
            return None
        self.qinTime += time.time() - startWait
        if isinstance(itemIn, tuple):
            self.nRecv += 1
            self.sizeRecv += itemIn[1].Length()
        # Acknowledge every item, so that a join() on the queue can return.
        # Best effort: a failure is only reported, but interrupts and
        # interpreter exit are no longer swallowed.
        try:
            self.qin.task_done()
        except Exception:
            self._gmpc.log.warning(
                'TASK_DONE called too often by : %s' % (self._gmpc.nodeType))
        return itemIn

    def finalize(self):
        """Send the 'FINISHED' flags downstream, wait for them to be
        acknowledged, and report the statistics.

        Raises ValueError for a node type other than Reader, Worker, Writer.
        """
        nodeType = self._gmpc.nodeType
        self.log.info('Finalize Event Communicator : %s %s' % (self._gmpc,
                                                               nodeType))
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        if nodeType == 'Reader':
            downstream = self._gmpc.nWorkers
        elif nodeType == 'Worker':
            downstream = 1
        elif nodeType == 'Writer':
            downstream = 0
        else:
            raise ValueError('Unknown node type : %s' % (nodeType))

        for _ in range(downstream):
            self.qout.put('FINISHED')
        if nodeType != 'Writer':
            self.qout.join()
        # Now some reporting...
        self.statistics()

    def statistics(self):
        """Log the traffic counters under an '<nodeType>-<nodeID> Audit '
        logger name (the logger name is left changed)."""
        self.log.name = '%s-%i Audit ' % (self._gmpc.nodeType,
                                          self._gmpc.nodeID)
        self.log.info('Items Sent : %i' % (self.nSent))
        self.log.info('Items Received : %i' % (self.nRecv))
        self.log.info('Data Sent : %i' % (self.sizeSent))
        self.log.info('Data Received : %i' % (self.sizeRecv))
        self.log.info('Q-out Time : %5.2f' % (self.qoutTime))
        self.log.info('Q-in Time : %5.2f' % (self.qinTime))
336 
337 
338 # =============================================================================
339 
340 
class TESSerializer(object):
    """Bookkeeping wrapper around the Gaudi TES serializer.

    Loads events from, and dumps events to, TBufferFiles while recording
    the number of events, the buffer sizes and the time spent, for Report().
    """

    def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
        """
        gaudiTESSerializer : object providing loadBuffer() and dumpBuffer()
        evtDataSvc         : event data service providing setRoot()
        nodeType, nodeID   : identify the node in the logger name
        log                : logger used by Report()
        """
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        self.buffersIn = []  # Length() of every buffer loaded
        self.buffersOut = []  # Length() of every buffer dumped
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log

    def Load(self, tbuf):
        """Install a fresh '/Event' root and load the buffer into the store.

        tbuf : the buffer to load; must provide Length()
        """
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        t = time.time()
        self.T.loadBuffer(tbuf)
        self.tLoad += (time.time() - t)
        self.nIn += 1
        self.buffersIn.append(tbuf.Length())

    def Dump(self):
        """Dump the store into a new write-mode TBufferFile and return it."""
        t = time.time()
        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += (time.time() - t)
        self.nOut += 1
        self.buffersOut.append(tb.Length())
        return tb

    def Report(self):
        """Log event counts, data volumes and timings.

        The logger name is switched to '<nodeType>-<nodeID> TESSerializer'
        for the report and reset to '<nodeType>-<nodeID>' afterwards.
        """
        din = sum(self.buffersIn)
        dout = sum(self.buffersOut)

        if self.nIn:
            avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
            maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)
        else:
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"

        if self.nOut:
            # average of the DUMPED data (dout), not of the loaded data
            avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
            maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)
        else:
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"

        lines = (
            "Events Loaded : %i" % (self.nIn),
            "Events Dumped : %i" % (self.nOut),
            "Data Loaded : %i" % (din),
            "Data Loaded (MB) : %5.2f Mb" % (din / MB),
            avgIn,
            maxIn,
            "Data Dumped : %i" % (dout),
            "Data Dumped (MB) : %5.2f Mb" % (dout / MB),
            avgOut,
            maxOut,
            "Total Dump Time : %5.2f" % (self.tDump),
            "Total Load Time : %5.2f" % (self.tLoad),
        )
        self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
        for line in lines:
            self.log.info(line)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
419 
420 
421 # =============================================================================
422 
423 
class GMPComponent(object):
    """Template for Reader, Worker and Writer, containing all common
    components.

    nodeID is a numerical identifier for the node :
        -1                 for the reader
        -2                 for the writer
        0,...,nWorkers-1   for the Workers
    """

    def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
        """Declare a Gaudi MultiProcessing Node.

        nodeType   : one of Reader, Worker, Writer
        nodeID     : numerical identifier (see the class description)
        queues     : tuple ( qPair, histq, fq ), unpacked in Start().
                     qPair is a tuple ( qin, qout ) for sending and
                     receiving; for the Writer its first element is the
                     collection of incoming queues
        events     : tuple ( initEvent, ( eventLoopSyncer, lastEvent ),
                     finalEvent ) of synchronisation objects
        params     : tuple ( nWorkers, sEvent, config, log )
        subworkers : stored as given
        """
        self.nodeType = nodeType
        current_process().name = nodeType

        # Synchronisation Event() objects for keeping track of the system
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer  # unpack tuple

        # necessary for knowledge of the system
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.subworkers = subworkers
        self.nodeID = nodeID
        self.msgFormat = self.config['MessageSvc'].Format

        # describe the state of the node by the current Event Number
        self.currentEvent = None

        # Unpack the Queues : (events, histos, filerecords)
        self.queues = queues
        self.num = 0

        # Name of the application found in the configuration; when several
        # are present the last one in this list wins
        ks = self.config.keys()
        self.app = None
        for appName in ("Brunel", "DaVinci", "Boole", "Gauss"):
            if appName in ks:
                self.app = appName

    def Start(self):
        """Set up the communicators and counters, then start Engine() in a
        separate process."""
        qPair, histq, fq = self.queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker':
            # Reader or Worker Node
            qin, qout = qPair
            self.evcom = EventCommunicator(self, qin, qout)
        else:
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            for q in qPair[0]:
                self.evcoms.append(EventCommunicator(self, q, None))
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i' % (self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        self.proc = Process(target=self.Engine)
        # Fork and start the separate process
        self.proc.start()

    def Engine(self):
        """The method carried out by the Node; different for all."""
        pass

    def processConfiguration(self):
        """Customize the Configuration for multicore; different for all.

        Called first by Initialize().
        """
        pass

    def SetupGaudiPython(self):
        """Initialize the GaudiPython tools, such as the AppMgr, the
        services and the TES serializer. Returns SUCCESS."""
        self.a = AppMgr()
        if SMAPS:
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType + '-' + str(self.nodeID) + '.smp'
            ss = SmapShot(logname=smapsLog)
            self.a.addAlgorithm(ss)
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
        self.pers = self.a.service('EventPersistencySvc', 'IAddressCreator')
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
        self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID,
                                self.log)
        return SUCCESS

    def StartGaudiPython(self):
        """Initialize and start the AppMgr. Returns SUCCESS."""
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES(self, tbufferfile):
        """Install a fresh '/Event' root and load the buffer into the store."""
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber(self):
        """Return the number of the current event.

        For applications other than Gauss the number is read from the event
        store; -1 is returned when it cannot be found (with a warning only
        as long as no event has been counted in or out).
        For Gauss a local counter is returned, incremented first on the
        reader (nodeID -1).
        """
        if self.app == 'Gauss':
            if self.nodeID == -1:
                self.num = self.num + 1
            return self.num

        # Using getList or getHistoNames can result in the EventSelector
        # re-initialising connection to RootDBase, which costs a lot of
        # time... try to build a set of Header paths??

        # First Attempt : Unpacked Event Data
        for path in ('/Event/Gen/Header', '/Event/Rec/Header'):
            try:
                return self.evt[path].evtNumber()
            except Exception:
                # No evt number at this path
                continue

        # second attempt : try DAQ/RawEvent data
        # The Evt Number is in bank type 16, bank 0, data pt 4
        try:
            return self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
        except Exception:
            pass

        # Default Action
        if not (self.nIn > 0 or self.nOut > 0):
            self.log.warning('Could not determine Event Number')
        return -1

    def IdentifyWriters(self):
        """Identify and classify the Writers in the Configuration.

        Returns a dictionary with the keys "events", "records", "tuples"
        and "histos". The first three hold MiniWriter objects; "histos"
        holds the OutputFile property of the HistogramPersistencySvc, when
        that service is configured.
        """
        d = {}
        for k in ("events", "records", "tuples", "histos"):
            d[k] = []

        # Identify Writers and Classify
        for v in self.config.values():
            className = v.__class__.__name__
            if className in WRITERTYPES:
                writerType = WRITERTYPES[className]
                d[writerType].append(MiniWriter(v, writerType, self.config))
                if self.nodeID == 0:
                    self.log.info('Writer Found : %s' % (v.name()))

        # Now Check for the Histogram Service
        if 'HistogramPersistencySvc' in self.config.keys():
            hfile = self.config['HistogramPersistencySvc'].getProp(
                'OutputFile')
            d["histos"].append(hfile)
        return d

    def dumpHistograms(self):
        '''
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames()
        histDict = {}
        if nlist:
            for n in nlist:
                o = self.hvt[n]
                # objects of an AIDA type are converted to ROOT format
                if type(o) in aidatypes:
                    o = aida2root(o)
                histDict[n] = o
        else:
            print('WARNING : no histograms to recover?')
        return histDict

    def Initialize(self):
        """Configure, set up and start GaudiPython; records the time taken
        in self.iTime. The init event is set before the AppMgr is started."""
        start = time.time()
        self.processConfiguration()
        self.SetupGaudiPython()
        self.initEvent.set()
        self.StartGaudiPython()

        if self.app == 'Gauss':
            tool = self.a.tool("ToolSvc.EvtCounter")
            self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
        else:
            self.cntr = None

        self.iTime = time.time() - start

    def Finalize(self):
        """Stop and finalize the AppMgr, then set the final event; records
        the time taken in self.fTime."""
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info('%s-%i Finalized' % (self.nodeType, self.nodeID))
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report(self):
        """Log the timing summary of the node, followed by the report of
        the TESSerializer."""
        self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
        summary = (
            "Alive Time : %5.2f" % (self.tTime),
            "Init Time : %5.2f" % (self.iTime),
            "1st Event Time : %5.2f" % (self.firstEvTime),
            "Run Time : %5.2f" % (self.rTime),
            "Finalise Time : %5.2f" % (self.fTime),
        )
        for entry in summary:
            self.log.info(entry)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
        # and report from the TESSerializer
        self.TS.Report()
676 
677 
678 # =============================================================================
679 
680 
def __init__(self, queues, events, params, subworkers):
    """Create the Reader node : node type 'Reader', node ID -1."""
    nodeType = 'Reader'
    nodeID = -1
    GMPComponent.__init__(self, nodeType, nodeID, queues, events, params,
                          subworkers)
685 
def processConfiguration(self):
    """Customize the configuration for the Reader node.

    Reader :
      No algorithms
      No output
      No histos
    The message format is prefixed with '[Reader]' and the configured
    EvtMax is remembered in self.evtMax.
    """
    appMgr = self.config['ApplicationMgr']
    appMgr.TopAlg = []
    appMgr.OutStream = []
    if "HistogramPersistencySvc" in self.config.keys():
        self.config['HistogramPersistencySvc'].OutputFile = ''
    self.config['MessageSvc'].Format = '%-13s ' % '[Reader]' + \
        self.msgFormat
    self.evtMax = appMgr.EvtMax
698 
def DumpEvent(self):
    """Fill a new write-mode TBufferFile through the serializer's
    dumpBuffer() and return it."""
    outBuffer = TBufferFile(TBuffer.kWrite)
    self.ts.dumpBuffer(outBuffer)
    return outBuffer
705 
def DoFirstEvent(self):
    """Read the first event, register the TES item list with the
    serializer, and send the event.

    The last-event flag is set when evtMax is already reached, when there
    is no event to read, or when the TES list cannot be acquired.

    Returns FAILURE when the TES list cannot be acquired, SUCCESS otherwise.
    """
    # Do First Event ------------------------------------------------------
    startFirst = time.time()
    self.log.info('Reader : First Event')

    # Check Termination Criteria
    if self.nOut == self.evtMax:
        self.log.info('evtMax( %i ) reached' % (self.evtMax))
        self.lastEvent.set()
        return SUCCESS

    # Continue to read, dump and send event
    self.a.run(1)
    if not bool(self.evt['/Event']):
        self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
        self.lastEvent.set()
        return SUCCESS

    # Populate TESSerializer list and send Event
    if self.app == "Gauss":
        lst = self.evt.getHistoNames()
    else:
        try:
            lst = self.evt.getList()
            if self.app == "DaVinci":
                daqnode = self.evt.retrieveObject('/Event/DAQ').registry()
                setOwnership(daqnode, False)
                self.evt.getList(daqnode, lst, daqnode.address().par())
        except Exception:
            self.log.critical('Reader could not acquire TES List!')
            self.lastEvent.set()
            return FAILURE
    self.log.info('Reader : TES List : %i items' % (len(lst)))
    for item in lst:
        self.ts.addItem(item)

    # the event number travels with the buffer, so it must be refreshed
    # before sending (as is done for every later event)
    self.currentEvent = self.getEventNumber()
    tb = self.TS.Dump()
    self.log.info('First Event Sent')
    self.evcom.send((self.currentEvent, tb))
    self.nOut += 1
    self.eventLoopSyncer.set()
    self.evt.clearStore()
    self.firstEvTime = time.time() - startFirst
    return SUCCESS
751 
def Engine(self):
    """Main routine of the Reader process.

    Initializes the node, distributes events one by one until evtMax is
    reached, the input is exhausted or the node is unhealthy, then sets the
    last-event flag, finalizes and reports.
    """
    # rename process
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    # The name must be handed over as bytes: ctypes passes bytes as char*,
    # whereas a Python 3 text string would be passed as wchar_t*.
    name = (str(self.nodeType) + str(self.nodeID)).encode('ascii') + b'\0'
    libc.prctl(15, name, 0, 0, 0)

    startEngine = time.time()
    self.log.name = 'Reader'
    self.log.info('Reader Process starting')

    self.Initialize()

    # add the Histogram Collection Algorithm
    self.a.addAlgorithm(CollectHistograms(self))

    self.log.info('Reader Beginning Distribution')
    sc = self.DoFirstEvent()
    if sc.isSuccess():
        self.log.info('Reader First Event OK')
    else:
        self.log.critical('Reader Failed on First Event')
        self.stat = FAILURE

    # Do All Others -------------------------------------------------------
    while True:
        # Check Termination Criteria
        if self.nOut == self.evtMax:
            self.log.info('evtMax( %i ) reached' % (self.evtMax))
            break
        # Check Health
        if not self.stat.isSuccess():
            self.log.critical('Reader is Damaged!')
            break
        # Continue to read, dump and send event
        t = time.time()
        self.a.run(1)
        self.rTime += (time.time() - t)
        if not bool(self.evt['/Event']):
            self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
            break
        self.currentEvent = self.getEventNumber()
        tb = self.TS.Dump()
        self.evcom.send((self.currentEvent, tb))
        # clean up
        self.nOut += 1
        self.eventLoopSyncer.set()
        self.evt.clearStore()
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    # Finalize
    self.log.info('Reader : Event Distribution complete.')
    self.evcom.finalize()
    self.Finalize()
    self.tTime = time.time() - startEngine
    self.Report()
810 
811 
812 # =============================================================================
813 
814 
def __init__(self, workerID, queues, events, params, subworkers):
    """Create a subworker node (node type 'Worker') with the given ID."""
    GMPComponent.__init__(self, 'Worker', workerID, queues, events, params,
                          subworkers)
    # Identify the writer streams; getCheckAlgs() below reads
    # self.writerDict["events"], so the dictionary must exist first
    self.writerDict = self.IdentifyWriters()
    # Identify the accept/veto checks for each event
    self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
    self.log.info("Subworker-%i Created OK" % (self.nodeID))
    self.eventOutput = True
825 
def Engine(self):
    """Main routine of a subworker process.

    Receives events until the 'FINISHED' flag arrives, executes each one,
    forwards the accepted events, then sets the last-event flag, sends the
    file records, finalizes and reports.
    The services must have been provided through SetServices() beforehand.
    """
    # rename process
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    # The name must be handed over as bytes: ctypes passes bytes as char*,
    # whereas a Python 3 text string would be passed as wchar_t*.
    name = (str(self.nodeType) + str(self.nodeID)).encode('ascii') + b'\0'
    libc.prctl(15, name, 0, 0, 0)

    self.initEvent.set()
    startEngine = time.time()
    msg = self.a.service('MessageSvc')
    msg.Format = '%-13s ' % ('[' + self.log.name + ']') + self.msgFormat

    workerName = "Worker-%i" % (self.nodeID)
    self.log.name = workerName
    self.log.info("Subworker %i starting Engine" % (self.nodeID))
    # The agent is needed for SendFileRecords() at the end of this method.
    # NOTE(review): FileRecordsAgent is expected to come from the
    # GaudiMP.pTools star-import -- confirm
    self.filerecordsAgent = FileRecordsAgent(self)

    # populate the TESSerializer itemlist
    self.log.info(
        'EVT WRITERS ON WORKER : %i' % (len(self.writerDict['events'])))

    self.a.addAlgorithm(CollectHistograms(self))

    # Begin processing
    while True:
        packet = self.evcom.receive()
        if not packet:
            continue
        if packet == 'FINISHED':
            break
        evtNumber, tbin = packet  # unpack
        if self.cntr is not None:
            self.cntr.setEventCounter(evtNumber)

        self.nIn += 1
        self.TS.Load(tbin)

        t = time.time()
        sc = self.a.executeEvent()
        if self.nIn == 1:
            self.firstEvTime = time.time() - t
        else:
            self.rTime += (time.time() - t)
        if not sc.isSuccess():
            self.log.name = workerName
            self.log.warning('Did not Execute Event')
            self.evt.clearStore()
            continue
        if not self.isEventPassed():
            self.log.name = workerName
            self.log.warning('Event did not pass : %i' % (evtNumber))
            self.evt.clearStore()
            continue
        if self.eventOutput:
            # It may be the case of generating Event Tags; hence
            # no event output
            # refresh the event number that travels with the buffer
            self.currentEvent = self.getEventNumber()
            tb = self.TS.Dump()
            self.evcom.send((self.currentEvent, tb))
            self.nOut += 1
        self.inc.fireIncident(gbl.Incident('Subworker', 'EndEvent'))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
    self.log.name = workerName
    self.log.info('Setting <Last> Event %s' % (self.nodeID))
    self.lastEvent.set()

    self.evcom.finalize()
    # Now send the FileRecords and stop/finalize the appMgr
    self.filerecordsAgent.SendFileRecords()
    self.tTime = time.time() - startEngine
    self.Finalize()
    self.Report()
909 
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
    """Store the services handed over from outside and build the
    TESSerializer wrapper on top of them.

    The inc argument is not used : the incident service is looked up
    afresh on the given application manager.
    """
    self.a, self.evt, self.hvt, self.fsr = a, evt, hvt, fsr
    self.inc = a.service('IncidentSvc', 'IIncidentSvc')
    self.pers, self.ts, self.cntr = pers, ts, cntr
    self.TS = TESSerializer(ts, evt, self.nodeType, self.nodeID, self.log)
922 
def getCheckAlgs(self):
    '''
    Collect, over all event writers, the algorithm names held in their
    AcceptAlgs, RequireAlgs and VetoAlgs properties (a writer without one
    of these properties contributes nothing to that list).
    Returns the tuple ( accept, require, veto ) of lists.
    '''
    properties = ('AcceptAlgs', 'RequireAlgs', 'VetoAlgs')
    collected = dict((prop, []) for prop in properties)
    for miniWriter in self.writerDict["events"]:
        writer = miniWriter.w
        for prop in properties:
            if hasattr(writer, prop):
                collected[prop].extend(getattr(writer, prop))
    return tuple(collected[prop] for prop in properties)
940 
def checkExecutedPassed(self, algName):
    """Return True when the named algorithm has been executed and its
    filter passed, False otherwise."""
    if not self.a.algorithm(algName)._ialg.isExecuted():
        return False
    if not self.a.algorithm(algName)._ialg.filterPassed():
        return False
    return True
947 
def isEventPassed(self):
    '''
    Check the algorithm status for an event.
    Depending on output writer settings, the event
    may be declined based on various criteria.
    This is a transcript of the check that occurs in GaudiSvc::OutputStream

    The event passes when
      - there are no accept algorithms, or at least one of them has been
        executed and passed, and
      - every require algorithm has been executed and passed, and
      - no veto algorithm has been executed and passed.
    Returns True or False.
    '''
    passed = False

    self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
    if self.acceptAlgs:
        for name in self.acceptAlgs:
            if self.checkExecutedPassed(name):
                passed = True
                break
    else:
        passed = True

    self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
    for name in self.requireAlgs:
        if not self.checkExecutedPassed(name):
            self.log.info('Evt declined (requireAlgs) : %s' % (name))
            passed = False

    self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
    for name in self.vetoAlgs:
        # a veto algorithm that has been executed and passed rejects the event
        if self.checkExecutedPassed(name):
            self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
            passed = False
    return passed
982 
983 
984 # =============================================================================
985 
986 
def __init__(self, workerID, queues, events, params, subworkers):
    """Create a worker node (node type 'Worker') with the given ID."""
    GMPComponent.__init__(self, 'Worker', workerID, queues, events, params,
                          subworkers)
    # Identify the writer streams; getCheckAlgs() below reads
    # self.writerDict["events"], so the dictionary must exist first
    self.writerDict = self.IdentifyWriters()
    # Identify the accept/veto checks for each event
    self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
    self.log.name = "Worker-%i" % (self.nodeID)
    self.log.info("Worker-%i Created OK" % (self.nodeID))
    self.eventOutput = True
998 
1000 
def processConfiguration(self):
    """Customize the configuration for a Worker node.

    Worker :
      No input
      No output
      No Histos
    The message format is prefixed with '[Worker-<nodeID>]', the tuple
    output files are renamed per worker, and for Gauss the EvtCounter is
    told not to use the incident.
    """
    self.config['EventSelector'].Input = []
    self.config['ApplicationMgr'].OutStream = []
    if "HistogramPersistencySvc" in self.config.keys():
        self.config['HistogramPersistencySvc'].OutputFile = ''
    formatHead = '[Worker-%i]' % (self.nodeID)
    self.config['MessageSvc'].Format = '%-13s ' % formatHead + \
        self.msgFormat

    # items() : iteritems() does not exist on Python 3 dictionaries
    for key, lst in self.writerDict.items():
        self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

    for m in self.writerDict["tuples"]:
        # rename Tuple output file with an appendix
        # based on worker id, for merging later
        newName = m.getNewName('.', '.w%i.' % (self.nodeID))
        self.config[m.key].Output = newName

    if self.app == "Gauss":
        try:
            if "ToolSvc.EvtCounter" not in self.config:
                from Configurables import EvtCounter
                counter = EvtCounter()
            else:
                counter = self.config["ToolSvc.EvtCounter"]
            counter.UseIncident = False
        except Exception:
            # ignore errors when trying to change the configuration of the
            # EvtCounter
            self.log.warning('Cannot configure EvtCounter')
1039 
def Engine(self):
    '''
    Main routine of the Worker process.

    Renames the OS process, initialises the component, registers the
    items of all event writers with the serializer, then processes
    event packets received from the event communicator until the
    'FINISHED' flag arrives.  The subworkers are forked just before the
    first event is processed.  After the loop the communicator is
    finalised, the FileRecords are sent, the component is finalised and
    reported, and the subworker processes are joined.

    NOTE(review): the listing this was recovered from had lost its
    indentation; the nesting below was reconstructed from the control
    flow and should be confirmed against the repository.
    '''
    # rename process
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    # 15 is PR_SET_NAME.  The name has to be a byte string: on Python 3
    # ctypes passes a str as wchar_t*, which prctl would read as a
    # one-character name.
    name = (str(self.nodeType) + str(self.nodeID) + '\0').encode()
    libc.prctl(15, name, 0, 0, 0)

    startEngine = time.time()
    self.log.info("Worker %i starting Engine" % (self.nodeID))
    self.Initialize()
    # NOTE(review): listing line 1052 was dropped by the extraction.  It
    # defined self.filerecordsAgent, which is used after the event loop;
    # the constructor argument mirrors HistoAgent(self) - confirm.
    self.filerecordsAgent = FileRecordsAgent(self)

    # populate the TESSerializer itemlist
    self.log.info(
        'EVT WRITERS ON WORKER : %i' % (len(self.writerDict['events'])))

    eventWriters = self.writerDict["events"]
    if eventWriters:
        # Everything after a '#' in an item is dropped before it is
        # registered
        itemList = set(
            item.partition('#')[0]
            for m in eventWriters for item in m.ItemList)
        optItemList = set(
            item.partition('#')[0]
            for m in eventWriters for item in m.OptItemList)
        # If an item is mandatory and optional, keep it only in the
        # optional list
        itemList -= optItemList
        for item in sorted(itemList):
            self.log.info(' adding ItemList Item to ts : %s' % (item))
            self.ts.addItem(item)
        for item in sorted(optItemList):
            self.log.info(' adding Optional Item to ts : %s' % (item))
            self.ts.addOptItem(item)
    else:
        self.log.info('There is no Event Output for this app')
        self.eventOutput = False

    # Begin processing
    while True:
        packet = self.evcom.receive()
        if not packet:
            # nothing received; ask again
            continue
        if packet == 'FINISHED':
            break
        evtNumber, tbin = packet  # unpack
        # NOTE(review): '!= None' is kept on purpose rather than
        # 'is not None'; cntr may be a C++ proxy whose comparison with
        # None is overloaded for null pointers - confirm.
        if self.cntr != None:
            self.cntr.setEventCounter(evtNumber)

        # subworkers are forked before the first event is processed
        if self.nIn == 0:
            self.log.info("Fork new subworkers")

            # Fork subworkers and share services
            for k in self.subworkers:
                k.SetServices(self.a, self.evt, self.hvt, self.fsr,
                              self.inc, self.pers, self.ts, self.cntr)
                k.Start()
            self.a.addAlgorithm(CollectHistograms(self))
        self.nIn += 1
        self.TS.Load(tbin)

        t = time.time()
        sc = self.a.executeEvent()
        # The first event is timed separately from the rest
        if self.nIn == 1:
            self.firstEvTime = time.time() - t
        else:
            self.rTime += (time.time() - t)
        if not sc.isSuccess():
            self.log.warning('Did not Execute Event')
            self.evt.clearStore()
            continue
        if not self.isEventPassed():
            self.log.warning('Event did not pass : %i' % (evtNumber))
            self.evt.clearStore()
            continue
        if self.eventOutput:
            # It may be the case of generating Event Tags; hence
            #   no event output
            # NOTE(review): listing line 1133 was dropped by the
            # extraction; presumably it refreshed currentEvent before it
            # is sent below - confirm against the repository.
            self.currentEvent = self.getEventNumber()
            tb = self.TS.Dump()
            self.evcom.send((self.currentEvent, tb))
            self.nOut += 1
        self.inc.fireIncident(gbl.Incident('Worker', 'EndEvent'))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    self.evcom.finalize()
    self.log.info('Worker-%i Finished Processing Events' % (self.nodeID))
    # Now send the FileRecords and stop/finalize the appMgr
    self.filerecordsAgent.SendFileRecords()
    self.Finalize()
    self.tTime = time.time() - startEngine
    self.Report()

    for k in self.subworkers:
        self.log.info('Join subworkers')
        k.proc.join()
1154 
def getCheckAlgs(self):
    '''
    Collect the algorithm names the event writers use to accept or
    decline an event.

    For every entry of self.writerDict["events"], the AcceptAlgs,
    RequireAlgs and VetoAlgs properties of the wrapped writer are
    appended to three running lists; a property the writer does not
    have is skipped.

    Returns the tuple (acceptAlgs, requireAlgs, vetoAlgs).
    '''
    properties = ('AcceptAlgs', 'RequireAlgs', 'VetoAlgs')
    gathered = dict((prop, []) for prop in properties)
    for miniWriter in self.writerDict["events"]:
        stream = miniWriter.w
        for prop in properties:
            if hasattr(stream, prop):
                gathered[prop] += getattr(stream, prop)
    return tuple(gathered[prop] for prop in properties)
1172 
def checkExecutedPassed(self, algName):
    '''
    Return True only if the algorithm called algName reports both
    isExecuted() and filterPassed(); otherwise return False.
    '''
    if not self.a.algorithm(algName)._ialg.isExecuted():
        return False
    if not self.a.algorithm(algName)._ialg.filterPassed():
        return False
    return True
1179 
def isEventPassed(self):
    '''
    Check the algorithm status for an event.
    Depending on output writer settings, the event
    may be declined based on various criteria.
    This is a transcript of the check that occurs in GaudiSvc::OutputStream

    acceptAlgs  : if the list is not empty, at least one of them must
                  have executed and passed
    requireAlgs : every one of them must have executed and passed
    vetoAlgs    : none of them may have executed and passed

    Returns True if the event is accepted, False otherwise.
    '''
    self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
    if self.acceptAlgs:
        passed = any(
            self.checkExecutedPassed(name) for name in self.acceptAlgs)
    else:
        # no accept list : provisionally accepted
        passed = True

    self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
    for name in self.requireAlgs:
        if not self.checkExecutedPassed(name):
            self.log.info('Evt declined (requireAlgs) : %s' % (name))
            passed = False

    self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
    for name in self.vetoAlgs:
        # A veto algorithm that executed and passed rejects the event
        # (the test used to be inverted)
        if self.checkExecutedPassed(name):
            self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
            passed = False
    return passed
1214 
1215 
1216 # =============================================================================
1217 
1218 
def __init__(self, queues, events, params, subworkers):
    '''
    Build the Writer node.

    queues     : queue bundle for this node, passed on to GMPComponent
    events     : synchronisation events for this node, passed on as-is
    params     : parameters common to all sub-processes, passed on as-is
    subworkers : list of Subworker objects, passed on as-is

    The node type is fixed to 'Writer' and the node id to -2.
    '''
    GMPComponent.__init__(self, 'Writer', -2, queues, events, params,
                          subworkers)
    # Identify the writer streams
    # NOTE(review): this assignment lived on listing line 1224, which the
    # extraction dropped; restored so the comment above has its statement.
    self.writerDict = self.IdentifyWriters()
    # This keeps track of workers as they finish
    self.status = [False] * self.nWorkers
    self.log.name = "Writer--2"
1228 
def processConfiguration(self):
    '''
    Adapt the job configuration for the Writer process.

    The Writer runs no algorithms and reads no input files.  Nothing is
    returned: self.config is modified in place.
    '''
    # Writer :
    #   No input
    #   No Algs
    self.config['ApplicationMgr'].TopAlg = []
    self.config['EventSelector'].Input = []

    # Prefix every message with a fixed-width '[Writer]' tag
    self.config['MessageSvc'].Format = '%-13s ' % '[Writer]' + \
        self.msgFormat
1238 
def Engine(self):
    '''
    Main routine of the Writer process.

    Renames the OS process, initialises the component, then polls the
    event communicators of all workers in round-robin order.  Each
    received event is loaded and executed; a 'FINISHED' packet marks
    that worker as done, and the loop ends once every worker is done.
    Afterwards the histogram store and the FileRecords are received and
    rebuilt, and the component is finalised and reported.

    NOTE(review): the listing this was recovered from had lost its
    indentation; the nesting below was reconstructed from the control
    flow and should be confirmed against the repository.
    '''
    # rename process
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    # 15 is PR_SET_NAME.  The name has to be a byte string: on Python 3
    # ctypes passes a str as wchar_t*, which prctl would read as a
    # one-character name.
    name = (str(self.nodeType) + str(self.nodeID) + '\0').encode()
    libc.prctl(15, name, 0, 0, 0)

    self.Initialize()
    self.histoAgent = HistoAgent(self)
    # NOTE(review): listing line 1250 was dropped by the extraction.  It
    # defined self.filerecordsAgent, which is used below; the constructor
    # argument mirrors HistoAgent(self) - confirm.
    self.filerecordsAgent = FileRecordsAgent(self)

    # Begin processing
    current = -1
    while True:
        # visit the workers' communicators in turn
        current = (current + 1) % self.nWorkers
        packet = self.evcoms[current].receive(timeout=0.01)
        if packet is None:
            continue
        if packet == 'FINISHED':
            self.log.info(
                'Writer got FINISHED flag : Worker %i' % (current))

            self.status[current] = True
            if all(self.status):
                self.log.info('FINISHED recd from all workers, break loop')
                break
            continue
        # otherwise, continue as normal
        self.nIn += 1  # update central count (maybe needed by FSR store)
        evtNumber, tbin = packet  # unpack (the event number is not used)
        self.TS.Load(tbin)
        t = time.time()
        self.a.executeEvent()
        self.rTime += (time.time() - t)
        # NOTE(review): listing line 1277 was dropped by the extraction at
        # this point; check the repository for the missing statement.
        self.evt.clearStore()
        self.eventLoopSyncer.set()
    self.log.name = "Writer--2"
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    # finalisation steps
    for e in self.evcoms:
        e.finalize()
    # Now do Histograms
    self.histoAgent.Receive()
    sc = self.histoAgent.RebuildHistoStore()
    if sc.isSuccess():
        self.log.info('Histo Store rebuilt ok')
    else:
        self.log.warning('Histo Store Error in Rebuild')

    # Now do FileRecords
    self.filerecordsAgent.Receive()
    self.filerecordsAgent.Rebuild()
    self.Finalize()
    self.Report()
1300 
1301 
1302 # =============================================================================
1303 
1304 # =============================================================================
1305 
1306 
class Coord(object):
    '''
    Co-ordinator of the parallel job.

    Builds the queues, the Syncers and the Reader / Worker / Subworker /
    Writer components, then drives them through the Initialise, Run and
    Finalise phases (see Go).
    '''

    def __init__(self, nWorkers, config, log):
        '''
        nWorkers : number of workers; -1 means one per available cpu
        config   : job configuration, handed to every component
        log      : logger, handed to every component
        '''
        self.log = log
        self.config = config
        # set up Logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('GaudiPython Parallel Process Co-ordinator beginning')

        # -1 : the user has requested all available cpus in the machine
        self.nWorkers = cpu_count() if nWorkers == -1 else nWorkers

        self.qs = self.SetupQueues()  # a dictionary of queues (for Events)
        self.hq = JoinableQueue()  # for Histogram data
        self.fq = JoinableQueue()  # for FileRecords data

        # Make a Syncer for Initialise, Run, and Finalise
        self.sInit = Syncer(
            self.nWorkers,
            self.log,
            limit=WAIT_INITIALISE,
            step=STEP_INITIALISE)
        self.sRun = Syncer(
            self.nWorkers,
            self.log,
            manyEvents=True,
            limit=WAIT_SINGLE_EVENT,
            step=STEP_EVENT,
            firstEvent=WAIT_FIRST_EVENT)
        self.sFin = Syncer(
            self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE)
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        self.subworkers = []
        # Declare SubProcesses!
        # Worker 0 is the main Worker created below; ids 1..nWorkers-1
        # are Subworkers
        for i in range(1, self.nWorkers):
            sub = Subworker(i, self.getQueues(i), self.getSyncEvents(i),
                            params, self.subworkers)
            self.subworkers.append(sub)
        self.reader = Reader(
            self.getQueues(-1), self.getSyncEvents(-1), params,
            self.subworkers)
        # NOTE: this list is never filled here; the main Worker is only
        # kept in self.system
        self.workers = []
        wk = Worker(0, self.getQueues(0), self.getSyncEvents(0), params,
                    self.subworkers)
        self.writer = Writer(
            self.getQueues(-2), self.getSyncEvents(-2), params,
            self.subworkers)

        # Launch order used by Go(); Stop() joins in the reverse order
        self.system = [self.writer, wk, self.reader]

    def getSyncEvents(self, nodeID):
        '''
        Return the synchronisation events of node nodeID as
        (init, (run, lastEvent), fin), taken from the three Syncers.
        '''
        init = self.sInit.d[nodeID].event
        run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
        fin = self.sFin.d[nodeID].event
        return (init, run, fin)

    def getQueues(self, nodeID):
        '''
        Return (eventQ, histQ, fsrQ) for node nodeID.  The event entry is
        specific to the node; the histogram and FileRecords queues are
        shared by all nodes.
        '''
        return (self.qs[nodeID], self.hq, self.fq)

    def _syncPhase(self, syncer, step):
        # Wait on one Syncer.  On anything other than SUCCESS the child
        # processes are terminated and False is returned.
        if syncer.syncAll(step=step) == SUCCESS:
            return True
        self.Terminate()
        return False

    def Go(self):
        '''
        Run the whole system: start the components, then synchronise on
        Initialise, Run and Finalise in turn.

        Returns SUCCESS if all three phases succeed; otherwise the child
        processes are terminated and FAILURE is returned.
        '''
        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('INITIALISING SYSTEM')

        # Start reader, writer and main worker
        for p in self.system:
            p.Start()

        if not self._syncPhase(self.sInit, "Initialise"):
            return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('RUNNING SYSTEM')
        if not self._syncPhase(self.sRun, "Run"):
            return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('FINALISING SYSTEM')
        if not self._syncPhase(self.sFin, "Finalise"):
            return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info("Cleanly join all Processes")
        self.Stop()
        self.log.info("Report Total Success to Main.py")
        return SUCCESS

    def Terminate(self):
        '''
        Brutally kill every active child process of this process.
        '''
        for child in multiprocessing.active_children():
            child.terminate()

    def Stop(self):
        '''
        Join the processes of self.system and return SUCCESS.
        '''
        # procs should be joined in reverse order to launch.  reversed()
        # leaves self.system itself untouched, so the launch order
        # survives the call.
        for s in reversed(self.system):
            s.proc.join()
        return SUCCESS

    def SetupQueues(self):
        '''
        Set up the network of event queues.

        Returns a dictionary keyed by node id holding (queue in, queue out):
          -1 (Reader)  : (None, reader-to-workers queue)
          -2 (Writer)  : (list of per-worker queues, None)
          i  (workers) : (reader-to-workers queue, queue i to the writer)
        '''
        # N Queues = nWorkers + 1
        # Each Worker has a Queue in, and a Queue out
        # Reader has Queue out only
        # Writer has nWorkers Queues in

        # one queue from Reader-Workers
        rwk = JoinableQueue()
        # one queue from each worker to writer
        workersWriter = [JoinableQueue() for i in range(self.nWorkers)]
        d = {}
        d[-1] = (None, rwk)  # Reader
        d[-2] = (workersWriter, None)  # Writer
        for i in range(self.nWorkers):
            d[i] = (rwk, workersWriter[i])
        return d
1457 
1458 
1459 # ============================= EOF ===========================================
GaudiMP.GMPBase.CollectHistograms
Definition: GMPBase.py:196
GaudiMP.GMPBase.Coord.workers
workers
Definition: GMPBase.py:1356
AlgSequencer.all
all
Definition: AlgSequencer.py:54
GaudiMP.GMPBase.GMPComponent.num
num
Definition: GMPBase.py:459
GaudiMP.GMPBase.TESSerializer.nOut
nOut
Definition: GMPBase.py:348
GaudiMP.GMPBase.TESSerializer.__init__
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:342
GaudiMP.GMPBase.Coord.fq
fq
Definition: GMPBase.py:1324
GaudiMP.GMPBase.EventCommunicator.receive
def receive(self, timeout=None)
Definition: GMPBase.py:287
GaudiMP.GMPBase.GMPComponent.nodeID
nodeID
Definition: GMPBase.py:451
GaudiMP.GMPBase.GMPComponent.__init__
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:431
GaudiMP.GMPBase.MiniWriter.OptItemList
OptItemList
Definition: GMPBase.py:110
GaudiMP.GMPBase.EventCommunicator.log
log
Definition: GMPBase.py:253
GaudiMP.GMPBase.GMPComponent.tTime
tTime
Definition: GMPBase.py:507
GaudiMP.GMPBase.Coord.SetupQueues
def SetupQueues(self)
Definition: GMPBase.py:1440
GaudiMP.pTools.FileRecordsAgent
Definition: pTools.py:219
GaudiMP.GMPBase.GMPComponent.getEventNumber
def getEventNumber(self)
Definition: GMPBase.py:552
GaudiMP.GMPBase.Writer.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:1229
GaudiMP.GMPBase.GMPComponent.IdentifyWriters
def IdentifyWriters(self)
Definition: GMPBase.py:591
GaudiMP.GMPBase.TESSerializer.tLoad
tLoad
Definition: GMPBase.py:350
GaudiPython::PyAlgorithm::execute
StatusCode execute() override
Definition: Algorithm.cpp:110
GaudiMP.GMPBase.EventCommunicator.send
def send(self, item)
Definition: GMPBase.py:272
GaudiMP.GMPBase.Worker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1052
GaudiMP.GMPBase.Coord.sRun
sRun
Definition: GMPBase.py:1332
GaudiMP.GMPBase.TESSerializer.buffersIn
buffersIn
Definition: GMPBase.py:345
GaudiMP.GMPBase.GMPComponent.TS
TS
Definition: GMPBase.py:537
GaudiMP.GMPBase.GMPComponent.stat
stat
Definition: GMPBase.py:496
GaudiMP.GMPBase.Subworker.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:841
GaudiMP.GMPBase.GMPComponent.dumpHistograms
def dumpHistograms(self)
Definition: GMPBase.py:616
GaudiMP.GMPBase.EventCommunicator.sizeSent
sizeSent
Definition: GMPBase.py:267
reverse
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:59
GaudiMP.GMPBase.TESSerializer
Definition: GMPBase.py:341
GaudiMP.GMPBase.GMPComponent.nIn
nIn
Definition: GMPBase.py:492
GaudiMP.GMPBase.MiniWriter.wType
wType
Definition: GMPBase.py:103
GaudiMP.GMPBase.GMPComponent
Definition: GMPBase.py:424
GaudiMP.GMPBase.Worker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:994
GaudiMP.GMPBase.Reader.evtMax
evtMax
Definition: GMPBase.py:697
GaudiMP.GMPBase.MiniWriter.ItemList
ItemList
Definition: GMPBase.py:109
GaudiMP.GMPBase.EventCommunicator.maxsize
maxsize
Definition: GMPBase.py:255
GaudiMP.GMPBase.EventCommunicator.statistics
def statistics(self)
Definition: GMPBase.py:327
GaudiMP.GMPBase.Worker.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:999
GaudiMP.GMPBase.Coord.qs
qs
Definition: GMPBase.py:1322
GaudiMP.GMPBase.GMPComponent.Initialize
def Initialize(self)
Definition: GMPBase.py:639
GaudiMP.GMPBase.Subworker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:923
GaudiMP.GMPBase.Coord.nWorkers
nWorkers
Definition: GMPBase.py:1318
GaudiPython.Bindings.AppMgr
Definition: Bindings.py:842
GaudiMP.GMPBase.EventCommunicator.__init__
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:251
GaudiMP.GMPBase.GMPComponent.fTime
fTime
Definition: GMPBase.py:505
GaudiMP.GMPBase.Writer
Definition: GMPBase.py:1219
max
EventIDBase max(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:225
GaudiMP.GMPBase.Coord
Definition: GMPBase.py:1307
GaudiPython::PyAlgorithm
Definition: Algorithm.h:40
GaudiMP.GMPBase.Subworker
Definition: GMPBase.py:815
GaudiMP.GMPBase.GMPComponent.hq
hq
Definition: GMPBase.py:486
GaudiMP.GMPBase.Coord.hq
hq
Definition: GMPBase.py:1323
GaudiPython.Bindings.setOwnership
setOwnership
Definition: Bindings.py:114
GaudiMP.GMPBase.Reader.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:686
GaudiMP.GMPBase.Coord.Terminate
def Terminate(self)
Definition: GMPBase.py:1423
GaudiMP.GMPBase.Worker.Engine
def Engine(self)
Definition: GMPBase.py:1040
GaudiMP.GMPBase.Worker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:988
GaudiMP.GMPBase.Writer.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1220
IOTest.start
start
Definition: IOTest.py:108
GaudiMP.GMPBase.Subworker.__init__
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:816
GaudiMP.GMPBase.Subworker.vetoAlgs
vetoAlgs
Definition: GMPBase.py:822
GaudiMP.GMPBase.GMPComponent.pers
pers
Definition: GMPBase.py:535
GaudiMP.GMPBase.GMPComponent.hvt
hvt
Definition: GMPBase.py:532
GaudiMP.GMPBase.TESSerializer.T
T
Definition: GMPBase.py:343
GaudiMP.GMPBase.TESSerializer.log
log
Definition: GMPBase.py:354
GaudiMP.GMPBase.Worker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:1180
GaudiMP.GMPBase.GMPComponent.iTime
iTime
Definition: GMPBase.py:503
GaudiMP.GMPBase.TESSerializer.nodeID
nodeID
Definition: GMPBase.py:353
GaudiMP.GMPBase.GMPComponent.currentEvent
currentEvent
Definition: GMPBase.py:455
GaudiMP.GMPBase.GMPComponent.app
app
Definition: GMPBase.py:462
compareOutputFiles.par
string par
Definition: compareOutputFiles.py:477
GaudiMP.GMPBase.Writer.histoAgent
histoAgent
Definition: GMPBase.py:1249
GaudiMP.GMPBase.GMPComponent.firstEvTime
firstEvTime
Definition: GMPBase.py:506
GaudiMP.GMPBase.EventCommunicator.qin
qin
Definition: GMPBase.py:257
GaudiMP.GMPBase.Reader.DumpEvent
def DumpEvent(self)
Definition: GMPBase.py:699
GaudiMP.GMPBase.TESSerializer.buffersOut
buffersOut
Definition: GMPBase.py:346
GaudiPluginService.cpluginsvc.registry
def registry()
Definition: cpluginsvc.py:82
GaudiMP.GMPBase.MiniWriter.woutput
woutput
Definition: GMPBase.py:114
GaudiMP.GMPBase.GMPComponent.proc
proc
Definition: GMPBase.py:509
GaudiMP.GMPBase.Coord.config
config
Definition: GMPBase.py:1311
bug_34121.tool
tool
Definition: bug_34121.py:17
GaudiMP.GMPBase.GMPComponent.inc
inc
Definition: GMPBase.py:534
Gaudi::Functional::details::get
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
Definition: FunctionalDetails.h:391
GaudiMP.GMPBase.MiniWriter.getItemLists
def getItemLists(self, config)
Definition: GMPBase.py:153
GaudiMP.GMPBase.GMPComponent.processConfiguration
def processConfiguration(self)
Definition: GMPBase.py:518
GaudiMP.pTools.Syncer
Definition: pTools.py:601
GaudiMP.GMPBase.Reader.__init__
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:682
GaudiMP.GMPBase.MiniWriter.w
w
Definition: GMPBase.py:102
GaudiMP.GMPBase.Writer.filerecordsAgent
filerecordsAgent
Definition: GMPBase.py:1250
GaudiMP.GMPBase.Coord.sFin
sFin
Definition: GMPBase.py:1339
GaudiMP.GMPBase.Coord.reader
reader
Definition: GMPBase.py:1353
GaudiMP.GMPBase.Subworker.eventOutput
eventOutput
Definition: GMPBase.py:824
GaudiMP.GMPBase.GMPComponent.lastEvent
lastEvent
Definition: GMPBase.py:446
GaudiMP.GMPBase.GMPComponent.nOut
nOut
Definition: GMPBase.py:493
GaudiMP.GMPBase.Worker.eventOutput
eventOutput
Definition: GMPBase.py:997
GaudiMP.GMPBase.GMPComponent.finalEvent
finalEvent
Definition: GMPBase.py:445
GaudiMP.GMPBase.MiniWriter.key
key
Definition: GMPBase.py:107
GaudiPython.Pythonizations.executeEvent
executeEvent
Helpers for re-entrant interfaces.
Definition: Pythonizations.py:562
Gaudi.Configuration
Definition: Configuration.py:1
GaudiMP.GMPBase.EventCommunicator.allrecv
allrecv
Definition: GMPBase.py:262
GaudiMP.GMPBase.EventCommunicator.qoutTime
qoutTime
Definition: GMPBase.py:270
GaudiMP.GMPBase.EventCommunicator.finalize
def finalize(self)
Definition: GMPBase.py:307
GaudiMP.GMPBase.CollectHistograms.__init__
def __init__(self, gmpcomponent)
Definition: GMPBase.py:206
GaudiMP.GMPBase.TESSerializer.Dump
def Dump(self)
Definition: GMPBase.py:366
GaudiMP.GMPBase.TESSerializer.tDump
tDump
Definition: GMPBase.py:349
GaudiMP.GMPBase.Worker.writerDict
writerDict
Definition: GMPBase.py:992
GaudiMP.GMPBase.TESSerializer.nIn
nIn
Definition: GMPBase.py:347
GaudiMP.GMPBase.EventCommunicator.qout
qout
Definition: GMPBase.py:258
GaudiMP.GMPBase.GMPComponent.Engine
def Engine(self)
Definition: GMPBase.py:513
GaudiMP.GMPBase.Coord.Stop
def Stop(self)
Definition: GMPBase.py:1433
GaudiMP.GMPBase.MiniWriter.set
def set(self, key, output)
Definition: GMPBase.py:170
GaudiMP.GMPBase.EventCommunicator.qinTime
qinTime
Definition: GMPBase.py:269
GaudiMP.GMPBase.Reader.DoFirstEvent
def DoFirstEvent(self)
Definition: GMPBase.py:706
GaudiMP.GMPBase.Coord.__init__
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1308
GaudiMP.pTools.HistoAgent
Definition: pTools.py:60
GaudiMP.GMPBase.Writer.status
status
Definition: GMPBase.py:1226
GaudiMP.GMPBase.GMPComponent.evcoms
evcoms
Definition: GMPBase.py:480
GaudiMP.GMPBase.TESSerializer.Report
def Report(self)
Definition: GMPBase.py:375
GaudiMP.GMPBase.Worker
Definition: GMPBase.py:987
GaudiMP.GMPBase.GMPComponent.fq
fq
Definition: GMPBase.py:488
GaudiPython.Pythonizations.iteritems
iteritems
Definition: Pythonizations.py:525
GaudiMP.GMPBase.TESSerializer.nodeType
nodeType
Definition: GMPBase.py:352
GaudiMP.GMPBase.MiniWriter.datasvcName
datasvcName
Definition: GMPBase.py:115
GaudiMP.GMPBase.GMPComponent.SetupGaudiPython
def SetupGaudiPython(self)
Definition: GMPBase.py:522
GaudiMP.GMPBase.EventCommunicator._gmpc
_gmpc
Definition: GMPBase.py:252
GaudiMP.GMPBase.GMPComponent.queues
queues
Definition: GMPBase.py:458
GaudiMP.GMPBase.Coord.histSyncEvent
histSyncEvent
Definition: GMPBase.py:1342
GaudiMP.GMPBase.Subworker.isEventPassed
def isEventPassed(self)
Definition: GMPBase.py:948
GaudiMP.GMPBase.EventCommunicator.allsent
allsent
Definition: GMPBase.py:261
GaudiMP.GMPBase.MiniWriter.__init__
def __init__(self, writer, wType, config)
Definition: GMPBase.py:101
GaudiMP.GMPBase.Writer.Engine
def Engine(self)
Definition: GMPBase.py:1239
GaudiMP.GMPBase.GMPComponent.fsr
fsr
Definition: GMPBase.py:533
GaudiMP.GMPBase.EventCommunicator.nSent
nSent
Definition: GMPBase.py:265
gaudirun.type
type
Definition: gaudirun.py:154
GaudiMP.GMPBase.CollectHistograms.log
log
Definition: GMPBase.py:209
GaudiMP.GMPBase.aida2root
aida2root
Definition: GMPBase.py:67
GaudiMP.GMPBase.GMPComponent.ts
ts
Definition: GMPBase.py:536
GaudiMP.GMPBase.Coord.subworkers
subworkers
Definition: GMPBase.py:1347
GaudiMP.GMPBase.EventCommunicator
Definition: GMPBase.py:247
GaudiMP.GMPBase.Subworker.SetServices
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:910
GaudiMP.GMPBase.Coord.getQueues
def getQueues(self, nodeID)
Definition: GMPBase.py:1374
GaudiMP.GMPBase.Coord.sInit
sInit
Definition: GMPBase.py:1327
GaudiMP.GMPBase.GMPComponent.rTime
rTime
Definition: GMPBase.py:504
GaudiMP.GMPBase.MiniWriter.wName
wName
Definition: GMPBase.py:112
GaudiMP.GMPBase.Coord.getSyncEvents
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1368
GaudiMP.GMPBase.Reader.Engine
def Engine(self)
Definition: GMPBase.py:752
GaudiMP.GMPBase.GMPComponent.subworkers
subworkers
Definition: GMPBase.py:450
GaudiMP.GMPBase.Coord.Go
def Go(self)
Definition: GMPBase.py:1380
GaudiMP.GMPBase.Reader
Definition: GMPBase.py:681
GaudiPython.Bindings.InterfaceCast
Definition: Bindings.py:124
GaudiMP.GMPBase.MiniWriter.__repr__
def __repr__(self)
Definition: GMPBase.py:175
GaudiMP.GMPBase.Coord.log
log
Definition: GMPBase.py:1310
GaudiMP.GMPBase.GMPComponent.nodeType
nodeType
Definition: GMPBase.py:441
GaudiMP.GMPBase.GMPComponent.evcom
evcom
Definition: GMPBase.py:476
GaudiMP.GMPBase.Writer.writerDict
writerDict
Definition: GMPBase.py:1224
GaudiMP.GMPBase.Coord.writer
writer
Definition: GMPBase.py:1359
GaudiMP.GMPBase.TESSerializer.Load
def Load(self, tbuf)
Definition: GMPBase.py:356
GaudiMP.GMPBase.GMPComponent.Report
def Report(self)
Definition: GMPBase.py:663
GaudiMP.GMPBase.Worker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1173
GaudiPython::PyAlgorithm::finalize
StatusCode finalize() override
Definition: Algorithm.cpp:114
GaudiMP.GMPBase.EventCommunicator.sizeRecv
sizeRecv
Definition: GMPBase.py:268
GaudiMP.GMPBase.MiniWriter
Definition: GMPBase.py:92
GaudiMP.GMPBase.MiniWriter.svcOutput
svcOutput
Definition: GMPBase.py:116
GaudiMP.GMPBase.GMPComponent.Finalize
def Finalize(self)
Definition: GMPBase.py:655
GaudiMP.GMPBase.GMPComponent.a
a
Definition: GMPBase.py:525
GaudiMP.pTools
Definition: pTools.py:1
GaudiMP.GMPBase.Coord.system
system
Definition: GMPBase.py:1363
GaudiMP.GMPBase.Subworker.checkExecutedPassed
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:941
GaudiMP.GMPBase.Worker.getCheckAlgs
def getCheckAlgs(self)
Definition: GMPBase.py:1155
GaudiMP.GMPBase.Subworker.writerDict
writerDict
Definition: GMPBase.py:820
GaudiMP.GMPBase.GMPComponent.msgFormat
msgFormat
Definition: GMPBase.py:452
GaudiMP.GMPBase.GMPComponent.evt
evt
Definition: GMPBase.py:531
GaudiMP.GMPBase.EventCommunicator.nRecv
nRecv
Definition: GMPBase.py:266
GaudiMP.GMPBase.CollectHistograms._gmpc
_gmpc
Definition: GMPBase.py:208
GaudiMP.GMPBase.MiniWriter.output
output
Definition: GMPBase.py:108
GaudiMP.GMPBase.MiniWriter.getNewName
def getNewName(self, replaceThis, withThis, extra='')
Definition: GMPBase.py:132
GaudiMP.GMPBase.GMPComponent.log
log
Definition: GMPBase.py:449
GaudiMP.GMPBase.Subworker.Engine
def Engine(self)
Definition: GMPBase.py:826
GaudiMP.GMPBase.GMPComponent.cntr
cntr
Definition: GMPBase.py:649
GaudiMP.GMPBase.GMPComponent.StartGaudiPython
def StartGaudiPython(self)
Definition: GMPBase.py:541
Gaudi::Functional::details::put
auto put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
Definition: FunctionalDetails.h:147
GaudiMP.GMPBase.TESSerializer.evt
evt
Definition: GMPBase.py:344
Gaudi::Functional::details::zip::range
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
Definition: FunctionalDetails.h:97
GaudiMP.GMPBase.GMPComponent.Start
def Start(self)
Definition: GMPBase.py:468
StringKeyEx.keys
list keys
Definition: StringKeyEx.py:67
GaudiMP.GMPBase.GMPComponent.LoadTES
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:546