All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
GMPBase.py
Go to the documentation of this file.
1 from Gaudi.Configuration import *
2 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
3 from ROOT import TBufferFile, TBuffer
4 import multiprocessing
5 from multiprocessing import Process, Queue, JoinableQueue, Event
6 from multiprocessing import cpu_count, current_process
7 from multiprocessing.queues import Empty
8 from pTools import *
9 import time, sys, os
10 from ROOT import TParallelMergingFile
11 # This script contains the bases for the Gaudi MultiProcessing (GMP)
12 # classes
13 
14 # There are three classes :
15 # Reader
16 # Worker
17 # Writer
18 
19 # Each class needs to perform communication with the others
20 # For this, we need a means of communication, which will be based on
21 # the python multiprocessing package
22 # This is provided in SPI pytools package
23 # cmt line : use pytools v1.1 LCG_Interfaces
24 # The PYTHONPATH env variable may need to be modified, as this might
25 # still point to 1.0_python2.5
26 
27 # Each class will need Queues, and a defined method for using these
28 # queues.
29 # For example, as long as there is something in the Queue, both ends
30 # of the queue must be open
31 # Also, there needs to be proper Termination flags and criteria
32 # The System should be error proof.
33 
34 
# Constants -------------------------------------------------------------------
NAP = 0.001                  # polling interval while waiting on queues (s)
MB = 1024.0 * 1024.0         # bytes per megabyte, for size reporting

# guard timeouts against hanging, in seconds
WAIT_INITIALISE = 5 * 60
WAIT_FIRST_EVENT = 3 * 60
WAIT_SINGLE_EVENT = 6 * 60
WAIT_FINALISE = 2 * 60
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 10

# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
SMAPS = False
49 
# -----------------------------------------------------------------------------

# definitions
# ----------
# converter from stored histograms in AIDA format to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# AIDA histogram interface types, used to recognise histogram objects
# (usage : if currentHisto in aidatypes : ...)
aidatypes = tuple(
    getattr(gbl.AIDA, iface) for iface in ('IHistogram',
                                           'IHistogram1D',
                                           'IHistogram2D',
                                           'IHistogram3D',
                                           'IProfile1D',
                                           'IProfile2D',
                                           'IBaseHistogram'))  # extra?

# the ROOT-side counterparts of aidatypes
thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
69 
# Types of OutputStream in Gaudi, mapped to the kind of data they write
WRITERTYPES = dict(
    EvtCollectionStream="tuples",
    InputCopyStream="events",
    OutputStream="events",
    RecordStream="records",
    RunRecordStream="records",
    SequentialOutputStream="events",
    TagCollectionStream="tuples",
)
78 
79 # =============================================================================
80 
class MiniWriter(object):
    '''
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string.
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring.
    '''

    def __init__(self, writer, wType, config):
        # writer : the configurable for the output stream
        # wType  : one of "events", "records", "tuples" (see WRITERTYPES)
        # config : the full configuration dict, used to resolve the DataSvc
        self.w = writer
        self.wType = wType
        # parameters for directly accessing the correct part of the
        # configuration, so that later we can do :
        #   config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr(self.w, "Output"):
            # output named directly on the writer
            self.woutput = self.w.Output
            self.getItemLists(config)
            self.set(self.wName, self.w.Output)
            return
        else:
            # if there is no output file, get it via the datasvc
            # (every writer always has a datasvc property)
            self.datasvcName = self.w.EvtDataSvc
            datasvc = config[self.datasvcName]
            if hasattr(datasvc, "Output"):
                self.getItemLists(config)
                self.set(self.datasvcName, datasvc.Output)
                return

    def getNewName(self, replaceThis, withThis, extra=''):
        # Replace one pattern in the output name string with another,
        # and return the new Output name.
        # It *might* be in a list, so check for this.
        #
        # @param extra : might need to add ROOT flags,
        #                i.e. : OPT='RECREATE', or such
        # isinstance instead of __class__.__name__ comparison : identical
        # for exact types, and also accepts subclasses
        assert isinstance(replaceThis, str)
        assert isinstance(withThis, str)
        old = self.output
        lst = isinstance(old, list)
        if lst:
            old = self.output[0]
        new = old.replace(replaceThis, withThis)
        new += extra
        if lst:
            return [new]
        else:
            return new

    def getItemLists(self, config):
        # Fetch ItemList/OptItemList from the writer, falling back to
        # its data service when the writer does not carry them.
        if hasattr(self.w, "ItemList"):
            self.ItemList = self.w.ItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "ItemList"):
                self.ItemList = datasvc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr(self.w, "OptItemList"):
            self.OptItemList = self.w.OptItemList
        else:
            datasvc = config[self.w.EvtDataSvc]
            if hasattr(datasvc, "OptItemList"):
                self.OptItemList = datasvc.OptItemList
        return

    def set(self, key, output):
        # Record where (key) and what (output) to patch in the configuration.
        self.key = key
        self.output = output
        return

    def __repr__(self):
        # Multi-line human-readable summary for configuration logging.
        s = ""
        line = '-' * 80
        s += (line + '\n')
        s += "Writer : %s\n" % (self.wName)
        s += "Writer Type : %s\n" % (self.wType)
        s += "Writer Output : %s\n" % (self.output)
        s += "DataSvc : %s\n" % (self.datasvcName)
        s += "DataSvc Output : %s\n" % (self.svcOutput)
        s += '\n'
        s += "Key for config : %s\n" % (self.key)
        s += "Output File : %s\n" % (self.output)
        s += "ItemList : %s\n" % (self.ItemList)
        s += "OptItemList : %s\n" % (self.OptItemList)
        s += (line + '\n')
        return s
179 
180 # =============================================================================
181 
183  '''
184  GaudiPython algorithm used to clean up histos on the Reader and Workers
185  Only has a finalize method()
186  This retrieves a dictionary of path:histo objects and sends it to the
187  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
188  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
189  at the writer end, there will be an error.
190  '''
def __init__(self, gmpcomponent):
    """Bind this algorithm to its parent GMP node and reuse its logger."""
    PyAlgorithm.__init__(self)
    self._gmpc = gmpcomponent
    self.log = self._gmpc.log
def execute(self):
    """Per-event no-op : all the work happens in finalize()."""
    return SUCCESS
def finalize(self):
    """Ship this node's histogram store to the Writer, then reset it.

    Sends { path : histo } dicts in chunks on the histogram queue, then a
    'HISTOS_SENT' marker, and blocks on the sync Event until the Writer
    confirms reception — returning before ALL histos have been completely
    received at the Writer end would cause an error.
    """
    self.log.info('CollectHistograms Finalise (%s)' % (self._gmpc.nodeType))
    self._gmpc.hDict = self._gmpc.dumpHistograms()
    # list() so the slicing below also works with Python 3 dict views
    ks = list(self._gmpc.hDict.keys())
    self.log.info('%i Objects in Histogram Store' % (len(ks)))

    # crashes occurred due to Memory Error during the sending of hundreds
    # of histos on slc5 machines, so instead, break into chunks
    # and send 100 at a time
    chunk = 100
    # floor division : plain '/' would produce a float under Python 3
    reps = len(ks) // chunk + 1
    for i in range(reps):
        someKeys = ks[i * chunk: (i + 1) * chunk]
        smalld = dict([(key, self._gmpc.hDict[key]) for key in someKeys])
        self._gmpc.hq.put((self._gmpc.nodeID, smalld))
    # "finished" Notification
    self.log.debug('Signalling end of histos to Writer')
    self._gmpc.hq.put('HISTOS_SENT')
    self.log.debug('Waiting on Sync Event')
    self._gmpc.sEvent.wait()
    self.log.debug('Histo Sync Event set, clearing and returning')
    self._gmpc.hvt.clearStore()
    root = gbl.DataObject()
    setOwnership(root, False)
    self._gmpc.hvt.setRoot('/stat', root)
    return SUCCESS
224 
225 # =============================================================================
226 
class EventCommunicator(object):
    """Communicates Gaudi Events between nodes via Queues.

    Events travel as tuples (evtNumber, TBufferFile), filled either by the
    TESSerializer, or the GaudiSvc, "IPCSvc".
    """

    def __init__(self, GMPComponent, qin, qout):
        # GMPComponent : the owning node (provides log, nodeType, nWorkers)
        # qin / qout   : incoming / outgoing queues (either may be None)
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0      # counter : items sent
        self.nRecv = 0      # counter : items received
        self.sizeSent = 0   # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0   # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0    # time : receiving from self.qin
        self.qoutTime = 0   # time : sending on qout

    def send(self, item):
        """Put one (evtNumber, TBufferFile) tuple on the outgoing queue."""
        # isinstance instead of __class__.__name__ : same check, also
        # accepts tuple subclasses
        assert isinstance(item, tuple)
        startTransmission = time.time()
        self.qout.put(item)
        # allow the background thread to feed the Queue; not 100% guaranteed
        # to finish before next line
        # NOTE(review): relies on the private Queue._buffer attribute —
        # confirm it still exists on the multiprocessing version in use
        while self.qout._buffer:
            time.sleep(NAP)
        self.qoutTime += time.time() - startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive(self, timeout=None):
        """Get one item from the incoming queue; return None on timeout."""
        startWait = time.time()
        try:
            itemIn = self.qin.get(timeout=timeout)
        except Empty:
            return None
        self.qinTime += time.time() - startWait
        self.nRecv += 1
        if isinstance(itemIn, tuple):
            self.sizeRecv += itemIn[1].Length()
        else:
            # control flags (e.g. 'FINISHED') are not counted as events
            self.nRecv -= 1
        try:
            self.qin.task_done()
        except Exception:
            # was a bare except; task_done() raises ValueError when called
            # more times than there were items on the queue
            self._gmpc.log.warning('TASK_DONE called too often by : %s'
                                   % (self._gmpc.nodeType))
        return itemIn

    def finalize(self):
        """Send 'FINISHED' flags downstream and drain the outgoing queue."""
        self.log.info('Finalize Event Communicator : %s %s'
                      % (self._gmpc, self._gmpc.nodeType))
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        if self._gmpc.nodeType == 'Reader':
            downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer':
            downstream = 0
        elif self._gmpc.nodeType == 'Worker':
            downstream = 1

        for i in range(downstream):
            self.qout.put('FINISHED')
        if self._gmpc.nodeType != 'Writer':
            self.qout.join()
        # Now some reporting...
        self.statistics()

    def statistics(self):
        """Log send/receive counters and timings for this communicator."""
        self.log.name = '%s-%i Audit ' % (self._gmpc.nodeType,
                                          self._gmpc.nodeID)
        self.log.info('Items Sent : %i' % (self.nSent))
        self.log.info('Items Received : %i' % (self.nRecv))
        self.log.info('Data Sent : %i' % (self.sizeSent))
        self.log.info('Data Received : %i' % (self.sizeRecv))
        self.log.info('Q-out Time : %5.2f' % (self.qoutTime))
        self.log.info('Q-in Time : %5.2f' % (self.qinTime))
310 
311 # =============================================================================
312 
class TESSerializer(object):
    """Wrapper around the GaudiMP TESSerializer adding size/time bookkeeping.

    Load() deserializes a TBufferFile into the event store; Dump() serializes
    the store into a fresh TBufferFile; Report() logs the accumulated I/O
    statistics.
    """

    def __init__(self, gaudiTESSerializer, evtDataSvc,
                 nodeType, nodeID, log):
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        self.buffersIn = []    # sizes (bytes) of buffers loaded
        self.buffersOut = []   # sizes (bytes) of buffers dumped
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log

    def Load(self, tbuf):
        """Reset the TES root and deserialize tbuf into the event store."""
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        t = time.time()
        self.T.loadBuffer(tbuf)
        self.tLoad += (time.time() - t)
        self.nIn += 1
        self.buffersIn.append(tbuf.Length())

    def Dump(self):
        """Serialize the event store into a new TBufferFile and return it."""
        t = time.time()
        tb = TBufferFile(TBuffer.kWrite)
        self.T.dumpBuffer(tb)
        self.tDump += (time.time() - t)
        self.nOut += 1
        self.buffersOut.append(tb.Length())
        return tb

    def Report(self):
        """Log the accumulated load/dump statistics for this serializer."""
        evIn = "Events Loaded : %i" % (self.nIn)
        evOut = "Events Dumped : %i" % (self.nOut)
        din = sum(self.buffersIn)
        dataIn = "Data Loaded : %i" % (din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb" % (din / MB)
        if self.nIn:
            avgIn = "Avg Buf Loaded : %5.2f Mb" % (din / (self.nIn * MB))
            maxIn = "Max Buf Loaded : %5.2f Mb" % (max(self.buffersIn) / MB)
        else:
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"
        dout = sum(self.buffersOut)
        dataOut = "Data Dumped : %i" % (dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb" % (dout / MB)
        if self.nOut:
            # BUGFIX : the average dumped-buffer size was computed from din
            # (the *input* total) — use the dumped total dout instead
            avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
            maxOut = "Max Buf Dumped : %5.2f Mb" % (max(self.buffersOut) / MB)
        else:
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f" % (self.tDump)
        loadTime = "Total Load Time : %5.2f" % (self.tLoad)

        lines = (evIn, evOut,
                 dataIn, dataInMb, avgIn, maxIn,
                 dataOut, dataOutMb, avgOut, maxOut,
                 dumpTime, loadTime)
        self.log.name = "%s-%i TESSerializer" % (self.nodeType, self.nodeID)
        for line in lines:
            self.log.info(line)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
389 
390 # =============================================================================
391 
class GMPComponent(object):
    # This class is the template for Reader, Worker and Writer,
    # containing all common components.
    # nodeID is a numerical identifier for the node :
    #   -1 for the Reader
    #   -2 for the Writer
    #   0,...,nWorkers-1 for the Workers
    def __init__(self, nodeType, nodeID, queues, events, params, subworkers):
        # declare a Gaudi MultiProcessing Node
        # the nodeType is going to be one of Reader, Worker, Writer
        # queues is (qPair, histq, filerecordsq); qPair is a tuple of
        #   (qin, qout) for sending and receiving — except for the Writer,
        #   where it holds a list of qPairs (one queue-in from each Worker)
        # events is (initEvent, (eventLoopSyncer, lastEvent), finalEvent)
        # params is a tuple of (nWorkers, syncEvent, config, log)

        self.nodeType = nodeType
        current_process().name = nodeType

        # Synchronisation Event() objects for keeping track of the system
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer  # unpack tuple

        # necessary for knowledge of the system
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.subworkers = subworkers
        self.nodeID = nodeID

        # describe the state of the node by the current Event Number
        self.currentEvent = None

        # Unpack the Queues : (events, histos, filerecords)
        self.queues = queues
        self.num = 0

        # detect which application this configuration belongs to;
        # the last matching key wins (renamed from `list`, which
        # shadowed the builtin)
        ks = self.config.keys()
        self.app = None
        knownApps = ["Brunel", "DaVinci", "Boole", "Gauss"]
        for k in knownApps:
            if k in ks:
                self.app = k

    def Start(self):
        # Set up the queue communicators and fork the separate process.
        qPair, histq, fq = self.queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker':
            # Reader or Worker Node : a single (qin, qout) pair
            qin, qout = qPair
            self.evcom = EventCommunicator(self, qin, qout)
        else:
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin:
                ec = EventCommunicator(self, q, None)
                self.evcoms.append(ec)
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i' % (self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        self.proc = Process(target=self.Engine)
        # Fork and start the separate process
        self.proc.start()

    def Engine(self):
        # This will be the method carried out by the Node;
        # different for each subclass.
        pass

    def processConfiguration(self):
        # Different for all : customize Configuration for multicore.
        pass

    def SetupGaudiPython(self):
        # Initialize the GaudiPython tools : AppMgr, services and the
        # TES serializer wrapper.
        self.a = AppMgr()
        if SMAPS:
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType + '-' + str(self.nodeID) + '.smp'
            ss = SmapShot(logname=smapsLog)
            self.a.addAlgorithm(ss)
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
        self.pers = self.a.service('EventPersistencySvc', 'IAddressCreator')
        self.ts = gbl.GaudiMP.TESSerializer(self.evt._idp, self.pers)
        self.TS = TESSerializer(self.ts, self.evt,
                                self.nodeType, self.nodeID, self.log)
        return SUCCESS

    def StartGaudiPython(self):
        # Initialize and start the AppMgr.
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES(self, tbufferfile):
        # Reset the TES root and deserialize tbufferfile into it.
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot('/Event', root)
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber(self):
        # Return the current event number, or -1 when it cannot be found.
        if self.app != 'Gauss':
            # Using getList or getHistoNames can result in the EventSelector
            # re-initialising connection to RootDBase, which costs a lot of
            # time... try to build a set of Header paths??

            # First Attempt : Unpacked Event Data
            lst = ['/Event/Gen/Header',
                   '/Event/Rec/Header']
            for l in lst:
                path = l
                try:
                    n = self.evt[path].evtNumber()
                    return n
                except Exception:
                    # No evt number at this path (was a bare except)
                    continue

            # second attempt : try DAQ/RawEvent data
            # The Evt Number is in bank type 16, bank 0, data pt 4
            try:
                n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
                return n
            except Exception:
                pass

            # Default Action : only warn before any event has been processed
            if self.nIn > 0 or self.nOut > 0:
                pass
            else:
                self.log.warning('Could not determine Event Number')
            return -1
        else:
            # Gauss generates events : the Reader (nodeID -1) counts them
            if self.nodeID == -1:
                self.num = self.num + 1
            return self.num

    def IdentifyWriters(self):
        #
        # Identify Writers in the Configuration
        #
        d = {}
        keys = ["events", "records", "tuples", "histos"]
        for k in keys:
            d[k] = []

        # Identify Writers and Classify
        wkeys = WRITERTYPES.keys()
        for v in self.config.values():
            if v.__class__.__name__ in wkeys:
                writerType = WRITERTYPES[v.__class__.__name__]
                d[writerType].append(MiniWriter(v, writerType, self.config))
                if self.nodeID == 0:
                    self.log.info('Writer Found : %s' % (v.name()))

        # Now Check for the Histogram Service
        if 'HistogramPersistencySvc' in self.config:
            hfile = self.config['HistogramPersistencySvc'].getProp('OutputFile')
            d["histos"].append(hfile)
        return d

    def dumpHistograms(self):
        '''
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames()
        histDict = {}
        objects = 0
        histos = 0
        if nlist:
            for n in nlist:
                o = self.hvt[n]
                if type(o) in aidatypes:
                    o = aida2root(o)
                    histos += 1
                else:
                    objects += 1
                histDict[n] = o
        else:
            # parenthesized form : valid as statement (py2) and call (py3)
            print('WARNING : no histograms to recover?')
        return histDict

    def Initialize(self):
        # Process the configuration, set up GaudiPython, signal readiness
        # and start the AppMgr; records the time taken in self.iTime.
        start = time.time()
        self.processConfiguration()
        self.SetupGaudiPython()
        self.initEvent.set()
        self.StartGaudiPython()

        if self.app == 'Gauss':
            # Gauss needs the EvtCounter tool to track event numbers
            tool = self.a.tool("ToolSvc.EvtCounter")
            self.cntr = InterfaceCast(gbl.IEventCounter)(tool.getInterface())
        else:
            self.cntr = None

        self.iTime = time.time() - start

    def Finalize(self):
        # Stop and finalize the AppMgr, then signal completion.
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info('%s-%i Finalized' % (self.nodeType, self.nodeID))
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report(self):
        # Log the timing summary for this node, then the serializer's report.
        self.log.name = "%s-%i Audit" % (self.nodeType, self.nodeID)
        allTime = "Alive Time : %5.2f" % (self.tTime)
        initTime = "Init Time : %5.2f" % (self.iTime)
        frstTime = "1st Event Time : %5.2f" % (self.firstEvTime)
        runTime = "Run Time : %5.2f" % (self.rTime)
        finTime = "Finalise Time : %5.2f" % (self.fTime)
        tup = (allTime, initTime, frstTime, runTime, finTime)
        for t in tup:
            self.log.info(t)
        self.log.name = "%s-%i" % (self.nodeType, self.nodeID)
        # and report from the TESSerializer
        self.TS.Report()
644 
645 # =============================================================================
646 
def __init__(self, queues, events, params, subworkers):
    """Construct the Reader node; its nodeID is always -1."""
    GMPComponent.__init__(
        self, 'Reader', -1, queues, events, params, subworkers)
650 
def processConfiguration(self):
    """Strip the configuration down for the Reader process.

    The Reader runs no algorithms, writes no output and keeps no
    histograms; it only reads events and distributes them. Also tags the
    MessageSvc output and caches the configured EvtMax.
    """
    self.config['ApplicationMgr'].TopAlg = []
    self.config['ApplicationMgr'].OutStream = []
    # membership test directly on the mapping (no .keys() needed)
    if "HistogramPersistencySvc" in self.config:
        self.config['HistogramPersistencySvc'].OutputFile = ''
    self.config['MessageSvc'].Format = '[Reader]% F%18W%S%7W%R%T %0W%M'
    self.evtMax = self.config['ApplicationMgr'].EvtMax
662 
def DumpEvent(self):
    """Serialize the current TES contents into a fresh TBufferFile."""
    buf = TBufferFile(TBuffer.kWrite)
    self.ts.dumpBuffer(buf)
    return buf
669 
# Read, serialize and send the very first event. The first event is special :
# the TESSerializer item list must be populated from the TES before dumping,
# and the time taken is recorded separately in self.firstEvTime.
670 def DoFirstEvent( self ) :
671 # Do First Event ------------------------------------------------------
672 # Check Termination Criteria
673 startFirst = time.time()
674 self.log.info('Reader : First Event')
675 if self.nOut == self.evtMax :
676 self.log.info('evtMax( %i ) reached'%(self.evtMax))
677 self.lastEvent.set()
678 return SUCCESS
679 else :
680 # Continue to read, dump and send event
681 self.a.run(1)
682 if not bool(self.evt['/Event']) :
683 self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
684 self.lastEvent.set()
685 return SUCCESS
686 else :
687 # Populate TESSerializer list and send Event
688 if self.app == "Gauss":
689 lst = self.evt.getHistoNames()
690 else:
691 try :
692 lst = self.evt.getList()
693 if self.app == "DaVinci":
694 daqnode = self.evt.retrieveObject( '/Event/DAQ' ).registry()
695 setOwnership( daqnode, False )
696 self.evt.getList( daqnode, lst, daqnode.address().par() )
697 except :
698 self.log.critical('Reader could not acquire TES List!')
699 self.lastEvent.set()
700 return FAILURE
701 self.log.info('Reader : TES List : %i items'%(len(lst)))
702 for l in lst :
703 self.ts.addItem(l)
# NOTE(review): original line 704 is elided from this listing; the main loop
# in Engine() sets self.currentEvent = self.getEventNumber() before dumping,
# so 704 presumably does the same here — confirm against the full source.
705 tb = self.TS.Dump( )
706 self.log.info('First Event Sent')
707 self.evcom.send( (self.currentEvent, tb) )
708 self.nOut += 1
709 self.eventLoopSyncer.set()
710 self.evt.clearStore( )
711 self.firstEvTime = time.time()-startFirst
712 return SUCCESS
713 
def Engine(self):
    """Main loop of the Reader process.

    Initializes GaudiPython, distributes the first event (which also
    populates the serializer item list), then keeps reading, serializing
    and sending events until EvtMax is reached, the input is exhausted,
    or the node is flagged unhealthy. Finishes with queue finalization,
    AppMgr finalization and an audit report.
    """
    # rename the OS-level process (prctl option 15 == PR_SET_NAME)
    import os
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    procname = str(self.nodeType) + str(self.nodeID) + '\0'
    libc.prctl(15, procname, 0, 0, 0)

    engineStart = time.time()
    self.log.name = 'Reader'
    self.log.info('Reader Process starting')

    self.Initialize()

    # add the Histogram Collection Algorithm
    self.a.addAlgorithm(CollectHistograms(self))

    self.log.info('Reader Beginning Distribution')
    firstStatus = self.DoFirstEvent()
    if firstStatus.isSuccess():
        self.log.info('Reader First Event OK')
    else:
        self.log.critical('Reader Failed on First Event')
        self.stat = FAILURE

    # Distribute all remaining events --------------------------------------
    while True:
        # termination : configured maximum reached?
        if self.nOut == self.evtMax:
            self.log.info('evtMax( %i ) reached' % (self.evtMax))
            break
        # termination : node still healthy?
        if not self.stat.isSuccess():
            self.log.critical('Reader is Damaged!')
            break
        # read the next event, timing the AppMgr call
        tRun = time.time()
        self.a.run(1)
        self.rTime += (time.time() - tRun)
        if not bool(self.evt['/Event']):
            self.log.warning('No More Events! (So Far : %i)' % (self.nOut))
            break
        # serialize and ship the event, then clear the local store
        self.currentEvent = self.getEventNumber()
        buf = self.TS.Dump()
        self.evcom.send((self.currentEvent, buf))
        self.nOut += 1
        self.eventLoopSyncer.set()
        self.evt.clearStore()
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    # Finalize
    self.log.info('Reader : Event Distribution complete.')
    self.evcom.finalize()
    self.Finalize()
    self.tTime = time.time() - engineStart
    self.Report()
773 
774 # =============================================================================
# Subworker constructor : delegate the common setup to GMPComponent, then
# cache the per-event accept/require/veto algorithm name lists.
776 def __init__( self, workerID, queues, events, params, subworkers ) :
777 GMPComponent.__init__(self,'Worker', workerID, queues, events, params, subworkers )
778 # Identify the writer streams
# NOTE(review): original line 779 is elided from this listing; getCheckAlgs()
# below reads self.writerDict, so 779 presumably assigns it (e.g. via
# IdentifyWriters()) — confirm against the full source.
780 # Identify the accept/veto checks for each event
781 self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
782 self.log.info("Subworker-%i Created OK"%(self.nodeID))
783 self.eventOutput = True
784 
# Main loop of a Subworker process : receive serialized events from the
# Reader queue, execute the event loop on each, apply the writer
# accept/require/veto checks, and forward passing events downstream.
# Finishes by finalizing the communicator, sending FileRecords and
# stopping the AppMgr.
785 def Engine( self ) :
786 # rename process
787 import os
788 import ctypes
789 libc = ctypes.CDLL('libc.so.6')
790 name = str(self.nodeType) + str(self.nodeID) + '\0'
# prctl option 15 == PR_SET_NAME : set the OS-visible process name
791 libc.prctl(15,name,0,0,0)
792 
793 self.initEvent.set()
794 startEngine = time.time()
795 msg = self.a.service('MessageSvc')
796 msg.Format = '[' + self.log.name + '] % F%18W%S%7W%R%T %0W%M'
797 
798 self.log.name = "Worker-%i"%(self.nodeID)
799 self.log.info("Subworker %i starting Engine"%(self.nodeID))
# NOTE(review): original line 800 is elided from this listing — confirm
# against the full source what (if anything) it contained.
801 
802 # populate the TESSerializer itemlist
803 self.log.info('EVT WRITERS ON WORKER : %i'\
804 %( len(self.writerDict['events'])))
805 
806 nEventWriters = len( self.writerDict[ "events" ] )
807 self.a.addAlgorithm( CollectHistograms(self) )
808 
809 # Begin processing
810 Go = True
811 while Go :
# blocks until an item arrives; 'FINISHED' flag terminates the loop
812 packet = self.evcom.receive( )
813 if packet : pass
814 else : continue
815 if packet == 'FINISHED' : break
816 evtNumber, tbin = packet # unpack
817 if self.cntr != None:
818 
819 self.cntr.setEventCounter( evtNumber )
820 
821 self.nIn += 1
822 self.TS.Load( tbin )
823 
824 t = time.time()
825 sc = self.a.executeEvent()
# the first event carries the initialisation overhead; time it separately
826 if self.nIn == 1 :
827 self.firstEvTime = time.time()-t
828 else :
829 self.rTime += (time.time()-t)
830 if sc.isSuccess() :
831 pass
832 else :
833 self.log.name = "Worker-%i"%(self.nodeID)
834 self.log.warning('Did not Execute Event')
835 self.evt.clearStore()
836 continue
837 if self.isEventPassed() :
838 pass
839 else :
840 self.log.name = "Worker-%i"%(self.nodeID)
841 self.log.warning( 'Event did not pass : %i'%(evtNumber) )
842 self.evt.clearStore()
843 continue
844 if self.eventOutput :
845 # It may be the case of generating Event Tags; hence
846 # no event output
# NOTE(review): original line 847 is elided from this listing; the event sent
# below uses self.currentEvent, so 847 presumably assigns it (e.g. from
# evtNumber) — confirm against the full source.
848 tb = self.TS.Dump( )
849 self.evcom.send( (self.currentEvent, tb) )
850 self.nOut += 1
851 self.inc.fireIncident(gbl.Incident('Subworker','EndEvent'))
852 self.eventLoopSyncer.set()
853 self.evt.clearStore( )
854 self.log.name = "Worker-%i"%(self.nodeID)
855 self.log.info('Setting <Last> Event %s' %(self.nodeID))
856 self.lastEvent.set()
857 
858 self.evcom.finalize()
859 # Now send the FileRecords and stop/finalize the appMgr
860 self.filerecordsAgent.SendFileRecords()
861 self.tTime = time.time()-startEngine
862 self.Finalize()
863 self.Report()
864 #self.finalEvent.set()
865 
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr):
    """Adopt the parent Worker's GaudiPython services and tools.

    Stores the AppMgr, event/histogram/filerecord services, persistency
    service, GaudiMP serializer and event counter, then builds this
    node's own TESSerializer wrapper around them.
    """
    self.a = a
    self.evt = evt
    self.hvt = hvt
    self.fsr = fsr
    # the incident service handed in is NOT used; a fresh IIncidentSvc
    # interface is obtained from this process' own AppMgr instead
    self.inc = self.a.service('IncidentSvc', 'IIncidentSvc')
    self.pers = pers
    self.ts = ts
    self.cntr = cntr
    self.TS = TESSerializer(self.ts, self.evt,
                            self.nodeType, self.nodeID, self.log)
879 
def getCheckAlgs(self):
    '''
    For some output writers, a check is performed to see if the event has
    executed certain algorithms.
    These reside in the AcceptAlgs property for those writers.
    Returns the (accept, require, veto) name lists gathered from all
    event writers.
    '''
    acc = []
    req = []
    vet = []
    # gather each property from every event writer that declares it
    for attr, bucket in (('AcceptAlgs', acc),
                         ('RequireAlgs', req),
                         ('VetoAlgs', vet)):
        for m in self.writerDict["events"]:
            if hasattr(m.w, attr):
                bucket.extend(getattr(m.w, attr))
    return (acc, req, vet)
894 
895 
def checkExecutedPassed(self, algName):
    """Return True iff the named algorithm was executed for the current
    event AND its filter passed.

    The algorithm handle is looked up once (the original performed the
    `self.a.algorithm(algName)` lookup twice, once per condition).
    """
    ialg = self.a.algorithm(algName)._ialg
    return bool(ialg.isExecuted() and ialg.filterPassed())
902 
def isEventPassed(self):
    '''
    Check the algorithm status for an event.
    Depending on output writer settings, the event
    may be declined based on various criteria.
    This is a transcript of the check that occurs in GaudiSvc::OutputStream
    '''
    self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
    if self.acceptAlgs:
        # accepted as soon as one accept-algorithm passed
        # (any() short-circuits exactly like the original loop+break)
        passed = any(self.checkExecutedPassed(name)
                     for name in self.acceptAlgs)
    else:
        # no accept list configured : accept by default
        passed = True

    self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
    for name in self.requireAlgs:
        if not self.checkExecutedPassed(name):
            self.log.info('Evt declined (requireAlgs) : %s' % (name))
            passed = False

    self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
    # NOTE(review): this declines when a veto algorithm did NOT pass,
    # i.e. identical to the requireAlgs handling; a true veto would
    # decline when it DID pass. Transcribed as-is — confirm against
    # GaudiSvc::OutputStream before changing.
    for name in self.vetoAlgs:
        if not self.checkExecutedPassed(name):
            self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
            passed = False
    return passed
937 
938 # =============================================================================
# Worker constructor : delegate the common setup to GMPComponent, then
# cache the per-event accept/require/veto algorithm name lists and tag
# the logger with this worker's ID.
940 def __init__( self, workerID, queues, events, params , subworkers ) :
941 GMPComponent.__init__(self,'Worker', workerID, queues, events, params, subworkers )
942 # Identify the writer streams
# NOTE(review): original line 943 is elided from this listing; getCheckAlgs()
# and processConfiguration() read self.writerDict, so 943 presumably assigns
# it (e.g. self.writerDict = self.IdentifyWriters()) — confirm against the
# full source.
944 # Identify the accept/veto checks for each event
945 self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
946 self.log.name = "Worker-%i"%(self.nodeID)
947 self.log.info("Worker-%i Created OK"%(self.nodeID))
948 self.eventOutput = True
949 
def processConfiguration(self):
    """Trim the configuration for a Worker process.

    Workers take no input from files, write no output streams and keep no
    histogram file; events arrive from the Reader queue. Tuple writers are
    renamed per-worker so their files can be merged afterwards, and for
    Gauss the EvtCounter tool is switched to manual counting.
    """
    # Worker : no input, no output, no histos
    self.config['EventSelector'].Input = []
    self.config['ApplicationMgr'].OutStream = []
    if "HistogramPersistencySvc" in self.config:
        self.config['HistogramPersistencySvc'].OutputFile = ''
    formatHead = '[Worker-%i] ' % (self.nodeID)
    self.config['MessageSvc'].Format = formatHead + '% F%18W%S%7W%R%T %0W%M'

    # items() instead of py2-only iteritems() : identical iteration
    for key, lst in self.writerDict.items():
        self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

    for m in self.writerDict["tuples"]:
        # rename Tuple output file with an appendix
        # based on worker id, for merging later
        newName = m.getNewName('.', '.w%i.' % (self.nodeID))
        self.config[m.key].Output = newName

    # Suppress INFO Output for all but Worker-0
    # if self.nodeID == 0 :
    #     pass
    # else :
    #     self.config[ 'MessageSvc' ].OutputLevel = ERROR

    if self.app == "Gauss":
        try:
            if "ToolSvc.EvtCounter" not in self.config:
                from Configurables import EvtCounter
                counter = EvtCounter()
            else:
                counter = self.config["ToolSvc.EvtCounter"]
            counter.UseIncident = False
        except Exception:
            # best effort (was a bare except) : ignore errors when trying
            # to change the configuration of the EvtCounter
            self.log.warning('Cannot configure EvtCounter')
989 
def Engine( self ) :
    """Main event loop of the Worker process.

    Initialises the AppMgr, populates the TES serializer item lists from
    the event writers, then repeatedly receives serialized events from the
    Reader, executes them, applies the accept/require/veto checks and
    sends surviving events to the Writer.  Terminates on a 'FINISHED'
    packet, then ships FileRecords, finalises and joins its subworkers.
    """
    # rename process so it is identifiable in ps/top output;
    # 15 == PR_SET_NAME in the Linux prctl(2) API
    import os
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    name = str(self.nodeType) + str(self.nodeID) + '\0'
    libc.prctl(15, name, 0, 0, 0)

    startEngine = time.time()
    self.log.info("Worker %i starting Engine"%(self.nodeID))
    self.Initialize()
    # NOTE(review): one source line appears to have been dropped from this
    # extract here by the documentation generator -- confirm against the
    # full file.

    # populate the TESSerializer itemlist
    self.log.info('EVT WRITERS ON WORKER : %i'\
                  %( len(self.writerDict['events'])))

    nEventWriters = len( self.writerDict[ "events" ] )
    if nEventWriters :
        itemList = set()
        optItemList = set()
        for m in self.writerDict[ "events" ] :
            for item in m.ItemList :
                # strip any trailing '#depth' qualifier from the path
                hsh = item.find( '#' )
                if hsh != -1:
                    item = item[:hsh]
                itemList.add( item )
            for item in m.OptItemList :
                hsh = item.find( '#' )
                if hsh != -1:
                    item = item[:hsh]
                optItemList.add( item )
        # If an item is mandatory and optional, keep it only in the optional list
        itemList -= optItemList
        for item in sorted( itemList ):
            self.log.info( ' adding ItemList Item to ts : %s' % ( item ) )
            self.ts.addItem( item )
        for item in sorted( optItemList ):
            self.log.info( ' adding Optional Item to ts : %s' % ( item ) )
            self.ts.addOptItem( item )
    else :
        self.log.info( 'There is no Event Output for this app' )
        self.eventOutput = False

    # Begin processing
    Go = True
    while Go :
        packet = self.evcom.receive( )
        if packet : pass
        else : continue
        if packet == 'FINISHED' : break
        evtNumber, tbin = packet # unpack
        if self.cntr != None:
            self.cntr.setEventCounter( evtNumber )

        # subworkers are forked before the first event is processed
        # reader-thread for ConDB must be closed and reopened in each subworker
        # this is done by disconnect()
        if self.nIn == 0:
            self.log.info("Fork new subworkers and disconnect from CondDB")
            condDB = self.a.service('CondDBCnvSvc', gbl.ICondDBReader)
            condDB.disconnect()

            # Fork subworkers and share services
            for k in self.subworkers:
                k.SetServices(self.a, self.evt, self.hvt, self.fsr, self.inc, self.pers, self.ts, self.cntr)
                k.Start()
            self.a.addAlgorithm( CollectHistograms(self) )
        self.nIn += 1
        self.TS.Load( tbin )

        t = time.time()
        sc = self.a.executeEvent()
        # first event carries initialisation overhead: timed separately
        if self.nIn == 1 :
            self.firstEvTime = time.time()-t
        else :
            self.rTime += (time.time()-t)
        if sc.isSuccess() :
            pass
        else :
            self.log.warning('Did not Execute Event')
            self.evt.clearStore()
            continue
        if self.isEventPassed() :
            pass
        else :
            self.log.warning( 'Event did not pass : %i'%(evtNumber) )
            self.evt.clearStore()
            continue
        if self.eventOutput :
            # It may be the case of generating Event Tags; hence
            # no event output
            # NOTE(review): one source line appears to have been dropped
            # from this extract here -- confirm against the full file.
            tb = self.TS.Dump( )
            self.evcom.send( (self.currentEvent, tb) )
            self.nOut += 1
        self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
        self.eventLoopSyncer.set()
        self.evt.clearStore( )
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    self.evcom.finalize()
    self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
    # Now send the FileRecords and stop/finalize the appMgr
    self.filerecordsAgent.SendFileRecords()
    self.Finalize()
    self.tTime = time.time()-startEngine
    self.Report()

    for k in self.subworkers:
        self.log.info('Join subworkers')
        k.proc.join()
1105 
def getCheckAlgs( self ) :
    '''
    Collect the per-event filter requirements of the output writers.
    Some writers declare AcceptAlgs/RequireAlgs/VetoAlgs properties;
    these are gathered here so each event can be checked before output.
    Returns a (accept, require, veto) tuple of algorithm-name lists.
    '''
    accept = []
    require = []
    veto = []
    for writer in self.writerDict[ "events" ] :
        stream = writer.w
        if hasattr( stream, 'AcceptAlgs' ) :
            accept.extend( stream.AcceptAlgs )
        if hasattr( stream, 'RequireAlgs' ) :
            require.extend( stream.RequireAlgs )
        if hasattr( stream, 'VetoAlgs' ) :
            veto.extend( stream.VetoAlgs )
    return (accept, require, veto)
1120 
1121 
def checkExecutedPassed( self, algName ) :
    """Return True iff the named algorithm was executed for the current
    event AND its filter decision was positive."""
    ialg = self.a.algorithm( algName )._ialg
    return bool( ialg.isExecuted() and ialg.filterPassed() )
1128 
def isEventPassed( self ) :
    '''
    Check the algorithm status for an event.
    Depending on output writer settings, the event
    may be declined based on various criteria.
    This is a transcript of the check that occurs in GaudiSvc::OutputStream
    '''
    self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
    if self.acceptAlgs :
        # at least one accept algorithm must have fired
        passed = any( self.checkExecutedPassed( nm ) for nm in self.acceptAlgs )
    else :
        passed = True

    self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
    for nm in self.requireAlgs :
        # every required algorithm must have fired
        if not self.checkExecutedPassed( nm ) :
            self.log.info('Evt declined (requireAlgs) : %s'%(nm) )
            passed = False

    self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
    for nm in self.vetoAlgs :
        if not self.checkExecutedPassed( nm ) :
            self.log.info( 'Evt declined : (vetoAlgs) : %s'%(nm) )
            passed = False
    return passed
1163 
1164 # =============================================================================
1165 
def __init__( self, queues, events, params, subworkers ) :
    """Writer node (fixed nodeID -2): collects processed events from all
    Workers and hosts the output streams that write the combined output.
    """
    GMPComponent.__init__(self,'Writer', -2, queues, events, params, subworkers )
    # Identify the writer streams
    # NOTE(review): a source line appears to have been dropped from this
    # extract here (presumably the writerDict setup used later in
    # processConfiguration) -- confirm against the full file.
    # This keeps track of workers as they finish (one flag per worker,
    # flipped when its FINISHED packet arrives)
    self.status = [False]*self.nWorkers
    self.log.name = "Writer--2"
1174 
def processConfiguration( self ) :
    """Adapt the configuration for the Writer process.

    The Writer runs no algorithms and reads no event input; it only
    hosts the output writers.  Every output file name is decorated with
    a '.pN.' marker (N = number of workers) to mark it as produced by a
    parallel job.
    """
    # Writer :
    #   No input
    #   No Algs
    self.config[ 'ApplicationMgr' ].TopAlg = []
    self.config[ 'EventSelector' ].Input = []

    self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'

    # Now process the output writers
    for wtype, writers in self.writerDict.iteritems() :
        self.log.info( 'Writer Type : %s\t : %i'%(wtype, len(writers)) )

    # Modify the name of the output file to reflect that it came
    # from a parallel processing
    #
    # Event Writers
    for stream in self.writerDict[ "events" ] :
        self.log.debug( 'Processing Event Writer : %s'%(stream) )
        self.config[ stream.key ].Output = \
            stream.getNewName( '.', '.p%i.'%self.nWorkers )

    # Now, if there are no event writers, the FileRecords file
    # will fail to open, as it only opens an UPDATE version
    # of the existing Event Output File
    # So, if there are no event writers, edit the string of the
    # FileRecord Writer

    # FileRecords Writers
    for stream in self.writerDict[ "records" ] :
        self.log.debug( 'Processing FileRecords Writer: %s'%(stream) )
        self.config[ stream.key ].Output = \
            stream.getNewName( '.', '.p%i.'%self.nWorkers,
                               extra=" OPT='RECREATE'" )

    # same for histos
    histSvc = "HistogramPersistencySvc"
    outFile = self.config[ histSvc ].OutputFile \
        if histSvc in self.config.keys() else None
    if outFile :
        self.config[ histSvc ].OutputFile = \
            outFile.replace('.', '.p%i.'%(self.nWorkers))
1219 
def Engine( self ) :
    """Main loop of the Writer process.

    Round-robins over the per-worker queues, loads each received
    serialized event into the local TES and executes the output streams.
    Stops once a 'FINISHED' flag has been received from every worker,
    then collects histograms and file records and finalises.
    """
    # rename process so it is identifiable in ps/top output;
    # 15 == PR_SET_NAME in the Linux prctl(2) API
    import os
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    name = str(self.nodeType) + str(self.nodeID) + '\0'
    libc.prctl(15,name,0,0,0)

    startEngine = time.time()
    self.Initialize()
    self.histoAgent = HistoAgent( self )
    # NOTE(review): one source line appears to have been dropped from this
    # extract here by the documentation generator -- confirm against the
    # full file.

    # Begin processing
    Go = True
    current = -1
    # NOTE(review): stopCriteria appears unused below; termination is
    # driven by self.status instead -- confirm against the full file.
    stopCriteria = self.nWorkers
    while Go :
        # poll the workers' queues in round-robin fashion
        current = (current+1)%self.nWorkers
        packet = self.evcoms[current].receive( timeout=0.01 )
        if packet == None :
            continue
        if packet == 'FINISHED' :
            self.log.info('Writer got FINISHED flag : Worker %i'%(current))

            self.status[current] = True
            if all(self.status) :
                self.log.info('FINISHED recd from all workers, break loop')
                break
            continue
        # otherwise, continue as normal
        self.nIn += 1 # update central count (maybe needed by FSR store)
        evtNumber, tbin = packet # unpack
        self.TS.Load( tbin )
        t = time.time()
        self.a.executeEvent()
        self.rTime += ( time.time()-t )
        # NOTE(review): one source line appears to have been dropped from
        # this extract here -- confirm against the full file.
        self.evt.clearStore( )
        self.eventLoopSyncer.set()
    self.log.name = "Writer--2"
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    # finalisation steps
    [ e.finalize() for e in self.evcoms ]
    # Now do Histograms
    sc = self.histoAgent.Receive()
    sc = self.histoAgent.RebuildHistoStore()
    if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
    else : self.log.warning( 'Histo Store Error in Rebuild' )

    # Now do FileRecords
    sc = self.filerecordsAgent.Receive()
    self.filerecordsAgent.Rebuild()
    self.Finalize()
    #self.rTime = time.time()-startEngine
    self.Report()
1278 
1279 # =============================================================================
1280 
1281 
1282 
1283 # =============================================================================
1284 
class Coord( object ) :
    """Top-level co-ordinator of the GMP parallel system.

    Builds the Reader, one full Worker (plus Subworkers), and the Writer;
    wires them together with multiprocessing queues and synchronisation
    events; and drives the Initialise/Run/Finalise phases with timeouts.
    """

    def __init__( self, nWorkers, config, log ) :
        """Construct and wire up all subprocesses (not yet started).

        nWorkers : requested number of workers; -1 means use every CPU
                   reported by multiprocessing.cpu_count().
        config   : the configuration shared with all subprocesses.
        log      : logger instance; its name is re-set per phase.
        """
        self.log = log
        self.config = config
        # set up Logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )

        if nWorkers == -1 :
            # The user has requested all available cpus in the machine
            self.nWorkers = cpu_count()
        else :
            self.nWorkers = nWorkers

        self.qs = self.SetupQueues( )  # a dictionary of queues (for Events)
        self.hq = JoinableQueue( )  # for Histogram data
        self.fq = JoinableQueue( )  # for FileRecords data

        # Make a Syncer for Initalise, Run, and Finalise,
        # each with its own hang-guard timeout (see module constants)
        self.sInit = Syncer( self.nWorkers, self.log,
                             limit=WAIT_INITIALISE,
                             step=STEP_INITIALISE )
        self.sRun = Syncer( self.nWorkers, self.log,
                            manyEvents=True,
                            limit=WAIT_SINGLE_EVENT,
                            step=STEP_EVENT,
                            firstEvent=WAIT_FIRST_EVENT )
        self.sFin = Syncer( self.nWorkers, self.log,
                            limit=WAIT_FINALISE,
                            step=STEP_FINALISE )
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        self.subworkers = []
        # Declare SubProcesses!
        # Subworkers take ids 1..nWorkers-1; the single full Worker is id 0
        for i in range(1, self.nWorkers ) :
            sub = Subworker( i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers )
            self.subworkers.append( sub )
        self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers)
        self.workers = []
        # NOTE(review): self.workers stays empty; the Worker instance goes
        # straight into self.system below (Terminate() iterates
        # self.workers only in commented-out code) -- confirm intended.
        wk = Worker( 0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers )
        self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers)

        # launch order: Writer first, then Worker, then Reader
        self.system = []
        self.system.append(self.writer)
        self.system.append(wk)
        self.system.append(self.reader)

    def getSyncEvents( self, nodeID ) :
        """Return the (init, (run, lastEvent), fin) sync events for the
        node with the given ID (-1 Reader, -2 Writer, 0..N-1 Workers)."""
        init = self.sInit.d[nodeID].event
        run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
        fin = self.sFin.d[nodeID].event
        return ( init, run, fin )

    def getQueues( self, nodeID ) :
        """Return the (event, histogram, filerecords) queues for a node.
        Histogram and FileRecords queues are shared by all nodes."""
        eventQ = self.qs[ nodeID ]
        histQ = self.hq
        fsrQ = self.fq
        return ( eventQ, histQ, fsrQ )

    def Go( self ) :
        """Run the whole parallel job: Initialise, Run, Finalise.

        Returns SUCCESS, or FAILURE (after terminating all children) if
        any phase fails to synchronise within its time limit."""

        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'INITIALISING SYSTEM' )

        # Start reader, writer and main worker
        for p in self.system :
            p.Start()

        sc = self.sInit.syncAll(step="Initialise")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'RUNNING SYSTEM' )
        sc = self.sRun.syncAll(step="Run")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'FINALISING SYSTEM' )
        sc = self.sFin.syncAll(step="Finalise")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info( "Cleanly join all Processes" )
        self.Stop()
        self.log.info( "Report Total Success to Main.py" )
        return SUCCESS

    def Terminate( self ) :
        """Brutally kill sub-processes (used when a phase times out)."""
        children = multiprocessing.active_children()
        for i in children:
            i.terminate()

        #self.writer.proc.terminate()
        #[ w.proc.terminate() for w in self.workers]
        #self.reader.proc.terminate()

    def Stop( self ) :
        """Cleanly join all child processes; returns SUCCESS."""
        # procs should be joined in reverse order to launch
        self.system.reverse()
        for s in self.system :
            s.proc.join()
        return SUCCESS

    def SetupQueues( self ) :
        """Create the network of event Queues.

        N Queues = nWorkers + 1 :
          - one shared queue Reader -> all Workers
          - one queue per Worker -> Writer
        Returns a dict keyed by nodeID (-1 Reader, -2 Writer, 0..N-1
        Workers), each value an (in-queue, out-queue) pair.
        """
        # one queue from Reader-Workers
        rwk = JoinableQueue()
        # one queue from each worker to writer
        workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
        d = {}
        d[-1] = (None, rwk) # Reader
        d[-2] = (workersWriter, None) # Writer
        for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
        return d
1417 
1418 # ============================= EOF ===========================================
def Load(self, tbuf)
Definition: GMPBase.py:328
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
Definition: GMPBase.py:399
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:648
def getNewName(self, replaceThis, withThis, extra='')
Definition: GMPBase.py:120
StatusCode finalize() override
Definition: Algorithm.cpp:97
StatusCode execute() override
Definition: Algorithm.cpp:91
def SetupQueues(self)
Definition: GMPBase.py:1401
def receive(self, timeout=None)
Definition: GMPBase.py:266
def __init__(self, GMPComponent, qin, qout)
Definition: GMPBase.py:231
double sum(double x, double y, double z)
def __init__(self, gmpcomponent)
Definition: GMPBase.py:191
def DumpEvent(self)
Definition: GMPBase.py:663
def DoFirstEvent(self)
Definition: GMPBase.py:670
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:776
def Terminate(self)
Definition: GMPBase.py:1384
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:896
def processConfiguration(self)
Definition: GMPBase.py:950
def getCheckAlgs(self)
Definition: GMPBase.py:1106
def LoadTES(self, tbufferfile)
Definition: GMPBase.py:515
NamedRange_< CONTAINER > range(const CONTAINER &cnt, std::string name)
simple function to create the named range form arbitrary container
Definition: NamedRange.h:130
def getItemLists(self, config)
Definition: GMPBase.py:141
def __init__(self, queues, events, params, subworkers)
Definition: GMPBase.py:1167
def set(self, key, output)
Definition: GMPBase.py:158
def __init__(self, nWorkers, config, log)
Definition: GMPBase.py:1286
def __init__(self, writer, wType, config)
Definition: GMPBase.py:89
def processConfiguration(self)
Definition: GMPBase.py:487
def getSyncEvents(self, nodeID)
Definition: GMPBase.py:1338
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
Definition: GMPBase.py:866
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:940
Python Algorithm base class.
Definition: Algorithm.h:32
def isEventPassed(self)
Definition: GMPBase.py:1129
def processConfiguration(self)
Definition: GMPBase.py:651
def getQueues(self, nodeID)
Definition: GMPBase.py:1344
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1122
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
Definition: GMPBase.py:315
def processConfiguration(self)
Definition: GMPBase.py:1175