The Gaudi Framework v30r3 (a5ef0a68)
GMPBase.py — bases for the Gaudi MultiProcessing (GMP) Reader, Worker and Writer classes.
1 from Gaudi.Configuration import *
2 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
3 from ROOT import TBufferFile, TBuffer
4 import multiprocessing
5 from multiprocessing import Process, Queue, JoinableQueue, Event
6 from multiprocessing import cpu_count, current_process
7 from multiprocessing.queues import Empty
8 from pTools import *
9 import time
10 import sys
11 import os
12 from ROOT import TParallelMergingFile
13 # This script contains the bases for the Gaudi MultiProcessing (GMP)
14 # classes
15 
16 # There are three classes :
17 # Reader
18 # Worker
19 # Writer
20 
21 # Each class needs to perform communication with the others
22 # For this, we need a means of communication, which will be based on
23 # the python multiprocessing package
24 # This is provided in SPI pytools package
25 # cmt line : use pytools v1.1 LCG_Interfaces
26 # The PYTHONPATH env variable may need to be modified, as this might
27 # still point to 1.0_python2.5
28 
29 # Each class will need Queues, and a defined method for using these
30 # queues.
31 # For example, as long as there is something in the Queue, both ends
32 # of the queue must be open
33 # Also, there needs to be proper Termination flags and criteria
34 # The System should be error proof.
35 
36 
# Constants -------------------------------------------------------------------
# polling interval (seconds) used when busy-waiting on queue buffers
NAP = 0.001
# bytes per megabyte; float so that size/MB divisions give fractional MB
MB = 1024.0 * 1024.0
# waits to guard against hanging, in seconds
WAIT_INITIALISE = 60 * 5
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
WAIT_FINALISE = 60 * 2
# step sizes presumably paired with the waits above for periodic checks
# NOTE(review): the STEP_* values are not used in this chunk — confirm usage
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 10

# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
SMAPS = False
51 
52 # -----------------------------------------------------------------------------
53 
54 # definitions
55 # ----------
# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# used to check which type of histo we are dealing with
# i.e. if currentHisto in aidatypes : pass
aidatypes = (gbl.AIDA.IHistogram,
             gbl.AIDA.IHistogram1D,
             gbl.AIDA.IHistogram2D,
             gbl.AIDA.IHistogram3D,
             gbl.AIDA.IProfile1D,
             gbl.AIDA.IProfile2D,
             gbl.AIDA.IBaseHistogram)  # extra?

# similar to aidatypes, but for native ROOT histogram classes
thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)

# Types of OutputStream in Gaudi, mapped to the kind of data each one writes
WRITERTYPES = {'EvtCollectionStream': "tuples",
               'InputCopyStream': "events",
               'OutputStream': "events",
               'RecordStream': "records",
               'RunRecordStream': "records",
               'SequentialOutputStream': "events",
               'TagCollectionStream': "tuples"}
80 
81 # =============================================================================
82 
83 
class MiniWriter(object):
    '''
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring
    '''

    def __init__(self, writer, wType, config):
        '''
        @param writer : the output stream configurable
        @param wType  : one of "events", "records", "tuples" (see WRITERTYPES)
        @param config : full configuration mapping, used to reach the data
                        service when the writer itself carries no Output
        '''
        self.w = writer
        self.wType = wType
        # set parameters for directly accessing the correct
        # part of the configuration, so that later, we can do
        # config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr(self.w, "Output"):
            self.woutput = self.w.Output
            self.getItemLists(config)
            self.set(self.wName, self.w.Output)
            return
        else:
            # if there is no output file, get it via the datasvc
            # (every writer always has a datasvc property)
            self.datasvcName = self.w.EvtDataSvc
            datasvc = config[self.datasvcName]
            if hasattr(datasvc, "Output"):
                self.getItemLists(config)
                self.set(self.datasvcName, datasvc.Output)
            # if the datasvc has no Output either, key/output stay None
            return

    def getNewName(self, replaceThis, withThis, extra=''):
        '''
        Return the Output name with one pattern substituted for another.
        The Output *might* be stored in a list, so preserve that shape.

        @param extra : might need to add ROOT flags,
                       i.e.: OPT='RECREATE', or such
        '''
        # was: string-compare on __class__.__name__; isinstance is idiomatic
        assert isinstance(replaceThis, str)
        assert isinstance(withThis, str)
        old = self.output
        isList = isinstance(old, list)
        if isList:
            old = self.output[0]
        new = old.replace(replaceThis, withThis) + extra
        return [new] if isList else new

    def getItemLists(self, config):
        '''Fetch ItemList/OptItemList from the writer or, failing that,
        from its data service.'''
        # the mandatory item list
        if hasattr(self.w, "ItemList"):
            self.ItemList = self.w.ItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "ItemList"):
                self.ItemList = datasvc.ItemList
        # The optional item list; possibly not a valid variable
        if hasattr(self.w, "OptItemList"):
            self.OptItemList = self.w.OptItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "OptItemList"):
                self.OptItemList = datasvc.OptItemList
        return

    def set(self, key, output):
        # remember where in the configuration the Output lives, so that
        # config[self.key].Output can later be rewritten
        self.key = key
        self.output = output
        return

    def __repr__(self):
        '''Multi-line human-readable summary of this writer.'''
        line = '-' * 80
        fields = [
            line,
            "Writer : %s" % (self.wName),
            "Writer Type : %s" % (self.wType),
            "Writer Output : %s" % (self.output),
            "DataSvc : %s" % (self.datasvcName),
            "DataSvc Output : %s" % (self.svcOutput),
            "",
            "Key for config : %s" % (self.key),
            "Output File : %s" % (self.output),
            "ItemList : %s" % (self.ItemList),
            "OptItemList : %s" % (self.OptItemList),
            line,
        ]
        return '\n'.join(fields) + '\n'
183 
184 # =============================================================================
185 
186 
188  '''
189  GaudiPython algorithm used to clean up histos on the Reader and Workers
190  Only has a finalize method()
191  This retrieves a dictionary of path:histo objects and sends it to the
192  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
193  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
194  at the writer end, there will be an error.
195  '''
196 
197  def __init__(self, gmpcomponent):
198  PyAlgorithm.__init__(self)
199  self._gmpc = gmpcomponent
200  self.log = self._gmpc.log
201  return None
202 
    def execute(self):
        # Nothing to do per event; the histograms are harvested in finalize()
        return SUCCESS
205 
    def finalize(self):
        '''
        Ship this node's histograms to the Writer in chunks, then block on
        the sync Event until the Writer confirms full receipt, and finally
        clear and re-root the local histogram store.
        '''
        self.log.info('CollectHistograms Finalise (%s)' %
                      (self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms()
        ks = self._gmpc.hDict.keys()
        self.log.info('%i Objects in Histogram Store' % (len(ks)))

        # crashes occurred due to Memory Error during the sending of hundreds
        # histos on slc5 machines, so instead, break into chunks
        # send 100 at a time
        chunk = 100
        # Python 2 integer division; the +1 covers the final partial chunk
        reps = len(ks) / chunk + 1
        for i in xrange(reps):
            someKeys = ks[i * chunk: (i + 1) * chunk]
            smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
            self._gmpc.hq.put((self._gmpc.nodeID, smalld))
        # "finished" Notification
        self.log.debug('Signalling end of histos to Writer')
        self._gmpc.hq.put('HISTOS_SENT')
        self.log.debug('Waiting on Sync Event')
        # returning before ALL histos are COMPLETELY RECEIVED at the Writer
        # end would cause an error, hence this wait (see class docstring)
        self._gmpc.sEvent.wait()
        self.log.debug('Histo Sync Event set, clearing and returning')
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()
        setOwnership(root, False)
        self._gmpc.hvt.setRoot('/stat', root)
        return SUCCESS
233 
234 # =============================================================================
235 
236 
class EventCommunicator(object):
    '''
    Manage the transfer of serialised Gaudi events between GMP nodes.

    Events are communicated as tuples of (evtNumber, TBufferFile) over the
    multiprocessing Queues qin/qout, filled either by the TESSerializer or
    the GaudiSvc "IPCSvc".  Transfer statistics are kept for reporting.
    '''

    def __init__(self, GMPComponent, qin, qout):
        # the owning node (Reader/Worker/Writer); reuse its logger
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0      # counter : items sent
        self.nRecv = 0      # counter : items received
        self.sizeSent = 0   # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0   # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0    # time : receiving from self.qin
        self.qoutTime = 0   # time : sending on qout

    def send(self, item):
        '''Send one (evtNumber, TBufferFile) tuple downstream on qout.'''
        assert isinstance(item, tuple)
        startTransmission = time.time()
        self.qout.put(item)
        # allow the background thread to feed the Queue; not 100% guaranteed
        # to finish before the next line (relies on the private _buffer)
        while self.qout._buffer:
            time.sleep(NAP)
        self.qoutTime += time.time() - startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive(self, timeout=None):
        '''Receive one item from qin; returns None if the wait times out.'''
        startWait = time.time()
        try:
            itemIn = self.qin.get(timeout=timeout)
        except Empty:
            return None
        self.qinTime += time.time() - startWait
        self.nRecv += 1
        if isinstance(itemIn, tuple):
            self.sizeRecv += itemIn[1].Length()
        else:
            # control strings ('FINISHED', ...) do not count as events
            self.nRecv -= 1
        try:
            self.qin.task_done()
        except Exception:
            # was a bare except; task_done() raises ValueError when called
            # more times than there were items on the queue
            self._gmpc.log.warning('TASK_DONE called too often by : %s'
                                   % (self._gmpc.nodeType))
        return itemIn

    def finalize(self):
        '''Propagate 'FINISHED' flags downstream, then report statistics.'''
        self.log.info('Finalize Event Communicator : %s %s' %
                      (self._gmpc, self._gmpc.nodeType))
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        # (dict lookup also guards against an unknown nodeType, which would
        #  previously have left 'downstream' unbound -> NameError)
        downstream = {'Reader': self._gmpc.nWorkers,
                      'Worker': 1,
                      'Writer': 0}[self._gmpc.nodeType]

        for i in xrange(downstream):
            self.qout.put('FINISHED')
        if self._gmpc.nodeType != 'Writer':
            self.qout.join()
        # Now some reporting...
        self.statistics()

    def statistics(self):
        '''Summarise the transfer counters through the node's logger.'''
        self.log.name = '%s-%i Audit ' % (self._gmpc.nodeType,
                                          self._gmpc.nodeID)
        self.log.info('Items Sent : %i' % (self.nSent))
        self.log.info('Items Received : %i' % (self.nRecv))
        self.log.info('Data Sent : %i' % (self.sizeSent))
        self.log.info('Data Received : %i' % (self.sizeRecv))
        self.log.info('Q-out Time : %5.2f' % (self.qoutTime))
        self.log.info('Q-in Time : %5.2f' % (self.qinTime))
326 
327 # =============================================================================
328 
329 
class TESSerializer(object):
    '''
    Bookkeeping wrapper around the GaudiMP TESSerializer: loads/dumps event
    TBufferFiles while accumulating counts, sizes and timings for reporting.
    '''

    def __init__(self, gaudiTESSerializer, evtDataSvc,
                 nodeType, nodeID, log):
        self.T = gaudiTESSerializer     # the underlying GaudiMP serializer
        self.evt = evtDataSvc           # event data service to (re)root
        self.buffersIn = []             # sizes of buffers loaded
        self.buffersOut = []            # sizes of buffers dumped
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log

    def Load(self, tbuf):
        '''Reset the event store root and deserialise one event buffer.'''
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        t = time.time()
        self.T.loadBuffer(tbuf)
        self.tLoad += (time.time() - t)
        self.nIn += 1
        self.buffersIn.append(tbuf.Length())

    def Dump(self):
        '''Serialise the current event into a fresh TBufferFile and return it.'''
        t = time.time()
        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += (time.time() - t)
        self.nOut += 1
        self.buffersOut.append(tb.Length())
        return tb

    def Report(self):
        '''Log a summary of events and data volumes loaded/dumped.'''
        evIn = "Events Loaded : %i" % (self.nIn)
        evOut = "Events Dumped : %i" % (self.nOut)
        din = sum(self.buffersIn)
        dataIn = "Data Loaded : %i" % (din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
        if self.nIn:
            avgIn = "Avg Buf Loaded : %5.2f Mb"\
                % (din / (self.nIn * MB))
            maxIn = "Max Buf Loaded : %5.2f Mb"\
                % (max(self.buffersIn) / MB)
        else:
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"
        dout = sum(self.buffersOut)
        dataOut = "Data Dumped : %i" % (dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
        if self.nOut:
            # bug fix: the average dumped size previously used 'din'
            # (bytes loaded) instead of 'dout' (bytes dumped)
            avgOut = "Avg Buf Dumped : %5.2f Mb"\
                % (dout / (self.nOut * MB))
            maxOut = "Max Buf Dumped : %5.2f Mb"\
                % (max(self.buffersOut) / MB)
        else:
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
        loadTime = "Total Load Time : %5.2f" % (self.tLoad)

        lines = evIn,\
            evOut,\
            dataIn,\
            dataInMb,\
            avgIn,\
            maxIn,\
            dataOut,\
            dataOutMb,\
            avgOut,\
            maxOut,\
            dumpTime,\
            loadTime
        self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
        for line in lines:
            self.log.info(line)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
409 
410 # =============================================================================
411 
412 
413 class GMPComponent(object):
414  # This class will be the template for Reader, Worker and Writer
415  # containing all common components
416  # nodeId will be a numerical identifier for the node
417  # -1 for reader
418  # -2 for writer
419  # 0,...,nWorkers-1 for the Workers
420  def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
421  # declare a Gaudi MultiProcessing Node
422  # the nodeType is going to be one of Reader, Worker, Writer
423  # qPair is going to be a tuple of ( qin, qout )
424  # for sending and receiving
425  # if nodeType is "Writer", it will be a list of qPairs,
426  # as there's one queue-in from each Worker
427  #
428  # params is a tuple of (nWorkers, config, log)
429 
430  self.nodeType = nodeType
431  current_process().name = nodeType
432 
433  # Synchronisation Event() objects for keeping track of the system
434  self.initEvent, eventLoopSyncer, self.finalEvent = events
435  self.eventLoopSyncer, self.lastEvent = eventLoopSyncer # unpack tuple
436 
437  # necessary for knowledge of the system
438  self.nWorkers, self.sEvent, self.config, self.log = params
439  self.subworkers = subworkers
440  self.nodeID = nodeID
441  self.msgFormat = self.config['MessageSvc'].Format
442 
443  # describe the state of the node by the current Event Number
444  self.currentEvent = None
445 
446  # Unpack the Queues : (events, histos, filerecords)
447  self.queues = queues
448  self.num = 0
449 
450  ks = self.config.keys()
451  self.app = None
452  list = ["Brunel", "DaVinci", "Boole", "Gauss"]
453  for k in list:
454  if k in ks:
455  self.app = k
456 
    def Start(self):
        '''
        Set up the node's communicators, queues, counters and logger, then
        fork the separate process that runs self.Engine().
        '''
        # unpack the three queue groups: events, histograms, filerecords
        qPair, histq, fq = self.queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker':
            # Reader or Worker Node : a single (qin, qout) pair
            qin, qout = qPair
            self.evcom = EventCommunicator(self, qin, qout)
        else:
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin:
                ec = EventCommunicator(self, q, None)
                self.evcoms.append(ec)
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i' % (self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        self.proc = Process(target=self.Engine)
        # Fork and start the separate process
        self.proc.start()
501 
    def Engine(self):
        # This will be the method carried out by the Node
        # Different for all : overridden by the Reader/Worker/Writer subclasses
        pass
506 
508  # Different for all ; customize Configuration for multicore
509  pass
510 
    def SetupGaudiPython(self):
        '''
        Initialize the GaudiPython Tools: the AppMgr, the event/histogram/
        filerecord services, the incident and persistency services, and the
        TESSerializer pair used to ship events between processes.
        '''
        self.a = AppMgr()
        if SMAPS:
            # optional memory-profiling algorithm (see the SMAPS switch above)
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType + '-' + str(self.nodeID) + '.smp'
            ss = SmapShot(logname=smapsLog)
            self.a.addAlgorithm(ss)
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
        self.pers = self.a.service('EventPersistencySvc', 'IAddressCreator')
        # low-level GaudiMP serializer plus its bookkeeping wrapper
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
        self.TS = TESSerializer(self.ts, self.evt,
                                self.nodeType, self.nodeID, self.log)
        return SUCCESS
529 
    def StartGaudiPython(self):
        # Bring the AppMgr through initialize and start
        self.a.initialize()
        self.a.start()
        return SUCCESS
534 
    def LoadTES(self, tbufferfile):
        # Re-root the event store with a fresh DataObject, then deserialise
        # one event from the given TBufferFile into the TES
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        self.ts.loadBuffer(tbufferfile)
540 
541  def getEventNumber(self):
542  if self.app != 'Gauss':
543  # Using getList or getHistoNames can result in the EventSelector
544  # re-initialising connection to RootDBase, which costs a lot of
545  # time... try to build a set of Header paths??
546 
547  # First Attempt : Unpacked Event Data
548  lst = ['/Event/Gen/Header',
549  '/Event/Rec/Header']
550  for l in lst:
551  path = l
552  try:
553  n = self.evt[path].evtNumber()
554 
555  return n
556  except:
557  # No evt number at this path
558  continue
559 
560  # second attepmt : try DAQ/RawEvent data
561  # The Evt Number is in bank type 16, bank 0, data pt 4
562  try:
563  n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
564 
565  return n
566  except:
567  pass
568 
569  # Default Action
570  if self.nIn > 0 or self.nOut > 0:
571  pass
572  else:
573  self.log.warning('Could not determine Event Number')
574  return -1
575  else:
576  if self.nodeID == -1:
577  self.num = self.num + 1
578 
579  return self.num
580 
581  def IdentifyWriters(self):
582  #
583  # Identify Writers in the Configuration
584  #
585  d = {}
586  keys = ["events", "records", "tuples", "histos"]
587  for k in keys:
588  d[k] = []
589 
590  # Identify Writers and Classify
591  wkeys = WRITERTYPES.keys()
592  for v in self.config.values():
593  if v.__class__.__name__ in wkeys:
594  writerType = WRITERTYPES[v.__class__.__name__]
595  d[writerType].append(MiniWriter(v, writerType, self.config))
596  if self.nodeID == 0:
597  self.log.info('Writer Found : %s' % (v.name()))
598 
599  # Now Check for the Histogram Service
600  if 'HistogramPersistencySvc' in self.config.keys():
601  hfile = self.config['HistogramPersistencySvc'].getProp(
602  'OutputFile')
603  d["histos"].append(hfile)
604  return d
605 
    def dumpHistograms(self):
        '''
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames()
        histDict = {}
        objects = 0
        histos = 0
        if nlist:
            for n in nlist:
                o = self.hvt[n]
                if type(o) in aidatypes:
                    # convert AIDA histograms to ROOT form before shipping
                    o = aida2root(o)
                    histos += 1
                else:
                    objects += 1
                histDict[n] = o
        else:
            print 'WARNING : no histograms to recover?'
        return histDict
628 
    def Initialize(self):
        '''Configure and start GaudiPython, recording the time taken.'''
        start = time.time()
        self.processConfiguration()
        self.SetupGaudiPython()
        # signal readiness before entering the (potentially long) start-up,
        # so other nodes waiting on initEvent are released promptly
        self.initEvent.set()
        self.StartGaudiPython()

        if self.app == 'Gauss':
            # Gauss needs an event-counter tool to track generated events
            tool = self.a.tool("ToolSvc.EvtCounter")
            self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
        else:
            self.cntr = None

        self.iTime = time.time() - start
644 
645  def Finalize(self):
646  start = time.time()
647  self.a.stop()
648  self.a.finalize()
649  self.log.info('%s-%i Finalized' % (self.nodeType, self.nodeID))
650  self.finalEvent.set()
651  self.fTime = time.time() - start
652 
653  def Report(self):
654  self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
655  allTime = "Alive Time : %5.2f" % (self.tTime)
656  initTime = "Init Time : %5.2f" % (self.iTime)
657  frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
658  runTime = "Run Time : %5.2f" % (self.rTime)
659  finTime = "Finalise Time : %5.2f" % (self.fTime)
660  tup = (allTime, initTime, frstTime, runTime, finTime)
661  for t in tup:
662  self.log.info(t)
663  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
664  # and report from the TESSerializer
665  self.TS.Report()
666 
667 # =============================================================================
668 
669 
    def __init__(self, queues, events, params, subworkers):
        # The Reader is the GMP node with the fixed nodeID -1; it reads
        # events and distributes them to the Workers
        GMPComponent.__init__(self, 'Reader', -1, queues,
                              events, params, subworkers)
674 
676  # Reader :
677  # No algorithms
678  # No output
679  # No histos
680  self.config['ApplicationMgr'].TopAlg = []
681  self.config['ApplicationMgr'].OutStream = []
682  if "HistogramPersistencySvc" in self.config.keys():
683  self.config['HistogramPersistencySvc'].OutputFile = ''
684  self.config['MessageSvc'].Format = '%-13s ' % '[Reader]' + \
685  self.msgFormat
686  self.evtMax = self.config['ApplicationMgr'].EvtMax
687 
688  def DumpEvent(self):
689  tb = TBufferFile(TBuffer.kWrite)
690  # print '----Reader dumping Buffer!!!'
691  self.ts.dumpBuffer(tb)
692  # print '\tBuffer Dumped, size : %i'%( tb.Length() )
693  return tb
694 
    def DoFirstEvent(self):
        '''
        Read, serialise and send the first event; the first event also
        populates the TESSerializer item list from the TES contents.
        Returns SUCCESS/FAILURE; sets lastEvent when input is exhausted.
        '''
        # Do First Event ------------------------------------------------------
        # Check Termination Criteria
        startFirst = time.time()
        self.log.info('Reader : First Event')
        if self.nOut == self.evtMax:
            self.log.info('evtMax( %i ) reached' % (self.evtMax))
            self.lastEvent.set()
            return SUCCESS
        else:
            # Continue to read, dump and send event
            self.a.run(1)
            if not bool(self.evt['/Event']):
                self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
                self.lastEvent.set()
                return SUCCESS
            else:
                # Populate TESSerializer list and send Event
                if self.app == "Gauss":
                    lst = self.evt.getHistoNames()
                else:
                    try:
                        lst = self.evt.getList()
                        if self.app == "DaVinci":
                            # DaVinci hides /Event/DAQ behind its registry
                            daqnode = self.evt.retrieveObject(
                                '/Event/DAQ').registry()
                            setOwnership(daqnode, False)
                            self.evt.getList(
                                daqnode, lst, daqnode.address().par())
                    except:
                        self.log.critical('Reader could not acquire TES List!')
                        self.lastEvent.set()
                        return FAILURE
                self.log.info('Reader : TES List : %i items' % (len(lst)))
                for l in lst:
                    self.ts.addItem(l)
                # NOTE(review): self.currentEvent does not appear to be set
                # before this send in the visible code — verify upstream
                tb = self.TS.Dump()
                self.log.info('First Event Sent')
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
                self.eventLoopSyncer.set()
                self.evt.clearStore()
                self.firstEvTime = time.time() - startFirst
                return SUCCESS
740 
    def Engine(self):
        '''
        Main loop of the Reader process: read events, serialise them and
        distribute them to the Workers until evtMax is reached or the
        input is exhausted.
        '''
        # rename process (prctl option 15 == PR_SET_NAME) so the node is
        # identifiable in ps/top
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        startEngine = time.time()
        self.log.name = 'Reader'
        self.log.info('Reader Process starting')

        self.Initialize()

        # add the Histogram Collection Algorithm
        self.a.addAlgorithm(CollectHistograms(self))

        self.log.info('Reader Beginning Distribution')
        sc = self.DoFirstEvent()
        if sc.isSuccess():
            self.log.info('Reader First Event OK')
        else:
            self.log.critical('Reader Failed on First Event')
            self.stat = FAILURE

        # Do All Others -------------------------------------------------------
        while True:
            # Check Termination Criteria
            if self.nOut == self.evtMax:
                self.log.info('evtMax( %i ) reached' % (self.evtMax))
                break
            # Check Health
            if not self.stat.isSuccess():
                self.log.critical('Reader is Damaged!')
                break
            # Continue to read, dump and send event
            t = time.time()
            self.a.run(1)
            self.rTime += (time.time() - t)
            if not bool(self.evt['/Event']):
                self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
                break
            self.currentEvent = self.getEventNumber()
            tb = self.TS.Dump()
            self.evcom.send((self.currentEvent, tb))
            # clean up
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Finalize
        self.log.info('Reader : Event Distribution complete.')
        self.evcom.finalize()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()
799 
800 # =============================================================================
801 
802 
    def __init__(self, workerID, queues, events, params, subworkers):
        # A Subworker shares the 'Worker' node type; workerID becomes nodeID
        GMPComponent.__init__(self, 'Worker', workerID,
                              queues, events, params, subworkers)
        # Identify the writer streams
        # NOTE(review): self.writerDict appears to be initialised around here
        # in the original (it is used by getCheckAlgs and Engine) — a line
        # seems to be missing from this listing; verify against the upstream
        # Identify the accept/veto checks for each event
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.info("Subworker-%i Created OK" % (self.nodeID))
        self.eventOutput = True
813 
    def Engine(self):
        '''
        Main loop of a Subworker process: receive serialised events from the
        Reader, execute the event loop on each, and forward passing events
        downstream until the 'FINISHED' flag arrives.
        NOTE(review): unlike Worker.Engine, no Initialize() call is visible
        here — services are presumably injected via SetServices(); verify.
        '''
        # rename process (prctl option 15 == PR_SET_NAME) for ps/top
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        self.initEvent.set()
        startEngine = time.time()
        msg = self.a.service('MessageSvc')
        msg.Format = '%-13s ' % ('['+self.log.name+']') + self.msgFormat

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Subworker %i starting Engine" % (self.nodeID))

        # populate the TESSerializer itemlist
        self.log.info('EVT WRITERS ON WORKER : %i'
                      % (len(self.writerDict['events'])))

        nEventWriters = len(self.writerDict["events"])
        self.a.addAlgorithm(CollectHistograms(self))

        # Begin processing
        Go = True
        while Go:
            packet = self.evcom.receive()
            if packet:
                pass
            else:
                # receive() timed out; poll again
                continue
            if packet == 'FINISHED':
                break
            evtNumber, tbin = packet  # unpack
            if self.cntr != None:
                # keep the Gauss event counter in step with the Reader
                self.cntr.setEventCounter(evtNumber)

            self.nIn += 1
            self.TS.Load(tbin)

            t = time.time()
            sc = self.a.executeEvent()
            # the first event includes one-off costs; time it separately
            if self.nIn == 1:
                self.firstEvTime = time.time() - t
            else:
                self.rTime += (time.time() - t)
            if sc.isSuccess():
                pass
            else:
                self.log.name = "Worker-%i" % (self.nodeID)
                self.log.warning('Did not Execute Event')
                self.evt.clearStore()
                continue
            if self.isEventPassed():
                pass
            else:
                self.log.name = "Worker-%i" % (self.nodeID)
                self.log.warning('Event did not pass : %i' % (evtNumber))
                self.evt.clearStore()
                continue
            if self.eventOutput:
                # It may be the case of generating Event Tags; hence
                # no event output
                # NOTE(review): self.currentEvent is not visibly updated in
                # this loop (a line appears missing here) — verify upstream
                tb = self.TS.Dump()
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
            self.inc.fireIncident(gbl.Incident('Subworker', 'EndEvent'))
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info('Setting <Last> Event %s' % (self.nodeID))
        self.lastEvent.set()

        self.evcom.finalize()
        # Now send the FileRecords and stop/finalize the appMgr
        self.filerecordsAgent.SendFileRecords()
        self.tTime = time.time() - startEngine
        self.Finalize()
        self.Report()
        # self.finalEvent.set()
897 
    def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
        '''
        Inject the parent Worker's GaudiPython services into this Subworker
        (it does not run its own Initialize()).
        '''
        self.a = a
        self.evt = evt
        self.hvt = hvt
        self.fsr = fsr
        #self.inc = inc
        # deliberately re-fetch the IncidentSvc rather than reuse the one
        # passed in (see the commented-out assignment above)
        self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
        self.pers = pers
        self.ts = ts
        self.cntr = cntr
        # wrap the shared low-level serializer with this node's bookkeeping
        self.TS = TESSerializer(self.ts, self.evt,
                                self.nodeType, self.nodeID, self.log)
910 
911  def getCheckAlgs(self):
912  '''
913  For some output writers, a check is performed to see if the event has
914  executed certain algorithms.
915  These reside in the AcceptAlgs property for those writers
916  '''
917  acc = []
918  req = []
919  vet = []
920  for m in self.writerDict["events"]:
921  if hasattr(m.w, 'AcceptAlgs'):
922  acc += m.w.AcceptAlgs
923  if hasattr(m.w, 'RequireAlgs'):
924  req += m.w.RequireAlgs
925  if hasattr(m.w, 'VetoAlgs'):
926  vet += m.w.VetoAlgs
927  return (acc, req, vet)
928 
929  def checkExecutedPassed(self, algName):
930  if self.a.algorithm(algName)._ialg.isExecuted()\
931  and self.a.algorithm(algName)._ialg.filterPassed():
932  return True
933  else:
934  return False
935 
    def isEventPassed(self):
        '''
        Check the algorithm status for an event.
        Depending on output writer settings, the event
        may be declined based on various criteria.
        This is a transcript of the check that occurs in GaudiSvc::OutputStream
        '''
        passed = False

        self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
        # accept list : any single passing accept-algorithm accepts the event
        if self.acceptAlgs:
            for name in self.acceptAlgs:
                if self.checkExecutedPassed(name):
                    passed = True
                    break
        else:
            # no accept algorithms configured : accept by default
            passed = True

        self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
        # require list : every require-algorithm must have run and passed
        for name in self.requireAlgs:
            if self.checkExecutedPassed(name):
                pass
            else:
                self.log.info('Evt declined (requireAlgs) : %s' % (name))
                passed = False

        self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
        # veto list
        # NOTE(review): this declines the event when a veto algorithm did
        # NOT execute-and-pass — the same sense as requireAlgs; a veto would
        # normally decline when the algorithm DID pass.  Confirm this matches
        # GaudiSvc::OutputStream before changing anything.
        for name in self.vetoAlgs:
            if self.checkExecutedPassed(name):
                pass
            else:
                self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
                passed = False
        return passed
970 
971 # =============================================================================
972 
973 
    def __init__(self, workerID, queues, events, params, subworkers):
        # A Worker is the GMP node with nodeID == workerID (0..nWorkers-1)
        GMPComponent.__init__(self, 'Worker', workerID,
                              queues, events, params, subworkers)
        # Identify the writer streams
        # NOTE(review): self.writerDict appears to be initialised around here
        # in the original (it is used by getCheckAlgs and Engine) — a line
        # seems to be missing from this listing; verify against the upstream
        # Identify the accept/veto checks for each event
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Worker-%i Created OK" % (self.nodeID))
        self.eventOutput = True
985 
987 
        # Worker :
        #  No input
        #  No output
        #  No Histos
        self.config['EventSelector'].Input = []
        self.config['ApplicationMgr'].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config['HistogramPersistencySvc'].OutputFile = ''
        # Tag every MessageSvc line with this worker's identity
        formatHead = '[Worker-%i]' % (self.nodeID)
        self.config['MessageSvc'].Format = '%-13s ' % formatHead + \
            self.msgFormat

        for key, lst in self.writerDict.iteritems():
            self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

        for m in self.writerDict["tuples"]:
            # rename Tuple output file with an appendix
            # based on worker id, for merging later
            newName = m.getNewName('.', '.w%i.' % (self.nodeID))
            self.config[m.key].Output = newName

        # Suppress INFO Output for all but Worker-0
        # if self.nodeID == 0 :
        #     pass
        # else :
        #     self.config[ 'MessageSvc' ].OutputLevel = ERROR

        # Gauss-specific: detach EvtCounter from incidents, presumably so
        # the counter can be driven explicitly via setEventCounter() from
        # the received event number (see Engine) -- TODO confirm
        if self.app == "Gauss":
            try:
                if "ToolSvc.EvtCounter" not in self.config:
                    from Configurables import EvtCounter
                    counter = EvtCounter()
                else:
                    counter = self.config["ToolSvc.EvtCounter"]
                counter.UseIncident = False
            except:
                # ignore errors when trying to change the configuration of the EvtCounter
                self.log.warning('Cannot configure EvtCounter')
    def Engine(self):
        '''
        Worker main loop.

        Initialises the AppMgr, configures the TESSerializer item lists
        from the event writers, then repeatedly: receives a serialized
        event from the Reader, executes it, applies the accept/require/veto
        checks, and (when there is event output) sends the serialized TES
        back out to the Writer. On the first event it forks the subworkers.
        Finishes by sending FileRecords, finalising and reporting.
        '''
        # rename process (PR_SET_NAME == 15 on Linux) so ps/top show
        # e.g. "Worker0" instead of "python"
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        startEngine = time.time()
        self.log.info("Worker %i starting Engine" % (self.nodeID))
        self.Initialize()

        # populate the TESSerializer itemlist
        self.log.info('EVT WRITERS ON WORKER : %i'
                      % (len(self.writerDict['events'])))

        nEventWriters = len(self.writerDict["events"])
        if nEventWriters:
            itemList = set()
            optItemList = set()
            for m in self.writerDict["events"]:
                for item in m.ItemList:
                    # strip any trailing '#<depth>' qualifier
                    hsh = item.find('#')
                    if hsh != -1:
                        item = item[:hsh]
                    itemList.add(item)
                for item in m.OptItemList:
                    hsh = item.find('#')
                    if hsh != -1:
                        item = item[:hsh]
                    optItemList.add(item)
            # If an item is mandatory and optional, keep it only in the optional list
            itemList -= optItemList
            for item in sorted(itemList):
                self.log.info(' adding ItemList Item to ts : %s' % (item))
                self.ts.addItem(item)
            for item in sorted(optItemList):
                self.log.info(' adding Optional Item to ts : %s' % (item))
                self.ts.addOptItem(item)
        else:
            self.log.info('There is no Event Output for this app')
            self.eventOutput = False

        # Begin processing
        Go = True
        while Go:
            packet = self.evcom.receive()
            if packet:
                pass
            else:
                continue
            if packet == 'FINISHED':
                break
            evtNumber, tbin = packet  # unpack
            # NOTE(review): prefer "is not None" for None comparisons
            if self.cntr != None:
                self.cntr.setEventCounter(evtNumber)

            # subworkers are forked before the first event is processed
            # reader-thread for ConDB must be closed and reopened in each subworker
            # this is done by disconnect()
            if self.nIn == 0:

                self.log.info("Fork new subworkers and disconnect from CondDB")
                condDB = self.a.service('CondDBCnvSvc', gbl.ICondDBReader)
                condDB.disconnect()

                # Fork subworkers and share services
                for k in self.subworkers:
                    k.SetServices(self.a, self.evt, self.hvt, self.fsr,
                                  self.inc, self.pers, self.ts, self.cntr)
                    k.Start()
                self.a.addAlgorithm(CollectHistograms(self))
            self.nIn += 1
            # deserialize the event into the local TES
            self.TS.Load(tbin)

            t = time.time()
            sc = self.a.executeEvent()
            # first event carries one-off initialisation cost; track it apart
            if self.nIn == 1:
                self.firstEvTime = time.time() - t
            else:
                self.rTime += (time.time() - t)
            if sc.isSuccess():
                pass
            else:
                self.log.warning('Did not Execute Event')
                self.evt.clearStore()
                continue
            if self.isEventPassed():
                pass
            else:
                self.log.warning('Event did not pass : %i' % (evtNumber))
                self.evt.clearStore()
                continue
            if self.eventOutput:
                # It may be the case of generating Event Tags; hence
                # no event output
                tb = self.TS.Dump()
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
            self.inc.fireIncident(gbl.Incident('Worker', 'EndEvent'))
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # event loop is done; flush comms and finalise the application
        self.evcom.finalize()
        self.log.info('Worker-%i Finished Processing Events' % (self.nodeID))
        # Now send the FileRecords and stop/finalize the appMgr
        self.filerecordsAgent.SendFileRecords()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()

        for k in self.subworkers:
            self.log.info('Join subworkers')
            k.proc.join()
1146 
1147  def getCheckAlgs(self):
1148  '''
1149  For some output writers, a check is performed to see if the event has
1150  executed certain algorithms.
1151  These reside in the AcceptAlgs property for those writers
1152  '''
1153  acc = []
1154  req = []
1155  vet = []
1156  for m in self.writerDict["events"]:
1157  if hasattr(m.w, 'AcceptAlgs'):
1158  acc += m.w.AcceptAlgs
1159  if hasattr(m.w, 'RequireAlgs'):
1160  req += m.w.RequireAlgs
1161  if hasattr(m.w, 'VetoAlgs'):
1162  vet += m.w.VetoAlgs
1163  return (acc, req, vet)
1164 
1165  def checkExecutedPassed(self, algName):
1166  if self.a.algorithm(algName)._ialg.isExecuted()\
1167  and self.a.algorithm(algName)._ialg.filterPassed():
1168  return True
1169  else:
1170  return False
1171 
1172  def isEventPassed(self):
1173  '''
1174  Check the algorithm status for an event.
1175  Depending on output writer settings, the event
1176  may be declined based on various criteria.
1177  This is a transcript of the check that occurs in GaudiSvc::OutputStream
1178  '''
1179  passed = False
1180 
1181  self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
1182  if self.acceptAlgs:
1183  for name in self.acceptAlgs:
1184  if self.checkExecutedPassed(name):
1185  passed = True
1186  break
1187  else:
1188  passed = True
1189 
1190  self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
1191  for name in self.requireAlgs:
1192  if self.checkExecutedPassed(name):
1193  pass
1194  else:
1195  self.log.info('Evt declined (requireAlgs) : %s' % (name))
1196  passed = False
1197 
1198  self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
1199  for name in self.vetoAlgs:
1200  if self.checkExecutedPassed(name):
1201  pass
1202  else:
1203  self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
1204  passed = False
1205  return passed
1206 
1207 # =============================================================================
1208 
1209 
    def __init__(self, queues, events, params, subworkers):
        '''
        Output node of the GMP system (fixed nodeID == -2).

        Collects processed events from every Worker and writes the
        combined output (events, FileRecords, histograms).
        '''
        GMPComponent.__init__(self, 'Writer', -2, queues,
                              events, params, subworkers)
        # Identify the writer streams
        # This keeps track of workers as they finish
        self.status = [False] * self.nWorkers
        self.log.name = "Writer--2"
1219 
        # Writer :
        #  No input
        #  No Algs
        self.config['ApplicationMgr'].TopAlg = []
        self.config['EventSelector'].Input = []

        self.config['MessageSvc'].Format = '%-13s ' % '[Writer]' + \
            self.msgFormat

        # Now process the output writers
        for key, lst in self.writerDict.iteritems():
            self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

        # Modify the name of the output file to reflect that it came
        # from a parallel processing
        #
        # Event Writers
        for m in self.writerDict["events"]:
            self.log.debug('Processing Event Writer : %s' % (m))
            newName = m.getNewName('.', '.p%i.' % self.nWorkers)
            self.config[m.key].Output = newName

        # Now, if there are no event writers, the FileRecords file
        # will fail to open, as it only opens an UPDATE version
        # of the existing Event Output File
        # So, if there are no event writers, edit the string of the
        # FileRecord Writer

        # FileRecords Writers
        for m in self.writerDict["records"]:
            self.log.debug('Processing FileRecords Writer: %s' % (m))
            newName = m.getNewName('.', '.p%i.' % self.nWorkers,
                                   extra=" OPT='RECREATE'")
            self.config[m.key].Output = newName

        # same for histos
        hs = "HistogramPersistencySvc"
        n = None
        if hs in self.config.keys():
            n = self.config[hs].OutputFile
        if n:
            # NOTE(review): str.replace substitutes EVERY '.', unlike
            # getNewName's single-point rename -- a multi-dot histogram
            # filename would be mangled; confirm intended behaviour.
            newName = self.config[hs].OutputFile.replace('.',
                                                         '.p%i.' % (self.nWorkers))
            self.config[hs].OutputFile = newName
1265 
    def Engine(self):
        '''
        Writer main loop.

        Initialises the AppMgr, then round-robins over the per-worker
        event queues: each received packet is deserialized into the TES
        and written out by executing an event. Exits once every worker
        has sent its FINISHED flag, then rebuilds histograms and
        FileRecords from the agents and finalises.
        '''
        # rename process (PR_SET_NAME == 15 on Linux) for ps/top visibility
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        startEngine = time.time()
        self.Initialize()
        self.histoAgent = HistoAgent(self)

        # Begin processing
        Go = True
        current = -1
        stopCriteria = self.nWorkers  # NOTE(review): apparently unused here
        while Go:
            # poll the workers' queues in turn with a short timeout so a
            # quiet worker does not block the others
            current = (current + 1) % self.nWorkers
            packet = self.evcoms[current].receive(timeout=0.01)
            if packet == None:
                continue
            if packet == 'FINISHED':
                self.log.info(
                    'Writer got FINISHED flag : Worker %i' % (current))

                self.status[current] = True
                if all(self.status):
                    self.log.info('FINISHED recd from all workers, break loop')
                    break
                continue
            # otherwise, continue as normal
            self.nIn += 1  # update central count (maybe needed by FSR store)
            evtNumber, tbin = packet  # unpack
            self.TS.Load(tbin)
            t = time.time()
            self.a.executeEvent()
            self.rTime += (time.time() - t)
            self.evt.clearStore()
            self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # finalisation steps
        [e.finalize() for e in self.evcoms]
        # Now do Histograms
        sc = self.histoAgent.Receive()
        sc = self.histoAgent.RebuildHistoStore()
        if sc.isSuccess():
            self.log.info('Histo Store rebuilt ok')
        else:
            self.log.warning('Histo Store Error in Rebuild')

        # Now do FileRecords
        sc = self.filerecordsAgent.Receive()
        self.filerecordsAgent.Rebuild()
        self.Finalize()
        # self.rTime = time.time()-startEngine
        self.Report()
1327 
1328 # =============================================================================
1329 
1330 
1331 # =============================================================================
1332 
class Coord(object):
    '''
    Top-level co-ordinator of the GMP process network.

    Builds one Reader, one Worker (which later forks nWorkers-1
    Subworkers) and one Writer, wires up their event/histogram/FileRecords
    queues and per-phase synchronisation events, then drives the
    Initialise / Run / Finalise phases via Go().
    '''

    def __init__(self, nWorkers, config, log):
        self.log = log
        self.config = config
        # set up Logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('GaudiPython Parallel Process Co-ordinator beginning')

        # nWorkers == -1 means "use every available cpu on the machine"
        if nWorkers == -1:
            self.nWorkers = cpu_count()
        else:
            self.nWorkers = nWorkers

        self.qs = self.SetupQueues()  # event queues, keyed by nodeID
        self.hq = JoinableQueue()     # Histogram data
        self.fq = JoinableQueue()     # FileRecords data

        # One Syncer per phase, guarding against hangs (see WAIT_* consts)
        self.sInit = Syncer(self.nWorkers, self.log,
                            limit=WAIT_INITIALISE,
                            step=STEP_INITIALISE)
        self.sRun = Syncer(self.nWorkers, self.log,
                           manyEvents=True,
                           limit=WAIT_SINGLE_EVENT,
                           step=STEP_EVENT,
                           firstEvent=WAIT_FIRST_EVENT)
        self.sFin = Syncer(self.nWorkers, self.log,
                           limit=WAIT_FINALISE,
                           step=STEP_FINALISE)
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Declare SubProcesses! Subworkers (ids 1..nWorkers-1) must exist
        # before the Worker that will fork them.
        self.subworkers = []
        for wid in range(1, self.nWorkers):
            sub = Subworker(wid, self.getQueues(wid),
                            self.getSyncEvents(wid), params, self.subworkers)
            self.subworkers.append(sub)
        self.reader = Reader(self.getQueues(-1),
                             self.getSyncEvents(-1), params, self.subworkers)
        self.workers = []
        wk = Worker(0, self.getQueues(0), self.getSyncEvents(0),
                    params, self.subworkers)
        self.writer = Writer(self.getQueues(-2),
                             self.getSyncEvents(-2), params, self.subworkers)

        # launch order: writer, worker, reader (joined in reverse by Stop)
        self.system = [self.writer, wk, self.reader]

    def getSyncEvents(self, nodeID):
        # (init event, (run event, lastEvent), finalise event) for a node
        init = self.sInit.d[nodeID].event
        run = (self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent)
        fin = self.sFin.d[nodeID].event
        return (init, run, fin)

    def getQueues(self, nodeID):
        # (event queue for this node, shared histo queue, shared FSR queue)
        return (self.qs[nodeID], self.hq, self.fq)

    def Go(self):
        '''
        Drive the three phases in order; on any sync timeout terminate
        everything and return FAILURE, otherwise join cleanly and
        return SUCCESS.
        '''
        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('INITIALISING SYSTEM')

        # Start reader, writer and main worker
        for p in self.system:
            p.Start()

        sc = self.sInit.syncAll(step="Initialise")
        if not sc == SUCCESS:
            self.Terminate()
            return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('RUNNING SYSTEM')
        sc = self.sRun.syncAll(step="Run")
        if not sc == SUCCESS:
            self.Terminate()
            return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('FINALISING SYSTEM')
        sc = self.sFin.syncAll(step="Finalise")
        if not sc == SUCCESS:
            self.Terminate()
            return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info("Cleanly join all Processes")
        self.Stop()
        self.log.info("Report Total Success to Main.py")
        return SUCCESS

    def Terminate(self):
        # Brutally kill every live child process
        for child in multiprocessing.active_children():
            child.terminate()

    def Stop(self):
        # procs should be joined in reverse order to launch
        self.system.reverse()
        for node in self.system:
            node.proc.join()
        return SUCCESS

    def SetupQueues(self):
        '''
        Build the queue network (nWorkers + 1 queues in total):
        the Reader feeds all Workers through one shared queue, and each
        Worker has its own queue to the Writer.

        Returns a dict nodeID -> (queueIn, queueOut).
        '''
        # one queue from Reader-Workers
        rwk = JoinableQueue()
        # one queue from each worker to writer
        workersWriter = [JoinableQueue() for _ in xrange(self.nWorkers)]
        d = {
            -1: (None, rwk),             # Reader : output only
            -2: (workersWriter, None),   # Writer : inputs only
        }
        for wid in xrange(self.nWorkers):
            d[wid] = (rwk, workersWriter[wid])
        return d
1478 
1479 # ============================= EOF ===========================================
def Load(self, tbuf)
Definition: GMPBase.py:346
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:420
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:671
def getNewName(self, replaceThis, withThis, extra='')
Definition: GMPBase.py:124
StatusCode finalize() override
Definition: Algorithm.cpp:99
StatusCode execute() override
Definition: Algorithm.cpp:95
def SetupQueues(self)
Definition: GMPBase.py:1461
def receive(self, timeout=None)
Definition: GMPBase.py:277
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:241
double sum(double x, double y, double z)
def __init__(self, gmpcomponent)
Definition: GMPBase.py:197
def DumpEvent(self)
Definition: GMPBase.py:688
def DoFirstEvent(self)
Definition: GMPBase.py:695
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:804
def Terminate(self)
Definition: GMPBase.py:1444
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:929
def processConfiguration(self)
Definition: GMPBase.py:986
def getCheckAlgs(self)
Definition: GMPBase.py:1147
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:535
def getItemLists(self, config)
Definition: GMPBase.py:145
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1211
def set(self, key, output)
Definition: GMPBase.py:162
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1334
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
def __init__(self, writer, wType, config)
Definition: GMPBase.py:93
def processConfiguration(self)
Definition: GMPBase.py:507
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1389
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:898
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:975
Python Algorithm base class.
Definition: Algorithm.h:32
def isEventPassed(self)
Definition: GMPBase.py:1172
def processConfiguration(self)
Definition: GMPBase.py:675
def getQueues(self, nodeID)
Definition: GMPBase.py:1395
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1165
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:332
def processConfiguration(self)
Definition: GMPBase.py:1220