16 from datetime
import datetime
17 from pathlib
import Path
18 from string
import Template
19 from typing
import Callable, Dict, List, Optional, Union
# Size threshold, in bytes, for output captured from a single stream.
# Taken from the GAUDI_TEST_STDOUT_LIMIT environment variable when it is set,
# otherwise 100 * 1024**2 bytes.
STDOUT_LIMIT = int(os.getenv("GAUDI_TEST_STDOUT_LIMIT", 100 * 1024**2))
35 A base class for running and managing subprocess executions within a test framework.
36 It provides mechanisms for setting up the environment, preparing commands,
37 and handling subprocess output and errors.
40 command: List[str] =
None
42 environment: List[str] =
None
45 popen_kwargs: Dict = {}
def cwd(self) -> Optional[Path]:
    """Return the ``"cwd"`` entry of ``popen_kwargs`` as a ``Path``.

    Returns ``None`` when the entry is absent or falsy (for example an
    empty string), so no ``Path`` is built from an unset value.
    """
    configured = self.popen_kwargs.get("cwd")
    if not configured:
        return None
    return Path(configured)
55 Resolve the given path to an absolute path,
56 expanding environment variables.
57 If path looks relative and does not point to anything
60 if isinstance(path, Path):
62 path = os.path.expandvars(path)
63 if not os.path.isabs(path):
65 possible_path =
str((base_dir / path).resolve())
66 if os.path.exists(possible_path):
73 for item
in cls.environment:
74 key, value = item.split(
"=", 1)
79 env = dict(os.environ)
85 return Template(value).safe_substitute(env)
88 def unset_vars(env: Dict[str, str], vars_to_unset: List[str]) ->
None:
89 for var
in vars_to_unset:
94 if not any(prog.lower().endswith(ext)
for ext
in [
".exe",
".py",
".bat"]):
101 Prepare the command to be executed, resolving paths for each part.
104 for part
in cls.command[1:]:
113 return not part.startswith(
"-")
and ":" not in part
118 Handle a process timeout by collecting and returning the stack trace.
131 "--eval-command=thread apply all backtrace",
133 gdb = subprocess.Popen(
134 cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
136 return gdb.communicate()[0].decode(
"utf-8", errors=
"backslashreplace")
142 if proc.poll()
is None:
148 Run the specified program and capture its output.
150 start_time = datetime.now()
155 os.makedirs(cls.popen_kwargs[
"cwd"], exist_ok=
True)
156 proc = subprocess.Popen(
158 stdout=subprocess.PIPE,
159 stderr=subprocess.PIPE,
164 stdout_chunks, stderr_chunks = [], []
166 exceeded_stream = stack_trace = failure =
None
168 proc.stdout.fileno(): (stdout_chunks,
"stdout"),
169 proc.stderr.fileno(): (stderr_chunks,
"stderr"),
173 nonlocal stdout, stderr, exceeded_stream
174 while not exceeded_stream
and proc.poll()
is None:
175 readable, _, _ = select.select(streams.keys(), [], [], cls.timeout)
176 for fileno
in readable:
177 data = os.read(fileno, 1024)
178 chunks, stream_name = streams[fileno]
180 if sum(len(chunk)
for chunk
in chunks) > STDOUT_LIMIT:
181 exceeded_stream = stream_name
184 stdout = b
"".join(stdout_chunks)
185 stderr = b
"".join(stderr_chunks)
187 thread = threading.Thread(target=read_output)
189 thread.join(cls.timeout)
191 if thread.is_alive():
194 elif exceeded_stream:
197 end_time = datetime.now()
199 completed_process = subprocess.CompletedProcess(
201 returncode=proc.returncode,
207 completed_process=completed_process,
208 start_time=start_time,
212 expanded_command=command,
214 cwd=cls.popen_kwargs[
"cwd"],
217 @pytest.mark.do_not_collect_source
220 record_property: Callable[[str, str],
None],
221 fixture_result: FixtureResult,
222 reference_path: Optional[Path],
225 Record properties and handle any failures during fixture setup.
227 for key, value
in fixture_result.to_dict().items():
228 if value
is not None:
229 record_property(key, value)
231 record_property(
"reference_file",
str(reference_path))
233 if fixture_result.failure:
234 pytest.fail(f
"{fixture_result.failure}")
236 @pytest.mark.do_not_collect_source
239 Test that the return code matches the expected value.
241 assert returncode == self.returncode