The Gaudi Framework  v39r3 (979e3109)
SubprocessBaseTest.py
Go to the documentation of this file.
1 
11 import os
12 import select
13 import signal
14 import subprocess
15 import threading
16 from datetime import datetime
17 from pathlib import Path
18 from string import Template
19 from typing import Callable, Dict, List, Optional, Union
20 
21 import pytest
22 
23 from GaudiTesting.FixtureResult import (
24  ExceededStreamError,
25  FixtureResult,
26  ProcessTimeoutError,
27 )
28 from GaudiTesting.utils import file_path_for_class, kill_tree, which
29 
# Maximum number of bytes accepted from a single output stream of the tested
# program. It can be overridden with the GAUDI_TEST_STDOUT_LIMIT environment
# variable (default: 100 MiB).
STDOUT_LIMIT = int(os.environ.get("GAUDI_TEST_STDOUT_LIMIT", 100 * 1024**2))
31 
32 
"""
Base class for tests that run a program in a subprocess.
It takes care of preparing the environment and the command line,
and of collecting the output and the errors of the subprocess.
"""

# Command line to run; the first item is the program, the others its arguments.
command: Optional[List[str]] = None
# Not used by the code in this class (presumably the name of a reference
# file handled elsewhere -- TODO confirm).
reference: Optional[str] = None
# "NAME=value" entries applied on top of os.environ (see update_env).
environment: Optional[List[str]] = None
# Time, in seconds, granted to the program before it is considered stuck.
timeout: int = 600
# Return code the program is expected to exit with (see test_returncode).
returncode: int = 0
# Extra keyword arguments forwarded to subprocess.Popen; the "cwd" entry,
# if present, is the working directory of the program.
# NOTE: this dictionary is shared by every class that does not override it.
popen_kwargs: Dict = {}
46 
@property
def cwd(self) -> Optional[Path]:
    """
    Working directory configured in ``popen_kwargs`` as a ``Path``,
    or None when no (non-empty) "cwd" entry is set.
    """
    configured = self.popen_kwargs.get("cwd")
    if not configured:
        return None
    return Path(configured)
51 
@classmethod
def resolve_path(cls, path: Union[Path, str]) -> str:
    """
    Return the given path as a string with environment variables expanded.

    A relative path is replaced by its absolute version, computed from the
    directory of the file defining the class, only if that absolute path
    exists; otherwise it is left as it is.
    A trailing ":some_suffix" is ignored for the lookup and appended
    unchanged to the result.
    """
    text = str(path) if isinstance(path, Path) else path
    text = os.path.expandvars(text)

    # handle the special case "path/to/file:some_suffix":
    # only the part before the last ":" is a file system path
    location, colon, tail = text.rpartition(":")
    if colon:
        suffix = colon + tail
    else:
        location, suffix = text, ""

    if not os.path.isabs(location):
        class_dir = file_path_for_class(cls).parent
        candidate = str((class_dir / location).resolve())
        if os.path.exists(candidate):
            location = candidate
    return location + suffix
76 
@classmethod
def update_env(cls, env: Dict[str, str]) -> None:
    """
    Apply the "NAME=value" entries of ``cls.environment`` to ``env`` in place.

    Entries are processed in order and each value is expanded with
    ``expand_vars_from`` against the current content of ``env``, so an entry
    can refer to variables set by the previous ones.
    """
    assignments = (entry.split("=", 1) for entry in cls.environment or ())
    for name, raw_value in assignments:
        env[name] = cls.expand_vars_from(raw_value, env)
83 
@classmethod
def _prepare_environment(cls) -> Dict[str, str]:
    """
    Return a copy of the current process environment modified by ``update_env``.
    """
    environment = os.environ.copy()
    cls.update_env(environment)
    return environment
89 
@staticmethod
def expand_vars_from(value: str, env: Dict[str, str]) -> str:
    """
    Replace ``$name`` and ``${name}`` in ``value`` with the entries of ``env``.
    References to names not in ``env`` are left untouched.
    """
    template = Template(value)
    return template.safe_substitute(env)
93 
@staticmethod
def unset_vars(env: Dict[str, str], vars_to_unset: List[str]) -> None:
    """
    Remove the given variables from ``env`` in place, ignoring those not set.
    """
    for name in vars_to_unset:
        if name in env:
            del env[name]
98 
@classmethod
def _determine_program(cls, prog: str) -> str:
    """
    Return the program to run for the given name.

    ".exe" is appended unless the name already ends (ignoring case) with
    ".exe", ".py" or ".bat". The result of ``which`` is used when it is not
    empty, otherwise the name is passed through ``resolve_path``.
    """
    known_extensions = (".exe", ".py", ".bat")
    if not prog.lower().endswith(known_extensions):
        prog = prog + ".exe"
    located = which(prog)
    return located if located else cls.resolve_path(prog)
104 
@classmethod
def _prepare_command(cls, tmp_path=Path()) -> List[str]:
    """
    Build the command line to execute from ``cls.command``.

    The first item is converted with ``_determine_program``; the others are
    passed through ``resolve_path`` unless they start with "-" (options are
    kept verbatim). ``tmp_path`` is accepted but not used.
    """
    program, *arguments = cls.command
    return [
        cls._determine_program(program),
        # do not try to expand options
        *(arg if arg.startswith("-") else cls.resolve_path(arg) for arg in arguments),
    ]
117 
@classmethod
def _handle_timeout(cls, proc: subprocess.Popen) -> str:
    """
    Handle a process timeout by collecting and returning the stack trace.

    The process is terminated in any case, even if the collection of the
    stack trace fails (the failure is then propagated to the caller), so
    that a stuck process is never left running.
    """
    try:
        return cls._collect_stack_trace(proc)
    finally:
        # the stack trace must be taken before the process is killed,
        # but the process must be killed whatever happens
        cls._terminate_process(proc)
126 
@staticmethod
def _collect_stack_trace(proc: subprocess.Popen) -> str:
    """
    Return the backtrace of all the threads of the given process.

    gdb is attached in batch mode to ``proc.pid`` and its output (stdout and
    stderr together) is returned decoded as UTF-8, with the bytes that
    cannot be decoded replaced by backslash escapes.
    If gdb cannot be started (for example because it is not installed) a
    short message describing the problem is returned instead, so that the
    handling of the timeout can continue.
    """
    cmd = [
        "gdb",
        "--pid",
        str(proc.pid),
        "--batch",
        "--eval-command=thread apply all backtrace",
    ]
    try:
        gdb = subprocess.Popen(
            cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
    except OSError as err:
        return f"Stack trace not available: could not run gdb ({err})"
    # communicate() closes stdin, so gdb cannot wait for input
    return gdb.communicate()[0].decode("utf-8", errors="backslashreplace")
140 
@staticmethod
def _terminate_process(proc: subprocess.Popen) -> None:
    """
    Stop the given process and its children.

    SIGTERM is sent to the process tree first; if the process has not
    exited within 60 seconds, SIGKILL is sent to the process tree.
    """
    kill_tree(proc.pid, signal.SIGTERM)
    try:
        proc.wait(60)
    except subprocess.TimeoutExpired:
        # wait() raises when the time is over (it does not just return),
        # so the escalation to SIGKILL has to happen here
        kill_tree(proc.pid, signal.SIGKILL)
147 
@classmethod
def _prepare_execution(cls, tmp_path=None):
    """
    Prepare what is needed to run the program.

    The working directory, i.e. the "cwd" entry of ``popen_kwargs`` or else
    ``tmp_path``, is created if it does not exist yet; nothing is created
    when neither of them is given.

    Returns the tuple (command, env) obtained from ``_prepare_command`` and
    ``_prepare_environment``.
    """
    command = cls._prepare_command(tmp_path=tmp_path)
    env = cls._prepare_environment()
    workdir = cls.popen_kwargs.get("cwd", tmp_path)
    if workdir is not None:
        # os.makedirs(None) would fail with a TypeError
        os.makedirs(workdir, exist_ok=True)
    return command, env
154 
@classmethod
def run_program(cls, tmp_path=None) -> FixtureResult:
    """
    Run the specified program and capture its output.

    The command and the environment come from ``_prepare_execution``. The
    process is started with ``popen_kwargs``, using ``tmp_path`` as working
    directory when ``popen_kwargs`` has no "cwd" entry. stdout and stderr
    are collected by a helper thread that is given ``cls.timeout`` seconds:

    - if the thread is still running after that time, the stack trace of
      the process is collected, the process is terminated and a
      ``ProcessTimeoutError`` is recorded;
    - if one of the streams produces more than ``STDOUT_LIMIT`` bytes, the
      process is terminated and an ``ExceededStreamError`` naming the stream
      is recorded.

    The error, if any, is not raised: it is stored in the ``run_exception``
    field of the returned ``FixtureResult``, together with the output
    collected so far (as bytes) and the return code of the process.
    """
    start_time = datetime.now()
    command, env = cls._prepare_execution(tmp_path=tmp_path)

    # run in the directory prepared by _prepare_execution:
    # the configured "cwd" or else tmp_path
    popen_kwargs = dict(cls.popen_kwargs)
    popen_kwargs.setdefault("cwd", tmp_path)

    proc = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        **popen_kwargs,
    )

    stdout_chunks, stderr_chunks = [], []
    exceeded_stream = stack_trace = run_exception = None
    streams = {
        proc.stdout.fileno(): (stdout_chunks, "stdout"),
        proc.stderr.fileno(): (stderr_chunks, "stderr"),
    }

    def read_output():
        nonlocal exceeded_stream
        open_fds = set(streams)
        # running totals, to avoid summing all the chunks at every read
        received = dict.fromkeys(streams, 0)
        while open_fds and not exceeded_stream:
            exited = proc.poll() is not None
            # once the process is gone, only drain what is already in the
            # pipes (they may be kept open by other processes)
            readable, _, _ = select.select(
                list(open_fds), [], [], 0 if exited else cls.timeout
            )
            if exited and not readable:
                break
            for fileno in readable:
                data = os.read(fileno, 65536)
                if not data:
                    # end of file: stop polling this stream
                    open_fds.discard(fileno)
                    continue
                chunks, stream_name = streams[fileno]
                chunks.append(data)
                received[fileno] += len(data)
                if received[fileno] > STDOUT_LIMIT:
                    exceeded_stream = stream_name
                    break
        if not exceeded_stream:
            # both streams may be closed before the process exits:
            # wait for it so that the return code is available
            proc.wait()

    thread = threading.Thread(target=read_output)
    thread.start()
    thread.join(cls.timeout)

    if thread.is_alive():
        stack_trace = cls._handle_timeout(proc)
        run_exception = ProcessTimeoutError("Process timed out", stack_trace)
        # the process has been stopped: give the reader thread the chance
        # to collect what is left in the pipes
        thread.join(10)
    elif exceeded_stream:
        # nobody reads the output any more: do not leave the process behind
        cls._terminate_process(proc)
        run_exception = ExceededStreamError(
            "Stream exceeded size limit", exceeded_stream
        )

    if not thread.is_alive():
        # the pipes can be closed only when the reader is not using them
        proc.stdout.close()
        proc.stderr.close()

    end_time = datetime.now()

    completed_process = subprocess.CompletedProcess(
        args=command,
        returncode=proc.returncode,
        stdout=b"".join(list(stdout_chunks)),
        stderr=b"".join(list(stderr_chunks)),
    )

    return FixtureResult(
        completed_process=completed_process,
        start_time=start_time,
        end_time=end_time,
        run_exception=run_exception,
        command=cls.command,
        expanded_command=command,
        env=env,
        cwd=popen_kwargs["cwd"],
    )
225 
@classmethod
def run_program_for_dbg(cls, tmp_path):
    """
    Run the program without capturing its output, for debugging.

    The program runs in the "cwd" entry of ``popen_kwargs`` if present, in
    ``tmp_path`` otherwise, and the function returns when it exits.
    The command is printed before being executed.
    """
    workdir = cls.popen_kwargs.get("cwd", tmp_path)
    command, env = cls._prepare_execution(tmp_path=workdir)
    print("Running the command: ", command)
    # "cwd" must be passed only once, even when it is also in popen_kwargs
    run_kwargs = {**cls.popen_kwargs, "cwd": workdir}
    subprocess.run(
        command,
        env=env,
        **run_kwargs,
    )
237 
@pytest.mark.do_not_collect_source
def test_fixture_setup(
    self,
    record_property: Callable[[str, str], None],
    fixture_result: FixtureResult,
    reference_path: Optional[Path],
) -> None:
    """
    Record properties and handle any failures during fixture setup.

    Every entry of ``fixture_result.to_dict()`` that is not None is
    recorded as a property, plus "reference_file" when a reference path is
    given. The test fails, with the exception as message, if the execution
    of the program recorded an exception.
    """
    properties = fixture_result.to_dict()
    for key, value in properties.items():
        if value is not None:
            record_property(key, value)
    if reference_path:
        record_property("reference_file", str(reference_path))

    if fixture_result.run_exception:
        pytest.fail(f"{fixture_result.run_exception}")
256 
@pytest.mark.do_not_collect_source
def test_returncode(self, returncode: int) -> None:
    """
    Check that the program exited with the return code expected by the
    test class (the ``returncode`` attribute).
    """
    expected = self.returncode
    assert returncode == expected
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.run_program_for_dbg
def run_program_for_dbg(cls, tmp_path)
Definition: SubprocessBaseTest.py:227
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.expand_vars_from
str expand_vars_from(str value, Dict[str, str] env)
Definition: SubprocessBaseTest.py:91
GaudiTesting.FixtureResult.ProcessTimeoutError
Definition: FixtureResult.py:18
ReadAndWriteWhiteBoard.Path
Path
Definition: ReadAndWriteWhiteBoard.py:58
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.unset_vars
None unset_vars(Dict[str, str] env, List[str] vars_to_unset)
Definition: SubprocessBaseTest.py:95
GaudiTesting.utils.file_path_for_class
def file_path_for_class(cls)
Definition: utils.py:396
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._prepare_environment
Dict[str, str] _prepare_environment(cls)
Definition: SubprocessBaseTest.py:85
GaudiPartProp.decorators.get
get
decorate the vector of properties
Definition: decorators.py:283
GaudiTesting.BaseTest.kill_tree
def kill_tree(ppid, sig)
Definition: BaseTest.py:77
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.update_env
None update_env(cls, Dict[str, str] env)
Definition: SubprocessBaseTest.py:78
GaudiTesting.FixtureResult
Definition: FixtureResult.py:1
GaudiTesting.FixtureResult.ExceededStreamError
Definition: FixtureResult.py:24
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._prepare_command
List[str] _prepare_command(cls, tmp_path=Path())
Definition: SubprocessBaseTest.py:106
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.str
str
Definition: SubprocessBaseTest.py:41
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._determine_program
str _determine_program(cls, str prog)
Definition: SubprocessBaseTest.py:100
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._terminate_process
None _terminate_process(subprocess.Popen proc)
Definition: SubprocessBaseTest.py:142
GaudiTesting.FixtureResult.FixtureResult
Definition: FixtureResult.py:30
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.cwd
Optional[Path] cwd(self)
Definition: SubprocessBaseTest.py:48
GaudiTesting.BaseTest.which
def which(executable)
Definition: BaseTest.py:798
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.resolve_path
str resolve_path(cls, Union[Path, str] path)
Definition: SubprocessBaseTest.py:53
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest
Definition: SubprocessBaseTest.py:33
GaudiTesting.utils
Definition: utils.py:1
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._prepare_execution
def _prepare_execution(cls, tmp_path=None)
Definition: SubprocessBaseTest.py:149
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.test_returncode
None test_returncode(self, int returncode)
Definition: SubprocessBaseTest.py:258
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.test_fixture_setup
None test_fixture_setup(self, Callable[[str, str], None] record_property, FixtureResult fixture_result, Optional[Path] reference_path)
Definition: SubprocessBaseTest.py:239
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._handle_timeout
str _handle_timeout(cls, subprocess.Popen proc)
Definition: SubprocessBaseTest.py:119
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.run_program
FixtureResult run_program(cls, tmp_path=None)
Definition: SubprocessBaseTest.py:156
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._collect_stack_trace
str _collect_stack_trace(subprocess.Popen proc)
Definition: SubprocessBaseTest.py:128