GaudiMP.pTools.FileRecordsAgent Class Reference

Public Member Functions

def __init__ (self, gmpComponent)
 
def localCmp (self, tupA, tupB)
 
def SendFileRecords (self)
 
def Receive (self)
 
def Rebuild (self)
 
def MergeFSRobject (self, sourceNode, path, ob)
 
def ProcessTimeSpanFSR (self, path, ob)
 
def ProcessEventCountFSR (self, path, ob)
 
def MergeLumiFSR (self, path, keyedC)
 
def __init__ (self, gmpComponent)
 
def localCmp (self, tupA, tupB)
 
def SendFileRecords (self)
 
def Receive (self)
 
def Rebuild (self)
 
def MergeFSRobject (self, sourceNode, path, ob)
 
def ProcessTimeSpanFSR (self, path, ob)
 
def ProcessEventCountFSR (self, path, ob)
 
def MergeLumiFSR (self, path, keyedC)
 

Public Attributes

 fsr
 
 q
 
 log
 
 objectsIn
 
 objectsOut
 

Private Attributes

 _gmpc
 

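Detailed Description

Helper that moves the FileRecords (FSR) transient-store contents from the workers to the writer: SendFileRecords pickles the store objects and puts them on the queue, Receive collects them from all workers, and Rebuild registers the objects from node 0 and merges those from the other nodes into the local FSR store.

Definition at line 206 of file pTools.py.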

Constructor & Destructor Documentation

def GaudiMP.pTools.FileRecordsAgent.__init__ (   self,
  gmpComponent 
)

Definition at line 207 of file pTools.py.

207  def __init__( self, gmpComponent ) :
208  self._gmpc = gmpComponent
209  self.fsr = self._gmpc.fsr
210  self.q = self._gmpc.fq
211  self.log = self._gmpc.log
212  self.objectsIn = [] # used for collecting FSR store objects
213  self.objectsOut = []
214 
def __init__(self, gmpComponent)
Definition: pTools.py:207
def GaudiMP.pTools.FileRecordsAgent.__init__ (   self,
  gmpComponent 
)

Definition at line 207 of file pTools.py.

207  def __init__( self, gmpComponent ) :
208  self._gmpc = gmpComponent
209  self.fsr = self._gmpc.fsr
210  self.q = self._gmpc.fq
211  self.log = self._gmpc.log
212  self.objectsIn = [] # used for collecting FSR store objects
213  self.objectsOut = []
214 
def __init__(self, gmpComponent)
Definition: pTools.py:207

Member Function Documentation

def GaudiMP.pTools.FileRecordsAgent.localCmp (   self,
  tupA,
  tupB 
)

Definition at line 215 of file pTools.py.

215  def localCmp( self, tupA, tupB ) :
216  # compare two tuples by their first element (index 0);
217  # used as the cmp function for the list sort() method
218  ind = 0
219  valA = tupA[ind]
220  valB = tupB[ind]
221  if valA<valB : return -1
222  elif valA>valB : return 1
223  else : return 0
224 
225 
def localCmp(self, tupA, tupB)
Definition: pTools.py:215
def GaudiMP.pTools.FileRecordsAgent.localCmp (   self,
  tupA,
  tupB 
)

Definition at line 215 of file pTools.py.

215  def localCmp( self, tupA, tupB ) :
216  # compare two tuples by their first element (index 0);
217  # used as the cmp function for the list sort() method
218  ind = 0
219  valA = tupA[ind]
220  valB = tupB[ind]
221  if valA<valB : return -1
222  elif valA>valB : return 1
223  else : return 0
224 
225 
def localCmp(self, tupA, tupB)
Definition: pTools.py:215
def GaudiMP.pTools.FileRecordsAgent.MergeFSRobject (   self,
  sourceNode,
  path,
  ob 
)

Definition at line 349 of file pTools.py.

349  def MergeFSRobject( self, sourceNode, path, ob ) :
350  # Merge Non-Empty Keyed Container from Worker>0
351  if path == '/FileRecords/TimeSpanFSR' :
352  # TimeSpanFSR is a straightforward case
353  self.ProcessTimeSpanFSR( path, ob )
354  elif path == '/FileRecords/EventCountFSR' :
355  # Event Counter is also easy
356  self.ProcessEventCountFSR( path, ob )
357  # now other cases may not be so easy...
358  elif "KeyedContainer" in ob.__class__.__name__ :
359  # Keyed Container of LumiFSRs : extract and re-register
360  # self.ProcessLumiFSR( path, ob )
361  if "LumiFSR" in ob.__class__.__name__ :
362  self.MergeLumiFSR( path, ob )
363  else:
364  self.log.info("Skipping Merge of Keyed Container %s for %s"\
365  %(ob.__class__.__name__,path))
366 
def ProcessEventCountFSR(self, path, ob)
Definition: pTools.py:389
def MergeLumiFSR(self, path, keyedC)
Definition: pTools.py:393
def ProcessTimeSpanFSR(self, path, ob)
Definition: pTools.py:367
def MergeFSRobject(self, sourceNode, path, ob)
Definition: pTools.py:349
def GaudiMP.pTools.FileRecordsAgent.MergeFSRobject (   self,
  sourceNode,
  path,
  ob 
)

Definition at line 349 of file pTools.py.

349  def MergeFSRobject( self, sourceNode, path, ob ) :
350  # Merge Non-Empty Keyed Container from Worker>0
351  if path == '/FileRecords/TimeSpanFSR' :
352  # TimeSpanFSR is a straightforward case
353  self.ProcessTimeSpanFSR( path, ob )
354  elif path == '/FileRecords/EventCountFSR' :
355  # Event Counter is also easy
356  self.ProcessEventCountFSR( path, ob )
357  # now other cases may not be so easy...
358  elif "KeyedContainer" in ob.__class__.__name__ :
359  # Keyed Container of LumiFSRs : extract and re-register
360  # self.ProcessLumiFSR( path, ob )
361  if "LumiFSR" in ob.__class__.__name__ :
362  self.MergeLumiFSR( path, ob )
363  else:
364  self.log.info("Skipping Merge of Keyed Container %s for %s"\
365  %(ob.__class__.__name__,path))
366 
def ProcessEventCountFSR(self, path, ob)
Definition: pTools.py:389
def MergeLumiFSR(self, path, keyedC)
Definition: pTools.py:393
def ProcessTimeSpanFSR(self, path, ob)
Definition: pTools.py:367
def MergeFSRobject(self, sourceNode, path, ob)
Definition: pTools.py:349
def GaudiMP.pTools.FileRecordsAgent.MergeLumiFSR (   self,
  path,
  keyedC 
)

Definition at line 393 of file pTools.py.

393  def MergeLumiFSR( self, path, keyedC ) :
394  from ROOT import string
395  # Fetch the first lumi
396  keyedContainer = self.fsr.retrieveObject(path)
397  # The LumiFSR KeyedContainers only have one object
398  assert keyedContainer.numberOfObjects() == 1
399  l = keyedContainer.containedObject(0)
400  baseLumi = LumiFSR( l )
401  # Now deal with the argument Non-empty Keyed Container of LumiFSRs
402  nCont = keyedC.numberOfObjects()
403  for i in xrange(nCont) :
404  obj = keyedC.containedObject(i)
405  nextLumi = LumiFSR( obj )
406  baseLumi.merge( nextLumi )
407  # Now Rebuild and ReRegister
408  newLumi = gbl.LHCb.LumiFSR()
409  for r in baseLumi.runs :
410  newLumi.addRunNumber( r )
411  for f in baseLumi.files :
412  newLumi.addFileID( string(f) )
413  for k in baseLumi.keys :
414  increment, integral = baseLumi.info[k]
415  newLumi.addInfo(k, increment, integral)
416  # clear existing Keyed Container
417  self.fsr[path].clear()
418  # Add newly merged lumiFSR
419  self.fsr[path].add(newLumi)
420  return SUCCESS
421 
422 # =============================================================================
423 
def MergeLumiFSR(self, path, keyedC)
Definition: pTools.py:393
def GaudiMP.pTools.FileRecordsAgent.MergeLumiFSR (   self,
  path,
  keyedC 
)

Definition at line 393 of file pTools.py.

393  def MergeLumiFSR( self, path, keyedC ) :
394  from ROOT import string
395  # Fetch the first lumi
396  keyedContainer = self.fsr.retrieveObject(path)
397  # The LumiFSR KeyedContainers only have one object
398  assert keyedContainer.numberOfObjects() == 1
399  l = keyedContainer.containedObject(0)
400  baseLumi = LumiFSR( l )
401  # Now deal with the argument Non-empty Keyed Container of LumiFSRs
402  nCont = keyedC.numberOfObjects()
403  for i in xrange(nCont) :
404  obj = keyedC.containedObject(i)
405  nextLumi = LumiFSR( obj )
406  baseLumi.merge( nextLumi )
407  # Now Rebuild and ReRegister
408  newLumi = gbl.LHCb.LumiFSR()
409  for r in baseLumi.runs :
410  newLumi.addRunNumber( r )
411  for f in baseLumi.files :
412  newLumi.addFileID( string(f) )
413  for k in baseLumi.keys :
414  increment, integral = baseLumi.info[k]
415  newLumi.addInfo(k, increment, integral)
416  # clear existing Keyed Container
417  self.fsr[path].clear()
418  # Add newly merged lumiFSR
419  self.fsr[path].add(newLumi)
420  return SUCCESS
421 
422 # =============================================================================
423 
def MergeLumiFSR(self, path, keyedC)
Definition: pTools.py:393
def GaudiMP.pTools.FileRecordsAgent.ProcessEventCountFSR (   self,
  path,
  ob 
)

Definition at line 389 of file pTools.py.

389  def ProcessEventCountFSR( self, path, ob ) :
390  self.log.debug('Event Count Input Addition')
391  self.fsr[path].setInput( self.fsr[path].input()+ob.input() )
392 
def ProcessEventCountFSR(self, path, ob)
Definition: pTools.py:389
def GaudiMP.pTools.FileRecordsAgent.ProcessEventCountFSR (   self,
  path,
  ob 
)

Definition at line 389 of file pTools.py.

389  def ProcessEventCountFSR( self, path, ob ) :
390  self.log.debug('Event Count Input Addition')
391  self.fsr[path].setInput( self.fsr[path].input()+ob.input() )
392 
def ProcessEventCountFSR(self, path, ob)
Definition: pTools.py:389
def GaudiMP.pTools.FileRecordsAgent.ProcessTimeSpanFSR (   self,
  path,
  ob 
)

Definition at line 367 of file pTools.py.

367  def ProcessTimeSpanFSR( self, path, ob ) :
368  ob2 = self.fsr.retrieveObject( path )
369  if ob.containedObjects().size() :
370  sz = ob.containedObjects().size()
371  cob = ob2.containedObjects()[0]
372  min = cob.earliest()
373  max = cob.latest()
374  for j in xrange( sz ) :
375  cob = ob.containedObjects()[j]
376  self.log.debug( 'Adding TimeSpanFSR' )
377  if cob.earliest() < min:
378  min = cob.earliest()
379  if cob.latest() > max:
380  max = cob.latest()
381  # this is annoying: the merged TimeSpanFSR has to be rebuilt as a new object, without a key, and then added
382  continue
383  tsfsr = gbl.LHCb.TimeSpanFSR()
384  tsfsr.setEarliest( min )
385  tsfsr.setLatest( max )
386  self.fsr[path].clear()
387  self.fsr[path].add( tsfsr )
388 
def ProcessTimeSpanFSR(self, path, ob)
Definition: pTools.py:367
def GaudiMP.pTools.FileRecordsAgent.ProcessTimeSpanFSR (   self,
  path,
  ob 
)

Definition at line 367 of file pTools.py.

367  def ProcessTimeSpanFSR( self, path, ob ) :
368  ob2 = self.fsr.retrieveObject( path )
369  if ob.containedObjects().size() :
370  sz = ob.containedObjects().size()
371  cob = ob2.containedObjects()[0]
372  min = cob.earliest()
373  max = cob.latest()
374  for j in xrange( sz ) :
375  cob = ob.containedObjects()[j]
376  self.log.debug( 'Adding TimeSpanFSR' )
377  if cob.earliest() < min:
378  min = cob.earliest()
379  if cob.latest() > max:
380  max = cob.latest()
381  # this is annoying: the merged TimeSpanFSR has to be rebuilt as a new object, without a key, and then added
382  continue
383  tsfsr = gbl.LHCb.TimeSpanFSR()
384  tsfsr.setEarliest( min )
385  tsfsr.setLatest( max )
386  self.fsr[path].clear()
387  self.fsr[path].add( tsfsr )
388 
def ProcessTimeSpanFSR(self, path, ob)
Definition: pTools.py:367
def GaudiMP.pTools.FileRecordsAgent.Rebuild (   self)

Definition at line 305 of file pTools.py.

305  def Rebuild( self ) :
306  # objectsIn is a list of (sourceNode, path, serializedObject) tuples
307  for sourceNode, path, serialob in self.objectsIn :
308  self.log.debug('Working with %s'%(path))
309  ob = pickle.loads(serialob)
310  if hasattr( ob, 'update' ) :
311  ob.update()
312  if hasattr( ob, 'numberOfObjects' ) :
313  nCont = ob.numberOfObjects()
314  self.log.debug( '\t %s has containedObjects : %i'%(type(ob).__name__, nCont) )
315  if sourceNode == 0 :
316  self.log.debug('Registering Object to : %s'%(path))
317  self.fsr.registerObject( path, ob )
318  else :
319  self.log.debug('Merging Object to : %s'%(path))
320  self.MergeFSRobject( sourceNode, path, ob )
321  # As RecordStream has been split into Worker and Writer parts, the
322  # count for output is wrong... fix that here, as every event received
323  # by the writer is written (validation testing occurs on the worker)
324 
325  self.log.info('FSR Store Rebuilt. Correcting EventCountFSR')
326  if bool( self.fsr._idp ) : # There might not be an FSR stream (Gauss)
327  ecount = '/FileRecords/EventCountFSR'
328  if self.fsr[ecount] :
329  self.fsr[ecount].setOutput( self._gmpc.nIn )
330  self.log.info( 'Event Counter Output set : %s : %i'\
331  %(ecount, self.fsr[ecount].output()) )
332  # Do some reporting
333  self.log.debug('FSR store reconstructed!')
334  lst = self.fsr.getHistoNames()
335  if lst :
336  for l in lst :
337  ob = self.fsr.retrieveObject(l)
338  if hasattr( ob, 'configureDirectAccess' ) :
339  ob.configureDirectAccess()
340  if hasattr( ob, 'containedObjects' ) :
341  # if ob.numberOfObjects() :
342  self.log.debug('\t%s (cont. objects : %i)'\
343  %(l, ob.numberOfObjects()))
344  else :
345  self.log.debug('\t%s'%(l))
346  self.log.info('FSR Store fully rebuilt.')
347  return SUCCESS
348 
def MergeFSRobject(self, sourceNode, path, ob)
Definition: pTools.py:349
string type
Definition: gaudirun.py:151
def GaudiMP.pTools.FileRecordsAgent.Rebuild (   self)

Definition at line 305 of file pTools.py.

305  def Rebuild( self ) :
306  # objectsIn is a list of (sourceNode, path, serializedObject) tuples
307  for sourceNode, path, serialob in self.objectsIn :
308  self.log.debug('Working with %s'%(path))
309  ob = pickle.loads(serialob)
310  if hasattr( ob, 'update' ) :
311  ob.update()
312  if hasattr( ob, 'numberOfObjects' ) :
313  nCont = ob.numberOfObjects()
314  self.log.debug( '\t %s has containedObjects : %i'%(type(ob).__name__, nCont) )
315  if sourceNode == 0 :
316  self.log.debug('Registering Object to : %s'%(path))
317  self.fsr.registerObject( path, ob )
318  else :
319  self.log.debug('Merging Object to : %s'%(path))
320  self.MergeFSRobject( sourceNode, path, ob )
321  # As RecordStream has been split into Worker and Writer parts, the
322  # count for output is wrong... fix that here, as every event received
323  # by the writer is written (validation testing occurs on the worker)
324 
325  self.log.info('FSR Store Rebuilt. Correcting EventCountFSR')
326  if bool( self.fsr._idp ) : # There might not be an FSR stream (Gauss)
327  ecount = '/FileRecords/EventCountFSR'
328  if self.fsr[ecount] :
329  self.fsr[ecount].setOutput( self._gmpc.nIn )
330  self.log.info( 'Event Counter Output set : %s : %i'\
331  %(ecount, self.fsr[ecount].output()) )
332  # Do some reporting
333  self.log.debug('FSR store reconstructed!')
334  lst = self.fsr.getHistoNames()
335  if lst :
336  for l in lst :
337  ob = self.fsr.retrieveObject(l)
338  if hasattr( ob, 'configureDirectAccess' ) :
339  ob.configureDirectAccess()
340  if hasattr( ob, 'containedObjects' ) :
341  # if ob.numberOfObjects() :
342  self.log.debug('\t%s (cont. objects : %i)'\
343  %(l, ob.numberOfObjects()))
344  else :
345  self.log.debug('\t%s'%(l))
346  self.log.info('FSR Store fully rebuilt.')
347  return SUCCESS
348 
def MergeFSRobject(self, sourceNode, path, ob)
Definition: pTools.py:349
string type
Definition: gaudirun.py:151
def GaudiMP.pTools.FileRecordsAgent.Receive (   self)

Definition at line 285 of file pTools.py.

285  def Receive( self ) :
286  # Receive contents of all Workers FileRecords Transient Stores
287  self.log.info('Receiving FSR store data...')
288  nc = self._gmpc.nWorkers
289  while nc > 0 :
290  objects = self.q.get( )
291  if objects == 'END_FSR' :
292  nc -= 1
293  continue
294  if nc==0 :
295  break
296  # but if it's regular objects...
297  for o in objects :
298  self.objectsIn.append(o)
299  # Now sort it by which worker it came from
300  # an object is : (nodeID, path, pickledObject)
301  self.objectsIn.sort(cmp=self.localCmp)
302  self.log.info('All FSR data received')
303  return SUCCESS
304 
def localCmp(self, tupA, tupB)
Definition: pTools.py:215
def GaudiMP.pTools.FileRecordsAgent.Receive (   self)

Definition at line 285 of file pTools.py.

285  def Receive( self ) :
286  # Receive contents of all Workers FileRecords Transient Stores
287  self.log.info('Receiving FSR store data...')
288  nc = self._gmpc.nWorkers
289  while nc > 0 :
290  objects = self.q.get( )
291  if objects == 'END_FSR' :
292  nc -= 1
293  continue
294  if nc==0 :
295  break
296  # but if it's regular objects...
297  for o in objects :
298  self.objectsIn.append(o)
299  # Now sort it by which worker it came from
300  # an object is : (nodeID, path, pickledObject)
301  self.objectsIn.sort(cmp=self.localCmp)
302  self.log.info('All FSR data received')
303  return SUCCESS
304 
def localCmp(self, tupA, tupB)
Definition: pTools.py:215
def GaudiMP.pTools.FileRecordsAgent.SendFileRecords (   self)

Definition at line 226 of file pTools.py.

226  def SendFileRecords( self ) :
227  # send the FileRecords data as part of finalisation
228 
229  # Take Care of FileRecords!
230  # There are two main things to consider here
231  # 1) The DataObjects in the FileRecords Transient Store
232  # 2) The fact that they are Keyed Containers, containing other objects
233  #
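234  # The Lead Worker, nodeID=0, sends everything in the FSR store, as
235  # a completeness guarantee; other workers send only the Event Counter
236  # and their non-empty Keyed Containers.
237  # Each entry is sent in the form ( nodeID, path, pickledObject ).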
238  self.log.info('Sending FileRecords...')
239  lst = self.fsr.getHistoNames()
240 
241  # Check Validity
242  if not lst :
243  self.log.info('No FileRecords Data to send to Writer.')
244  self.q.put( 'END_FSR' )
245  return SUCCESS
246 
247  # no need to send the root node
248  if '/FileRecords' in lst : lst.remove('/FileRecords')
249 
250  for l in lst :
251  o = self.fsr.retrieveObject( l )
252  if hasattr(o, "configureDirectAccess") :
253  o.configureDirectAccess()
254  # lead worker sends everything, as completeness guarantee
255  if self._gmpc.nodeID == 0 :
256  self.objectsOut.append( (0, l, pickle.dumps(o)) )
257  else :
258  # only add the Event Counter
259  # and non-Empty Keyed Containers (ignore empty ones)
260  if l == '/FileRecords/EventCountFSR' :
261  tup = (self._gmpc.nodeID, l, pickle.dumps(o))
262  self.objectsOut.append( tup )
263  else :
264  # It's a Keyed Container
265  assert "KeyedContainer" in o.__class__.__name__
266  nObjects = o.numberOfObjects()
267  if nObjects :
268  self.log.debug("Keyed Container %s with %i objects"\
269  %(l, nObjects))
270  tup = (self._gmpc.nodeID, l, pickle.dumps(o))
271  self.objectsOut.append( tup )
272  self.log.debug('Done with FSR store, just to send to Writer.')
273 
274  if self.objectsOut :
275  self.log.debug('%i FSR objects to Writer'%(len(self.objectsOut)))
276  for ob in self.objectsOut :
277  self.log.debug('\t%s'%(ob[0]))
278  self.q.put( self.objectsOut )
279  else :
280  self.log.info('Valid FSR Store, but no data to send to Writer')
281  self.log.info('SendFSR complete')
282  self.q.put( 'END_FSR' )
283  return SUCCESS
284 
def GaudiMP.pTools.FileRecordsAgent.SendFileRecords (   self)

Definition at line 226 of file pTools.py.

226  def SendFileRecords( self ) :
227  # send the FileRecords data as part of finalisation
228 
229  # Take Care of FileRecords!
230  # There are two main things to consider here
231  # 1) The DataObjects in the FileRecords Transient Store
232  # 2) The fact that they are Keyed Containers, containing other objects
233  #
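234  # The Lead Worker, nodeID=0, sends everything in the FSR store, as
235  # a completeness guarantee; other workers send only the Event Counter
236  # and their non-empty Keyed Containers.
237  # Each entry is sent in the form ( nodeID, path, pickledObject ).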
238  self.log.info('Sending FileRecords...')
239  lst = self.fsr.getHistoNames()
240 
241  # Check Validity
242  if not lst :
243  self.log.info('No FileRecords Data to send to Writer.')
244  self.q.put( 'END_FSR' )
245  return SUCCESS
246 
247  # no need to send the root node
248  if '/FileRecords' in lst : lst.remove('/FileRecords')
249 
250  for l in lst :
251  o = self.fsr.retrieveObject( l )
252  if hasattr(o, "configureDirectAccess") :
253  o.configureDirectAccess()
254  # lead worker sends everything, as completeness guarantee
255  if self._gmpc.nodeID == 0 :
256  self.objectsOut.append( (0, l, pickle.dumps(o)) )
257  else :
258  # only add the Event Counter
259  # and non-Empty Keyed Containers (ignore empty ones)
260  if l == '/FileRecords/EventCountFSR' :
261  tup = (self._gmpc.nodeID, l, pickle.dumps(o))
262  self.objectsOut.append( tup )
263  else :
264  # It's a Keyed Container
265  assert "KeyedContainer" in o.__class__.__name__
266  nObjects = o.numberOfObjects()
267  if nObjects :
268  self.log.debug("Keyed Container %s with %i objects"\
269  %(l, nObjects))
270  tup = (self._gmpc.nodeID, l, pickle.dumps(o))
271  self.objectsOut.append( tup )
272  self.log.debug('Done with FSR store, just to send to Writer.')
273 
274  if self.objectsOut :
275  self.log.debug('%i FSR objects to Writer'%(len(self.objectsOut)))
276  for ob in self.objectsOut :
277  self.log.debug('\t%s'%(ob[0]))
278  self.q.put( self.objectsOut )
279  else :
280  self.log.info('Valid FSR Store, but no data to send to Writer')
281  self.log.info('SendFSR complete')
282  self.q.put( 'END_FSR' )
283  return SUCCESS
284 

Member Data Documentation

GaudiMP.pTools.FileRecordsAgent._gmpc
private

Definition at line 208 of file pTools.py.

GaudiMP.pTools.FileRecordsAgent.fsr

Definition at line 209 of file pTools.py.

GaudiMP.pTools.FileRecordsAgent.log

Definition at line 211 of file pTools.py.

GaudiMP.pTools.FileRecordsAgent.objectsIn

Definition at line 212 of file pTools.py.

GaudiMP.pTools.FileRecordsAgent.objectsOut

Definition at line 213 of file pTools.py.

GaudiMP.pTools.FileRecordsAgent.q

Definition at line 210 of file pTools.py.


The documentation for this class was generated from the following file: