All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
GMPBase.py
Go to the documentation of this file.
1 from Gaudi.Configuration import *
2 from Configurables import EvtCounter
3 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
4 from ROOT import TBufferFile, TBuffer
5 import multiprocessing
6 from multiprocessing import Process, Queue, JoinableQueue, Event
7 from multiprocessing import cpu_count, current_process
8 from multiprocessing.queues import Empty
9 from pTools import *
10 import time, sys, os
11 from ROOT import TParallelMergingFile
12 # This script contains the bases for the Gaudi MultiProcessing (GMP)
13 # classes
14 
15 # There are three classes :
16 # Reader
17 # Worker
18 # Writer
19 
20 # Each class needs to perform communication with the others
21 # For this, we need a means of communication, which will be based on
22 # the python multiprocessing package
23 # This is provided in SPI pytools package
24 # cmt line : use pytools v1.1 LCG_Interfaces
25 # The PYTHONPATH env variable may need to be modified, as this might
26 # still point to 1.0_python2.5
27 
28 # Each class will need Queues, and a defined method for using these
29 # queues.
30 # For example, as long as there is something in the Queue, both ends
31 # of the queue must be open
32 # Also, there needs to be proper Termination flags and criteria
33 # The System should be error proof.
34 
35 
# Constants -------------------------------------------------------------------
# NAP : short polling sleep (seconds) used when draining queue buffers
NAP = 0.001
# MB : bytes per megabyte, used when reporting buffer sizes
MB = 1024.0*1024.0
# waits to guard against hanging, in seconds
WAIT_INITIALISE = 60*5
WAIT_FIRST_EVENT = 60*3
WAIT_SINGLE_EVENT = 60*6
WAIT_FINALISE = 60*2
# polling step sizes used while waiting on the above
# NOTE(review): the STEP_* values are not used in this chunk -- presumably
# consumed by the coordinator elsewhere in the file; confirm
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 10

# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
SMAPS = False

# -----------------------------------------------------------------------------

# definitions
# ----------
# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# used to check which type of histo we are dealing with
# i.e. if currentHisto in aidatypes : pass
aidatypes = ( gbl.AIDA.IHistogram,
              gbl.AIDA.IHistogram1D,
              gbl.AIDA.IHistogram2D,
              gbl.AIDA.IHistogram3D,
              gbl.AIDA.IProfile1D,
              gbl.AIDA.IProfile2D,
              gbl.AIDA.IBaseHistogram ) # extra?

# similar to aidatypes
thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )

# Types of OutputStream in Gaudi
# maps the writer's class name to the kind of data it writes
WRITERTYPES = { 'EvtCollectionStream'    : "tuples",
                'InputCopyStream'        : "events",
                'OutputStream'           : "events",
                'RecordStream'           : "records",
                'RunRecordStream'        : "records",
                'SequentialOutputStream' : "events",
                'TagCollectionStream'    : "tuples" }
79 
80 # =============================================================================
81 
class MiniWriter( object ) :
    '''
    A class to represent a writer in the GaudiPython configuration.

    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string.
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring.
    '''
    def __init__( self, writer, wType, config ) :
        '''
        @param writer : the writer Configurable
        @param wType  : one of "events"/"records"/"tuples" (see WRITERTYPES)
        @param config : full configuration mapping, keyed by component name
        '''
        self.w = writer
        self.wType = wType
        # set parameters for directly accessing the correct
        # part of the configuration, so that later, we can do
        # config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None   # never filled in; kept for __repr__ output
        if hasattr( self.w, "Output" ) :
            # output file named directly on the writer
            self.woutput = self.w.Output
            self.getItemLists( config )
            self.set( self.wName, self.w.Output )
            return
        else :
            # if there is no output file, get it via the datasvc
            # (every writer always has a datasvc property)
            self.datasvcName = self.w.EvtDataSvc
            datasvc = config[ self.datasvcName ]
            if hasattr( datasvc, "Output" ) :
                self.getItemLists( config )
                self.set( self.datasvcName, datasvc.Output )
                return

    def getNewName( self, replaceThis, withThis, extra='' ) :
        '''
        Replace one pattern in the output name string with another, and
        return the new Output name.  The Output *might* be held in a
        one-element list, in which case a list is returned.

        @param extra : might need to add ROOT flags,
                       i.e.: OPT='RECREATE', or such
        '''
        # idiomatic type checks (was __class__.__name__ string comparison)
        assert isinstance( replaceThis, str )
        assert isinstance( withThis, str )
        old = self.output
        lst = False
        if isinstance( old, list ) :
            old = self.output[0]
            lst = True
        new = old.replace( replaceThis, withThis )
        new += extra
        if lst :
            return [ new ]
        else :
            return new

    def getItemLists( self, config ) :
        '''Fill ItemList/OptItemList from the writer or, failing that, its datasvc.'''
        # the item list
        if hasattr( self.w, "ItemList" ) :
            self.ItemList = self.w.ItemList
        else :
            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "ItemList" ) :
                self.ItemList = datasvc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr( self.w, "OptItemList" ) :
            self.OptItemList = self.w.OptItemList
        else :
            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "OptItemList" ) :
                self.OptItemList = datasvc.OptItemList
        return

    def set( self, key, output ) :
        '''Record the config key and Output value for later modification.'''
        self.key = key
        self.output = output
        return

    def __repr__( self ) :
        s = ""
        line = '-'*80
        s += (line+'\n')
        s += "Writer : %s\n"%( self.wName )
        s += "Writer Type : %s\n"%( self.wType )
        s += "Writer Output : %s\n"%( self.output )
        s += "DataSvc : %s\n"%( self.datasvcName )
        s += "DataSvc Output : %s\n"%( self.svcOutput )
        s += '\n'
        s += "Key for config : %s\n"%( self.key )
        s += "Output File : %s\n"%( self.output )
        s += "ItemList : %s\n"%( self.ItemList )
        s += "OptItemList : %s\n"%( self.OptItemList )
        s += (line+'\n')
        return s
180 
181 # =============================================================================
182 
184  '''
185  GaudiPython algorithm used to clean up histos on the Reader and Workers
186  Only has a finalize method()
187  This retrieves a dictionary of path:histo objects and sends it to the
188  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
189  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
190  at the writer end, there will be an error.
191  '''
192  def __init__( self, gmpcomponent ) :
193  PyAlgorithm.__init__( self )
194  self._gmpc = gmpcomponent
195  self.log = self._gmpc.log
196  return None
    def execute( self ) :
        # event-loop no-op : all the work of this algorithm happens in finalize()
        return SUCCESS
    def finalize( self ) :
        '''
        Ship this node's histogram store to the Writer over the histo queue.

        The store is sent as { path : histo } dictionaries in chunks,
        followed by a 'HISTOS_SENT' sentinel; then this method blocks on the
        sync Event until the Writer confirms complete receipt.  Returning
        earlier would let the process tear down with buffers still in flight.
        '''
        self.log.info('CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
        self._gmpc.hDict = self._gmpc.dumpHistograms( )
        ks = self._gmpc.hDict.keys()
        self.log.info('%i Objects in Histogram Store'%(len(ks)))

        # crashes occurred due to Memory Error during the sending of hundreds
        # histos on slc5 machines, so instead, break into chunks
        # send 100 at a time
        chunk = 100
        # py2 integer division; the final chunk may be empty (harmless)
        reps = len(ks)/chunk + 1
        for i in xrange(reps) :
            someKeys = ks[i*chunk : (i+1)*chunk]
            smalld = dict( [(key, self._gmpc.hDict[key]) for key in someKeys] )
            self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
        # "finished" Notification
        self.log.debug('Signalling end of histos to Writer')
        self._gmpc.hq.put( 'HISTOS_SENT' )
        self.log.debug( 'Waiting on Sync Event' )
        self._gmpc.sEvent.wait()
        self.log.debug( 'Histo Sync Event set, clearing and returning' )
        # drop the local histo store and re-root it so finalization is clean
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()
        setOwnership(root, False)   # ownership passes to the data service
        self._gmpc.hvt.setRoot( '/stat', root )
        return SUCCESS
225 
226 # =============================================================================
227 
class EventCommunicator( object ) :
    '''
    Moves serialized Gaudi events between processes over a pair of
    multiprocessing Queues, keeping traffic statistics as it goes.
    Events travel as TBufferFiles, filled either by the TESSerializer
    or the GaudiSvc "IPCSvc".
    '''
    def __init__( self, GMPComponent, qin, qout ) :
        self._gmpc = GMPComponent
        self.log   = self._gmpc.log
        self.maxsize = 50          # maximum capacity of Queues
        self.qin  = qin            # inbound queue (may be None on a Writer leg)
        self.qout = qout           # outbound queue (None on the Writer)
        # protocol flags
        self.allsent = False
        self.allrecv = False
        # traffic statistics
        self.nSent    = 0          # items sent
        self.nRecv    = 0          # items received
        self.sizeSent = 0          # total bytes sent ( tbuf.Length() )
        self.sizeRecv = 0          # total bytes received
        self.qinTime  = 0          # seconds spent receiving from qin
        self.qoutTime = 0          # seconds spent sending on qout

    def send( self, item ) :
        '''
        Ship one ( evtNumber, TBufferFile ) tuple to the downstream node,
        waiting for the feeder thread to flush the queue buffer before
        updating the outbound statistics.
        '''
        assert item.__class__.__name__ == 'tuple'
        tStart = time.time()
        self.qout.put( item )
        # the background feeder thread drains _buffer; put() alone does not
        # guarantee the item is on the pipe, so poll until it is
        while self.qout._buffer :
            time.sleep( NAP )
        self.qoutTime += time.time() - tStart
        self.sizeSent += item[1].Length()
        self.nSent    += 1
        return SUCCESS

    def receive( self, timeout=None ) :
        '''
        Fetch the next item from the inbound queue; None on timeout.
        Tuples count towards the event statistics, control payloads
        (e.g. 'FINISHED') do not.
        '''
        tStart = time.time()
        try :
            itemIn = self.qin.get( timeout=timeout )
        except Empty :
            return None
        self.qinTime += time.time() - tStart
        self.nRecv   += 1
        if itemIn.__class__.__name__ == 'tuple' :
            self.sizeRecv += itemIn[1].Length()
        else :
            self.nRecv -= 1
        try :
            self.qin.task_done()
        except :
            # JoinableQueue raises if task_done() outruns get(); just warn
            self._gmpc.log.warning('TASK_DONE called too often by : %s'\
                                   %(self._gmpc.nodeType))
        return itemIn

    def finalize( self ) :
        '''
        Push termination flags downstream and report statistics.
        The Reader feeds nWorkers consumers, a Worker feeds the single
        Writer, and the Writer sits at the end of the chain.
        '''
        self.log.info('Finalize Event Communicator : %s %s'%(self._gmpc, self._gmpc.nodeType))
        nodeType = self._gmpc.nodeType
        if nodeType == 'Reader' :
            downstream = self._gmpc.nWorkers
        elif nodeType == 'Writer' :
            downstream = 0
        elif nodeType == 'Worker' :
            downstream = 1
        for i in xrange(downstream) :
            self.qout.put( 'FINISHED' )
        if nodeType != 'Writer' :
            self.qout.join()
        # Now some reporting...
        self.statistics( )

    def statistics( self ) :
        '''Log the traffic counters accumulated by this communicator.'''
        self.log.name = '%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
        report = ( 'Items Sent : %i'%(self.nSent),
                   'Items Received : %i'%(self.nRecv),
                   'Data Sent : %i'%(self.sizeSent),
                   'Data Received : %i'%(self.sizeRecv),
                   'Q-out Time : %5.2f'%(self.qoutTime),
                   'Q-in Time : %5.2f'%(self.qinTime ) )
        for line in report :
            self.log.info( line )
311 
312 # =============================================================================
313 
class TESSerializer( object ) :
    '''
    Bookkeeping wrapper around the GaudiMP C++ TESSerializer.

    Counts and sizes every event buffer loaded or dumped, accumulates the
    time spent (de)serializing, and logs a per-node summary on request.
    '''
    def __init__( self, gaudiTESSerializer, evtDataSvc,
                  nodeType, nodeID, log ) :
        # @param gaudiTESSerializer : GaudiMP.TESSerializer instance
        # @param evtDataSvc         : event transient store to (re)root
        # @param nodeType, nodeID, log : identification for reporting
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        self.buffersIn = []    # sizes of buffers loaded
        self.buffersOut = []   # sizes of buffers dumped
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log
    def Load( self, tbuf ) :
        '''Re-root the event store and deserialize one TBufferFile into it.'''
        root = gbl.DataObject()
        setOwnership( root, False )   # store takes ownership of the new root
        self.evt.setRoot( '/Event', root )
        t = time.time()
        self.T.loadBuffer( tbuf )
        self.tLoad += (time.time() - t)
        self.nIn += 1
        self.buffersIn.append( tbuf.Length() )
    def Dump( self ) :
        '''Serialize the current event store; returns a fresh TBufferFile.'''
        t = time.time()
        tb = TBufferFile( TBuffer.kWrite )
        self.T.dumpBuffer(tb)
        self.tDump += ( time.time()-t )
        self.nOut += 1
        self.buffersOut.append( tb.Length() )
        return tb
    def Report( self ) :
        '''Log the load/dump statistics accumulated by this serializer.'''
        evIn = "Events Loaded : %i"%( self.nIn )
        evOut = "Events Dumped : %i"%( self.nOut )
        din = sum( self.buffersIn )
        dataIn = "Data Loaded : %i"%(din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb"%(din/MB)
        if self.nIn :
            avgIn = "Avg Buf Loaded : %5.2f Mb"\
                    %( din/(self.nIn*MB) )
            maxIn = "Max Buf Loaded : %5.2f Mb"\
                    %( max(self.buffersIn)/MB )
        else :
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"
        dout = sum( self.buffersOut )
        dataOut = "Data Dumped : %i"%(dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb"%(dout/MB)
        if self.nOut :
            # BUGFIX : the dumped average must use dout; it previously used
            # din (total *loaded* bytes), misreporting the output average
            avgOut = "Avg Buf Dumped : %5.2f Mb"\
                     %( dout/(self.nOut*MB) )
            maxOut = "Max Buf Dumped : %5.2f Mb"\
                     %( max(self.buffersOut)/MB )
        else :
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f"%( self.tDump )
        loadTime = "Total Load Time : %5.2f"%( self.tLoad )

        lines = ( evIn, evOut,
                  dataIn, dataInMb, avgIn, maxIn,
                  dataOut, dataOutMb, avgOut, maxOut,
                  dumpTime, loadTime )
        self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
        for line in lines :
            self.log.info( line )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
390 
391 # =============================================================================
392 
class GMPComponent( object ) :
    '''
    Template base class for the Reader, Worker and Writer nodes, holding
    everything they share : queue plumbing, GaudiPython setup, event-number
    extraction, writer discovery, and timing/reporting.

    nodeID is a numerical identifier for the node :
      -1 for the Reader, -2 for the Writer, 0,...,nWorkers-1 for the Workers.
    '''
    def __init__( self, nodeType, nodeID, queues, events, params, subworkers ) :
        # declare a Gaudi MultiProcessing Node
        # the nodeType is going to be one of Reader, Worker, Writer
        # qPair is going to be a tuple of ( qin, qout )
        # for sending and receiving
        # if nodeType is "Writer", it will be a list of qPairs,
        # as there's one queue-in from each Worker
        #
        # params is a tuple of (nWorkers, sEvent, config, log)
        # (comment previously said 3 fields; 4 are unpacked below)

        self.nodeType = nodeType
        current_process().name = nodeType

        # Synchronisation Event() objects for keeping track of the system
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer  # unpack tuple

        # necessary for knowledge of the system
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.subworkers = subworkers
        self.nodeID = nodeID

        # describe the state of the node by the current Event Number
        self.currentEvent = None

        # Unpack the Queues : (events, histos, filerecords)
        self.queues = queues
        self.num = 0

        # work out which application this configuration belongs to
        ks = self.config.keys()
        self.app = None
        knownApps = ["Brunel", "DaVinci", "Boole", "Gauss"]  # was 'list' (shadowed builtin)
        for k in knownApps:
            if k in ks: self.app = k

    def Start( self ) :
        '''Wire up the communicators/queues for this node and fork its process.'''
        # define the separate process
        qPair, histq, fq = self.queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker' :
            # Reader or Worker Node
            qin, qout = qPair
            self.evcom = EventCommunicator( self, qin, qout )
        else :
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin :
                ec = EventCommunicator( self, q, None )
                self.evcoms.append( ec )
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i'%(self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        self.proc = Process( target=self.Engine )
        # Fork and start the separate process
        self.proc.start()

    def Engine( self ) :
        # This will be the method carried out by the Node
        # Different for all; overridden by subclasses
        pass

    def processConfiguration( self ) :
        # Different for all ; customize Configuration for multicore
        pass

    def SetupGaudiPython( self ) :
        '''Initialize the GaudiPython tools : AppMgr, stores and serializer.'''
        self.a = AppMgr()
        if SMAPS :
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType+'-'+str(self.nodeID)+'.smp'
            ss = SmapShot( logname=smapsLog )
            self.a.addAlgorithm( ss )
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc','IIncidentSvc')
        self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
        self.ts = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
        self.TS = TESSerializer( self.ts, self.evt,
                                 self.nodeType, self.nodeID, self.log )
        return SUCCESS

    def StartGaudiPython( self ) :
        '''Initialize and start the AppMgr.'''
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES( self, tbufferfile ) :
        '''Re-root the event store and load one serialized event into it.'''
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot( '/Event', root )
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber( self ) :
        '''
        Return the current event number, or -1 if it cannot be determined.
        For Gauss there is no input file, so the Reader (nodeID == -1)
        simply counts events and the others read the counter.
        '''
        if self.app != 'Gauss':
            # Using getList or getHistoNames can result in the EventSelector
            # re-initialising connection to RootDBase, which costs a lot of
            # time... try to build a set of Header paths??

            # First Attempt : Unpacked Event Data
            lst = [ '/Event/Gen/Header',
                    '/Event/Rec/Header' ]
            for path in lst :
                try :
                    n = self.evt[path].evtNumber()
                    return n
                except Exception :
                    # No evt number at this path
                    continue

            # second attempt : try DAQ/RawEvent data
            # The Evt Number is in bank type 16, bank 0, data pt 4
            try :
                n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
                return n
            except Exception :
                pass

            # Default Action : warn only before any real traffic has flowed
            if self.nIn == 0 and self.nOut == 0 :
                self.log.warning('Could not determine Event Number')
            return -1
        else:
            if self.nodeID == -1:
                self.num = self.num + 1
            return self.num

    def IdentifyWriters( self ) :
        '''
        Scan the configuration for OutputStream-like components and classify
        them by the kind of data they write.  Returns a dict mapping
        "events"/"records"/"tuples" to MiniWriter lists and "histos" to the
        histogram output file name(s).
        '''
        d = {}
        keys = [ "events", "records", "tuples", "histos" ]
        for k in keys :
            d[k] = []

        # Identify Writers and Classify
        wkeys = WRITERTYPES.keys()
        for v in self.config.values() :
            if v.__class__.__name__ in wkeys :
                writerType = WRITERTYPES[ v.__class__.__name__ ]
                d[writerType].append( MiniWriter(v, writerType, self.config) )
                if self.nodeID == 0 :
                    self.log.info('Writer Found : %s'%(v.name()))

        # Now Check for the Histogram Service
        if 'HistogramPersistencySvc' in self.config.keys() :
            hfile = self.config['HistogramPersistencySvc'].getProp('OutputFile')
            d[ "histos" ].append( hfile )
        return d

    def dumpHistograms( self ) :
        '''
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames( )
        histDict = {}
        objects = 0 ; histos = 0
        if nlist :
            for n in nlist :
                o = self.hvt[ n ]
                if type(o) in aidatypes :
                    o = aida2root(o)
                    histos += 1
                else :
                    objects += 1
                histDict[ n ] = o
        else :
            # parenthesized print works under both python2 and python3
            # (bare print statement was a py3 syntax error)
            print( 'WARNING : no histograms to recover?' )
        return histDict

    def Initialize( self ) :
        '''Configure, set up and start GaudiPython; records init time.'''
        start = time.time()
        self.processConfiguration( )
        self.SetupGaudiPython( )
        self.initEvent.set()
        self.StartGaudiPython( )

        if self.app == 'Gauss':
            # Gauss counts events through the EvtCounter tool
            tool = self.a.tool( "ToolSvc.EvtCounter" )
            self.cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
        else:
            self.cntr = None

        self.iTime = time.time() - start

    def Finalize( self ) :
        '''Stop and finalize the AppMgr; records finalization time.'''
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info( '%s-%i Finalized'%(self.nodeType, self.nodeID) )
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report( self ) :
        '''Log the timing summary for this node, then the serializer's report.'''
        self.log.name = "%s-%i Audit"%(self.nodeType, self.nodeID)
        allTime = "Alive Time : %5.2f"%(self.tTime)
        initTime = "Init Time : %5.2f"%(self.iTime)
        frstTime = "1st Event Time : %5.2f"%(self.firstEvTime)
        runTime = "Run Time : %5.2f"%(self.rTime)
        finTime = "Finalise Time : %5.2f"%(self.fTime)
        tup = ( allTime, initTime, frstTime, runTime, finTime )
        for t in tup :
            self.log.info( t )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
        # and report from the TESSerializer
        self.TS.Report()
645 
646 # =============================================================================
647 
    def __init__( self, queues, events, params, subworkers ) :
        # Reader node : nodeID is fixed at -1 by convention
        GMPComponent.__init__(self, 'Reader', -1, queues, events, params, subworkers )
651 
652  def processConfiguration( self ) :
653  # Reader :
654  # No algorithms
655  # No output
656  # No histos
657  self.config[ 'ApplicationMgr' ].TopAlg = []
658  self.config[ 'ApplicationMgr' ].OutStream = []
659  if "HistogramPersistencySvc" in self.config.keys() :
660  self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
661  self.config['MessageSvc'].Format = '[Reader]% F%18W%S%7W%R%T %0W%M'
662  self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax
663 
664  def DumpEvent( self ) :
665  tb = TBufferFile( TBuffer.kWrite )
666  # print '----Reader dumping Buffer!!!'
667  self.ts.dumpBuffer( tb )
668  # print '\tBuffer Dumped, size : %i'%( tb.Length() )
669  return tb
670 
671  def DoFirstEvent( self ) :
672  # Do First Event ------------------------------------------------------
673  # Check Termination Criteria
674  startFirst = time.time()
675  self.log.info('Reader : First Event')
676  if self.nOut == self.evtMax :
677  self.log.info('evtMax( %i ) reached'%(self.evtMax))
678  self.lastEvent.set()
679  return SUCCESS
680  else :
681  # Continue to read, dump and send event
682  self.a.run(1)
683  if not bool(self.evt['/Event']) :
684  self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
685  self.lastEvent.set()
686  return SUCCESS
687  else :
688  # Popluate TESSerializer list and send Event
689  if self.app == "Gauss":
690  lst = self.evt.getHistoNames()
691  else:
692  try :
693  lst = self.evt.getList()
694  if self.app == "DaVinci":
695  daqnode = self.evt.retrieveObject( '/Event/DAQ' ).registry()
696  setOwnership( daqnode, False )
697  self.evt.getList( daqnode, lst, daqnode.address().par() )
698  except :
699  self.log.critical('Reader could not acquire TES List!')
700  self.lastEvent.set()
701  return FAILURE
702  self.log.info('Reader : TES List : %i items'%(len(lst)))
703  for l in lst :
704  self.ts.addItem(l)
706  tb = self.TS.Dump( )
707  self.log.info('First Event Sent')
708  self.evcom.send( (self.currentEvent, tb) )
709  self.nOut += 1
710  self.eventLoopSyncer.set()
711  self.evt.clearStore( )
712  self.firstEvTime = time.time()-startFirst
713  return SUCCESS
714 
    def Engine( self ) :
        '''
        Main body of the Reader process.

        Renames the process, initialises GaudiPython, then reads, serializes
        and distributes events to the Workers until evtMax is reached, the
        input is exhausted, or the node is flagged unhealthy.  Finally flushes
        termination flags downstream and reports statistics.
        '''
        # rename process so it is identifiable in ps (prctl op 15 = PR_SET_NAME)
        import os
        import ctypes
        libc = ctypes.CDLL('libc.so.6')
        name = str(self.nodeType) + str(self.nodeID) + '\0'
        libc.prctl(15,name,0,0,0)


        startEngine = time.time()
        self.log.name = 'Reader'
        self.log.info('Reader Process starting')

        self.Initialize()

        # add the Histogram Collection Algorithm
        self.a.addAlgorithm( CollectHistograms(self) )

        self.log.info('Reader Beginning Distribution')
        sc = self.DoFirstEvent( )
        if sc.isSuccess() :
            self.log.info('Reader First Event OK')
        else :
            self.log.critical('Reader Failed on First Event')
            self.stat = FAILURE

        # Do All Others -------------------------------------------------------
        while True :
            # Check Termination Criteria
            if self.nOut == self.evtMax :
                self.log.info('evtMax( %i ) reached'%(self.evtMax))
                break
            # Check Health
            if not self.stat.isSuccess() :
                self.log.critical( 'Reader is Damaged!' )
                break
            # Continue to read, dump and send event
            t = time.time()
            self.a.run(1)
            self.rTime += (time.time()-t)
            # an empty '/Event' root means the input is exhausted
            if not bool(self.evt['/Event']) :
                self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
                break
            self.currentEvent = self.getEventNumber( )
            tb = self.TS.Dump( )
            self.evcom.send( (self.currentEvent, tb) )
            # clean up
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info('Setting <Last> Event')
        self.lastEvent.set()

        # Finalize
        self.log.info( 'Reader : Event Distribution complete.' )
        self.evcom.finalize()
        self.Finalize()
        self.tTime = time.time() - startEngine
        self.Report()
774 
775 # =============================================================================
777  def __init__( self, workerID, queues, events, params, subworkers ) :
778  GMPComponent.__init__(self,'Worker', workerID, queues, events, params, subworkers )
779  # Identify the writer streams
781  # Identify the accept/veto checks for each event
782  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
783  self.log.info("Subworker-%i Created OK"%(self.nodeID))
784  self.eventOutput = True
785 
786  def Engine( self ) :
787  # rename process
788  import os
789  import ctypes
790  libc = ctypes.CDLL('libc.so.6')
791  name = str(self.nodeType) + str(self.nodeID) + '\0'
792  libc.prctl(15,name,0,0,0)
793 
794  self.initEvent.set()
795  startEngine = time.time()
796  msg = self.a.service('MessageSvc')
797  msg.Format = '[' + self.log.name + '] % F%18W%S%7W%R%T %0W%M'
798 
799  self.log.name = "Worker-%i"%(self.nodeID)
800  self.log.info("Subworker %i starting Engine"%(self.nodeID))
802 
803  # populate the TESSerializer itemlist
804  self.log.info('EVT WRITERS ON WORKER : %i'\
805  %( len(self.writerDict['events'])))
806 
807  nEventWriters = len( self.writerDict[ "events" ] )
808  self.a.addAlgorithm( CollectHistograms(self) )
809 
810  # Begin processing
811  Go = True
812  while Go :
813  packet = self.evcom.receive( )
814  if packet : pass
815  else : continue
816  if packet == 'FINISHED' : break
817  evtNumber, tbin = packet # unpack
818  if self.cntr != None:
819 
820  self.cntr.setEventCounter( evtNumber )
821 
822  self.nIn += 1
823  self.TS.Load( tbin )
824 
825  t = time.time()
826  sc = self.a.executeEvent()
827  if self.nIn == 1 :
828  self.firstEvTime = time.time()-t
829  else :
830  self.rTime += (time.time()-t)
831  if sc.isSuccess() :
832  pass
833  else :
834  self.log.name = "Worker-%i"%(self.nodeID)
835  self.log.warning('Did not Execute Event')
836  self.evt.clearStore()
837  continue
838  if self.isEventPassed() :
839  pass
840  else :
841  self.log.name = "Worker-%i"%(self.nodeID)
842  self.log.warning( 'Event did not pass : %i'%(evtNumber) )
843  self.evt.clearStore()
844  continue
845  if self.eventOutput :
846  # It may be the case of generating Event Tags; hence
847  # no event output
849  tb = self.TS.Dump( )
850  self.evcom.send( (self.currentEvent, tb) )
851  self.nOut += 1
852  self.inc.fireIncident(gbl.Incident('Subworker','EndEvent'))
853  self.eventLoopSyncer.set()
854  self.evt.clearStore( )
855  self.log.name = "Worker-%i"%(self.nodeID)
856  self.log.info('Setting <Last> Event %s' %(self.nodeID))
857  self.lastEvent.set()
858 
859  self.evcom.finalize()
860  # Now send the FileRecords and stop/finalize the appMgr
861  self.filerecordsAgent.SendFileRecords()
862  self.tTime = time.time()-startEngine
863  self.Finalize()
864  self.Report()
865  #self.finalEvent.set()
866 
    def SetServices(self,a, evt, hvt, fsr, inc, pers, ts , cntr):
        '''
        Inject already-initialised GaudiPython services into this node.

        Used by subworkers, which share the parent Worker's AppMgr and
        services instead of initialising their own.
        '''
        self.a = a
        self.evt = evt
        self.hvt = hvt
        self.fsr = fsr
        #self.inc = inc
        # NOTE(review): the 'inc' argument is ignored; a fresh IncidentSvc
        # handle is fetched from the AppMgr instead -- confirm intentional
        self.inc = self.a.service('IncidentSvc','IIncidentSvc')
        self.pers = pers
        self.ts = ts
        self.cntr = cntr
        # wrap the raw serializer with the bookkeeping TESSerializer
        self.TS = TESSerializer( self.ts, self.evt,
                                 self.nodeType, self.nodeID, self.log )
879 
880 
881  def getCheckAlgs( self ) :
882  '''
883  For some output writers, a check is performed to see if the event has
884  executed certain algorithms.
885  These reside in the AcceptAlgs property for those writers
886  '''
887  acc = []
888  req = []
889  vet = []
890  for m in self.writerDict[ "events" ] :
891  if hasattr(m.w, 'AcceptAlgs') : acc += m.w.AcceptAlgs
892  if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
893  if hasattr(m.w, 'VetoAlgs') : vet += m.w.VetoAlgs
894  return (acc, req, vet)
895 
896 
897  def checkExecutedPassed( self, algName ) :
898  if self.a.algorithm( algName )._ialg.isExecuted()\
899  and self.a.algorithm( algName )._ialg.filterPassed() :
900  return True
901  else :
902  return False
903 
904  def isEventPassed( self ) :
905  '''
906  Check the algorithm status for an event.
907  Depending on output writer settings, the event
908  may be declined based on various criteria.
909  This is a transcript of the check that occurs in GaudiSvc::OutputStream
910  '''
911  passed = False
912 
913  self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
914  if self.acceptAlgs :
915  for name in self.acceptAlgs :
916  if self.checkExecutedPassed( name ) :
917  passed = True
918  break
919  else :
920  passed = True
921 
922  self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
923  for name in self.requireAlgs :
924  if self.checkExecutedPassed( name ) :
925  pass
926  else :
927  self.log.info('Evt declined (requireAlgs) : %s'%(name) )
928  passed = False
929 
930  self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
931  for name in self.vetoAlgs :
932  if self.checkExecutedPassed( name ) :
933  pass
934  else :
935  self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
936  passed = False
937  return passed
938 
939 # =============================================================================
941  def __init__( self, workerID, queues, events, params , subworkers ) :
942  GMPComponent.__init__(self,'Worker', workerID, queues, events, params, subworkers )
943  # Identify the writer streams
945  # Identify the accept/veto checks for each event
946  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
947  self.log.name = "Worker-%i"%(self.nodeID)
948  self.log.info("Worker-%i Created OK"%(self.nodeID))
949  self.eventOutput = True
950 
def processConfiguration(self):
    """Adapt the configuration for a Worker.

    A Worker takes no input (the Reader feeds it events), produces no
    output streams or histogram file (the Writer collects those),
    redirects Tuple output to a per-worker file for later merging and
    tags log messages with the worker ID.
    """
    # Worker :
    #   No input
    #   No output
    #   No Histos
    self.config['EventSelector'].Input = []
    self.config['ApplicationMgr'].OutStream = []
    # membership test written as 'in self.config' for consistency with
    # the "ToolSvc.EvtCounter" check below (equivalent to .keys())
    if "HistogramPersistencySvc" in self.config:
        self.config['HistogramPersistencySvc'].OutputFile = ''
    formatHead = '[Worker-%i] ' % (self.nodeID)
    self.config['MessageSvc'].Format = formatHead + '% F%18W%S%7W%R%T %0W%M'

    for key, lst in self.writerDict.iteritems():
        self.log.info('Writer Type : %s\t : %i' % (key, len(lst)))

    for m in self.writerDict["tuples"]:
        # rename Tuple output file with an appendix
        # based on worker id, for merging later
        newName = m.getNewName('.', '.w%i.' % (self.nodeID))
        self.config[m.key].Output = newName

    # Suppress INFO Output for all but Worker-0
    #if self.nodeID == 0 :
    #    pass
    #else :
    #    self.config[ 'MessageSvc' ].OutputLevel = ERROR

    if self.app == "Gauss":
        try:
            if "ToolSvc.EvtCounter" not in self.config:
                from Configurables import EvtCounter
                counter = EvtCounter()
            else:
                counter = self.config["ToolSvc.EvtCounter"]
            counter.UseIncident = False
        except Exception:
            # BUGFIX: was a bare 'except:', which also swallowed
            # KeyboardInterrupt/SystemExit.  Errors while reconfiguring
            # the EvtCounter are deliberately non-fatal.
            self.log.warning('Cannot configure EvtCounter')
990 
def Engine( self ) :
    """Main event loop of the Worker.

    Renames the process, initialises the AppMgr, registers the event
    writers' item lists with the TES serializer, then repeatedly
    receives serialized events from the Reader, executes them, applies
    the accept/require/veto selection and ships surviving events to the
    Writer.  Finishes by sending FileRecords and finalising the AppMgr.
    """

    # rename process so it is identifiable in 'ps' (e.g. "Worker0")
    import os
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    name = str(self.nodeType) + str(self.nodeID) + '\0'
    libc.prctl(15,name,0,0,0)  # 15 == PR_SET_NAME

    startEngine = time.time()
    self.log.info("Worker %i starting Engine"%(self.nodeID))
    self.Initialize()

    # populate the TESSerializer itemlist
    self.log.info('EVT WRITERS ON WORKER : %i'\
                  %( len(self.writerDict['events'])))

    nEventWriters = len( self.writerDict[ "events" ] )
    if nEventWriters :
        # Collect mandatory and optional TES paths declared by the event
        # writers; a '#' and anything after it (depth spec) is stripped.
        itemList = set()
        optItemList = set()
        for m in self.writerDict[ "events" ] :
            for item in m.ItemList :
                hsh = item.find( '#' )
                if hsh != -1:
                    item = item[:hsh]
                itemList.add( item )
            for item in m.OptItemList :
                hsh = item.find( '#' )
                if hsh != -1:
                    item = item[:hsh]
                optItemList.add( item )
        # If an item is mandatory and optional, keep it only in the optional list
        itemList -= optItemList
        for item in sorted( itemList ):
            self.log.info( ' adding ItemList Item to ts : %s' % ( item ) )
            self.ts.addItem( item )
        for item in sorted( optItemList ):
            self.log.info( ' adding Optional Item to ts : %s' % ( item ) )
            self.ts.addOptItem( item )
    else :
        self.log.info( 'There is no Event Output for this app' )
        self.eventOutput = False

    # Begin processing
    Go = True
    while Go :
        packet = self.evcom.receive( )
        if packet : pass
        else : continue
        if packet == 'FINISHED' : break
        evtNumber, tbin = packet # unpack
        if self.cntr != None:
            self.cntr.setEventCounter( evtNumber )

        # subworkers are forked before the first event is processed
        # reader-thread for ConDB must be closed and reopened in each subworker
        # this is done by disconnect()
        if self.nIn == 0:

            self.log.info("Fork new subworkers and disconnect from CondDB")
            condDB = self.a.service('CondDBCnvSvc', gbl.ICondDBReader)
            condDB.disconnect()

            # Fork subworkers and share services
            for k in self.subworkers:
                k.SetServices(self.a, self.evt, self.hvt, self.fsr, self.inc, self.pers, self.ts, self.cntr)
                k.Start()
            self.a.addAlgorithm( CollectHistograms(self) )
        self.nIn += 1
        self.TS.Load( tbin )

        t = time.time()
        sc = self.a.executeEvent()
        # the first event carries initialisation overheads, so it is
        # timed separately from the running total
        if self.nIn == 1 :
            self.firstEvTime = time.time()-t
        else :
            self.rTime += (time.time()-t)
        if sc.isSuccess() :
            pass
        else :
            self.log.warning('Did not Execute Event')
            self.evt.clearStore()
            continue
        if self.isEventPassed() :
            pass
        else :
            self.log.warning( 'Event did not pass : %i'%(evtNumber) )
            self.evt.clearStore()
            continue
        if self.eventOutput :
            # It may be the case of generating Event Tags; hence
            # no event output
            tb = self.TS.Dump( )
            # NOTE(review): sends self.currentEvent rather than the
            # evtNumber just unpacked from the packet -- confirm the
            # two stay in sync
            self.evcom.send( (self.currentEvent, tb) )
            self.nOut += 1
        self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
        self.eventLoopSyncer.set()
        self.evt.clearStore( )
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    self.evcom.finalize()
    self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
    # Now send the FileRecords and stop/finalize the appMgr
    self.filerecordsAgent.SendFileRecords()
    self.Finalize()
    self.tTime = time.time()-startEngine
    self.Report()

    for k in self.subworkers:
        self.log.info('Join subworkers')
        k.proc.join()
1106 
def getCheckAlgs(self):
    '''
    Collect the algorithm names the event writers use for selection.

    Some output writers check whether certain algorithms executed for
    the event; these names live in the writers' AcceptAlgs,
    RequireAlgs and VetoAlgs properties.  Returns the three
    concatenated lists as a tuple (accept, require, veto).
    '''
    accept = []
    require = []
    veto = []
    for writer in self.writerDict["events"]:
        accept += getattr(writer.w, 'AcceptAlgs', [])
        require += getattr(writer.w, 'RequireAlgs', [])
        veto += getattr(writer.w, 'VetoAlgs', [])
    return (accept, require, veto)
1121 
1122 
def checkExecutedPassed(self, algName):
    '''Return True when the named algorithm both executed for the
    current event and set its filter-passed flag.'''
    ialg = self.a.algorithm(algName)._ialg
    return bool(ialg.isExecuted() and ialg.filterPassed())
1129 
def isEventPassed(self):
    '''
    Check the algorithm status for an event.

    Depending on output writer settings, the event may be declined
    based on various criteria.  This is a transcript of the check that
    occurs in GaudiSvc::OutputStream:
      - accept : pass if ANY accept-alg executed and passed
                 (or no accept-algs are configured)
      - require: decline unless EVERY require-alg executed and passed
      - veto   : decline if ANY veto-alg executed and passed
    '''
    passed = False

    self.log.debug('self.acceptAlgs is %s' % (str(self.acceptAlgs)))
    if self.acceptAlgs:
        for name in self.acceptAlgs:
            if self.checkExecutedPassed(name):
                passed = True
                break
    else:
        # no accept-algs configured: accept by default
        passed = True

    self.log.debug('self.requireAlgs is %s' % (str(self.requireAlgs)))
    for name in self.requireAlgs:
        if not self.checkExecutedPassed(name):
            self.log.info('Evt declined (requireAlgs) : %s' % (name))
            passed = False

    self.log.debug('self.vetoAlgs is %s' % (str(self.vetoAlgs)))
    for name in self.vetoAlgs:
        # BUGFIX: veto when the veto-alg DID execute and pass.  The
        # previous code declined when it had NOT passed, inverting the
        # OutputStream veto semantics that this method claims to
        # transcribe.
        if self.checkExecutedPassed(name):
            self.log.info('Evt declined : (vetoAlgs) : %s' % (name))
            passed = False
    return passed
1164 
1165 # =============================================================================
1166 
def __init__(self, queues, events, params, subworkers):
    """Writer node (fixed nodeID -2): collects processed events from
    every worker and produces the combined output."""
    GMPComponent.__init__(self, 'Writer', -2, queues, events, params, subworkers)
    # one flag per worker, flipped to True when that worker sends
    # its FINISHED packet
    self.status = [False for _ in range(self.nWorkers)]
    self.log.name = "Writer--2"
1175 
def processConfiguration(self):
    """Adapt the configuration for the Writer: remove all algorithms
    and input, and rename every output file to mark it as the product
    of a parallel (p<nWorkers>) job."""
    # Writer :
    #   No input
    #   No Algs
    self.config['ApplicationMgr'].TopAlg = []
    self.config['EventSelector'].Input = []

    self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'

    # Summarise the output writers found in the configuration
    for wtype, writers in self.writerDict.iteritems():
        self.log.info('Writer Type : %s\t : %i' % (wtype, len(writers)))

    # Event writers: tag the output file name with the worker count so
    # it is recognisable as coming from a parallel processing
    for writer in self.writerDict["events"]:
        self.log.debug('Processing Event Writer : %s' % (writer))
        self.config[writer.key].Output = writer.getNewName(
            '.', '.p%i.' % self.nWorkers)

    # FileRecords writers: same renaming, but force OPT='RECREATE'.
    # Without event writers the FileRecords file would otherwise be
    # opened in UPDATE mode against a non-existent event output file.
    for writer in self.writerDict["records"]:
        self.log.debug('Processing FileRecords Writer: %s' % (writer))
        self.config[writer.key].Output = writer.getNewName(
            '.', '.p%i.' % self.nWorkers, extra=" OPT='RECREATE'")

    # Histogram output file gets the same parallel tag
    hs = "HistogramPersistencySvc"
    if hs in self.config.keys() and self.config[hs].OutputFile:
        self.config[hs].OutputFile = self.config[hs].OutputFile.replace(
            '.', '.p%i.' % (self.nWorkers))
1220 
def Engine( self ) :
    """Main loop of the Writer.

    Polls the per-worker event channels round-robin; each received
    packet is loaded into the TES and written out via executeEvent().
    The loop ends once every worker has sent its FINISHED flag, after
    which the histogram store and FileRecords are rebuilt and the
    AppMgr is finalised."""
    # rename process so it is identifiable in 'ps'
    import os
    import ctypes
    libc = ctypes.CDLL('libc.so.6')
    name = str(self.nodeType) + str(self.nodeID) + '\0'
    libc.prctl(15,name,0,0,0)  # 15 == PR_SET_NAME

    startEngine = time.time()
    self.Initialize()
    self.histoAgent = HistoAgent( self )

    # Begin processing
    Go = True
    current = -1
    # NOTE(review): stopCriteria is unused; the loop exits via all(self.status)
    stopCriteria = self.nWorkers
    while Go :
        # round-robin over the worker->writer channels
        current = (current+1)%self.nWorkers
        packet = self.evcoms[current].receive( timeout=0.01 )
        if packet == None :
            continue
        if packet == 'FINISHED' :
            self.log.info('Writer got FINISHED flag : Worker %i'%(current))

            self.status[current] = True
            if all(self.status) :
                self.log.info('FINISHED recd from all workers, break loop')
                break
            continue
        # otherwise, continue as normal
        self.nIn += 1 # update central count (maybe needed by FSR store)
        evtNumber, tbin = packet # unpack
        self.TS.Load( tbin )
        t = time.time()
        self.a.executeEvent()
        self.rTime += ( time.time()-t )
        self.evt.clearStore( )
        self.eventLoopSyncer.set()
    self.log.name = "Writer--2"
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    # finalisation steps
    [ e.finalize() for e in self.evcoms ]
    # Now do Histograms
    sc = self.histoAgent.Receive()
    sc = self.histoAgent.RebuildHistoStore()
    if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
    else : self.log.warning( 'Histo Store Error in Rebuild' )

    # Now do FileRecords
    sc = self.filerecordsAgent.Receive()
    self.filerecordsAgent.Rebuild()
    self.Finalize()
    #self.rTime = time.time()-startEngine
    self.Report()
1279 
1280 # =============================================================================
1281 
1282 
1283 
1284 # =============================================================================
1285 
class Coord( object ) :
    """Co-ordinator of the parallel system.

    Builds the Reader, Worker (plus Subworkers) and Writer processes,
    the queues connecting them and the Syncer barriers, then drives the
    Initialise / Run / Finalise phases in Go().
    """

    def __init__( self, nWorkers, config, log ) :
        # nWorkers : number of worker processes (-1 => one per cpu)
        # config   : dict of Configurables describing the job
        # log      : shared logger instance

        self.log = log
        self.config = config
        # set up Logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )

        if nWorkers == -1 :
            # The user has requested all available cpus in the machine
            self.nWorkers = cpu_count()
        else :
            self.nWorkers = nWorkers


        self.qs = self.SetupQueues( ) # a dictionary of queues (for Events)
        self.hq = JoinableQueue( ) # for Histogram data
        self.fq = JoinableQueue( ) # for FileRecords data

        # Make a Syncer for Initalise, Run, and Finalise
        self.sInit = Syncer( self.nWorkers, self.log,
                             limit=WAIT_INITIALISE,
                             step=STEP_INITIALISE )
        self.sRun = Syncer( self.nWorkers, self.log,
                            manyEvents=True,
                            limit=WAIT_SINGLE_EVENT,
                            step=STEP_EVENT,
                            firstEvent=WAIT_FIRST_EVENT )
        self.sFin = Syncer( self.nWorkers, self.log,
                            limit=WAIT_FINALISE,
                            step=STEP_FINALISE )
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        self.subworkers = []
        # Declare SubProcesses!
        # Node IDs: subworkers 1..nWorkers-1, main Worker 0,
        # Reader -1, Writer -2 (see getQueues/getSyncEvents)
        for i in range(1, self.nWorkers ) :
            sub = Subworker( i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers )
            self.subworkers.append( sub )
        self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers)
        self.workers = []
        # NOTE(review): wk is only appended to self.system, never to
        # self.workers -- confirm self.workers is intentionally left empty
        wk = Worker( 0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers )
        self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers)

        self.system = []
        self.system.append(self.writer)
        self.system.append(wk)
        self.system.append(self.reader)

    def getSyncEvents( self, nodeID ) :
        # Return the (init, (run, lastEvent), fin) sync Events for a node
        init = self.sInit.d[nodeID].event
        run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
        fin = self.sFin.d[nodeID].event
        return ( init, run, fin )

    def getQueues( self, nodeID ) :
        # Return the (event, histogram, filerecords) queues for a node
        eventQ = self.qs[ nodeID ]
        histQ = self.hq
        fsrQ = self.fq
        return ( eventQ, histQ, fsrQ )

    def Go( self ) :
        """Drive the three phases (Initialise, Run, Finalise), each
        guarded by a Syncer barrier; Terminate() on any failure.
        Returns SUCCESS or FAILURE."""

        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'INITIALISING SYSTEM' )

        # Start reader, writer and main worker
        for p in self.system :
            p.Start()

        sc = self.sInit.syncAll(step="Initialise")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'RUNNING SYSTEM' )
        sc = self.sRun.syncAll(step="Run")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'FINALISING SYSTEM' )
        sc = self.sFin.syncAll(step="Finalise")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info( "Cleanly join all Processes" )
        self.Stop()
        self.log.info( "Report Total Success to Main.py" )
        return SUCCESS

    def Terminate( self ) :
        # Brutally kill sub-processes
        children = multiprocessing.active_children()
        for i in children:
            i.terminate()

        #self.writer.proc.terminate()
        #[ w.proc.terminate() for w in self.workers]
        #self.reader.proc.terminate()

    def Stop( self ) :
        # procs should be joined in reverse order to launch
        self.system.reverse()
        for s in self.system :
            s.proc.join()
        return SUCCESS

    def SetupQueues( self ) :
        # This method will set up the network of Queues needed
        # N Queues = nWorkers + 1
        # Each Worker has a Queue in, and a Queue out
        # Reader has Queue out only
        # Writer has nWorkers Queues in

        # one queue from Reader-Workers
        rwk = JoinableQueue()
        # one queue from each worker to writer
        workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
        d = {}
        d[-1] = (None, rwk) # Reader
        d[-2] = (workersWriter, None) # Writer
        for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
        return d
1418 
1419 # ============================= EOF ===========================================
double sum(double x, double y, double z)
string type
Definition: gaudirun.py:126
StatusCode execute()
The action to be performed by the algorithm on an event.
Definition: Algorithm.cpp:92
Python Algorithm base class.
Definition: Algorithm.h:35
NamedRange_< CONTAINER > range(const CONTAINER &cnt, const std::string &name)
simple function to create the named range form arbitrary container
Definition: NamedRange.h:133
StatusCode finalize()
the default (empty) implementation of IStateful::finalize() method
Definition: Algorithm.cpp:98