The Gaudi Framework  master (37c0b60a)
SubprocessBaseTest.py
Go to the documentation of this file.
1 
11 import os
12 import select
13 import signal
14 import subprocess
15 import threading
16 from datetime import datetime
17 from pathlib import Path
18 from string import Template
19 from typing import Callable, Dict, List, Optional, Union
20 
21 import pytest
22 
23 from GaudiTesting.FixtureResult import (
24  ExceededStreamError,
25  FixtureResult,
26  ProcessTimeoutError,
27 )
28 from GaudiTesting.utils import file_path_for_class, kill_tree, which
29 
30 STDOUT_LIMIT = int(os.environ.get("GAUDI_TEST_STDOUT_LIMIT", 1024**2 * 100))
31 
32 
34  """
35  A base class for running and managing subprocess executions within a test framework.
36  It provides mechanisms for setting up the environment, preparing commands,
37  and handling subprocess output and errors.
38  """
39 
40  command: List[str] = None
41  reference: str = None
42  environment: List[str] = None
43  timeout: int = 600
44  returncode: int = 0
45  popen_kwargs: Dict = {}
46 
47  @property
48  def cwd(self) -> Optional[Path]:
49  cwd = self.popen_kwargs.get("cwd")
50  return Path(cwd) if cwd else None
51 
52  @classmethod
53  def resolve_path(cls, path: Union[Path, str]) -> str:
54  """
55  Resolve the given path to an absolute path,
56  expanding environment variables.
57  If path looks relative and does not point to anything
58  it is not modified.
59  """
60  if isinstance(path, Path):
61  path = str(path)
62  path = os.path.expandvars(path)
63 
64  # handle the special case "path/to/file:some_suffix"
65  suffix = ""
66  if ":" in path:
67  path, suffix = path.rsplit(":", 1)
68  suffix = f":{suffix}"
69 
70  if not os.path.isabs(path):
71  base_dir = file_path_for_class(cls).parent
72  possible_path = str((base_dir / path).resolve())
73  if os.path.exists(possible_path):
74  path = possible_path
75  return path + suffix
76 
77  @classmethod
78  def update_env(cls, env: Dict[str, str]) -> None:
79  if cls.environment:
80  for item in cls.environment:
81  key, value = item.split("=", 1)
82  env[key] = cls.expand_vars_from(value, env)
83 
84  @classmethod
85  def _prepare_environment(cls) -> Dict[str, str]:
86  env = dict(os.environ)
87  cls.update_env(env)
88  return env
89 
90  @staticmethod
91  def expand_vars_from(value: str, env: Dict[str, str]) -> str:
92  return Template(value).safe_substitute(env)
93 
94  @staticmethod
95  def unset_vars(env: Dict[str, str], vars_to_unset: List[str]) -> None:
96  for var in vars_to_unset:
97  env.pop(var, None)
98 
99  @classmethod
100  def _determine_program(cls, prog: str) -> str:
101  if not any(prog.lower().endswith(ext) for ext in [".exe", ".py", ".bat"]):
102  prog += ".exe"
103  return which(prog) or cls.resolve_path(prog)
104 
105  @classmethod
106  def _prepare_command(cls, tmp_path=Path()) -> List[str]:
107  """
108  Prepare the command to be executed, resolving paths for each part.
109  """
110  command = [cls._determine_program(cls.command[0])]
111  for part in cls.command[1:]:
112  if not part.startswith("-"): # do not try to expand options
113  command.append(cls.resolve_path(part))
114  else:
115  command.append(part)
116  return command
117 
118  @classmethod
119  def _handle_timeout(cls, proc: subprocess.Popen) -> str:
120  """
121  Handle a process timeout by collecting and returning the stack trace.
122  """
123  stack_trace = cls._collect_stack_trace(proc)
124  cls._terminate_process(proc)
125  return stack_trace
126 
127  @staticmethod
128  def _collect_stack_trace(proc: subprocess.Popen) -> str:
129  cmd = [
130  "gdb",
131  "--pid",
132  str(proc.pid),
133  "--batch",
134  "--eval-command=thread apply all backtrace",
135  ]
136  gdb = subprocess.Popen(
137  cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
138  )
139  return gdb.communicate()[0].decode("utf-8", errors="backslashreplace")
140 
141  @staticmethod
142  def _terminate_process(proc: subprocess.Popen) -> None:
143  kill_tree(proc.pid, signal.SIGTERM)
144  proc.wait(60)
145  if proc.poll() is None:
146  kill_tree(proc.pid, signal.SIGKILL)
147 
148  @classmethod
149  def run_program(cls, tmp_path=None) -> FixtureResult:
150  """
151  Run the specified program and capture its output.
152  """
153  start_time = datetime.now()
154  command = cls._prepare_command(tmp_path=tmp_path)
155  env = cls._prepare_environment()
156  # ensure the required working directory exist
157  # (the entry cwd of popen_kwargs is set by the fixture "fixture_result")
158  os.makedirs(cls.popen_kwargs["cwd"], exist_ok=True)
159  proc = subprocess.Popen(
160  command,
161  stdout=subprocess.PIPE,
162  stderr=subprocess.PIPE,
163  env=env,
164  **cls.popen_kwargs,
165  )
166 
167  stdout_chunks, stderr_chunks = [], []
168  stdout = stderr = ""
169  exceeded_stream = stack_trace = run_exception = None
170  streams = {
171  proc.stdout.fileno(): (stdout_chunks, "stdout"),
172  proc.stderr.fileno(): (stderr_chunks, "stderr"),
173  }
174 
175  def read_output():
176  nonlocal stdout, stderr, exceeded_stream
177  while not exceeded_stream and proc.poll() is None:
178  readable, _, _ = select.select(streams.keys(), [], [], cls.timeout)
179  for fileno in readable:
180  data = os.read(fileno, 1024)
181  chunks, stream_name = streams[fileno]
182  chunks.append(data)
183  if sum(len(chunk) for chunk in chunks) > STDOUT_LIMIT:
184  exceeded_stream = stream_name
185  break
186 
187  stdout = b"".join(stdout_chunks)
188  stderr = b"".join(stderr_chunks)
189 
190  thread = threading.Thread(target=read_output)
191  thread.start()
192  thread.join(cls.timeout)
193 
194  if thread.is_alive():
195  stack_trace = cls._handle_timeout(proc)
196  run_exception = ProcessTimeoutError("Process timed out", stack_trace)
197  elif exceeded_stream:
198  run_exception = ExceededStreamError(
199  "Stream exceeded size limit", exceeded_stream
200  )
201 
202  end_time = datetime.now()
203 
204  completed_process = subprocess.CompletedProcess(
205  args=command,
206  returncode=proc.returncode,
207  stdout=stdout,
208  stderr=stderr,
209  )
210 
211  return FixtureResult(
212  completed_process=completed_process,
213  start_time=start_time,
214  end_time=end_time,
215  run_exception=run_exception,
216  command=cls.command,
217  expanded_command=command,
218  env=env,
219  cwd=cls.popen_kwargs["cwd"],
220  )
221 
222  @pytest.mark.do_not_collect_source
224  self,
225  record_property: Callable[[str, str], None],
226  fixture_result: FixtureResult,
227  reference_path: Optional[Path],
228  ) -> None:
229  """
230  Record properties and handle any failures during fixture setup.
231  """
232  for key, value in fixture_result.to_dict().items():
233  if value is not None:
234  record_property(key, value)
235  if reference_path:
236  record_property("reference_file", str(reference_path))
237 
238  if fixture_result.run_exception:
239  pytest.fail(f"{fixture_result.run_exception}")
240 
241  @pytest.mark.do_not_collect_source
242  def test_returncode(self, returncode: int) -> None:
243  """
244  Test that the return code matches the expected value.
245  """
246  assert returncode == self.returncode
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.expand_vars_from
str expand_vars_from(str value, Dict[str, str] env)
Definition: SubprocessBaseTest.py:91
GaudiTesting.FixtureResult.ProcessTimeoutError
Definition: FixtureResult.py:16
ReadAndWriteWhiteBoard.Path
Path
Definition: ReadAndWriteWhiteBoard.py:58
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.unset_vars
None unset_vars(Dict[str, str] env, List[str] vars_to_unset)
Definition: SubprocessBaseTest.py:95
GaudiTesting.utils.file_path_for_class
def file_path_for_class(cls)
Definition: utils.py:396
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._prepare_environment
Dict[str, str] _prepare_environment(cls)
Definition: SubprocessBaseTest.py:85
GaudiPartProp.decorators.get
get
decorate the vector of properties
Definition: decorators.py:283
GaudiTesting.BaseTest.kill_tree
def kill_tree(ppid, sig)
Definition: BaseTest.py:77
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.update_env
None update_env(cls, Dict[str, str] env)
Definition: SubprocessBaseTest.py:78
GaudiTesting.FixtureResult
Definition: FixtureResult.py:1
GaudiTesting.FixtureResult.ExceededStreamError
Definition: FixtureResult.py:22
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._prepare_command
List[str] _prepare_command(cls, tmp_path=Path())
Definition: SubprocessBaseTest.py:106
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.str
str
Definition: SubprocessBaseTest.py:41
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._determine_program
str _determine_program(cls, str prog)
Definition: SubprocessBaseTest.py:100
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._terminate_process
None _terminate_process(subprocess.Popen proc)
Definition: SubprocessBaseTest.py:142
GaudiTesting.FixtureResult.FixtureResult
Definition: FixtureResult.py:28
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.cwd
Optional[Path] cwd(self)
Definition: SubprocessBaseTest.py:48
GaudiTesting.BaseTest.which
def which(executable)
Definition: BaseTest.py:798
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.resolve_path
str resolve_path(cls, Union[Path, str] path)
Definition: SubprocessBaseTest.py:53
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest
Definition: SubprocessBaseTest.py:33
GaudiTesting.utils
Definition: utils.py:1
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.test_returncode
None test_returncode(self, int returncode)
Definition: SubprocessBaseTest.py:242
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.test_fixture_setup
None test_fixture_setup(self, Callable[[str, str], None] record_property, FixtureResult fixture_result, Optional[Path] reference_path)
Definition: SubprocessBaseTest.py:223
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._handle_timeout
str _handle_timeout(cls, subprocess.Popen proc)
Definition: SubprocessBaseTest.py:119
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.run_program
FixtureResult run_program(cls, tmp_path=None)
Definition: SubprocessBaseTest.py:149
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._collect_stack_trace
str _collect_stack_trace(subprocess.Popen proc)
Definition: SubprocessBaseTest.py:128