GaudiMP.GMPBase.Worker Class Reference
Inheritance diagram for GaudiMP.GMPBase.Worker:
Collaboration diagram for GaudiMP.GMPBase.Worker:

Public Member Functions

def __init__ (self, workerID, queues, events, params, subworkers)
 
def processConfiguration (self)
 
def Engine (self)
 
def getCheckAlgs (self)
 
def checkExecutedPassed (self, algName)
 
def isEventPassed (self)
 
def __init__ (self, workerID, queues, events, params, subworkers)
 
def processConfiguration (self)
 
def Engine (self)
 
def getCheckAlgs (self)
 
def checkExecutedPassed (self, algName)
 
def isEventPassed (self)
 
- Public Member Functions inherited from GaudiMP.GMPBase.GMPComponent
def __init__ (self, nodeType, nodeID, queues, events, params, subworkers)
 
def Start (self)
 
def Engine (self)
 
def processConfiguration (self)
 
def SetupGaudiPython (self)
 
def StartGaudiPython (self)
 
def LoadTES (self, tbufferfile)
 
def getEventNumber (self)
 
def IdentifyWriters (self)
 
def dumpHistograms (self)
 
def Initialize (self)
 
def Finalize (self)
 
def Report (self)
 
def __init__ (self, nodeType, nodeID, queues, events, params, subworkers)
 
def Start (self)
 
def Engine (self)
 
def processConfiguration (self)
 
def SetupGaudiPython (self)
 
def StartGaudiPython (self)
 
def LoadTES (self, tbufferfile)
 
def getEventNumber (self)
 
def IdentifyWriters (self)
 
def dumpHistograms (self)
 
def Initialize (self)
 
def Finalize (self)
 
def Report (self)
 

Public Attributes

 writerDict
 
 vetoAlgs
 
 eventOutput
 
 app
 
 filerecordsAgent
 
 nIn
 
 firstEvTime
 
 currentEvent
 
 tTime
 
- Public Attributes inherited from GaudiMP.GMPBase.GMPComponent
 nodeType
 
 finalEvent
 
 lastEvent
 
 log
 
 subworkers
 
 nodeID
 
 currentEvent
 
 queues
 
 num
 
 app
 
 evcom
 
 evcoms
 
 hq
 
 fq
 
 nIn
 
 nOut
 
 stat
 
 iTime
 
 rTime
 
 fTime
 
 firstEvTime
 
 tTime
 
 proc
 
 a
 
 evt
 
 hvt
 
 fsr
 
 inc
 
 pers
 
 ts
 
 TS
 
 cntr
 

Detailed Description

Definition at line 940 of file GMPBase.py.

Constructor & Destructor Documentation

def GaudiMP.GMPBase.Worker.__init__ (   self,
  workerID,
  queues,
  events,
  params,
  subworkers 
)

Definition at line 941 of file GMPBase.py.

941  def __init__( self, workerID, queues, events, params , subworkers ) :
942  GMPComponent.__init__(self,'Worker', workerID, queues, events, params, subworkers )
943  # Identify the writer streams
945  # Identify the accept/veto checks for each event
946  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
947  self.log.name = "Worker-%i"%(self.nodeID)
948  self.log.info("Worker-%i Created OK"%(self.nodeID))
949  self.eventOutput = True
950 
def getCheckAlgs(self)
Definition: GMPBase.py:1107
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:941
def GaudiMP.GMPBase.Worker.__init__ (   self,
  workerID,
  queues,
  events,
  params,
  subworkers 
)

Definition at line 941 of file GMPBase.py.

941  def __init__( self, workerID, queues, events, params , subworkers ) :
942  GMPComponent.__init__(self,'Worker', workerID, queues, events, params, subworkers )
943  # Identify the writer streams
945  # Identify the accept/veto checks for each event
946  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
947  self.log.name = "Worker-%i"%(self.nodeID)
948  self.log.info("Worker-%i Created OK"%(self.nodeID))
949  self.eventOutput = True
950 
def getCheckAlgs(self)
Definition: GMPBase.py:1107
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:941

Member Function Documentation

def GaudiMP.GMPBase.Worker.checkExecutedPassed (   self,
  algName 
)

Definition at line 1123 of file GMPBase.py.

1123  def checkExecutedPassed( self, algName ) :
1124  if self.a.algorithm( algName )._ialg.isExecuted()\
1125  and self.a.algorithm( algName )._ialg.filterPassed() :
1126  return True
1127  else :
1128  return False
1129 
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1123
def GaudiMP.GMPBase.Worker.checkExecutedPassed (   self,
  algName 
)

Definition at line 1123 of file GMPBase.py.

1123  def checkExecutedPassed( self, algName ) :
1124  if self.a.algorithm( algName )._ialg.isExecuted()\
1125  and self.a.algorithm( algName )._ialg.filterPassed() :
1126  return True
1127  else :
1128  return False
1129 
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1123
def GaudiMP.GMPBase.Worker.Engine (   self)

Definition at line 991 of file GMPBase.py.

991  def Engine( self ) :
992 
993  # rename process
994  import os
995  import ctypes
996  libc = ctypes.CDLL('libc.so.6')
997  name = str(self.nodeType) + str(self.nodeID) + '\0'
998  libc.prctl(15,name,0,0,0)
999 
1000  startEngine = time.time()
1001  self.log.info("Worker %i starting Engine"%(self.nodeID))
1002  self.Initialize()
1004 
1005  # populate the TESSerializer itemlist
1006  self.log.info('EVT WRITERS ON WORKER : %i'\
1007  %( len(self.writerDict['events'])))
1008 
1009  nEventWriters = len( self.writerDict[ "events" ] )
1010  if nEventWriters :
1011  itemList = set()
1012  optItemList = set()
1013  for m in self.writerDict[ "events" ] :
1014  for item in m.ItemList :
1015  hsh = item.find( '#' )
1016  if hsh != -1:
1017  item = item[:hsh]
1018  itemList.add( item )
1019  for item in m.OptItemList :
1020  hsh = item.find( '#' )
1021  if hsh != -1:
1022  item = item[:hsh]
1023  optItemList.add( item )
1024  # If an item is mandatory and optional, keep it only in the optional list
1025  itemList -= optItemList
1026  for item in sorted( itemList ):
1027  self.log.info( ' adding ItemList Item to ts : %s' % ( item ) )
1028  self.ts.addItem( item )
1029  for item in sorted( optItemList ):
1030  self.log.info( ' adding Optional Item to ts : %s' % ( item ) )
1031  self.ts.addOptItem( item )
1032  else :
1033  self.log.info( 'There is no Event Output for this app' )
1034  self.eventOutput = False
1035 
1036  # Begin processing
1037  Go = True
1038  while Go :
1039  packet = self.evcom.receive( )
1040  if packet : pass
1041  else : continue
1042  if packet == 'FINISHED' : break
1043  evtNumber, tbin = packet # unpack
1044  if self.cntr != None:
1045  self.cntr.setEventCounter( evtNumber )
1046 
1047  # Subworkers are forked before the first event is processed.
1048  # The reader thread for CondDB must be closed and reopened in each subworker;
1049  # this is done by disconnect().
1050  if self.nIn == 0:
1051 
1052  self.log.info("Fork new subworkers and disconnect from CondDB")
1053  condDB = self.a.service('CondDBCnvSvc', gbl.ICondDBReader)
1054  condDB.disconnect()
1055 
1056  # Fork subworkers and share services
1057  for k in self.subworkers:
1058  k.SetServices(self.a, self.evt, self.hvt, self.fsr, self.inc, self.pers, self.ts, self.cntr)
1059  k.Start()
1060  self.a.addAlgorithm( CollectHistograms(self) )
1061  self.nIn += 1
1062  self.TS.Load( tbin )
1063 
1064  t = time.time()
1065  sc = self.a.executeEvent()
1066  if self.nIn == 1 :
1067  self.firstEvTime = time.time()-t
1068  else :
1069  self.rTime += (time.time()-t)
1070  if sc.isSuccess() :
1071  pass
1072  else :
1073  self.log.warning('Did not Execute Event')
1074  self.evt.clearStore()
1075  continue
1076  if self.isEventPassed() :
1077  pass
1078  else :
1079  self.log.warning( 'Event did not pass : %i'%(evtNumber) )
1080  self.evt.clearStore()
1081  continue
1082  if self.eventOutput :
1083  # It may be the case of generating Event Tags; hence
1084  # no event output
1086  tb = self.TS.Dump( )
1087  self.evcom.send( (self.currentEvent, tb) )
1088  self.nOut += 1
1089  self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
1090  self.eventLoopSyncer.set()
1091  self.evt.clearStore( )
1092  self.log.info('Setting <Last> Event')
1093  self.lastEvent.set()
1094 
1095  self.evcom.finalize()
1096  self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
1097  # Now send the FileRecords and stop/finalize the appMgr
1098  self.filerecordsAgent.SendFileRecords()
1099  self.Finalize()
1100  self.tTime = time.time()-startEngine
1101  self.Report()
1102 
1103  for k in self.subworkers:
1104  self.log.info('Join subworkers')
1105  k.proc.join()
1106 
def isEventPassed(self)
Definition: GMPBase.py:1130
def GaudiMP.GMPBase.Worker.Engine (   self)

Definition at line 991 of file GMPBase.py.

991  def Engine( self ) :
992 
993  # rename process
994  import os
995  import ctypes
996  libc = ctypes.CDLL('libc.so.6')
997  name = str(self.nodeType) + str(self.nodeID) + '\0'
998  libc.prctl(15,name,0,0,0)
999 
1000  startEngine = time.time()
1001  self.log.info("Worker %i starting Engine"%(self.nodeID))
1002  self.Initialize()
1004 
1005  # populate the TESSerializer itemlist
1006  self.log.info('EVT WRITERS ON WORKER : %i'\
1007  %( len(self.writerDict['events'])))
1008 
1009  nEventWriters = len( self.writerDict[ "events" ] )
1010  if nEventWriters :
1011  itemList = set()
1012  optItemList = set()
1013  for m in self.writerDict[ "events" ] :
1014  for item in m.ItemList :
1015  hsh = item.find( '#' )
1016  if hsh != -1:
1017  item = item[:hsh]
1018  itemList.add( item )
1019  for item in m.OptItemList :
1020  hsh = item.find( '#' )
1021  if hsh != -1:
1022  item = item[:hsh]
1023  optItemList.add( item )
1024  # If an item is mandatory and optional, keep it only in the optional list
1025  itemList -= optItemList
1026  for item in sorted( itemList ):
1027  self.log.info( ' adding ItemList Item to ts : %s' % ( item ) )
1028  self.ts.addItem( item )
1029  for item in sorted( optItemList ):
1030  self.log.info( ' adding Optional Item to ts : %s' % ( item ) )
1031  self.ts.addOptItem( item )
1032  else :
1033  self.log.info( 'There is no Event Output for this app' )
1034  self.eventOutput = False
1035 
1036  # Begin processing
1037  Go = True
1038  while Go :
1039  packet = self.evcom.receive( )
1040  if packet : pass
1041  else : continue
1042  if packet == 'FINISHED' : break
1043  evtNumber, tbin = packet # unpack
1044  if self.cntr != None:
1045  self.cntr.setEventCounter( evtNumber )
1046 
1047  # Subworkers are forked before the first event is processed.
1048  # The reader thread for CondDB must be closed and reopened in each subworker;
1049  # this is done by disconnect().
1050  if self.nIn == 0:
1051 
1052  self.log.info("Fork new subworkers and disconnect from CondDB")
1053  condDB = self.a.service('CondDBCnvSvc', gbl.ICondDBReader)
1054  condDB.disconnect()
1055 
1056  # Fork subworkers and share services
1057  for k in self.subworkers:
1058  k.SetServices(self.a, self.evt, self.hvt, self.fsr, self.inc, self.pers, self.ts, self.cntr)
1059  k.Start()
1060  self.a.addAlgorithm( CollectHistograms(self) )
1061  self.nIn += 1
1062  self.TS.Load( tbin )
1063 
1064  t = time.time()
1065  sc = self.a.executeEvent()
1066  if self.nIn == 1 :
1067  self.firstEvTime = time.time()-t
1068  else :
1069  self.rTime += (time.time()-t)
1070  if sc.isSuccess() :
1071  pass
1072  else :
1073  self.log.warning('Did not Execute Event')
1074  self.evt.clearStore()
1075  continue
1076  if self.isEventPassed() :
1077  pass
1078  else :
1079  self.log.warning( 'Event did not pass : %i'%(evtNumber) )
1080  self.evt.clearStore()
1081  continue
1082  if self.eventOutput :
1083  # It may be the case of generating Event Tags; hence
1084  # no event output
1086  tb = self.TS.Dump( )
1087  self.evcom.send( (self.currentEvent, tb) )
1088  self.nOut += 1
1089  self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
1090  self.eventLoopSyncer.set()
1091  self.evt.clearStore( )
1092  self.log.info('Setting <Last> Event')
1093  self.lastEvent.set()
1094 
1095  self.evcom.finalize()
1096  self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
1097  # Now send the FileRecords and stop/finalize the appMgr
1098  self.filerecordsAgent.SendFileRecords()
1099  self.Finalize()
1100  self.tTime = time.time()-startEngine
1101  self.Report()
1102 
1103  for k in self.subworkers:
1104  self.log.info('Join subworkers')
1105  k.proc.join()
1106 
def isEventPassed(self)
Definition: GMPBase.py:1130
def GaudiMP.GMPBase.Worker.getCheckAlgs (   self)
For some output writers, a check is performed to see if the event has
executed certain algorithms.
These reside in the AcceptAlgs, RequireAlgs and VetoAlgs properties of those writers.

Definition at line 1107 of file GMPBase.py.

1107  def getCheckAlgs( self ) :
1108  '''
1109  For some output writers, a check is performed to see if the event has
1110  executed certain algorithms.
1111  These reside in the AcceptAlgs property for those writers
1112  '''
1113  acc = []
1114  req = []
1115  vet = []
1116  for m in self.writerDict[ "events" ] :
1117  if hasattr(m.w, 'AcceptAlgs') : acc += m.w.AcceptAlgs
1118  if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
1119  if hasattr(m.w, 'VetoAlgs') : vet += m.w.VetoAlgs
1120  return (acc, req, vet)
1121 
1122 
def getCheckAlgs(self)
Definition: GMPBase.py:1107
def GaudiMP.GMPBase.Worker.getCheckAlgs (   self)
For some output writers, a check is performed to see if the event has
executed certain algorithms.
These reside in the AcceptAlgs, RequireAlgs and VetoAlgs properties of those writers.

Definition at line 1107 of file GMPBase.py.

1107  def getCheckAlgs( self ) :
1108  '''
1109  For some output writers, a check is performed to see if the event has
1110  executed certain algorithms.
1111  These reside in the AcceptAlgs property for those writers
1112  '''
1113  acc = []
1114  req = []
1115  vet = []
1116  for m in self.writerDict[ "events" ] :
1117  if hasattr(m.w, 'AcceptAlgs') : acc += m.w.AcceptAlgs
1118  if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
1119  if hasattr(m.w, 'VetoAlgs') : vet += m.w.VetoAlgs
1120  return (acc, req, vet)
1121 
1122 
def getCheckAlgs(self)
Definition: GMPBase.py:1107
def GaudiMP.GMPBase.Worker.isEventPassed (   self)
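Check the algorithm status for an event.
Depending on the output writer settings, the event may be declined: it passes
only if at least one of the AcceptAlgs algorithms (when any are configured) was
executed and passed its filter, and it is declined if any algorithm listed in
RequireAlgs or VetoAlgs was not executed or did not pass its filter.
This is a transcription of the check that occurs in GaudiSvc::OutputStream.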

Definition at line 1130 of file GMPBase.py.

1130  def isEventPassed( self ) :
1131  '''
1132  Check the algorithm status for an event.
1133  Depending on output writer settings, the event
1134  may be declined based on various criteria.
1135  This is a transcript of the check that occurs in GaudiSvc::OutputStream
1136  '''
1137  passed = False
1138 
1139  self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
1140  if self.acceptAlgs :
1141  for name in self.acceptAlgs :
1142  if self.checkExecutedPassed( name ) :
1143  passed = True
1144  break
1145  else :
1146  passed = True
1147 
1148  self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
1149  for name in self.requireAlgs :
1150  if self.checkExecutedPassed( name ) :
1151  pass
1152  else :
1153  self.log.info('Evt declined (requireAlgs) : %s'%(name) )
1154  passed = False
1155 
1156  self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
1157  for name in self.vetoAlgs :
1158  if self.checkExecutedPassed( name ) :
1159  pass
1160  else :
1161  self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
1162  passed = False
1163  return passed
1164 
1165 # =============================================================================
1166 
def isEventPassed(self)
Definition: GMPBase.py:1130
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1123
def GaudiMP.GMPBase.Worker.isEventPassed (   self)
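Check the algorithm status for an event.
Depending on the output writer settings, the event may be declined: it passes
only if at least one of the AcceptAlgs algorithms (when any are configured) was
executed and passed its filter, and it is declined if any algorithm listed in
RequireAlgs or VetoAlgs was not executed or did not pass its filter.
This is a transcription of the check that occurs in GaudiSvc::OutputStream.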

Definition at line 1130 of file GMPBase.py.

1130  def isEventPassed( self ) :
1131  '''
1132  Check the algorithm status for an event.
1133  Depending on output writer settings, the event
1134  may be declined based on various criteria.
1135  This is a transcript of the check that occurs in GaudiSvc::OutputStream
1136  '''
1137  passed = False
1138 
1139  self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
1140  if self.acceptAlgs :
1141  for name in self.acceptAlgs :
1142  if self.checkExecutedPassed( name ) :
1143  passed = True
1144  break
1145  else :
1146  passed = True
1147 
1148  self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
1149  for name in self.requireAlgs :
1150  if self.checkExecutedPassed( name ) :
1151  pass
1152  else :
1153  self.log.info('Evt declined (requireAlgs) : %s'%(name) )
1154  passed = False
1155 
1156  self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
1157  for name in self.vetoAlgs :
1158  if self.checkExecutedPassed( name ) :
1159  pass
1160  else :
1161  self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
1162  passed = False
1163  return passed
1164 
1165 # =============================================================================
1166 
def isEventPassed(self)
Definition: GMPBase.py:1130
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1123
def GaudiMP.GMPBase.Worker.processConfiguration (   self)

Definition at line 951 of file GMPBase.py.

951  def processConfiguration( self ) :
952 
953  # Worker :
954  # No input
955  # No output
956  # No Histos
957  self.config[ 'EventSelector' ].Input = []
958  self.config[ 'ApplicationMgr' ].OutStream = []
959  if "HistogramPersistencySvc" in self.config.keys() :
960  self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
961  formatHead = '[Worker-%i] '%(self.nodeID)
962  self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'
963 
964  for key, lst in self.writerDict.iteritems() :
965  self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )
966 
967  for m in self.writerDict[ "tuples" ] :
968  # rename Tuple output file with an appendix
969  # based on worker id, for merging later
970  newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
971  self.config[ m.key ].Output = newName
972 
973  # Suppress INFO Output for all but Worker-0
974  #if self.nodeID == 0 :
975  # pass
976  #else :
977  # self.config[ 'MessageSvc' ].OutputLevel = ERROR
978 
979  if self.app == "Gauss":
980  try:
981  if "ToolSvc.EvtCounter" not in self.config:
982  from Configurables import EvtCounter
983  counter = EvtCounter()
984  else:
985  counter = self.config["ToolSvc.EvtCounter"]
986  counter.UseIncident = False
987  except:
988  # ignore errors when trying to change the configuration of the EvtCounter
989  self.log.warning('Cannot configure EvtCounter')
990 
def processConfiguration(self)
Definition: GMPBase.py:951
def GaudiMP.GMPBase.Worker.processConfiguration (   self)

Definition at line 951 of file GMPBase.py.

951  def processConfiguration( self ) :
952 
953  # Worker :
954  # No input
955  # No output
956  # No Histos
957  self.config[ 'EventSelector' ].Input = []
958  self.config[ 'ApplicationMgr' ].OutStream = []
959  if "HistogramPersistencySvc" in self.config.keys() :
960  self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
961  formatHead = '[Worker-%i] '%(self.nodeID)
962  self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'
963 
964  for key, lst in self.writerDict.iteritems() :
965  self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )
966 
967  for m in self.writerDict[ "tuples" ] :
968  # rename Tuple output file with an appendix
969  # based on worker id, for merging later
970  newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
971  self.config[ m.key ].Output = newName
972 
973  # Suppress INFO Output for all but Worker-0
974  #if self.nodeID == 0 :
975  # pass
976  #else :
977  # self.config[ 'MessageSvc' ].OutputLevel = ERROR
978 
979  if self.app == "Gauss":
980  try:
981  if "ToolSvc.EvtCounter" not in self.config:
982  from Configurables import EvtCounter
983  counter = EvtCounter()
984  else:
985  counter = self.config["ToolSvc.EvtCounter"]
986  counter.UseIncident = False
987  except:
988  # ignore errors when trying to change the configuration of the EvtCounter
989  self.log.warning('Cannot configure EvtCounter')
990 
def processConfiguration(self)
Definition: GMPBase.py:951

Member Data Documentation

GaudiMP.GMPBase.Worker.app

Definition at line 979 of file GMPBase.py.

GaudiMP.GMPBase.Worker.currentEvent

Definition at line 1085 of file GMPBase.py.

GaudiMP.GMPBase.Worker.eventOutput

Definition at line 949 of file GMPBase.py.

GaudiMP.GMPBase.Worker.filerecordsAgent

Definition at line 1003 of file GMPBase.py.

GaudiMP.GMPBase.Worker.firstEvTime

Definition at line 1067 of file GMPBase.py.

GaudiMP.GMPBase.Worker.nIn

Definition at line 1050 of file GMPBase.py.

GaudiMP.GMPBase.Worker.tTime

Definition at line 1100 of file GMPBase.py.

GaudiMP.GMPBase.Worker.vetoAlgs

Definition at line 946 of file GMPBase.py.

GaudiMP.GMPBase.Worker.writerDict

Definition at line 944 of file GMPBase.py.


The documentation for this class was generated from the following file: