The Gaudi Framework  v29r0 (ff2e7097)
BrunelScenarioForwardScheduler.py
Go to the documentation of this file.
1 from Gaudi.Configuration import *
2 from Configurables import GaudiExamplesCommonConf, CPUCruncher, HiveSlimEventLoopMgr, HiveWhiteBoard, ForwardSchedulerSvc
3 
4 #-------------------------------------------------------------------------------
5 # Metaconfig
6 
7 NUMBEROFEVENTS = 23
8 NUMBEROFEVENTSINFLIGHT = 7
9 NUMBEROFALGOSINFLIGHT = 11
10 NUMBEROFTHREADS = 2
11 CARDINALITY = 11
12 SCALE = .1
13 VERBOSITY = 5
14 
15 NumberOfEvents = NUMBEROFEVENTS
16 NumberOfEventsInFlight = NUMBEROFEVENTSINFLIGHT
17 NumberOfAlgosInFlight = NUMBEROFALGOSINFLIGHT
18 NumberOfThreads = NUMBEROFTHREADS
19 Cardinality = CARDINALITY
20 Scale = SCALE
21 Verbosity = VERBOSITY
22 
23 
24 #-------------------------------------------------------------------------------
25 
26 
27 def load_brunel_scenario(filename):
28  algs = {}
29  timing = {}
30  objs = []
31  curr = None
32  order = 0
33  nodes = ('/Event', '/Event/Rec', '/Event/DAQ')
34  for l in open(filename).readlines():
35  if l.find('StoreTracer') == 0:
36  if l.find('Executing Algorithm') != -1:
37  alg = l.split()[-1]
38  if alg not in algs.keys():
39  algs[alg] = (order, set(), set())
40  curr = alg
41  order += 1
42  elif l.find('Done with Algorithm') != -1:
43  curr = None
44  elif l.find('[EventDataSvc]') != -1 and curr:
45  obj = l.split()[-1]
46  if obj in nodes:
47  continue
48  if obj.find('/Event/') == 0:
49  obj = obj[7:]
50  obj = obj.replace('/', '_')
51  if obj not in objs:
52  objs.append(obj)
53  talg = algs[curr]
54  if l.find('RETRIEVE') != -1:
55  if obj not in talg[1]:
56  talg[1].add(obj)
57  elif l.find('REGOBJ') != -1:
58  if obj not in talg[2]:
59  talg[2].add(obj)
60  if l.find("TimingAuditor") != -1:
61  algo = l.split()[2] # .rstrip("|")
62  index = 13
63  if algo.endswith("|"):
64  index = 12
65  algo = algo.rstrip("|")
66  if algo in algs.keys():
67  timing[algo] = l.split()[index]
68  else:
69  for name in algs.keys():
70  if name.startswith(algo):
71  timing[name] = l.split()[index]
72 
73  all_inputs = set()
74  all_outputs = set()
75  all_algos = []
76  all_algos_inputs = []
77 
78  # Scale all algo timings if needed
79  if Scale != -1:
80  for alg in timing.keys():
81  old_timing = float(timing[alg])
82  new_timing = old_timing * Scale
83  # print "Algorithm %s: %f --> %f" %(alg, old_timing, new_timing)
84  timing[alg] = new_timing
85 
86  for i, (alg, deps) in enumerate(algs.items()):
87  if alg in ["PatPVOffline", "PrsADCs"]:
88  continue
89  if deps[1] or deps[2]:
90  inputs = []
91  inputs = [item for item in deps[1] if item not in (
92  "DAQ_ODIN", "DAQ_RawEvent") and item not in deps[2]]
93  outputs = [item for item in deps[2]]
94  new_algo = CPUCruncher(alg,
95  avgRuntime=float(timing[alg]),
96  inpKeys=map(
97  lambda s: "/Event/" + s, inputs),
98  outKeys=map(
99  lambda s: "/Event/" + s, outputs),
100  # DataInputs=map( lambda s: "/Event/"+s, inputs),
101  # DataOutputs=map( lambda s: "/Event/"+s, outputs),
102  OutputLevel=6,
103  shortCalib=True
104  )
105 
106  for item in deps[1]:
107  all_inputs.add(item)
108  for item in deps[2]:
109  all_outputs.add(item)
110  all_algos.append(new_algo)
111  all_algos_inputs.append(inputs)
112  # look for the objects that haven't been provided within the job. Assume this needs to come via input
113  new_algo = CPUCruncher("input",
114  avgRuntime=1,
115  inpKeys=[],
116  outKeys=map(
117  lambda s: "/Event/" + s, [item for item in all_inputs.difference(all_outputs)]),
118  OutputLevel=WARNING
119  )
120  all_algos.append(new_algo)
121  all_algos_inputs.append([])
122  for algo in all_algos:
123  algo.Cardinality = Cardinality
124  OutputLevel = WARNING
125 
126  return all_algos, all_algos_inputs
127 
128 
129 # Set output level threshold 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL )
131 ms.OutputLevel = Verbosity
132 
133 crunchers, inputs = load_brunel_scenario("Brunel.TES.trace.log")
134 
135 whiteboard = HiveWhiteBoard("EventDataSvc",
136  EventSlots=NumberOfEventsInFlight)
137 
138 slimeventloopmgr = HiveSlimEventLoopMgr(OutputLevel=INFO)
139 
140 scheduler = ForwardSchedulerSvc(MaxEventsInFlight=NumberOfEventsInFlight,
141  MaxAlgosInFlight=NumberOfAlgosInFlight,
142  ThreadPoolSize=NumberOfThreads,
143  OutputLevel=INFO
144  )
145 
146 # And the Application Manager
148 app.TopAlg = crunchers
149 app.EvtSel = "NONE" # do not use any event input
150 app.EvtMax = NumberOfEvents
151 app.EventLoop = slimeventloopmgr
152 app.ExtSvc = [whiteboard]
153 app.MessageSvcType = "InertMessageSvc"
154 app.OutputLevel = INFO
struct GAUDI_API map
Parametrisation class for map-like implementation.
The Application Manager class.