2 from Configurables
import EvtCounter
3 from GaudiPython
import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
4 from ROOT
import TBufferFile, TBuffer
6 from multiprocessing
import Process, Queue, JoinableQueue, Event
7 from multiprocessing
import cpu_count, current_process
8 from multiprocessing.queues
import Empty
# Grace periods, in seconds, granted to the child processes at each
# synchronisation point before the coordinator gives up on them.
WAIT_INITIALISE = 5 * 60
WAIT_FIRST_EVENT = 3 * 60
WAIT_SINGLE_EVENT = 6 * 60
56 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
60 aidatypes = ( gbl.AIDA.IHistogram,
61 gbl.AIDA.IHistogram1D,
62 gbl.AIDA.IHistogram2D,
63 gbl.AIDA.IHistogram3D,
66 gbl.AIDA.IBaseHistogram )
69 thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
# Known Gaudi output-stream component class names, mapped to the kind of
# data each one writes ("events", "records" or "tuples").  Used to sort a
# configuration's writers into the per-category writer dictionary.
WRITERTYPES = {
    "EvtCollectionStream": "tuples",
    "InputCopyStream": "events",
    "OutputStream": "events",
    "RecordStream": "records",
    "RunRecordStream": "records",
    "SequentialOutputStream": "events",
    "TagCollectionStream": "tuples",
}
84 A class to represent a writer in the GaudiPython configuration
85 It can be non-trivial to access the name of the output file; it may be
86 specified in the DataSvc, or just on the writer, may be a list, or string
87 Also, there are three different types of writer (events, records, tuples)
88 so this bootstrap class provides easy access to this info while configuring
106 if hasattr( self.
w,
"Output" ) :
109 self.
set( self.
wName, self.w.Output )
116 if hasattr( datasvc,
"Output" ) :
128 assert replaceThis.__class__.__name__ ==
'str'
129 assert withThis.__class__.__name__ ==
'str'
132 if old.__class__.__name__ ==
'list' :
135 new = old.replace( replaceThis, withThis )
144 if hasattr( self.
w,
"ItemList" ) :
147 datasvc = config[ self.w.EvtDataSvc ]
148 if hasattr( datasvc,
"ItemList" ) :
151 if hasattr( self.
w,
"OptItemList" ) :
154 datasvc = config[ self.w.EvtDataSvc ]
155 if hasattr( datasvc,
"OptItemList" ) :
159 def set( self, key, output ) :
168 s +=
"Writer : %s\n"%( self.
wName )
169 s +=
"Writer Type : %s\n"%( self.
wType )
170 s +=
"Writer Output : %s\n"%( self.
output )
172 s +=
"DataSvc Output : %s\n"%( self.
svcOutput )
174 s +=
"Key for config : %s\n"%( self.
key )
175 s +=
"Output File : %s\n"%( self.
output )
176 s +=
"ItemList : %s\n"%( self.
ItemList )
185 GaudiPython algorithm used to clean up histos on the Reader and Workers
186 Only has a finalize method()
187 This retrieves a dictionary of path:histo objects and sends it to the
188 writer. It then waits for a None flag : THIS IS IMPORTANT, as if
189 the algorithm returns before ALL histos have been COMPLETELY RECEIVED
190 at the writer end, there will be an error.
193 PyAlgorithm.__init__( self )
200 self.log.info(
'CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
201 self._gmpc.hDict = self._gmpc.dumpHistograms( )
202 ks = self._gmpc.hDict.keys()
203 self.log.info(
'%i Objects in Histogram Store'%(len(ks)))
208 reps = len(ks)/chunk + 1
209 for i
in xrange(reps) :
210 someKeys = ks[i*chunk : (i+1)*chunk]
211 smalld = dict( [(key, self._gmpc.hDict[key])
for key
in someKeys] )
212 self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
214 self.log.debug(
'Signalling end of histos to Writer')
215 self._gmpc.hq.put(
'HISTOS_SENT' )
216 self.log.debug(
'Waiting on Sync Event' )
217 self._gmpc.sEvent.wait()
218 self.log.debug(
'Histo Sync Event set, clearing and returning' )
219 self._gmpc.hvt.clearStore()
220 root = gbl.DataObject()
222 self._gmpc.hvt.setRoot(
'/stat', root )
255 assert item.__class__.__name__ ==
'tuple'
256 startTransmission = time.time()
257 self.qout.put( item )
260 while self.qout._buffer : time.sleep( NAP )
261 self.
qoutTime += time.time()-startTransmission
268 startWait = time.time()
270 itemIn = self.qin.get( timeout=timeout )
273 self.
qinTime += time.time()-startWait
275 if itemIn.__class__.__name__ ==
'tuple' :
282 self._gmpc.log.warning(
'TASK_DONE called too often by : %s'\
283 %(self._gmpc.nodeType))
287 self.log.info(
'Finalize Event Communicator : %s'%(self._gmpc.nodeType))
291 if self._gmpc.nodeType ==
'Reader' : downstream = self._gmpc.nWorkers
292 elif self._gmpc.nodeType ==
'Writer' : downstream = 0
293 elif self._gmpc.nodeType ==
'Worker' : downstream = 1
294 for i
in xrange(downstream) :
295 self.qout.put(
'FINISHED' )
296 if self._gmpc.nodeType !=
'Writer' :
302 self.log.name =
'%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
303 self.log.info (
'Items Sent : %i'%(self.
nSent) )
304 self.log.info (
'Items Received : %i'%(self.
nRecv) )
305 self.log.info (
'Data Sent : %i'%(self.
sizeSent) )
306 self.log.info (
'Data Received : %i'%(self.
sizeRecv) )
307 self.log.info (
'Q-out Time : %5.2f'%(self.
qoutTime) )
308 self.log.info (
'Q-in Time : %5.2f'%(self.
qinTime ) )
313 def __init__( self, gaudiTESSerializer, evtDataSvc,
314 nodeType, nodeID, log ) :
315 self.
T = gaudiTESSerializer
328 root = gbl.DataObject()
330 self.evt.setRoot(
'/Event', root )
332 self.T.loadBuffer( tbuf )
333 self.
tLoad += (time.time() - t)
335 self.buffersIn.append( tbuf.Length() )
338 tb = TBufferFile( TBuffer.kWrite )
339 self.T.dumpBuffer(tb)
340 self.
tDump += ( time.time()-t )
342 self.buffersOut.append( tb.Length() )
345 evIn =
"Events Loaded : %i"%( self.
nIn )
346 evOut =
"Events Dumped : %i"%( self.
nOut )
348 dataIn =
"Data Loaded : %i"%(din)
349 dataInMb =
"Data Loaded (MB) : %5.2f Mb"%(din/MB)
351 avgIn =
"Avg Buf Loaded : %5.2f Mb"\
352 %( din/(self.
nIn*MB) )
353 maxIn =
"Max Buf Loaded : %5.2f Mb"\
356 avgIn =
"Avg Buf Loaded : N/A"
357 maxIn =
"Max Buf Loaded : N/A"
359 dataOut =
"Data Dumped : %i"%(dout)
360 dataOutMb =
"Data Dumped (MB) : %5.2f Mb"%(dout/MB)
362 avgOut =
"Avg Buf Dumped : %5.2f Mb"\
363 %( din/(self.
nOut*MB) )
364 maxOut =
"Max Buf Dumped : %5.2f Mb"\
367 avgOut =
"Avg Buf Dumped : N/A"
368 maxOut =
"Max Buf Dumped : N/A"
369 dumpTime =
"Total Dump Time : %5.2f"%( self.
tDump )
370 loadTime =
"Total Load Time : %5.2f"%( self.
tLoad )
384 self.log.name =
"%s-%i TESSerializer"%(self.
nodeType, self.
nodeID)
386 self.log.info( line )
398 def __init__( self, nodeType, nodeID, queues, events, params ) :
409 current_process().name = nodeType
416 self.nWorkers, self.sEvent, self.config, self.
log = params
423 qPair, histq, fq = queues
437 self.evcoms.append( ec )
467 ks = self.config.keys()
469 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
471 if k
in ks: self.
app = k
491 from AlgSmapShot
import SmapShot
493 ss = SmapShot( logname=smapsLog )
494 self.a.addAlgorithm( ss )
495 self.
evt = self.a.evtsvc()
496 self.
hvt = self.a.histsvc()
497 self.
fsr = self.a.filerecordsvc()
498 self.
inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
499 self.
pers = self.a.service(
'EventPersistencySvc',
'IAddressCreator' )
500 self.
ts = gbl.GaudiMP.TESSerializer( self.evt._idp, self.
pers )
511 root = gbl.DataObject()
513 self.evt.setRoot(
'/Event', root )
514 self.ts.loadBuffer(tbufferfile)
517 if self.
app !=
'Gauss':
523 lst = [
'/Event/Gen/Header',
524 '/Event/Rec/Header' ]
528 n = self.
evt[path].evtNumber()
537 n = self.
evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
543 if self.
nIn > 0
or self.
nOut > 0 :
546 self.log.warning(
'Could not determine Event Number')
551 self.log.info(
'Evt Number %d' % self.
num )
559 keys = [
"events",
"records",
"tuples",
"histos" ]
564 wkeys = WRITERTYPES.keys()
565 for v
in self.config.values() :
566 if v.__class__.__name__
in wkeys :
567 writerType = WRITERTYPES[ v.__class__.__name__ ]
568 d[writerType].append(
MiniWriter(v, writerType, self.config) )
570 self.log.info(
'Writer Found : %s'%(v.name()))
573 if 'HistogramPersistencySvc' in self.config.keys() :
574 hfile =self.config[
'HistogramPersistencySvc'].getProp(
'OutputFile')
575 d[
"histos" ].append( hfile )
580 Method used by the GaudiPython algorithm CollectHistos
581 to obtain a dictionary of form { path : object }
582 representing the Histogram Store
584 nlist = self.hvt.getHistoNames( )
586 objects = 0 ; histos = 0
590 if type(o)
in aidatypes :
597 print 'WARNING : no histograms to recover?'
609 tool = self.a.tool(
"ToolSvc.EvtCounter" )
610 self.
cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
611 self.log.info(
"interface %s" %self.
cntr)
615 self.
iTime = time.time() - start
622 self.finalEvent.set()
623 self.
fTime = time.time() - start
627 allTime =
"Alive Time : %5.2f"%(self.
tTime)
628 initTime =
"Init Time : %5.2f"%(self.
iTime)
629 frstTime =
"1st Event Time : %5.2f"%(self.
firstEvTime)
630 runTime =
"Run Time : %5.2f"%(self.
rTime)
631 finTime =
"Finalise Time : %5.2f"%(self.
fTime)
632 tup = ( allTime, initTime, frstTime, runTime, finTime )
643 GMPComponent.__init__(self,
'Reader', -1, queues, events, params )
650 self.config[
'ApplicationMgr' ].TopAlg = []
651 self.config[
'ApplicationMgr' ].OutStream = []
652 if "HistogramPersistencySvc" in self.config.keys() :
653 self.config[
'HistogramPersistencySvc' ].OutputFile =
''
654 self.config[
'MessageSvc'].Format =
'[Reader]% F%18W%S%7W%R%T %0W%M'
655 self.
evtMax = self.config[
'ApplicationMgr' ].EvtMax
658 tb = TBufferFile( TBuffer.kWrite )
660 self.ts.dumpBuffer( tb )
667 startFirst = time.time()
668 self.log.info(
'Reader : First Event')
670 self.log.info(
'evtMax( %i ) reached'%(self.
evtMax))
676 if not bool(self.
evt[
'/Event']) :
677 self.log.warning(
'No More Events! (So Far : %i)'%(self.
nOut))
683 lst = self.evt.getHistoNames()
686 lst = self.evt.getList()
687 if self.
app ==
"DaVinci":
688 daqnode = self.evt.retrieveObject(
'/Event/DAQ' ).registry()
690 self.evt.getList( daqnode, lst, daqnode.address().
par() )
692 self.log.critical(
'Reader could not acquire TES List!')
695 self.log.info(
'Reader : TES List : %i items'%(len(lst)))
700 self.log.info(
'First Event Sent')
703 self.eventLoopSyncer.set()
704 self.evt.clearStore( )
710 startEngine = time.time()
711 self.log.name =
'Reader'
712 self.log.info(
'Reader Process starting')
719 self.log.info(
'Reader Beginning Distribution')
722 self.log.info(
'Reader First Event OK')
724 self.log.critical(
'Reader Failed on First Event')
731 self.log.info(
'evtMax( %i ) reached'%(self.
evtMax))
734 if not self.stat.isSuccess() :
735 self.log.critical(
'Reader is Damaged!' )
740 self.
rTime += (time.time()-t)
741 if not bool(self.
evt[
'/Event']) :
742 self.log.warning(
'No More Events! (So Far : %i)'%(self.
nOut))
749 self.eventLoopSyncer.set()
750 self.evt.clearStore( )
751 self.log.info(
'Setting <Last> Event')
755 self.log.info(
'Reader : Event Distribution complete.' )
756 self.evcom.finalize()
758 self.
tTime = time.time() - startEngine
764 def __init__( self, workerID, queues, events, params ) :
765 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params)
770 self.log.debug(
"Worker-%i Created OK"%(self.
nodeID))
778 self.config[
'EventSelector' ].Input = []
779 self.config[
'ApplicationMgr' ].OutStream = []
780 if "HistogramPersistencySvc" in self.config.keys() :
781 self.config[
'HistogramPersistencySvc' ].OutputFile =
''
782 formatHead =
'[Worker-%i] '%(self.
nodeID)
783 self.config[
'MessageSvc'].Format = formatHead+
'% F%18W%S%7W%R%T %0W%M'
785 for key, lst
in self.writerDict.iteritems() :
786 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
791 newName = m.getNewName(
'.',
'.w%i.'%(self.
nodeID) )
792 self.config[ m.key ].Output = newName
801 if "ToolSvc.EvtCounter" not in self.config:
802 from Configurables
import EvtCounter
803 counter = EvtCounter()
805 counter = self.config[
"ToolSvc.EvtCounter"]
806 counter.UseIncident =
False
809 self.log.warning(
'Cannot configure EvtCounter')
812 startEngine = time.time()
813 self.log.name =
"Worker-%i"%(self.
nodeID)
814 self.log.info(
"Worker %i starting Engine"%(self.
nodeID))
819 self.log.info(
'EVT WRITERS ON WORKER : %i'\
822 nEventWriters = len( self.
writerDict[
"events" ] )
827 for item
in m.ItemList :
828 hsh = item.find(
'#' )
832 for item
in m.OptItemList :
833 hsh = item.find(
'#' )
836 optItemList.add( item )
838 itemList -= optItemList
839 for item
in sorted( itemList ):
840 self.log.info(
' adding ItemList Item to ts : %s' % ( item ) )
841 self.ts.addItem( item )
842 for item
in sorted( optItemList ):
843 self.log.info(
' adding Optional Item to ts : %s' % ( item ) )
844 self.ts.addOptItem( item )
846 self.log.info(
'There is no Event Output for this app' )
853 self.log.name =
"Worker-%i"%(self.
nodeID)
856 packet = self.evcom.receive( )
859 if packet ==
'FINISHED' :
break
860 evtNumber, tbin = packet
861 if self.
cntr !=
None:
863 self.cntr.setEventCounter( evtNumber )
869 sc = self.a.executeEvent()
873 self.
rTime += (time.time()-t)
877 self.log.warning(
'Did not Execute Event')
878 self.evt.clearStore()
883 self.log.warning(
'Event did not pass : %i'%(evtNumber) )
884 self.evt.clearStore()
893 self.inc.fireIncident(gbl.Incident(
'Worker',
'EndEvent'))
894 self.eventLoopSyncer.set()
895 self.evt.clearStore( )
896 self.log.info(
'Setting <Last> Event')
899 self.evcom.finalize()
900 self.log.info(
'Worker-%i Finished Processing Events'%(self.
nodeID) )
902 self.filerecordsAgent.SendFileRecords()
904 self.
tTime = time.time()-startEngine
909 For some output writers, a check is performed to see if the event has
910 executed certain algorithms.
911 These reside in the AcceptAlgs property for those writers
917 if hasattr(m.w,
'AcceptAlgs') : acc += m.w.AcceptAlgs
918 if hasattr(m.w,
'RequireAlgs') : req += m.w.RequireAlgs
919 if hasattr(m.w,
'VetoAlgs') : vet += m.w.VetoAlgs
920 return (acc, req, vet)
924 if self.a.algorithm( algName )._ialg.isExecuted()\
925 and self.a.algorithm( algName )._ialg.filterPassed() :
932 Check the algorithm status for an event.
933 Depending on output writer settings, the event
934 may be declined based on various criteria.
935 This is a transcript of the check that occurs in GaudiSvc::OutputStream
939 self.log.debug(
'self.acceptAlgs is %s'%(str(self.acceptAlgs)))
941 for name
in self.acceptAlgs :
948 self.log.debug(
'self.requireAlgs is %s'%(str(self.requireAlgs)))
949 for name
in self.requireAlgs :
953 self.log.info(
'Evt declined (requireAlgs) : %s'%(name) )
956 self.log.debug(
'self.vetoAlgs is %s'%(str(self.
vetoAlgs)))
961 self.log.info(
'Evt declined : (vetoAlgs) : %s'%(name) )
969 GMPComponent.__init__(self,
'Writer', -2, queues, events, params)
974 self.log.name =
"Writer--2"
980 self.config[
'ApplicationMgr' ].TopAlg = []
981 self.config[
'EventSelector' ].Input = []
983 self.config[
'MessageSvc'].Format =
'[Writer] % F%18W%S%7W%R%T %0W%M'
986 for key, lst
in self.writerDict.iteritems() :
987 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
994 self.log.debug(
'Processing Event Writer : %s'%(m) )
995 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers )
996 self.config[ m.key ].Output = newName
1006 self.log.debug(
'Processing FileRecords Writer: %s'%(m) )
1007 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers,
1008 extra=
" OPT='RECREATE'" )
1009 self.config[ m.key ].Output = newName
1012 hs =
"HistogramPersistencySvc"
1014 if hs
in self.config.keys() :
1015 n = self.config[ hs ].OutputFile
1017 newName=self.config[hs].OutputFile.replace(
'.',\
1018 '.p%i.'%(self.nWorkers))
1019 self.config[ hs ].OutputFile = newName
1022 startEngine = time.time()
1030 stopCriteria = self.nWorkers
1032 current = (current+1)%self.nWorkers
1033 packet = self.
evcoms[current].receive( timeout=0.01 )
1036 if packet ==
'FINISHED' :
1037 self.log.info(
'Writer got FINISHED flag : Worker %i'%(current))
1038 self.
status[current] =
True
1040 self.log.info(
'FINISHED recd from all workers, break loop')
1045 evtNumber, tbin = packet
1046 self.TS.Load( tbin )
1048 self.a.executeEvent()
1049 self.
rTime += ( time.time()-t )
1051 self.evt.clearStore( )
1052 self.eventLoopSyncer.set()
1053 self.log.name =
"Writer--2"
1054 self.log.info(
'Setting <Last> Event')
1055 self.lastEvent.set()
1058 [ e.finalize()
for e
in self.
evcoms ]
1060 sc = self.histoAgent.Receive()
1061 sc = self.histoAgent.RebuildHistoStore()
1062 if sc.isSuccess() : self.log.info(
'Histo Store rebuilt ok' )
1063 else : self.log.warning(
'Histo Store Error in Rebuild' )
1066 sc = self.filerecordsAgent.Receive()
1067 self.filerecordsAgent.Rebuild()
1084 self.log.name =
'GaudiPython-Parallel-Logger'
1085 self.log.info(
'GaudiPython Parallel Process Co-ordinator beginning' )
1095 self.
hq = JoinableQueue( )
1096 self.
fq = JoinableQueue( )
1100 limit=WAIT_INITIALISE,
1101 step=STEP_INITIALISE )
1104 limit=WAIT_SINGLE_EVENT,
1106 firstEvent=WAIT_FIRST_EVENT )
1108 limit=WAIT_FINALISE,
1109 step=STEP_FINALISE )
1121 self.workers.append( wk )
1125 self.system.append(self.
writer)
1126 [ self.system.append(w)
for w
in self.
workers ]
1127 self.system.append(self.
reader)
1130 init = self.sInit.d[nodeID].event
1131 run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
1132 fin = self.sFin.d[nodeID].event
1133 return ( init, run, fin )
1136 eventQ = self.
qs[ nodeID ]
1139 return ( eventQ, histQ, fsrQ )
1144 self.log.name =
'GaudiPython-Parallel-Logger'
1145 self.log.info(
'INITIALISING SYSTEM' )
1148 sc = self.sInit.syncAll(step=
"Initialise")
1149 if sc == SUCCESS:
pass
1150 else : self.
Terminate() ;
return FAILURE
1153 self.log.name =
'GaudiPython-Parallel-Logger'
1154 self.log.info(
'RUNNING SYSTEM' )
1155 sc = self.sRun.syncAll(step=
"Run")
1156 if sc == SUCCESS:
pass
1157 else : self.
Terminate() ;
return FAILURE
1160 self.log.name =
'GaudiPython-Parallel-Logger'
1161 self.log.info(
'FINALISING SYSTEM' )
1162 sc = self.sFin.syncAll(step=
"Finalise")
1163 if sc == SUCCESS:
pass
1164 else : self.
Terminate() ;
return FAILURE
1167 self.log.info(
"Cleanly join all Processes" )
1169 self.log.info(
"Report Total Success to Main.py" )
1174 children = multiprocessing.active_children()
1184 self.system.reverse()
1197 rwk = JoinableQueue()
1199 workersWriter = [ JoinableQueue()
for i
in xrange(self.
nWorkers) ]
1202 d[-2] = (workersWriter,
None)
1203 for i
in xrange(self.
nWorkers) : d[i] = (rwk, workersWriter[i])