2 from Configurables
import EvtCounter
3 from GaudiPython
import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
4 from ROOT
import TBufferFile, TBuffer
6 from multiprocessing
import Process, Queue, JoinableQueue, Event
7 from multiprocessing
import cpu_count, current_process
8 from multiprocessing.queues
import Empty
11 from ROOT
import TParallelMergingFile
# Time limits handed to the synchronisation helpers as their `limit=` /
# `firstEvent=` arguments.  The 60*N spelling suggests N minutes expressed
# in seconds -- NOTE(review): confirm the unit against the syncer class.
WAIT_INITIALISE = 60 * 5    # limit for the initialise step
WAIT_FIRST_EVENT = 60 * 3   # allowance for the first event of the run step
WAIT_SINGLE_EVENT = 60 * 6  # limit for each subsequent event of the run step
# Module-level shortcut for the Aida2ROOT converter reached through gbl.
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
60 aidatypes = ( gbl.AIDA.IHistogram,
61 gbl.AIDA.IHistogram1D,
62 gbl.AIDA.IHistogram2D,
63 gbl.AIDA.IHistogram3D,
66 gbl.AIDA.IBaseHistogram )
# Histogram and profile classes looked up through gbl (presumably the ROOT
# counterparts of the AIDA interfaces listed in `aidatypes`).
thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
# Maps the class name of an output-writer configurable to the kind of data
# it writes.  The values ("events", "records", "tuples") are the keys under
# which writers found in the configuration are grouped.
WRITERTYPES = {
    'EvtCollectionStream': "tuples",
    'InputCopyStream': "events",
    'OutputStream': "events",
    'RecordStream': "records",
    'RunRecordStream': "records",
    'SequentialOutputStream': "events",
    'TagCollectionStream': "tuples",
}
84 A class to represent a writer in the GaudiPython configuration.
85 It can be non-trivial to access the name of the output file: it may be
86 specified in the DataSvc or only on the writer, and may be a list or a string.
87 Also, there are three different types of writer (events, records, tuples),
88 so this bootstrap class provides easy access to this info while configuring.
106 if hasattr( self.
w,
"Output" ) :
109 self.
set( self.
wName, self.w.Output )
116 if hasattr( datasvc,
"Output" ) :
128 assert replaceThis.__class__.__name__ ==
'str'
129 assert withThis.__class__.__name__ ==
'str'
132 if old.__class__.__name__ ==
'list' :
135 new = old.replace( replaceThis, withThis )
144 if hasattr( self.
w,
"ItemList" ) :
147 datasvc = config[ self.w.EvtDataSvc ]
148 if hasattr( datasvc,
"ItemList" ) :
151 if hasattr( self.
w,
"OptItemList" ) :
154 datasvc = config[ self.w.EvtDataSvc ]
155 if hasattr( datasvc,
"OptItemList" ) :
159 def set( self, key, output ) :
168 s +=
"Writer : %s\n"%( self.
wName )
169 s +=
"Writer Type : %s\n"%( self.
wType )
170 s +=
"Writer Output : %s\n"%( self.
output )
172 s +=
"DataSvc Output : %s\n"%( self.
svcOutput )
174 s +=
"Key for config : %s\n"%( self.
key )
175 s +=
"Output File : %s\n"%( self.
output )
176 s +=
"ItemList : %s\n"%( self.
ItemList )
185 GaudiPython algorithm used to clean up histos on the Reader and Workers.
186 It only has a finalize() method.
187 This retrieves a dictionary of path:histo objects and sends it to the
188 writer. It then waits on the sync Event : THIS IS IMPORTANT, because if
189 the algorithm returns before ALL histos have been COMPLETELY RECEIVED
190 at the writer end, there will be an error.
193 PyAlgorithm.__init__( self )
200 self.log.info(
'CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
201 self._gmpc.hDict = self._gmpc.dumpHistograms( )
202 ks = self._gmpc.hDict.keys()
203 self.log.info(
'%i Objects in Histogram Store'%(len(ks)))
209 reps = len(ks)/chunk + 1
210 for i
in xrange(reps) :
211 someKeys = ks[i*chunk : (i+1)*chunk]
212 smalld = dict( [(key, self._gmpc.hDict[key])
for key
in someKeys] )
213 self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
215 self.log.debug(
'Signalling end of histos to Writer')
216 self._gmpc.hq.put(
'HISTOS_SENT' )
217 self.log.debug(
'Waiting on Sync Event' )
218 self._gmpc.sEvent.wait()
219 self.log.debug(
'Histo Sync Event set, clearing and returning' )
220 self._gmpc.hvt.clearStore()
221 root = gbl.DataObject()
223 self._gmpc.hvt.setRoot(
'/stat', root )
256 assert item.__class__.__name__ ==
'tuple'
257 startTransmission = time.time()
258 self.qout.put( item )
261 while self.qout._buffer : time.sleep( NAP )
262 self.
qoutTime += time.time()-startTransmission
269 startWait = time.time()
271 itemIn = self.qin.get( timeout=timeout )
274 self.
qinTime += time.time()-startWait
276 if itemIn.__class__.__name__ ==
'tuple' :
283 self._gmpc.log.warning(
'TASK_DONE called too often by : %s'\
284 %(self._gmpc.nodeType))
288 self.log.info(
'Finalize Event Communicator : %s %s'%(self.
_gmpc, self._gmpc.nodeType))
292 if self._gmpc.nodeType ==
'Reader' : downstream = self._gmpc.nWorkers
293 elif self._gmpc.nodeType ==
'Writer' : downstream = 0
294 elif self._gmpc.nodeType ==
'Worker' : downstream = 1
296 for i
in xrange(downstream) :
297 self.qout.put(
'FINISHED' )
298 if self._gmpc.nodeType !=
'Writer' :
304 self.log.name =
'%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
305 self.log.info (
'Items Sent : %i'%(self.
nSent) )
306 self.log.info (
'Items Received : %i'%(self.
nRecv) )
307 self.log.info (
'Data Sent : %i'%(self.
sizeSent) )
308 self.log.info (
'Data Received : %i'%(self.
sizeRecv) )
309 self.log.info (
'Q-out Time : %5.2f'%(self.
qoutTime) )
310 self.log.info (
'Q-in Time : %5.2f'%(self.
qinTime ) )
315 def __init__( self, gaudiTESSerializer, evtDataSvc,
316 nodeType, nodeID, log ) :
317 self.
T = gaudiTESSerializer
330 root = gbl.DataObject()
332 self.evt.setRoot(
'/Event', root )
334 self.T.loadBuffer( tbuf )
335 self.
tLoad += (time.time() - t)
337 self.buffersIn.append( tbuf.Length() )
340 tb = TBufferFile( TBuffer.kWrite )
341 self.T.dumpBuffer(tb)
342 self.
tDump += ( time.time()-t )
344 self.buffersOut.append( tb.Length() )
347 evIn =
"Events Loaded : %i"%( self.
nIn )
348 evOut =
"Events Dumped : %i"%( self.
nOut )
350 dataIn =
"Data Loaded : %i"%(din)
351 dataInMb =
"Data Loaded (MB) : %5.2f Mb"%(din/MB)
353 avgIn =
"Avg Buf Loaded : %5.2f Mb"\
354 %( din/(self.
nIn*MB) )
355 maxIn =
"Max Buf Loaded : %5.2f Mb"\
358 avgIn =
"Avg Buf Loaded : N/A"
359 maxIn =
"Max Buf Loaded : N/A"
361 dataOut =
"Data Dumped : %i"%(dout)
362 dataOutMb =
"Data Dumped (MB) : %5.2f Mb"%(dout/MB)
# Average size of a dumped buffer: total bytes dumped (dout) divided by the
# number of events dumped.  The previous expression divided din (bytes
# loaded), a copy-paste slip from the matching "Avg Buf Loaded" line, so the
# report showed the wrong average whenever load and dump volumes differed.
avgOut = "Avg Buf Dumped : %5.2f Mb" % (dout / (self.nOut * MB))
366 maxOut =
"Max Buf Dumped : %5.2f Mb"\
369 avgOut =
"Avg Buf Dumped : N/A"
370 maxOut =
"Max Buf Dumped : N/A"
371 dumpTime =
"Total Dump Time : %5.2f"%( self.
tDump )
372 loadTime =
"Total Load Time : %5.2f"%( self.
tLoad )
386 self.log.name =
"%s-%i TESSerializer"%(self.
nodeType, self.
nodeID)
388 self.log.info( line )
400 def __init__( self, nodeType, nodeID, queues, events, params, subworkers ) :
411 current_process().name = nodeType
418 self.nWorkers, self.sEvent, self.config, self.
log = params
429 ks = self.config.keys()
431 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
433 if k
in ks: self.
app = k
437 qPair, histq, fq = self.
queues
451 self.evcoms.append( ec )
497 from AlgSmapShot
import SmapShot
499 ss = SmapShot( logname=smapsLog )
500 self.a.addAlgorithm( ss )
501 self.
evt = self.a.evtsvc()
502 self.
hvt = self.a.histsvc()
503 self.
fsr = self.a.filerecordsvc()
504 self.
inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
505 self.
pers = self.a.service(
'EventPersistencySvc',
'IAddressCreator' )
506 self.
ts = gbl.GaudiMP.TESSerializer( self.evt._idp, self.
pers )
517 root = gbl.DataObject()
519 self.evt.setRoot(
'/Event', root )
520 self.ts.loadBuffer(tbufferfile)
523 if self.
app !=
'Gauss':
529 lst = [
'/Event/Gen/Header',
530 '/Event/Rec/Header' ]
534 n = self.
evt[path].evtNumber()
544 n = self.
evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
551 if self.
nIn > 0
or self.
nOut > 0 :
554 self.log.warning(
'Could not determine Event Number')
567 keys = [
"events",
"records",
"tuples",
"histos" ]
572 wkeys = WRITERTYPES.keys()
573 for v
in self.config.values() :
574 if v.__class__.__name__
in wkeys :
575 writerType = WRITERTYPES[ v.__class__.__name__ ]
576 d[writerType].append(
MiniWriter(v, writerType, self.config) )
578 self.log.info(
'Writer Found : %s'%(v.name()))
581 if 'HistogramPersistencySvc' in self.config.keys() :
582 hfile =self.config[
'HistogramPersistencySvc'].getProp(
'OutputFile')
583 d[
"histos" ].append( hfile )
588 Method used by the GaudiPython algorithm CollectHistos
589 to obtain a dictionary of form { path : object }
590 representing the Histogram Store
592 nlist = self.hvt.getHistoNames( )
594 objects = 0 ; histos = 0
598 if type(o)
in aidatypes :
605 print 'WARNING : no histograms to recover?'
615 if self.
app ==
'Gauss':
617 tool = self.a.tool(
"ToolSvc.EvtCounter" )
618 self.
cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
622 self.
iTime = time.time() - start
629 self.finalEvent.set()
630 self.
fTime = time.time() - start
634 allTime =
"Alive Time : %5.2f"%(self.
tTime)
635 initTime =
"Init Time : %5.2f"%(self.
iTime)
636 frstTime =
"1st Event Time : %5.2f"%(self.
firstEvTime)
637 runTime =
"Run Time : %5.2f"%(self.
rTime)
638 finTime =
"Finalise Time : %5.2f"%(self.
fTime)
639 tup = ( allTime, initTime, frstTime, runTime, finTime )
649 def __init__( self, queues, events, params, subworkers ) :
650 GMPComponent.__init__(self,
'Reader', -1, queues, events, params, subworkers )
657 self.config[
'ApplicationMgr' ].TopAlg = []
658 self.config[
'ApplicationMgr' ].OutStream = []
659 if "HistogramPersistencySvc" in self.config.keys() :
660 self.config[
'HistogramPersistencySvc' ].OutputFile =
''
661 self.config[
'MessageSvc'].Format =
'[Reader]% F%18W%S%7W%R%T %0W%M'
662 self.
evtMax = self.config[
'ApplicationMgr' ].EvtMax
665 tb = TBufferFile( TBuffer.kWrite )
667 self.ts.dumpBuffer( tb )
674 startFirst = time.time()
675 self.log.info(
'Reader : First Event')
677 self.log.info(
'evtMax( %i ) reached'%(self.
evtMax))
683 if not bool(self.
evt[
'/Event']) :
684 self.log.warning(
'No More Events! (So Far : %i)'%(self.
nOut))
690 lst = self.evt.getHistoNames()
693 lst = self.evt.getList()
694 if self.
app ==
"DaVinci":
695 daqnode = self.evt.retrieveObject(
'/Event/DAQ' ).
registry()
697 self.evt.getList( daqnode, lst, daqnode.address().
par() )
699 self.log.critical(
'Reader could not acquire TES List!')
702 self.log.info(
'Reader : TES List : %i items'%(len(lst)))
707 self.log.info(
'First Event Sent')
710 self.eventLoopSyncer.set()
711 self.evt.clearStore( )
719 libc = ctypes.CDLL(
'libc.so.6')
721 libc.prctl(15,name,0,0,0)
724 startEngine = time.time()
725 self.log.name =
'Reader'
726 self.log.info(
'Reader Process starting')
733 self.log.info(
'Reader Beginning Distribution')
736 self.log.info(
'Reader First Event OK')
738 self.log.critical(
'Reader Failed on First Event')
745 self.log.info(
'evtMax( %i ) reached'%(self.
evtMax))
748 if not self.stat.isSuccess() :
749 self.log.critical(
'Reader is Damaged!' )
754 self.
rTime += (time.time()-t)
755 if not bool(self.
evt[
'/Event']) :
756 self.log.warning(
'No More Events! (So Far : %i)'%(self.
nOut))
763 self.eventLoopSyncer.set()
764 self.evt.clearStore( )
765 self.log.info(
'Setting <Last> Event')
769 self.log.info(
'Reader : Event Distribution complete.' )
770 self.evcom.finalize()
772 self.
tTime = time.time() - startEngine
777 def __init__( self, workerID, queues, events, params, subworkers ) :
778 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params, subworkers )
783 self.log.info(
"Subworker-%i Created OK"%(self.
nodeID))
790 libc = ctypes.CDLL(
'libc.so.6')
792 libc.prctl(15,name,0,0,0)
795 startEngine = time.time()
796 msg = self.a.service(
'MessageSvc')
797 msg.Format =
'[' + self.log.name +
'] % F%18W%S%7W%R%T %0W%M'
799 self.log.name =
"Worker-%i"%(self.
nodeID)
800 self.log.info(
"Subworker %i starting Engine"%(self.
nodeID))
804 self.log.info(
'EVT WRITERS ON WORKER : %i'\
807 nEventWriters = len( self.
writerDict[
"events" ] )
813 packet = self.evcom.receive( )
816 if packet ==
'FINISHED' :
break
817 evtNumber, tbin = packet
818 if self.
cntr !=
None:
820 self.cntr.setEventCounter( evtNumber )
826 sc = self.a.executeEvent()
830 self.
rTime += (time.time()-t)
834 self.log.name =
"Worker-%i"%(self.
nodeID)
835 self.log.warning(
'Did not Execute Event')
836 self.evt.clearStore()
841 self.log.name =
"Worker-%i"%(self.
nodeID)
842 self.log.warning(
'Event did not pass : %i'%(evtNumber) )
843 self.evt.clearStore()
852 self.inc.fireIncident(gbl.Incident(
'Subworker',
'EndEvent'))
853 self.eventLoopSyncer.set()
854 self.evt.clearStore( )
855 self.log.name =
"Worker-%i"%(self.
nodeID)
856 self.log.info(
'Setting <Last> Event %s' %(self.
nodeID))
859 self.evcom.finalize()
861 self.filerecordsAgent.SendFileRecords()
862 self.
tTime = time.time()-startEngine
873 self.
inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
883 For some output writers, a check is performed to see if the event has
884 executed certain algorithms.
885 These reside in the AcceptAlgs, RequireAlgs and VetoAlgs properties of those writers.
891 if hasattr(m.w,
'AcceptAlgs') : acc += m.w.AcceptAlgs
892 if hasattr(m.w,
'RequireAlgs') : req += m.w.RequireAlgs
893 if hasattr(m.w,
'VetoAlgs') : vet += m.w.VetoAlgs
894 return (acc, req, vet)
898 if self.a.algorithm( algName )._ialg.isExecuted()\
899 and self.a.algorithm( algName )._ialg.filterPassed() :
906 Check the algorithm status for an event.
907 Depending on output writer settings, the event
908 may be declined based on various criteria.
909 This is a transcript of the check that occurs in GaudiSvc::OutputStream
913 self.log.debug(
'self.acceptAlgs is %s'%(str(self.acceptAlgs)))
915 for name
in self.acceptAlgs :
922 self.log.debug(
'self.requireAlgs is %s'%(str(self.requireAlgs)))
923 for name
in self.requireAlgs :
927 self.log.info(
'Evt declined (requireAlgs) : %s'%(name) )
930 self.log.debug(
'self.vetoAlgs is %s'%(str(self.
vetoAlgs)))
935 self.log.info(
'Evt declined : (vetoAlgs) : %s'%(name) )
941 def __init__( self, workerID, queues, events, params , subworkers ) :
942 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params, subworkers )
947 self.log.name =
"Worker-%i"%(self.
nodeID)
948 self.log.info(
"Worker-%i Created OK"%(self.
nodeID))
957 self.config[
'EventSelector' ].Input = []
958 self.config[
'ApplicationMgr' ].OutStream = []
959 if "HistogramPersistencySvc" in self.config.keys() :
960 self.config[
'HistogramPersistencySvc' ].OutputFile =
''
961 formatHead =
'[Worker-%i] '%(self.
nodeID)
962 self.config[
'MessageSvc'].Format = formatHead+
'% F%18W%S%7W%R%T %0W%M'
964 for key, lst
in self.writerDict.iteritems() :
965 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
970 newName = m.getNewName(
'.',
'.w%i.'%(self.
nodeID) )
971 self.config[ m.key ].Output = newName
981 if "ToolSvc.EvtCounter" not in self.config:
982 from Configurables
import EvtCounter
983 counter = EvtCounter()
985 counter = self.config[
"ToolSvc.EvtCounter"]
986 counter.UseIncident =
False
989 self.log.warning(
'Cannot configure EvtCounter')
996 libc = ctypes.CDLL(
'libc.so.6')
998 libc.prctl(15,name,0,0,0)
1000 startEngine = time.time()
1001 self.log.info(
"Worker %i starting Engine"%(self.
nodeID))
1006 self.log.info(
'EVT WRITERS ON WORKER : %i'\
1009 nEventWriters = len( self.
writerDict[
"events" ] )
1014 for item
in m.ItemList :
1015 hsh = item.find(
'#' )
1018 itemList.add( item )
1019 for item
in m.OptItemList :
1020 hsh = item.find(
'#' )
1023 optItemList.add( item )
1025 itemList -= optItemList
1026 for item
in sorted( itemList ):
1027 self.log.info(
' adding ItemList Item to ts : %s' % ( item ) )
1028 self.ts.addItem( item )
1029 for item
in sorted( optItemList ):
1030 self.log.info(
' adding Optional Item to ts : %s' % ( item ) )
1031 self.ts.addOptItem( item )
1033 self.log.info(
'There is no Event Output for this app' )
1039 packet = self.evcom.receive( )
1042 if packet ==
'FINISHED' :
break
1043 evtNumber, tbin = packet
1044 if self.
cntr !=
None:
1045 self.cntr.setEventCounter( evtNumber )
1052 self.log.info(
"Fork new subworkers and disconnect from CondDB")
1053 condDB = self.a.service(
'CondDBCnvSvc', gbl.ICondDBReader)
1062 self.TS.Load( tbin )
1065 sc = self.a.executeEvent()
1069 self.
rTime += (time.time()-t)
1073 self.log.warning(
'Did not Execute Event')
1074 self.evt.clearStore()
1079 self.log.warning(
'Event did not pass : %i'%(evtNumber) )
1080 self.evt.clearStore()
1086 tb = self.TS.Dump( )
1089 self.inc.fireIncident(gbl.Incident(
'Worker',
'EndEvent'))
1090 self.eventLoopSyncer.set()
1091 self.evt.clearStore( )
1092 self.log.info(
'Setting <Last> Event')
1093 self.lastEvent.set()
1095 self.evcom.finalize()
1096 self.log.info(
'Worker-%i Finished Processing Events'%(self.
nodeID) )
1098 self.filerecordsAgent.SendFileRecords()
1104 self.log.info(
'Join subworkers')
1109 For some output writers, a check is performed to see if the event has
1110 executed certain algorithms.
1111 These reside in the AcceptAlgs, RequireAlgs and VetoAlgs properties of those writers.
1117 if hasattr(m.w,
'AcceptAlgs') : acc += m.w.AcceptAlgs
1118 if hasattr(m.w,
'RequireAlgs') : req += m.w.RequireAlgs
1119 if hasattr(m.w,
'VetoAlgs') : vet += m.w.VetoAlgs
1120 return (acc, req, vet)
1124 if self.a.algorithm( algName )._ialg.isExecuted()\
1125 and self.a.algorithm( algName )._ialg.filterPassed() :
1132 Check the algorithm status for an event.
1133 Depending on output writer settings, the event
1134 may be declined based on various criteria.
1135 This is a transcript of the check that occurs in GaudiSvc::OutputStream
1139 self.log.debug(
'self.acceptAlgs is %s'%(str(self.acceptAlgs)))
1140 if self.acceptAlgs :
1141 for name
in self.acceptAlgs :
1148 self.log.debug(
'self.requireAlgs is %s'%(str(self.requireAlgs)))
1149 for name
in self.requireAlgs :
1153 self.log.info(
'Evt declined (requireAlgs) : %s'%(name) )
1156 self.log.debug(
'self.vetoAlgs is %s'%(str(self.
vetoAlgs)))
1161 self.log.info(
'Evt declined : (vetoAlgs) : %s'%(name) )
1168 def __init__( self, queues, events, params, subworkers ) :
1169 GMPComponent.__init__(self,
'Writer', -2, queues, events, params, subworkers )
1174 self.log.name =
"Writer--2"
1180 self.config[
'ApplicationMgr' ].TopAlg = []
1181 self.config[
'EventSelector' ].Input = []
1183 self.config[
'MessageSvc'].Format =
'[Writer] % F%18W%S%7W%R%T %0W%M'
1186 for key, lst
in self.writerDict.iteritems() :
1187 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
1194 self.log.debug(
'Processing Event Writer : %s'%(m) )
1195 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers )
1196 self.config[ m.key ].Output = newName
1206 self.log.debug(
'Processing FileRecords Writer: %s'%(m) )
1207 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers,
1208 extra=
" OPT='RECREATE'" )
1209 self.config[ m.key ].Output = newName
1212 hs =
"HistogramPersistencySvc"
1214 if hs
in self.config.keys() :
1215 n = self.config[ hs ].OutputFile
1217 newName=self.config[hs].OutputFile.replace(
'.',\
1218 '.p%i.'%(self.nWorkers))
1219 self.config[ hs ].OutputFile = newName
1225 libc = ctypes.CDLL(
'libc.so.6')
1227 libc.prctl(15,name,0,0,0)
1229 startEngine = time.time()
1237 stopCriteria = self.nWorkers
1239 current = (current+1)%self.nWorkers
1240 packet = self.
evcoms[current].receive( timeout=0.01 )
1243 if packet ==
'FINISHED' :
1244 self.log.info(
'Writer got FINISHED flag : Worker %i'%(current))
1246 self.
status[current] =
True
1248 self.log.info(
'FINISHED recd from all workers, break loop')
1253 evtNumber, tbin = packet
1254 self.TS.Load( tbin )
1256 self.a.executeEvent()
1257 self.
rTime += ( time.time()-t )
1259 self.evt.clearStore( )
1260 self.eventLoopSyncer.set()
1261 self.log.name =
"Writer--2"
1262 self.log.info(
'Setting <Last> Event')
1263 self.lastEvent.set()
1266 [ e.finalize()
for e
in self.
evcoms ]
1268 sc = self.histoAgent.Receive()
1269 sc = self.histoAgent.RebuildHistoStore()
1270 if sc.isSuccess() : self.log.info(
'Histo Store rebuilt ok' )
1271 else : self.log.warning(
'Histo Store Error in Rebuild' )
1274 sc = self.filerecordsAgent.Receive()
1275 self.filerecordsAgent.Rebuild()
1292 self.log.name =
'GaudiPython-Parallel-Logger'
1293 self.log.info(
'GaudiPython Parallel Process Co-ordinator beginning' )
1303 self.
hq = JoinableQueue( )
1304 self.
fq = JoinableQueue( )
1308 limit=WAIT_INITIALISE,
1309 step=STEP_INITIALISE )
1312 limit=WAIT_SINGLE_EVENT,
1314 firstEvent=WAIT_FIRST_EVENT )
1316 limit=WAIT_FINALISE,
1317 step=STEP_FINALISE )
1328 self.subworkers.append( sub )
1335 self.system.append(self.
writer)
1336 self.system.append(wk)
1337 self.system.append(self.
reader)
1340 init = self.sInit.d[nodeID].event
1341 run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
1342 fin = self.sFin.d[nodeID].event
1343 return ( init, run, fin )
1346 eventQ = self.
qs[ nodeID ]
1349 return ( eventQ, histQ, fsrQ )
1354 self.log.name =
'GaudiPython-Parallel-Logger'
1355 self.log.info(
'INITIALISING SYSTEM' )
1361 sc = self.sInit.syncAll(step=
"Initialise")
1362 if sc == SUCCESS:
pass
1363 else : self.
Terminate() ;
return FAILURE
1366 self.log.name =
'GaudiPython-Parallel-Logger'
1367 self.log.info(
'RUNNING SYSTEM' )
1368 sc = self.sRun.syncAll(step=
"Run")
1369 if sc == SUCCESS:
pass
1370 else : self.
Terminate() ;
return FAILURE
1373 self.log.name =
'GaudiPython-Parallel-Logger'
1374 self.log.info(
'FINALISING SYSTEM' )
1375 sc = self.sFin.syncAll(step=
"Finalise")
1376 if sc == SUCCESS:
pass
1377 else : self.
Terminate() ;
return FAILURE
1380 self.log.info(
"Cleanly join all Processes" )
1382 self.log.info(
"Report Total Success to Main.py" )
1387 children = multiprocessing.active_children()
1397 self.system.reverse()
1410 rwk = JoinableQueue()
1412 workersWriter = [ JoinableQueue()
for i
in xrange(self.
nWorkers) ]
1415 d[-2] = (workersWriter,
None)
1416 for i
in xrange(self.
nWorkers) : d[i] = (rwk, workersWriter[i])
double sum(double x, double y, double z)
StatusCode execute()
The action to be performed by the algorithm on an event.
Python Algorithm base class.
NamedRange_< CONTAINER > range(const CONTAINER &cnt, const std::string &name)
simple function to create the named range from an arbitrary container
StatusCode finalize()
the default (empty) implementation of IStateful::finalize() method