16 from datetime
import datetime
17 from pathlib
import Path
18 from string
import Template
19 from typing
import Callable, Dict, List, Optional, Union
30 STDOUT_LIMIT = int(os.environ.get(
"GAUDI_TEST_STDOUT_LIMIT", 1024**2 * 100))
35 A base class for running and managing subprocess executions within a test framework.
36 It provides mechanisms for setting up the environment, preparing commands,
37 and handling subprocess output and errors.
40 command: List[str] =
None
42 environment: List[str] =
None
45 popen_kwargs: Dict = {}
48 def cwd(self) -> Optional[Path]:
49 cwd = self.popen_kwargs.
get(
"cwd")
50 return Path(cwd)
if cwd
else None
55 Resolve the given path to an absolute path,
56 expanding environment variables.
57 If path looks relative and does not point to anything
60 if isinstance(path, Path):
62 path = os.path.expandvars(path)
67 path, suffix = path.rsplit(
":", 1)
70 if not os.path.isabs(path):
72 possible_path =
str((base_dir / path).resolve())
73 if os.path.exists(possible_path):
80 for item
in cls.environment:
81 key, value = item.split(
"=", 1)
86 env = dict(os.environ)
92 return Template(value).safe_substitute(env)
95 def unset_vars(env: Dict[str, str], vars_to_unset: List[str]) ->
None:
96 for var
in vars_to_unset:
101 if not any(prog.lower().endswith(ext)
for ext
in [
".exe",
".py",
".bat"]):
108 Prepare the command to be executed, resolving paths for each part.
111 for part
in cls.command[1:]:
112 if not part.startswith(
"-"):
121 Handle a process timeout by collecting and returning the stack trace.
134 "--eval-command=thread apply all backtrace",
136 gdb = subprocess.Popen(
137 cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
139 return gdb.communicate()[0].decode(
"utf-8", errors=
"backslashreplace")
145 if proc.poll()
is None:
151 Run the specified program and capture its output.
153 start_time = datetime.now()
158 os.makedirs(cls.popen_kwargs[
"cwd"], exist_ok=
True)
159 proc = subprocess.Popen(
161 stdout=subprocess.PIPE,
162 stderr=subprocess.PIPE,
167 stdout_chunks, stderr_chunks = [], []
169 exceeded_stream = stack_trace = run_exception =
None
171 proc.stdout.fileno(): (stdout_chunks,
"stdout"),
172 proc.stderr.fileno(): (stderr_chunks,
"stderr"),
176 nonlocal stdout, stderr, exceeded_stream
177 while not exceeded_stream
and proc.poll()
is None:
178 readable, _, _ = select.select(streams.keys(), [], [], cls.timeout)
179 for fileno
in readable:
180 data = os.read(fileno, 1024)
181 chunks, stream_name = streams[fileno]
183 if sum(len(chunk)
for chunk
in chunks) > STDOUT_LIMIT:
184 exceeded_stream = stream_name
187 stdout = b
"".join(stdout_chunks)
188 stderr = b
"".join(stderr_chunks)
190 thread = threading.Thread(target=read_output)
192 thread.join(cls.timeout)
194 if thread.is_alive():
197 elif exceeded_stream:
199 "Stream exceeded size limit", exceeded_stream
202 end_time = datetime.now()
204 completed_process = subprocess.CompletedProcess(
206 returncode=proc.returncode,
212 completed_process=completed_process,
213 start_time=start_time,
215 run_exception=run_exception,
217 expanded_command=command,
219 cwd=cls.popen_kwargs[
"cwd"],
222 @pytest.mark.do_not_collect_source
225 record_property: Callable[[str, str],
None],
226 fixture_result: FixtureResult,
227 reference_path: Optional[Path],
230 Record properties and handle any failures during fixture setup.
232 for key, value
in fixture_result.to_dict().items():
233 if value
is not None:
234 record_property(key, value)
236 record_property(
"reference_file",
str(reference_path))
238 if fixture_result.run_exception:
239 pytest.fail(f
"{fixture_result.run_exception}")
241 @pytest.mark.do_not_collect_source
244 Test that the return code matches the expected value.
246 assert returncode == self.returncode