The Gaudi Framework  v39r0 (5b8b5eda)
SubprocessBaseTest.py
Go to the documentation of this file.
1 
11 import os
12 import select
13 import signal
14 import subprocess
15 import threading
16 from datetime import datetime
17 from pathlib import Path
18 from string import Template
19 from typing import Callable, Dict, List, Optional, Union
20 
21 import pytest
22 
23 from GaudiTesting.FixtureResult import (
24  ExceededStreamError,
25  FixtureResult,
26  ProcessTimeoutError,
27 )
28 from GaudiTesting.utils import file_path_for_class, kill_tree, which
29 
30 STDOUT_LIMIT = int(os.environ.get("GAUDI_TEST_STDOUT_LIMIT", 1024**2 * 100))
31 
32 
34  """
35  A base class for running and managing subprocess executions within a test framework.
36  It provides mechanisms for setting up the environment, preparing commands,
37  and handling subprocess output and errors.
38  """
39 
40  command: List[str] = None
41  reference: str = None
42  environment: List[str] = None
43  timeout: int = 600
44  returncode: int = 0
45  popen_kwargs: Dict = {}
46 
47  @property
48  def cwd(self) -> Optional[Path]:
49  cwd = self.popen_kwargs.get("cwd")
50  return Path(cwd) if cwd else None
51 
52  @classmethod
53  def resolve_path(cls, path: Union[Path, str]) -> str:
54  """
55  Resolve the given path to an absolute path,
56  expanding environment variables.
57  If path looks relative and does not point to anything
58  it is not modified.
59  """
60  if isinstance(path, Path):
61  path = str(path)
62  path = os.path.expandvars(path)
63  if not os.path.isabs(path):
64  base_dir = file_path_for_class(cls).parent
65  possible_path = str((base_dir / path).resolve())
66  if os.path.exists(possible_path):
67  path = possible_path
68  return path
69 
70  @classmethod
71  def update_env(cls, env: Dict[str, str]) -> None:
72  if cls.environment:
73  for item in cls.environment:
74  key, value = item.split("=", 1)
75  env[key] = cls.expand_vars_from(value, env)
76 
77  @classmethod
78  def _prepare_environment(cls) -> Dict[str, str]:
79  env = dict(os.environ)
80  cls.update_env(env)
81  return env
82 
83  @staticmethod
84  def expand_vars_from(value: str, env: Dict[str, str]) -> str:
85  return Template(value).safe_substitute(env)
86 
87  @staticmethod
88  def unset_vars(env: Dict[str, str], vars_to_unset: List[str]) -> None:
89  for var in vars_to_unset:
90  env.pop(var, None)
91 
92  @classmethod
93  def _determine_program(cls, prog: str) -> str:
94  if not any(prog.lower().endswith(ext) for ext in [".exe", ".py", ".bat"]):
95  prog += ".exe"
96  return which(prog) or cls.resolve_path(prog)
97 
98  @classmethod
99  def _prepare_command(cls, tmp_path=Path()) -> List[str]:
100  """
101  Prepare the command to be executed, resolving paths for each part.
102  """
103  command = [cls._determine_program(cls.command[0])]
104  for part in cls.command[1:]:
105  if cls._is_file_path(part):
106  command.append(cls.resolve_path(part))
107  else:
108  command.append(part)
109  return command
110 
111  @staticmethod
112  def _is_file_path(part: str) -> bool:
113  return not part.startswith("-") and ":" not in part
114 
115  @classmethod
116  def _handle_timeout(cls, proc: subprocess.Popen) -> str:
117  """
118  Handle a process timeout by collecting and returning the stack trace.
119  """
120  stack_trace = cls._collect_stack_trace(proc)
121  cls._terminate_process(proc)
122  return stack_trace
123 
124  @staticmethod
125  def _collect_stack_trace(proc: subprocess.Popen) -> str:
126  cmd = [
127  "gdb",
128  "--pid",
129  str(proc.pid),
130  "--batch",
131  "--eval-command=thread apply all backtrace",
132  ]
133  gdb = subprocess.Popen(
134  cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
135  )
136  return gdb.communicate()[0].decode("utf-8", errors="backslashreplace")
137 
138  @staticmethod
139  def _terminate_process(proc: subprocess.Popen) -> None:
140  kill_tree(proc.pid, signal.SIGTERM)
141  proc.wait(60)
142  if proc.poll() is None:
143  kill_tree(proc.pid, signal.SIGKILL)
144 
145  @classmethod
146  def run_program(cls, tmp_path=None) -> FixtureResult:
147  """
148  Run the specified program and capture its output.
149  """
150  start_time = datetime.now()
151  command = cls._prepare_command(tmp_path=tmp_path)
152  env = cls._prepare_environment()
153  # ensure the required working directory exist
154  # (the entry cwd of popen_kwargs is set by the fixture "fixture_result")
155  os.makedirs(cls.popen_kwargs["cwd"], exist_ok=True)
156  proc = subprocess.Popen(
157  command,
158  stdout=subprocess.PIPE,
159  stderr=subprocess.PIPE,
160  env=env,
161  **cls.popen_kwargs,
162  )
163 
164  stdout_chunks, stderr_chunks = [], []
165  stdout = stderr = ""
166  exceeded_stream = stack_trace = failure = None
167  streams = {
168  proc.stdout.fileno(): (stdout_chunks, "stdout"),
169  proc.stderr.fileno(): (stderr_chunks, "stderr"),
170  }
171 
172  def read_output():
173  nonlocal stdout, stderr, exceeded_stream
174  while not exceeded_stream and proc.poll() is None:
175  readable, _, _ = select.select(streams.keys(), [], [], cls.timeout)
176  for fileno in readable:
177  data = os.read(fileno, 1024)
178  chunks, stream_name = streams[fileno]
179  chunks.append(data)
180  if sum(len(chunk) for chunk in chunks) > STDOUT_LIMIT:
181  exceeded_stream = stream_name
182  break
183 
184  stdout = b"".join(stdout_chunks)
185  stderr = b"".join(stderr_chunks)
186 
187  thread = threading.Thread(target=read_output)
188  thread.start()
189  thread.join(cls.timeout)
190 
191  if thread.is_alive():
192  stack_trace = cls._handle_timeout(proc)
193  failure = ProcessTimeoutError("Process timed out", stack_trace)
194  elif exceeded_stream:
195  failure = ExceededStreamError("Stream exceeded size limit", exceeded_stream)
196 
197  end_time = datetime.now()
198 
199  completed_process = subprocess.CompletedProcess(
200  args=command,
201  returncode=proc.returncode,
202  stdout=stdout,
203  stderr=stderr,
204  )
205 
206  return FixtureResult(
207  completed_process=completed_process,
208  start_time=start_time,
209  end_time=end_time,
210  failure=failure,
211  command=cls.command,
212  expanded_command=command,
213  env=env,
214  cwd=cls.popen_kwargs["cwd"],
215  )
216 
217  @pytest.mark.do_not_collect_source
219  self,
220  record_property: Callable[[str, str], None],
221  fixture_result: FixtureResult,
222  reference_path: Optional[Path],
223  ) -> None:
224  """
225  Record properties and handle any failures during fixture setup.
226  """
227  for key, value in fixture_result.to_dict().items():
228  if value is not None:
229  record_property(key, value)
230  if reference_path:
231  record_property("reference_file", str(reference_path))
232 
233  if fixture_result.failure:
234  pytest.fail(f"{fixture_result.failure}")
235 
236  @pytest.mark.do_not_collect_source
237  def test_returncode(self, returncode: int) -> None:
238  """
239  Test that the return code matches the expected value.
240  """
241  assert returncode == self.returncode
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.expand_vars_from
str expand_vars_from(str value, Dict[str, str] env)
Definition: SubprocessBaseTest.py:84
GaudiTesting.FixtureResult.ProcessTimeoutError
Definition: FixtureResult.py:16
ReadAndWriteWhiteBoard.Path
Path
Definition: ReadAndWriteWhiteBoard.py:58
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.unset_vars
None unset_vars(Dict[str, str] env, List[str] vars_to_unset)
Definition: SubprocessBaseTest.py:88
GaudiTesting.utils.file_path_for_class
def file_path_for_class(cls)
Definition: utils.py:394
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._prepare_environment
Dict[str, str] _prepare_environment(cls)
Definition: SubprocessBaseTest.py:78
GaudiPartProp.decorators.get
get
decorate the vector of properties
Definition: decorators.py:283
GaudiTesting.BaseTest.kill_tree
def kill_tree(ppid, sig)
Definition: BaseTest.py:77
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.update_env
None update_env(cls, Dict[str, str] env)
Definition: SubprocessBaseTest.py:71
GaudiTesting.FixtureResult
Definition: FixtureResult.py:1
GaudiTesting.FixtureResult.ExceededStreamError
Definition: FixtureResult.py:22
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._prepare_command
List[str] _prepare_command(cls, tmp_path=Path())
Definition: SubprocessBaseTest.py:99
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.str
str
Definition: SubprocessBaseTest.py:41
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._is_file_path
bool _is_file_path(str part)
Definition: SubprocessBaseTest.py:112
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._determine_program
str _determine_program(cls, str prog)
Definition: SubprocessBaseTest.py:93
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._terminate_process
None _terminate_process(subprocess.Popen proc)
Definition: SubprocessBaseTest.py:139
GaudiTesting.FixtureResult.FixtureResult
Definition: FixtureResult.py:28
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.cwd
Optional[Path] cwd(self)
Definition: SubprocessBaseTest.py:48
GaudiTesting.BaseTest.which
def which(executable)
Definition: BaseTest.py:791
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.resolve_path
str resolve_path(cls, Union[Path, str] path)
Definition: SubprocessBaseTest.py:53
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest
Definition: SubprocessBaseTest.py:33
GaudiTesting.utils
Definition: utils.py:1
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.test_returncode
None test_returncode(self, int returncode)
Definition: SubprocessBaseTest.py:237
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.test_fixture_setup
None test_fixture_setup(self, Callable[[str, str], None] record_property, FixtureResult fixture_result, Optional[Path] reference_path)
Definition: SubprocessBaseTest.py:218
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._handle_timeout
str _handle_timeout(cls, subprocess.Popen proc)
Definition: SubprocessBaseTest.py:116
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest.run_program
FixtureResult run_program(cls, tmp_path=None)
Definition: SubprocessBaseTest.py:146
GaudiTesting.SubprocessBaseTest.SubprocessBaseTest._collect_stack_trace
str _collect_stack_trace(subprocess.Popen proc)
Definition: SubprocessBaseTest.py:125