2 from Configurables
import EvtCounter
3 from GaudiPython
import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
4 from ROOT
import TBufferFile, TBuffer
6 from multiprocessing
import Process, Queue, JoinableQueue, Event
7 from multiprocessing
import cpu_count, current_process
8 from multiprocessing.queues
import Empty
11 from ROOT
import TParallelMergingFile
40 WAIT_INITIALISE = 60*5
41 WAIT_FIRST_EVENT = 60*3
42 WAIT_SINGLE_EVENT = 60*6
56 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
60 aidatypes = ( gbl.AIDA.IHistogram,
61 gbl.AIDA.IHistogram1D,
62 gbl.AIDA.IHistogram2D,
63 gbl.AIDA.IHistogram3D,
66 gbl.AIDA.IBaseHistogram )
69 thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
72 WRITERTYPES = {
'EvtCollectionStream' :
"tuples",
73 'InputCopyStream' :
"events",
74 'OutputStream' :
"events",
75 'RecordStream' :
"records",
76 'RunRecordStream' :
"records",
77 'SequentialOutputStream' :
"events",
78 'TagCollectionStream' :
"tuples" }
82 class MiniWriter( object ) :
84 A class to represent a writer in the GaudiPython configuration
85 It can be non-trivial to access the name of the output file; it may be
86 specified in the DataSvc, or just on the writer, may be a list, or string
87 Also, there are three different types of writer (events, records, tuples)
88 so this bootstrap class provides easy access to this info while configuring
90 def __init__( self, writer, wType, config ) :
99 self.OptItemList =
None
101 self.wName = writer.getName()
104 self.datasvcName =
None
105 self.svcOutput =
None
106 if hasattr( self.w,
"Output" ) :
107 self.woutput = self.w.Output
108 self.getItemLists( config )
109 self.set( self.wName, self.w.Output )
114 self.datasvcName = self.w.EvtDataSvc
115 datasvc = config[ self.datasvcName ]
116 if hasattr( datasvc,
"Output" ) :
117 self.getItemLists( config )
118 self.set( self.datasvcName, datasvc.Output )
121 def getNewName( self, replaceThis, withThis, extra='' ) :
128 assert replaceThis.__class__.__name__ ==
'str'
129 assert withThis.__class__.__name__ ==
'str'
132 if old.__class__.__name__ ==
'list' :
135 new = old.replace( replaceThis, withThis )
142 def getItemLists( self, config ) :
144 if hasattr( self.w,
"ItemList" ) :
145 self.ItemList = self.w.ItemList
147 datasvc = config[ self.w.EvtDataSvc ]
148 if hasattr( datasvc,
"ItemList" ) :
149 self.ItemList = datasvc.ItemList
151 if hasattr( self.w,
"OptItemList" ) :
152 self.OptItemList = self.w.OptItemList
154 datasvc = config[ self.w.EvtDataSvc ]
155 if hasattr( datasvc,
"OptItemList" ) :
156 self.OptItemList = datasvc.OptItemList
159 def set( self, key, output ) :
164 def __repr__( self ) :
168 s +=
"Writer : %s\n"%( self.wName )
169 s +=
"Writer Type : %s\n"%( self.wType )
170 s +=
"Writer Output : %s\n"%( self.output )
171 s +=
"DataSvc : %s\n"%( self.datasvcName )
172 s +=
"DataSvc Output : %s\n"%( self.svcOutput )
174 s +=
"Key for config : %s\n"%( self.key )
175 s +=
"Output File : %s\n"%( self.output )
176 s +=
"ItemList : %s\n"%( self.ItemList )
177 s +=
"OptItemList : %s\n"%( self.OptItemList )
183 class CollectHistograms( PyAlgorithm ) :
185 GaudiPython algorithm used to clean up histos on the Reader and Workers
186 Only has a finalize method()
187 This retrieves a dictionary of path:histo objects and sends it to the
188 writer. It then waits for a None flag : THIS IS IMPORTANT, as if
189 the algorithm returns before ALL histos have been COMPLETELY RECEIVED
190 at the writer end, there will be an error.
192 def __init__( self, gmpcomponent ) :
193 PyAlgorithm.__init__( self )
194 self._gmpc = gmpcomponent
195 self.log = self._gmpc.log
199 def finalize( self ) :
200 self.log.info(
'CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
201 self._gmpc.hDict = self._gmpc.dumpHistograms( )
202 ks = self._gmpc.hDict.keys()
203 self.log.info(
'%i Objects in Histogram Store'%(len(ks)))
209 reps = len(ks)/chunk + 1
210 for i
in xrange(reps) :
211 someKeys = ks[i*chunk : (i+1)*chunk]
212 smalld = dict( [(key, self._gmpc.hDict[key])
for key
in someKeys] )
213 self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
215 self.log.debug(
'Signalling end of histos to Writer')
216 self._gmpc.hq.put(
'HISTOS_SENT' )
217 self.log.debug(
'Waiting on Sync Event' )
218 self._gmpc.sEvent.wait()
219 self.log.debug(
'Histo Sync Event set, clearing and returning' )
220 self._gmpc.hvt.clearStore()
221 root = gbl.DataObject()
223 self._gmpc.hvt.setRoot(
'/stat', root )
228 class EventCommunicator( object ) :
232 def __init__( self, GMPComponent, qin, qout ) :
233 self._gmpc = GMPComponent
234 self.log = self._gmpc.log
253 def send( self, item ) :
256 assert item.__class__.__name__ ==
'tuple'
257 startTransmission = time.time()
258 self.qout.put( item )
261 while self.qout._buffer : time.sleep( NAP )
262 self.qoutTime += time.time()-startTransmission
263 self.sizeSent += item[1].Length()
267 def receive( self, timeout=None ) :
269 startWait = time.time()
271 itemIn = self.qin.get( timeout=timeout )
274 self.qinTime += time.time()-startWait
276 if itemIn.__class__.__name__ ==
'tuple' :
277 self.sizeRecv += itemIn[1].Length()
283 self._gmpc.log.warning(
'TASK_DONE called too often by : %s'\
284 %(self._gmpc.nodeType))
287 def finalize( self ) :
288 self.log.info(
'Finalize Event Communicator : %s %s'%(self._gmpc, self._gmpc.nodeType))
292 if self._gmpc.nodeType ==
'Reader' : downstream = self._gmpc.nWorkers
293 elif self._gmpc.nodeType ==
'Writer' : downstream = 0
294 elif self._gmpc.nodeType ==
'Worker' : downstream = 1
296 for i
in xrange(downstream) :
297 self.qout.put(
'FINISHED' )
298 if self._gmpc.nodeType !=
'Writer' :
303 def statistics( self ) :
304 self.log.name =
'%s-%i Audit '%(self._gmpc.nodeType,self._gmpc.nodeID)
305 self.log.info (
'Items Sent : %i'%(self.nSent) )
306 self.log.info (
'Items Received : %i'%(self.nRecv) )
307 self.log.info (
'Data Sent : %i'%(self.sizeSent) )
308 self.log.info (
'Data Received : %i'%(self.sizeRecv) )
309 self.log.info (
'Q-out Time : %5.2f'%(self.qoutTime) )
310 self.log.info (
'Q-in Time : %5.2f'%(self.qinTime ) )
314 class TESSerializer( object ) :
315 def __init__( self, gaudiTESSerializer, evtDataSvc,
316 nodeType, nodeID, log ) :
317 self.T = gaudiTESSerializer
318 self.evt = evtDataSvc
326 self.nodeType = nodeType
329 def Load( self, tbuf ) :
330 root = gbl.DataObject()
332 self.evt.setRoot(
'/Event', root )
334 self.T.loadBuffer( tbuf )
335 self.tLoad += (time.time() - t)
337 self.buffersIn.append( tbuf.Length() )
340 tb = TBufferFile( TBuffer.kWrite )
341 self.T.dumpBuffer(tb)
342 self.tDump += ( time.time()-t )
344 self.buffersOut.append( tb.Length() )
347 evIn =
"Events Loaded : %i"%( self.nIn )
348 evOut =
"Events Dumped : %i"%( self.nOut )
349 din =
sum( self.buffersIn )
350 dataIn =
"Data Loaded : %i"%(din)
351 dataInMb =
"Data Loaded (MB) : %5.2f Mb"%(din/MB)
353 avgIn =
"Avg Buf Loaded : %5.2f Mb"\
354 %( din/(self.nIn*MB) )
355 maxIn =
"Max Buf Loaded : %5.2f Mb"\
356 %( max(self.buffersIn)/MB )
358 avgIn =
"Avg Buf Loaded : N/A"
359 maxIn =
"Max Buf Loaded : N/A"
360 dout =
sum( self.buffersOut )
361 dataOut =
"Data Dumped : %i"%(dout)
362 dataOutMb =
"Data Dumped (MB) : %5.2f Mb"%(dout/MB)
364 avgOut =
"Avg Buf Dumped : %5.2f Mb"\
365 %( din/(self.nOut*MB) )
366 maxOut =
"Max Buf Dumped : %5.2f Mb"\
367 %( max(self.buffersOut)/MB )
369 avgOut =
"Avg Buf Dumped : N/A"
370 maxOut =
"Max Buf Dumped : N/A"
371 dumpTime =
"Total Dump Time : %5.2f"%( self.tDump )
372 loadTime =
"Total Load Time : %5.2f"%( self.tLoad )
386 self.log.name =
"%s-%i TESSerializer"%(self.nodeType, self.nodeID)
388 self.log.info( line )
389 self.log.name =
"%s-%i"%(self.nodeType, self.nodeID)
393 class GMPComponent( object ) :
400 def __init__( self, nodeType, nodeID, queues, events, params, subworkers ) :
410 self.nodeType = nodeType
411 current_process().name = nodeType
414 self.initEvent, eventLoopSyncer, self.finalEvent = events
415 self.eventLoopSyncer, self.lastEvent = eventLoopSyncer
418 self.nWorkers, self.sEvent, self.config, self.log = params
419 self.subworkers = subworkers
423 self.currentEvent =
None
429 ks = self.config.keys()
431 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
433 if k
in ks: self.app = k
437 qPair, histq, fq = self.queues
440 if self.nodeType ==
'Reader' or self.nodeType ==
'Worker' :
443 self.evcom = EventCommunicator( self, qin, qout )
446 assert self.nodeType ==
'Writer'
450 ec = EventCommunicator( self, q,
None )
451 self.evcoms.append( ec )
466 self.log.name =
'%s-%i'%(self.nodeType, self.nodeID)
473 self.firstEvTime = 0.0
476 self.proc = Process( target=self.Engine )
488 def processConfiguration( self ) :
492 def SetupGaudiPython( self ) :
497 from AlgSmapShot
import SmapShot
498 smapsLog = self.nodeType+
'-'+str(self.nodeID)+
'.smp'
499 ss = SmapShot( logname=smapsLog )
500 self.a.addAlgorithm( ss )
501 self.evt = self.a.evtsvc()
502 self.hvt = self.a.histsvc()
503 self.fsr = self.a.filerecordsvc()
504 self.inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
505 self.pers = self.a.service(
'EventPersistencySvc',
'IAddressCreator' )
506 self.ts = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
507 self.TS = TESSerializer( self.ts, self.evt,
508 self.nodeType, self.nodeID, self.log )
511 def StartGaudiPython( self ) :
516 def LoadTES( self, tbufferfile ) :
517 root = gbl.DataObject()
519 self.evt.setRoot(
'/Event', root )
520 self.ts.loadBuffer(tbufferfile)
523 if self.app !=
'Gauss':
529 lst = [
'/Event/Gen/Header',
530 '/Event/Rec/Header' ]
534 n = self.evt[path].evtNumber()
544 n = self.evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
551 if self.nIn > 0
or self.nOut > 0 :
554 self.log.warning(
'Could not determine Event Number')
557 if self.nodeID == -1:
558 self.num = self.num + 1
562 def IdentifyWriters( self ) :
567 keys = [
"events",
"records",
"tuples",
"histos" ]
572 wkeys = WRITERTYPES.keys()
573 for v
in self.config.values() :
574 if v.__class__.__name__
in wkeys :
575 writerType = WRITERTYPES[ v.__class__.__name__ ]
576 d[writerType].append( MiniWriter(v, writerType, self.config) )
577 if self.nodeID == 0 :
578 self.log.info(
'Writer Found : %s'%(v.name()))
581 if 'HistogramPersistencySvc' in self.config.keys() :
582 hfile =self.config[
'HistogramPersistencySvc'].getProp(
'OutputFile')
583 d[
"histos" ].append( hfile )
586 def dumpHistograms( self ) :
588 Method used by the GaudiPython algorithm CollectHistos
589 to obtain a dictionary of form { path : object }
590 representing the Histogram Store
592 nlist = self.hvt.getHistoNames( )
594 objects = 0 ; histos = 0
598 if type(o)
in aidatypes :
605 print 'WARNING : no histograms to recover?'
608 def Initialize( self ) :
610 self.processConfiguration( )
611 self.SetupGaudiPython( )
613 self.StartGaudiPython( )
615 if self.app ==
'Gauss':
617 tool = self.a.tool(
"ToolSvc.EvtCounter" )
618 self.cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
622 self.iTime = time.time() - start
624 def Finalize( self ) :
628 self.log.info(
'%s-%i Finalized'%(self.nodeType, self.nodeID) )
629 self.finalEvent.set()
630 self.fTime = time.time() - start
633 self.log.name =
"%s-%i Audit"%(self.nodeType, self.nodeID)
634 allTime =
"Alive Time : %5.2f"%(self.tTime)
635 initTime =
"Init Time : %5.2f"%(self.iTime)
636 frstTime =
"1st Event Time : %5.2f"%(self.firstEvTime)
637 runTime =
"Run Time : %5.2f"%(self.rTime)
638 finTime =
"Finalise Time : %5.2f"%(self.fTime)
639 tup = ( allTime, initTime, frstTime, runTime, finTime )
642 self.log.name =
"%s-%i"%(self.nodeType, self.nodeID)
648 class Reader( GMPComponent ) :
649 def __init__( self, queues, events, params, subworkers ) :
650 GMPComponent.__init__(self,
'Reader', -1, queues, events, params, subworkers )
652 def processConfiguration( self ) :
657 self.config[
'ApplicationMgr' ].TopAlg = []
658 self.config[
'ApplicationMgr' ].OutStream = []
659 if "HistogramPersistencySvc" in self.config.keys() :
660 self.config[
'HistogramPersistencySvc' ].OutputFile =
''
661 self.config[
'MessageSvc'].Format =
'[Reader]% F%18W%S%7W%R%T %0W%M'
662 self.evtMax = self.config[
'ApplicationMgr' ].EvtMax
664 def DumpEvent( self ) :
665 tb = TBufferFile( TBuffer.kWrite )
667 self.ts.dumpBuffer( tb )
671 def DoFirstEvent( self ) :
674 startFirst = time.time()
675 self.log.info(
'Reader : First Event')
676 if self.nOut == self.evtMax :
677 self.log.info(
'evtMax( %i ) reached'%(self.evtMax))
683 if not bool(self.evt[
'/Event']) :
684 self.log.warning(
'No More Events! (So Far : %i)'%(self.nOut))
689 if self.app ==
"Gauss":
690 lst = self.evt.getHistoNames()
693 lst = self.evt.getList()
694 if self.app ==
"DaVinci":
695 daqnode = self.evt.retrieveObject(
'/Event/DAQ' ).
registry()
697 self.evt.getList( daqnode, lst, daqnode.address().
par() )
699 self.log.critical(
'Reader could not acquire TES List!')
702 self.log.info(
'Reader : TES List : %i items'%(len(lst)))
705 self.currentEvent = self.getEventNumber( )
707 self.log.info(
'First Event Sent')
708 self.evcom.send( (self.currentEvent, tb) )
710 self.eventLoopSyncer.set()
711 self.evt.clearStore( )
712 self.firstEvTime = time.time()-startFirst
719 libc = ctypes.CDLL(
'libc.so.6')
720 name = str(self.nodeType) + str(self.nodeID) +
'\0'
721 libc.prctl(15,name,0,0,0)
724 startEngine = time.time()
725 self.log.name =
'Reader'
726 self.log.info(
'Reader Process starting')
731 self.a.addAlgorithm( CollectHistograms(self) )
733 self.log.info(
'Reader Beginning Distribution')
734 sc = self.DoFirstEvent( )
736 self.log.info(
'Reader First Event OK')
738 self.log.critical(
'Reader Failed on First Event')
744 if self.nOut == self.evtMax :
745 self.log.info(
'evtMax( %i ) reached'%(self.evtMax))
748 if not self.stat.isSuccess() :
749 self.log.critical(
'Reader is Damaged!' )
754 self.rTime += (time.time()-t)
755 if not bool(self.evt[
'/Event']) :
756 self.log.warning(
'No More Events! (So Far : %i)'%(self.nOut))
758 self.currentEvent = self.getEventNumber( )
760 self.evcom.send( (self.currentEvent, tb) )
763 self.eventLoopSyncer.set()
764 self.evt.clearStore( )
765 self.log.info(
'Setting <Last> Event')
769 self.log.info(
'Reader : Event Distribution complete.' )
770 self.evcom.finalize()
772 self.tTime = time.time() - startEngine
776 class Subworker(GMPComponent):
777 def __init__( self, workerID, queues, events, params, subworkers ) :
778 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params, subworkers )
780 self.writerDict = self.IdentifyWriters( )
782 self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
783 self.log.info(
"Subworker-%i Created OK"%(self.nodeID))
784 self.eventOutput =
True
790 libc = ctypes.CDLL(
'libc.so.6')
791 name = str(self.nodeType) + str(self.nodeID) +
'\0'
792 libc.prctl(15,name,0,0,0)
795 startEngine = time.time()
796 msg = self.a.service(
'MessageSvc')
797 msg.Format =
'[' + self.log.name +
'] % F%18W%S%7W%R%T %0W%M'
799 self.log.name =
"Worker-%i"%(self.nodeID)
800 self.log.info(
"Subworker %i starting Engine"%(self.nodeID))
801 self.filerecordsAgent = FileRecordsAgent(self)
804 self.log.info(
'EVT WRITERS ON WORKER : %i'\
805 %( len(self.writerDict[
'events'])))
807 nEventWriters = len( self.writerDict[
"events" ] )
808 self.a.addAlgorithm( CollectHistograms(self) )
813 packet = self.evcom.receive( )
816 if packet ==
'FINISHED' :
break
817 evtNumber, tbin = packet
818 if self.cntr !=
None:
820 self.cntr.setEventCounter( evtNumber )
826 sc = self.a.executeEvent()
828 self.firstEvTime = time.time()-t
830 self.rTime += (time.time()-t)
834 self.log.name =
"Worker-%i"%(self.nodeID)
835 self.log.warning(
'Did not Execute Event')
836 self.evt.clearStore()
838 if self.isEventPassed() :
841 self.log.name =
"Worker-%i"%(self.nodeID)
842 self.log.warning(
'Event did not pass : %i'%(evtNumber) )
843 self.evt.clearStore()
845 if self.eventOutput :
848 self.currentEvent = self.getEventNumber( )
850 self.evcom.send( (self.currentEvent, tb) )
852 self.inc.fireIncident(gbl.Incident(
'Subworker',
'EndEvent'))
853 self.eventLoopSyncer.set()
854 self.evt.clearStore( )
855 self.log.name =
"Worker-%i"%(self.nodeID)
856 self.log.info(
'Setting <Last> Event %s' %(self.nodeID))
859 self.evcom.finalize()
861 self.filerecordsAgent.SendFileRecords()
862 self.tTime = time.time()-startEngine
867 def SetServices(self,a, evt, hvt, fsr, inc, pers, ts , cntr):
873 self.inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
877 self.TS = TESSerializer( self.ts, self.evt,
878 self.nodeType, self.nodeID, self.log )
881 def getCheckAlgs( self ) :
883 For some output writers, a check is performed to see if the event has
884 executed certain algorithms.
885 These reside in the AcceptAlgs property for those writers
890 for m
in self.writerDict[
"events" ] :
891 if hasattr(m.w,
'AcceptAlgs') : acc += m.w.AcceptAlgs
892 if hasattr(m.w,
'RequireAlgs') : req += m.w.RequireAlgs
893 if hasattr(m.w,
'VetoAlgs') : vet += m.w.VetoAlgs
894 return (acc, req, vet)
897 def checkExecutedPassed( self, algName ) :
898 if self.a.algorithm( algName )._ialg.isExecuted()\
899 and self.a.algorithm( algName )._ialg.filterPassed() :
904 def isEventPassed( self ) :
906 Check the algorithm status for an event.
907 Depending on output writer settings, the event
908 may be declined based on various criteria.
909 This is a transcript of the check that occurs in GaudiSvc::OutputStream
913 self.log.debug(
'self.acceptAlgs is %s'%(str(self.acceptAlgs)))
915 for name
in self.acceptAlgs :
916 if self.checkExecutedPassed( name ) :
922 self.log.debug(
'self.requireAlgs is %s'%(str(self.requireAlgs)))
923 for name
in self.requireAlgs :
924 if self.checkExecutedPassed( name ) :
927 self.log.info(
'Evt declined (requireAlgs) : %s'%(name) )
930 self.log.debug(
'self.vetoAlgs is %s'%(str(self.vetoAlgs)))
931 for name
in self.vetoAlgs :
932 if self.checkExecutedPassed( name ) :
935 self.log.info(
'Evt declined : (vetoAlgs) : %s'%(name) )
940 class Worker( GMPComponent ) :
941 def __init__( self, workerID, queues, events, params , subworkers ) :
942 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params, subworkers )
944 self.writerDict = self.IdentifyWriters( )
946 self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
947 self.log.name =
"Worker-%i"%(self.nodeID)
948 self.log.info(
"Worker-%i Created OK"%(self.nodeID))
949 self.eventOutput =
True
951 def processConfiguration( self ) :
957 self.config[
'EventSelector' ].Input = []
958 self.config[
'ApplicationMgr' ].OutStream = []
959 if "HistogramPersistencySvc" in self.config.keys() :
960 self.config[
'HistogramPersistencySvc' ].OutputFile =
''
961 formatHead =
'[Worker-%i] '%(self.nodeID)
962 self.config[
'MessageSvc'].Format = formatHead+
'% F%18W%S%7W%R%T %0W%M'
964 for key, lst
in self.writerDict.iteritems() :
965 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
967 for m
in self.writerDict[
"tuples" ] :
970 newName = m.getNewName(
'.',
'.w%i.'%(self.nodeID) )
971 self.config[ m.key ].Output = newName
979 if self.app ==
"Gauss":
981 if "ToolSvc.EvtCounter" not in self.config:
982 from Configurables
import EvtCounter
983 counter = EvtCounter()
985 counter = self.config[
"ToolSvc.EvtCounter"]
986 counter.UseIncident =
False
989 self.log.warning(
'Cannot configure EvtCounter')
996 libc = ctypes.CDLL(
'libc.so.6')
997 name = str(self.nodeType) + str(self.nodeID) +
'\0'
998 libc.prctl(15,name,0,0,0)
1000 startEngine = time.time()
1001 self.log.info(
"Worker %i starting Engine"%(self.nodeID))
1003 self.filerecordsAgent = FileRecordsAgent(self)
1006 self.log.info(
'EVT WRITERS ON WORKER : %i'\
1007 %( len(self.writerDict[
'events'])))
1009 nEventWriters = len( self.writerDict[
"events" ] )
1013 for m
in self.writerDict[
"events" ] :
1014 for item
in m.ItemList :
1015 hsh = item.find(
'#' )
1018 itemList.add( item )
1019 for item
in m.OptItemList :
1020 hsh = item.find(
'#' )
1023 optItemList.add( item )
1025 itemList -= optItemList
1026 for item
in sorted( itemList ):
1027 self.log.info(
' adding ItemList Item to ts : %s' % ( item ) )
1028 self.ts.addItem( item )
1029 for item
in sorted( optItemList ):
1030 self.log.info(
' adding Optional Item to ts : %s' % ( item ) )
1031 self.ts.addOptItem( item )
1033 self.log.info(
'There is no Event Output for this app' )
1034 self.eventOutput =
False
1039 packet = self.evcom.receive( )
1042 if packet ==
'FINISHED' :
break
1043 evtNumber, tbin = packet
1044 if self.cntr !=
None:
1045 self.cntr.setEventCounter( evtNumber )
1052 self.log.info(
"Fork new subworkers and disconnect from CondDB")
1053 condDB = self.a.service(
'CondDBCnvSvc', gbl.ICondDBReader)
1057 for k
in self.subworkers:
1058 k.SetServices(self.a, self.evt, self.hvt, self.fsr, self.inc, self.pers, self.ts, self.cntr)
1060 self.a.addAlgorithm( CollectHistograms(self) )
1062 self.TS.Load( tbin )
1065 sc = self.a.executeEvent()
1067 self.firstEvTime = time.time()-t
1069 self.rTime += (time.time()-t)
1073 self.log.warning(
'Did not Execute Event')
1074 self.evt.clearStore()
1076 if self.isEventPassed() :
1079 self.log.warning(
'Event did not pass : %i'%(evtNumber) )
1080 self.evt.clearStore()
1082 if self.eventOutput :
1085 self.currentEvent = self.getEventNumber( )
1086 tb = self.TS.Dump( )
1087 self.evcom.send( (self.currentEvent, tb) )
1089 self.inc.fireIncident(gbl.Incident(
'Worker',
'EndEvent'))
1090 self.eventLoopSyncer.set()
1091 self.evt.clearStore( )
1092 self.log.info(
'Setting <Last> Event')
1093 self.lastEvent.set()
1095 self.evcom.finalize()
1096 self.log.info(
'Worker-%i Finished Processing Events'%(self.nodeID) )
1098 self.filerecordsAgent.SendFileRecords()
1100 self.tTime = time.time()-startEngine
1103 for k
in self.subworkers:
1104 self.log.info(
'Join subworkers')
1107 def getCheckAlgs( self ) :
1109 For some output writers, a check is performed to see if the event has
1110 executed certain algorithms.
1111 These reside in the AcceptAlgs property for those writers
1116 for m
in self.writerDict[
"events" ] :
1117 if hasattr(m.w,
'AcceptAlgs') : acc += m.w.AcceptAlgs
1118 if hasattr(m.w,
'RequireAlgs') : req += m.w.RequireAlgs
1119 if hasattr(m.w,
'VetoAlgs') : vet += m.w.VetoAlgs
1120 return (acc, req, vet)
1123 def checkExecutedPassed( self, algName ) :
1124 if self.a.algorithm( algName )._ialg.isExecuted()\
1125 and self.a.algorithm( algName )._ialg.filterPassed() :
1130 def isEventPassed( self ) :
1132 Check the algorithm status for an event.
1133 Depending on output writer settings, the event
1134 may be declined based on various criteria.
1135 This is a transcript of the check that occurs in GaudiSvc::OutputStream
1139 self.log.debug(
'self.acceptAlgs is %s'%(str(self.acceptAlgs)))
1140 if self.acceptAlgs :
1141 for name
in self.acceptAlgs :
1142 if self.checkExecutedPassed( name ) :
1148 self.log.debug(
'self.requireAlgs is %s'%(str(self.requireAlgs)))
1149 for name
in self.requireAlgs :
1150 if self.checkExecutedPassed( name ) :
1153 self.log.info(
'Evt declined (requireAlgs) : %s'%(name) )
1156 self.log.debug(
'self.vetoAlgs is %s'%(str(self.vetoAlgs)))
1157 for name
in self.vetoAlgs :
1158 if self.checkExecutedPassed( name ) :
1161 self.log.info(
'Evt declined : (vetoAlgs) : %s'%(name) )
1167 class Writer( GMPComponent ) :
1168 def __init__( self, queues, events, params, subworkers ) :
1169 GMPComponent.__init__(self,
'Writer', -2, queues, events, params, subworkers )
1171 self.writerDict = self.IdentifyWriters( )
1173 self.status = [
False]*self.nWorkers
1174 self.log.name =
"Writer--2"
1176 def processConfiguration( self ) :
1180 self.config[
'ApplicationMgr' ].TopAlg = []
1181 self.config[
'EventSelector' ].Input = []
1183 self.config[
'MessageSvc'].Format =
'[Writer] % F%18W%S%7W%R%T %0W%M'
1186 for key, lst
in self.writerDict.iteritems() :
1187 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
1193 for m
in self.writerDict[
"events" ] :
1194 self.log.debug(
'Processing Event Writer : %s'%(m) )
1195 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers )
1196 self.config[ m.key ].Output = newName
1205 for m
in self.writerDict[
"records" ] :
1206 self.log.debug(
'Processing FileRecords Writer: %s'%(m) )
1207 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers,
1208 extra=
" OPT='RECREATE'" )
1209 self.config[ m.key ].Output = newName
1212 hs =
"HistogramPersistencySvc"
1214 if hs
in self.config.keys() :
1215 n = self.config[ hs ].OutputFile
1217 newName=self.config[hs].OutputFile.replace(
'.',\
1218 '.p%i.'%(self.nWorkers))
1219 self.config[ hs ].OutputFile = newName
1221 def Engine( self ) :
1225 libc = ctypes.CDLL(
'libc.so.6')
1226 name = str(self.nodeType) + str(self.nodeID) +
'\0'
1227 libc.prctl(15,name,0,0,0)
1229 startEngine = time.time()
1231 self.histoAgent = HistoAgent( self )
1232 self.filerecordsAgent = FileRecordsAgent( self )
1237 stopCriteria = self.nWorkers
1239 current = (current+1)%self.nWorkers
1240 packet = self.evcoms[current].receive( timeout=0.01 )
1243 if packet ==
'FINISHED' :
1244 self.log.info(
'Writer got FINISHED flag : Worker %i'%(current))
1246 self.status[current] =
True
1247 if all(self.status) :
1248 self.log.info(
'FINISHED recd from all workers, break loop')
1253 evtNumber, tbin = packet
1254 self.TS.Load( tbin )
1256 self.a.executeEvent()
1257 self.rTime += ( time.time()-t )
1258 self.currentEvent = self.getEventNumber( )
1259 self.evt.clearStore( )
1260 self.eventLoopSyncer.set()
1261 self.log.name =
"Writer--2"
1262 self.log.info(
'Setting <Last> Event')
1263 self.lastEvent.set()
1266 [ e.finalize()
for e
in self.evcoms ]
1268 sc = self.histoAgent.Receive()
1269 sc = self.histoAgent.RebuildHistoStore()
1270 if sc.isSuccess() : self.log.info(
'Histo Store rebuilt ok' )
1271 else : self.log.warning(
'Histo Store Error in Rebuild' )
1274 sc = self.filerecordsAgent.Receive()
1275 self.filerecordsAgent.Rebuild()
1286 class Coord( object ) :
1287 def __init__( self, nWorkers, config, log ) :
1290 self.config = config
1292 self.log.name =
'GaudiPython-Parallel-Logger'
1293 self.log.info(
'GaudiPython Parallel Process Co-ordinator beginning' )
1297 self.nWorkers = cpu_count()
1299 self.nWorkers = nWorkers
1302 self.qs = self.SetupQueues( )
1303 self.hq = JoinableQueue( )
1304 self.fq = JoinableQueue( )
1307 self.sInit = Syncer( self.nWorkers, self.log,
1308 limit=WAIT_INITIALISE,
1309 step=STEP_INITIALISE )
1310 self.sRun = Syncer( self.nWorkers, self.log,
1312 limit=WAIT_SINGLE_EVENT,
1314 firstEvent=WAIT_FIRST_EVENT )
1315 self.sFin = Syncer( self.nWorkers, self.log,
1316 limit=WAIT_FINALISE,
1317 step=STEP_FINALISE )
1319 self.histSyncEvent =
Event()
1322 params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
1324 self.subworkers = []
1326 for i
in range(1, self.nWorkers ) :
1327 sub = Subworker( i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers )
1328 self.subworkers.append( sub )
1329 self.reader=
Reader(self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers)
1331 wk = Worker( 0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers )
1332 self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers)
1335 self.system.append(self.writer)
1336 self.system.append(wk)
1337 self.system.append(self.reader)
1339 def getSyncEvents( self, nodeID ) :
1340 init = self.sInit.d[nodeID].event
1341 run = ( self.sRun.d[nodeID].event, self.sRun.d[nodeID].lastEvent )
1342 fin = self.sFin.d[nodeID].event
1343 return ( init, run, fin )
1345 def getQueues( self, nodeID ) :
1346 eventQ = self.qs[ nodeID ]
1349 return ( eventQ, histQ, fsrQ )
1354 self.log.name =
'GaudiPython-Parallel-Logger'
1355 self.log.info(
'INITIALISING SYSTEM' )
1358 for p
in self.system :
1361 sc = self.sInit.syncAll(step=
"Initialise")
1362 if sc == SUCCESS:
pass
1363 else : self.Terminate() ;
return FAILURE
1366 self.log.name =
'GaudiPython-Parallel-Logger'
1367 self.log.info(
'RUNNING SYSTEM' )
1368 sc = self.sRun.syncAll(step=
"Run")
1369 if sc == SUCCESS:
pass
1370 else : self.Terminate() ;
return FAILURE
1373 self.log.name =
'GaudiPython-Parallel-Logger'
1374 self.log.info(
'FINALISING SYSTEM' )
1375 sc = self.sFin.syncAll(step=
"Finalise")
1376 if sc == SUCCESS:
pass
1377 else : self.Terminate() ;
return FAILURE
1380 self.log.info(
"Cleanly join all Processes" )
1382 self.log.info(
"Report Total Success to Main.py" )
1385 def Terminate( self ) :
1387 children = multiprocessing.active_children()
1397 self.system.reverse()
1398 for s
in self.system :
1402 def SetupQueues( self ) :
1410 rwk = JoinableQueue()
1412 workersWriter = [ JoinableQueue()
for i
in xrange(self.nWorkers) ]
1415 d[-2] = (workersWriter,
None)
1416 for i
in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
# --- Extraction residue below: doxygen fragments from unrelated sources,
# --- preserved verbatim as comments; not part of this module's code.
# Essential information of the event used in examples It can be identified by "/Event".
# double sum(double x, double y, double z)
# def Reader(readerType, filename, qacross, qToEngine)
# NamedRange_< CONTAINER > range(const CONTAINER &cnt, const std::string &name)
# simple function to create the named range form arbitrary container