2 from GaudiPython
import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
3 from ROOT
import TBufferFile, TBuffer
5 from multiprocessing
import Process, Queue, JoinableQueue, Event
6 from multiprocessing
import cpu_count, current_process
7 from multiprocessing.queues
import Empty
10 from ROOT
import TParallelMergingFile
# Time limits for the synchronisation steps. Further down this file they
# are passed as the `limit=` (WAIT_INITIALISE, WAIT_SINGLE_EVENT) and
# `firstEvent=` (WAIT_FIRST_EVENT) keyword arguments.
# NOTE(review): written as 60 * n, so presumably seconds (n minutes) --
# confirm against the consumer of these values.
WAIT_INITIALISE = 60 * 5
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
# Module-level shortcut to Gaudi.Utils.Aida2ROOT.aida2root, looked up once
# through the `gbl` namespace imported from GaudiPython.
aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
59 aidatypes = ( gbl.AIDA.IHistogram,
60 gbl.AIDA.IHistogram1D,
61 gbl.AIDA.IHistogram2D,
62 gbl.AIDA.IHistogram3D,
65 gbl.AIDA.IBaseHistogram )
# The histogram / profile classes TH1D, TH2D, TH3D, TProfile and
# TProfile2D, as exposed through the `gbl` namespace.
thtypes = (gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D)
# Maps the class name of an output-writer configurable to the category of
# output it produces ("events", "records" or "tuples"). The lookup key is
# `v.__class__.__name__` of each value found in the job configuration.
WRITERTYPES = {
    'EvtCollectionStream': "tuples",
    'InputCopyStream': "events",
    'OutputStream': "events",
    'RecordStream': "records",
    'RunRecordStream': "records",
    'SequentialOutputStream': "events",
    'TagCollectionStream': "tuples",
}
83 A class to represent a writer in the GaudiPython configuration 84 It can be non-trivial to access the name of the output file; it may be 85 specified in the DataSvc, or just on the writer, may be a list, or string 86 Also, there are three different types of writer (events, records, tuples) 87 so this bootstrap class provides easy access to this info while configuring 105 if hasattr( self.
w,
"Output" ) :
108 self.
set( self.
wName, self.w.Output )
115 if hasattr( datasvc,
"Output" ) :
127 assert replaceThis.__class__.__name__ ==
'str' 128 assert withThis.__class__.__name__ ==
'str' 131 if old.__class__.__name__ ==
'list' :
134 new = old.replace( replaceThis, withThis )
143 if hasattr( self.
w,
"ItemList" ) :
146 datasvc = config[ self.w.EvtDataSvc ]
147 if hasattr( datasvc,
"ItemList" ) :
150 if hasattr( self.
w,
"OptItemList" ) :
153 datasvc = config[ self.w.EvtDataSvc ]
154 if hasattr( datasvc,
"OptItemList" ) :
158 def set( self, key, output ) :
167 s +=
"Writer : %s\n"%( self.
wName )
168 s +=
"Writer Type : %s\n"%( self.
wType )
169 s +=
"Writer Output : %s\n"%( self.
output )
171 s +=
"DataSvc Output : %s\n"%( self.
svcOutput )
173 s +=
"Key for config : %s\n"%( self.
key )
174 s +=
"Output File : %s\n"%( self.
output )
175 s +=
"ItemList : %s\n"%( self.
ItemList )
184 GaudiPython algorithm used to clean up histos on the Reader and Workers 185 Only has a finalize method() 186 This retrieves a dictionary of path:histo objects and sends it to the 187 writer. It then waits for a None flag : THIS IS IMPORTANT, as if 188 the algorithm returns before ALL histos have been COMPLETELY RECEIVED 189 at the writer end, there will be an error. 192 PyAlgorithm.__init__( self )
199 self.log.info(
'CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
200 self._gmpc.hDict = self._gmpc.dumpHistograms( )
201 ks = self._gmpc.hDict.keys()
202 self.log.info(
'%i Objects in Histogram Store'%(len(ks)))
208 reps = len(ks)/chunk + 1
209 for i
in xrange(reps) :
210 someKeys = ks[i*chunk : (i+1)*chunk]
211 smalld = dict( [(key, self._gmpc.hDict[key])
for key
in someKeys] )
212 self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
214 self.log.debug(
'Signalling end of histos to Writer')
215 self._gmpc.hq.put(
'HISTOS_SENT' )
216 self.log.debug(
'Waiting on Sync Event' )
217 self._gmpc.sEvent.wait()
218 self.log.debug(
'Histo Sync Event set, clearing and returning' )
219 self._gmpc.hvt.clearStore()
220 root = gbl.DataObject()
222 self._gmpc.hvt.setRoot(
'/stat', root )
255 assert item.__class__.__name__ ==
'tuple' 256 startTransmission = time.time()
257 self.qout.put( item )
260 while self.qout._buffer : time.sleep( NAP )
261 self.
qoutTime += time.time()-startTransmission
268 startWait = time.time()
270 itemIn = self.qin.get( timeout=timeout )
273 self.
qinTime += time.time()-startWait
275 if itemIn.__class__.__name__ ==
'tuple' :
282 self._gmpc.log.warning(
'TASK_DONE called too often by : %s'\
283 %(self._gmpc.nodeType))
287 self.log.info(
'Finalize Event Communicator : %s %s'%(self.
_gmpc, self._gmpc.nodeType))
291 if self._gmpc.nodeType ==
'Reader' : downstream = self._gmpc.nWorkers
292 elif self._gmpc.nodeType ==
'Writer' : downstream = 0
293 elif self._gmpc.nodeType ==
'Worker' : downstream = 1
295 for i
in xrange(downstream) :
296 self.qout.put(
'FINISHED' )
297 if self._gmpc.nodeType !=
'Writer' :
303 self.log.name =
'%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
304 self.log.info (
'Items Sent : %i'%(self.
nSent) )
305 self.log.info (
'Items Received : %i'%(self.
nRecv) )
306 self.log.info (
'Data Sent : %i'%(self.
sizeSent) )
307 self.log.info (
'Data Received : %i'%(self.
sizeRecv) )
308 self.log.info (
'Q-out Time : %5.2f'%(self.
qoutTime) )
309 self.log.info (
'Q-in Time : %5.2f'%(self.
qinTime ) )
314 def __init__( self, gaudiTESSerializer, evtDataSvc,
315 nodeType, nodeID, log ) :
316 self.
T = gaudiTESSerializer
329 root = gbl.DataObject()
331 self.evt.setRoot(
'/Event', root )
333 self.T.loadBuffer( tbuf )
334 self.
tLoad += (time.time() - t)
336 self.buffersIn.append( tbuf.Length() )
339 tb = TBufferFile( TBuffer.kWrite )
340 self.T.dumpBuffer(tb)
341 self.
tDump += ( time.time()-t )
343 self.buffersOut.append( tb.Length() )
346 evIn =
"Events Loaded : %i"%( self.
nIn )
347 evOut =
"Events Dumped : %i"%( self.
nOut )
349 dataIn =
"Data Loaded : %i"%(din)
350 dataInMb =
"Data Loaded (MB) : %5.2f Mb"%(din/MB)
352 avgIn =
"Avg Buf Loaded : %5.2f Mb"\
353 %( din/(self.
nIn*MB) )
354 maxIn =
"Max Buf Loaded : %5.2f Mb"\
357 avgIn =
"Avg Buf Loaded : N/A" 358 maxIn =
"Max Buf Loaded : N/A" 360 dataOut =
"Data Dumped : %i"%(dout)
361 dataOutMb =
"Data Dumped (MB) : %5.2f Mb"%(dout/MB)
363 avgOut =
"Avg Buf Dumped : %5.2f Mb"\
364 %( din/(self.
nOut*MB) )
365 maxOut =
"Max Buf Dumped : %5.2f Mb"\
368 avgOut =
"Avg Buf Dumped : N/A" 369 maxOut =
"Max Buf Dumped : N/A" 370 dumpTime =
"Total Dump Time : %5.2f"%( self.
tDump )
371 loadTime =
"Total Load Time : %5.2f"%( self.
tLoad )
385 self.log.name =
"%s-%i TESSerializer"%(self.
nodeType, self.
nodeID)
387 self.log.info( line )
399 def __init__( self, nodeType, nodeID, queues, events, params, subworkers ) :
410 current_process().name = nodeType
417 self.nWorkers, self.sEvent, self.config, self.
log = params
428 ks = self.config.keys()
430 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
432 if k
in ks: self.
app = k
436 qPair, histq, fq = self.
queues 450 self.evcoms.append( ec )
496 from AlgSmapShot
import SmapShot
498 ss = SmapShot( logname=smapsLog )
499 self.a.addAlgorithm( ss )
500 self.
evt = self.a.evtsvc()
501 self.
hvt = self.a.histsvc()
502 self.
fsr = self.a.filerecordsvc()
503 self.
inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
504 self.
pers = self.a.service(
'EventPersistencySvc',
'IAddressCreator' )
505 self.
ts = gbl.GaudiMP.TESSerializer( self.evt._idp, self.
pers )
516 root = gbl.DataObject()
518 self.evt.setRoot(
'/Event', root )
519 self.ts.loadBuffer(tbufferfile)
522 if self.
app !=
'Gauss':
528 lst = [
'/Event/Gen/Header',
529 '/Event/Rec/Header' ]
533 n = self.
evt[path].evtNumber()
543 n = self.
evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
550 if self.
nIn > 0
or self.
nOut > 0 :
553 self.log.warning(
'Could not determine Event Number')
566 keys = [
"events",
"records",
"tuples",
"histos" ]
571 wkeys = WRITERTYPES.keys()
572 for v
in self.config.values() :
573 if v.__class__.__name__
in wkeys :
574 writerType = WRITERTYPES[ v.__class__.__name__ ]
575 d[writerType].append(
MiniWriter(v, writerType, self.config) )
577 self.log.info(
'Writer Found : %s'%(v.name()))
580 if 'HistogramPersistencySvc' in self.config.keys() :
581 hfile =self.config[
'HistogramPersistencySvc'].getProp(
'OutputFile')
582 d[
"histos" ].append( hfile )
587 Method used by the GaudiPython algorithm CollectHistos 588 to obtain a dictionary of form { path : object } 589 representing the Histogram Store 591 nlist = self.hvt.getHistoNames( )
593 objects = 0 ; histos = 0
597 if type(o)
in aidatypes :
604 print 'WARNING : no histograms to recover?' 614 if self.
app ==
'Gauss':
616 tool = self.a.tool(
"ToolSvc.EvtCounter" )
617 self.
cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
621 self.
iTime = time.time() - start
628 self.finalEvent.set()
629 self.
fTime = time.time() - start
633 allTime =
"Alive Time : %5.2f"%(self.
tTime)
634 initTime =
"Init Time : %5.2f"%(self.
iTime)
635 frstTime =
"1st Event Time : %5.2f"%(self.
firstEvTime)
636 runTime =
"Run Time : %5.2f"%(self.
rTime)
637 finTime =
"Finalise Time : %5.2f"%(self.
fTime)
638 tup = ( allTime, initTime, frstTime, runTime, finTime )
648 def __init__( self, queues, events, params, subworkers ) :
649 GMPComponent.__init__(self,
'Reader', -1, queues, events, params, subworkers )
656 self.config[
'ApplicationMgr' ].TopAlg = []
657 self.config[
'ApplicationMgr' ].OutStream = []
658 if "HistogramPersistencySvc" in self.config.keys() :
659 self.config[
'HistogramPersistencySvc' ].OutputFile =
'' 660 self.config[
'MessageSvc'].Format =
'[Reader]% F%18W%S%7W%R%T %0W%M' 661 self.
evtMax = self.config[
'ApplicationMgr' ].EvtMax
664 tb = TBufferFile( TBuffer.kWrite )
666 self.ts.dumpBuffer( tb )
673 startFirst = time.time()
674 self.log.info(
'Reader : First Event')
676 self.log.info(
'evtMax( %i ) reached'%(self.
evtMax))
682 if not bool(self.
evt[
'/Event']) :
683 self.log.warning(
'No More Events! (So Far : %i)'%(self.
nOut))
689 lst = self.evt.getHistoNames()
692 lst = self.evt.getList()
693 if self.
app ==
"DaVinci":
694 daqnode = self.evt.retrieveObject(
'/Event/DAQ' ).
registry()
696 self.evt.getList( daqnode, lst, daqnode.address().
par() )
698 self.log.critical(
'Reader could not acquire TES List!')
701 self.log.info(
'Reader : TES List : %i items'%(len(lst)))
706 self.log.info(
'First Event Sent')
709 self.eventLoopSyncer.set()
710 self.evt.clearStore( )
718 libc = ctypes.CDLL(
'libc.so.6')
720 libc.prctl(15,name,0,0,0)
723 startEngine = time.time()
724 self.log.name =
'Reader' 725 self.log.info(
'Reader Process starting')
732 self.log.info(
'Reader Beginning Distribution')
735 self.log.info(
'Reader First Event OK')
737 self.log.critical(
'Reader Failed on First Event')
744 self.log.info(
'evtMax( %i ) reached'%(self.
evtMax))
747 if not self.stat.isSuccess() :
748 self.log.critical(
'Reader is Damaged!' )
753 self.
rTime += (time.time()-t)
754 if not bool(self.
evt[
'/Event']) :
755 self.log.warning(
'No More Events! (So Far : %i)'%(self.
nOut))
762 self.eventLoopSyncer.set()
763 self.evt.clearStore( )
764 self.log.info(
'Setting <Last> Event')
768 self.log.info(
'Reader : Event Distribution complete.' )
769 self.evcom.finalize()
771 self.
tTime = time.time() - startEngine
776 def __init__( self, workerID, queues, events, params, subworkers ) :
777 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params, subworkers )
782 self.log.info(
"Subworker-%i Created OK"%(self.
nodeID))
789 libc = ctypes.CDLL(
'libc.so.6')
791 libc.prctl(15,name,0,0,0)
794 startEngine = time.time()
795 msg = self.a.service(
'MessageSvc')
796 msg.Format =
'[' + self.log.name +
'] % F%18W%S%7W%R%T %0W%M' 798 self.log.name =
"Worker-%i"%(self.
nodeID)
799 self.log.info(
"Subworker %i starting Engine"%(self.
nodeID))
803 self.log.info(
'EVT WRITERS ON WORKER : %i'\
806 nEventWriters = len( self.
writerDict[
"events" ] )
812 packet = self.evcom.receive( )
815 if packet ==
'FINISHED' :
break 816 evtNumber, tbin = packet
817 if self.
cntr !=
None:
819 self.cntr.setEventCounter( evtNumber )
825 sc = self.a.executeEvent()
829 self.
rTime += (time.time()-t)
833 self.log.name =
"Worker-%i"%(self.
nodeID)
834 self.log.warning(
'Did not Execute Event')
835 self.evt.clearStore()
840 self.log.name =
"Worker-%i"%(self.
nodeID)
841 self.log.warning(
'Event did not pass : %i'%(evtNumber) )
842 self.evt.clearStore()
851 self.inc.fireIncident(gbl.Incident(
'Subworker',
'EndEvent'))
852 self.eventLoopSyncer.set()
853 self.evt.clearStore( )
854 self.log.name =
"Worker-%i"%(self.
nodeID)
855 self.log.info(
'Setting <Last> Event %s' %(self.
nodeID))
858 self.evcom.finalize()
860 self.filerecordsAgent.SendFileRecords()
861 self.
tTime = time.time()-startEngine
872 self.
inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
882 For some output writers, a check is performed to see if the event has 883 executed certain algorithms. 884 These reside in the AcceptAlgs property for those writers 890 if hasattr(m.w,
'AcceptAlgs') : acc += m.w.AcceptAlgs
891 if hasattr(m.w,
'RequireAlgs') : req += m.w.RequireAlgs
892 if hasattr(m.w,
'VetoAlgs') : vet += m.w.VetoAlgs
893 return (acc, req, vet)
897 if self.a.algorithm( algName )._ialg.isExecuted()\
898 and self.a.algorithm( algName )._ialg.filterPassed() :
905 Check the algorithm status for an event. 906 Depending on output writer settings, the event 907 may be declined based on various criteria. 908 This is a transcript of the check that occurs in GaudiSvc::OutputStream 912 self.log.debug(
'self.acceptAlgs is %s'%(str(self.acceptAlgs)))
914 for name
in self.acceptAlgs :
921 self.log.debug(
'self.requireAlgs is %s'%(str(self.requireAlgs)))
922 for name
in self.requireAlgs :
926 self.log.info(
'Evt declined (requireAlgs) : %s'%(name) )
929 self.log.debug(
'self.vetoAlgs is %s'%(str(self.
vetoAlgs)))
934 self.log.info(
'Evt declined : (vetoAlgs) : %s'%(name) )
940 def __init__( self, workerID, queues, events, params , subworkers ) :
941 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params, subworkers )
946 self.log.name =
"Worker-%i"%(self.
nodeID)
947 self.log.info(
"Worker-%i Created OK"%(self.
nodeID))
956 self.config[
'EventSelector' ].Input = []
957 self.config[
'ApplicationMgr' ].OutStream = []
958 if "HistogramPersistencySvc" in self.config.keys() :
959 self.config[
'HistogramPersistencySvc' ].OutputFile =
'' 960 formatHead =
'[Worker-%i] '%(self.
nodeID)
961 self.config[
'MessageSvc'].Format = formatHead+
'% F%18W%S%7W%R%T %0W%M' 963 for key, lst
in self.writerDict.iteritems() :
964 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
969 newName = m.getNewName(
'.',
'.w%i.'%(self.
nodeID) )
970 self.config[ m.key ].Output = newName
980 if "ToolSvc.EvtCounter" not in self.config:
981 from Configurables
import EvtCounter
982 counter = EvtCounter()
984 counter = self.config[
"ToolSvc.EvtCounter"]
985 counter.UseIncident =
False 988 self.log.warning(
'Cannot configure EvtCounter')
995 libc = ctypes.CDLL(
'libc.so.6')
997 libc.prctl(15,name,0,0,0)
999 startEngine = time.time()
1000 self.log.info(
"Worker %i starting Engine"%(self.
nodeID))
1005 self.log.info(
'EVT WRITERS ON WORKER : %i'\
1008 nEventWriters = len( self.
writerDict[
"events" ] )
1013 for item
in m.ItemList :
1014 hsh = item.find(
'#' )
1017 itemList.add( item )
1018 for item
in m.OptItemList :
1019 hsh = item.find(
'#' )
1022 optItemList.add( item )
1024 itemList -= optItemList
1025 for item
in sorted( itemList ):
1026 self.log.info(
' adding ItemList Item to ts : %s' % ( item ) )
1027 self.ts.addItem( item )
1028 for item
in sorted( optItemList ):
1029 self.log.info(
' adding Optional Item to ts : %s' % ( item ) )
1030 self.ts.addOptItem( item )
1032 self.log.info(
'There is no Event Output for this app' )
1038 packet = self.evcom.receive( )
1041 if packet ==
'FINISHED' :
break 1042 evtNumber, tbin = packet
1043 if self.
cntr !=
None:
1044 self.cntr.setEventCounter( evtNumber )
1051 self.log.info(
"Fork new subworkers and disconnect from CondDB")
1052 condDB = self.a.service(
'CondDBCnvSvc', gbl.ICondDBReader)
1061 self.TS.Load( tbin )
1064 sc = self.a.executeEvent()
1068 self.
rTime += (time.time()-t)
1072 self.log.warning(
'Did not Execute Event')
1073 self.evt.clearStore()
1078 self.log.warning(
'Event did not pass : %i'%(evtNumber) )
1079 self.evt.clearStore()
1085 tb = self.TS.Dump( )
1088 self.inc.fireIncident(gbl.Incident(
'Worker',
'EndEvent'))
1089 self.eventLoopSyncer.set()
1090 self.evt.clearStore( )
1091 self.log.info(
'Setting <Last> Event')
1092 self.lastEvent.set()
1094 self.evcom.finalize()
1095 self.log.info(
'Worker-%i Finished Processing Events'%(self.
nodeID) )
1097 self.filerecordsAgent.SendFileRecords()
1103 self.log.info(
'Join subworkers')
1108 For some output writers, a check is performed to see if the event has 1109 executed certain algorithms. 1110 These reside in the AcceptAlgs property for those writers 1116 if hasattr(m.w,
'AcceptAlgs') : acc += m.w.AcceptAlgs
1117 if hasattr(m.w,
'RequireAlgs') : req += m.w.RequireAlgs
1118 if hasattr(m.w,
'VetoAlgs') : vet += m.w.VetoAlgs
1119 return (acc, req, vet)
1123 if self.a.algorithm( algName )._ialg.isExecuted()\
1124 and self.a.algorithm( algName )._ialg.filterPassed() :
1131 Check the algorithm status for an event. 1132 Depending on output writer settings, the event 1133 may be declined based on various criteria. 1134 This is a transcript of the check that occurs in GaudiSvc::OutputStream 1138 self.log.debug(
'self.acceptAlgs is %s'%(str(self.acceptAlgs)))
1139 if self.acceptAlgs :
1140 for name
in self.acceptAlgs :
1147 self.log.debug(
'self.requireAlgs is %s'%(str(self.requireAlgs)))
1148 for name
in self.requireAlgs :
1152 self.log.info(
'Evt declined (requireAlgs) : %s'%(name) )
1155 self.log.debug(
'self.vetoAlgs is %s'%(str(self.
vetoAlgs)))
1160 self.log.info(
'Evt declined : (vetoAlgs) : %s'%(name) )
1167 def __init__( self, queues, events, params, subworkers ) :
1168 GMPComponent.__init__(self,
'Writer', -2, queues, events, params, subworkers )
1173 self.log.name =
"Writer--2" 1179 self.config[
'ApplicationMgr' ].TopAlg = []
1180 self.config[
'EventSelector' ].Input = []
1182 self.config[
'MessageSvc'].Format =
'[Writer] % F%18W%S%7W%R%T %0W%M' 1185 for key, lst
in self.writerDict.iteritems() :
1186 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
1193 self.log.debug(
'Processing Event Writer : %s'%(m) )
1194 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers )
1195 self.config[ m.key ].Output = newName
1205 self.log.debug(
'Processing FileRecords Writer: %s'%(m) )
1206 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers,
1207 extra=
" OPT='RECREATE'" )
1208 self.config[ m.key ].Output = newName
1211 hs =
"HistogramPersistencySvc" 1213 if hs
in self.config.keys() :
1214 n = self.config[ hs ].OutputFile
1216 newName=self.config[hs].OutputFile.replace(
'.',\
1217 '.p%i.'%(self.nWorkers))
1218 self.config[ hs ].OutputFile = newName
1224 libc = ctypes.CDLL(
'libc.so.6')
1226 libc.prctl(15,name,0,0,0)
1228 startEngine = time.time()
1236 stopCriteria = self.nWorkers
1238 current = (current+1)%self.nWorkers
1239 packet = self.
evcoms[current].receive( timeout=0.01 )
1242 if packet ==
'FINISHED' :
1243 self.log.info(
'Writer got FINISHED flag : Worker %i'%(current))
1245 self.
status[current] =
True 1247 self.log.info(
'FINISHED recd from all workers, break loop')
1252 evtNumber, tbin = packet
1253 self.TS.Load( tbin )
1255 self.a.executeEvent()
1256 self.
rTime += ( time.time()-t )
1258 self.evt.clearStore( )
1259 self.eventLoopSyncer.set()
1260 self.log.name =
"Writer--2" 1261 self.log.info(
'Setting <Last> Event')
1262 self.lastEvent.set()
1265 [ e.finalize()
for e
in self.
evcoms ]
1267 sc = self.histoAgent.Receive()
1268 sc = self.histoAgent.RebuildHistoStore()
1269 if sc.isSuccess() : self.log.info(
'Histo Store rebuilt ok' )
1270 else : self.log.warning(
'Histo Store Error in Rebuild' )
1273 sc = self.filerecordsAgent.Receive()
1274 self.filerecordsAgent.Rebuild()
1291 self.log.name =
'GaudiPython-Parallel-Logger' 1292 self.log.info(
'GaudiPython Parallel Process Co-ordinator beginning' )
1302 self.
hq = JoinableQueue( )
1303 self.
fq = JoinableQueue( )
1307 limit=WAIT_INITIALISE,
1308 step=STEP_INITIALISE )
1311 limit=WAIT_SINGLE_EVENT,
1313 firstEvent=WAIT_FIRST_EVENT )
1315 limit=WAIT_FINALISE,
1316 step=STEP_FINALISE )
1327 self.subworkers.append( sub )
1334 self.system.append(self.
writer)
1335 self.system.append(wk)
1336 self.system.append(self.
reader)
1339 init = self.sInit.d[nodeID].event
1340 run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
1341 fin = self.sFin.d[nodeID].event
1342 return ( init, run, fin )
1345 eventQ = self.
qs[ nodeID ]
1348 return ( eventQ, histQ, fsrQ )
1353 self.log.name =
'GaudiPython-Parallel-Logger' 1354 self.log.info(
'INITIALISING SYSTEM' )
1360 sc = self.sInit.syncAll(step=
"Initialise")
1361 if sc == SUCCESS:
pass 1362 else : self.
Terminate() ;
return FAILURE
1365 self.log.name =
'GaudiPython-Parallel-Logger' 1366 self.log.info(
'RUNNING SYSTEM' )
1367 sc = self.sRun.syncAll(step=
"Run")
1368 if sc == SUCCESS:
pass 1369 else : self.
Terminate() ;
return FAILURE
1372 self.log.name =
'GaudiPython-Parallel-Logger' 1373 self.log.info(
'FINALISING SYSTEM' )
1374 sc = self.sFin.syncAll(step=
"Finalise")
1375 if sc == SUCCESS:
pass 1376 else : self.
Terminate() ;
return FAILURE
1379 self.log.info(
"Cleanly join all Processes" )
1381 self.log.info(
"Report Total Success to Main.py" )
1386 children = multiprocessing.active_children()
1396 self.system.reverse()
1409 rwk = JoinableQueue()
1411 workersWriter = [ JoinableQueue()
for i
in xrange(self.
nWorkers) ]
1414 d[-2] = (workersWriter,
None)
1415 for i
in xrange(self.
nWorkers) : d[i] = (rwk, workersWriter[i])
def __init__(self, nodeType, nodeID, queues, events, params, subworkers)
def __init__(self, queues, events, params, subworkers)
def getNewName(self, replaceThis, withThis, extra='')
StatusCode finalize() override
StatusCode execute() override
def receive(self, timeout=None)
def __init__(self, GMPComponent, qin, qout)
double sum(double x, double y, double z)
def __init__(self, gmpcomponent)
def IdentifyWriters(self)
def __init__(self, workerID, queues, events, params, subworkers)
def checkExecutedPassed(self, algName)
def processConfiguration(self)
def LoadTES(self, tbufferfile)
NamedRange_< CONTAINER > range(const CONTAINER &cnt, std::string name)
simple function to create the named range form arbitrary container
def getItemLists(self, config)
def __init__(self, queues, events, params, subworkers)
def set(self, key, output)
def __init__(self, nWorkers, config, log)
def StartGaudiPython(self)
def __init__(self, writer, wType, config)
def processConfiguration(self)
def getSyncEvents(self, nodeID)
def SetServices(self, a, evt, hvt, fsr, inc, pers, ts, cntr)
def __init__(self, workerID, queues, events, params, subworkers)
Python Algorithm base class.
def processConfiguration(self)
def SetupGaudiPython(self)
def getQueues(self, nodeID)
def checkExecutedPassed(self, algName)
def __init__(self, gaudiTESSerializer, evtDataSvc, nodeType, nodeID, log)
def processConfiguration(self)