AthenaScenario.py
Go to the documentation of this file.
1 import json
2 from Gaudi.Configuration import *
3 # ============================================================================
4 from Configurables import GaudiExamplesCommonConf, CPUCruncher,HiveEventLoopMgr, HiveWhiteBoard
5 #GaudiExamplesCommonConf()
6 # ============================================================================
7 
8 #-------------------------------------------------------------------------------
9 # Metaconfig
10 
11 NUMBEROFEVENTS = 1
12 NUMBEROFEVENTSINFLIGHT = 1
13 NUMBEROFALGOSINFLIGHT = 1000
14 NUMBEROFTHREADS = 1
15 CLONEALGOS = False
16 DUMPQUEUES = False
17 VERBOSITY = 3
18 
19 
20 NumberOfEvents = NUMBEROFEVENTS
21 NumberOfEventsInFlight = NUMBEROFEVENTSINFLIGHT
22 NumberOfAlgosInFlight = NUMBEROFALGOSINFLIGHT
23 NumberOfThreads = NUMBEROFTHREADS
24 CloneAlgos = CLONEALGOS
25 DumpQueues = DUMPQUEUES
26 Verbosity = VERBOSITY
27 
28 
29 def load_athena_scenario(filename):
30  data = open(filename).read()
31  workflow = eval(data)
32  cpu_cruncher_algos = []
33  cpu_cruncher_algos_inputs = []
34  all_outputs = set()
35  all_inputs = set()
36  for algo in workflow["algorithms"]:
37  # Correct in presence of list wi
38  for starputs in ("inputs","outputs"):
39  algo[starputs] = [item.replace("/","_") for item in algo[starputs]]
40  if algo[starputs] == ['']: algo[starputs] = []
41  if algo[starputs] == ['dummy']: algo[starputs] = []
42  cleaned_inputs = [input for input in algo["inputs"] if (input not in algo["outputs"] ) ]
43 
44  # fix double declaration of outputs (nokey)
45  cleaned_outputs = [output for output in algo["outputs"] if (output not in all_outputs)]
46  new_algo = CPUCruncher(algo["name"],
47  avgRuntime=float(algo["runtimes"][0]/1000000.),
48  DataInputs = cleaned_inputs,
49  DataOutputs = cleaned_outputs
50  )
51  cpu_cruncher_algos.append(new_algo)
52  all_outputs.update(algo["outputs"])
53  all_inputs.update(algo["inputs"])
54  cpu_cruncher_algos_inputs.append(algo["inputs"])
55 
56  #look for the objects that haven't been provided within the job. Assume this needs to come via input
57  new_algo = CPUCruncher("input",
58  avgRuntime=1,
59  DataInputs=[],
60  DataOutputs=[item for item in all_inputs.difference(all_outputs)]
61  )
62  cpu_cruncher_algos.append(new_algo)
63  cpu_cruncher_algos_inputs.append([])
64 
65  print [item for item in all_inputs.difference(all_outputs)]
66 
67  print len(all_outputs)
68  print len(cpu_cruncher_algos)
69  return cpu_cruncher_algos,cpu_cruncher_algos_inputs
70 
71 # Set output level threshold 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL )
72 ms = MessageSvc()
73 ms.OutputLevel = Verbosity
74 
75 crunchers,inputs = load_athena_scenario("Athena_loopfixed.json")
76 
77 whiteboard = HiveWhiteBoard("EventDataSvc", EventSlots = NumberOfEventsInFlight)
78 
79 # Setup the Event Loop Manager
80 evtloop = HiveEventLoopMgr()
81 evtloop.MaxAlgosParallel = NumberOfAlgosInFlight
82 evtloop.MaxEventsParallel = NumberOfEventsInFlight
83 evtloop.NumThreads = NumberOfThreads
84 evtloop.CloneAlgorithms = CloneAlgos
85 evtloop.DumpQueues = DumpQueues
86 evtloop.AlgosDependencies = inputs
87 
88 # And the Application Manager
90 app.TopAlg = crunchers
91 app.EvtSel = "NONE" # do not use any event input
92 app.EvtMax = NumberOfEvents
93 app.EventLoop = evtloop
94 app.ExtSvc =[whiteboard]
95 #app.MessageSvcType = "TBBMessageSvc"
96 
The Application Manager class.
def load_athena_scenario(filename)