Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v31r0 (aeb156f0)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
GMPBase.py
Go to the documentation of this file.
1 from Gaudi.Configuration import *
2 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
3 from ROOT import TBufferFile, TBuffer
4 import multiprocessing
5 from multiprocessing import Process, Queue, JoinableQueue, Event
6 from multiprocessing import cpu_count, current_process
7 from multiprocessing.queues import Empty
8 from pTools import *
9 import time
10 import sys
11 import os
12 from ROOT import TParallelMergingFile
13 # This script contains the bases for the Gaudi MultiProcessing (GMP)
14 # classes
15 
16 # There are three classes :
17 # Reader
18 # Worker
19 # Writer
20 
21 # Each class needs to perform communication with the others
22 # For this, we need a means of communication, which will be based on
23 # the python multiprocessing package
24 # This is provided in SPI pytools package
25 # cmt line : use pytools v1.1 LCG_Interfaces
26 # The PYTHONPATH env variable may need to be modified, as this might
27 # still point to 1.0_python2.5
28 
29 # Each class will need Queues, and a defined method for using these
30 # queues.
31 # For example, as long as there is something in the Queue, both ends
32 # of the queue must be open
33 # Also, there needs to be proper Termination flags and criteria
34 # The System should be error proof.
35 
36 # Constants -------------------------------------------------------------------
# Constants -------------------------------------------------------------------
# polling interval (seconds) used when busy-waiting on queues/buffers
NAP = 0.001
# bytes per megabyte, kept as a float so divisions give fractional MB
MB = 1024.0 * 1024.0
# waits to guard against hanging, in seconds
WAIT_INITIALISE = 60 * 5
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
WAIT_FINALISE = 60 * 2
# step sizes paired with the waits above — presumably polling intervals
# used by code outside this listing; confirm against the full source
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 10

# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
SMAPS = False

# -----------------------------------------------------------------------------

# definitions
# ----------
# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# used to check which type of histo we are dealing with
# i.e. if currentHisto in aidatypes : pass
aidatypes = (gbl.AIDA.IHistogram, gbl.AIDA.IHistogram1D, gbl.AIDA.IHistogram2D,
             gbl.AIDA.IHistogram3D, gbl.AIDA.IProfile1D, gbl.AIDA.IProfile2D,
             gbl.AIDA.IBaseHistogram)  # extra?

# similar to aidatypes
thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)

# Types of OutputStream in Gaudi, mapped to the kind of data they write
WRITERTYPES = {
    'EvtCollectionStream': "tuples",
    'InputCopyStream': "events",
    'OutputStream': "events",
    'RecordStream': "records",
    'RunRecordStream': "records",
    'SequentialOutputStream': "events",
    'TagCollectionStream': "tuples"
}
77 
78 # =============================================================================
79 
80 
class MiniWriter(object):
    '''
    A class to represent a writer in the GaudiPython configuration
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring
    '''

    def __init__(self, writer, wType, config):
        # @param writer : the writer Configurable
        # @param wType  : one of "events"/"records"/"tuples" (see WRITERTYPES)
        # @param config : full configuration mapping, used to reach the DataSvc
        self.w = writer
        self.wType = wType
        # set parameters for directly accessing the correct
        # part of the configuration, so that later, we can do
        # config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr(self.w, "Output"):
            self.woutput = self.w.Output
            self.getItemLists(config)
            self.set(self.wName, self.w.Output)
            return
        else:
            # if there is no output file, get it via the datasvc
            # (every writer always has a datasvc property)
            self.datasvcName = self.w.EvtDataSvc
            datasvc = config[self.datasvcName]
            if hasattr(datasvc, "Output"):
                self.getItemLists(config)
                self.set(self.datasvcName, datasvc.Output)
                return

    def getNewName(self, replaceThis, withThis, extra=''):
        # replace one pattern in the output name string
        # with another, and return the Output name
        # It *might* be in a list, so check for this
        #
        # @param extra : might need to add ROOT flags
        #                i.e.: OPT='RECREATE', or such
        # idiom fix : isinstance instead of comparing __class__.__name__
        assert isinstance(replaceThis, str)
        assert isinstance(withThis, str)
        old = self.output
        lst = False
        if isinstance(old, list):
            old = self.output[0]
            lst = True
        new = old.replace(replaceThis, withThis)
        new += extra
        if lst:
            return [new]
        else:
            return new

    def getItemLists(self, config):
        # Collect ItemList/OptItemList, preferring the writer's own
        # properties and falling back on its DataSvc.
        # the item list
        if hasattr(self.w, "ItemList"):
            self.ItemList = self.w.ItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "ItemList"):
                self.ItemList = datasvc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr(self.w, "OptItemList"):
            self.OptItemList = self.w.OptItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "OptItemList"):
                self.OptItemList = datasvc.OptItemList
        return

    def set(self, key, output):
        # Record where (config key) and what (output spec) to patch later.
        self.key = key
        self.output = output
        return

    def __repr__(self):
        # Human-readable summary used for configuration debugging.
        s = ""
        line = '-' * 80
        s += (line + '\n')
        s += "Writer : %s\n" % (self.wName)
        s += "Writer Type : %s\n" % (self.wType)
        s += "Writer Output : %s\n" % (self.output)
        s += "DataSvc : %s\n" % (self.datasvcName)
        s += "DataSvc Output : %s\n" % (self.svcOutput)
        s += '\n'
        s += "Key for config : %s\n" % (self.key)
        s += "Output File : %s\n" % (self.output)
        s += "ItemList : %s\n" % (self.ItemList)
        s += "OptItemList : %s\n" % (self.OptItemList)
        s += (line + '\n')
        return s
180 
181 
182 # =============================================================================
183 
184 
186  '''
187  GaudiPython algorithm used to clean up histos on the Reader and Workers
188  Only has a finalize method()
189  This retrieves a dictionary of path:histo objects and sends it to the
190  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
191  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
192  at the writer end, there will be an error.
193  '''
194 
    def __init__(self, gmpcomponent):
        # Keep a handle on the owning GMP node (Reader/Worker) and reuse
        # its logger; histograms are pulled from that node in finalize().
        PyAlgorithm.__init__(self)
        self._gmpc = gmpcomponent
        self.log = self._gmpc.log
        return None
200 
    def execute(self):
        # Nothing to do per event; all the work happens in finalize().
        return SUCCESS
203 
    def finalize(self):
        # Collect all histograms from this node's histogram store and ship
        # them to the Writer over the histogram queue (self._gmpc.hq), then
        # block on sEvent until the Writer confirms full receipt.
        self.log.info(
            'CollectHistograms Finalise (%s)' % (self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms()
        # NOTE: Python-2 semantics — keys() returns a list, which the
        # chunked slicing below relies on
        ks = self._gmpc.hDict.keys()
        self.log.info('%i Objects in Histogram Store' % (len(ks)))

        # crashes occurred due to Memory Error during the sending of hundreds
        # histos on slc5 machines, so instead, break into chunks
        # send 100 at a time
        chunk = 100
        # Python-2 integer division: number of chunk-sized slices (+1)
        reps = len(ks) / chunk + 1
        for i in xrange(reps):
            someKeys = ks[i * chunk:(i + 1) * chunk]
            smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
            self._gmpc.hq.put((self._gmpc.nodeID, smalld))
        # "finished" Notification
        self.log.debug('Signalling end of histos to Writer')
        self._gmpc.hq.put('HISTOS_SENT')
        self.log.debug('Waiting on Sync Event')
        self._gmpc.sEvent.wait()
        self.log.debug('Histo Sync Event set, clearing and returning')
        # leave the histogram store empty and re-rooted after hand-off
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()
        setOwnership(root, False)
        self._gmpc.hvt.setRoot('/stat', root)
        return SUCCESS
231 
232 
233 # =============================================================================
234 
235 
class EventCommunicator(object):
    # This class is responsible for communicating Gaudi Events via Queues
    # Events are communicated as TBufferFiles, filled either by the
    # TESSerializer, or the GaudiSvc, "IPCSvc"
    def __init__(self, GMPComponent, qin, qout):
        # @param GMPComponent : the node (Reader/Worker/Writer) owning this
        # @param qin  : queue to receive from (None on the Writer-out side)
        # @param qout : queue to send to (None on the Writer side)
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0  # counter : items sent
        self.nRecv = 0  # counter : items received
        self.sizeSent = 0  # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0  # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0  # time : receiving from self.qin
        self.qoutTime = 0  # time : sending on qout

    def send(self, item):
        # This class manages the sending of a TBufferFile Event to a Queue
        # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
        assert item.__class__.__name__ == 'tuple'
        startTransmission = time.time()
        self.qout.put(item)
        # allow the background thread to feed the Queue; not 100% guaranteed to
        # finish before next line
        while self.qout._buffer:
            time.sleep(NAP)
        self.qoutTime += time.time() - startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive(self, timeout=None):
        # Receive items from self.qin; returns None on timeout.
        startWait = time.time()
        try:
            itemIn = self.qin.get(timeout=timeout)
        except Empty:
            return None
        self.qinTime += time.time() - startWait
        self.nRecv += 1
        if itemIn.__class__.__name__ == 'tuple':
            self.sizeRecv += itemIn[1].Length()
        else:
            # control-flow strings ('FINISHED', ...) are not counted as events
            self.nRecv -= 1
        try:
            self.qin.task_done()
        except (AttributeError, ValueError):
            # narrowed from a bare except:
            #   AttributeError : queue is not joinable (no task_done method)
            #   ValueError     : task_done() called more often than items queued
            self._gmpc.log.warning(
                'TASK_DONE called too often by : %s' % (self._gmpc.nodeType))
        return itemIn

    def finalize(self):
        # Flush 'FINISHED' flags downstream, wait for them to be consumed,
        # then report this communicator's statistics.
        self.log.info('Finalize Event Communicator : %s %s' %
                      (self._gmpc, self._gmpc.nodeType))
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        if self._gmpc.nodeType == 'Reader':
            downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer':
            downstream = 0
        elif self._gmpc.nodeType == 'Worker':
            downstream = 1

        for i in xrange(downstream):
            self.qout.put('FINISHED')
        if self._gmpc.nodeType != 'Writer':
            # block until every flag has been task_done()'d downstream
            self.qout.join()
        # Now some reporting...
        self.statistics()

    def statistics(self):
        # Log send/receive counters and timings under an "Audit" logger name.
        self.log.name = '%s-%i Audit ' % (self._gmpc.nodeType,
                                          self._gmpc.nodeID)
        self.log.info('Items Sent : %i' % (self.nSent))
        self.log.info('Items Received : %i' % (self.nRecv))
        self.log.info('Data Sent : %i' % (self.sizeSent))
        self.log.info('Data Received : %i' % (self.sizeRecv))
        self.log.info('Q-out Time : %5.2f' % (self.qoutTime))
        self.log.info('Q-in Time : %5.2f' % (self.qinTime))
325 
326 
327 # =============================================================================
328 
329 
class TESSerializer(object):
    '''
    Bookkeeping wrapper around the raw GaudiMP TESSerializer : counts the
    number and size of event buffers loaded/dumped, times the operations,
    and can log a summary report.
    '''

    def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        self.buffersIn = []   # sizes of buffers loaded
        self.buffersOut = []  # sizes of buffers dumped
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log

    def Load(self, tbuf):
        # Deserialize tbuf into the TES (re-rooted first), with timing.
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        t = time.time()
        self.T.loadBuffer(tbuf)
        self.tLoad += (time.time() - t)
        self.nIn += 1
        self.buffersIn.append(tbuf.Length())

    def Dump(self):
        # Serialize the TES into a fresh TBufferFile, with timing; return it.
        t = time.time()
        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += (time.time() - t)
        self.nOut += 1
        self.buffersOut.append(tb.Length())
        return tb

    def Report(self):
        # Log a summary of events and data loaded/dumped by this node.
        evIn = "Events Loaded : %i" % (self.nIn)
        evOut = "Events Dumped : %i" % (self.nOut)
        din = sum(self.buffersIn)
        dataIn = "Data Loaded : %i" % (din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
        if self.nIn:
            avgIn = "Avg Buf Loaded : %5.2f Mb" % (
                din / (self.nIn * MB))
            maxIn = "Max Buf Loaded : %5.2f Mb" % (
                max(self.buffersIn) / MB)
        else:
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"
        dout = sum(self.buffersOut)
        dataOut = "Data Dumped : %i" % (dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
        if self.nOut:
            # BUGFIX: the average dumped-buffer size was computed from din
            # (the *loaded* total) instead of dout
            avgOut = "Avg Buf Dumped : %5.2f Mb" % (
                dout / (self.nOut * MB))
            maxOut = "Max Buf Dumped : %5.2f Mb" % (
                max(self.buffersOut) / MB)
        else:
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
        loadTime = "Total Load Time : %5.2f" % (self.tLoad)

        lines = (evIn,
                 evOut,
                 dataIn,
                 dataInMb,
                 avgIn,
                 maxIn,
                 dataOut,
                 dataOutMb,
                 avgOut,
                 maxOut,
                 dumpTime,
                 loadTime)
        self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
        for line in lines:
            self.log.info(line)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
408 
409 
410 # =============================================================================
411 
412 
413 class GMPComponent(object):
414  # This class will be the template for Reader, Worker and Writer
415  # containing all common components
416  # nodeId will be a numerical identifier for the node
417  # -1 for reader
418  # -2 for writer
419  # 0,...,nWorkers-1 for the Workers
420  def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
421  # declare a Gaudi MultiProcessing Node
422  # the nodeType is going to be one of Reader, Worker, Writer
423  # qPair is going to be a tuple of ( qin, qout )
424  # for sending and receiving
425  # if nodeType is "Writer", it will be a list of qPairs,
426  # as there's one queue-in from each Worker
427  #
428  # params is a tuple of (nWorkers, config, log)
429 
430  self.nodeType = nodeType
431  current_process().name = nodeType
432 
433  # Synchronisation Event() objects for keeping track of the system
434  self.initEvent, eventLoopSyncer, self.finalEvent = events
435  self.eventLoopSyncer, self.lastEvent = eventLoopSyncer # unpack tuple
436 
437  # necessary for knowledge of the system
438  self.nWorkers, self.sEvent, self.config, self.log = params
439  self.subworkers = subworkers
440  self.nodeID = nodeID
441  self.msgFormat = self.config['MessageSvc'].Format
442 
443  # describe the state of the node by the current Event Number
444  self.currentEvent = None
445 
446  # Unpack the Queues : (events, histos, filerecords)
447  self.queues = queues
448  self.num = 0
449 
450  ks = self.config.keys()
451  self.app = None
452  list = ["Brunel", "DaVinci", "Boole", "Gauss"]
453  for k in list:
454  if k in ks:
455  self.app = k
456 
    def Start(self):
        # Set up the queue communicators, counters and logger for this node,
        # then fork its Engine() into a separate process.
        qPair, histq, fq = self.queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker':
            # Reader or Worker Node
            qin, qout = qPair
            self.evcom = EventCommunicator(self, qin, qout)
        else:
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin:
                ec = EventCommunicator(self, q, None)
                self.evcoms.append(ec)
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i' % (self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        self.proc = Process(target=self.Engine)
        # Fork and start the separate process
        self.proc.start()
501 
    def Engine(self):
        # This will be the method carried out by the Node
        # Different for all : overridden by each node type (Reader/Worker/...)
        pass
506 
508  # Different for all ; customize Configuration for multicore
509  pass
510 
    def SetupGaudiPython(self):
        # This method will initialize the GaudiPython Tools
        # such as the AppMgr and so on
        self.a = AppMgr()
        if SMAPS:
            # optionally attach the smaps memory-snapshot algorithm
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType + '-' + str(self.nodeID) + '.smp'
            ss = SmapShot(logname=smapsLog)
            self.a.addAlgorithm(ss)
        # cache the commonly used services
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
        self.pers = self.a.service('EventPersistencySvc', 'IAddressCreator')
        # raw GaudiMP serializer plus the bookkeeping wrapper defined above
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
        self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID,
                                self.log)
        return SUCCESS
529 
    def StartGaudiPython(self):
        # Initialise and start the AppMgr; events are then pumped manually
        # (e.g. via a.run(1) in the Reader's Engine).
        self.a.initialize()
        self.a.start()
        return SUCCESS
534 
    def LoadTES(self, tbufferfile):
        # Deserialize a TBufferFile into this process' Transient Event Store,
        # re-rooting the store with a fresh DataObject first.
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        self.ts.loadBuffer(tbufferfile)
540 
541  def getEventNumber(self):
542  if self.app != 'Gauss':
543  # Using getList or getHistoNames can result in the EventSelector
544  # re-initialising connection to RootDBase, which costs a lot of
545  # time... try to build a set of Header paths??
546 
547  # First Attempt : Unpacked Event Data
548  lst = ['/Event/Gen/Header', '/Event/Rec/Header']
549  for l in lst:
550  path = l
551  try:
552  n = self.evt[path].evtNumber()
553 
554  return n
555  except:
556  # No evt number at this path
557  continue
558 
559  # second attepmt : try DAQ/RawEvent data
560  # The Evt Number is in bank type 16, bank 0, data pt 4
561  try:
562  n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
563 
564  return n
565  except:
566  pass
567 
568  # Default Action
569  if self.nIn > 0 or self.nOut > 0:
570  pass
571  else:
572  self.log.warning('Could not determine Event Number')
573  return -1
574  else:
575  if self.nodeID == -1:
576  self.num = self.num + 1
577 
578  return self.num
579 
580  def IdentifyWriters(self):
581  #
582  # Identify Writers in the Configuration
583  #
584  d = {}
585  keys = ["events", "records", "tuples", "histos"]
586  for k in keys:
587  d[k] = []
588 
589  # Identify Writers and Classify
590  wkeys = WRITERTYPES.keys()
591  for v in self.config.values():
592  if v.__class__.__name__ in wkeys:
593  writerType = WRITERTYPES[v.__class__.__name__]
594  d[writerType].append(MiniWriter(v, writerType, self.config))
595  if self.nodeID == 0:
596  self.log.info('Writer Found : %s' % (v.name()))
597 
598  # Now Check for the Histogram Service
599  if 'HistogramPersistencySvc' in self.config.keys():
600  hfile = self.config['HistogramPersistencySvc'].getProp(
601  'OutputFile')
602  d["histos"].append(hfile)
603  return d
604 
605  def dumpHistograms(self):
606  '''
607  Method used by the GaudiPython algorithm CollectHistos
608  to obtain a dictionary of form { path : object }
609  representing the Histogram Store
610  '''
611  nlist = self.hvt.getHistoNames()
612  histDict = {}
613  objects = 0
614  histos = 0
615  if nlist:
616  for n in nlist:
617  o = self.hvt[n]
618  if type(o) in aidatypes:
619  o = aida2root(o)
620  histos += 1
621  else:
622  objects += 1
623  histDict[n] = o
624  else:
625  print 'WARNING : no histograms to recover?'
626  return histDict
627 
    def Initialize(self):
        # Configure and start GaudiPython for this node; record the time
        # spent in self.iTime and signal readiness via initEvent.
        start = time.time()
        self.processConfiguration()
        self.SetupGaudiPython()
        # tell the system this node has finished initialising
        self.initEvent.set()
        self.StartGaudiPython()

        if self.app == 'Gauss':
            # Gauss : fetch the event-counter tool used to keep the event
            # number in step across processes
            tool = self.a.tool("ToolSvc.EvtCounter")
            self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
        else:
            self.cntr = None

        self.iTime = time.time() - start
643 
644  def Finalize(self):
645  start = time.time()
646  self.a.stop()
647  self.a.finalize()
648  self.log.info('%s-%i Finalized' % (self.nodeType, self.nodeID))
649  self.finalEvent.set()
650  self.fTime = time.time() - start
651 
652  def Report(self):
653  self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
654  allTime = "Alive Time : %5.2f" % (self.tTime)
655  initTime = "Init Time : %5.2f" % (self.iTime)
656  frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
657  runTime = "Run Time : %5.2f" % (self.rTime)
658  finTime = "Finalise Time : %5.2f" % (self.fTime)
659  tup = (allTime, initTime, frstTime, runTime, finTime)
660  for t in tup:
661  self.log.info(t)
662  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
663  # and report from the TESSerializer
664  self.TS.Report()
665 
666 
667 # =============================================================================
668 
669 
    def __init__(self, queues, events, params, subworkers):
        # Reader node : nodeID is fixed to -1 by convention.
        GMPComponent.__init__(self, 'Reader', -1, queues, events, params,
                              subworkers)
674 
676  # Reader :
677  # No algorithms
678  # No output
679  # No histos
680  self.config['ApplicationMgr'].TopAlg = []
681  self.config['ApplicationMgr'].OutStream = []
682  if "HistogramPersistencySvc" in self.config.keys():
683  self.config['HistogramPersistencySvc'].OutputFile = ''
684  self.config['MessageSvc'].Format = '%-13s ' % '[Reader]' + \
685  self.msgFormat
686  self.evtMax = self.config['ApplicationMgr'].EvtMax
687 
688  def DumpEvent(self):
689  tb = TBufferFile(TBuffer.kWrite)
690  # print '----Reader dumping Buffer!!!'
691  self.ts.dumpBuffer(tb)
692  # print '\tBuffer Dumped, size : %i'%( tb.Length() )
693  return tb
694 
    def DoFirstEvent(self):
        # Do First Event ------------------------------------------------------
        # The first event is special : besides being read, dumped and sent,
        # it is used to populate the TESSerializer's item list from the
        # actual TES contents.
        # Check Termination Criteria
        startFirst = time.time()
        self.log.info('Reader : First Event')
        if self.nOut == self.evtMax:
            self.log.info('evtMax( %i ) reached' % (self.evtMax))
            self.lastEvent.set()
            return SUCCESS
        else:
            # Continue to read, dump and send event
            self.a.run(1)
            if not bool(self.evt['/Event']):
                self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
                self.lastEvent.set()
                return SUCCESS
            else:
                # Populate TESSerializer list and send Event
                if self.app == "Gauss":
                    lst = self.evt.getHistoNames()
                else:
                    try:
                        lst = self.evt.getList()
                        if self.app == "DaVinci":
                            # DaVinci : the DAQ branch must be added explicitly
                            daqnode = self.evt.retrieveObject(
                                '/Event/DAQ').registry()
                            setOwnership(daqnode, False)
                            self.evt.getList(daqnode, lst,
                                             daqnode.address().par())
                    except:
                        self.log.critical('Reader could not acquire TES List!')
                        self.lastEvent.set()
                        return FAILURE
                self.log.info('Reader : TES List : %i items' % (len(lst)))
                for l in lst:
                    self.ts.addItem(l)
                # NOTE(review): self.currentEvent is not visibly set before
                # this first send (a source line is missing from this listing)
                # — confirm against the full source
                tb = self.TS.Dump()
                self.log.info('First Event Sent')
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
                self.eventLoopSyncer.set()
                self.evt.clearStore()
                self.firstEvTime = time.time() - startFirst
                return SUCCESS
740 
    def Engine(self):
        # Main loop of the Reader process : initialise, send the first event
        # (which also defines the serializer item list), then keep reading,
        # dumping and sending events until evtMax is reached or the input
        # is exhausted.

        # rename process (prctl option 15 == PR_SET_NAME; Linux only)
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        startEngine = time.time()
        self.log.name = 'Reader'
        self.log.info('Reader Process starting')

        self.Initialize()

        # add the Histogram Collection Algorithm
        self.a.addAlgorithm(CollectHistograms(self))

        self.log.info('Reader Beginning Distribution')
        sc = self.DoFirstEvent()
        if sc.isSuccess():
            self.log.info('Reader First Event OK')
        else:
            self.log.critical('Reader Failed on First Event')
            self.stat = FAILURE

        # Do All Others -------------------------------------------------------
        while True:
            # Check Termination Criteria
            if self.nOut == self.evtMax:
                self.log.info('evtMax( %i ) reached' % (self.evtMax))
                break
            # Check Health
            if not self.stat.isSuccess():
                self.log.critical('Reader is Damaged!')
                break
            # Continue to read, dump and send event
            t = time.time()
            self.a.run(1)
            self.rTime += (time.time() - t)
            if not bool(self.evt['/Event']):
                self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
                break
            self.currentEvent = self.getEventNumber()
            tb = self.TS.Dump()
            self.evcom.send((self.currentEvent, tb))
            # clean up
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Finalize
        self.log.info('Reader : Event Distribution complete.')
        self.evcom.finalize()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()
799 
800 
801 # =============================================================================
802 
803 
    def __init__(self, workerID, queues, events, params, subworkers):
        # Subworker : registers under nodeType 'Worker' with its workerID.
        GMPComponent.__init__(self, 'Worker', workerID, queues, events, params,
                              subworkers)
        # Identify the writer streams
        # NOTE(review): getCheckAlgs() reads self.writerDict, which is
        # presumably assigned here in the full source (the line is missing
        # from this listing) — confirm
        # Identify the accept/veto checks for each event
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.info("Subworker-%i Created OK" % (self.nodeID))
        self.eventOutput = True
814 
    def Engine(self):
        # Main loop of a Subworker process : receive serialized events from
        # upstream, run the Gaudi event loop on each, and forward accepted
        # events downstream; finishes when a 'FINISHED' flag arrives.

        # rename process (prctl option 15 == PR_SET_NAME; Linux only)
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        self.initEvent.set()
        startEngine = time.time()
        msg = self.a.service('MessageSvc')
        msg.Format = '%-13s ' % ('[' + self.log.name + ']') + self.msgFormat

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Subworker %i starting Engine" % (self.nodeID))

        # populate the TESSerializer itemlist
        self.log.info(
            'EVT WRITERS ON WORKER : %i' % (len(self.writerDict['events'])))

        nEventWriters = len(self.writerDict["events"])
        self.a.addAlgorithm(CollectHistograms(self))

        # Begin processing
        Go = True
        while Go:
            packet = self.evcom.receive()
            if packet:
                pass
            else:
                # receive() timed out; keep polling
                continue
            if packet == 'FINISHED':
                break
            evtNumber, tbin = packet  # unpack
            if self.cntr != None:
                # keep the EvtCounter tool in step with the true event number
                self.cntr.setEventCounter(evtNumber)

            self.nIn += 1
            self.TS.Load(tbin)

            t = time.time()
            sc = self.a.executeEvent()
            # the first event carries the one-off initialisation cost,
            # so time it separately
            if self.nIn == 1:
                self.firstEvTime = time.time() - t
            else:
                self.rTime += (time.time() - t)
            if sc.isSuccess():
                pass
            else:
                self.log.name = "Worker-%i" % (self.nodeID)
                self.log.warning('Did not Execute Event')
                self.evt.clearStore()
                continue
            if self.isEventPassed():
                pass
            else:
                self.log.name = "Worker-%i" % (self.nodeID)
                self.log.warning('Event did not pass : %i' % (evtNumber))
                self.evt.clearStore()
                continue
            if self.eventOutput:
                # It may be the case of generating Event Tags; hence
                # no event output
                tb = self.TS.Dump()
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
            self.inc.fireIncident(gbl.Incident('Subworker', 'EndEvent'))
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info('Setting <Last> Event %s' % (self.nodeID))
        self.lastEvent.set()

        self.evcom.finalize()
        # Now send the FileRecords and stop/finalize the appMgr
        self.filerecordsAgent.SendFileRecords()
        self.tTime = time.time() - startEngine
        self.Finalize()
        self.Report()
        # self.finalEvent.set()
898 
    def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
        # Adopt the GaudiPython services already created by the parent
        # Worker, instead of building a separate AppMgr in this process.
        self.a = a
        self.evt = evt
        self.hvt = hvt
        self.fsr = fsr
        #self.inc = inc
        # NOTE: the incident service is re-fetched from the AppMgr rather
        # than taken from the `inc` argument (see the commented line above)
        self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
        self.pers = pers
        self.ts = ts
        self.cntr = cntr
        # wrap the raw serializer for load/dump bookkeeping
        self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID,
                                self.log)
911 
912  def getCheckAlgs(self):
913  '''
914  For some output writers, a check is performed to see if the event has
915  executed certain algorithms.
916  These reside in the AcceptAlgs property for those writers
917  '''
918  acc = []
919  req = []
920  vet = []
921  for m in self.writerDict["events"]:
922  if hasattr(m.w, 'AcceptAlgs'):
923  acc += m.w.AcceptAlgs
924  if hasattr(m.w, 'RequireAlgs'):
925  req += m.w.RequireAlgs
926  if hasattr(m.w, 'VetoAlgs'):
927  vet += m.w.VetoAlgs
928  return (acc, req, vet)
929 
930  def checkExecutedPassed(self, algName):
931  if self.a.algorithm(algName)._ialg.isExecuted()\
932  and self.a.algorithm(algName)._ialg.filterPassed():
933  return True
934  else:
935  return False
936 
937  def isEventPassed(self):
938  '''
939  Check the algorithm status for an event.
940  Depending on output writer settings, the event
941  may be declined based on various criteria.
942  This is a transcript of the check that occurs in GaudiSvc::OutputStream
943  '''
944  passed = False
945 
946  self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
947  if self.acceptAlgs:
948  for name in self.acceptAlgs:
949  if self.checkExecutedPassed(name):
950  passed = True
951  break
952  else:
953  passed = True
954 
955  self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
956  for name in self.requireAlgs:
957  if self.checkExecutedPassed(name):
958  pass
959  else:
960  self.log.info('Evt declined (requireAlgs) : %s' % (name))
961  passed = False
962 
963  self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
964  for name in self.vetoAlgs:
965  if self.checkExecutedPassed(name):
966  pass
967  else:
968  self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
969  passed = False
970  return passed
971 
972 
973 # =============================================================================
974 
975 
    def __init__(self, workerID, queues, events, params, subworkers):
        # Worker node : nodeID is the workerID (0 .. nWorkers-1).
        GMPComponent.__init__(self, 'Worker', workerID, queues, events, params,
                              subworkers)
        # Identify the writer streams
        # NOTE(review): getCheckAlgs() and processConfiguration() read
        # self.writerDict, which is presumably assigned here in the full
        # source (the line is missing from this listing) — confirm
        # Identify the accept/veto checks for each event
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Worker-%i Created OK" % (self.nodeID))
        self.eventOutput = True
987 
989 
def processConfiguration(self):
    # NOTE(review): the 'def' header line is elided in this rendering of
    # the file (index gives GMPBase.py:988); reconstructed here.

    # Worker :
    # No input
    # No output
    # No Histos
    self.config['EventSelector'].Input = []
    self.config['ApplicationMgr'].OutStream = []
    if "HistogramPersistencySvc" in self.config.keys():
        self.config['HistogramPersistencySvc'].OutputFile = ''
    # Tag every log message with this worker's identity
    formatHead = '[Worker-%i]' % (self.nodeID)
    self.config['MessageSvc'].Format = '%-13s ' % formatHead + \
        self.msgFormat

    for key, lst in self.writerDict.iteritems():
        self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

    for m in self.writerDict["tuples"]:
        # rename Tuple output file with an appendix
        # based on worker id, for merging later
        newName = m.getNewName('.', '.w%i.' % (self.nodeID))
        self.config[m.key].Output = newName

    # Suppress INFO Output for all but Worker-0
    # if self.nodeID == 0 :
    #     pass
    # else :
    #     self.config[ 'MessageSvc' ].OutputLevel = ERROR

    if self.app == "Gauss":
        try:
            # Gauss uses an EvtCounter tool; stop it counting via incidents
            # so the worker can set the event number explicitly per packet.
            if "ToolSvc.EvtCounter" not in self.config:
                from Configurables import EvtCounter
                counter = EvtCounter()
            else:
                counter = self.config["ToolSvc.EvtCounter"]
            counter.UseIncident = False
        except:
            # ignore errors when trying to change the configuration of the EvtCounter
            self.log.warning('Cannot configure EvtCounter')
1028 
def Engine(self):
    '''
    Main event loop of the Worker process.

    Receives (eventNumber, serialised-TES-buffer) packets from the Reader,
    runs full Gaudi event processing on each, applies the output writers'
    accept/require/veto filters, and forwards accepted events to the
    Writer.  Terminates when a 'FINISHED' packet arrives.
    '''
    # rename process so it shows up distinctly in ps/top
    # (prctl option 15 is PR_SET_NAME)
    import os
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    name = str(self.nodeType) + str(self.nodeID) + '\0'
    libc.prctl(15, name, 0, 0, 0)

    startEngine = time.time()
    self.log.info("Worker %i starting Engine" % (self.nodeID))
    self.Initialize()
    # NOTE(review): one statement (source line 1041) is elided in this
    # rendering of the file; presumably it creates self.filerecordsAgent
    # (used below) — confirm against the full source.

    # populate the TESSerializer itemlist
    self.log.info(
        'EVT WRITERS ON WORKER : %i' % (len(self.writerDict['events'])))

    nEventWriters = len(self.writerDict["events"])
    if nEventWriters:
        itemList = set()
        optItemList = set()
        for m in self.writerDict["events"]:
            for item in m.ItemList:
                # strip any trailing '#<depth>' qualifier from the path
                hsh = item.find('#')
                if hsh != -1:
                    item = item[:hsh]
                itemList.add(item)
            for item in m.OptItemList:
                hsh = item.find('#')
                if hsh != -1:
                    item = item[:hsh]
                optItemList.add(item)
        # If an item is mandatory and optional, keep it only in the optional list
        itemList -= optItemList
        for item in sorted(itemList):
            self.log.info(' adding ItemList Item to ts : %s' % (item))
            self.ts.addItem(item)
        for item in sorted(optItemList):
            self.log.info(' adding Optional Item to ts : %s' % (item))
            self.ts.addOptItem(item)
    else:
        self.log.info('There is no Event Output for this app')
        self.eventOutput = False

    # Begin processing
    Go = True
    while Go:
        packet = self.evcom.receive()
        if packet:
            pass
        else:
            continue
        if packet == 'FINISHED':
            break
        evtNumber, tbin = packet  # unpack
        if self.cntr != None:
            # keep the (Gauss) event counter in step with the real
            # event number delivered by the Reader
            self.cntr.setEventCounter(evtNumber)

        # subworkers are forked before the first event is processed
        # reader-thread for ConDB must be closed and reopened in each subworker
        # this is done by disconnect()
        if self.nIn == 0:

            self.log.info("Fork new subworkers and disconnect from CondDB")
            condDB = self.a.service('CondDBCnvSvc', gbl.ICondDBReader)
            condDB.disconnect()

            # Fork subworkers and share services
            for k in self.subworkers:
                k.SetServices(self.a, self.evt, self.hvt, self.fsr,
                              self.inc, self.pers, self.ts, self.cntr)
                k.Start()
            self.a.addAlgorithm(CollectHistograms(self))
        self.nIn += 1
        self.TS.Load(tbin)

        t = time.time()
        sc = self.a.executeEvent()
        # first event carries initialisation overhead; timed separately
        if self.nIn == 1:
            self.firstEvTime = time.time() - t
        else:
            self.rTime += (time.time() - t)
        if sc.isSuccess():
            pass
        else:
            self.log.warning('Did not Execute Event')
            self.evt.clearStore()
            continue
        if self.isEventPassed():
            pass
        else:
            self.log.warning('Event did not pass : %i' % (evtNumber))
            self.evt.clearStore()
            continue
        if self.eventOutput:
            # It may be the case of generating Event Tags; hence
            # no event output
            # NOTE(review): one statement (source line 1127) is elided in
            # this rendering of the file; confirm against the full source.
            tb = self.TS.Dump()
            self.evcom.send((self.currentEvent, tb))
            self.nOut += 1
        self.inc.fireIncident(gbl.Incident('Worker', 'EndEvent'))
        self.eventLoopSyncer.set()
        self.evt.clearStore()
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    self.evcom.finalize()
    self.log.info('Worker-%i Finished Processing Events' % (self.nodeID))
    # Now send the FileRecords and stop/finalize the appMgr
    self.filerecordsAgent.SendFileRecords()
    self.Finalize()
    self.tTime = time.time() - startEngine
    self.Report()

    for k in self.subworkers:
        self.log.info('Join subworkers')
        k.proc.join()
1148 
def getCheckAlgs(self):
    '''
    For some output writers, a check is performed to see if the event has
    executed certain algorithms.
    These reside in the AcceptAlgs property for those writers
    '''
    accept, require, veto = [], [], []
    # Collect the algorithm-name lists from every event writer that
    # declares them; writers without a given property contribute nothing.
    for writer in self.writerDict["events"]:
        stream = writer.w
        if hasattr(stream, 'AcceptAlgs'):
            accept.extend(stream.AcceptAlgs)
        if hasattr(stream, 'RequireAlgs'):
            require.extend(stream.RequireAlgs)
        if hasattr(stream, 'VetoAlgs'):
            veto.extend(stream.VetoAlgs)
    return (accept, require, veto)
1166 
def checkExecutedPassed(self, algName):
    # True when the named algorithm has both executed for the current
    # event and set a positive filter decision; False otherwise.
    # (Single lookup of the algorithm; short-circuit preserved.)
    ialg = self.a.algorithm(algName)._ialg
    return bool(ialg.isExecuted() and ialg.filterPassed())
1173 
def isEventPassed(self):
    '''
    Check the algorithm status for an event.
    Depending on output writer settings, the event
    may be declined based on various criteria.
    This is a transcript of the check that occurs in GaudiSvc::OutputStream
    '''
    passed = False

    # Accept stage: one passing accept-algorithm suffices; an empty
    # accept list accepts every event.
    self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
    if self.acceptAlgs:
        passed = any(self.checkExecutedPassed(nm) for nm in self.acceptAlgs)
    else:
        passed = True

    # Require stage: every required algorithm must have passed; each
    # failure is logged individually.
    self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
    for nm in self.requireAlgs:
        if not self.checkExecutedPassed(nm):
            self.log.info('Evt declined (requireAlgs) : %s' % (nm))
            passed = False

    # Veto stage (note: transcribed as-is from OutputStream — the event
    # is declined when the veto algorithm did NOT execute-and-pass).
    self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
    for nm in self.vetoAlgs:
        if not self.checkExecutedPassed(nm):
            self.log.info('Evt declined : (vetoAlgs) : %s' % (nm))
            passed = False
    return passed
1208 
1209 
1210 # =============================================================================
1211 
1212 
def __init__(self, queues, events, params, subworkers):
    # Writer node (fixed nodeID of -2): collects serialised events from
    # all workers and writes the merged output streams.
    GMPComponent.__init__(self, 'Writer', -2, queues, events, params,
                          subworkers)
    # Identify the writer streams
    # NOTE(review): one statement (source line 1218) is elided in this
    # rendering of the file; presumably it fills self.writerDict here —
    # confirm against the full source.
    # This keeps track of workers as they finish
    self.status = [False] * self.nWorkers
    self.log.name = "Writer--2"
1222 
def processConfiguration(self):
    # NOTE(review): the 'def' header line is elided in this rendering of
    # the file (index gives GMPBase.py:1223); reconstructed here.

    # Writer :
    # No input
    # No Algs
    self.config['ApplicationMgr'].TopAlg = []
    self.config['EventSelector'].Input = []

    self.config['MessageSvc'].Format = '%-13s ' % '[Writer]' + \
        self.msgFormat

    # Now process the output writers
    for key, lst in self.writerDict.iteritems():
        self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

    # Modify the name of the output file to reflect that it came
    # from a parallel processing
    #
    # Event Writers
    for m in self.writerDict["events"]:
        self.log.debug('Processing Event Writer : %s' % (m))
        newName = m.getNewName('.', '.p%i.' % self.nWorkers)
        self.config[m.key].Output = newName

    # Now, if there are no event writers, the FileRecords file
    # will fail to open, as it only opens an UPDATE version
    # of the existing Event Output File
    # So, if there are no event writers, edit the string of the
    # FileRecord Writer

    # FileRecords Writers
    for m in self.writerDict["records"]:
        self.log.debug('Processing FileRecords Writer: %s' % (m))
        newName = m.getNewName(
            '.', '.p%i.' % self.nWorkers, extra=" OPT='RECREATE'")
        self.config[m.key].Output = newName

    # same for histos
    hs = "HistogramPersistencySvc"
    n = None
    if hs in self.config.keys():
        n = self.config[hs].OutputFile
    if n:
        newName = self.config[hs].OutputFile.replace(
            '.', '.p%i.' % (self.nWorkers))
        self.config[hs].OutputFile = newName
1268 
def Engine(self):
    '''
    Main loop of the Writer process: round-robins over the per-worker
    event channels, writing each received event, until every worker has
    sent its 'FINISHED' flag; then rebuilds histograms and FileRecords
    and finalises.
    '''
    # rename process (prctl option 15 is PR_SET_NAME)
    import os
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    name = str(self.nodeType) + str(self.nodeID) + '\0'
    libc.prctl(15, name, 0, 0, 0)

    startEngine = time.time()
    self.Initialize()
    self.histoAgent = HistoAgent(self)
    # NOTE(review): one statement (source line 1280) is elided in this
    # rendering of the file; presumably it creates self.filerecordsAgent
    # (used below) — confirm against the full source.

    # Begin processing
    Go = True
    current = -1
    stopCriteria = self.nWorkers
    while Go:
        # cycle through the worker channels with a short timeout so no
        # single idle worker blocks the loop
        current = (current + 1) % self.nWorkers
        packet = self.evcoms[current].receive(timeout=0.01)
        if packet == None:
            continue
        if packet == 'FINISHED':
            self.log.info(
                'Writer got FINISHED flag : Worker %i' % (current))

            self.status[current] = True
            if all(self.status):
                self.log.info('FINISHED recd from all workers, break loop')
                break
            continue
        # otherwise, continue as normal
        self.nIn += 1  # update central count (maybe needed by FSR store)
        evtNumber, tbin = packet  # unpack
        self.TS.Load(tbin)
        t = time.time()
        self.a.executeEvent()
        self.rTime += (time.time() - t)
        # NOTE(review): one statement (source line 1307) is elided in this
        # rendering of the file; confirm against the full source.
        self.evt.clearStore()
        self.eventLoopSyncer.set()
    self.log.name = "Writer--2"
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    # finalisation steps
    [e.finalize() for e in self.evcoms]
    # Now do Histograms
    sc = self.histoAgent.Receive()
    sc = self.histoAgent.RebuildHistoStore()
    if sc.isSuccess():
        self.log.info('Histo Store rebuilt ok')
    else:
        self.log.warning('Histo Store Error in Rebuild')

    # Now do FileRecords
    sc = self.filerecordsAgent.Receive()
    self.filerecordsAgent.Rebuild()
    self.Finalize()
    # self.rTime = time.time()-startEngine
    self.Report()
1330 
1331 
1332 # =============================================================================
1333 
1334 # =============================================================================
1335 
1336 
class Coord(object):
    '''
    Top-level co-ordinator of the parallel system.

    Builds the Reader, the main Worker (plus Subworkers) and the Writer,
    wires up their queues and synchronisation events, and drives the
    Initialise / Run / Finalise phases of the whole job.
    '''

    def __init__(self, nWorkers, config, log):

        self.log = log
        self.config = config
        # set up Logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('GaudiPython Parallel Process Co-ordinator beginning')

        # nWorkers == -1 means "use every available core on this machine"
        self.nWorkers = cpu_count() if nWorkers == -1 else nWorkers

        self.qs = self.SetupQueues()  # a dictionary of queues (for Events)
        self.hq = JoinableQueue()  # for Histogram data
        self.fq = JoinableQueue()  # for FileRecords data

        # One Syncer per phase: Initialise, Run, Finalise
        self.sInit = Syncer(
            self.nWorkers,
            self.log,
            limit=WAIT_INITIALISE,
            step=STEP_INITIALISE)
        self.sRun = Syncer(
            self.nWorkers,
            self.log,
            manyEvents=True,
            limit=WAIT_SINGLE_EVENT,
            step=STEP_EVENT,
            firstEvent=WAIT_FIRST_EVENT)
        self.sFin = Syncer(
            self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE)
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Declare SubProcesses: subworkers 1..nWorkers-1, then the Reader,
        # the main Worker (id 0) and the Writer.
        self.subworkers = []
        for wid in range(1, self.nWorkers):
            self.subworkers.append(
                Subworker(wid, self.getQueues(wid), self.getSyncEvents(wid),
                          params, self.subworkers))
        self.reader = Reader(
            self.getQueues(-1), self.getSyncEvents(-1), params,
            self.subworkers)
        self.workers = []
        mainWorker = Worker(0, self.getQueues(0), self.getSyncEvents(0),
                            params, self.subworkers)
        self.writer = Writer(
            self.getQueues(-2), self.getSyncEvents(-2), params,
            self.subworkers)

        # launch/monitor order: writer first, then main worker, then reader
        self.system = [self.writer, mainWorker, self.reader]

    def getSyncEvents(self, nodeID):
        # Bundle the per-node synchronisation events for the three phases:
        # (init, (run, lastEvent), finalise)
        runSync = self.sRun.d[nodeID]
        return (self.sInit.d[nodeID].event,
                (runSync.event, runSync.lastEvent),
                self.sFin.d[nodeID].event)

    def getQueues(self, nodeID):
        # (event queue pair, histogram queue, file-records queue) for a node
        return (self.qs[nodeID], self.hq, self.fq)

    def Go(self):
        '''
        Drive the full cycle: start all processes, then synchronise the
        Initialise, Run and Finalise phases in turn.  A failure in any
        phase terminates the whole system and returns FAILURE.
        '''
        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('INITIALISING SYSTEM')

        # Start reader, writer and main worker
        for node in self.system:
            node.Start()

        if not (self.sInit.syncAll(step="Initialise") == SUCCESS):
            self.Terminate()
            return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('RUNNING SYSTEM')
        if not (self.sRun.syncAll(step="Run") == SUCCESS):
            self.Terminate()
            return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('FINALISING SYSTEM')
        if not (self.sFin.syncAll(step="Finalise") == SUCCESS):
            self.Terminate()
            return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info("Cleanly join all Processes")
        self.Stop()
        self.log.info("Report Total Success to Main.py")
        return SUCCESS

    def Terminate(self):
        # Brutally kill sub-processes
        for child in multiprocessing.active_children():
            child.terminate()

    def Stop(self):
        # procs should be joined in reverse order to launch
        self.system.reverse()
        for node in self.system:
            node.proc.join()
        return SUCCESS

    def SetupQueues(self):
        # This method will set up the network of Queues needed
        # N Queues = nWorkers + 1
        # Each Worker has a Queue in, and a Queue out
        # Reader has Queue out only
        # Writer has nWorkers Queues in

        # one queue from Reader-Workers (shared by all workers)
        readerToWorkers = JoinableQueue()
        # one queue from each worker to writer
        toWriter = [JoinableQueue() for _ in range(self.nWorkers)]
        queueMap = {
            -1: (None, readerToWorkers),  # Reader
            -2: (toWriter, None),  # Writer
        }
        for wid in range(self.nWorkers):
            queueMap[wid] = (readerToWorkers, toWriter[wid])
        return queueMap
1487 
1488 
1489 # ============================= EOF ===========================================
def Load(self, tbuf)
Definition: GMPBase.py:345
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:420
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:671
def getNewName(self, replaceThis, withThis, extra='')
Definition: GMPBase.py:121
StatusCode finalize() override
Definition: Algorithm.cpp:94
StatusCode execute() override
Definition: Algorithm.cpp:90
def SetupQueues(self)
Definition: GMPBase.py:1470
def receive(self, timeout=None)
Definition: GMPBase.py:276
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:240
double sum(double x, double y, double z)
def __init__(self, gmpcomponent)
Definition: GMPBase.py:195
def DumpEvent(self)
Definition: GMPBase.py:688
def DoFirstEvent(self)
Definition: GMPBase.py:695
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:805
EventIDBase max(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:215
def Terminate(self)
Definition: GMPBase.py:1453
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:930
def processConfiguration(self)
Definition: GMPBase.py:988
def getCheckAlgs(self)
Definition: GMPBase.py:1149
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:535
def getItemLists(self, config)
Definition: GMPBase.py:142
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1214
def set(self, key, output)
Definition: GMPBase.py:159
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1338
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
def __init__(self, writer, wType, config)
Definition: GMPBase.py:90
def processConfiguration(self)
Definition: GMPBase.py:507
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1398
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:899
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:977
Python Algorithm base class.
Definition: Algorithm.h:30
def isEventPassed(self)
Definition: GMPBase.py:1174
def processConfiguration(self)
Definition: GMPBase.py:675
def getQueues(self, nodeID)
Definition: GMPBase.py:1404
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1167
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:331
def processConfiguration(self)
Definition: GMPBase.py:1223