# The Gaudi Framework  v29r0 (ff2e7097)
# GMPBase.py
# Go to the documentation of this file.
1 from Gaudi.Configuration import *
2 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
3 from ROOT import TBufferFile, TBuffer
4 import multiprocessing
5 from multiprocessing import Process, Queue, JoinableQueue, Event
6 from multiprocessing import cpu_count, current_process
7 from multiprocessing.queues import Empty
8 from pTools import *
9 import time
10 import sys
11 import os
12 from ROOT import TParallelMergingFile
13 # This script contains the bases for the Gaudi MultiProcessing (GMP)
14 # classes
15 
16 # There are three classes :
17 # Reader
18 # Worker
19 # Writer
20 
21 # Each class needs to perform communication with the others
22 # For this, we need a means of communication, which will be based on
23 # the python multiprocessing package
24 # This is provided in SPI pytools package
25 # cmt line : use pytools v1.1 LCG_Interfaces
26 # The PYTHONPATH env variable may need to be modified, as this might
27 # still point to 1.0_python2.5
28 
29 # Each class will need Queues, and a defined method for using these
30 # queues.
31 # For example, as long as there is something in the Queue, both ends
32 # of the queue must be open
33 # Also, there needs to be proper Termination flags and criteria
34 # The System should be error proof.
35 
36 
37 # Constants -------------------------------------------------------------------
38 NAP = 0.001
39 MB = 1024.0 * 1024.0
40 # waits to guard against hanging, in seconds
41 WAIT_INITIALISE = 60 * 5
42 WAIT_FIRST_EVENT = 60 * 3
43 WAIT_SINGLE_EVENT = 60 * 6
44 WAIT_FINALISE = 60 * 2
45 STEP_INITIALISE = 10
46 STEP_EVENT = 2
47 STEP_FINALISE = 10
48 
49 # My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
50 SMAPS = False
51 
52 # -----------------------------------------------------------------------------
53 
54 # definitions
55 # ----------
56 # used to convert stored histos (in AIDA format) to ROOT format
57 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
58 
59 # used to check which type of histo we are dealing with
60 # i.e. if currentHisto in aidatypes : pass
61 aidatypes = (gbl.AIDA.IHistogram,
62  gbl.AIDA.IHistogram1D,
63  gbl.AIDA.IHistogram2D,
64  gbl.AIDA.IHistogram3D,
65  gbl.AIDA.IProfile1D,
66  gbl.AIDA.IProfile2D,
67  gbl.AIDA.IBaseHistogram) # extra?
68 
69 # similar to aidatypes
70 thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
71 
72 # Types of OutputStream in Gaudi
73 WRITERTYPES = {'EvtCollectionStream': "tuples",
74  'InputCopyStream': "events",
75  'OutputStream': "events",
76  'RecordStream': "records",
77  'RunRecordStream': "records",
78  'SequentialOutputStream': "events",
79  'TagCollectionStream': "tuples"}
80 
81 # =============================================================================
82 
83 
class MiniWriter(object):
    '''
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string.
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring.
    '''

    def __init__(self, writer, wType, config):
        self.w = writer
        self.wType = wType
        # keys for directly accessing the correct part of the
        # configuration later on : config[ self.key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr(self.w, "Output"):
            # output named directly on the writer
            self.woutput = self.w.Output
            self.getItemLists(config)
            self.set(self.wName, self.w.Output)
            return
        # no output file on the writer itself; fall back to the datasvc
        # (every writer always has a datasvc property)
        self.datasvcName = self.w.EvtDataSvc
        datasvc = config[self.datasvcName]
        if hasattr(datasvc, "Output"):
            self.getItemLists(config)
            self.set(self.datasvcName, datasvc.Output)

    def getNewName(self, replaceThis, withThis, extra=''):
        # Replace one pattern in the output name string with another,
        # and return the new Output name.
        # The stored output *might* be a list, so check for this.
        #
        # @param extra : might need to add ROOT flags
        #                i.e.: OPT='RECREATE', or such
        assert replaceThis.__class__.__name__ == 'str'
        assert withThis.__class__.__name__ == 'str'
        current = self.output
        wasList = current.__class__.__name__ == 'list'
        if wasList:
            current = self.output[0]
        renamed = current.replace(replaceThis, withThis) + extra
        if wasList:
            return [renamed]
        return renamed

    def getItemLists(self, config):
        # Record the ItemList / OptItemList, preferring the writer's own
        # properties and falling back to its data service.
        if hasattr(self.w, "ItemList"):
            self.ItemList = self.w.ItemList
        else:
            svc = config[self.w.EvtDataSvc]
            if hasattr(svc, "ItemList"):
                self.ItemList = svc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr(self.w, "OptItemList"):
            self.OptItemList = self.w.OptItemList
        else:
            svc = config[self.w.EvtDataSvc]
            if hasattr(svc, "OptItemList"):
                self.OptItemList = svc.OptItemList
        return

    def set(self, key, output):
        # Remember which config key owns the output, and the output itself.
        self.key = key
        self.output = output
        return

    def __repr__(self):
        rule = '-' * 80 + '\n'
        parts = [rule,
                 "Writer : %s\n" % (self.wName),
                 "Writer Type : %s\n" % (self.wType),
                 "Writer Output : %s\n" % (self.output),
                 "DataSvc : %s\n" % (self.datasvcName),
                 "DataSvc Output : %s\n" % (self.svcOutput),
                 '\n',
                 "Key for config : %s\n" % (self.key),
                 "Output File : %s\n" % (self.output),
                 "ItemList : %s\n" % (self.ItemList),
                 "OptItemList : %s\n" % (self.OptItemList),
                 rule]
        return "".join(parts)
183 
184 # =============================================================================
185 
186 
188  '''
189  GaudiPython algorithm used to clean up histos on the Reader and Workers
190  Only has a finalize method()
191  This retrieves a dictionary of path:histo objects and sends it to the
192  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
193  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
194  at the writer end, there will be an error.
195  '''
196 
197  def __init__(self, gmpcomponent):
198  PyAlgorithm.__init__(self)
199  self._gmpc = gmpcomponent
200  self.log = self._gmpc.log
201  return None
202 
203  def execute(self):
204  return SUCCESS
205 
206  def finalize(self):
207  self.log.info('CollectHistograms Finalise (%s)' %
208  (self._gmpc.nodeType))
209  self._gmpc.hDict = self._gmpc.dumpHistograms()
210  ks = self._gmpc.hDict.keys()
211  self.log.info('%i Objects in Histogram Store' % (len(ks)))
212 
213  # crashes occurred due to Memory Error during the sending of hundreds
214  # histos on slc5 machines, so instead, break into chunks
215  # send 100 at a time
216  chunk = 100
217  reps = len(ks) / chunk + 1
218  for i in xrange(reps):
219  someKeys = ks[i * chunk: (i + 1) * chunk]
220  smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
221  self._gmpc.hq.put((self._gmpc.nodeID, smalld))
222  # "finished" Notification
223  self.log.debug('Signalling end of histos to Writer')
224  self._gmpc.hq.put('HISTOS_SENT')
225  self.log.debug('Waiting on Sync Event')
226  self._gmpc.sEvent.wait()
227  self.log.debug('Histo Sync Event set, clearing and returning')
228  self._gmpc.hvt.clearStore()
229  root = gbl.DataObject()
230  setOwnership(root, False)
231  self._gmpc.hvt.setRoot('/stat', root)
232  return SUCCESS
233 
234 # =============================================================================
235 
236 
class EventCommunicator(object):
    # This class is responsible for communicating Gaudi Events via Queues
    # Events are communicated as TBufferFiles, filled either by the
    # TESSerializer, or the GaudiSvc, "IPCSvc"
    def __init__(self, GMPComponent, qin, qout):
        # GMPComponent : the owning node (Reader/Worker/Writer); supplies log
        # qin / qout   : queues for receiving / sending (qout is None on the
        #                Writer side, which only receives)
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0  # counter : items sent
        self.nRecv = 0  # counter : items received
        self.sizeSent = 0  # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0  # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0  # time : receiving from self.qin
        self.qoutTime = 0  # time : sending on qout

    def send(self, item):
        # This class manages the sending of a TBufferFile Event to a Queue
        # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
        assert item.__class__.__name__ == 'tuple'
        startTransmission = time.time()
        self.qout.put(item)
        # allow the background thread to feed the Queue; not 100% guaranteed to
        # finish before next line
        # NOTE(review): polls the *private* multiprocessing.Queue._buffer of
        # the feeder thread — fragile across Python versions; confirm on
        # interpreter upgrades.
        while self.qout._buffer:
            time.sleep(NAP)
        self.qoutTime += time.time() - startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive(self, timeout=None):
        # Receive items from self.qin; returns None on timeout (queue Empty).
        startWait = time.time()
        try:
            itemIn = self.qin.get(timeout=timeout)
        except Empty:
            return None
        self.qinTime += time.time() - startWait
        self.nRecv += 1
        if itemIn.__class__.__name__ == 'tuple':
            self.sizeRecv += itemIn[1].Length()
        else:
            # non-tuple items are control flags (e.g. 'FINISHED'), not events;
            # they are not counted as received events
            self.nRecv -= 1
        try:
            self.qin.task_done()
        except:
            self._gmpc.log.warning('TASK_DONE called too often by : %s'
                                   % (self._gmpc.nodeType))
        return itemIn

    def finalize(self):
        # Send the end-of-stream flags downstream and drain/join our out queue.
        self.log.info('Finalize Event Communicator : %s %s' %
                      (self._gmpc, self._gmpc.nodeType))
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        # NOTE(review): any other nodeType would leave `downstream` unbound
        # (NameError below); assumed unreachable.
        if self._gmpc.nodeType == 'Reader':
            downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer':
            downstream = 0
        elif self._gmpc.nodeType == 'Worker':
            downstream = 1

        for i in xrange(downstream):
            self.qout.put('FINISHED')
        if self._gmpc.nodeType != 'Writer':
            self.qout.join()
        # Now some reporting...
        self.statistics()

    def statistics(self):
        # Summarize the traffic through this communicator to the node's log.
        self.log.name = '%s-%i Audit ' % (self._gmpc.nodeType,
                                          self._gmpc.nodeID)
        self.log.info('Items Sent : %i' % (self.nSent))
        self.log.info('Items Received : %i' % (self.nRecv))
        self.log.info('Data Sent : %i' % (self.sizeSent))
        self.log.info('Data Received : %i' % (self.sizeRecv))
        self.log.info('Q-out Time : %5.2f' % (self.qoutTime))
        self.log.info('Q-in Time : %5.2f' % (self.qinTime))
326 
327 # =============================================================================
328 
329 
class TESSerializer(object):
    """Wrapper around a GaudiMP TESSerializer that records per-buffer
    statistics (counts, sizes, timings) for Load/Dump operations.

    Parameters:
      gaudiTESSerializer : underlying GaudiMP.TESSerializer instance
      evtDataSvc         : event data service whose root is reset on Load
      nodeType, nodeID   : used only for log naming in Report()
      log                : logger with a settable .name and an .info() method
    """

    def __init__(self, gaudiTESSerializer, evtDataSvc,
                 nodeType, nodeID, log):
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        self.buffersIn = []    # sizes (bytes) of each buffer loaded
        self.buffersOut = []   # sizes (bytes) of each buffer dumped
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log

    def Load(self, tbuf):
        """Reset the TES root and deserialize *tbuf* into it, timing the load."""
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        t = time.time()
        self.T.loadBuffer(tbuf)
        self.tLoad += (time.time() - t)
        self.nIn += 1
        self.buffersIn.append(tbuf.Length())

    def Dump(self):
        """Serialize the TES into a fresh TBufferFile and return it."""
        t = time.time()
        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += (time.time() - t)
        self.nOut += 1
        self.buffersOut.append(tb.Length())
        return tb

    def Report(self):
        """Log a summary of load/dump counts, data volumes and times."""
        # local copy of the module-level MB constant (bytes per MiB) so the
        # method works standalone; the value is identical
        MB = 1024.0 * 1024.0
        evIn = "Events Loaded : %i" % (self.nIn)
        evOut = "Events Dumped : %i" % (self.nOut)
        din = sum(self.buffersIn)
        dataIn = "Data Loaded : %i" % (din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
        if self.nIn:
            avgIn = "Avg Buf Loaded : %5.2f Mb"\
                    % (din / (self.nIn * MB))
            maxIn = "Max Buf Loaded : %5.2f Mb"\
                    % (max(self.buffersIn) / MB)
        else:
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"
        dout = sum(self.buffersOut)
        dataOut = "Data Dumped : %i" % (dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
        if self.nOut:
            # BUGFIX: the average dumped-buffer size was previously computed
            # from `din` (bytes *loaded*); it must use `dout` (bytes *dumped*)
            avgOut = "Avg Buf Dumped : %5.2f Mb"\
                     % (dout / (self.nOut * MB))
            maxOut = "Max Buf Dumped : %5.2f Mb"\
                     % (max(self.buffersOut) / MB)
        else:
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
        loadTime = "Total Load Time : %5.2f" % (self.tLoad)

        lines = [evIn, evOut,
                 dataIn, dataInMb, avgIn, maxIn,
                 dataOut, dataOutMb, avgOut, maxOut,
                 dumpTime, loadTime]
        self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
        for line in lines:
            self.log.info(line)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
409 
410 # =============================================================================
411 
412 
class GMPComponent(object):
    # This class is the template for Reader, Worker and Writer,
    # containing all common components.
    # nodeID is a numerical identifier for the node :
    #   -1 for the Reader
    #   -2 for the Writer
    #   0,...,nWorkers-1 for the Workers
    def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
        # declare a Gaudi MultiProcessing Node
        # the nodeType is going to be one of Reader, Worker, Writer
        # qPair is going to be a tuple of ( qin, qout )
        # for sending and receiving
        # if nodeType is "Writer", it will be a list of qPairs,
        # as there's one queue-in from each Worker
        #
        # params is a tuple of (nWorkers, sEvent, config, log)

        self.nodeType = nodeType
        current_process().name = nodeType

        # Synchronisation Event() objects for keeping track of the system
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer  # unpack tuple

        # necessary for knowledge of the system
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.subworkers = subworkers
        self.nodeID = nodeID

        # describe the state of the node by the current Event Number
        self.currentEvent = None

        # Unpack the Queues : (events, histos, filerecords)
        self.queues = queues
        self.num = 0

        # Determine which known application this configuration belongs to.
        # NOTE: the local was renamed from `list`, which shadowed the builtin.
        # As before, the *last* matching key wins (no break).
        ks = self.config.keys()
        self.app = None
        for k in ("Brunel", "DaVinci", "Boole", "Gauss"):
            if k in ks:
                self.app = k

    def Start(self):
        """Set up queues/counters and fork this node's separate process."""
        qPair, histq, fq = self.queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker':
            # Reader or Worker Node : one in/out pair
            qin, qout = qPair
            self.evcom = EventCommunicator(self, qin, qout)
        else:
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin:
                ec = EventCommunicator(self, q, None)
                self.evcoms.append(ec)
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i' % (self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        self.proc = Process(target=self.Engine)
        # Fork and start the separate process
        self.proc.start()

    def Engine(self):
        # This will be the method carried out by the Node
        # Different for all ; overridden by each subclass
        pass

    # NOTE(review): this `def` line was missing from this copy of the file
    # (only the comment and `pass` survived); Initialize() below calls
    # self.processConfiguration(), so the method must exist.
    def processConfiguration(self):
        # Different for all ; customize Configuration for multicore
        pass

    def SetupGaudiPython(self):
        # This method will initialize the GaudiPython Tools
        # such as the AppMgr and so on
        self.a = AppMgr()
        if SMAPS:
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType + '-' + str(self.nodeID) + '.smp'
            ss = SmapShot(logname=smapsLog)
            self.a.addAlgorithm(ss)
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
        self.pers = self.a.service('EventPersistencySvc', 'IAddressCreator')
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
        self.TS = TESSerializer(self.ts, self.evt,
                                self.nodeType, self.nodeID, self.log)
        return SUCCESS

    def StartGaudiPython(self):
        """Initialize and start the AppMgr."""
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES(self, tbufferfile):
        """Reset the TES root and load a serialized event buffer into it."""
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber(self):
        """Best-effort retrieval of the current event number; -1 if unknown."""
        if self.app != 'Gauss':
            # Using getList or getHistoNames can result in the EventSelector
            # re-initialising connection to RootDBase, which costs a lot of
            # time... try to build a set of Header paths??

            # First Attempt : Unpacked Event Data
            lst = ['/Event/Gen/Header',
                   '/Event/Rec/Header']
            for l in lst:
                path = l
                try:
                    n = self.evt[path].evtNumber()
                    return n
                except:
                    # No evt number at this path; probe the next one
                    continue

            # second attempt : try DAQ/RawEvent data
            # The Evt Number is in bank type 16, bank 0, data pt 4
            try:
                n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
                return n
            except:
                pass

            # Default Action : only warn before any event has been processed
            if self.nIn > 0 or self.nOut > 0:
                pass
            else:
                self.log.warning('Could not determine Event Number')
            return -1
        else:
            # Gauss generates events; the Reader (nodeID -1) simply counts
            if self.nodeID == -1:
                self.num = self.num + 1
            return self.num

    def IdentifyWriters(self):
        #
        # Identify Writers in the Configuration
        # Returns a dict {category : [MiniWriter, ...]} for the categories
        # "events", "records", "tuples" plus "histos" (output file names).
        #
        d = {}
        keys = ["events", "records", "tuples", "histos"]
        for k in keys:
            d[k] = []

        # Identify Writers and Classify
        wkeys = WRITERTYPES.keys()
        for v in self.config.values():
            if v.__class__.__name__ in wkeys:
                writerType = WRITERTYPES[v.__class__.__name__]
                d[writerType].append(MiniWriter(v, writerType, self.config))
                if self.nodeID == 0:
                    self.log.info('Writer Found : %s' % (v.name()))

        # Now Check for the Histogram Service
        if 'HistogramPersistencySvc' in self.config.keys():
            hfile = self.config['HistogramPersistencySvc'].getProp(
                'OutputFile')
            d["histos"].append(hfile)
        return d

    def dumpHistograms(self):
        '''
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames()
        histDict = {}
        objects = 0
        histos = 0
        if nlist:
            for n in nlist:
                o = self.hvt[n]
                if type(o) in aidatypes:
                    o = aida2root(o)
                    histos += 1
                else:
                    objects += 1
                histDict[n] = o
        else:
            # single-argument call form behaves identically on Python 2 and 3
            print('WARNING : no histograms to recover?')
        return histDict

    def Initialize(self):
        """Configure, set up GaudiPython, signal readiness and start the app."""
        start = time.time()
        self.processConfiguration()
        self.SetupGaudiPython()
        self.initEvent.set()
        self.StartGaudiPython()

        if self.app == 'Gauss':
            # event counter tool used to synchronise generated event numbers
            tool = self.a.tool("ToolSvc.EvtCounter")
            self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
        else:
            self.cntr = None

        self.iTime = time.time() - start

    def Finalize(self):
        """Stop and finalize the AppMgr, then signal completion."""
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info('%s-%i Finalized' % (self.nodeType, self.nodeID))
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report(self):
        """Log the node's timing summary, then the TESSerializer's report."""
        self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
        allTime = "Alive Time : %5.2f" % (self.tTime)
        initTime = "Init Time : %5.2f" % (self.iTime)
        frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
        runTime = "Run Time : %5.2f" % (self.rTime)
        finTime = "Finalise Time : %5.2f" % (self.fTime)
        tup = (allTime, initTime, frstTime, runTime, finTime)
        for t in tup:
            self.log.info(t)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
        # and report from the TESSerializer
        self.TS.Report()
665 
666 # =============================================================================
667 
668 
 # NOTE(review): the enclosing class header (e.g.
 # "class Reader(GMPComponent):") is missing from this copy of the file.
 def __init__(self, queues, events, params, subworkers):
     # Reader node : nodeID is fixed at -1.
     GMPComponent.__init__(self, 'Reader', -1, queues,
                           events, params, subworkers)
673 
675  # Reader :
676  # No algorithms
677  # No output
678  # No histos
679  self.config['ApplicationMgr'].TopAlg = []
680  self.config['ApplicationMgr'].OutStream = []
681  if "HistogramPersistencySvc" in self.config.keys():
682  self.config['HistogramPersistencySvc'].OutputFile = ''
683  self.config['MessageSvc'].Format = '[Reader]% F%18W%S%7W%R%T %0W%M'
684  self.evtMax = self.config['ApplicationMgr'].EvtMax
685 
686  def DumpEvent(self):
687  tb = TBufferFile(TBuffer.kWrite)
688  # print '----Reader dumping Buffer!!!'
689  self.ts.dumpBuffer(tb)
690  # print '\tBuffer Dumped, size : %i'%( tb.Length() )
691  return tb
692 
693  def DoFirstEvent(self):
694  # Do First Event ------------------------------------------------------
695  # Check Termination Criteria
696  startFirst = time.time()
697  self.log.info('Reader : First Event')
698  if self.nOut == self.evtMax:
699  self.log.info('evtMax( %i ) reached' % (self.evtMax))
700  self.lastEvent.set()
701  return SUCCESS
702  else:
703  # Continue to read, dump and send event
704  self.a.run(1)
705  if not bool(self.evt['/Event']):
706  self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
707  self.lastEvent.set()
708  return SUCCESS
709  else:
710  # Popluate TESSerializer list and send Event
711  if self.app == "Gauss":
712  lst = self.evt.getHistoNames()
713  else:
714  try:
715  lst = self.evt.getList()
716  if self.app == "DaVinci":
717  daqnode = self.evt.retrieveObject(
718  '/Event/DAQ').registry()
719  setOwnership(daqnode, False)
720  self.evt.getList(
721  daqnode, lst, daqnode.address().par())
722  except:
723  self.log.critical('Reader could not acquire TES List!')
724  self.lastEvent.set()
725  return FAILURE
726  self.log.info('Reader : TES List : %i items' % (len(lst)))
727  for l in lst:
728  self.ts.addItem(l)
730  tb = self.TS.Dump()
731  self.log.info('First Event Sent')
732  self.evcom.send((self.currentEvent, tb))
733  self.nOut += 1
734  self.eventLoopSyncer.set()
735  self.evt.clearStore()
736  self.firstEvTime = time.time() - startFirst
737  return SUCCESS
738 
 def Engine(self):
     # Main Reader process body: initialize, send the first event, then
     # read/serialize/send all remaining events until evtMax or exhaustion.
     # rename process (15 == PR_SET_NAME on Linux)
     import os
     import ctypes
     libc = ctypes.CDLL('libc.so.6')
     name = str(self.nodeType) + str(self.nodeID) + '\0'
     libc.prctl(15, name, 0, 0, 0)

     startEngine = time.time()
     self.log.name = 'Reader'
     self.log.info('Reader Process starting')

     self.Initialize()

     # add the Histogram Collection Algorithm
     self.a.addAlgorithm(CollectHistograms(self))

     self.log.info('Reader Beginning Distribution')
     sc = self.DoFirstEvent()
     if sc.isSuccess():
         self.log.info('Reader First Event OK')
     else:
         self.log.critical('Reader Failed on First Event')
         self.stat = FAILURE

     # Do All Others -------------------------------------------------------
     while True:
         # Check Termination Criteria
         if self.nOut == self.evtMax:
             self.log.info('evtMax( %i ) reached' % (self.evtMax))
             break
         # Check Health
         if not self.stat.isSuccess():
             self.log.critical('Reader is Damaged!')
             break
         # Continue to read, dump and send event
         t = time.time()
         self.a.run(1)
         self.rTime += (time.time() - t)
         if not bool(self.evt['/Event']):
             self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
             break
         self.currentEvent = self.getEventNumber()
         tb = self.TS.Dump()
         self.evcom.send((self.currentEvent, tb))
         # clean up
         self.nOut += 1
         self.eventLoopSyncer.set()
         self.evt.clearStore()
     self.log.info('Setting <Last> Event')
     self.lastEvent.set()

     # Finalize
     self.log.info('Reader : Event Distribution complete.')
     self.evcom.finalize()
     self.Finalize()
     self.tTime = time.time() - startEngine
     self.Report()
797 
798 # =============================================================================
799 
800 
802  def __init__(self, workerID, queues, events, params, subworkers):
803  GMPComponent.__init__(self, 'Worker', workerID,
804  queues, events, params, subworkers)
805  # Identify the writer streams
807  # Identify the accept/veto checks for each event
808  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
809  self.log.info("Subworker-%i Created OK" % (self.nodeID))
810  self.eventOutput = True
811 
812  def Engine(self):
813  # rename process
814  import os
815  import ctypes
816  libc = ctypes.CDLL('libc.so.6')
817  name = str(self.nodeType) + str(self.nodeID) + '\0'
818  libc.prctl(15, name, 0, 0, 0)
819 
820  self.initEvent.set()
821  startEngine = time.time()
822  msg = self.a.service('MessageSvc')
823  msg.Format = '[' + self.log.name + '] % F%18W%S%7W%R%T %0W%M'
824 
825  self.log.name = "Worker-%i" % (self.nodeID)
826  self.log.info("Subworker %i starting Engine" % (self.nodeID))
828 
829  # populate the TESSerializer itemlist
830  self.log.info('EVT WRITERS ON WORKER : %i'
831  % (len(self.writerDict['events'])))
832 
833  nEventWriters = len(self.writerDict["events"])
834  self.a.addAlgorithm(CollectHistograms(self))
835 
836  # Begin processing
837  Go = True
838  while Go:
839  packet = self.evcom.receive()
840  if packet:
841  pass
842  else:
843  continue
844  if packet == 'FINISHED':
845  break
846  evtNumber, tbin = packet # unpack
847  if self.cntr != None:
848 
849  self.cntr.setEventCounter(evtNumber)
850 
851  self.nIn += 1
852  self.TS.Load(tbin)
853 
854  t = time.time()
855  sc = self.a.executeEvent()
856  if self.nIn == 1:
857  self.firstEvTime = time.time() - t
858  else:
859  self.rTime += (time.time() - t)
860  if sc.isSuccess():
861  pass
862  else:
863  self.log.name = "Worker-%i" % (self.nodeID)
864  self.log.warning('Did not Execute Event')
865  self.evt.clearStore()
866  continue
867  if self.isEventPassed():
868  pass
869  else:
870  self.log.name = "Worker-%i" % (self.nodeID)
871  self.log.warning('Event did not pass : %i' % (evtNumber))
872  self.evt.clearStore()
873  continue
874  if self.eventOutput:
875  # It may be the case of generating Event Tags; hence
876  # no event output
878  tb = self.TS.Dump()
879  self.evcom.send((self.currentEvent, tb))
880  self.nOut += 1
881  self.inc.fireIncident(gbl.Incident('Subworker', 'EndEvent'))
882  self.eventLoopSyncer.set()
883  self.evt.clearStore()
884  self.log.name = "Worker-%i" % (self.nodeID)
885  self.log.info('Setting <Last> Event %s' % (self.nodeID))
886  self.lastEvent.set()
887 
888  self.evcom.finalize()
889  # Now send the FileRecords and stop/finalize the appMgr
890  self.filerecordsAgent.SendFileRecords()
891  self.tTime = time.time() - startEngine
892  self.Finalize()
893  self.Report()
894  # self.finalEvent.set()
895 
896  def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
897  self.a = a
898  self.evt = evt
899  self.hvt = hvt
900  self.fsr = fsr
901  #self.inc = inc
902  self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
903  self.pers = pers
904  self.ts = ts
905  self.cntr = cntr
906  self.TS = TESSerializer(self.ts, self.evt,
907  self.nodeType, self.nodeID, self.log)
908 
909  def getCheckAlgs(self):
910  '''
911  For some output writers, a check is performed to see if the event has
912  executed certain algorithms.
913  These reside in the AcceptAlgs property for those writers
914  '''
915  acc = []
916  req = []
917  vet = []
918  for m in self.writerDict["events"]:
919  if hasattr(m.w, 'AcceptAlgs'):
920  acc += m.w.AcceptAlgs
921  if hasattr(m.w, 'RequireAlgs'):
922  req += m.w.RequireAlgs
923  if hasattr(m.w, 'VetoAlgs'):
924  vet += m.w.VetoAlgs
925  return (acc, req, vet)
926 
927  def checkExecutedPassed(self, algName):
928  if self.a.algorithm(algName)._ialg.isExecuted()\
929  and self.a.algorithm(algName)._ialg.filterPassed():
930  return True
931  else:
932  return False
933 
934  def isEventPassed(self):
935  '''
936  Check the algorithm status for an event.
937  Depending on output writer settings, the event
938  may be declined based on various criteria.
939  This is a transcript of the check that occurs in GaudiSvc::OutputStream
940  '''
941  passed = False
942 
943  self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
944  if self.acceptAlgs:
945  for name in self.acceptAlgs:
946  if self.checkExecutedPassed(name):
947  passed = True
948  break
949  else:
950  passed = True
951 
952  self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
953  for name in self.requireAlgs:
954  if self.checkExecutedPassed(name):
955  pass
956  else:
957  self.log.info('Evt declined (requireAlgs) : %s' % (name))
958  passed = False
959 
960  self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
961  for name in self.vetoAlgs:
962  if self.checkExecutedPassed(name):
963  pass
964  else:
965  self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
966  passed = False
967  return passed
968 
969 # =============================================================================
970 
971 
973  def __init__(self, workerID, queues, events, params, subworkers):
974  GMPComponent.__init__(self, 'Worker', workerID,
975  queues, events, params, subworkers)
976  # Identify the writer streams
978  # Identify the accept/veto checks for each event
979  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
980  self.log.name = "Worker-%i" % (self.nodeID)
981  self.log.info("Worker-%i Created OK" % (self.nodeID))
982  self.eventOutput = True
983 
985 
        # Worker :
        #   No input
        #   No output
        #   No Histos
        # The worker receives its events over the queue, so the normal
        # input/output/histogram channels are all disabled here.
        self.config['EventSelector'].Input = []
        self.config['ApplicationMgr'].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config['HistogramPersistencySvc'].OutputFile = ''
        # prefix every log line with the worker identity
        formatHead = '[Worker-%i] ' % (self.nodeID)
        self.config['MessageSvc'].Format = formatHead + \
            '% F%18W%S%7W%R%T %0W%M'

        for key, lst in self.writerDict.iteritems():
            self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

        for m in self.writerDict["tuples"]:
            # rename Tuple output file with an appendix
            # based on worker id, for merging later
            newName = m.getNewName('.', '.w%i.' % (self.nodeID))
            self.config[m.key].Output = newName

        # Suppress INFO Output for all but Worker-0
        # if self.nodeID == 0 :
        #     pass
        # else :
        #     self.config[ 'MessageSvc' ].OutputLevel = ERROR

        # Gauss-specific: decouple the event counter from incidents so the
        # counter can be driven explicitly (see setEventCounter in Engine).
        if self.app == "Gauss":
            try:
                if "ToolSvc.EvtCounter" not in self.config:
                    from Configurables import EvtCounter
                    counter = EvtCounter()
                else:
                    counter = self.config["ToolSvc.EvtCounter"]
                counter.UseIncident = False
            except:
                # ignore errors when trying to change the configuration of the EvtCounter
                self.log.warning('Cannot configure EvtCounter')
1024 
    def Engine(self):
        """
        Event loop of the Worker process.

        Receives serialised events from the Reader, runs the Gaudi event
        loop on each, applies the accept/require/veto checks and, when
        event output is configured, ships the serialised TES to the
        Writer.  The Subworker processes are forked just before the first
        event is processed.
        """
        # rename process: 15 is PR_SET_NAME on linux, so the node shows up
        # with a readable name in ps/top
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        startEngine = time.time()
        self.log.info("Worker %i starting Engine" % (self.nodeID))
        self.Initialize()

        # populate the TESSerializer itemlist
        self.log.info('EVT WRITERS ON WORKER : %i'
                      % (len(self.writerDict['events'])))

        nEventWriters = len(self.writerDict["events"])
        if nEventWriters:
            # Collect mandatory and optional item paths declared by the
            # event writers; a trailing '#depth' specifier is stripped.
            itemList = set()
            optItemList = set()
            for m in self.writerDict["events"]:
                for item in m.ItemList:
                    hsh = item.find('#')
                    if hsh != -1:
                        item = item[:hsh]
                    itemList.add(item)
                for item in m.OptItemList:
                    hsh = item.find('#')
                    if hsh != -1:
                        item = item[:hsh]
                    optItemList.add(item)
            # If an item is mandatory and optional, keep it only in the optional list
            itemList -= optItemList
            for item in sorted(itemList):
                self.log.info(' adding ItemList Item to ts : %s' % (item))
                self.ts.addItem(item)
            for item in sorted(optItemList):
                self.log.info(' adding Optional Item to ts : %s' % (item))
                self.ts.addOptItem(item)
        else:
            self.log.info('There is no Event Output for this app')
            self.eventOutput = False

        # Begin processing
        Go = True
        while Go:
            packet = self.evcom.receive()
            if packet:
                pass
            else:
                continue
            if packet == 'FINISHED':
                break
            evtNumber, tbin = packet  # unpack
            if self.cntr != None:
                self.cntr.setEventCounter(evtNumber)

            # subworkers are forked before the first event is processed
            # reader-thread for ConDB must be closed and reopened in each subworker
            # this is done by disconnect()
            if self.nIn == 0:

                self.log.info("Fork new subworkers and disconnect from CondDB")
                condDB = self.a.service('CondDBCnvSvc', gbl.ICondDBReader)
                condDB.disconnect()

                # Fork subworkers and share services
                for k in self.subworkers:
                    k.SetServices(self.a, self.evt, self.hvt, self.fsr,
                                  self.inc, self.pers, self.ts, self.cntr)
                    k.Start()
                self.a.addAlgorithm(CollectHistograms(self))
            self.nIn += 1
            self.TS.Load(tbin)

            # time the first event separately: it carries one-off costs
            t = time.time()
            sc = self.a.executeEvent()
            if self.nIn == 1:
                self.firstEvTime = time.time() - t
            else:
                self.rTime += (time.time() - t)
            if sc.isSuccess():
                pass
            else:
                self.log.warning('Did not Execute Event')
                self.evt.clearStore()
                continue
            if self.isEventPassed():
                pass
            else:
                self.log.warning('Event did not pass : %i' % (evtNumber))
                self.evt.clearStore()
                continue
            if self.eventOutput:
                # It may be the case of generating Event Tags; hence
                # no event output
                # NOTE(review): self.currentEvent is assigned elsewhere
                # (not visible in this chunk) -- confirm it tracks the
                # event being processed here.
                tb = self.TS.Dump()
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
            self.inc.fireIncident(gbl.Incident('Worker', 'EndEvent'))
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        self.evcom.finalize()
        self.log.info('Worker-%i Finished Processing Events' % (self.nodeID))
        # Now send the FileRecords and stop/finalize the appMgr
        self.filerecordsAgent.SendFileRecords()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()

        for k in self.subworkers:
            self.log.info('Join subworkers')
            k.proc.join()
1144 
1145  def getCheckAlgs(self):
1146  '''
1147  For some output writers, a check is performed to see if the event has
1148  executed certain algorithms.
1149  These reside in the AcceptAlgs property for those writers
1150  '''
1151  acc = []
1152  req = []
1153  vet = []
1154  for m in self.writerDict["events"]:
1155  if hasattr(m.w, 'AcceptAlgs'):
1156  acc += m.w.AcceptAlgs
1157  if hasattr(m.w, 'RequireAlgs'):
1158  req += m.w.RequireAlgs
1159  if hasattr(m.w, 'VetoAlgs'):
1160  vet += m.w.VetoAlgs
1161  return (acc, req, vet)
1162 
1163  def checkExecutedPassed(self, algName):
1164  if self.a.algorithm(algName)._ialg.isExecuted()\
1165  and self.a.algorithm(algName)._ialg.filterPassed():
1166  return True
1167  else:
1168  return False
1169 
    def isEventPassed(self):
        '''
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream
        '''
        passed = False

        # AcceptAlgs: with a non-empty list, at least one accept algorithm
        # must have executed and passed; with no accept algorithms the
        # event is accepted by default.
        self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
        if self.acceptAlgs:
            for name in self.acceptAlgs:
                if self.checkExecutedPassed(name):
                    passed = True
                    break
        else:
            passed = True

        # RequireAlgs: every listed algorithm must have executed and
        # passed, otherwise the event is declined.
        self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
        for name in self.requireAlgs:
            if self.checkExecutedPassed(name):
                pass
            else:
                self.log.info('Evt declined (requireAlgs) : %s' % (name))
                passed = False

        # VetoAlgs
        # NOTE(review): this declines the event when a veto algorithm did
        # NOT execute-and-pass -- i.e. the same test as RequireAlgs.  In
        # GaudiSvc::OutputStream a veto normally rejects the event when
        # the veto algorithm DID pass; this looks inverted -- confirm
        # against OutputStream before changing.
        self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
        for name in self.vetoAlgs:
            if self.checkExecutedPassed(name):
                pass
            else:
                self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
                passed = False
        return passed
1204 
1205 # =============================================================================
1206 
1207 
    def __init__(self, queues, events, params, subworkers):
        """
        Writer node; its nodeID is fixed at -2.

        Collects events from all workers and writes the merged output;
        self.status tracks which workers have sent their FINISHED flag so
        the event loop knows when to stop.
        """
        GMPComponent.__init__(self, 'Writer', -2, queues,
                              events, params, subworkers)
        # Identify the writer streams
        # This keeps track of workers as they finish
        self.status = [False] * self.nWorkers
        self.log.name = "Writer--2"
1217 
        # Writer :
        #   No input
        #   No Algs
        # The writer only persists what it receives over the queues, so
        # the event selector and algorithm list are emptied.
        self.config['ApplicationMgr'].TopAlg = []
        self.config['EventSelector'].Input = []

        self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'

        # Now process the output writers
        for key, lst in self.writerDict.iteritems():
            self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

        # Modify the name of the output file to reflect that it came
        # from a parallel processing
        #
        # Event Writers
        for m in self.writerDict["events"]:
            self.log.debug('Processing Event Writer : %s' % (m))
            newName = m.getNewName('.', '.p%i.' % self.nWorkers)
            self.config[m.key].Output = newName

        # Now, if there are no event writers, the FileRecords file
        # will fail to open, as it only opens an UPDATE version
        # of the existing Event Output File
        # So, if there are no event writers, edit the string of the
        # FileRecord Writer

        # FileRecords Writers
        for m in self.writerDict["records"]:
            self.log.debug('Processing FileRecords Writer: %s' % (m))
            newName = m.getNewName('.', '.p%i.' % self.nWorkers,
                                   extra=" OPT='RECREATE'")
            self.config[m.key].Output = newName

        # same for histos: insert the '.pN.' marker at the first '.' of
        # the configured histogram file name
        hs = "HistogramPersistencySvc"
        n = None
        if hs in self.config.keys():
            n = self.config[hs].OutputFile
        if n:
            newName = self.config[hs].OutputFile.replace('.',
                                                         '.p%i.' % (self.nWorkers))
            self.config[hs].OutputFile = newName
1262 
    def Engine(self):
        """
        Event loop of the Writer process.

        Polls every worker's queue in round-robin order, loads each
        received serialised event into the TES and executes the (writer
        only) event loop, until all workers have reported FINISHED.  Then
        rebuilds histograms and FileRecords and finalizes the appMgr.
        """
        # rename process: 15 is PR_SET_NAME on linux, so the node shows up
        # with a readable name in ps/top
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        startEngine = time.time()
        self.Initialize()
        self.histoAgent = HistoAgent(self)

        # Begin processing: poll each worker's queue in turn with a short
        # timeout so no single idle worker blocks the loop
        Go = True
        current = -1
        stopCriteria = self.nWorkers
        while Go:
            current = (current + 1) % self.nWorkers
            packet = self.evcoms[current].receive(timeout=0.01)
            if packet == None:
                continue
            if packet == 'FINISHED':
                self.log.info(
                    'Writer got FINISHED flag : Worker %i' % (current))

                self.status[current] = True
                if all(self.status):
                    self.log.info('FINISHED recd from all workers, break loop')
                    break
                continue
            # otherwise, continue as normal
            self.nIn += 1  # update central count (maybe needed by FSR store)
            evtNumber, tbin = packet  # unpack
            self.TS.Load(tbin)
            t = time.time()
            self.a.executeEvent()
            self.rTime += (time.time() - t)
            self.evt.clearStore()
            self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # finalisation steps
        [e.finalize() for e in self.evcoms]
        # Now do Histograms
        sc = self.histoAgent.Receive()
        sc = self.histoAgent.RebuildHistoStore()
        if sc.isSuccess():
            self.log.info('Histo Store rebuilt ok')
        else:
            self.log.warning('Histo Store Error in Rebuild')

        # Now do FileRecords
        sc = self.filerecordsAgent.Receive()
        self.filerecordsAgent.Rebuild()
        self.Finalize()
        #self.rTime = time.time()-startEngine
        self.Report()
1324 
1325 # =============================================================================
1326 
1327 
1328 # =============================================================================
1329 
class Coord(object):
    """
    Co-ordinator of the GMP parallel system.

    Builds the Reader, one Worker (plus Subworkers) and the Writer, wires
    up the inter-process queues and synchronisation events between them,
    then drives the three phases -- Initialise, Run, Finalise -- through
    Syncer objects in Go().
    """

    def __init__(self, nWorkers, config, log):
        # nWorkers : requested number of worker processes; -1 means one
        #            per available CPU
        # config   : configuration shared by all nodes
        # log      : logger instance, renamed per context

        self.log = log
        self.config = config
        # set up Logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('GaudiPython Parallel Process Co-ordinator beginning')

        if nWorkers == -1:
            # The user has requested all available cpus in the machine
            self.nWorkers = cpu_count()
        else:
            self.nWorkers = nWorkers

        self.qs = self.SetupQueues()  # a dictionary of queues (for Events)
        self.hq = JoinableQueue()  # for Histogram data
        self.fq = JoinableQueue()  # for FileRecords data

        # Make a Syncer for Initalise, Run, and Finalise
        self.sInit = Syncer(self.nWorkers, self.log,
                            limit=WAIT_INITIALISE,
                            step=STEP_INITIALISE)
        self.sRun = Syncer(self.nWorkers, self.log,
                           manyEvents=True,
                           limit=WAIT_SINGLE_EVENT,
                           step=STEP_EVENT,
                           firstEvent=WAIT_FIRST_EVENT)
        self.sFin = Syncer(self.nWorkers, self.log,
                           limit=WAIT_FINALISE,
                           step=STEP_FINALISE)
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to al subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        self.subworkers = []
        # Declare SubProcesses!
        # Subworkers get nodeIDs 1..nWorkers-1; Worker 0 forks them later.
        for i in range(1, self.nWorkers):
            sub = Subworker(i, self.getQueues(
                i), self.getSyncEvents(i), params, self.subworkers)
            self.subworkers.append(sub)
        self.reader = Reader(self.getQueues(-1),
                             self.getSyncEvents(-1), params, self.subworkers)
        self.workers = []
        # NOTE(review): wk is appended to self.system but never to
        # self.workers, which therefore stays empty -- confirm intended
        # (Terminate() relies on multiprocessing.active_children() instead).
        wk = Worker(0, self.getQueues(0), self.getSyncEvents(0),
                    params, self.subworkers)
        self.writer = Writer(self.getQueues(-2),
                             self.getSyncEvents(-2), params, self.subworkers)

        # launch order (Start in Go) is the order of this list;
        # Stop() joins in reverse order
        self.system = []
        self.system.append(self.writer)
        self.system.append(wk)
        self.system.append(self.reader)

    def getSyncEvents(self, nodeID):
        """Return the (init, (run, lastEvent), fin) sync events for a node."""
        init = self.sInit.d[nodeID].event
        run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
        fin = self.sFin.d[nodeID].event
        return (init, run, fin)

    def getQueues(self, nodeID):
        """Return the (event, histogram, filerecords) queues for a node."""
        eventQ = self.qs[nodeID]
        histQ = self.hq
        fsrQ = self.fq
        return (eventQ, histQ, fsrQ)

    def Go(self):
        """
        Run the full parallel job: Initialise, Run, Finalise.

        Each phase is synchronised across all nodes; a timeout or failure
        in any phase terminates the subprocesses and returns FAILURE,
        otherwise SUCCESS is returned after a clean join.
        """

        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('INITIALISING SYSTEM')

        # Start reader, writer and main worker
        for p in self.system:
            p.Start()

        sc = self.sInit.syncAll(step="Initialise")
        if sc == SUCCESS:
            pass
        else:
            self.Terminate()
            return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('RUNNING SYSTEM')
        sc = self.sRun.syncAll(step="Run")
        if sc == SUCCESS:
            pass
        else:
            self.Terminate()
            return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('FINALISING SYSTEM')
        sc = self.sFin.syncAll(step="Finalise")
        if sc == SUCCESS:
            pass
        else:
            self.Terminate()
            return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info("Cleanly join all Processes")
        self.Stop()
        self.log.info("Report Total Success to Main.py")
        return SUCCESS

    def Terminate(self):
        """Kill all child processes; used on timeout or phase failure."""
        # Brutally kill sub-processes
        children = multiprocessing.active_children()
        for i in children:
            i.terminate()

        # self.writer.proc.terminate()
        #[ w.proc.terminate() for w in self.workers]
        # self.reader.proc.terminate()

    def Stop(self):
        """Join all processes cleanly, in reverse launch order."""
        # procs should be joined in reverse order to launch
        self.system.reverse()
        for s in self.system:
            s.proc.join()
        return SUCCESS

    def SetupQueues(self):
        """
        Build the network of event queues.

        N Queues = nWorkers + 1: one shared Reader->Workers queue and one
        Worker->Writer queue per worker.  Returns a dict keyed by nodeID
        mapping to (inQueue, outQueue); the Reader (-1) has no input and
        the Writer (-2) no single output.
        """
        # This method will set up the network of Queues needed
        # N Queues = nWorkers + 1
        # Each Worker has a Queue in, and a Queue out
        # Reader has Queue out only
        # Writer has nWorkers Queues in

        # one queue from Reader-Workers
        rwk = JoinableQueue()
        # one queue from each worker to writer
        workersWriter = [JoinableQueue() for i in xrange(self.nWorkers)]
        d = {}
        d[-1] = (None, rwk)  # Reader
        d[-2] = (workersWriter, None)  # Writer
        for i in xrange(self.nWorkers):
            d[i] = (rwk, workersWriter[i])
        return d
1475 
1476 # ============================= EOF ===========================================
def Load(self, tbuf)
Definition: GMPBase.py:346
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:420
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:670
def getNewName(self, replaceThis, withThis, extra='')
Definition: GMPBase.py:124
StatusCode finalize() override
Definition: Algorithm.cpp:99
StatusCode execute() override
Definition: Algorithm.cpp:95
def SetupQueues(self)
Definition: GMPBase.py:1458
def receive(self, timeout=None)
Definition: GMPBase.py:277
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:241
double sum(double x, double y, double z)
def __init__(self, gmpcomponent)
Definition: GMPBase.py:197
def DumpEvent(self)
Definition: GMPBase.py:686
def DoFirstEvent(self)
Definition: GMPBase.py:693
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:802
def Terminate(self)
Definition: GMPBase.py:1441
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:927
def processConfiguration(self)
Definition: GMPBase.py:984
def getCheckAlgs(self)
Definition: GMPBase.py:1145
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:534
def getItemLists(self, config)
Definition: GMPBase.py:145
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1209
def set(self, key, output)
Definition: GMPBase.py:162
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1331
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
def __init__(self, writer, wType, config)
Definition: GMPBase.py:93
def processConfiguration(self)
Definition: GMPBase.py:506
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1386
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:896
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:973
Python Algorithm base class.
Definition: Algorithm.h:32
def isEventPassed(self)
Definition: GMPBase.py:1170
def processConfiguration(self)
Definition: GMPBase.py:674
def getQueues(self, nodeID)
Definition: GMPBase.py:1392
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1163
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:332
def processConfiguration(self)
Definition: GMPBase.py:1218