Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v32r2 (46d42edc)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
GMPBase.py
Go to the documentation of this file.
1 from __future__ import print_function
2 from Gaudi.Configuration import *
3 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
4 from ROOT import TBufferFile, TBuffer
5 import multiprocessing
6 from multiprocessing import Process, Queue, JoinableQueue, Event
7 from multiprocessing import cpu_count, current_process
8 from multiprocessing.queues import Empty
9 from pTools import *
10 import time
11 import sys
12 import os
13 from ROOT import TParallelMergingFile
14 # This script contains the bases for the Gaudi MultiProcessing (GMP)
15 # classes
16 
17 # There are three classes :
18 # Reader
19 # Worker
20 # Writer
21 
22 # Each class needs to perform communication with the others
23 # For this, we need a means of communication, which will be based on
24 # the python multiprocessing package
25 # This is provided in SPI pytools package
26 # cmt line : use pytools v1.1 LCG_Interfaces
27 # The PYTHONPATH env variable may need to be modified, as this might
28 # still point to 1.0_python2.5
29 
30 # Each class will need Queues, and a defined method for using these
31 # queues.
32 # For example, as long as there is something in the Queue, both ends
33 # of the queue must be open
34 # Also, there needs to be proper Termination flags and criteria
35 # The System should be error proof.
36 
# Constants -------------------------------------------------------------------
# polling interval used while waiting on queues/buffers, in seconds
NAP = 0.001
# bytes per megabyte, used for size reporting
MB = 1024.0 * 1024.0
# waits to guard against hanging, in seconds
WAIT_INITIALISE = 60 * 5
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
WAIT_FINALISE = 60 * 2
# polling step sizes used with the waits above
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 10

# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
SMAPS = False

# -----------------------------------------------------------------------------

# definitions
# ----------
# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# used to check which type of histo we are dealing with
# i.e. if currentHisto in aidatypes : pass
aidatypes = (gbl.AIDA.IHistogram, gbl.AIDA.IHistogram1D, gbl.AIDA.IHistogram2D,
             gbl.AIDA.IHistogram3D, gbl.AIDA.IProfile1D, gbl.AIDA.IProfile2D,
             gbl.AIDA.IBaseHistogram)  # extra?

# similar to aidatypes
thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)

# Types of OutputStream in Gaudi, mapped to the category of data they write
WRITERTYPES = {
    'EvtCollectionStream': "tuples",
    'InputCopyStream': "events",
    'OutputStream': "events",
    'RecordStream': "records",
    'RunRecordStream': "records",
    'SequentialOutputStream': "events",
    'TagCollectionStream': "tuples"
}
78 
79 # =============================================================================
80 
81 
class MiniWriter(object):
    '''
    A class to represent a writer in the GaudiPython configuration.

    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string.
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while
    configuring, so that later the configuration can be modified via
    config[self.key].Output.
    '''

    def __init__(self, writer, wType, config):
        # writer : the writer configurable found in the configuration
        # wType  : one of "events", "records", "tuples" (see WRITERTYPES)
        # config : the full configuration dictionary
        self.w = writer
        self.wType = wType
        # set parameters for directly accessing the correct
        # part of the configuration, so that later, we can do
        # config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr(self.w, "Output"):
            # output named directly on the writer
            self.woutput = self.w.Output
            self.getItemLists(config)
            self.set(self.wName, self.w.Output)
            return
        else:
            # if there is no output file, get it via the datasvc
            # (every writer always has a datasvc property)
            self.datasvcName = self.w.EvtDataSvc
            datasvc = config[self.datasvcName]
            if hasattr(datasvc, "Output"):
                self.getItemLists(config)
                self.set(self.datasvcName, datasvc.Output)
                return

    def getNewName(self, replaceThis, withThis, extra=''):
        '''
        Replace one pattern in the output name string with another,
        and return the new Output name.  The stored output *might* be
        a list, in which case the same shape is returned.

        @param extra : might need to add ROOT flags,
                       i.e.: OPT='RECREATE', or such
        '''
        # isinstance is the idiomatic check (was a __class__.__name__ compare)
        assert isinstance(replaceThis, str)
        assert isinstance(withThis, str)
        old = self.output
        lst = isinstance(old, list)
        if lst:
            old = self.output[0]
        new = old.replace(replaceThis, withThis) + extra
        return [new] if lst else new

    def getItemLists(self, config):
        '''
        Cache the writer's ItemList and OptItemList, falling back to the
        data service when the writer itself does not define them.
        '''
        # the item list
        if hasattr(self.w, "ItemList"):
            self.ItemList = self.w.ItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "ItemList"):
                self.ItemList = datasvc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr(self.w, "OptItemList"):
            self.OptItemList = self.w.OptItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "OptItemList"):
                self.OptItemList = datasvc.OptItemList
        return

    def set(self, key, output):
        # record where in the configuration (key) the output (output) lives
        self.key = key
        self.output = output
        return

    def __repr__(self):
        # multi-line human-readable summary, used for diagnostics
        s = ""
        line = '-' * 80
        s += (line + '\n')
        s += "Writer : %s\n" % (self.wName)
        s += "Writer Type : %s\n" % (self.wType)
        s += "Writer Output : %s\n" % (self.output)
        s += "DataSvc : %s\n" % (self.datasvcName)
        s += "DataSvc Output : %s\n" % (self.svcOutput)
        s += '\n'
        s += "Key for config : %s\n" % (self.key)
        s += "Output File : %s\n" % (self.output)
        s += "ItemList : %s\n" % (self.ItemList)
        s += "OptItemList : %s\n" % (self.OptItemList)
        s += (line + '\n')
        return s
181 
182 
183 # =============================================================================
184 
185 
187  '''
188  GaudiPython algorithm used to clean up histos on the Reader and Workers
189  Only has a finalize method()
190  This retrieves a dictionary of path:histo objects and sends it to the
191  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
192  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
193  at the writer end, there will be an error.
194  '''
195 
    def __init__(self, gmpcomponent):
        # gmpcomponent : the owning GMPComponent node (Reader or Worker);
        # supplies the logger here and, in finalize(), the histogram queue,
        # sync event and histogram store
        PyAlgorithm.__init__(self)
        self._gmpc = gmpcomponent
        self.log = self._gmpc.log
        return None
201 
    def execute(self):
        # per-event no-op : this algorithm only does work in finalize()
        return SUCCESS
204 
205  def finalize(self):
206  self.log.info(
207  'CollectHistograms Finalise (%s)' % (self._gmpc.nodeType))
208  self._gmpc.hDict = self._gmpc.dumpHistograms()
209  ks = self._gmpc.hDict.keys()
210  self.log.info('%i Objects in Histogram Store' % (len(ks)))
211 
212  # crashes occurred due to Memory Error during the sending of hundreds
213  # histos on slc5 machines, so instead, break into chunks
214  # send 100 at a time
215  chunk = 100
216  reps = len(ks) / chunk + 1
217  for i in range(reps):
218  someKeys = ks[i * chunk:(i + 1) * chunk]
219  smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
220  self._gmpc.hq.put((self._gmpc.nodeID, smalld))
221  # "finished" Notification
222  self.log.debug('Signalling end of histos to Writer')
223  self._gmpc.hq.put('HISTOS_SENT')
224  self.log.debug('Waiting on Sync Event')
225  self._gmpc.sEvent.wait()
226  self.log.debug('Histo Sync Event set, clearing and returning')
227  self._gmpc.hvt.clearStore()
228  root = gbl.DataObject()
229  setOwnership(root, False)
230  self._gmpc.hvt.setRoot('/stat', root)
231  return SUCCESS
232 
233 
234 # =============================================================================
235 
236 
class EventCommunicator(object):
    '''
    Responsible for communicating Gaudi Events via Queues.

    Events are communicated as TBufferFiles, filled either by the
    TESSerializer, or the GaudiSvc, "IPCSvc".  Each item travelling on a
    queue is a tuple ( evtNumber, TBufferFile ); plain strings are used as
    control tokens (e.g. 'FINISHED').
    '''

    def __init__(self, GMPComponent, qin, qout):
        # GMPComponent : the owning node; supplies logger and identity
        # qin / qout   : receiving / sending queues (either may be None)
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0  # counter : items sent
        self.nRecv = 0  # counter : items received
        self.sizeSent = 0  # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0  # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0  # time : receiving from self.qin
        self.qoutTime = 0  # time : sending on qout

    def send(self, item):
        # This method manages the sending of a TBufferFile Event to a Queue
        # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
        assert item.__class__.__name__ == 'tuple'
        startTransmission = time.time()
        self.qout.put(item)
        # allow the background thread to feed the Queue; not 100% guaranteed to
        # finish before next line
        # (_buffer is a multiprocessing.Queue internal; it empties once the
        # feeder thread has pickled and written the item to the pipe)
        while self.qout._buffer:
            time.sleep(NAP)
        self.qoutTime += time.time() - startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive(self, timeout=None):
        # Receive items from self.qin; returns None if nothing arrives
        # within the (optional) timeout
        startWait = time.time()
        try:
            itemIn = self.qin.get(timeout=timeout)
        except Empty:
            return None
        self.qinTime += time.time() - startWait
        self.nRecv += 1
        if itemIn.__class__.__name__ == 'tuple':
            self.sizeRecv += itemIn[1].Length()
        else:
            # control token (e.g. 'FINISHED'), not an event : don't count it
            self.nRecv -= 1
        try:
            self.qin.task_done()
        except Exception:
            # was a bare except; task_done raises ValueError when called
            # more often than there were items on the queue
            self._gmpc.log.warning(
                'TASK_DONE called too often by : %s' % (self._gmpc.nodeType))
        return itemIn

    def finalize(self):
        # Flush termination flags downstream, then report statistics
        self.log.info('Finalize Event Communicator : %s %s' %
                      (self._gmpc, self._gmpc.nodeType))
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        if self._gmpc.nodeType == 'Reader':
            downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer':
            downstream = 0
        elif self._gmpc.nodeType == 'Worker':
            downstream = 1
        else:
            # guard against an unknown node type (previously a NameError)
            downstream = 0

        for i in range(downstream):
            self.qout.put('FINISHED')
        if self._gmpc.nodeType != 'Writer':
            self.qout.join()
        # Now some reporting...
        self.statistics()

    def statistics(self):
        # Log send/receive counters and timings for this node
        self.log.name = '%s-%i Audit ' % (self._gmpc.nodeType,
                                          self._gmpc.nodeID)
        self.log.info('Items Sent : %i' % (self.nSent))
        self.log.info('Items Received : %i' % (self.nRecv))
        self.log.info('Data Sent : %i' % (self.sizeSent))
        self.log.info('Data Received : %i' % (self.sizeRecv))
        self.log.info('Q-out Time : %5.2f' % (self.qoutTime))
        self.log.info('Q-in Time : %5.2f' % (self.qinTime))
326 
327 
328 # =============================================================================
329 
330 
331 class TESSerializer(object):
332  def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log):
333  self.T = gaudiTESSerializer
334  self.evt = evtDataSvc
335  self.buffersIn = []
336  self.buffersOut = []
337  self.nIn = 0
338  self.nOut = 0
339  self.tDump = 0.0
340  self.tLoad = 0.0
341  # logging
342  self.nodeType = nodeType
343  self.nodeID = nodeID
344  self.log = log
345 
346  def Load(self, tbuf):
347  root = gbl.DataObject()
348  setOwnership(root, False)
349  self.evt.setRoot('/Event', root)
350  t = time.time()
351  self.T.loadBuffer(tbuf)
352  self.tLoad += (time.time() - t)
353  self.nIn += 1
354  self.buffersIn.append(tbuf.Length())
355 
356  def Dump(self):
357  t = time.time()
358  tb = TBufferFile(TBuffer.kWrite)
359  self.T.dumpBuffer(tb)
360  self.tDump += (time.time() - t)
361  self.nOut += 1
362  self.buffersOut.append(tb.Length())
363  return tb
364 
365  def Report(self):
366  evIn = "Events Loaded : %i" % (self.nIn)
367  evOut = "Events Dumped : %i" % (self.nOut)
368  din = sum(self.buffersIn)
369  dataIn = "Data Loaded : %i" % (din)
370  dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
371  if self.nIn:
372  avgIn = "Avg Buf Loaded : %5.2f Mb"\
373  % (din / (self.nIn * MB))
374  maxIn = "Max Buf Loaded : %5.2f Mb"\
375  % (max(self.buffersIn) / MB)
376  else:
377  avgIn = "Avg Buf Loaded : N/A"
378  maxIn = "Max Buf Loaded : N/A"
379  dout = sum(self.buffersOut)
380  dataOut = "Data Dumped : %i" % (dout)
381  dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
382  if self.nOut:
383  avgOut = "Avg Buf Dumped : %5.2f Mb"\
384  % (din / (self.nOut * MB))
385  maxOut = "Max Buf Dumped : %5.2f Mb"\
386  % (max(self.buffersOut) / MB)
387  else:
388  avgOut = "Avg Buf Dumped : N/A"
389  maxOut = "Max Buf Dumped : N/A"
390  dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
391  loadTime = "Total Load Time : %5.2f" % (self.tLoad)
392 
393  lines = evIn,\
394  evOut,\
395  dataIn,\
396  dataInMb,\
397  avgIn,\
398  maxIn,\
399  dataOut,\
400  dataOutMb,\
401  avgOut,\
402  maxOut,\
403  dumpTime,\
404  loadTime
405  self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
406  for line in lines:
407  self.log.info(line)
408  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
409 
410 
411 # =============================================================================
412 
413 
414 class GMPComponent(object):
415  # This class will be the template for Reader, Worker and Writer
416  # containing all common components
417  # nodeId will be a numerical identifier for the node
418  # -1 for reader
419  # -2 for writer
420  # 0,...,nWorkers-1 for the Workers
421  def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
422  # declare a Gaudi MultiProcessing Node
423  # the nodeType is going to be one of Reader, Worker, Writer
424  # qPair is going to be a tuple of ( qin, qout )
425  # for sending and receiving
426  # if nodeType is "Writer", it will be a list of qPairs,
427  # as there's one queue-in from each Worker
428  #
429  # params is a tuple of (nWorkers, config, log)
430 
431  self.nodeType = nodeType
432  current_process().name = nodeType
433 
434  # Synchronisation Event() objects for keeping track of the system
435  self.initEvent, eventLoopSyncer, self.finalEvent = events
436  self.eventLoopSyncer, self.lastEvent = eventLoopSyncer # unpack tuple
437 
438  # necessary for knowledge of the system
439  self.nWorkers, self.sEvent, self.config, self.log = params
440  self.subworkers = subworkers
441  self.nodeID = nodeID
442  self.msgFormat = self.config['MessageSvc'].Format
443 
444  # describe the state of the node by the current Event Number
445  self.currentEvent = None
446 
447  # Unpack the Queues : (events, histos, filerecords)
448  self.queues = queues
449  self.num = 0
450 
451  ks = self.config.keys()
452  self.app = None
453  list = ["Brunel", "DaVinci", "Boole", "Gauss"]
454  for k in list:
455  if k in ks:
456  self.app = k
457 
    def Start(self):
        """Set up queues, counters and timers, then fork this node's process
        (running self.Engine)."""
        # define the separate process
        qPair, histq, fq = self.queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker':
            # Reader or Worker Node : one in/out queue pair
            qin, qout = qPair
            self.evcom = EventCommunicator(self, qin, qout)
        else:
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin:
                ec = EventCommunicator(self, q, None)
                self.evcoms.append(ec)
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i' % (self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        self.proc = Process(target=self.Engine)
        # Fork and start the separate process
        self.proc.start()
502 
    def Engine(self):
        # This will be the method carried out by the Node
        # Different for all : overridden in Reader/Worker/Subworker/Writer
        pass
507 
509  # Different for all ; customize Configuration for multicore
510  pass
511 
    def SetupGaudiPython(self):
        """Create the AppMgr and cache handles to the services this node
        needs: event store, histogram store, file records service,
        incident service, persistency service and the TES serializer."""
        self.a = AppMgr()
        if SMAPS:
            # optional memory-profiling algorithm, enabled by the SMAPS switch
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType + '-' + str(self.nodeID) + '.smp'
            ss = SmapShot(logname=smapsLog)
            self.a.addAlgorithm(ss)
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
        self.pers = self.a.service('EventPersistencySvc', 'IAddressCreator')
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
        # wrap the C++ serializer in the bookkeeping helper class above
        self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID,
                                self.log)
        return SUCCESS
530 
    def StartGaudiPython(self):
        # initialize and start the AppMgr in this (forked) process
        self.a.initialize()
        self.a.start()
        return SUCCESS
535 
    def LoadTES(self, tbufferfile):
        """Deserialize a TBufferFile into this process's Transient Event
        Store, after planting a fresh root DataObject at /Event."""
        root = gbl.DataObject()
        # the store takes ownership of the root node
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        self.ts.loadBuffer(tbufferfile)
541 
542  def getEventNumber(self):
543  if self.app != 'Gauss':
544  # Using getList or getHistoNames can result in the EventSelector
545  # re-initialising connection to RootDBase, which costs a lot of
546  # time... try to build a set of Header paths??
547 
548  # First Attempt : Unpacked Event Data
549  lst = ['/Event/Gen/Header', '/Event/Rec/Header']
550  for l in lst:
551  path = l
552  try:
553  n = self.evt[path].evtNumber()
554 
555  return n
556  except:
557  # No evt number at this path
558  continue
559 
560  # second attepmt : try DAQ/RawEvent data
561  # The Evt Number is in bank type 16, bank 0, data pt 4
562  try:
563  n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
564 
565  return n
566  except:
567  pass
568 
569  # Default Action
570  if self.nIn > 0 or self.nOut > 0:
571  pass
572  else:
573  self.log.warning('Could not determine Event Number')
574  return -1
575  else:
576  if self.nodeID == -1:
577  self.num = self.num + 1
578 
579  return self.num
580 
581  def IdentifyWriters(self):
582  #
583  # Identify Writers in the Configuration
584  #
585  d = {}
586  keys = ["events", "records", "tuples", "histos"]
587  for k in keys:
588  d[k] = []
589 
590  # Identify Writers and Classify
591  wkeys = WRITERTYPES.keys()
592  for v in self.config.values():
593  if v.__class__.__name__ in wkeys:
594  writerType = WRITERTYPES[v.__class__.__name__]
595  d[writerType].append(MiniWriter(v, writerType, self.config))
596  if self.nodeID == 0:
597  self.log.info('Writer Found : %s' % (v.name()))
598 
599  # Now Check for the Histogram Service
600  if 'HistogramPersistencySvc' in self.config.keys():
601  hfile = self.config['HistogramPersistencySvc'].getProp(
602  'OutputFile')
603  d["histos"].append(hfile)
604  return d
605 
606  def dumpHistograms(self):
607  '''
608  Method used by the GaudiPython algorithm CollectHistos
609  to obtain a dictionary of form { path : object }
610  representing the Histogram Store
611  '''
612  nlist = self.hvt.getHistoNames()
613  histDict = {}
614  objects = 0
615  histos = 0
616  if nlist:
617  for n in nlist:
618  o = self.hvt[n]
619  if type(o) in aidatypes:
620  o = aida2root(o)
621  histos += 1
622  else:
623  objects += 1
624  histDict[n] = o
625  else:
626  print('WARNING : no histograms to recover?')
627  return histDict
628 
    def Initialize(self):
        """Configure and start GaudiPython for this node; record the time
        taken in self.iTime."""
        start = time.time()
        self.processConfiguration()
        self.SetupGaudiPython()
        # signal the parent that this node has reached initialisation
        self.initEvent.set()
        self.StartGaudiPython()

        if self.app == 'Gauss':
            # Gauss drives event numbering through its EvtCounter tool
            # (see getEventNumber)
            tool = self.a.tool("ToolSvc.EvtCounter")
            self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
        else:
            self.cntr = None

        self.iTime = time.time() - start
644 
645  def Finalize(self):
646  start = time.time()
647  self.a.stop()
648  self.a.finalize()
649  self.log.info('%s-%i Finalized' % (self.nodeType, self.nodeID))
650  self.finalEvent.set()
651  self.fTime = time.time() - start
652 
653  def Report(self):
654  self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
655  allTime = "Alive Time : %5.2f" % (self.tTime)
656  initTime = "Init Time : %5.2f" % (self.iTime)
657  frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
658  runTime = "Run Time : %5.2f" % (self.rTime)
659  finTime = "Finalise Time : %5.2f" % (self.fTime)
660  tup = (allTime, initTime, frstTime, runTime, finTime)
661  for t in tup:
662  self.log.info(t)
663  self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
664  # and report from the TESSerializer
665  self.TS.Report()
666 
667 
668 # =============================================================================
669 
670 
    def __init__(self, queues, events, params, subworkers):
        # Reader node : nodeID is fixed at -1 by convention
        GMPComponent.__init__(self, 'Reader', -1, queues, events, params,
                              subworkers)
675 
        # (body of the Reader's processConfiguration — its `def` line is not
        # part of this extract)
        # Reader :
        # No algorithms
        # No output
        # No histos
        self.config['ApplicationMgr'].TopAlg = []
        self.config['ApplicationMgr'].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config['HistogramPersistencySvc'].OutputFile = ''
        # prefix every message from this process with the node tag
        self.config['MessageSvc'].Format = '%-13s ' % '[Reader]' + \
            self.msgFormat
        self.evtMax = self.config['ApplicationMgr'].EvtMax
688 
    def DumpEvent(self):
        """Serialize the current event store content into a new
        TBufferFile and return it."""
        tb = TBufferFile(TBuffer.kWrite)
        # print '----Reader dumping Buffer!!!'
        self.ts.dumpBuffer(tb)
        # print '\tBuffer Dumped, size : %i'%( tb.Length() )
        return tb
695 
    def DoFirstEvent(self):
        """Read, serialize and send the very first event.

        The first event is special: the TESSerializer item list is
        populated from the content of the event store before dumping.
        Returns SUCCESS (including the no-more-events case) or FAILURE.
        """
        # Do First Event ------------------------------------------------------
        # Check Termination Criteria
        startFirst = time.time()
        self.log.info('Reader : First Event')
        if self.nOut == self.evtMax:
            self.log.info('evtMax( %i ) reached' % (self.evtMax))
            self.lastEvent.set()
            return SUCCESS
        else:
            # Continue to read, dump and send event
            self.a.run(1)
            if not bool(self.evt['/Event']):
                self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
                self.lastEvent.set()
                return SUCCESS
            else:
                # Popluate TESSerializer list and send Event
                if self.app == "Gauss":
                    lst = self.evt.getHistoNames()
                else:
                    try:
                        lst = self.evt.getList()
                        if self.app == "DaVinci":
                            # DaVinci keeps the raw event below /Event/DAQ;
                            # add its subtree explicitly
                            daqnode = self.evt.retrieveObject(
                                '/Event/DAQ').registry()
                            setOwnership(daqnode, False)
                            self.evt.getList(daqnode, lst,
                                             daqnode.address().par())
                    except:
                        # NOTE(review): bare except also swallows
                        # KeyboardInterrupt/SystemExit
                        self.log.critical('Reader could not acquire TES List!')
                        self.lastEvent.set()
                        return FAILURE
                self.log.info('Reader : TES List : %i items' % (len(lst)))
                for l in lst:
                    self.ts.addItem(l)
                # NOTE(review): one statement is elided here in this extract —
                # confirm against the full source
                tb = self.TS.Dump()
                self.log.info('First Event Sent')
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
                self.eventLoopSyncer.set()
                self.evt.clearStore()
                self.firstEvTime = time.time() - startFirst
                return SUCCESS
741 
    def Engine(self):
        """Main loop of the Reader process: read, serialize and distribute
        every event to the workers, then finalize and report."""
        # rename process (prctl option 15 = PR_SET_NAME) so it is
        # identifiable in ps/top; Linux-specific (libc.so.6)
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        startEngine = time.time()
        self.log.name = 'Reader'
        self.log.info('Reader Process starting')

        self.Initialize()

        # add the Histogram Collection Algorithm
        self.a.addAlgorithm(CollectHistograms(self))

        self.log.info('Reader Beginning Distribution')
        sc = self.DoFirstEvent()
        if sc.isSuccess():
            self.log.info('Reader First Event OK')
        else:
            self.log.critical('Reader Failed on First Event')
            self.stat = FAILURE

        # Do All Others -------------------------------------------------------
        while True:
            # Check Termination Criteria
            if self.nOut == self.evtMax:
                self.log.info('evtMax( %i ) reached' % (self.evtMax))
                break
            # Check Health
            if not self.stat.isSuccess():
                self.log.critical('Reader is Damaged!')
                break
            # Continue to read, dump and send event
            t = time.time()
            self.a.run(1)
            self.rTime += (time.time() - t)
            if not bool(self.evt['/Event']):
                self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
                break
            self.currentEvent = self.getEventNumber()
            tb = self.TS.Dump()
            self.evcom.send((self.currentEvent, tb))
            # clean up
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Finalize
        self.log.info('Reader : Event Distribution complete.')
        self.evcom.finalize()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()
800 
801 
802 # =============================================================================
803 
804 
    def __init__(self, workerID, queues, events, params, subworkers):
        # Subworker node : shares the 'Worker' nodeType
        GMPComponent.__init__(self, 'Worker', workerID, queues, events, params,
                              subworkers)
        # Identify the writer streams
        # NOTE(review): the writer-stream setup (self.writerDict, used by
        # Engine below) is elided in this extract — confirm against the
        # full source
        # Identify the accept/veto checks for each event
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.info("Subworker-%i Created OK" % (self.nodeID))
        self.eventOutput = True
815 
    def Engine(self):
        """Main event loop of a Subworker process.

        Receives serialized events, executes the Gaudi event loop on each,
        applies the writer accept/require/veto checks, and forwards
        accepted events downstream; finally ships file records and
        finalizes.
        """
        # rename process (prctl option 15 = PR_SET_NAME) so it is
        # identifiable in ps/top; Linux-specific (libc.so.6)
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15, name, 0, 0, 0)

        self.initEvent.set()
        startEngine = time.time()
        msg = self.a.service('MessageSvc')
        msg.Format = '%-13s ' % ('[' + self.log.name + ']') + self.msgFormat

        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Subworker %i starting Engine" % (self.nodeID))
        # NOTE(review): one statement is elided here in this extract

        # populate the TESSerializer itemlist
        self.log.info(
            'EVT WRITERS ON WORKER : %i' % (len(self.writerDict['events'])))

        nEventWriters = len(self.writerDict["events"])
        self.a.addAlgorithm(CollectHistograms(self))

        # Begin processing
        Go = True
        while Go:
            packet = self.evcom.receive()
            if packet:
                pass
            else:
                continue
            if packet == 'FINISHED':
                break
            evtNumber, tbin = packet  # unpack
            if self.cntr != None:
                # keep the Gauss event counter in step with the true number
                self.cntr.setEventCounter(evtNumber)

            self.nIn += 1
            self.TS.Load(tbin)

            t = time.time()
            sc = self.a.executeEvent()
            # the first event includes one-off setup cost; book it separately
            if self.nIn == 1:
                self.firstEvTime = time.time() - t
            else:
                self.rTime += (time.time() - t)
            if sc.isSuccess():
                pass
            else:
                self.log.name = "Worker-%i" % (self.nodeID)
                self.log.warning('Did not Execute Event')
                self.evt.clearStore()
                continue
            if self.isEventPassed():
                pass
            else:
                self.log.name = "Worker-%i" % (self.nodeID)
                self.log.warning('Event did not pass : %i' % (evtNumber))
                self.evt.clearStore()
                continue
            if self.eventOutput:
                # It may be the case of generating Event Tags; hence
                # no event output
                # NOTE(review): one statement is elided here in this extract
                tb = self.TS.Dump()
                self.evcom.send((self.currentEvent, tb))
                self.nOut += 1
            self.inc.fireIncident(gbl.Incident('Subworker', 'EndEvent'))
            self.eventLoopSyncer.set()
            self.evt.clearStore()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info('Setting <Last> Event %s' % (self.nodeID))
        self.lastEvent.set()

        self.evcom.finalize()
        # Now send the FileRecords and stop/finalize the appMgr
        self.filerecordsAgent.SendFileRecords()
        self.tTime = time.time() - startEngine
        self.Finalize()
        self.Report()
        # self.finalEvent.set()
899 
    def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
        """Adopt the AppMgr and service handles from the parent process
        instead of building new ones, then wrap the serializer."""
        self.a = a
        self.evt = evt
        self.hvt = hvt
        self.fsr = fsr
        #self.inc = inc
        # the incident service is re-fetched rather than reusing the one
        # passed in (the `inc` argument is deliberately ignored)
        self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
        self.pers = pers
        self.ts = ts
        self.cntr = cntr
        self.TS = TESSerializer(self.ts, self.evt, self.nodeType, self.nodeID,
                                self.log)
912 
913  def getCheckAlgs(self):
914  '''
915  For some output writers, a check is performed to see if the event has
916  executed certain algorithms.
917  These reside in the AcceptAlgs property for those writers
918  '''
919  acc = []
920  req = []
921  vet = []
922  for m in self.writerDict["events"]:
923  if hasattr(m.w, 'AcceptAlgs'):
924  acc += m.w.AcceptAlgs
925  if hasattr(m.w, 'RequireAlgs'):
926  req += m.w.RequireAlgs
927  if hasattr(m.w, 'VetoAlgs'):
928  vet += m.w.VetoAlgs
929  return (acc, req, vet)
930 
    def checkExecutedPassed(self, algName):
        # Return True iff the named algorithm has executed AND its filter
        # passed.  Note the short-circuit: filterPassed() is only queried
        # when isExecuted() is true.
        if self.a.algorithm(algName)._ialg.isExecuted()\
                and self.a.algorithm(algName)._ialg.filterPassed():
            return True
        else:
            return False
937 
938  def isEventPassed(self):
939  '''
940  Check the algorithm status for an event.
941  Depending on output writer settings, the event
942  may be declined based on various criteria.
943  This is a transcript of the check that occurs in GaudiSvc::OutputStream
944  '''
945  passed = False
946 
947  self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
948  if self.acceptAlgs:
949  for name in self.acceptAlgs:
950  if self.checkExecutedPassed(name):
951  passed = True
952  break
953  else:
954  passed = True
955 
956  self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
957  for name in self.requireAlgs:
958  if self.checkExecutedPassed(name):
959  pass
960  else:
961  self.log.info('Evt declined (requireAlgs) : %s' % (name))
962  passed = False
963 
964  self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
965  for name in self.vetoAlgs:
966  if self.checkExecutedPassed(name):
967  pass
968  else:
969  self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
970  passed = False
971  return passed
972 
973 
974 # =============================================================================
975 
976 
    def __init__(self, workerID, queues, events, params, subworkers):
        # Worker node : nodeID is the 0-based worker index
        GMPComponent.__init__(self, 'Worker', workerID, queues, events, params,
                              subworkers)
        # Identify the writer streams
        # NOTE(review): the writer-stream setup (self.writerDict, used
        # below and in Engine) is elided in this extract — confirm against
        # the full source
        # Identify the accept/veto checks for each event
        self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
        self.log.name = "Worker-%i" % (self.nodeID)
        self.log.info("Worker-%i Created OK" % (self.nodeID))
        self.eventOutput = True
988 
        # (body of the Worker's processConfiguration — its `def` line is not
        # part of this extract)

        # Worker :
        # No input
        # No output
        # No Histos
        self.config['EventSelector'].Input = []
        self.config['ApplicationMgr'].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys():
            self.config['HistogramPersistencySvc'].OutputFile = ''
        # prefix every message from this process with the worker tag
        formatHead = '[Worker-%i]' % (self.nodeID)
        self.config['MessageSvc'].Format = '%-13s ' % formatHead + \
            self.msgFormat

        # NOTE(review): iteritems() is Python-2 only; items() is needed
        # under Python 3
        for key, lst in self.writerDict.iteritems():
            self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

        for m in self.writerDict["tuples"]:
            # rename Tuple output file with an appendix
            # based on worker id, for merging later
            newName = m.getNewName('.', '.w%i.' % (self.nodeID))
            self.config[m.key].Output = newName

        # Suppress INFO Output for all but Worker-0
        # if self.nodeID == 0 :
        #     pass
        # else :
        #     self.config[ 'MessageSvc' ].OutputLevel = ERROR

        if self.app == "Gauss":
            try:
                # ensure the EvtCounter tool exists and is not driven by
                # incidents (the counter is set explicitly in the loop)
                if "ToolSvc.EvtCounter" not in self.config:
                    from Configurables import EvtCounter
                    counter = EvtCounter()
                else:
                    counter = self.config["ToolSvc.EvtCounter"]
                counter.UseIncident = False
            except:
                # ignore errors when trying to change the configuration of the EvtCounter
                self.log.warning('Cannot configure EvtCounter')
1029 
# --- Worker.Engine (GMPBase.py:1030) -----------------------------------------
# Doxygen source listing (leading numbers are original source line numbers;
# indentation lost in extraction).  Main loop of the Worker process:
# initialises the AppMgr, populates the TES serializer item lists, then
# receives serialized events from the Reader, executes them, and forwards
# accepted events to the Writer.
1030  def Engine(self):
1031
1032  # rename process
1033  import os
1034  import ctypes
1035  libc = ctypes.CDLL('libc.so.6')
1036  name = str(self.nodeType) + str(self.nodeID) + '\0'
# prctl option 15 is PR_SET_NAME on Linux: sets this process's name.
1037  libc.prctl(15, name, 0, 0, 0)
1038
1039  startEngine = time.time()
1040  self.log.info("Worker %i starting Engine" % (self.nodeID))
1041  self.Initialize()
# NOTE(review): original line 1042 is missing from this extract (dropped
# hyperlinked line); self.filerecordsAgent is used at line 1136 below, so
# 1042 presumably creates it — confirm against GMPBase.py.
1043
1044  # populate the TESSerializer itemlist
1045  self.log.info(
1046  'EVT WRITERS ON WORKER : %i' % (len(self.writerDict['events'])))
1047
1048  nEventWriters = len(self.writerDict["events"])
1049  if nEventWriters:
1050  itemList = set()
1051  optItemList = set()
1052  for m in self.writerDict["events"]:
1053  for item in m.ItemList:
# strip the trailing '#depth' qualifier from item paths
1054  hsh = item.find('#')
1055  if hsh != -1:
1056  item = item[:hsh]
1057  itemList.add(item)
1058  for item in m.OptItemList:
1059  hsh = item.find('#')
1060  if hsh != -1:
1061  item = item[:hsh]
1062  optItemList.add(item)
1063  # If an item is mandatory and optional, keep it only in the optional list
1064  itemList -= optItemList
1065  for item in sorted(itemList):
1066  self.log.info(' adding ItemList Item to ts : %s' % (item))
1067  self.ts.addItem(item)
1068  for item in sorted(optItemList):
1069  self.log.info(' adding Optional Item to ts : %s' % (item))
1070  self.ts.addOptItem(item)
1071  else:
1072  self.log.info('There is no Event Output for this app')
1073  self.eventOutput = False
1074
1075  # Begin processing
1076  Go = True
1077  while Go:
1078  packet = self.evcom.receive()
1079  if packet:
1080  pass
1081  else:
1082  continue
1083  if packet == 'FINISHED':
1084  break
1085  evtNumber, tbin = packet # unpack
1086  if self.cntr != None:
1087  self.cntr.setEventCounter(evtNumber)
1088
1089  # subworkers are forked before the first event is processed
1090  if self.nIn == 0:
1091  self.log.info("Fork new subworkers")
1092
1093  # Fork subworkers and share services
1094  for k in self.subworkers:
1095  k.SetServices(self.a, self.evt, self.hvt, self.fsr,
1096  self.inc, self.pers, self.ts, self.cntr)
1097  k.Start()
1098  self.a.addAlgorithm(CollectHistograms(self))
1099  self.nIn += 1
1100  self.TS.Load(tbin)
1101
1102  t = time.time()
1103  sc = self.a.executeEvent()
# first event time is kept separately from the running total
1104  if self.nIn == 1:
1105  self.firstEvTime = time.time() - t
1106  else:
1107  self.rTime += (time.time() - t)
1108  if sc.isSuccess():
1109  pass
1110  else:
1111  self.log.warning('Did not Execute Event')
1112  self.evt.clearStore()
1113  continue
1114  if self.isEventPassed():
1115  pass
1116  else:
1117  self.log.warning('Event did not pass : %i' % (evtNumber))
1118  self.evt.clearStore()
1119  continue
1120  if self.eventOutput:
1121  # It may be the case of generating Event Tags; hence
1122  # no event output
# NOTE(review): original line 1123 is missing from this extract;
# self.currentEvent is sent at line 1125, so 1123 presumably records
# the current event — confirm against GMPBase.py.
1124  tb = self.TS.Dump()
1125  self.evcom.send((self.currentEvent, tb))
1126  self.nOut += 1
1127  self.inc.fireIncident(gbl.Incident('Worker', 'EndEvent'))
1128  self.eventLoopSyncer.set()
1129  self.evt.clearStore()
1130  self.log.info('Setting <Last> Event')
1131  self.lastEvent.set()
1132
1133  self.evcom.finalize()
1134  self.log.info('Worker-%i Finished Processing Events' % (self.nodeID))
1135  # Now send the FileRecords and stop/finalize the appMgr
1136  self.filerecordsAgent.SendFileRecords()
1137  self.Finalize()
1138  self.tTime = time.time() - startEngine
1139  self.Report()
1140
1141  for k in self.subworkers:
1142  self.log.info('Join subworkers')
1143  k.proc.join()
1144 
def getCheckAlgs(self):
    '''
    Gather the AcceptAlgs / RequireAlgs / VetoAlgs properties of every
    event writer into three flat lists.
    These are the algorithm names checked per event by isEventPassed().
    '''
    accept = []
    require = []
    veto = []
    # (property name on the writer, list collecting its entries)
    collectors = (('AcceptAlgs', accept),
                  ('RequireAlgs', require),
                  ('VetoAlgs', veto))
    for writer in self.writerDict["events"]:
        for prop, bucket in collectors:
            if hasattr(writer.w, prop):
                bucket.extend(getattr(writer.w, prop))
    return (accept, require, veto)
1162 
def checkExecutedPassed(self, algName):
    '''
    Return True when the named algorithm was executed for the current
    event AND its filter accepted the event; False otherwise.
    '''
    # Guard clauses preserve the original short-circuit: filterPassed()
    # is only consulted once isExecuted() is known to be true.
    if not self.a.algorithm(algName)._ialg.isExecuted():
        return False
    if not self.a.algorithm(algName)._ialg.filterPassed():
        return False
    return True
1169 
def isEventPassed(self):
    '''
    Check the algorithm status for an event and decide whether it should
    be written out.  Depending on output writer settings the event may be
    declined on various criteria.
    This is a transcript of the check that occurs in GaudiSvc::OutputStream.
    '''
    # Accept stage: with accept algs configured, at least one must have
    # executed and passed; with none configured, every event is accepted.
    self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
    if self.acceptAlgs:
        passed = any(self.checkExecutedPassed(name)
                     for name in self.acceptAlgs)
    else:
        passed = True

    # Require stage: every require alg must have executed and passed.
    self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
    for name in self.requireAlgs:
        if not self.checkExecutedPassed(name):
            self.log.info('Evt declined (requireAlgs) : %s' % (name))
            passed = False

    # Veto stage (transcribed as-is from OutputStream).
    self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
    for name in self.vetoAlgs:
        if not self.checkExecutedPassed(name):
            self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
            passed = False
    return passed
1204 
1205 
1206 # =============================================================================
1207 
1208 
# --- Writer.__init__ (GMPBase.py:1210) ---------------------------------------
# Doxygen source listing (leading numbers are original source line numbers;
# indentation lost in extraction).
# NOTE(review): the enclosing class header (GMPBase.py:1209, the Writer
# class) is missing from this extract (dropped hyperlinked line).
# Constructs the single Writer node (fixed nodeID -2) that merges output
# from all workers.
1210  def __init__(self, queues, events, params, subworkers):
1211  GMPComponent.__init__(self, 'Writer', -2, queues, events, params,
1212  subworkers)
1213  # Identify the writer streams
# NOTE(review): original line 1214 is missing from this extract;
# self.writerDict is used below, so 1214 presumably builds it — confirm
# against GMPBase.py.
1215  # This keeps track of workers as they finish
1216  self.status = [False] * self.nWorkers
1217  self.log.name = "Writer--2"
1218
# NOTE(review): original line 1219 is missing here; per the cross-reference
# index at the foot of this listing it is "def processConfiguration(self)"
# (GMPBase.py:1219).  Everything below belongs to processConfiguration,
# not to __init__.
1220  # Writer :
1221  # No input
1222  # No Algs
1223  self.config['ApplicationMgr'].TopAlg = []
1224  self.config['EventSelector'].Input = []
1225
1226  self.config['MessageSvc'].Format = '%-13s ' % '[Writer]' + \
1227  self.msgFormat
1228
1229  # Now process the output writers
1230  for key, lst in self.writerDict.iteritems():
1231  self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))
1232
1233  # Modify the name of the output file to reflect that it came
1234  # from a parallel processing
1235  #
1236  # Event Writers
1237  for m in self.writerDict["events"]:
1238  self.log.debug('Processing Event Writer : %s' % (m))
1239  newName = m.getNewName('.', '.p%i.' % self.nWorkers)
1240  self.config[m.key].Output = newName
1241
1242  # Now, if there are no event writers, the FileRecords file
1243  # will fail to open, as it only opens an UPDATE version
1244  # of the existing Event Output File
1245  # So, if there are no event writers, edit the string of the
1246  # FileRecord Writer
1247
1248  # FileRecords Writers
1249  for m in self.writerDict["records"]:
1250  self.log.debug('Processing FileRecords Writer: %s' % (m))
1251  newName = m.getNewName(
1252  '.', '.p%i.' % self.nWorkers, extra=" OPT='RECREATE'")
1253  self.config[m.key].Output = newName
1254
1255  # same for histos
1256  hs = "HistogramPersistencySvc"
1257  n = None
1258  if hs in self.config.keys():
1259  n = self.config[hs].OutputFile
1260  if n:
1261  newName = self.config[hs].OutputFile.replace(
1262  '.', '.p%i.' % (self.nWorkers))
1263  self.config[hs].OutputFile = newName
1264 
# --- Writer.Engine (GMPBase.py:1265) -----------------------------------------
# Doxygen source listing (leading numbers are original source line numbers;
# indentation lost in extraction).  Main loop of the Writer process:
# round-robins over the per-worker queues, writes each received event, and
# stops once every worker has sent its FINISHED flag; then rebuilds the
# histogram and FileRecords stores.
1265  def Engine(self):
1266  # rename process
1267  import os
1268  import ctypes
1269  libc = ctypes.CDLL('libc.so.6')
1270  name = str(self.nodeType) + str(self.nodeID) + '\0'
# prctl option 15 is PR_SET_NAME on Linux: sets this process's name.
1271  libc.prctl(15, name, 0, 0, 0)
1272
1273  startEngine = time.time()
1274  self.Initialize()
1275  self.histoAgent = HistoAgent(self)
# NOTE(review): original line 1276 is missing from this extract (dropped
# hyperlinked line); self.filerecordsAgent is used at line 1321 below, so
# 1276 presumably creates it — confirm against GMPBase.py.
1277
1278  # Begin processing
1279  Go = True
1280  current = -1
1281  stopCriteria = self.nWorkers
1282  while Go:
# round-robin poll of the per-worker input queues
1283  current = (current + 1) % self.nWorkers
1284  packet = self.evcoms[current].receive(timeout=0.01)
1285  if packet == None:
1286  continue
1287  if packet == 'FINISHED':
1288  self.log.info(
1289  'Writer got FINISHED flag : Worker %i' % (current))
1290
1291  self.status[current] = True
1292  if all(self.status):
1293  self.log.info('FINISHED recd from all workers, break loop')
1294  break
1295  continue
1296  # otherwise, continue as normal
1297  self.nIn += 1 # update central count (maybe needed by FSR store)
1298  evtNumber, tbin = packet # unpack
1299  self.TS.Load(tbin)
1300  t = time.time()
1301  self.a.executeEvent()
1302  self.rTime += (time.time() - t)
# NOTE(review): original line 1303 is missing from this extract; its
# content is unknown — confirm against GMPBase.py.
1304  self.evt.clearStore()
1305  self.eventLoopSyncer.set()
1306  self.log.name = "Writer--2"
1307  self.log.info('Setting <Last> Event')
1308  self.lastEvent.set()
1309
1310  # finalisation steps
1311  [e.finalize() for e in self.evcoms]
1312  # Now do Histograms
1313  sc = self.histoAgent.Receive()
1314  sc = self.histoAgent.RebuildHistoStore()
1315  if sc.isSuccess():
1316  self.log.info('Histo Store rebuilt ok')
1317  else:
1318  self.log.warning('Histo Store Error in Rebuild')
1319
1320  # Now do FileRecords
1321  sc = self.filerecordsAgent.Receive()
1322  self.filerecordsAgent.Rebuild()
1323  self.Finalize()
1324  #self.rTime = time.time()-startEngine
1325  self.Report()
1326 
1327 
1328 # =============================================================================
1329 
1330 # =============================================================================
1331 
1332 
class Coord(object):
    '''
    Co-ordinator of the GMP process network.
    Builds the Reader, main Worker (+ Subworkers) and Writer nodes, wires
    up their queues and synchronisation events, and drives the session
    through its Initialise / Run / Finalise phases.
    '''

    def __init__(self, nWorkers, config, log):
        # set up Logging
        self.log = log
        self.config = config
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('GaudiPython Parallel Process Co-ordinator beginning')

        # nWorkers == -1 means "use every available cpu in the machine"
        self.nWorkers = cpu_count() if nWorkers == -1 else nWorkers

        self.qs = self.SetupQueues()  # a dictionary of queues (for Events)
        self.hq = JoinableQueue()  # for Histogram data
        self.fq = JoinableQueue()  # for FileRecords data

        # One Syncer guards each of the Initialise, Run and Finalise phases
        self.sInit = Syncer(
            self.nWorkers,
            self.log,
            limit=WAIT_INITIALISE,
            step=STEP_INITIALISE)
        self.sRun = Syncer(
            self.nWorkers,
            self.log,
            manyEvents=True,
            limit=WAIT_SINGLE_EVENT,
            step=STEP_EVENT,
            firstEvent=WAIT_FIRST_EVENT)
        self.sFin = Syncer(
            self.nWorkers, self.log, limit=WAIT_FINALISE, step=STEP_FINALISE)
        # and one final event for the Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Declare the SubProcesses: subworkers 1..N-1, one Reader,
        # the main Worker (id 0) and one Writer
        self.subworkers = []
        for wid in range(1, self.nWorkers):
            self.subworkers.append(
                Subworker(wid, self.getQueues(wid), self.getSyncEvents(wid),
                          params, self.subworkers))
        self.reader = Reader(
            self.getQueues(-1), self.getSyncEvents(-1), params,
            self.subworkers)
        self.workers = []
        mainWorker = Worker(0, self.getQueues(0), self.getSyncEvents(0),
                            params, self.subworkers)
        self.writer = Writer(
            self.getQueues(-2), self.getSyncEvents(-2), params,
            self.subworkers)

        self.system = [self.writer, mainWorker, self.reader]

    def getSyncEvents(self, nodeID):
        '''Return the (init, (run, lastEvent), finalise) sync events for a node.'''
        runSync = self.sRun.d[nodeID]
        return (self.sInit.d[nodeID].event,
                (runSync.event, runSync.lastEvent),
                self.sFin.d[nodeID].event)

    def getQueues(self, nodeID):
        '''Return the (event, histogram, filerecords) queues for a node.'''
        return (self.qs[nodeID], self.hq, self.fq)

    def Go(self):
        '''
        Run the parallel session: start all processes, then synchronise
        the Initialise, Run and Finalise phases, terminating everything
        on any phase failure.
        '''
        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('INITIALISING SYSTEM')

        # Start reader, writer and main worker
        for node in self.system:
            node.Start()

        sc = self.sInit.syncAll(step="Initialise")
        if not (sc == SUCCESS):
            self.Terminate()
            return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('RUNNING SYSTEM')
        sc = self.sRun.syncAll(step="Run")
        if not (sc == SUCCESS):
            self.Terminate()
            return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info('FINALISING SYSTEM')
        sc = self.sFin.syncAll(step="Finalise")
        if not (sc == SUCCESS):
            self.Terminate()
            return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info("Cleanly join all Processes")
        self.Stop()
        self.log.info("Report Total Success to Main.py")
        return SUCCESS

    def Terminate(self):
        '''Brutally kill all remaining sub-processes.'''
        for child in multiprocessing.active_children():
            child.terminate()

    def Stop(self):
        '''Join all processes, in reverse order of launch.'''
        self.system.reverse()
        for node in self.system:
            node.proc.join()
        return SUCCESS

    def SetupQueues(self):
        '''
        Build the network of queues: nWorkers + 1 in total.
        The Reader feeds one shared queue read by every Worker; each
        Worker feeds its own private queue read by the Writer.
        Returns a dict nodeID -> (inputQueue, outputQueue).
        '''
        # one queue from Reader to Workers
        readerToWorkers = JoinableQueue()
        # one queue from each worker to writer
        workerToWriter = [JoinableQueue() for _ in range(self.nWorkers)]
        qmap = {
            -1: (None, readerToWorkers),  # Reader: output only
            -2: (workerToWriter, None),  # Writer: inputs only
        }
        for wid in range(self.nWorkers):
            qmap[wid] = (readerToWorkers, workerToWriter[wid])
        return qmap
1483 
1484 
1485 # ============================= EOF ===========================================
def Load(self, tbuf)
Definition: GMPBase.py:346
Out1 * put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:421
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:672
executeEvent
Helpers for re-entrant interfaces.
def getNewName(self, replaceThis, withThis, extra='')
Definition: GMPBase.py:122
StatusCode finalize() override
Definition: Algorithm.cpp:104
StatusCode execute() override
Definition: Algorithm.cpp:100
def SetupQueues(self)
Definition: GMPBase.py:1466
def receive(self, timeout=None)
Definition: GMPBase.py:277
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:241
def __init__(self, gmpcomponent)
Definition: GMPBase.py:196
def DumpEvent(self)
Definition: GMPBase.py:689
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:49
def DoFirstEvent(self)
Definition: GMPBase.py:696
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:806
EventIDBase max(const EventIDBase &lhs, const EventIDBase &rhs)
Definition: EventIDBase.h:215
def Terminate(self)
Definition: GMPBase.py:1449
def start
Definition: IOTest.py:98
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:931
def processConfiguration(self)
Definition: GMPBase.py:989
def getCheckAlgs(self)
Definition: GMPBase.py:1145
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:536
def getItemLists(self, config)
Definition: GMPBase.py:143
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1210
def set(self, key, output)
Definition: GMPBase.py:160
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1334
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
def __init__(self, writer, wType, config)
Definition: GMPBase.py:91
def processConfiguration(self)
Definition: GMPBase.py:508
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1394
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:900
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:978
Python Algorithm base class.
Definition: Algorithm.h:30
def isEventPassed(self)
Definition: GMPBase.py:1170
def processConfiguration(self)
Definition: GMPBase.py:676
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
def getQueues(self, nodeID)
Definition: GMPBase.py:1400
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1163
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:332
def processConfiguration(self)
Definition: GMPBase.py:1219