2 from GaudiPython
import AppMgr, gbl, setOwnership, PyAlgorithm, SUCCESS,FAILURE, InterfaceCast
3 from ROOT
import TBufferFile, TBuffer
5 from multiprocessing
import Process, Queue, JoinableQueue, Event
6 from multiprocessing
import cpu_count, current_process
7 from multiprocessing.queues
import Empty
10 from ROOT
import TParallelMergingFile
# Waiting limits handed to the Syncer objects built in Coord (as limit= and
# firstEvent=).  NOTE(review): presumably seconds - confirm against Syncer.
WAIT_INITIALISE = 60 * 5
WAIT_FIRST_EVENT = 60 * 3
WAIT_SINGLE_EVENT = 60 * 6
55 aida2root = gbl.Gaudi.Utils.Aida2ROOT.aida2root
59 aidatypes = ( gbl.AIDA.IHistogram,
60 gbl.AIDA.IHistogram1D,
61 gbl.AIDA.IHistogram2D,
62 gbl.AIDA.IHistogram3D,
65 gbl.AIDA.IBaseHistogram )
68 thtypes = ( gbl.TH1D, gbl.TH2D, gbl.TH3D, gbl.TProfile, gbl.TProfile2D )
# Class name of an output-stream configurable -> category of data it writes.
# IdentifyWriters looks up each configurable's class name here and files the
# writer under the resulting category.
WRITERTYPES = {
    'EvtCollectionStream': "tuples",
    'InputCopyStream': "events",
    'OutputStream': "events",
    'RecordStream': "records",
    'RunRecordStream': "records",
    'SequentialOutputStream': "events",
    'TagCollectionStream': "tuples",
}
81 class MiniWriter( object ) :
83 A class to represent a writer in the GaudiPython configuration
84 It can be non-trivial to access the name of the output file; it may be
85 specified in the DataSvc, or just on the writer, may be a list, or string
86 Also, there are three different types of writer (events, records, tuples)
87 so this bootstrap class provides easy access to this info while configuring
89 def __init__( self, writer, wType, config ) :
98 self.OptItemList =
None
100 self.wName = writer.getName()
103 self.datasvcName =
None
104 self.svcOutput =
None
105 if hasattr( self.w,
"Output" ) :
106 self.woutput = self.w.Output
107 self.getItemLists( config )
108 self.set( self.wName, self.w.Output )
113 self.datasvcName = self.w.EvtDataSvc
114 datasvc = config[ self.datasvcName ]
115 if hasattr( datasvc,
"Output" ) :
116 self.getItemLists( config )
117 self.set( self.datasvcName, datasvc.Output )
120 def getNewName( self, replaceThis, withThis, extra='' ) :
127 assert replaceThis.__class__.__name__ ==
'str'
128 assert withThis.__class__.__name__ ==
'str'
131 if old.__class__.__name__ ==
'list' :
134 new = old.replace( replaceThis, withThis )
141 def getItemLists( self, config ) :
143 if hasattr( self.w,
"ItemList" ) :
144 self.ItemList = self.w.ItemList
146 datasvc = config[ self.w.EvtDataSvc ]
147 if hasattr( datasvc,
"ItemList" ) :
148 self.ItemList = datasvc.ItemList
150 if hasattr( self.w,
"OptItemList" ) :
151 self.OptItemList = self.w.OptItemList
153 datasvc = config[ self.w.EvtDataSvc ]
154 if hasattr( datasvc,
"OptItemList" ) :
155 self.OptItemList = datasvc.OptItemList
158 def set( self, key, output ) :
163 def __repr__( self ) :
167 s +=
"Writer : %s\n"%( self.wName )
168 s +=
"Writer Type : %s\n"%( self.wType )
169 s +=
"Writer Output : %s\n"%( self.output )
170 s +=
"DataSvc : %s\n"%( self.datasvcName )
171 s +=
"DataSvc Output : %s\n"%( self.svcOutput )
173 s +=
"Key for config : %s\n"%( self.key )
174 s +=
"Output File : %s\n"%( self.output )
175 s +=
"ItemList : %s\n"%( self.ItemList )
176 s +=
"OptItemList : %s\n"%( self.OptItemList )
182 class CollectHistograms( PyAlgorithm ) :
184 GaudiPython algorithm used to clean up histos on the Reader and Workers
185 Only has a finalize method()
186 This retrieves a dictionary of path:histo objects and sends it to the
187 writer. It then waits for a None flag : THIS IS IMPORTANT, as if
188 the algorithm returns before ALL histos have been COMPLETELY RECEIVED
189 at the writer end, there will be an error.
191 def __init__( self, gmpcomponent ) :
192 PyAlgorithm.__init__( self )
193 self._gmpc = gmpcomponent
194 self.log = self._gmpc.log
198 def finalize( self ) :
199 self.log.info(
'CollectHistograms Finalise (%s)'%(self._gmpc.nodeType))
200 self._gmpc.hDict = self._gmpc.dumpHistograms( )
201 ks = self._gmpc.hDict.keys()
202 self.log.info(
'%i Objects in Histogram Store'%(len(ks)))
208 reps = len(ks)/chunk + 1
209 for i
in xrange(reps) :
210 someKeys = ks[i*chunk : (i+1)*chunk]
211 smalld = dict( [(key, self._gmpc.hDict[key])
for key
in someKeys] )
212 self._gmpc.hq.put( (self._gmpc.nodeID, smalld) )
214 self.log.debug(
'Signalling end of histos to Writer')
215 self._gmpc.hq.put(
'HISTOS_SENT' )
216 self.log.debug(
'Waiting on Sync Event' )
217 self._gmpc.sEvent.wait()
218 self.log.debug(
'Histo Sync Event set, clearing and returning' )
219 self._gmpc.hvt.clearStore()
220 root = gbl.DataObject()
222 self._gmpc.hvt.setRoot(
'/stat', root )
227 class EventCommunicator( object ) :
231 def __init__( self, GMPComponent, qin, qout ) :
232 self._gmpc = GMPComponent
233 self.log = self._gmpc.log
252 def send( self, item ) :
255 assert item.__class__.__name__ ==
'tuple'
256 startTransmission = time.time()
257 self.qout.put( item )
260 while self.qout._buffer : time.sleep( NAP )
261 self.qoutTime += time.time()-startTransmission
262 self.sizeSent += item[1].Length()
266 def receive( self, timeout=None ) :
268 startWait = time.time()
270 itemIn = self.qin.get( timeout=timeout )
273 self.qinTime += time.time()-startWait
275 if itemIn.__class__.__name__ ==
'tuple' :
276 self.sizeRecv += itemIn[1].Length()
282 self._gmpc.log.warning(
'TASK_DONE called too often by : %s'\
283 %(self._gmpc.nodeType))
286 def finalize( self ) :
287 self.log.info(
'Finalize Event Communicator : %s %s'%(self._gmpc, self._gmpc.nodeType))
291 if self._gmpc.nodeType ==
'Reader' : downstream = self._gmpc.nWorkers
292 elif self._gmpc.nodeType ==
'Writer' : downstream = 0
293 elif self._gmpc.nodeType ==
'Worker' : downstream = 1
295 for i
in xrange(downstream) :
296 self.qout.put(
'FINISHED' )
297 if self._gmpc.nodeType !=
'Writer' :
def statistics( self ) :
    """
    Log the transfer counters kept by this communicator.

    The logger name is first set to "<nodeType>-<nodeID> Audit ", then one
    info line is written for each of: items sent/received, data
    sent/received and the time spent on the outgoing/incoming queue.
    """
    self.log.name = '%s-%i Audit '%( self._gmpc.nodeType, self._gmpc.nodeID )
    self.log.info( 'Items Sent : %i'%( self.nSent ) )
    self.log.info( 'Items Received : %i'%( self.nRecv ) )
    self.log.info( 'Data Sent : %i'%( self.sizeSent ) )
    self.log.info( 'Data Received : %i'%( self.sizeRecv ) )
    self.log.info( 'Q-out Time : %5.2f'%( self.qoutTime ) )
    self.log.info( 'Q-in Time : %5.2f'%( self.qinTime ) )
313 class TESSerializer( object ) :
314 def __init__( self, gaudiTESSerializer, evtDataSvc,
315 nodeType, nodeID, log ) :
316 self.T = gaudiTESSerializer
317 self.evt = evtDataSvc
325 self.nodeType = nodeType
328 def Load( self, tbuf ) :
329 root = gbl.DataObject()
331 self.evt.setRoot(
'/Event', root )
333 self.T.loadBuffer( tbuf )
334 self.tLoad += (time.time() - t)
336 self.buffersIn.append( tbuf.Length() )
339 tb = TBufferFile( TBuffer.kWrite )
340 self.T.dumpBuffer(tb)
341 self.tDump += ( time.time()-t )
343 self.buffersOut.append( tb.Length() )
346 evIn =
"Events Loaded : %i"%( self.nIn )
347 evOut =
"Events Dumped : %i"%( self.nOut )
348 din =
sum( self.buffersIn )
349 dataIn =
"Data Loaded : %i"%(din)
350 dataInMb =
"Data Loaded (MB) : %5.2f Mb"%(din/MB)
352 avgIn =
"Avg Buf Loaded : %5.2f Mb"\
353 %( din/(self.nIn*MB) )
354 maxIn =
"Max Buf Loaded : %5.2f Mb"\
355 %( max(self.buffersIn)/MB )
357 avgIn =
"Avg Buf Loaded : N/A"
358 maxIn =
"Max Buf Loaded : N/A"
359 dout =
sum( self.buffersOut )
360 dataOut =
"Data Dumped : %i"%(dout)
361 dataOutMb =
"Data Dumped (MB) : %5.2f Mb"%(dout/MB)
# Average over the dumped buffers, so the numerator has to be dout (the sum
# of self.buffersOut); din is the total of the loaded buffers and belongs to
# the "Loaded" lines only.
avgOut = "Avg Buf Dumped : %5.2f Mb" % ( dout/(self.nOut*MB) )
365 maxOut =
"Max Buf Dumped : %5.2f Mb"\
366 %( max(self.buffersOut)/MB )
368 avgOut =
"Avg Buf Dumped : N/A"
369 maxOut =
"Max Buf Dumped : N/A"
370 dumpTime =
"Total Dump Time : %5.2f"%( self.tDump )
371 loadTime =
"Total Load Time : %5.2f"%( self.tLoad )
385 self.log.name =
"%s-%i TESSerializer"%(self.nodeType, self.nodeID)
387 self.log.info( line )
388 self.log.name =
"%s-%i"%(self.nodeType, self.nodeID)
392 class GMPComponent( object ) :
399 def __init__( self, nodeType, nodeID, queues, events, params, subworkers ) :
409 self.nodeType = nodeType
410 current_process().name = nodeType
413 self.initEvent, eventLoopSyncer, self.finalEvent = events
414 self.eventLoopSyncer, self.lastEvent = eventLoopSyncer
417 self.nWorkers, self.sEvent, self.config, self.log = params
418 self.subworkers = subworkers
422 self.currentEvent =
None
428 ks = self.config.keys()
430 list = [
"Brunel",
"DaVinci",
"Boole",
"Gauss"]
432 if k
in ks: self.app = k
436 qPair, histq, fq = self.queues
439 if self.nodeType ==
'Reader' or self.nodeType ==
'Worker' :
442 self.evcom = EventCommunicator( self, qin, qout )
445 assert self.nodeType ==
'Writer'
449 ec = EventCommunicator( self, q,
None )
450 self.evcoms.append( ec )
465 self.log.name =
'%s-%i'%(self.nodeType, self.nodeID)
472 self.firstEvTime = 0.0
475 self.proc = Process( target=self.Engine )
487 def processConfiguration( self ) :
491 def SetupGaudiPython( self ) :
496 from AlgSmapShot
import SmapShot
497 smapsLog = self.nodeType+
'-'+str(self.nodeID)+
'.smp'
498 ss = SmapShot( logname=smapsLog )
499 self.a.addAlgorithm( ss )
500 self.evt = self.a.evtsvc()
501 self.hvt = self.a.histsvc()
502 self.fsr = self.a.filerecordsvc()
503 self.inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
504 self.pers = self.a.service(
'EventPersistencySvc',
'IAddressCreator' )
505 self.ts = gbl.GaudiMP.TESSerializer( self.evt._idp, self.pers )
506 self.TS = TESSerializer( self.ts, self.evt,
507 self.nodeType, self.nodeID, self.log )
510 def StartGaudiPython( self ) :
515 def LoadTES( self, tbufferfile ) :
516 root = gbl.DataObject()
518 self.evt.setRoot(
'/Event', root )
519 self.ts.loadBuffer(tbufferfile)
522 if self.app !=
'Gauss':
528 lst = [
'/Event/Gen/Header',
529 '/Event/Rec/Header' ]
533 n = self.evt[path].evtNumber()
543 n = self.evt[
'/Event/DAQ/RawEvent'].banks(16)[0].data()[4]
550 if self.nIn > 0
or self.nOut > 0 :
553 self.log.warning(
'Could not determine Event Number')
556 if self.nodeID == -1:
557 self.num = self.num + 1
561 def IdentifyWriters( self ) :
566 keys = [
"events",
"records",
"tuples",
"histos" ]
571 wkeys = WRITERTYPES.keys()
572 for v
in self.config.values() :
573 if v.__class__.__name__
in wkeys :
574 writerType = WRITERTYPES[ v.__class__.__name__ ]
575 d[writerType].append( MiniWriter(v, writerType, self.config) )
576 if self.nodeID == 0 :
577 self.log.info(
'Writer Found : %s'%(v.name()))
580 if 'HistogramPersistencySvc' in self.config.keys() :
581 hfile =self.config[
'HistogramPersistencySvc'].getProp(
'OutputFile')
582 d[
"histos" ].append( hfile )
585 def dumpHistograms( self ) :
587 Method used by the GaudiPython algorithm CollectHistos
588 to obtain a dictionary of form { path : object }
589 representing the Histogram Store
591 nlist = self.hvt.getHistoNames( )
593 objects = 0 ; histos = 0
597 if type(o)
in aidatypes :
604 print 'WARNING : no histograms to recover?'
607 def Initialize( self ) :
609 self.processConfiguration( )
610 self.SetupGaudiPython( )
612 self.StartGaudiPython( )
614 if self.app ==
'Gauss':
616 tool = self.a.tool(
"ToolSvc.EvtCounter" )
617 self.cntr = InterfaceCast( gbl.IEventCounter )( tool.getInterface() )
621 self.iTime = time.time() - start
623 def Finalize( self ) :
627 self.log.info(
'%s-%i Finalized'%(self.nodeType, self.nodeID) )
628 self.finalEvent.set()
629 self.fTime = time.time() - start
632 self.log.name =
"%s-%i Audit"%(self.nodeType, self.nodeID)
633 allTime =
"Alive Time : %5.2f"%(self.tTime)
634 initTime =
"Init Time : %5.2f"%(self.iTime)
635 frstTime =
"1st Event Time : %5.2f"%(self.firstEvTime)
636 runTime =
"Run Time : %5.2f"%(self.rTime)
637 finTime =
"Finalise Time : %5.2f"%(self.fTime)
638 tup = ( allTime, initTime, frstTime, runTime, finTime )
641 self.log.name =
"%s-%i"%(self.nodeType, self.nodeID)
647 class Reader( GMPComponent ) :
def __init__( self, queues, events, params, subworkers ) :
    """
    Build the Reader node on top of GMPComponent.

    All arguments are forwarded unchanged; the node type is fixed to
    'Reader' and the node ID to -1.
    """
    GMPComponent.__init__( self, 'Reader', -1,
                           queues, events, params, subworkers )
651 def processConfiguration( self ) :
656 self.config[
'ApplicationMgr' ].TopAlg = []
657 self.config[
'ApplicationMgr' ].OutStream = []
658 if "HistogramPersistencySvc" in self.config.keys() :
659 self.config[
'HistogramPersistencySvc' ].OutputFile =
''
660 self.config[
'MessageSvc'].Format =
'[Reader]% F%18W%S%7W%R%T %0W%M'
661 self.evtMax = self.config[
'ApplicationMgr' ].EvtMax
663 def DumpEvent( self ) :
664 tb = TBufferFile( TBuffer.kWrite )
666 self.ts.dumpBuffer( tb )
670 def DoFirstEvent( self ) :
673 startFirst = time.time()
674 self.log.info(
'Reader : First Event')
675 if self.nOut == self.evtMax :
676 self.log.info(
'evtMax( %i ) reached'%(self.evtMax))
682 if not bool(self.evt[
'/Event']) :
683 self.log.warning(
'No More Events! (So Far : %i)'%(self.nOut))
688 if self.app ==
"Gauss":
689 lst = self.evt.getHistoNames()
692 lst = self.evt.getList()
693 if self.app ==
"DaVinci":
694 daqnode = self.evt.retrieveObject(
'/Event/DAQ' ).
registry()
696 self.evt.getList( daqnode, lst, daqnode.address().
par() )
698 self.log.critical(
'Reader could not acquire TES List!')
701 self.log.info(
'Reader : TES List : %i items'%(len(lst)))
704 self.currentEvent = self.getEventNumber( )
706 self.log.info(
'First Event Sent')
707 self.evcom.send( (self.currentEvent, tb) )
709 self.eventLoopSyncer.set()
710 self.evt.clearStore( )
711 self.firstEvTime = time.time()-startFirst
718 libc = ctypes.CDLL(
'libc.so.6')
719 name = str(self.nodeType) + str(self.nodeID) +
'\0'
720 libc.prctl(15,name,0,0,0)
723 startEngine = time.time()
724 self.log.name =
'Reader'
725 self.log.info(
'Reader Process starting')
730 self.a.addAlgorithm( CollectHistograms(self) )
732 self.log.info(
'Reader Beginning Distribution')
733 sc = self.DoFirstEvent( )
735 self.log.info(
'Reader First Event OK')
737 self.log.critical(
'Reader Failed on First Event')
743 if self.nOut == self.evtMax :
744 self.log.info(
'evtMax( %i ) reached'%(self.evtMax))
747 if not self.stat.isSuccess() :
748 self.log.critical(
'Reader is Damaged!' )
753 self.rTime += (time.time()-t)
754 if not bool(self.evt[
'/Event']) :
755 self.log.warning(
'No More Events! (So Far : %i)'%(self.nOut))
757 self.currentEvent = self.getEventNumber( )
759 self.evcom.send( (self.currentEvent, tb) )
762 self.eventLoopSyncer.set()
763 self.evt.clearStore( )
764 self.log.info(
'Setting <Last> Event')
768 self.log.info(
'Reader : Event Distribution complete.' )
769 self.evcom.finalize()
771 self.tTime = time.time() - startEngine
775 class Subworker(GMPComponent):
776 def __init__( self, workerID, queues, events, params, subworkers ) :
777 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params, subworkers )
779 self.writerDict = self.IdentifyWriters( )
781 self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
782 self.log.info(
"Subworker-%i Created OK"%(self.nodeID))
783 self.eventOutput =
True
789 libc = ctypes.CDLL(
'libc.so.6')
790 name = str(self.nodeType) + str(self.nodeID) +
'\0'
791 libc.prctl(15,name,0,0,0)
794 startEngine = time.time()
795 msg = self.a.service(
'MessageSvc')
796 msg.Format =
'[' + self.log.name +
'] % F%18W%S%7W%R%T %0W%M'
798 self.log.name =
"Worker-%i"%(self.nodeID)
799 self.log.info(
"Subworker %i starting Engine"%(self.nodeID))
800 self.filerecordsAgent = FileRecordsAgent(self)
803 self.log.info(
'EVT WRITERS ON WORKER : %i'\
804 %( len(self.writerDict[
'events'])))
806 nEventWriters = len( self.writerDict[
"events" ] )
807 self.a.addAlgorithm( CollectHistograms(self) )
812 packet = self.evcom.receive( )
815 if packet ==
'FINISHED' :
break
816 evtNumber, tbin = packet
817 if self.cntr !=
None:
819 self.cntr.setEventCounter( evtNumber )
825 sc = self.a.executeEvent()
827 self.firstEvTime = time.time()-t
829 self.rTime += (time.time()-t)
833 self.log.name =
"Worker-%i"%(self.nodeID)
834 self.log.warning(
'Did not Execute Event')
835 self.evt.clearStore()
837 if self.isEventPassed() :
840 self.log.name =
"Worker-%i"%(self.nodeID)
841 self.log.warning(
'Event did not pass : %i'%(evtNumber) )
842 self.evt.clearStore()
844 if self.eventOutput :
847 self.currentEvent = self.getEventNumber( )
849 self.evcom.send( (self.currentEvent, tb) )
851 self.inc.fireIncident(gbl.Incident(
'Subworker',
'EndEvent'))
852 self.eventLoopSyncer.set()
853 self.evt.clearStore( )
854 self.log.name =
"Worker-%i"%(self.nodeID)
855 self.log.info(
'Setting <Last> Event %s' %(self.nodeID))
858 self.evcom.finalize()
860 self.filerecordsAgent.SendFileRecords()
861 self.tTime = time.time()-startEngine
866 def SetServices(self,a, evt, hvt, fsr, inc, pers, ts , cntr):
872 self.inc = self.a.service(
'IncidentSvc',
'IIncidentSvc')
876 self.TS = TESSerializer( self.ts, self.evt,
877 self.nodeType, self.nodeID, self.log )
def getCheckAlgs( self ) :
    """
    Gather the algorithm-name filters declared by the event writers.

    For some output writers, a check is performed to see if the event has
    executed certain algorithms.  The names live in the AcceptAlgs,
    RequireAlgs and VetoAlgs properties of those writers; a writer lacking
    one of the properties simply contributes nothing to that list.

    Returns a tuple ( acceptAlgs, requireAlgs, vetoAlgs ) concatenated over
    every entry of self.writerDict[ "events" ].
    """
    # NOTE(review): the lines creating the accumulators are missing from the
    # listing; plain lists are assumed, which is what "+=" below requires.
    acc, req, vet = [], [], []
    for m in self.writerDict[ "events" ] :
        if hasattr( m.w, 'AcceptAlgs' ) :
            acc += m.w.AcceptAlgs
        if hasattr( m.w, 'RequireAlgs' ) :
            req += m.w.RequireAlgs
        if hasattr( m.w, 'VetoAlgs' ) :
            vet += m.w.VetoAlgs
    return ( acc, req, vet )
def checkExecutedPassed( self, algName ) :
    """
    Tell whether the named algorithm reports both isExecuted() and
    filterPassed().

    algName : name handed to self.a.algorithm() to look the algorithm up.
    """
    # NOTE(review): the result lines are missing from the listing; a boolean
    # is returned because isEventPassed tests this call directly in an "if".
    ialg = self.a.algorithm( algName )._ialg
    return bool( ialg.isExecuted() and ialg.filterPassed() )
903 def isEventPassed( self ) :
905 Check the algorithm status for an event.
906 Depending on output writer settings, the event
907 may be declined based on various criteria.
908 This is a transcript of the check that occurs in GaudiSvc::OutputStream
912 self.log.debug(
'self.acceptAlgs is %s'%(str(self.acceptAlgs)))
914 for name
in self.acceptAlgs :
915 if self.checkExecutedPassed( name ) :
921 self.log.debug(
'self.requireAlgs is %s'%(str(self.requireAlgs)))
922 for name
in self.requireAlgs :
923 if self.checkExecutedPassed( name ) :
926 self.log.info(
'Evt declined (requireAlgs) : %s'%(name) )
929 self.log.debug(
'self.vetoAlgs is %s'%(str(self.vetoAlgs)))
930 for name
in self.vetoAlgs :
931 if self.checkExecutedPassed( name ) :
934 self.log.info(
'Evt declined : (vetoAlgs) : %s'%(name) )
939 class Worker( GMPComponent ) :
940 def __init__( self, workerID, queues, events, params , subworkers ) :
941 GMPComponent.__init__(self,
'Worker', workerID, queues, events, params, subworkers )
943 self.writerDict = self.IdentifyWriters( )
945 self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
946 self.log.name =
"Worker-%i"%(self.nodeID)
947 self.log.info(
"Worker-%i Created OK"%(self.nodeID))
948 self.eventOutput =
True
950 def processConfiguration( self ) :
956 self.config[
'EventSelector' ].Input = []
957 self.config[
'ApplicationMgr' ].OutStream = []
958 if "HistogramPersistencySvc" in self.config.keys() :
959 self.config[
'HistogramPersistencySvc' ].OutputFile =
''
960 formatHead =
'[Worker-%i] '%(self.nodeID)
961 self.config[
'MessageSvc'].Format = formatHead+
'% F%18W%S%7W%R%T %0W%M'
963 for key, lst
in self.writerDict.iteritems() :
964 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
966 for m
in self.writerDict[
"tuples" ] :
969 newName = m.getNewName(
'.',
'.w%i.'%(self.nodeID) )
970 self.config[ m.key ].Output = newName
978 if self.app ==
"Gauss":
980 if "ToolSvc.EvtCounter" not in self.config:
981 from Configurables
import EvtCounter
982 counter = EvtCounter()
984 counter = self.config[
"ToolSvc.EvtCounter"]
985 counter.UseIncident =
False
988 self.log.warning(
'Cannot configure EvtCounter')
995 libc = ctypes.CDLL(
'libc.so.6')
996 name = str(self.nodeType) + str(self.nodeID) +
'\0'
997 libc.prctl(15,name,0,0,0)
999 startEngine = time.time()
1000 self.log.info(
"Worker %i starting Engine"%(self.nodeID))
1002 self.filerecordsAgent = FileRecordsAgent(self)
1005 self.log.info(
'EVT WRITERS ON WORKER : %i'\
1006 %( len(self.writerDict[
'events'])))
1008 nEventWriters = len( self.writerDict[
"events" ] )
1012 for m
in self.writerDict[
"events" ] :
1013 for item
in m.ItemList :
1014 hsh = item.find(
'#' )
1017 itemList.add( item )
1018 for item
in m.OptItemList :
1019 hsh = item.find(
'#' )
1022 optItemList.add( item )
1024 itemList -= optItemList
1025 for item
in sorted( itemList ):
1026 self.log.info(
' adding ItemList Item to ts : %s' % ( item ) )
1027 self.ts.addItem( item )
1028 for item
in sorted( optItemList ):
1029 self.log.info(
' adding Optional Item to ts : %s' % ( item ) )
1030 self.ts.addOptItem( item )
1032 self.log.info(
'There is no Event Output for this app' )
1033 self.eventOutput =
False
1038 packet = self.evcom.receive( )
1041 if packet ==
'FINISHED' :
break
1042 evtNumber, tbin = packet
1043 if self.cntr !=
None:
1044 self.cntr.setEventCounter( evtNumber )
1051 self.log.info(
"Fork new subworkers and disconnect from CondDB")
1052 condDB = self.a.service(
'CondDBCnvSvc', gbl.ICondDBReader)
1056 for k
in self.subworkers:
1057 k.SetServices(self.a, self.evt, self.hvt, self.fsr, self.inc, self.pers, self.ts, self.cntr)
1059 self.a.addAlgorithm( CollectHistograms(self) )
1061 self.TS.Load( tbin )
1064 sc = self.a.executeEvent()
1066 self.firstEvTime = time.time()-t
1068 self.rTime += (time.time()-t)
1072 self.log.warning(
'Did not Execute Event')
1073 self.evt.clearStore()
1075 if self.isEventPassed() :
1078 self.log.warning(
'Event did not pass : %i'%(evtNumber) )
1079 self.evt.clearStore()
1081 if self.eventOutput :
1084 self.currentEvent = self.getEventNumber( )
1085 tb = self.TS.Dump( )
1086 self.evcom.send( (self.currentEvent, tb) )
1088 self.inc.fireIncident(gbl.Incident(
'Worker',
'EndEvent'))
1089 self.eventLoopSyncer.set()
1090 self.evt.clearStore( )
1091 self.log.info(
'Setting <Last> Event')
1092 self.lastEvent.set()
1094 self.evcom.finalize()
1095 self.log.info(
'Worker-%i Finished Processing Events'%(self.nodeID) )
1097 self.filerecordsAgent.SendFileRecords()
1099 self.tTime = time.time()-startEngine
1102 for k
in self.subworkers:
1103 self.log.info(
'Join subworkers')
def getCheckAlgs( self ) :
    """
    Gather the algorithm-name filters declared by the event writers.

    For some output writers, a check is performed to see if the event has
    executed certain algorithms.  The names live in the AcceptAlgs,
    RequireAlgs and VetoAlgs properties of those writers; a writer lacking
    one of the properties simply contributes nothing to that list.

    Returns a tuple ( acceptAlgs, requireAlgs, vetoAlgs ) concatenated over
    every entry of self.writerDict[ "events" ].
    """
    # NOTE(review): the lines creating the accumulators are missing from the
    # listing; plain lists are assumed, which is what "+=" below requires.
    acc, req, vet = [], [], []
    for m in self.writerDict[ "events" ] :
        if hasattr( m.w, 'AcceptAlgs' ) :
            acc += m.w.AcceptAlgs
        if hasattr( m.w, 'RequireAlgs' ) :
            req += m.w.RequireAlgs
        if hasattr( m.w, 'VetoAlgs' ) :
            vet += m.w.VetoAlgs
    return ( acc, req, vet )
def checkExecutedPassed( self, algName ) :
    """
    Tell whether the named algorithm reports both isExecuted() and
    filterPassed().

    algName : name handed to self.a.algorithm() to look the algorithm up.
    """
    # NOTE(review): the result lines are missing from the listing; a boolean
    # is returned because isEventPassed tests this call directly in an "if".
    ialg = self.a.algorithm( algName )._ialg
    return bool( ialg.isExecuted() and ialg.filterPassed() )
1129 def isEventPassed( self ) :
1131 Check the algorithm status for an event.
1132 Depending on output writer settings, the event
1133 may be declined based on various criteria.
1134 This is a transcript of the check that occurs in GaudiSvc::OutputStream
1138 self.log.debug(
'self.acceptAlgs is %s'%(str(self.acceptAlgs)))
1139 if self.acceptAlgs :
1140 for name
in self.acceptAlgs :
1141 if self.checkExecutedPassed( name ) :
1147 self.log.debug(
'self.requireAlgs is %s'%(str(self.requireAlgs)))
1148 for name
in self.requireAlgs :
1149 if self.checkExecutedPassed( name ) :
1152 self.log.info(
'Evt declined (requireAlgs) : %s'%(name) )
1155 self.log.debug(
'self.vetoAlgs is %s'%(str(self.vetoAlgs)))
1156 for name
in self.vetoAlgs :
1157 if self.checkExecutedPassed( name ) :
1160 self.log.info(
'Evt declined : (vetoAlgs) : %s'%(name) )
1166 class Writer( GMPComponent ) :
1167 def __init__( self, queues, events, params, subworkers ) :
1168 GMPComponent.__init__(self,
'Writer', -2, queues, events, params, subworkers )
1170 self.writerDict = self.IdentifyWriters( )
1172 self.status = [
False]*self.nWorkers
1173 self.log.name =
"Writer--2"
1175 def processConfiguration( self ) :
1179 self.config[
'ApplicationMgr' ].TopAlg = []
1180 self.config[
'EventSelector' ].Input = []
1182 self.config[
'MessageSvc'].Format =
'[Writer] % F%18W%S%7W%R%T %0W%M'
1185 for key, lst
in self.writerDict.iteritems() :
1186 self.log.info(
'Writer Type : %s\t : %i'%(key, len(lst)) )
1192 for m
in self.writerDict[
"events" ] :
1193 self.log.debug(
'Processing Event Writer : %s'%(m) )
1194 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers )
1195 self.config[ m.key ].Output = newName
1204 for m
in self.writerDict[
"records" ] :
1205 self.log.debug(
'Processing FileRecords Writer: %s'%(m) )
1206 newName = m.getNewName(
'.',
'.p%i.'%self.nWorkers,
1207 extra=
" OPT='RECREATE'" )
1208 self.config[ m.key ].Output = newName
1211 hs =
"HistogramPersistencySvc"
1213 if hs
in self.config.keys() :
1214 n = self.config[ hs ].OutputFile
1216 newName=self.config[hs].OutputFile.replace(
'.',\
1217 '.p%i.'%(self.nWorkers))
1218 self.config[ hs ].OutputFile = newName
1220 def Engine( self ) :
1224 libc = ctypes.CDLL(
'libc.so.6')
1225 name = str(self.nodeType) + str(self.nodeID) +
'\0'
1226 libc.prctl(15,name,0,0,0)
1228 startEngine = time.time()
1230 self.histoAgent = HistoAgent( self )
1231 self.filerecordsAgent = FileRecordsAgent( self )
1236 stopCriteria = self.nWorkers
1238 current = (current+1)%self.nWorkers
1239 packet = self.evcoms[current].receive( timeout=0.01 )
1242 if packet ==
'FINISHED' :
1243 self.log.info(
'Writer got FINISHED flag : Worker %i'%(current))
1245 self.status[current] =
True
1246 if all(self.status) :
1247 self.log.info(
'FINISHED recd from all workers, break loop')
1252 evtNumber, tbin = packet
1253 self.TS.Load( tbin )
1255 self.a.executeEvent()
1256 self.rTime += ( time.time()-t )
1257 self.currentEvent = self.getEventNumber( )
1258 self.evt.clearStore( )
1259 self.eventLoopSyncer.set()
1260 self.log.name =
"Writer--2"
1261 self.log.info(
'Setting <Last> Event')
1262 self.lastEvent.set()
1265 [ e.finalize()
for e
in self.evcoms ]
1267 sc = self.histoAgent.Receive()
1268 sc = self.histoAgent.RebuildHistoStore()
1269 if sc.isSuccess() : self.log.info(
'Histo Store rebuilt ok' )
1270 else : self.log.warning(
'Histo Store Error in Rebuild' )
1273 sc = self.filerecordsAgent.Receive()
1274 self.filerecordsAgent.Rebuild()
1285 class Coord( object ) :
1286 def __init__( self, nWorkers, config, log ) :
1289 self.config = config
1291 self.log.name =
'GaudiPython-Parallel-Logger'
1292 self.log.info(
'GaudiPython Parallel Process Co-ordinator beginning' )
1296 self.nWorkers = cpu_count()
1298 self.nWorkers = nWorkers
1301 self.qs = self.SetupQueues( )
1302 self.hq = JoinableQueue( )
1303 self.fq = JoinableQueue( )
1306 self.sInit = Syncer( self.nWorkers, self.log,
1307 limit=WAIT_INITIALISE,
1308 step=STEP_INITIALISE )
1309 self.sRun = Syncer( self.nWorkers, self.log,
1311 limit=WAIT_SINGLE_EVENT,
1313 firstEvent=WAIT_FIRST_EVENT )
1314 self.sFin = Syncer( self.nWorkers, self.log,
1315 limit=WAIT_FINALISE,
1316 step=STEP_FINALISE )
1318 self.histSyncEvent =
Event()
1321 params = (self.nWorkers, self.histSyncEvent, self.config, self.log)
1323 self.subworkers = []
1325 for i
in range(1, self.nWorkers ) :
1326 sub = Subworker( i, self.getQueues(i), self.getSyncEvents(i), params, self.subworkers )
1327 self.subworkers.append( sub )
1328 self.reader=
Reader(self.getQueues(-1), self.getSyncEvents(-1), params, self.subworkers)
1330 wk = Worker( 0, self.getQueues(0), self.getSyncEvents(0), params, self.subworkers )
1331 self.writer= Writer(self.getQueues(-2), self.getSyncEvents(-2), params, self.subworkers)
1334 self.system.append(self.writer)
1335 self.system.append(wk)
1336 self.system.append(self.reader)
def getSyncEvents( self, nodeID ) :
    """
    Collect the synchronisation events registered for one node.

    nodeID : key into the per-node table (.d) of the three Syncer objects
             self.sInit, self.sRun and self.sFin.

    Returns ( init, ( run, lastEvent ), fin ), which is the layout that
    GMPComponent.__init__ unpacks from its "events" argument.
    """
    init = self.sInit.d[ nodeID ].event
    runEntry = self.sRun.d[ nodeID ]
    run = ( runEntry.event, runEntry.lastEvent )
    fin = self.sFin.d[ nodeID ].event
    return ( init, run, fin )
def getQueues( self, nodeID ) :
    """
    Assemble the queue triple handed to the node with the given ID.

    Returns ( eventQ, histQ, fsrQ ): the node's own entry from self.qs
    followed by two further queues.
    """
    eventQ = self.qs[ nodeID ]
    # NOTE(review): the two assignments below are missing from the listing;
    # the queues created in Coord.__init__ as self.hq and self.fq are
    # assumed - confirm against the original file.
    histQ = self.hq
    fsrQ = self.fq
    return ( eventQ, histQ, fsrQ )
1353 self.log.name =
'GaudiPython-Parallel-Logger'
1354 self.log.info(
'INITIALISING SYSTEM' )
1357 for p
in self.system :
1360 sc = self.sInit.syncAll(step=
"Initialise")
1361 if sc == SUCCESS:
pass
1362 else : self.Terminate() ;
return FAILURE
1365 self.log.name =
'GaudiPython-Parallel-Logger'
1366 self.log.info(
'RUNNING SYSTEM' )
1367 sc = self.sRun.syncAll(step=
"Run")
1368 if sc == SUCCESS:
pass
1369 else : self.Terminate() ;
return FAILURE
1372 self.log.name =
'GaudiPython-Parallel-Logger'
1373 self.log.info(
'FINALISING SYSTEM' )
1374 sc = self.sFin.syncAll(step=
"Finalise")
1375 if sc == SUCCESS:
pass
1376 else : self.Terminate() ;
return FAILURE
1379 self.log.info(
"Cleanly join all Processes" )
1381 self.log.info(
"Report Total Success to Main.py" )
1384 def Terminate( self ) :
1386 children = multiprocessing.active_children()
1396 self.system.reverse()
1397 for s
in self.system :
1401 def SetupQueues( self ) :
1409 rwk = JoinableQueue()
1411 workersWriter = [ JoinableQueue()
for i
in xrange(self.nWorkers) ]
1414 d[-2] = (workersWriter,
None)
1415 for i
in xrange(self.nWorkers) : d[i] = (rwk, workersWriter[i])
Essential information of the event used in examples. It can be identified by "/Event".
double sum(double x, double y, double z)
NamedRange_< CONTAINER > range(const CONTAINER &cnt, std::string name)
Simple function to create the named range from an arbitrary container.
def Reader(readerType, filename, qacross, qToEngine)