from GaudiPython import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS, FAILURE, InterfaceCast
from ROOT import TBufferFile, TBuffer
from multiprocessing import Process, Queue, JoinableQueue, Event
from multiprocessing import cpu_count, current_process
from multiprocessing.queues import Empty
from ROOT import TParallelMergingFile
# time-out limits (in seconds) used by the synchronisation barriers below
WAIT_INITIALISE   = 60*5
WAIT_FIRST_EVENT  = 60*3
WAIT_SINGLE_EVENT = 60*6
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root

aidatypes = ( gbl.AIDA.IHistogram,
              gbl.AIDA.IHistogram1D,
              gbl.AIDA.IHistogram2D,
              gbl.AIDA.IHistogram3D,
              gbl.AIDA.IBaseHistogram )

thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
WRITERTYPES = { 'EvtCollectionStream'    : "tuples",
                'InputCopyStream'        : "events",
                'OutputStream'           : "events",
                'RecordStream'           : "records",
                'RunRecordStream'        : "records",
                'SequentialOutputStream' : "events",
                'TagCollectionStream'    : "tuples" }
# MiniWriter: a class to represent a writer in the GaudiPython configuration.
# It can be non-trivial to access the name of the output file; it may be
# specified in the DataSvc, or just on the writer, and it may be a list or a
# string.  Also, there are three different types of writer (events, records,
# tuples), so this bootstrap class provides easy access to this info while
# configuring.

        if hasattr( self.w, "Output" ) :
            self.set( self.wName, self.w.Output )

            if hasattr( datasvc, "Output" ) :
        # getNewName( replaceThis, withThis, extra='' ) : rewrite the output file name(s)
        assert replaceThis.__class__.__name__ == 'str'
        assert withThis.__class__.__name__    == 'str'

        if old.__class__.__name__ == 'list' :

        new = old.replace( replaceThis, withThis )
        if hasattr( self.w, "ItemList" ) :

            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "ItemList" ) :

        if hasattr( self.w, "OptItemList" ) :

            datasvc = config[ self.w.EvtDataSvc ]
            if hasattr( datasvc, "OptItemList" ) :

    def set( self, key, output ) :
        s += "Writer         : %s\n"%( self.wName  )
        s += "Writer Type    : %s\n"%( self.wType  )
        s += "Writer Output  : %s\n"%( self.output )
        s += "DataSvc Output : %s\n"%( self.svcOutput )
        s += "Key for config : %s\n"%( self.key    )
        s += "Output File    : %s\n"%( self.output )
        s += "ItemList       : %s\n"%( self.ItemList )
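To make the renaming concrete: getNewName( replaceThis, withThis, extra='' ) rewrites the writer's configured output name, whether it is a plain string or a list of output options. A minimal standalone sketch of that behaviour follows; the rename_output helper is an illustration only, not the MiniWriter method itself.

def rename_output( old, replaceThis, withThis, extra='' ) :
    # 'old' may be a plain file name or a list of output options
    assert isinstance( replaceThis, str ) and isinstance( withThis, str )
    if isinstance( old, list ) :
        return [ s.replace( replaceThis, withThis ) + extra for s in old ]
    return old.replace( replaceThis, withThis ) + extra

# e.g. Worker 3 would write "out.w3.dst" instead of "out.dst", and the Writer
# of an 8-worker job would merge into "out.p8.dst" (cf. the Worker and Writer
# configuration further down)
print( rename_output( "out.dst", '.', '.w3.' ) )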
# CollectHistograms: GaudiPython algorithm used to clean up histos on the Reader
# and Workers.  It only has a finalize() method.  This retrieves a dictionary of
# path:histo objects and sends it to the writer, then waits for the
# synchronisation flag: THIS IS IMPORTANT, as if the algorithm returns before
# ALL histos have been COMPLETELY RECEIVED at the writer end, there will be an
# error.

        PyAlgorithm.__init__( self )

        self.log.info( 'CollectHistograms Finalise (%s)'%(self._gmpc.nodeType) )
        self._gmpc.hDict = self._gmpc.dumpHistograms( )
        ks = self._gmpc.hDict.keys()
        self.log.info( '%i Objects in Histogram Store'%(len(ks)) )

        # ship the histogram dictionary to the Writer in chunks
        reps = len(ks)/chunk + 1
        for i in xrange(reps) :
            someKeys = ks[ i*chunk : (i+1)*chunk ]
            smalld = dict( [ (key, self._gmpc.hDict[key]) for key in someKeys ] )
            self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )

        self.log.debug( 'Signalling end of histos to Writer' )
        self._gmpc.hq.put( 'HISTOS_SENT' )
        self.log.debug( 'Waiting on Sync Event' )
        self._gmpc.sEvent.wait()
        self.log.debug( 'Histo Sync Event set, clearing and returning' )
        self._gmpc.hvt.clearStore()
        root = gbl.DataObject()
        self._gmpc.hvt.setRoot( '/stat', root )
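The hand-off pattern used here is: send the histogram dictionary in chunks over a queue, follow it with a sentinel, and then block on a shared Event so the algorithm cannot return before the receiver has everything. A minimal, self-contained sketch of that pattern is given below; the function names and the toy integer payload are assumptions for the illustration and are not part of GMPBase.

from multiprocessing import Process, JoinableQueue, Event

def send_histos( histoQueue, syncEvent, histos, nodeID, chunk=10 ) :
    ks = list( histos.keys() )
    for i in range( len(ks)//chunk + 1 ) :
        someKeys = ks[ i*chunk : (i+1)*chunk ]
        histoQueue.put( (nodeID, dict( (k, histos[k]) for k in someKeys )) )
    histoQueue.put( 'HISTOS_SENT' )        # sentinel : nothing more from this node
    syncEvent.wait()                       # do not return before the receiver is done

def receive_histos( histoQueue, syncEvent, nSenders ) :
    store, done = {}, 0
    while done < nSenders :
        item = histoQueue.get()
        if item == 'HISTOS_SENT' : done += 1
        else                     : store.update( item[1] )
    syncEvent.set()                        # release every waiting sender
    return store

if __name__ == '__main__' :
    q, ev = JoinableQueue(), Event()
    toy   = dict( ('/stat/h%i'%i, i) for i in range(25) )
    p = Process( target=send_histos, args=(q, ev, toy, 0) )
    p.start()
    print( len( receive_histos( q, ev, 1 ) ) )   # 25
    p.join()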
# EventCommunicator: one-way send/receive between two GMPComponents over a pair
# of multiprocessing queues.

        assert item.__class__.__name__ == 'tuple'
        startTransmission = time.time()
        self.qout.put( item )
        # wait for the queue's internal buffer to drain before timing the send
        while self.qout._buffer : time.sleep( NAP )
        self.qoutTime += time.time()-startTransmission

        startWait = time.time()
        itemIn = self.qin.get( timeout=timeout )
        self.qinTime += time.time()-startWait

        if itemIn.__class__.__name__ == 'tuple' :

            self._gmpc.log.warning( 'TASK_DONE called too often by : %s'%(self._gmpc.nodeType) )

        self.log.info( 'Finalize Event Communicator : %s %s'%(self._gmpc, self._gmpc.nodeType) )
        # the Reader feeds nWorkers consumers, each Worker feeds the Writer,
        # and the Writer has nobody downstream
        if   self._gmpc.nodeType == 'Reader' : downstream = self._gmpc.nWorkers
        elif self._gmpc.nodeType == 'Writer' : downstream = 0
        elif self._gmpc.nodeType == 'Worker' : downstream = 1

        for i in xrange(downstream) :
            self.qout.put( 'FINISHED' )
        if self._gmpc.nodeType != 'Writer' :

        self.log.name = '%s-%i Audit '%(self._gmpc.nodeType, self._gmpc.nodeID)
        self.log.info( 'Items Sent     : %i'%(self.nSent) )
        self.log.info( 'Items Received : %i'%(self.nRecv) )
        self.log.info( 'Data  Sent     : %i'%(self.sizeSent) )
        self.log.info( 'Data  Received : %i'%(self.sizeRecv) )
        self.log.info( 'Q-out Time     : %5.2f'%(self.qoutTime) )
        self.log.info( 'Q-in  Time     : %5.2f'%(self.qinTime ) )
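The FINISHED fan-out in finalize() works because each downstream consumer takes exactly one sentinel from its input queue and then stops: the Reader therefore posts one sentinel per Worker on the shared queue, and each Worker posts a single sentinel for the Writer. A small standalone sketch of the shared-queue side, with hypothetical names and payloads:

from multiprocessing import Process, Queue

def worker( rwk ) :
    while True :
        packet = rwk.get()
        if packet == 'FINISHED' : break     # exactly one sentinel is meant for each worker
        # ... process the (evtNumber, blob) packet here ...

if __name__ == '__main__' :
    nWorkers = 3
    rwk = Queue()                           # single queue shared by all workers
    ps  = [ Process( target=worker, args=(rwk,) ) for i in range(nWorkers) ]
    for p in ps : p.start()
    for evt in range(6) : rwk.put( (evt, 'blob-%i'%evt) )
    for i in range(nWorkers) : rwk.put( 'FINISHED' )   # fan-out: one sentinel per consumer
    for p in ps : p.join()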
# Wrapper around GaudiMP.TESSerializer: loads/dumps the transient event store
# from/to ROOT TBufferFile objects and keeps simple throughput statistics.
    def __init__( self, gaudiTESSerializer, evtDataSvc,
                        nodeType, nodeID, log ) :
        self.T   = gaudiTESSerializer

        root = gbl.DataObject()
        self.evt.setRoot( '/Event', root )
        self.T.loadBuffer( tbuf )
        self.tLoad += (time.time() - t)

        self.buffersIn.append( tbuf.Length() )

        tb = TBufferFile( TBuffer.kWrite )
        self.T.dumpBuffer( tb )
        self.tDump += ( time.time()-t )

        self.buffersOut.append( tb.Length() )

        evIn       = "Events Loaded    : %i"%( self.nIn  )
        evOut      = "Events Dumped    : %i"%( self.nOut )

        dataIn     = "Data Loaded      : %i"%(din)
        dataInMb   = "Data Loaded (MB) : %5.2f Mb"%(din/MB)

            avgIn      = "Avg Buf Loaded   : %5.2f Mb"\
                          %( din/(self.nIn*MB) )
            maxIn      = "Max Buf Loaded   : %5.2f Mb"\

            avgIn      = "Avg Buf Loaded   : N/A"
            maxIn      = "Max Buf Loaded   : N/A"

        dataOut    = "Data Dumped      : %i"%(dout)
        dataOutMb  = "Data Dumped (MB) : %5.2f Mb"%(dout/MB)

            avgOut     = "Avg Buf Dumped   : %5.2f Mb"\
                          %( dout/(self.nOut*MB) )
            maxOut     = "Max Buf Dumped   : %5.2f Mb"\

            avgOut     = "Avg Buf Dumped   : N/A"
            maxOut     = "Max Buf Dumped   : N/A"

        dumpTime   = "Total Dump Time  : %5.2f"%( self.tDump )
        loadTime   = "Total Load Time  : %5.2f"%( self.tLoad )

        self.log.name = "%s-%i TESSerializer"%(self.nodeType, self.nodeID)
            self.log.info( line )
# GMPComponent: common base class for the Reader, Worker and Writer processes.
    def __init__( self, nodeType, nodeID, queues, events, params, subworkers ) :

        current_process().name = nodeType

        self.nWorkers, self.sEvent, self.config, self.log = params

        ks = self.config.keys()

        # recognised application configurables; the one found becomes self.app
        list = [ "Brunel", "DaVinci", "Boole", "Gauss" ]

           if k in ks: self.app = k

        qPair, histq, fq = self.queues

                self.evcoms.append( ec )
            from AlgSmapShot import SmapShot
            ss = SmapShot( logname=smapsLog )
            self.a.addAlgorithm( ss )
        self.evt  = self.a.evtsvc()
        self.hvt  = self.a.histsvc()
        self.fsr  = self.a.filerecordsvc()
        self.inc  = self.a.service( 'IncidentSvc', 'IIncidentSvc' )
        self.pers = self.a.service( 'EventPersistencySvc', 'IAddressCreator' )
        self.ts   = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
        # LoadTES: rebuild the transient event store from a serialised buffer
        root = gbl.DataObject()
        self.evt.setRoot( '/Event', root )
        self.ts.loadBuffer( tbufferfile )

        # determine the current event number, either from the Gen/Rec headers
        # or from a RawEvent bank; warn if it cannot be found
        if self.app != 'Gauss':

        lst = [ '/Event/Gen/Header',
                '/Event/Rec/Header' ]

                n = self.evt[path].evtNumber()

            n = self.evt['/Event/DAQ/RawEvent'].banks(16)[0].data()[4]

        if self.nIn > 0 or self.nOut > 0 :

            self.log.warning( 'Could not determine Event Number' )
        # IdentifyWriters: classify the configured output streams by type
        keys = [ "events", "records", "tuples", "histos" ]

        wkeys = WRITERTYPES.keys()
        for v in self.config.values() :
            if v.__class__.__name__ in wkeys :
                writerType = WRITERTYPES[ v.__class__.__name__ ]
                d[writerType].append( MiniWriter(v, writerType, self.config) )

                    self.log.info( 'Writer Found : %s'%(v.name()) )

        if 'HistogramPersistencySvc' in self.config.keys() :
            hfile = self.config['HistogramPersistencySvc'].getProp('OutputFile')
            d[ "histos" ].append( hfile )
        # Method used by the GaudiPython algorithm CollectHistograms to obtain a
        # dictionary of the form { path : object } representing the Histogram Store.
        nlist = self.hvt.getHistoNames( )

        objects = 0 ; histos = 0

                if type(o) in aidatypes :

            print 'WARNING : no histograms to recover?'

        if self.app == 'Gauss':

            tool = self.a.tool( "ToolSvc.EvtCounter" )
            self.cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
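A minimal sketch of the { path : object } dump described above, assuming a histogram service hvt with the getHistoNames()/indexing interface used in these fragments and the aidatypes/aida2root/thtypes helpers defined at the top of the file; this is an illustration, not the GMPBase implementation.

def dump_histograms( hvt ) :
    hist_dict = {}
    for path in hvt.getHistoNames() :
        o = hvt[ path ]
        if type(o) in aidatypes :
            o = aida2root( o )       # convert the AIDA interface to the underlying ROOT TH*
        if type(o) in thtypes :
            hist_dict[ path ] = o
    return hist_dict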
        self.iTime = time.time() - start

        self.finalEvent.set()
        self.fTime = time.time() - start

        # per-process timing summary
        allTime  = "Alive Time     : %5.2f"%( self.tTime )
        initTime = "Init Time      : %5.2f"%( self.iTime )
        frstTime = "1st Event Time : %5.2f"%( self.firstEvTime )
        runTime  = "Run Time       : %5.2f"%( self.rTime )
        finTime  = "Finalise Time  : %5.2f"%( self.fTime )
        tup = ( allTime, initTime, frstTime, runTime, finTime )
# Reader process: reads events from the input and distributes them, serialised,
# to the Workers.
    def __init__( self, queues, events, params, subworkers ) :
        GMPComponent.__init__( self, 'Reader', -1, queues, events, params, subworkers )

        self.config[ 'ApplicationMgr' ].TopAlg    = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        self.config[ 'MessageSvc' ].Format = '[Reader]% F%18W%S%7W%R%T %0W%M'
        self.evtMax = self.config[ 'ApplicationMgr' ].EvtMax
        # serialise the current event store into a write-mode buffer
        tb = TBufferFile( TBuffer.kWrite )
        self.ts.dumpBuffer( tb )

        startFirst = time.time()
        self.log.info( 'Reader : First Event' )

            self.log.info( 'evtMax( %i ) reached'%(self.evtMax) )

            if not bool(self.evt['/Event']) :
                self.log.warning( 'No More Events! (So Far : %i)'%(self.nOut) )

                   lst = self.evt.getHistoNames()

                    lst = self.evt.getList()
                    if self.app == "DaVinci":
                      daqnode = self.evt.retrieveObject( '/Event/DAQ' ).registry()

                      self.evt.getList( daqnode, lst, daqnode.address().par() )

                    self.log.critical( 'Reader could not acquire TES List!' )

                self.log.info( 'Reader : TES List : %i items'%(len(lst)) )

                self.log.info( 'First Event Sent' )

                self.eventLoopSyncer.set()
                self.evt.clearStore( )
        libc = ctypes.CDLL( 'libc.so.6' )
        libc.prctl( 15, name, 0, 0, 0 )      # PR_SET_NAME (15): label the forked process

        startEngine = time.time()
        self.log.name = 'Reader'
        self.log.info( 'Reader Process starting' )

        self.log.info( 'Reader Beginning Distribution' )

            self.log.info( 'Reader First Event OK' )

            self.log.critical( 'Reader Failed on First Event' )

                self.log.info( 'evtMax( %i ) reached'%(self.evtMax) )

            if not self.stat.isSuccess() :
                self.log.critical( 'Reader is Damaged!' )

            self.rTime += (time.time()-t)
            if not bool(self.evt['/Event']) :
                self.log.warning( 'No More Events! (So Far : %i)'%(self.nOut) )

            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info( 'Setting <Last> Event' )

        self.log.info( 'Reader : Event Distribution complete.' )
        self.evcom.finalize()

        self.tTime = time.time() - startEngine
# Subworker: an additional event-processing process forked from a running Worker.
    def __init__( self, workerID, queues, events, params, subworkers ) :
        GMPComponent.__init__( self, 'Worker', workerID, queues, events, params, subworkers )

        self.log.info( "Subworker-%i Created OK"%(self.nodeID) )

        libc = ctypes.CDLL( 'libc.so.6' )
        libc.prctl( 15, name, 0, 0, 0 )

        startEngine = time.time()
        msg = self.a.service( 'MessageSvc' )
        msg.Format = '[' + self.log.name + '] % F%18W%S%7W%R%T %0W%M'
        self.log.name = "Worker-%i"%(self.nodeID)
        self.log.info( "Subworker %i starting Engine"%(self.nodeID) )

        self.log.info( 'EVT WRITERS ON WORKER : %i'\

        nEventWriters = len( self.writerDict[ "events" ] )

            packet = self.evcom.receive( )

            if packet == 'FINISHED' : break
            evtNumber, tbin = packet
            if self.cntr != None :
                self.cntr.setEventCounter( evtNumber )

            sc = self.a.executeEvent()

                self.rTime += (time.time()-t)

                self.log.name = "Worker-%i"%(self.nodeID)
                self.log.warning( 'Did not Execute Event' )
                self.evt.clearStore()

                self.log.name = "Worker-%i"%(self.nodeID)
                self.log.warning( 'Event did not pass : %i'%(evtNumber) )
                self.evt.clearStore()

            self.inc.fireIncident( gbl.Incident('Subworker', 'EndEvent') )
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.name = "Worker-%i"%(self.nodeID)
        self.log.info( 'Setting <Last> Event %s'%(self.nodeID) )

        self.evcom.finalize()

        self.filerecordsAgent.SendFileRecords()
        self.tTime = time.time()-startEngine
        self.inc = self.a.service( 'IncidentSvc', 'IIncidentSvc' )

        # For some output writers, a check is performed to see if the event has
        # executed certain algorithms; these reside in the AcceptAlgs (and
        # RequireAlgs/VetoAlgs) properties of those writers.
            if hasattr(m.w, 'AcceptAlgs')  : acc += m.w.AcceptAlgs
            if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
            if hasattr(m.w, 'VetoAlgs')    : vet += m.w.VetoAlgs
        return (acc, req, vet)

        # checkExecutedPassed( algName )
        if  self.a.algorithm( algName )._ialg.isExecuted()\
        and self.a.algorithm( algName )._ialg.filterPassed() :
        # Check the algorithm status for an event.  Depending on output writer
        # settings, the event may be declined based on various criteria.  This
        # is a transcript of the check that occurs in GaudiSvc::OutputStream.
        self.log.debug( 'self.acceptAlgs is %s'%(str(self.acceptAlgs)) )
            for name in self.acceptAlgs :

        self.log.debug( 'self.requireAlgs is %s'%(str(self.requireAlgs)) )
        for name in self.requireAlgs :

                self.log.info( 'Evt declined (requireAlgs) : %s'%(name) )

        self.log.debug( 'self.vetoAlgs is %s'%(str(self.vetoAlgs)) )

                self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
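A compact sketch of the accept/require/veto decision these fragments implement, assuming a passed(name) predicate with the semantics of checkExecutedPassed above; the function is an illustration of the OutputStream-style check, not the class method itself.

def is_event_passed( acceptAlgs, requireAlgs, vetoAlgs, passed ) :
    # if any AcceptAlgs are configured, at least one of them must have passed
    if acceptAlgs and not any( passed(name) for name in acceptAlgs ) :
        return False
    # every RequireAlgs entry must have executed and passed
    if not all( passed(name) for name in requireAlgs ) :
        return False
    # no VetoAlgs entry may have executed and passed
    if any( passed(name) for name in vetoAlgs ) :
        return False
    return True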
# Worker process: receives serialised events from the Reader and runs the
# application algorithms on them.
    def __init__( self, workerID, queues, events, params, subworkers ) :
        GMPComponent.__init__( self, 'Worker', workerID, queues, events, params, subworkers )

        self.log.name = "Worker-%i"%(self.nodeID)
        self.log.info( "Worker-%i Created OK"%(self.nodeID) )

        self.config[ 'EventSelector'  ].Input     = []
        self.config[ 'ApplicationMgr' ].OutStream = []
        if "HistogramPersistencySvc" in self.config.keys() :
            self.config[ 'HistogramPersistencySvc' ].OutputFile = ''
        formatHead = '[Worker-%i] '%(self.nodeID)
        self.config[ 'MessageSvc' ].Format = formatHead + '% F%18W%S%7W%R%T %0W%M'

        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

            # each worker writes to its own, uniquely named, output file
            newName = m.getNewName( '.', '.w%i.'%(self.nodeID) )
            self.config[ m.key ].Output = newName

            if "ToolSvc.EvtCounter" not in self.config :
                from Configurables import EvtCounter
                counter = EvtCounter()

                counter = self.config[ "ToolSvc.EvtCounter" ]
            counter.UseIncident = False

            self.log.warning( 'Cannot configure EvtCounter' )
        libc = ctypes.CDLL( 'libc.so.6' )
        libc.prctl( 15, name, 0, 0, 0 )

        startEngine = time.time()
        self.log.info( "Worker %i starting Engine"%(self.nodeID) )

        self.log.info( 'EVT WRITERS ON WORKER : %i'\

        nEventWriters = len( self.writerDict[ "events" ] )

                # register each writer's (optional) items with the TESSerializer,
                # so they are included when the processed event is serialised
                for item in m.ItemList :
                    hsh = item.find( '#' )

                    itemList.add( item )
                for item in m.OptItemList :
                    hsh = item.find( '#' )

                    optItemList.add( item )

            itemList -= optItemList
            for item in sorted( itemList ) :
                self.log.info( ' adding ItemList Item to ts : %s'%( item ) )
                self.ts.addItem( item )
            for item in sorted( optItemList ) :
                self.log.info( ' adding Optional Item to ts : %s'%( item ) )
                self.ts.addOptItem( item )

            self.log.info( 'There is no Event Output for this app' )
            packet = self.evcom.receive( )

            if packet == 'FINISHED' : break
            evtNumber, tbin = packet
            if self.cntr != None :
                self.cntr.setEventCounter( evtNumber )

                self.log.info( "Fork new subworkers and disconnect from CondDB" )
                condDB = self.a.service( 'CondDBCnvSvc', gbl.ICondDBReader )

            self.TS.Load( tbin )

            sc = self.a.executeEvent()

                self.rTime += (time.time()-t)

                self.log.warning( 'Did not Execute Event' )
                self.evt.clearStore()

                self.log.warning( 'Event did not pass : %i'%(evtNumber) )
                self.evt.clearStore()

                tb = self.TS.Dump( )

            self.inc.fireIncident( gbl.Incident('Worker', 'EndEvent') )
            self.eventLoopSyncer.set()
            self.evt.clearStore( )
        self.log.info( 'Setting <Last> Event' )
        self.lastEvent.set()

        self.evcom.finalize()
        self.log.info( 'Worker-%i Finished Processing Events'%(self.nodeID) )

        self.filerecordsAgent.SendFileRecords()

            self.log.info( 'Join subworkers' )
        # For some output writers, a check is performed to see if the event has
        # executed certain algorithms (AcceptAlgs, RequireAlgs, VetoAlgs); same
        # logic as in the Subworker above.
            if hasattr(m.w, 'AcceptAlgs')  : acc += m.w.AcceptAlgs
            if hasattr(m.w, 'RequireAlgs') : req += m.w.RequireAlgs
            if hasattr(m.w, 'VetoAlgs')    : vet += m.w.VetoAlgs
        return (acc, req, vet)

        if  self.a.algorithm( algName )._ialg.isExecuted()\
        and self.a.algorithm( algName )._ialg.filterPassed() :
        # Check the algorithm status for an event.  Depending on output writer
        # settings, the event may be declined based on various criteria.  This
        # is a transcript of the check that occurs in GaudiSvc::OutputStream.
        self.log.debug( 'self.acceptAlgs is %s'%(str(self.acceptAlgs)) )
        if self.acceptAlgs :
            for name in self.acceptAlgs :

        self.log.debug( 'self.requireAlgs is %s'%(str(self.requireAlgs)) )
        for name in self.requireAlgs :

                self.log.info( 'Evt declined (requireAlgs) : %s'%(name) )

        self.log.debug( 'self.vetoAlgs is %s'%(str(self.vetoAlgs)) )

                self.log.info( 'Evt declined : (vetoAlgs) : %s'%(name) )
# Writer process: collects the processed events, file records and histograms
# from the Workers and writes the merged output.
    def __init__( self, queues, events, params, subworkers ) :
        GMPComponent.__init__( self, 'Writer', -2, queues, events, params, subworkers )

        self.log.name = "Writer--2"

        self.config[ 'ApplicationMgr' ].TopAlg = []
        self.config[ 'EventSelector'  ].Input  = []

        self.config[ 'MessageSvc' ].Format = '[Writer] % F%18W%S%7W%R%T %0W%M'

        for key, lst in self.writerDict.iteritems() :
            self.log.info( 'Writer Type : %s\t : %i'%(key, len(lst)) )

            self.log.debug( 'Processing Event Writer : %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers )
            self.config[ m.key ].Output = newName

            self.log.debug( 'Processing FileRecords Writer: %s'%(m) )
            newName = m.getNewName( '.', '.p%i.'%self.nWorkers,
                                    extra=" OPT='RECREATE'" )
            self.config[ m.key ].Output = newName

        hs = "HistogramPersistencySvc"
        if hs in self.config.keys() :
            n = self.config[ hs ].OutputFile

            newName = self.config[ hs ].OutputFile.replace( '.',\
                                                            '.p%i.'%(self.nWorkers) )
            self.config[ hs ].OutputFile = newName
        libc = ctypes.CDLL( 'libc.so.6' )
        libc.prctl( 15, name, 0, 0, 0 )

        startEngine = time.time()

        stopCriteria = self.nWorkers

            # poll the per-worker queues round-robin, with a short receive time-out
            current = (current+1)%self.nWorkers
            packet = self.evcoms[current].receive( timeout=0.01 )

            if packet == 'FINISHED' :
                self.log.info( 'Writer got FINISHED flag : Worker %i'%(current) )

                self.status[current] = True

                    self.log.info( 'FINISHED recd from all workers, break loop' )

            evtNumber, tbin = packet
            self.TS.Load( tbin )

            self.a.executeEvent()
            self.rTime += ( time.time()-t )

            self.evt.clearStore( )
            self.eventLoopSyncer.set()
        self.log.name = "Writer--2"
        self.log.info( 'Setting <Last> Event' )
        self.lastEvent.set()

        [ e.finalize() for e in self.evcoms ]

        sc = self.histoAgent.Receive()
        sc = self.histoAgent.RebuildHistoStore()
        if sc.isSuccess() : self.log.info( 'Histo Store rebuilt ok' )
        else              : self.log.warning( 'Histo Store Error in Rebuild' )

        sc = self.filerecordsAgent.Receive()
        self.filerecordsAgent.Rebuild()
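The collection loop above cycles over one queue per Worker with a short receive time-out, so a slow worker cannot stall the others, and keeps a per-worker status flag until every FINISHED sentinel has arrived. A standalone sketch of that polling pattern, with hypothetical queue contents:

from multiprocessing import Queue
from multiprocessing.queues import Empty

def collect( queues ) :
    status   = [ False ]*len( queues )     # True once that queue has delivered FINISHED
    current  = -1
    received = []
    while not all( status ) :
        current = (current+1)%len( queues )
        try :
            packet = queues[current].get( timeout=0.01 )   # round-robin with time-out
        except Empty :
            continue
        if packet == 'FINISHED' : status[current] = True
        else                    : received.append( packet )
    return received

if __name__ == '__main__' :
    qs = [ Queue() for i in range(3) ]
    for i, q in enumerate( qs ) :
        q.put( (i, 'blob-%i'%i) )
        q.put( 'FINISHED' )
    print( len( collect( qs ) ) )          # 3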
# The parallel-process co-ordinator: sets up the queues, the synchronisation
# barriers and the Reader/Worker/Writer processes.
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'GaudiPython Parallel Process Co-ordinator beginning' )

        self.hq = JoinableQueue( )
        self.fq = JoinableQueue( )

        # three synchronisation barriers: initialise, run (per event), finalise
                             limit=WAIT_INITIALISE,
                             step=STEP_INITIALISE )

                             limit=WAIT_SINGLE_EVENT,
                             firstEvent=WAIT_FIRST_EVENT )

                             limit=WAIT_FINALISE,
                             step=STEP_FINALISE )

            self.subworkers.append( sub )

        self.system.append( self.writer )
        self.system.append( wk )
        self.system.append( self.reader )

        # per-node accessors for the synchronisation events and the queues
        init = self.sInit.d[nodeID].event
        run  = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
        fin  = self.sFin.d[nodeID].event
        return ( init, run, fin )

        eventQ = self.qs[ nodeID ]

        return ( eventQ, histQ, fsrQ )
        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'INITIALISING SYSTEM' )

        sc = self.sInit.syncAll( step="Initialise" )
        if sc == SUCCESS : pass
        else             : self.Terminate() ; return FAILURE

        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'RUNNING SYSTEM' )
        sc = self.sRun.syncAll( step="Run" )
        if sc == SUCCESS : pass
        else             : self.Terminate() ; return FAILURE

        self.log.name = 'GaudiPython-Parallel-Logger'
        self.log.info( 'FINALISING SYSTEM' )
        sc = self.sFin.syncAll( step="Finalise" )
        if sc == SUCCESS : pass
        else             : self.Terminate() ; return FAILURE

        self.log.info( "Cleanly join all Processes" )

        self.log.info( "Report Total Success to Main.py" )
        children = multiprocessing.active_children()

        self.system.reverse()
        # one shared queue from the Reader to all Workers, and one queue per
        # Worker towards the Writer (nodeID -2)
        rwk = JoinableQueue()
        workersWriter = [ JoinableQueue() for i in xrange(self.nWorkers) ]

        d[-2] = (workersWriter, None)
        for i in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
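Read together with getQueues above, this wiring gives every Worker the shared Reader queue as its input and a private queue towards the Writer as its output. A toy version of the same dictionary for two workers follows; the Reader entry (nodeID -1) is not part of this extract, so the closing comment about it is an assumption rather than GMPBase code.

from multiprocessing import JoinableQueue

nWorkers = 2
rwk = JoinableQueue()                                          # Reader -> all Workers
workersWriter = [ JoinableQueue() for i in range(nWorkers) ]   # Worker i -> Writer

d = {}
d[-2] = (workersWriter, None)            # Writer   : receives from every per-worker queue
for i in range(nWorkers) :
    d[i] = (rwk, workersWriter[i])       # Worker i : reads rwk, writes its own queue
# d[-1], the Reader entry, would pair with rwk on the sending side (assumption)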
 
# Member-function signatures for the classes above:
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
 
def __init__(self, queues, events, params, subworkers)
 
def getNewName(self, replaceThis, withThis, extra='')
 
def receive(self, timeout=None)
 
def __init__(self, GMPComponent, qin, qout)
 
def __init__(self, gmpcomponent)
 
def IdentifyWriters(self)
 
def __init__(self, workerID, queues, events, params, subworkers)
 
def checkExecutedPassed(self, algName)
 
def processConfiguration(self)
 
def LoadTES(self, tbufferfile)
 
def getItemLists(self, config)
 
def __init__(self, queues, events, params, subworkers)
 
def set(self, key, output)
 
def __init__(self, nWorkers, config, log)
 
def StartGaudiPython(self)
 
def __init__(self, writer, wType, config)
 
def processConfiguration(self)
 
def getSyncEvents(self, nodeID)
 
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
 
def __init__(self, workerID, queues, events, params, subworkers)
 
def processConfiguration(self)
 
def SetupGaudiPython(self)
 
def getQueues(self, nodeID)
 
def checkExecutedPassed(self, algName)
 
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
 
def processConfiguration(self)