Gaudi Framework, version v23r5

Home   Generated: Wed Nov 28 2012
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
GMPBase.py
Go to the documentation of this file.
1 from Gaudi.Configuration import *
2 from Configurables import EvtCounter
3 from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
4 from ROOT import TBufferFile, TBuffer
5 import multiprocessing
6 from multiprocessing import Process, Queue, JoinableQueue, Event
7 from multiprocessing import cpu_count, current_process
8 from multiprocessing.queues import Empty
9 from pTools import *
10 import time, sys, os
11 
12 # This script contains the bases for the Gaudi MultiProcessing (GMP)
13 # classes
14 
15 # There are three classes :
16 # Reader
17 # Worker
18 # Writer
19 
20 # Each class needs to perform communication with the others
21 # For this, we need a means of communication, which will be based on
22 # the python multiprocessing package
23 # This is provided in SPI pytools package
24 # cmt line : use pytools v1.1 LCG_Interfaces
25 # The PYTHONPATH env variable may need to be modified, as this might
26 # still point to 1.0_python2.5
27 
28 # Each class will need Queues, and a defined method for using these
29 # queues.
30 # For example, as long as there is something in the Queue, both ends
31 # of the queue must be open
32 # Also, there needs to be proper Termination flags and criteria
33 # The System should be error proof.
34 
35 
# Constants -------------------------------------------------------------------
# polling interval (seconds) used in busy-wait loops (see EventCommunicator.send)
NAP = 0.001
# bytes per megabyte, as a float so divisions yield fractional MB
MB = 1024.0*1024.0
# waits to guard against hanging, in seconds
WAIT_INITIALISE = 60*5
WAIT_FIRST_EVENT = 60*3
WAIT_SINGLE_EVENT = 60*6
WAIT_FINALISE = 60*2
# presumably the polling granularity (seconds) for the waits above —
# TODO confirm at the call sites (not visible in this chunk)
STEP_INITIALISE = 10
STEP_EVENT = 2
STEP_FINALISE = 5

# My switch for direct switching on/off Smaps Algorithm in GaudiPython AppMgr
SMAPS = False

# -----------------------------------------------------------------------------

# definitions
# ----------
# used to convert stored histos (in AIDA format) to ROOT format
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

# used to check which type of histo we are dealing with
# i.e. if currentHisto in aidatypes : pass
aidatypes = ( gbl.AIDA.IHistogram,
              gbl.AIDA.IHistogram1D,
              gbl.AIDA.IHistogram2D,
              gbl.AIDA.IHistogram3D,
              gbl.AIDA.IProfile1D,
              gbl.AIDA.IProfile2D,
              gbl.AIDA.IBaseHistogram ) # extra?

# similar to aidatypes, but for the native ROOT histogram classes
thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )

# Types of OutputStream in Gaudi, mapped to the category of output they produce
WRITERTYPES = { 'EvtCollectionStream' : "tuples",
                'InputCopyStream' : "events",
                'OutputStream' : "events",
                'RecordStream' : "records",
                'RunRecordStream' : "records",
                'SequentialOutputStream' : "events",
                'TagCollectionStream' : "tuples" }
79 
80 # =============================================================================
81 
class MiniWriter( object ) :
    '''
    A class to represent a writer in the GaudiPython configuration.
    It can be non-trivial to access the name of the output file; it may be
    specified in the DataSvc, or just on the writer, may be a list, or string
    Also, there are three different types of writer (events, records, tuples)
    so this bootstrap class provides easy access to this info while configuring
    '''
    def __init__( self, writer, wType, config ) :
        # @param writer : the OutputStream-like configurable
        # @param wType  : one of "events", "records", "tuples" (see WRITERTYPES)
        # @param config : the full configuration dictionary
        self.w = writer
        self.wType = wType
        # set parameters for directly accessing the correct
        # part of the configuration, so that later, we can do
        # config[ key ].Output = modified(output)
        self.key = None
        self.output = None
        self.ItemList = None
        self.OptItemList = None
        #
        self.wName = writer.getName()
        # Now process the Writer, find where the output is named
        self.woutput = None
        self.datasvcName = None
        self.svcOutput = None
        if hasattr( self.w, "Output" ) :
            # output file named directly on the writer
            self.woutput = self.w.Output
            self.getItemLists( config )
            self.set( self.wName, self.w.Output )
            return
        else :
            # if there is no output file, get it via the datasvc
            # (every writer always has a datasvc property)
            self.datasvcName = self.w.EvtDataSvc
            datasvc = config[ self.datasvcName ]
            if hasattr( datasvc, "Output" ) :
                self.getItemLists( config )
                self.set( self.datasvcName, datasvc.Output )
                return

    def getNewName( self, replaceThis, withThis, extra='' ) :
        # replace one pattern in the output name string
        # with another, and return the Output name
        # It *might* be in a list, so check for this
        #
        # @param extra : might need to add ROOT flags
        #                i.e.: OPT='RECREATE', or such
        # FIX: use isinstance instead of comparing __class__.__name__ strings
        assert isinstance( replaceThis, str )
        assert isinstance( withThis, str )
        old = self.output
        lst = False
        if isinstance( old, list ) :
            old = self.output[0]
            lst = True
        new = old.replace( replaceThis, withThis )
        new += extra
        if lst :
            return [ new ]
        else :
            return new

    def getItemLists( self, config ) :
        # Fill self.ItemList / self.OptItemList from the writer itself,
        # falling back to its data service when the writer lacks them.
        # the item list
        if hasattr( self.w, "ItemList" ) :
            self.ItemList = self.w.ItemList
        else :
            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "ItemList" ) :
                self.ItemList = datasvc.ItemList
        # The option item list; possibly not a valid variable
        if hasattr( self.w, "OptItemList" ) :
            self.OptItemList = self.w.OptItemList
        else :
            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "OptItemList" ) :
                self.OptItemList = datasvc.OptItemList
        return

    def set( self, key, output ) :
        # Record which config key owns the output, and the output name itself
        self.key = key
        self.output = output
        return

    def __repr__( self ) :
        s = ""
        line = '-'*80
        s += (line+'\n')
        s += "Writer : %s\n"%( self.wName )
        s += "Writer Type : %s\n"%( self.wType )
        s += "Writer Output : %s\n"%( self.output )
        s += "DataSvc : %s\n"%( self.datasvcName )
        s += "DataSvc Output : %s\n"%( self.svcOutput )
        s += '\n'
        s += "Key for config : %s\n"%( self.key )
        s += "Output File : %s\n"%( self.output )
        s += "ItemList : %s\n"%( self.ItemList )
        s += "OptItemList : %s\n"%( self.OptItemList )
        s += (line+'\n')
        return s
180 
181 # =============================================================================
182 
184  '''
185  GaudiPython algorithm used to clean up histos on the Reader and Workers
186  Only has a finalize method()
187  This retrieves a dictionary of path:histo objects and sends it to the
188  writer. It then waits for a None flag : THIS IS IMPORTANT, as if
189  the algorithm returns before ALL histos have been COMPLETELY RECEIVED
190  at the writer end, there will be an error.
191  '''
def __init__( self, gmpcomponent ) :
    # Keep a handle on the owning GMP node so finalize() can reach its
    # histogram store, histogram queue, sync Event and logger.
    PyAlgorithm.__init__( self )
    self._gmpc = gmpcomponent
    self.log = self._gmpc.log
    return None
def execute( self ) :
    """Per-event hook: nothing to do here; all work happens in finalize()."""
    return SUCCESS
def finalize( self ) :
    """
    Ship this node's histogram store to the Writer in chunks, wait for
    the global sync Event, then clear the local histogram store.
    Must not return before the Writer has received everything.
    """
    self.log.info('CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
    self._gmpc.hDict = self._gmpc.dumpHistograms( )
    ks = self._gmpc.hDict.keys()
    self.log.info('%i Objects in Histogram Store'%(len(ks)))
    # crashes occurred due to Memory Error during the sending of hundreds
    # histos on slc5 machines, so instead, break into chunks
    # send 100 at a time
    chunk = 100
    # note: integer division (Python 2); when len(ks) is an exact multiple
    # of chunk this sends one extra, empty chunk — harmless
    reps = len(ks)/chunk + 1
    for i in xrange(reps) :
        someKeys = ks[i*chunk : (i+1)*chunk]
        smalld = dict( [(key, self._gmpc.hDict[key]) for key in someKeys] )
        # each chunk is tagged with the sender's nodeID
        self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
    # "finished" Notification
    self.log.debug('Signalling end of histos to Writer')
    self._gmpc.hq.put( 'HISTOS_SENT' )
    self.log.debug( 'Waiting on Sync Event' )
    # block until the Writer confirms full receipt (see class docstring)
    self._gmpc.sEvent.wait()
    self.log.debug( 'Histo Sync Event set, clearing and returning' )
    self._gmpc.hvt.clearStore()
    root = gbl.DataObject()
    # the store takes ownership of the new root node
    setOwnership(root, False)
    self._gmpc.hvt.setRoot( '/stat', root )
    return SUCCESS
224 
225 # =============================================================================
226 
class EventCommunicator( object ) :
    # This class is responsible for communicating Gaudi Events via Queues
    # Events are communicated as TBufferFiles, filled either by the
    # TESSerializer, or the GaudiSvc, "IPCSvc"
    def __init__( self, GMPComponent, qin, qout ) :
        # @param GMPComponent : the owning node (Reader/Worker/Writer)
        # @param qin  : Queue to receive items from
        # @param qout : Queue to send items on (None on the Writer)
        self._gmpc = GMPComponent
        self.log = self._gmpc.log
        # maximum capacity of Queues
        self.maxsize = 50
        # central variables : Queues
        self.qin = qin
        self.qout = qout

        # flags
        self.allsent = False
        self.allrecv = False

        # statistics storage
        self.nSent = 0 # counter : items sent
        self.nRecv = 0 # counter : items received
        self.sizeSent = 0 # measure : size of events sent ( tbuf.Length() )
        self.sizeRecv = 0 # measure : size of events in ( tbuf.Length() )
        self.qinTime = 0 # time : receiving from self.qin
        self.qoutTime = 0 # time : sending on qout

    def send( self, item ) :
        # This method manages the sending of a TBufferFile Event to a Queue
        # The actual item to be sent is a tuple : ( evtNumber, TBufferFile )
        assert item.__class__.__name__ == 'tuple'
        startTransmission = time.time()
        self.qout.put( item )
        # allow the background thread to feed the Queue; not 100% guaranteed to
        # finish before next line (relies on the private _buffer attribute of
        # multiprocessing.Queue)
        while self.qout._buffer : time.sleep( NAP )
        self.qoutTime += time.time()-startTransmission
        self.sizeSent += item[1].Length()
        self.nSent += 1
        return SUCCESS

    def receive( self, timeout=None ) :
        # Receive items from self.qin; returns None on timeout
        startWait = time.time()
        try :
            itemIn = self.qin.get( timeout=timeout )
        except Empty :
            return None
        self.qinTime += time.time()-startWait
        self.nRecv += 1
        if itemIn.__class__.__name__ == 'tuple' :
            self.sizeRecv += itemIn[1].Length()
        else :
            # control tokens ('FINISHED', ...) do not count as events
            self.nRecv -= 1
        try :
            self.qin.task_done()
        except Exception :
            # FIX: was a bare "except:", which would also swallow
            # KeyboardInterrupt/SystemExit; task_done() raises ValueError
            # when called more times than items were received
            self._gmpc.log.warning('TASK_DONE called too often by : %s'\
                                   %(self._gmpc.nodeType))
        return itemIn

    def finalize( self ) :
        # Propagate the termination token downstream, then drain and report.
        self.log.info('Finalize Event Communicator : %s'%(self._gmpc.nodeType))
        # Reader sends one flag for each worker
        # Workers send one flag each
        # Writer sends nothing (it's at the end of the chain)
        if self._gmpc.nodeType == 'Reader' : downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer' : downstream = 0
        elif self._gmpc.nodeType == 'Worker' : downstream = 1
        for i in xrange(downstream) :
            self.qout.put( 'FINISHED' )
        if self._gmpc.nodeType != 'Writer' :
            # wait for all items (incl. the flags) to be task_done'd
            self.qout.join()
        # Now some reporting...
        self.statistics( )

    def statistics( self ) :
        # Log the send/receive counters and timings for this node
        self.log.name = '%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
        self.log.info ( 'Items Sent : %i'%(self.nSent) )
        self.log.info ( 'Items Received : %i'%(self.nRecv) )
        self.log.info ( 'Data Sent : %i'%(self.sizeSent) )
        self.log.info ( 'Data Received : %i'%(self.sizeRecv) )
        self.log.info ( 'Q-out Time : %5.2f'%(self.qoutTime) )
        self.log.info ( 'Q-in Time : %5.2f'%(self.qinTime ) )
309 
310 # =============================================================================
311 
class TESSerializer( object ) :
    """
    Bookkeeping wrapper around the C++ GaudiMP TESSerializer: loads and
    dumps TBufferFile events and accumulates per-node size/time statistics.
    """
    def __init__( self, gaudiTESSerializer, evtDataSvc,
                  nodeType, nodeID, log ) :
        self.T = gaudiTESSerializer
        self.evt = evtDataSvc
        self.buffersIn = []    # sizes (bytes) of each buffer loaded
        self.buffersOut = []   # sizes (bytes) of each buffer dumped
        self.nIn = 0
        self.nOut = 0
        self.tDump = 0.0
        self.tLoad = 0.0
        # logging
        self.nodeType = nodeType
        self.nodeID = nodeID
        self.log = log
    def Load( self, tbuf ) :
        # Reset the event store root, then deserialize the buffer into it
        root = gbl.DataObject()
        setOwnership( root, False )
        self.evt.setRoot( '/Event', root )
        t = time.time()
        self.T.loadBuffer( tbuf )
        self.tLoad += (time.time() - t)
        self.nIn += 1
        self.buffersIn.append( tbuf.Length() )
    def Dump( self ) :
        # Serialize the current event store into a fresh write-mode buffer
        t = time.time()
        tb = TBufferFile( TBuffer.kWrite )
        self.T.dumpBuffer(tb)
        self.tDump += ( time.time()-t )
        self.nOut += 1
        self.buffersOut.append( tb.Length() )
        return tb
    def Report( self ) :
        # Log a summary of the load/dump statistics for this node
        evIn = "Events Loaded : %i"%( self.nIn )
        evOut = "Events Dumped : %i"%( self.nOut )
        din = sum( self.buffersIn )
        dataIn = "Data Loaded : %i"%(din)
        dataInMb = "Data Loaded (MB) : %5.2f Mb"%(din/MB)
        if self.nIn :
            avgIn = "Avg Buf Loaded : %5.2f Mb"\
                    %( din/(self.nIn*MB) )
            maxIn = "Max Buf Loaded : %5.2f Mb"\
                    %( max(self.buffersIn)/MB )
        else :
            avgIn = "Avg Buf Loaded : N/A"
            maxIn = "Max Buf Loaded : N/A"
        dout = sum( self.buffersOut )
        dataOut = "Data Dumped : %i"%(dout)
        dataOutMb = "Data Dumped (MB) : %5.2f Mb"%(dout/MB)
        if self.nOut :
            # BUGFIX: the average dumped-buffer size previously divided the
            # *loaded* total (din) by nOut; it must use the dumped total (dout)
            avgOut = "Avg Buf Dumped : %5.2f Mb"\
                     %( dout/(self.nOut*MB) )
            maxOut = "Max Buf Dumped : %5.2f Mb"\
                     %( max(self.buffersOut)/MB )
        else :
            avgOut = "Avg Buf Dumped : N/A"
            maxOut = "Max Buf Dumped : N/A"
        dumpTime = "Total Dump Time : %5.2f"%( self.tDump )
        loadTime = "Total Load Time : %5.2f"%( self.tLoad )

        lines = ( evIn,
                  evOut,
                  dataIn,
                  dataInMb,
                  avgIn,
                  maxIn,
                  dataOut,
                  dataOutMb,
                  avgOut,
                  maxOut,
                  dumpTime,
                  loadTime )
        self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
        for line in lines :
            self.log.info( line )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
388 
389 # =============================================================================
390 
class GMPComponent( object ) :
    # This class will be the template for Reader, Worker and Writer
    # containing all common components
    # nodeId will be a numerical identifier for the node
    # -1 for reader
    # -2 for writer
    # 0,...,nWorkers-1 for the Workers
    def __init__( self, nodeType, nodeID, queues, events, params ) :
        # declare a Gaudi MultiProcessing Node
        # the nodeType is going to be one of Reader, Worker, Writer
        # qPair is going to be a tuple of ( qin, qout )
        # for sending and receiving
        # if nodeType is "Writer", it will be a list of qPairs,
        # as there's one queue-in from each Worker
        #
        # params is a tuple of (nWorkers, sEvent, config, log)

        self.nodeType = nodeType
        # name the forked process after its role, for ps/log readability
        current_process().name = nodeType

        # Synchronisation Event() objects for keeping track of the system
        self.initEvent, eventLoopSyncer, self.finalEvent = events
        self.eventLoopSyncer, self.lastEvent = eventLoopSyncer # unpack tuple

        # necessary for knowledge of the system
        self.nWorkers, self.sEvent, self.config, self.log = params
        self.nodeID = nodeID

        # describe the state of the node by the current Event Number
        self.currentEvent = None

        # Unpack the Queues : (events, histos, filerecords)
        qPair, histq, fq = queues

        # Set up the Queue Mechanisms ( Event Communicators )
        if self.nodeType == 'Reader' or self.nodeType == 'Worker' :
            # Reader or Worker Node : a single in/out pair
            qin, qout = qPair
            self.evcom = EventCommunicator( self, qin, qout )
        else :
            # Writer : many queues in, no queue out
            assert self.nodeType == 'Writer'
            self.evcoms = []
            qsin = qPair[0]
            for q in qsin :
                ec = EventCommunicator( self, q, None )
                self.evcoms.append( ec )
        # Histogram Queue
        self.hq = histq
        # FileRecords Queue
        self.fq = fq

        # Universal Counters (available to all nodes)
        # Use sensibly!!!
        self.nIn = 0
        self.nOut = 0

        # Status Flag (possibly remove later)
        self.stat = SUCCESS

        # Set logger name
        self.log.name = '%s-%i'%(self.nodeType, self.nodeID)

        # Heuristic variables
        # time for init, run, final, firstEventTime, totalTime
        self.iTime = 0.0
        self.rTime = 0.0
        self.fTime = 0.0
        self.firstEvTime = 0.0
        self.tTime = 0.0

        # define the separate process
        self.proc = Process( target=self.Engine )

        # running event counter, only used in Gauss mode (see getEventNumber)
        self.num = 0

        # Detect which application this configuration belongs to
        # NOTE(review): `list` shadows the builtin here — rename if touched
        ks = self.config.keys()
        self.app = None
        list = ["Brunel", "DaVinci", "Boole", "Gauss"]
        for k in list:
            if k in ks: self.app = k

    def Start( self ) :
        # Fork and start the separate process
        self.proc.start()

    def Engine( self ) :
        # This will be the method carried out by the Node
        # Different for all ; overridden by Reader/Worker/Writer
        pass

    def processConfiguration( self ) :
        # Different for all ; customize Configuration for multicore
        pass

    def SetupGaudiPython( self ) :
        # This method will initialize the GaudiPython Tools
        # such as the AppMgr and so on
        self.a = AppMgr()
        if SMAPS :
            from AlgSmapShot import SmapShot
            smapsLog = self.nodeType+'-'+str(self.nodeID)+'.smp'
            ss = SmapShot( logname=smapsLog )
            self.a.addAlgorithm( ss )
        self.evt = self.a.evtsvc()
        self.hvt = self.a.histsvc()
        self.fsr = self.a.filerecordsvc()
        self.inc = self.a.service('IncidentSvc','IIncidentSvc')
        self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
        # C++ serializer bound to the raw event store pointer
        self.ts = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
        # python bookkeeping wrapper (size/time stats) around self.ts
        self.TS = TESSerializer( self.ts, self.evt,
                                 self.nodeType, self.nodeID, self.log )
        return SUCCESS

    def StartGaudiPython( self ) :
        # Run the AppMgr state machine up to 'started'
        self.a.initialize()
        self.a.start()
        return SUCCESS

    def LoadTES( self, tbufferfile ) :
        # Reset the event store root, then deserialize the buffer into it
        root = gbl.DataObject()
        setOwnership(root, False)
        self.evt.setRoot( '/Event', root )
        self.ts.loadBuffer(tbufferfile)

    def getEventNumber( self ) :
        # Return the current event number, or -1 when it cannot be determined
        if self.app != 'Gauss':
            # Using getList or getHistoNames can result in the EventSelector
            # re-initialising connection to RootDBase, which costs a lot of
            # time... try to build a set of Header paths??

            # First Attempt : Unpacked Event Data
            lst = [ '/Event/Gen/Header',
                    '/Event/Rec/Header' ]
            for l in lst :
                path = l
                try :
                    n = self.evt[path].evtNumber()
                    return n
                except :
                    # No evt number at this path
                    # NOTE(review): bare except — also catches interrupts
                    continue

            # second attempt : try DAQ/RawEvent data
            # The Evt Number is in bank type 16, bank 0, data pt 4
            try :
                n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
                return n
            except :
                pass

            # Default Action
            if self.nIn > 0 or self.nOut > 0 :
                # already processing events; stay quiet, return the sentinel
                pass
            else :
                self.log.warning('Could not determine Event Number')
            return -1
        else:
            # Gauss generates events, so the Reader just counts them
            # NOTE(review): non-Reader Gauss nodes fall through and return
            # None here — confirm this is intended (workers use EvtCounter)
            if self.nodeID == -1:
                self.num = self.num + 1
                self.log.info( 'Evt Number %d' % self.num )
                return self.num

    def IdentifyWriters( self ) :
        #
        # Identify Writers in the Configuration
        # returns { "events"|"records"|"tuples"|"histos" : [MiniWriter,...] }
        #
        d = {}
        keys = [ "events", "records", "tuples", "histos" ]
        for k in keys :
            d[k] = []

        # Identify Writers and Classify
        wkeys = WRITERTYPES.keys()
        for v in self.config.values() :
            if v.__class__.__name__ in wkeys :
                writerType = WRITERTYPES[ v.__class__.__name__ ]
                d[writerType].append( MiniWriter(v, writerType, self.config) )
                if self.nodeID == 0 :
                    # log on one node only to avoid duplicated messages
                    self.log.info('Writer Found : %s'%(v.name()))

        # Now Check for the Histogram Service
        if 'HistogramPersistencySvc' in self.config.keys() :
            hfile =self.config['HistogramPersistencySvc'].getProp('OutputFile')
            d[ "histos" ].append( hfile )
        return d

    def dumpHistograms( self ) :
        '''
        Method used by the GaudiPython algorithm CollectHistos
        to obtain a dictionary of form { path : object }
        representing the Histogram Store
        '''
        nlist = self.hvt.getHistoNames( )
        histDict = {}
        objects = 0 ; histos = 0
        if nlist :
            for n in nlist :
                o = self.hvt[ n ]
                if type(o) in aidatypes :
                    # convert AIDA histos to ROOT so they can be sent/merged
                    o = aida2root(o)
                    histos += 1
                else :
                    objects += 1
                histDict[ n ] = o
        else :
            print 'WARNING : no histograms to recover?'
        return histDict

    def Initialize( self ) :
        # Customize the configuration, build the AppMgr, signal readiness
        # via initEvent, then start the AppMgr; records the init time.
        start = time.time()
        self.processConfiguration( )
        self.SetupGaudiPython( )
        self.initEvent.set()
        self.StartGaudiPython( )

        if self.app == 'Gauss':
            # Gauss has no input events; event numbering comes from the
            # EvtCounter tool instead of the event data
            tool = self.a.tool( "ToolSvc.EvtCounter" )
            self.cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
            self.log.info("interface %s" %self.cntr)
        else:
            self.cntr = None

        self.iTime = time.time() - start

    def Finalize( self ) :
        # Stop and finalize the AppMgr, then signal completion via finalEvent
        start = time.time()
        self.a.stop()
        self.a.finalize()
        self.log.info( '%s-%i Finalized'%(self.nodeType, self.nodeID) )
        self.finalEvent.set()
        self.fTime = time.time() - start

    def Report( self ) :
        # Log the timing summary for this node, then the serializer's stats
        self.log.name = "%s-%i Audit"%(self.nodeType, self.nodeID)
        allTime = "Alive Time : %5.2f"%(self.tTime)
        initTime = "Init Time : %5.2f"%(self.iTime)
        frstTime = "1st Event Time : %5.2f"%(self.firstEvTime)
        runTime = "Run Time : %5.2f"%(self.rTime)
        finTime = "Finalise Time : %5.2f"%(self.fTime)
        tup = ( allTime, initTime, frstTime, runTime, finTime )
        for t in tup :
            self.log.info( t )
        self.log.name = "%s-%i"%(self.nodeType, self.nodeID)
        # and report from the TESSerializer
        self.TS.Report()
638 
639 # =============================================================================
640 
def __init__( self, queues, events, params ) :
    # Reader node: nodeID is fixed at -1 by convention (see GMPComponent)
    GMPComponent.__init__(self, 'Reader', -1, queues, events, params )
644 
def processConfiguration( self ) :
    # Reader :
    # No algorithms
    # No output
    # No histos
    self.config[ 'ApplicationMgr' ].TopAlg = []
    self.config[ 'ApplicationMgr' ].OutStream = []
    if "HistogramPersistencySvc" in self.config.keys() :
        self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
    # tag all messages from this process with the node role
    self.config['MessageSvc'].Format = '[Reader]% F%18W%S%7W%R%T %0W%M'
    # keep the configured event limit; the Engine loop stops at it
    self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax
656 
def DumpEvent( self ) :
    """Serialize the current event store into a fresh write-mode TBufferFile."""
    buf = TBufferFile( TBuffer.kWrite )
    self.ts.dumpBuffer( buf )
    return buf
663 
def DoFirstEvent( self ) :
    # Do First Event ------------------------------------------------------
    # The first event is special: before sending it, the TESSerializer's
    # item list must be populated from the freshly read event.
    # Check Termination Criteria
    startFirst = time.time()
    self.log.info('Reader : First Event')
    if self.nOut == self.evtMax :
        self.log.info('evtMax( %i ) reached'%(self.evtMax))
        self.lastEvent.set()
        return SUCCESS
    else :
        # Continue to read, dump and send event
        self.a.run(1)
        if not bool(self.evt['/Event']) :
            self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
            self.lastEvent.set()
            return SUCCESS
        else :
            # Populate TESSerializer list and send Event
            if self.app == "Gauss":
                lst = self.evt.getHistoNames()
            else:
                try :
                    lst = self.evt.getList()
                    if self.app == "DaVinci":
                        # DAQ branch must be walked explicitly
                        daqnode = self.evt.retrieveObject( '/Event/DAQ' ).registry()
                        setOwnership( daqnode, False )
                        self.evt.getList( daqnode, lst, daqnode.address().par() )
                except :
                    # NOTE(review): bare except — also catches interrupts
                    self.log.critical('Reader could not acquire TES List!')
                    self.lastEvent.set()
                    return FAILURE
            self.log.info('Reader : TES List : %i items'%(len(lst)))
            for l in lst :
                self.ts.addItem(l)
            # NOTE(review): the listing this was recovered from elides a
            # line here — presumably the self.currentEvent assignment
            # (cf. the main loop in Engine); confirm against VCS
            tb = self.TS.Dump( )
            self.log.info('First Event Sent')
            self.evcom.send( (self.currentEvent, tb) )
            self.nOut += 1
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
            self.firstEvTime = time.time()-startFirst
            return SUCCESS
707 
def Engine( self ) :
    # Main body of the Reader process: read events one at a time,
    # serialize each and send it downstream to the Workers, then
    # finalize and report.
    startEngine = time.time()
    self.log.name = 'Reader'
    self.log.info('Reader Process starting')

    self.Initialize()

    # add the Histogram Collection Algorithm
    self.a.addAlgorithm( CollectHistograms(self) )

    self.log.info('Reader Beginning Distribution')
    sc = self.DoFirstEvent( )
    if sc.isSuccess() :
        self.log.info('Reader First Event OK')
    else :
        self.log.critical('Reader Failed on First Event')
        self.stat = FAILURE

    # Do All Others -------------------------------------------------------
    while True :
        # Check Termination Criteria
        if self.nOut == self.evtMax :
            self.log.info('evtMax( %i ) reached'%(self.evtMax))
            break
        # Check Health
        if not self.stat.isSuccess() :
            self.log.critical( 'Reader is Damaged!' )
            break
        # Continue to read, dump and send event
        t = time.time()
        self.a.run(1)
        self.rTime += (time.time()-t)
        if not bool(self.evt['/Event']) :
            # input exhausted — a falsy '/Event' root means nothing was read
            self.log.warning('No More Events! (So Far : %i)'%(self.nOut))
            break
        self.currentEvent = self.getEventNumber( )
        tb = self.TS.Dump( )
        self.evcom.send( (self.currentEvent, tb) )
        # clean up
        self.nOut += 1
        self.eventLoopSyncer.set()
        self.evt.clearStore( )
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    # Finalize
    self.log.info( 'Reader : Event Distribution complete.' )
    self.evcom.finalize()
    self.Finalize()
    self.tTime = time.time() - startEngine
    self.Report()
760 
761 # =============================================================================
762 
def __init__( self, workerID, queues, events, params ) :
    # Worker node: nodeID is the worker's index, 0..nWorkers-1
    GMPComponent.__init__(self,'Worker', workerID, queues, events, params)
    # Identify the writer streams
    # NOTE(review): the listing this was recovered from elides a line here
    # (presumably self.writerDict = self.IdentifyWriters() — confirm in VCS)
    # Identify the accept/veto checks for each event
    self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
    self.log.debug("Worker-%i Created OK"%(self.nodeID))
    self.eventOutput = True
772 
def processConfiguration( self ) :
    # Worker :
    # No input
    # No output
    # No Histos
    self.config[ 'EventSelector' ].Input = []
    self.config[ 'ApplicationMgr' ].OutStream = []
    if "HistogramPersistencySvc" in self.config.keys() :
        self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
    # tag all messages from this process with the worker's index
    formatHead = '[Worker-%i] '%(self.nodeID)
    self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'

    for key, lst in self.writerDict.iteritems() :
        self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

    for m in self.writerDict[ "tuples" ] :
        # rename Tuple output file with an appendix
        # based on worker id, for merging later
        newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
        self.config[ m.key ].Output = newName

    # Suppress INFO Output for all but Worker-0
    #if self.nodeID == 0 :
    # pass
    #else :
    # self.config[ 'MessageSvc' ].OutputLevel = ERROR

    # Make the EvtCounter count explicitly-set event numbers rather than
    # BeginEvent incidents (the numbers arrive with each packet)
    try:
        if "ToolSvc.EvtCounter" not in self.config:
            from Configurables import EvtCounter
            counter = EvtCounter()
        else:
            counter = self.config["ToolSvc.EvtCounter"]
        counter.UseIncident = False
    except:
        # ignore errors when trying to change the configuration of the EvtCounter
        self.log.warning('Cannot configure EvtCounter')
810 
def Engine( self ) :
    # Main body of a Worker process: receive serialized events from the
    # Reader, execute the algorithms on each, and forward accepted events
    # to the Writer; then send FileRecords and finalize.
    startEngine = time.time()
    self.log.name = "Worker-%i"%(self.nodeID)
    self.log.info("Worker %i starting Engine"%(self.nodeID))
    self.Initialize()

    # populate the TESSerializer itemlist
    self.log.info('EVT WRITERS ON WORKER : %i'\
                  %( len(self.writerDict['events'])))

    nEventWriters = len( self.writerDict[ "events" ] )
    if nEventWriters :
        itemList = set()
        optItemList = set()
        for m in self.writerDict[ "events" ] :
            for item in m.ItemList :
                # strip the depth suffix ("path#depth") if present
                hsh = item.find( '#' )
                if hsh != -1:
                    item = item[:hsh]
                itemList.add( item )
            for item in m.OptItemList :
                hsh = item.find( '#' )
                if hsh != -1:
                    item = item[:hsh]
                optItemList.add( item )
        # If an item is mandatory and optional, keep it only in the optional list
        itemList -= optItemList
        for item in sorted( itemList ):
            self.log.info( ' adding ItemList Item to ts : %s' % ( item ) )
            self.ts.addItem( item )
        for item in sorted( optItemList ):
            self.log.info( ' adding Optional Item to ts : %s' % ( item ) )
            self.ts.addOptItem( item )
    else :
        self.log.info( 'There is no Event Output for this app' )
        self.eventOutput = False

    # add the Histogram Collection Algorithm
    self.a.addAlgorithm( CollectHistograms(self) )

    # Begin processing
    self.log.name = "Worker-%i"%(self.nodeID)
    Go = True
    while Go :
        packet = self.evcom.receive( )
        if packet : pass
        else : continue
        if packet == 'FINISHED' : break
        evtNumber, tbin = packet # unpack
        if self.cntr != None:
            # Gauss mode: tell the EvtCounter which event this is
            self.cntr.setEventCounter( evtNumber )

        self.nIn += 1
        self.TS.Load( tbin )
        t = time.time()
        sc = self.a.executeEvent()
        if self.nIn == 1 :
            self.firstEvTime = time.time()-t
        else :
            self.rTime += (time.time()-t)
        if sc.isSuccess() :
            pass
        else :
            self.log.warning('Did not Execute Event')
            self.evt.clearStore()
            continue
        if self.isEventPassed() :
            pass
        else :
            self.log.warning( 'Event did not pass : %i'%(evtNumber) )
            self.evt.clearStore()
            continue
        if self.eventOutput :
            # It may be the case of generating Event Tags; hence
            # no event output
            # NOTE(review): the listing this was recovered from elides a
            # line here (presumably the self.currentEvent assignment,
            # as in Reader.Engine) — confirm against VCS
            tb = self.TS.Dump( )
            self.evcom.send( (self.currentEvent, tb) )
            self.nOut += 1
        self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
        self.eventLoopSyncer.set()
        self.evt.clearStore( )
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    self.evcom.finalize()
    self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
    # Now send the FileRecords and stop/finalize the appMgr
    self.filerecordsAgent.SendFileRecords()
    self.Finalize()
    self.tTime = time.time()-startEngine
    self.Report()
906 
def getCheckAlgs( self ) :
    '''
    For some output writers, a check is performed to see if the event has
    executed certain algorithms.
    These reside in the AcceptAlgs property for those writers.
    Returns a (acceptAlgs, requireAlgs, vetoAlgs) tuple of name lists,
    accumulated over every event writer.
    '''
    accept, require, veto = [], [], []
    for mw in self.writerDict[ "events" ] :
        writer = mw.w
        # each property is optional on any given writer
        for attr, target in ( ( 'AcceptAlgs', accept ),
                              ( 'RequireAlgs', require ),
                              ( 'VetoAlgs', veto ) ) :
            if hasattr( writer, attr ) :
                target += getattr( writer, attr )
    return ( accept, require, veto )
921 
922 
def checkExecutedPassed( self, algName ) :
    """Return True iff the named algorithm was executed AND passed its filter."""
    # short-circuit: don't query filterPassed() for an unexecuted algorithm
    if not self.a.algorithm( algName )._ialg.isExecuted() :
        return False
    if self.a.algorithm( algName )._ialg.filterPassed() :
        return True
    return False
929 
def isEventPassed( self ) :
    '''
    Check the algorithm status for an event.
    Depending on output writer settings, the event
    may be declined based on various criteria.
    This is a transcript of the check that occurs in GaudiSvc::OutputStream
    '''
    # accept stage : at least one accept-alg must have passed,
    # unless no accept-algs are configured at all
    self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
    if self.acceptAlgs :
        passed = any( self.checkExecutedPassed( name )
                      for name in self.acceptAlgs )
    else :
        passed = True

    # require stage : every require-alg must have passed
    self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
    for name in self.requireAlgs :
        if not self.checkExecutedPassed( name ) :
            self.log.info('Evt declined (requireAlgs) : %s'%(name) )
            passed = False

    # veto stage
    self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
    for name in self.vetoAlgs :
        if not self.checkExecutedPassed( name ) :
            self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
            passed = False
    return passed
964 
965 # =============================================================================
966 
def __init__( self, queues, events, params ) :
    '''Writer sub-process (nodeID -2): gathers processed events from all workers.'''
    GMPComponent.__init__(self,'Writer', -2, queues, events, params)
    # Identify the writer streams
    # NOTE(review): the doxygen listing drops a source line here; the
    # statement that builds self.writerDict (relied upon by
    # processConfiguration) is presumably what is missing -- confirm
    # against the original source file.
    # This keeps track of workers as they finish
    self.status = [False]*self.nWorkers
    self.log.name = "Writer--2"
975 
def processConfiguration( self ) :
    '''
    Tailor the configuration for the Writer sub-process.

    The Writer runs no algorithms and reads no event input; it only
    writes.  Every output file name (event writers, FileRecords
    writers, histogram persistency) is decorated with a ".pN." tag,
    where N is the number of workers, so the parallel job does not
    clobber the serial job's output files.
    '''
    # Writer :
    #  No input
    #  No Algs
    self.config[ 'ApplicationMgr' ].TopAlg = []
    self.config[ 'EventSelector' ].Input = []

    self.config['MessageSvc'].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'

    # Now process the output writers
    # FIX: dict.iteritems() is Python-2-only; items() iterates the same
    # pairs and is portable.
    for key, lst in self.writerDict.items() :
        self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

    # Decoration appended to every output file name to mark it as the
    # product of an N-worker parallel job (hoisted out of the loops).
    tag = '.p%i.' % self.nWorkers

    # Modify the name of the output file to reflect that it came
    # from a parallel processing
    #
    # Event Writers
    for m in self.writerDict[ "events" ] :
        self.log.debug( 'Processing Event Writer : %s'%(m) )
        self.config[ m.key ].Output = m.getNewName( '.', tag )

    # Now, if there are no event writers, the FileRecords file
    # will fail to open, as it only opens an UPDATE version
    # of the existing Event Output File
    # So, if there are no event writers, edit the string of the
    # FileRecord Writer

    # FileRecords Writers : force OPT='RECREATE' (see note above)
    for m in self.writerDict[ "records" ] :
        self.log.debug( 'Processing FileRecords Writer: %s'%(m) )
        self.config[ m.key ].Output = m.getNewName( '.', tag,
                                                    extra=" OPT='RECREATE'" )

    # same renaming for histograms
    hs = "HistogramPersistencySvc"
    if hs in self.config.keys() :
        current = self.config[ hs ].OutputFile
        if current :
            self.config[ hs ].OutputFile = current.replace( '.', tag )
1020 
def Engine( self ) :
    '''
    Main loop of the Writer sub-process: poll the workers' queues
    round-robin for serialised events, write each one out, then rebuild
    the histogram and FileRecords stores and finalize.
    '''
    startEngine = time.time()
    self.Initialize()
    self.histoAgent = HistoAgent( self )
    # NOTE(review): the doxygen listing drops a source line here;
    # self.filerecordsAgent is used below but never assigned in the
    # visible code, so its construction is presumably the missing
    # statement -- confirm against the original source file.

    # Begin processing
    Go = True
    current = -1
    stopCriteria = self.nWorkers   # (unused in the visible code)
    while Go :
        # round-robin over the worker queues; the short timeout keeps
        # the loop responsive when a given worker is idle
        current = (current+1)%self.nWorkers
        packet = self.evcoms[current].receive( timeout=0.01 )
        if packet == None :
            continue
        if packet == 'FINISHED' :
            # a worker has signalled completion; stop only once ALL
            # workers have done so
            self.log.info('Writer got FINISHED flag : Worker %i'%(current))
            self.status[current] = True
            if all(self.status) :
                self.log.info('FINISHED recd from all workers, break loop')
                break
            continue
        # otherwise, continue as normal
        self.nIn += 1 # update central count (maybe needed by FSR store)
        evtNumber, tbin = packet # unpack
        # deserialise the event buffer into the transient store, then
        # run the (writer-only) event processing to write it out
        self.TS.Load( tbin )
        t = time.time()
        self.a.executeEvent()
        self.rTime += ( time.time()-t )
        # NOTE(review): another source line is dropped from the listing
        # here (plausibly an output-event counter update) -- confirm
        # against the original source file.
        self.evt.clearStore( )
        self.eventLoopSyncer.set()
    self.log.name = "Writer--2"
    self.log.info('Setting <Last> Event')
    self.lastEvent.set()

    # finalisation steps
    [ e.finalize() for e in self.evcoms ]
    # Now do Histograms
    sc = self.histoAgent.Receive()
    sc = self.histoAgent.RebuildHistoStore()
    if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
    else : self.log.warning( 'Histo Store Error in Rebuild' )

    # Now do FileRecords
    sc = self.filerecordsAgent.Receive()
    self.filerecordsAgent.Rebuild()
    self.Finalize()
    #self.rTime = time.time()-startEngine
    self.Report()
1071 
1072 # =============================================================================
1073 
1074 
1075 
1076 # =============================================================================
1077 
class Coord( object ) :
    '''
    Co-ordinator of the parallel job.

    Builds the Reader, the N Worker and the Writer sub-processes, the
    queues that connect them, and the Syncer objects that keep their
    Initialise/Run/Finalise phases in step.  nodeID convention:
    -1 = Reader, 0..N-1 = Workers, -2 = Writer.
    '''
    def __init__( self, nWorkers, config, log ) :
        # nWorkers : requested worker count (-1 means "all cpus")
        # config   : the Gaudi configuration dictionary shared by all nodes
        # log      : the logger instance used throughout

        self.log = log
        self.config = config
        # set up Logging
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )

        if nWorkers == -1 :
            # The user has requested all available cpus in the machine
            self.nWorkers = cpu_count()
        else :
            self.nWorkers = nWorkers


        self.qs = self.SetupQueues( ) # a dictionary of queues (for Events)
        self.hq = JoinableQueue( ) # for Histogram data
        self.fq = JoinableQueue( ) # for FileRecords data

        # Make a Syncer for Initalise, Run, and Finalise
        # (timeouts/steps come from the module-level WAIT_*/STEP_* constants)
        self.sInit = Syncer( self.nWorkers, self.log,
                             limit=WAIT_INITIALISE,
                             step=STEP_INITIALISE )
        self.sRun = Syncer( self.nWorkers, self.log,
                            manyEvents=True,
                            limit=WAIT_SINGLE_EVENT,
                            step=STEP_EVENT,
                            firstEvent=WAIT_FIRST_EVENT )
        self.sFin = Syncer( self.nWorkers, self.log,
                            limit=WAIT_FINALISE,
                            step=STEP_FINALISE )
        # and one final one for Histogram Transfer
        self.histSyncEvent = Event()

        # params are common to all subprocesses
        params = (self.nWorkers, self.histSyncEvent, self.config, self.log)

        # Declare SubProcesses!
        self.reader= Reader(self.getQueues(-1), self.getSyncEvents(-1), params)
        self.workers = []
        for i in xrange( self.nWorkers ) :
            wk = Worker( i, self.getQueues(i), self.getSyncEvents(i), params )
            self.workers.append( wk )
        self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params)

        # launch order: Writer first, then Workers, Reader last
        # (Stop() joins them in the reverse of this order)
        self.system = []
        self.system.append(self.writer)
        [ self.system.append(w) for w in self.workers ]
        self.system.append(self.reader)

    def getSyncEvents( self, nodeID ) :
        # Per-node synchronisation primitives: (init, (run, lastEvent), fin)
        init = self.sInit.d[nodeID].event
        run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
        fin = self.sFin.d[nodeID].event
        return ( init, run, fin )

    def getQueues( self, nodeID ) :
        # Per-node queues: (event queue entry, histogram queue, FSR queue)
        eventQ = self.qs[ nodeID ]
        histQ = self.hq
        fsrQ = self.fq
        return ( eventQ, histQ, fsrQ )

    def Go( self ) :
        '''
        Drive the three phases across all sub-processes.  On any phase
        failure, terminate everything and return FAILURE; otherwise join
        the processes cleanly and return SUCCESS.
        '''

        # Initialise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'INITIALISING SYSTEM' )
        for p in self.system :
            p.Start()
        sc = self.sInit.syncAll(step="Initialise")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # Run
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'RUNNING SYSTEM' )
        sc = self.sRun.syncAll(step="Run")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # Finalise
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'FINALISING SYSTEM' )
        sc = self.sFin.syncAll(step="Finalise")
        if sc == SUCCESS: pass
        else : self.Terminate() ; return FAILURE

        # if we've got this far, finally report SUCCESS
        self.log.info( "Cleanly join all Processes" )
        self.Stop()
        self.log.info( "Report Total Success to Main.py" )
        return SUCCESS

    def Terminate( self ) :
        # Brutally kill sub-processes
        children = multiprocessing.active_children()
        for i in children:
            i.terminate()

        #self.writer.proc.terminate()
        #[ w.proc.terminate() for w in self.workers]
        #self.reader.proc.terminate()

    def Stop( self ) :
        # procs should be joined in reverse order to launch
        self.system.reverse()
        for s in self.system :
            s.proc.join()
        return SUCCESS

    def SetupQueues( self ) :
        # This method will set up the network of Queues needed
        # N Queues = nWorkers + 1
        # Each Worker has a Queue in, and a Queue out
        # Reader has Queue out only
        # Writer has nWorkers Queues in

        # one queue from Reader-Workers
        rwk = JoinableQueue()
        # one queue from each worker to writer
        workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]
        # keyed by nodeID; each value is the (inQueue, outQueue) pair
        d = {}
        d[-1] = (None, rwk) # Reader
        d[-2] = (workersWriter, None) # Writer
        for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
        return d
1205 
1206 # ============================= EOF ===========================================

Generated at Wed Nov 28 2012 12:17:16 for Gaudi Framework, version v23r5 by Doxygen version 1.8.2 written by Dimitri van Heesch, © 1997-2004