16 from datetime
import datetime
17 from pathlib
import Path
18 from string
import Template
19 from typing
import Callable, Dict, List, Optional, Union
# Size limit, in bytes, that captured output is compared against.
# Overridable through the GAUDI_TEST_STDOUT_LIMIT environment variable;
# defaults to 100 MiB.
STDOUT_LIMIT = int(os.getenv("GAUDI_TEST_STDOUT_LIMIT", 100 * 1024**2))
35 A base class for running and managing subprocess executions within a test framework.
36 It provides mechanisms for setting up the environment, preparing commands,
37 and handling subprocess output and errors.
40 command: List[str] =
None
42 environment: List[str] =
None
45 popen_kwargs: Dict = {}
def cwd(self) -> Optional[Path]:
    """Return the working directory configured in ``popen_kwargs``.

    Returns:
        A ``Path`` built from the ``"cwd"`` entry of ``self.popen_kwargs``,
        or ``None`` when that entry is absent or falsy (e.g. empty string).
    """
    configured = self.popen_kwargs.get("cwd")
    if not configured:
        return None
    return Path(configured)
55 Resolve the given path to an absolute path,
56 expanding environment variables.
57 If path looks relative and does not point to anything
60 if isinstance(path, Path):
62 path = os.path.expandvars(path)
67 path, suffix = path.rsplit(
":", 1)
70 if not os.path.isabs(path):
72 possible_path =
str((base_dir / path).resolve())
73 if os.path.exists(possible_path):
80 for item
in cls.environment:
81 key, value = item.split(
"=", 1)
86 env = dict(os.environ)
92 return Template(value).safe_substitute(env)
95 def unset_vars(env: Dict[str, str], vars_to_unset: List[str]) ->
None:
96 for var
in vars_to_unset:
101 if not any(prog.lower().endswith(ext)
for ext
in [
".exe",
".py",
".bat"]):
108 Prepare the command to be executed, resolving paths for each part.
111 for part
in cls.command[1:]:
112 if not part.startswith(
"-"):
121 Handle a process timeout by collecting and returning the stack trace.
134 "--eval-command=thread apply all backtrace",
136 gdb = subprocess.Popen(
137 cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
139 return gdb.communicate()[0].decode(
"utf-8", errors=
"backslashreplace")
145 if proc.poll()
is None:
152 os.makedirs(cls.popen_kwargs.
get(
"cwd", tmp_path), exist_ok=
True)
158 Run the specified program and capture its output.
160 start_time = datetime.now()
163 proc = subprocess.Popen(
165 stdout=subprocess.PIPE,
166 stderr=subprocess.PIPE,
171 stdout_chunks, stderr_chunks = [], []
173 exceeded_stream = stack_trace = run_exception =
None
175 proc.stdout.fileno(): (stdout_chunks,
"stdout"),
176 proc.stderr.fileno(): (stderr_chunks,
"stderr"),
180 nonlocal stdout, stderr, exceeded_stream
181 while not exceeded_stream
and proc.poll()
is None:
182 readable, _, _ = select.select(streams.keys(), [], [], cls.timeout)
183 for fileno
in readable:
184 data = os.read(fileno, 1024)
185 chunks, stream_name = streams[fileno]
187 if sum(len(chunk)
for chunk
in chunks) > STDOUT_LIMIT:
188 exceeded_stream = stream_name
191 stdout = b
"".join(stdout_chunks)
192 stderr = b
"".join(stderr_chunks)
194 thread = threading.Thread(target=read_output)
196 thread.join(cls.timeout)
198 if thread.is_alive():
201 elif exceeded_stream:
203 "Stream exceeded size limit", exceeded_stream
206 end_time = datetime.now()
208 completed_process = subprocess.CompletedProcess(
210 returncode=proc.returncode,
216 completed_process=completed_process,
217 start_time=start_time,
219 run_exception=run_exception,
221 expanded_command=command,
223 cwd=cls.popen_kwargs[
"cwd"],
228 tmp_path = cls.popen_kwargs.
get(
"cwd", tmp_path)
230 print(
"Running the command: ", command)
238 @pytest.mark.do_not_collect_source
241 record_property: Callable[[str, str],
None],
242 fixture_result: FixtureResult,
243 reference_path: Optional[Path],
246 Record properties and handle any failures during fixture setup.
248 for key, value
in fixture_result.to_dict().items():
249 if value
is not None:
250 record_property(key, value)
252 record_property(
"reference_file",
str(reference_path))
254 if fixture_result.run_exception:
255 pytest.fail(f
"{fixture_result.run_exception}")
257 @pytest.mark.do_not_collect_source
260 Test that the return code matches the expected value.
262 assert returncode == self.returncode