All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
GaudiMP.GMPBase.Worker Class Reference
Inheritance diagram for GaudiMP.GMPBase.Worker:
Collaboration diagram for GaudiMP.GMPBase.Worker:

Public Member Functions

def __init__ (self, workerID, queues, events, params, subworkers)
 
def processConfiguration (self)
 
def Engine (self)
 
def getCheckAlgs (self)
 
def checkExecutedPassed (self, algName)
 
def isEventPassed (self)
 
- Public Member Functions inherited from GaudiMP.GMPBase.GMPComponent
def __init__ (self, nodeType, nodeID, queues, events, params, subworkers)
 
def Start (self)
 
def Engine (self)
 
def processConfiguration (self)
 
def SetupGaudiPython (self)
 
def StartGaudiPython (self)
 
def LoadTES (self, tbufferfile)
 
def getEventNumber (self)
 
def IdentifyWriters (self)
 
def dumpHistograms (self)
 
def Initialize (self)
 
def Finalize (self)
 
def Report (self)
 

Public Attributes

 writerDict
 
 vetoAlgs
 
 eventOutput
 
 app
 
 filerecordsAgent
 
 nIn
 
 firstEvTime
 
 currentEvent
 
 tTime
 
- Public Attributes inherited from GaudiMP.GMPBase.GMPComponent
 nodeType
 
 finalEvent
 
 lastEvent
 
 log
 
 subworkers
 
 nodeID
 
 currentEvent
 
 queues
 
 num
 
 app
 
 evcom
 
 evcoms
 
 hq
 
 fq
 
 nIn
 
 nOut
 
 stat
 
 iTime
 
 rTime
 
 fTime
 
 firstEvTime
 
 tTime
 
 proc
 
 a
 
 evt
 
 hvt
 
 fsr
 
 inc
 
 pers
 
 ts
 
 TS
 
 cntr
 

Detailed Description

Definition at line 939 of file GMPBase.py.

Constructor & Destructor Documentation

def GaudiMP.GMPBase.Worker.__init__ (   self,
  workerID,
  queues,
  events,
  params,
  subworkers 
)

Definition at line 940 of file GMPBase.py.

940  def __init__( self, workerID, queues, events, params , subworkers ) :
941  GMPComponent.__init__(self,'Worker', workerID, queues, events, params, subworkers )
942  # Identify the writer streams
944  # Identify the accept/veto checks for each event
945  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
946  self.log.name = "Worker-%i"%(self.nodeID)
947  self.log.info("Worker-%i Created OK"%(self.nodeID))
948  self.eventOutput = True
949 
def getCheckAlgs(self)
Definition: GMPBase.py:1106
def __init__(self, workerID, queues, events, params, subworkers)
Definition: GMPBase.py:940

Member Function Documentation

def GaudiMP.GMPBase.Worker.checkExecutedPassed (   self,
  algName 
)

Definition at line 1122 of file GMPBase.py.

1122  def checkExecutedPassed( self, algName ) :
1123  if self.a.algorithm( algName )._ialg.isExecuted()\
1124  and self.a.algorithm( algName )._ialg.filterPassed() :
1125  return True
1126  else :
1127  return False
1128 
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1122
def GaudiMP.GMPBase.Worker.Engine (   self)

Definition at line 990 of file GMPBase.py.

990  def Engine( self ) :
991 
992  # rename process
993  import os
994  import ctypes
995  libc = ctypes.CDLL('libc.so.6')
996  name = str(self.nodeType) + str(self.nodeID) + '\0'
997  libc.prctl(15,name,0,0,0)
998 
999  startEngine = time.time()
1000  self.log.info("Worker %i starting Engine"%(self.nodeID))
1001  self.Initialize()
1003 
1004  # populate the TESSerializer itemlist
1005  self.log.info('EVT WRITERS ON WORKER : %i'\
1006  %( len(self.writerDict['events'])))
1007 
1008  nEventWriters = len( self.writerDict[ "events" ] )
1009  if nEventWriters :
1010  itemList = set()
1011  optItemList = set()
1012  for m in self.writerDict[ "events" ] :
1013  for item in m.ItemList :
1014  hsh = item.find( '#' )
1015  if hsh != -1:
1016  item = item[:hsh]
1017  itemList.add( item )
1018  for item in m.OptItemList :
1019  hsh = item.find( '#' )
1020  if hsh != -1:
1021  item = item[:hsh]
1022  optItemList.add( item )
1023  # If an item is mandatory and optional, keep it only in the optional list
1024  itemList -= optItemList
1025  for item in sorted( itemList ):
1026  self.log.info( ' adding ItemList Item to ts : %s' % ( item ) )
1027  self.ts.addItem( item )
1028  for item in sorted( optItemList ):
1029  self.log.info( ' adding Optional Item to ts : %s' % ( item ) )
1030  self.ts.addOptItem( item )
1031  else :
1032  self.log.info( 'There is no Event Output for this app' )
1033  self.eventOutput = False
1034 
1035  # Begin processing
1036  Go = True
1037  while Go :
1038  packet = self.evcom.receive( )
1039  if packet : pass
1040  else : continue
1041  if packet == 'FINISHED' : break
1042  evtNumber, tbin = packet # unpack
1043  if self.cntr != None:
1044  self.cntr.setEventCounter( evtNumber )
1045 
1046  # subworkers are forked before the first event is processed
1047  # reader-thread for ConDB must be closed and reopened in each subworker
1048  # this is done by disconnect()
1049  if self.nIn == 0:
1050 
1051  self.log.info("Fork new subworkers and disconnect from CondDB")
1052  condDB = self.a.service('CondDBCnvSvc', gbl.ICondDBReader)
1053  condDB.disconnect()
1054 
1055  # Fork subworkers and share services
1056  for k in self.subworkers:
1057  k.SetServices(self.a, self.evt, self.hvt, self.fsr, self.inc, self.pers, self.ts, self.cntr)
1058  k.Start()
1059  self.a.addAlgorithm( CollectHistograms(self) )
1060  self.nIn += 1
1061  self.TS.Load( tbin )
1062 
1063  t = time.time()
1064  sc = self.a.executeEvent()
1065  if self.nIn == 1 :
1066  self.firstEvTime = time.time()-t
1067  else :
1068  self.rTime += (time.time()-t)
1069  if sc.isSuccess() :
1070  pass
1071  else :
1072  self.log.warning('Did not Execute Event')
1073  self.evt.clearStore()
1074  continue
1075  if self.isEventPassed() :
1076  pass
1077  else :
1078  self.log.warning( 'Event did not pass : %i'%(evtNumber) )
1079  self.evt.clearStore()
1080  continue
1081  if self.eventOutput :
1082  # It may be the case of generating Event Tags; hence
1083  # no event output
1085  tb = self.TS.Dump( )
1086  self.evcom.send( (self.currentEvent, tb) )
1087  self.nOut += 1
1088  self.inc.fireIncident(gbl.Incident('Worker','EndEvent'))
1089  self.eventLoopSyncer.set()
1090  self.evt.clearStore( )
1091  self.log.info('Setting <Last> Event')
1092  self.lastEvent.set()
1093 
1094  self.evcom.finalize()
1095  self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )
1096  # Now send the FileRecords and stop/finalize the appMgr
1097  self.filerecordsAgent.SendFileRecords()
1098  self.Finalize()
1099  self.tTime = time.time()-startEngine
1100  self.Report()
1101 
1102  for k in self.subworkers:
1103  self.log.info('Join subworkers')
1104  k.proc.join()
1105 
def isEventPassed(self)
Definition: GMPBase.py:1129
def GaudiMP.GMPBase.Worker.getCheckAlgs (   self)
For some output writers, a check is performed to see if the event has
executed certain algorithms.
These reside in the AcceptAlgs property for those writers

Definition at line 1106 of file GMPBase.py.

1106  def getCheckAlgs( self ) :
1107  '''
1108  For some output writers, a check is performed to see if the event has
1109  executed certain algorithms.
1110  These reside in the AcceptAlgs property for those writers
1111  '''
1112  acc = []
1113  req = []
1114  vet = []
1115  for m in self.writerDict[ "events" ] :
1116  if hasattr(m.w, 'AcceptAlgs') : acc += m.w.AcceptAlgs
1117  if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
1118  if hasattr(m.w, 'VetoAlgs') : vet += m.w.VetoAlgs
1119  return (acc, req, vet)
1120 
1121 
def getCheckAlgs(self)
Definition: GMPBase.py:1106
def GaudiMP.GMPBase.Worker.isEventPassed (   self)
Check the algorithm status for an event.
Depending on output writer settings, the event
  may be declined based on various criteria.
This is a transcript of the check that occurs in GaudiSvc::OutputStream

Definition at line 1129 of file GMPBase.py.

1129  def isEventPassed( self ) :
1130  '''
1131  Check the algorithm status for an event.
1132  Depending on output writer settings, the event
1133  may be declined based on various criteria.
1134  This is a transcript of the check that occurs in GaudiSvc::OutputStream
1135  '''
1136  passed = False
1137 
1138  self.log.debug('self.acceptAlgs is %s'%(str(self.acceptAlgs)))
1139  if self.acceptAlgs :
1140  for name in self.acceptAlgs :
1141  if self.checkExecutedPassed( name ) :
1142  passed = True
1143  break
1144  else :
1145  passed = True
1146 
1147  self.log.debug('self.requireAlgs is %s'%(str(self.requireAlgs)))
1148  for name in self.requireAlgs :
1149  if self.checkExecutedPassed( name ) :
1150  pass
1151  else :
1152  self.log.info('Evt declined (requireAlgs) : %s'%(name) )
1153  passed = False
1154 
1155  self.log.debug('self.vetoAlgs is %s'%(str(self.vetoAlgs)))
1156  for name in self.vetoAlgs :
1157  if self.checkExecutedPassed( name ) :
1158  pass
1159  else :
1160  self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
1161  passed = False
1162  return passed
1163 
1164 # =============================================================================
1165 
def isEventPassed(self)
Definition: GMPBase.py:1129
def checkExecutedPassed(self, algName)
Definition: GMPBase.py:1122
def GaudiMP.GMPBase.Worker.processConfiguration (   self)

Definition at line 950 of file GMPBase.py.

950  def processConfiguration( self ) :
951 
952  # Worker :
953  # No input
954  # No output
955  # No Histos
956  self.config[ 'EventSelector' ].Input = []
957  self.config[ 'ApplicationMgr' ].OutStream = []
958  if "HistogramPersistencySvc" in self.config.keys() :
959  self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
960  formatHead = '[Worker-%i] '%(self.nodeID)
961  self.config['MessageSvc'].Format = formatHead+'% F%18W%S%7W%R%T %0W%M'
962 
963  for key, lst in self.writerDict.iteritems() :
964  self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )
965 
966  for m in self.writerDict[ "tuples" ] :
967  # rename Tuple output file with an appendix
968  # based on worker id, for merging later
969  newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
970  self.config[ m.key ].Output = newName
971 
972  # Suppress INFO Output for all but Worker-0
973  #if self.nodeID == 0 :
974  # pass
975  #else :
976  # self.config[ 'MessageSvc' ].OutputLevel = ERROR
977 
978  if self.app == "Gauss":
979  try:
980  if "ToolSvc.EvtCounter" not in self.config:
981  from Configurables import EvtCounter
982  counter = EvtCounter()
983  else:
984  counter = self.config["ToolSvc.EvtCounter"]
985  counter.UseIncident = False
986  except:
987  # ignore errors when trying to change the configuration of the EvtCounter
988  self.log.warning('Cannot configure EvtCounter')
989 
def processConfiguration(self)
Definition: GMPBase.py:950

Member Data Documentation

GaudiMP.GMPBase.Worker.app

Definition at line 978 of file GMPBase.py.

GaudiMP.GMPBase.Worker.currentEvent

Definition at line 1084 of file GMPBase.py.

GaudiMP.GMPBase.Worker.eventOutput

Definition at line 948 of file GMPBase.py.

GaudiMP.GMPBase.Worker.filerecordsAgent

Definition at line 1002 of file GMPBase.py.

GaudiMP.GMPBase.Worker.firstEvTime

Definition at line 1066 of file GMPBase.py.

GaudiMP.GMPBase.Worker.nIn

Definition at line 1049 of file GMPBase.py.

GaudiMP.GMPBase.Worker.tTime

Definition at line 1099 of file GMPBase.py.

GaudiMP.GMPBase.Worker.vetoAlgs

Definition at line 945 of file GMPBase.py.

GaudiMP.GMPBase.Worker.writerDict

Definition at line 943 of file GMPBase.py.


The documentation for this class was generated from the following file: