diff --git a/pyproject.toml b/pyproject.toml index f0596be8608..73888f89566 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,8 +36,7 @@ search-path = [ "python/wpt", ] project-includes = [ - "python/wpt/**/*.py", - "python/tidy/**/*.py", + "python/**/*.py", ] project-excludes = [ "**/venv/**", diff --git a/python/mach_bootstrap.py b/python/mach_bootstrap.py index 6b3cbe25d8b..12ce66eed8c 100644 --- a/python/mach_bootstrap.py +++ b/python/mach_bootstrap.py @@ -80,7 +80,7 @@ CATEGORIES = { } -def _process_exec(args, cwd): +def _process_exec(args: list[str], cwd) -> None: try: subprocess.check_output(args, stderr=subprocess.STDOUT, cwd=cwd) except subprocess.CalledProcessError as exception: @@ -89,7 +89,7 @@ def _process_exec(args, cwd): sys.exit(1) -def install_virtual_env_requirements(project_path: str, marker_path: str): +def install_virtual_env_requirements(project_path: str, marker_path: str) -> None: requirements_paths = [ os.path.join(project_path, "python", "requirements.txt"), os.path.join( @@ -127,7 +127,7 @@ def install_virtual_env_requirements(project_path: str, marker_path: str): marker_file.write(requirements_hash) -def _activate_virtualenv(topdir): +def _activate_virtualenv(topdir: str) -> None: virtualenv_path = os.path.join(topdir, ".venv") with open(".python-version", "r") as python_version_file: @@ -151,7 +151,7 @@ def _activate_virtualenv(topdir): warnings.filterwarnings("ignore", category=SyntaxWarning, module=r".*.venv") -def _ensure_case_insensitive_if_windows(): +def _ensure_case_insensitive_if_windows() -> None: # The folder is called 'python'. By deliberately checking for it with the wrong case, we determine if the file # system is case sensitive or not. if _is_windows() and not os.path.exists("Python"): @@ -160,11 +160,11 @@ def _ensure_case_insensitive_if_windows(): sys.exit(1) -def _is_windows(): +def _is_windows() -> bool: return sys.platform == "win32" -def bootstrap_command_only(topdir): +def bootstrap_command_only(topdir: str) -> int: # we should activate the venv before importing servo.boostrap # because the module requires non-standard python packages _activate_virtualenv(topdir) @@ -188,7 +188,7 @@ def bootstrap_command_only(topdir): return 0 -def bootstrap(topdir): +def bootstrap(topdir: str): _ensure_case_insensitive_if_windows() topdir = os.path.abspath(topdir) @@ -215,6 +215,7 @@ def bootstrap(topdir): import mach.main mach = mach.main.Mach(os.getcwd()) + # pyrefly: ignore[bad-assignment] mach.populate_context_handler = populate_context for category, meta in CATEGORIES.items(): diff --git a/python/servo/bootstrap_commands.py b/python/servo/bootstrap_commands.py index e0ae95761bf..41b72f13fb3 100644 --- a/python/servo/bootstrap_commands.py +++ b/python/servo/bootstrap_commands.py @@ -18,7 +18,7 @@ import subprocess import sys import tempfile import traceback -import urllib +import urllib.error import toml @@ -40,7 +40,7 @@ class MachCommands(CommandBase): @CommandArgument("--force", "-f", action="store_true", help="Boostrap without confirmation") @CommandArgument("--skip-platform", action="store_true", help="Skip platform bootstrapping.") @CommandArgument("--skip-lints", action="store_true", help="Skip tool necessary for linting.") - def bootstrap(self, force=False, skip_platform=False, skip_lints=False): + def bootstrap(self, force=False, skip_platform=False, skip_lints=False) -> int: # Note: This entry point isn't actually invoked by ./mach bootstrap. # ./mach bootstrap calls mach_bootstrap.bootstrap_command_only so that # it can install dependencies without needing mach's dependencies @@ -57,7 +57,7 @@ class MachCommands(CommandBase): category="bootstrap", ) @CommandArgument("--force", "-f", action="store_true", help="Boostrap without confirmation") - def bootstrap_gstreamer(self, force=False): + def bootstrap_gstreamer(self, force=False) -> int: try: servo.platform.get().bootstrap_gstreamer(force) except NotImplementedError as exception: @@ -66,7 +66,7 @@ class MachCommands(CommandBase): return 0 @Command("update-hsts-preload", description="Download the HSTS preload list", category="bootstrap") - def bootstrap_hsts_preload(self, force=False): + def bootstrap_hsts_preload(self, force=False) -> None: preload_filename = "hsts_preload.fstmap" preload_path = path.join(self.context.topdir, "resources") @@ -104,7 +104,7 @@ class MachCommands(CommandBase): description="Download the public domains list and update resources/public_domains.txt", category="bootstrap", ) - def bootstrap_pub_suffix(self, force=False): + def bootstrap_pub_suffix(self, force=False) -> None: list_url = "https://publicsuffix.org/list/public_suffix_list.dat" dst_filename = path.join(self.context.topdir, "resources", "public_domains.txt") not_implemented_case = re.compile(r"^[^*]+\*") @@ -122,12 +122,12 @@ class MachCommands(CommandBase): for suffix in suffixes: if not_implemented_case.match(suffix): print("Warning: the new list contains a case that servo can't handle: %s" % suffix) - fo.write(suffix.encode("idna") + "\n") + fo.write(suffix.encode("idna") + b"\n") @Command("clean-nightlies", description="Clean unused nightly builds of Rust and Cargo", category="bootstrap") @CommandArgument("--force", "-f", action="store_true", help="Actually remove stuff") @CommandArgument("--keep", default="1", help="Keep up to this many most recent nightlies") - def clean_nightlies(self, force=False, keep=None): + def clean_nightlies(self, force: bool, keep: str | int) -> None: print(f"Current Rust version for Servo: {self.rust_toolchain()}") old_toolchains = [] keep = int(keep) @@ -158,8 +158,8 @@ class MachCommands(CommandBase): @CommandArgument("--force", "-f", action="store_true", help="Actually remove stuff") @CommandArgument("--show-size", "-s", action="store_true", help="Show packages size") @CommandArgument("--keep", default="1", help="Keep up to this many most recent dependencies") - def clean_cargo_cache(self, force=False, show_size=False, keep=None): - def get_size(path): + def clean_cargo_cache(self, force: bool, show_size: bool, keep: str) -> None: + def get_size(path: str) -> float: if os.path.isfile(path): return os.path.getsize(path) / (1024 * 1024.0) total_size = 0 @@ -177,7 +177,7 @@ class MachCommands(CommandBase): import toml if os.environ.get("CARGO_HOME", ""): - cargo_dir = os.environ.get("CARGO_HOME") + cargo_dir = os.environ["CARGO_HOME"] else: home_dir = os.path.expanduser("~") cargo_dir = path.join(home_dir, ".cargo") @@ -265,7 +265,7 @@ class MachCommands(CommandBase): } packages["crates"][crate_name]["exist"].append(d) - total_size = 0 + total_size = 0.0 for packages_type in ["git", "crates"]: sorted_packages = sorted(packages[packages_type]) for crate_name in sorted_packages: @@ -309,7 +309,7 @@ class MachCommands(CommandBase): crate_paths.append(path.join(crates_cache_dir, "{}.crate".format(exist))) crate_paths.append(path.join(crates_src_dir, exist)) - size = sum(get_size(p) for p in crate_paths) if show_size else 0 + size = sum((get_size(p) for p in crate_paths), 0.0) if show_size else 0.0 total_size += size print_msg = (exist_name, " ({}MB)".format(round(size, 2)) if show_size else "", cargo_dir) if force: diff --git a/python/servo/build_commands.py b/python/servo/build_commands.py index 1f08caaefce..cf90776c4e9 100644 --- a/python/servo/build_commands.py +++ b/python/servo/build_commands.py @@ -9,6 +9,7 @@ import datetime import os +from os import PathLike import os.path as path import pathlib import shutil @@ -17,7 +18,7 @@ import subprocess import sys from time import time -from typing import Optional, List, Dict +from typing import Optional, List, Dict, Union from mach.decorators import ( CommandArgument, @@ -71,7 +72,7 @@ def get_rustc_llvm_version() -> Optional[List[int]]: llvm_version = llvm_version.strip() version = llvm_version.split(".") print(f"Info: rustc is using LLVM version {'.'.join(version)}") - return version + return list(map(int, version)) else: print(f"Error: Couldn't find LLVM version in output of `rustc --version --verbose`: `{result.stdout}`") except Exception as e: @@ -101,7 +102,7 @@ class MachCommands(CommandBase): sanitizer: SanitizerKind = SanitizerKind.NONE, flavor=None, **kwargs, - ): + ) -> int: opts = params or [] if build_type.is_release(): @@ -178,7 +179,7 @@ class MachCommands(CommandBase): # On the Mac, set a lovely icon. This makes it easier to pick out the Servo binary in tools # like Instruments.app. try: - import Cocoa + import Cocoa # pyrefly: ignore[import-error] icon_path = path.join(self.get_top_dir(), "resources", "servo_1024.png") icon = Cocoa.NSImage.alloc().initWithContentsOfFile_(icon_path) @@ -192,14 +193,14 @@ class MachCommands(CommandBase): elapsed_delta = datetime.timedelta(seconds=int(elapsed)) build_message = f"{'Succeeded' if status == 0 else 'Failed'} in {elapsed_delta}" print(build_message) - + assert isinstance(status, int) return status @Command("clean", description="Clean the target/ and Python virtual environment directories", category="build") @CommandArgument("--manifest-path", default=None, help="Path to the manifest to the package to clean") @CommandArgument("--verbose", "-v", action="store_true", help="Print verbose output") @CommandArgument("params", nargs="...", help="Command-line arguments to be passed through to Cargo") - def clean(self, manifest_path=None, params=[], verbose=False): + def clean(self, manifest_path=None, params=[], verbose=False) -> None: self.ensure_bootstrapped() virtualenv_path = path.join(self.get_top_dir(), ".venv") @@ -214,8 +215,8 @@ class MachCommands(CommandBase): return check_call(["cargo", "clean"] + opts, env=self.build_env(), verbose=verbose) def build_sanitizer_env( - self, env: Dict, opts: List[str], kwargs, target_triple, sanitizer: SanitizerKind = SanitizerKind.NONE - ): + self, env: Dict, opts: List[str], kwargs, target_triple: str, sanitizer: SanitizerKind = SanitizerKind.NONE + ) -> None: if sanitizer.is_none(): return # do not use crown (clashes with different rust version) @@ -293,7 +294,7 @@ def copy_windows_dlls_to_build_directory(servo_binary: str, target: BuildTarget) # Copy in the built EGL and GLES libraries from where they were built to # the final build dirctory - def find_and_copy_built_dll(dll_name): + def find_and_copy_built_dll(dll_name: str) -> None: try: file_to_copy = next(pathlib.Path(build_path).rglob(dll_name)) shutil.copy(file_to_copy, servo_exe_dir) @@ -315,7 +316,7 @@ def copy_windows_dlls_to_build_directory(servo_binary: str, target: BuildTarget) return True -def package_gstreamer_dlls(servo_exe_dir: str, target: BuildTarget): +def package_gstreamer_dlls(servo_exe_dir: str, target: BuildTarget) -> bool: gst_root = servo.platform.get().gstreamer_root(target) if not gst_root: print("Could not find GStreamer installation directory.") @@ -354,8 +355,8 @@ def package_gstreamer_dlls(servo_exe_dir: str, target: BuildTarget): return not missing -def package_msvc_dlls(servo_exe_dir: str, target: BuildTarget): - def copy_file(dll_path: Optional[str]) -> bool: +def package_msvc_dlls(servo_exe_dir: str, target: BuildTarget) -> bool: + def copy_file(dll_path: Union[PathLike[str], str]) -> bool: if not dll_path or not os.path.exists(dll_path): print(f"WARNING: Could not find DLL at {dll_path}", file=sys.stderr) return False diff --git a/python/servo/command_base.py b/python/servo/command_base.py index b844f1fbf70..522917abd84 100644 --- a/python/servo/command_base.py +++ b/python/servo/command_base.py @@ -15,20 +15,20 @@ import gzip import itertools import locale import os -import re import shutil import subprocess import sys import tarfile import urllib import zipfile +import urllib.error +import urllib.request from dataclasses import dataclass from enum import Enum -from errno import ENOENT as NO_SUCH_FILE_OR_DIRECTORY from glob import glob from os import path -from subprocess import PIPE -from typing import Any, Dict, List, Optional +from subprocess import PIPE, CompletedProcess +from typing import Any, Dict, List, Optional, Union, LiteralString, cast from xml.etree.ElementTree import XML import toml @@ -54,13 +54,13 @@ class BuildType: CUSTOM = 3 kind: Kind - profile: Optional[str] + profile: str def dev() -> BuildType: - return BuildType(BuildType.Kind.DEV, None) + return BuildType(BuildType.Kind.DEV, "debug") def release() -> BuildType: - return BuildType(BuildType.Kind.RELEASE, None) + return BuildType(BuildType.Kind.RELEASE, "release") def prod() -> BuildType: return BuildType(BuildType.Kind.CUSTOM, "production") @@ -93,7 +93,7 @@ class BuildType: @contextlib.contextmanager -def cd(new_path): +def cd(new_path: str): """Context manager for changing the current working directory""" previous_path = os.getcwd() try: @@ -104,7 +104,7 @@ def cd(new_path): @contextlib.contextmanager -def setlocale(name): +def setlocale(name: str): """Context manager for changing the current locale""" saved_locale = locale.setlocale(locale.LC_ALL) try: @@ -126,7 +126,7 @@ def find_dep_path_newest(package, bin_path): return None -def archive_deterministically(dir_to_archive, dest_archive, prepend_path=None): +def archive_deterministically(dir_to_archive, dest_archive, prepend_path=None) -> None: """Create a .tar.gz archive in a deterministic (reproducible) manner. See https://reproducible-builds.org/docs/archives/ for more details.""" @@ -177,27 +177,29 @@ def archive_deterministically(dir_to_archive, dest_archive, prepend_path=None): os.rename(temp_file, dest_archive) -def call(*args, **kwargs): +def call(*args, **kwargs) -> int: + """Wrap `subprocess.call`, printing the command if verbose=True.""" + verbose = kwargs.pop("verbose", False) + if verbose: + print(" ".join(args[0])) + # we have to use shell=True in order to get PATH handling + # when looking for the binary on Windows\ + kwargs.setdefault("shell", sys.platform == "win32") + return subprocess.call(*args, **kwargs) + + +def check_output(*args, **kwargs) -> Union[str, bytes]: """Wrap `subprocess.call`, printing the command if verbose=True.""" verbose = kwargs.pop("verbose", False) if verbose: print(" ".join(args[0])) # we have to use shell=True in order to get PATH handling # when looking for the binary on Windows - return subprocess.call(*args, shell=sys.platform == "win32", **kwargs) + kwargs.setdefault("shell", sys.platform == "win32") + return subprocess.check_output(*args, **kwargs) -def check_output(*args, **kwargs) -> bytes: - """Wrap `subprocess.call`, printing the command if verbose=True.""" - verbose = kwargs.pop("verbose", False) - if verbose: - print(" ".join(args[0])) - # we have to use shell=True in order to get PATH handling - # when looking for the binary on Windows - return subprocess.check_output(*args, shell=sys.platform == "win32", **kwargs) - - -def check_call(*args, **kwargs): +def check_call(*args, **kwargs) -> None: """Wrap `subprocess.check_call`, printing the command if verbose=True. Also fix any unicode-containing `env`, for subprocess""" @@ -207,8 +209,9 @@ def check_call(*args, **kwargs): print(" ".join(args[0])) # we have to use shell=True in order to get PATH handling # when looking for the binary on Windows - proc = subprocess.Popen(*args, shell=sys.platform == "win32", **kwargs) - status = None + kwargs.setdefault("shell", sys.platform == "win32") + proc = subprocess.Popen(*args, **kwargs) + status: Optional[int] = None # Leave it to the subprocess to handle Ctrl+C. If it terminates as # a result of Ctrl+C, proc.wait() will return a status code, and, # we get out of the loop. If it doesn't, like e.g. gdb, we continue @@ -223,20 +226,20 @@ def check_call(*args, **kwargs): raise subprocess.CalledProcessError(status, " ".join(*args)) -def is_windows(): +def is_windows() -> bool: return sys.platform == "win32" -def is_macosx(): +def is_macosx() -> bool: return sys.platform == "darwin" -def is_linux(): +def is_linux() -> bool: return sys.platform.startswith("linux") class BuildNotFound(Exception): - def __init__(self, message): + def __init__(self, message) -> None: self.message = message def __str__(self): @@ -248,7 +251,9 @@ class CommandBase(object): This mostly handles configuration management, such as .servobuild.""" - def __init__(self, context): + target: BuildTarget + + def __init__(self, context) -> None: self.context = context self.enable_media = False self.features = [] @@ -257,13 +262,13 @@ class CommandBase(object): # by `configure_build_target` self.target = BuildTarget.from_triple(None) - def get_env_bool(var, default): + def get_env_bool(var: str, default: bool) -> bool: # Contents of env vars are strings by default. This returns the # boolean value of the specified environment variable, or the # speciried default if the var doesn't contain True or False - return {"True": True, "False": False}.get(os.environ.get(var), default) + return {"True": True, "False": False}.get(os.environ.get(var, ""), default) - def resolverelative(category, key): + def resolverelative(category: str, key: str) -> None: # Allow ~ self.config[category][key] = path.expanduser(self.config[category][key]) # Resolve relative paths @@ -327,7 +332,7 @@ class CommandBase(object): def get_top_dir(self): return self.context.topdir - def get_binary_path(self, build_type: BuildType, sanitizer: SanitizerKind = SanitizerKind.NONE): + def get_binary_path(self, build_type: BuildType, sanitizer: SanitizerKind = SanitizerKind.NONE) -> str: base_path = util.get_target_dir() if sanitizer.is_some() or self.target.is_cross_build(): base_path = path.join(base_path, self.target.triple()) @@ -339,7 +344,7 @@ class CommandBase(object): return binary_path - def detach_volume(self, mounted_volume): + def detach_volume(self, mounted_volume: str | bytes) -> None: print("Detaching volume {}".format(mounted_volume)) try: subprocess.check_call(["hdiutil", "detach", mounted_volume]) @@ -347,11 +352,11 @@ class CommandBase(object): print("Could not detach volume {} : {}".format(mounted_volume, e.returncode)) sys.exit(1) - def detach_volume_if_attached(self, mounted_volume): + def detach_volume_if_attached(self, mounted_volume: str | bytes) -> None: if os.path.exists(mounted_volume): self.detach_volume(mounted_volume) - def mount_dmg(self, dmg_path): + def mount_dmg(self, dmg_path) -> None: print("Mounting dmg {}".format(dmg_path)) try: subprocess.check_call(["hdiutil", "attach", dmg_path]) @@ -359,7 +364,7 @@ class CommandBase(object): print("Could not mount Servo dmg : {}".format(e.returncode)) sys.exit(1) - def extract_nightly(self, nightlies_folder, destination_folder, destination_file): + def extract_nightly(self, nightlies_folder: LiteralString, destination_folder: str, destination_file: str) -> None: print("Extracting to {} ...".format(destination_folder)) if is_macosx(): mounted_volume = path.join(path.sep, "Volumes", "Servo") @@ -382,14 +387,14 @@ class CommandBase(object): with tarfile.open(os.path.join(nightlies_folder, destination_file), "r") as tar: tar.extractall(destination_folder) - def get_executable(self, destination_folder): + def get_executable(self, destination_folder: str) -> str: if is_windows(): return path.join(destination_folder, "PFiles", "Mozilla research", "Servo Tech Demo") if is_linux: return path.join(destination_folder, "servo", "servo") return path.join(destination_folder, "servo") - def get_nightly_binary_path(self, nightly_date): + def get_nightly_binary_path(self, nightly_date) -> str | None: if nightly_date is None: return if not nightly_date: @@ -409,10 +414,18 @@ class CommandBase(object): response = urllib.request.urlopen(req).read() tree = XML(response) namespaces = {"ns": tree.tag[1 : tree.tag.index("}")]} + # pyrefly: ignore # missing-attribute file_to_download = tree.find("ns:Contents", namespaces).find("ns:Key", namespaces).text except urllib.error.URLError as e: print("Could not fetch the available nightly versions from the repository : {}".format(e.reason)) sys.exit(1) + except ValueError as e: + print( + "Could not fetch a nightly version for date {} and platform {} cause of {}".format( + nightly_date, os_prefix, e + ) + ) + sys.exit(1) except AttributeError: print("Could not fetch a nightly version for date {} and platform {}".format(nightly_date, os_prefix)) sys.exit(1) @@ -420,6 +433,7 @@ class CommandBase(object): nightly_target_directory = path.join(self.context.topdir, "target") # ':' is not an authorized character for a file name on Windows # make sure the OS specific separator is used + # pyrefly: ignore # missing-attribute target_file_path = file_to_download.replace(":", "-").split("/") destination_file = os.path.join(nightly_target_directory, os.path.join(*target_file_path)) # Once extracted, the nightly folder name is the tar name without the extension @@ -437,6 +451,7 @@ class CommandBase(object): print("The nightly file {} has already been downloaded.".format(destination_file)) else: print("The nightly {} does not exist yet, downloading it.".format(destination_file)) + # pyrefly: ignore # no-matching-overload download_file(destination_file, NIGHTLY_REPOSITORY_URL + file_to_download, destination_file) # Extract the downloaded nightly version @@ -447,10 +462,10 @@ class CommandBase(object): return self.get_executable(destination_folder) - def msvc_package_dir(self, package): + def msvc_package_dir(self, package) -> str: return servo.platform.windows.get_dependency_dir(package) - def build_env(self): + def build_env(self) -> dict[str, str]: """Return an extended environment dictionary.""" env = os.environ.copy() @@ -488,7 +503,7 @@ class CommandBase(object): if not (self.config["build"]["ccache"] == ""): env["CCACHE"] = self.config["build"]["ccache"] - env["CARGO_TARGET_DIR"] = servo.util.get_target_dir() + env["CARGO_TARGET_DIR"] = util.get_target_dir() # Work around https://github.com/servo/servo/issues/24446 # Argument-less str.split normalizes leading, trailing, and double spaces @@ -694,7 +709,7 @@ class CommandBase(object): return target_configuration_decorator - def configure_build_type(self, release: bool, dev: bool, prod: bool, profile: Optional[str]) -> BuildType: + def configure_build_type(self, release: bool, dev: bool, prod: bool, profile: str) -> BuildType: option_count = release + dev + prod + (profile is not None) if option_count > 1: @@ -726,7 +741,7 @@ class CommandBase(object): else: return BuildType.custom(profile) - def configure_build_target(self, kwargs: Dict[str, Any], suppress_log: bool = False): + def configure_build_target(self, kwargs: Dict[str, Any], suppress_log: bool = False) -> None: if hasattr(self.context, "target"): # This call is for a dispatched command and we've already configured # the target, so just use it. @@ -763,12 +778,6 @@ class CommandBase(object): if self.target.is_cross_build() and not suppress_log: print(f"Targeting '{self.target.triple()}' for cross-compilation") - def is_android(self): - return isinstance(self.target, AndroidTarget) - - def is_openharmony(self): - return isinstance(self.target, OpenHarmonyTarget) - def is_media_enabled(self, media_stack: Optional[str]): """Determine whether media is enabled based on the value of the build target platform and the value of the '--media-stack' command-line argument. @@ -804,8 +813,8 @@ class CommandBase(object): capture_output=False, target_override: Optional[str] = None, **_kwargs, - ): - env = env or self.build_env() + ) -> CompletedProcess[bytes] | int: + env = cast(dict[str, str], env or self.build_env()) # NB: On non-Linux platforms we cannot check whether GStreamer is installed until # environment variables are set via `self.build_env()`. @@ -879,21 +888,21 @@ class CommandBase(object): return call(["cargo", command] + args + cargo_args, env=env, verbose=verbose) - def android_adb_path(self, env): + def android_adb_path(self, env) -> LiteralString: if "ANDROID_SDK_ROOT" in env: sdk_adb = path.join(env["ANDROID_SDK_ROOT"], "platform-tools", "adb") if path.exists(sdk_adb): return sdk_adb return "adb" - def android_emulator_path(self, env): + def android_emulator_path(self, env) -> LiteralString: if "ANDROID_SDK_ROOT" in env: sdk_adb = path.join(env["ANDROID_SDK_ROOT"], "emulator", "emulator") if path.exists(sdk_adb): return sdk_adb return "emulator" - def ensure_bootstrapped(self): + def ensure_bootstrapped(self) -> None: if self.context.bootstrapped: return @@ -904,35 +913,13 @@ class CommandBase(object): if not self.target.is_cross_build(): return - installed_targets = check_output(["rustup", "target", "list", "--installed"], cwd=self.context.topdir).decode() + installed_targets = check_output(["rustup", "target", "list", "--installed"], cwd=self.context.topdir) + if isinstance(installed_targets, bytes): + installed_targets = installed_targets.decode("utf-8") if self.target.triple() not in installed_targets: check_call(["rustup", "target", "add", self.target.triple()], cwd=self.context.topdir) - def ensure_rustup_version(self): - try: - version_line = subprocess.check_output( - ["rustup" + servo.platform.get().executable_suffix(), "--version"], - # Silence "info: This is the version for the rustup toolchain manager, - # not the rustc compiler." - stderr=open(os.devnull, "wb"), - ) - except OSError as e: - if e.errno == NO_SUCH_FILE_OR_DIRECTORY: - print( - "It looks like rustup is not installed. See instructions at " - "https://github.com/servo/servo/#setting-up-your-environment" - ) - print() - sys.exit(1) - raise - version = tuple(map(int, re.match(rb"rustup (\d+)\.(\d+)\.(\d+)", version_line).groups())) - version_needed = (1, 23, 0) - if version < version_needed: - print("rustup is at version %s.%s.%s, Servo requires %s.%s.%s or more recent." % (version + version_needed)) - print("Try running 'rustup self update'.") - sys.exit(1) - - def ensure_clobbered(self, target_dir=None): + def ensure_clobbered(self, target_dir=None) -> None: if target_dir is None: target_dir = util.get_target_dir() auto = True if os.environ.get("AUTOCLOBBER", False) else False @@ -955,6 +942,6 @@ class CommandBase(object): Registrar.dispatch("clean", context=self.context, verbose=True) print("Successfully completed auto clobber.") except subprocess.CalledProcessError as error: - sys.exit(error) + sys.exit(error.returncode) else: print("Clobber not needed.") diff --git a/python/servo/devenv_commands.py b/python/servo/devenv_commands.py index 9f87de57da1..87ad307ccff 100644 --- a/python/servo/devenv_commands.py +++ b/python/servo/devenv_commands.py @@ -7,6 +7,7 @@ # option. This file may not be copied, modified, or distributed # except according to those terms. +from subprocess import CompletedProcess import json from mach.decorators import ( @@ -26,13 +27,14 @@ class MachCommands(CommandBase): "params", default=None, nargs="...", help="Command-line arguments to be passed through to cargo check" ) @CommandBase.common_command_arguments(build_configuration=True, build_type=False) - def check(self, params, **kwargs): + def check(self, params, **kwargs) -> int: if not params: params = [] self.ensure_bootstrapped() self.ensure_clobbered() status = self.run_cargo_build_like_command("check", params, **kwargs) + assert isinstance(status, int) if status == 0: print("Finished checking, binary NOT updated. Consider ./mach build before ./mach run") @@ -40,7 +42,7 @@ class MachCommands(CommandBase): @Command("rustc", description="Run the Rust compiler", category="devenv") @CommandArgument("params", default=None, nargs="...", help="Command-line arguments to be passed through to rustc") - def rustc(self, params): + def rustc(self, params) -> int: if params is None: params = [] @@ -52,13 +54,15 @@ class MachCommands(CommandBase): "params", default=None, nargs="...", help="Command-line arguments to be passed through to cargo-fix" ) @CommandBase.common_command_arguments(build_configuration=True, build_type=False) - def cargo_fix(self, params, **kwargs): + def cargo_fix(self, params, **kwargs) -> int: if not params: params = [] self.ensure_bootstrapped() self.ensure_clobbered() - return self.run_cargo_build_like_command("fix", params, **kwargs) + status = self.run_cargo_build_like_command("fix", params, **kwargs) + assert isinstance(status, int) + return status @Command("clippy", description='Run "cargo clippy"', category="devenv") @CommandArgument("params", default=None, nargs="...", help="Command-line arguments to be passed through to clippy") @@ -69,7 +73,7 @@ class MachCommands(CommandBase): help="Emit the clippy warnings in the Github Actions annotations format", ) @CommandBase.common_command_arguments(build_configuration=True, build_type=False) - def cargo_clippy(self, params, github_annotations=False, **kwargs): + def cargo_clippy(self, params, github_annotations=False, **kwargs) -> int: if not params: params = [] @@ -85,6 +89,7 @@ class MachCommands(CommandBase): github_annotation_manager = GitHubAnnotationManager("clippy") results = self.run_cargo_build_like_command("clippy", params, env=env, capture_output=True, **kwargs) + assert isinstance(results, CompletedProcess) if results.returncode == 0: return 0 try: @@ -94,9 +99,11 @@ class MachCommands(CommandBase): except json.JSONDecodeError: pass return results.returncode - return self.run_cargo_build_like_command("clippy", params, env=env, **kwargs) + status = self.run_cargo_build_like_command("clippy", params, env=env, **kwargs) + assert isinstance(status, int) + return status @Command("fetch", description="Fetch Rust, Cargo and Cargo dependencies", category="devenv") - def fetch(self): + def fetch(self) -> int: self.ensure_bootstrapped() return call(["cargo", "fetch"], env=self.build_env()) diff --git a/python/servo/gstreamer.py b/python/servo/gstreamer.py index 010b5ab3a6c..763f294298d 100644 --- a/python/servo/gstreamer.py +++ b/python/servo/gstreamer.py @@ -153,22 +153,22 @@ of using `dumpbin` and the errors that appear when starting Servo. """ -def windows_dlls(): +def windows_dlls() -> list[str]: return GSTREAMER_WIN_DEPENDENCY_LIBS + [f"{lib}-1.0-0.dll" for lib in GSTREAMER_BASE_LIBS] -def windows_plugins(): +def windows_plugins() -> list[str]: libs = [*GSTREAMER_PLUGIN_LIBS, *GSTREAMER_WIN_PLUGIN_LIBS] return [f"{lib}.dll" for lib in libs] -def macos_plugins(): +def macos_plugins() -> list[str]: plugins = [*GSTREAMER_PLUGIN_LIBS, *GSTREAMER_MAC_PLUGIN_LIBS] return [f"lib{plugin}.dylib" for plugin in plugins] -def write_plugin_list(target): +def write_plugin_list(target: str) -> None: plugins = [] if "apple-" in target: plugins = macos_plugins() @@ -191,7 +191,7 @@ def is_macos_system_library(library_path: str) -> bool: return library_path.startswith("/System/Library") or library_path.startswith("/usr/lib") or ".asan." in library_path -def rewrite_dependencies_to_be_relative(binary: str, dependency_lines: Set[str], relative_path: str): +def rewrite_dependencies_to_be_relative(binary: str, dependency_lines: Set[str], relative_path: str) -> None: """Given a path to a binary (either an executable or a dylib), rewrite the the given dependency lines to be found at the given relative path to the executable in which they are used. In our case, this is typically servoshell.""" @@ -207,7 +207,7 @@ def rewrite_dependencies_to_be_relative(binary: str, dependency_lines: Set[str], print(f"{arguments} install_name_tool exited with return value {exception.returncode}") -def make_rpath_path_absolute(dylib_path_from_otool: str, rpath: str): +def make_rpath_path_absolute(dylib_path_from_otool: str, rpath: str) -> str: """Given a dylib dependency from otool, resolve the path into a full path if it contains `@rpath`.""" if not dylib_path_from_otool.startswith("@rpath/"): @@ -228,6 +228,7 @@ def find_non_system_dependencies_with_otool(binary_path: str) -> Set[str]: """Given a binary path, find all dylib dependency lines that do not refer to system libraries.""" process = subprocess.Popen(["/usr/bin/otool", "-L", binary_path], stdout=subprocess.PIPE) + assert process.stdout is not None output = set() for line in map(lambda line: line.decode("utf8"), process.stdout): @@ -242,15 +243,17 @@ def find_non_system_dependencies_with_otool(binary_path: str) -> Set[str]: return output -def package_gstreamer_dylibs(binary_path: str, library_target_directory: str, target: BuildTarget): +def package_gstreamer_dylibs(binary_path: str, library_target_directory: str, target: BuildTarget) -> bool: """Copy all GStreamer dependencies to the "lib" subdirectory of a built version of Servo. Also update any transitive shared library paths so that they are relative to this subdirectory.""" # This import only works when called from `mach`. import servo.platform + import servo.platform.macos gstreamer_root = servo.platform.get().gstreamer_root(target) + assert gstreamer_root is not None gstreamer_version = servo.platform.macos.GSTREAMER_PLUGIN_VERSION gstreamer_root_libs = os.path.join(gstreamer_root, "lib") diff --git a/python/servo/mutation/init.py b/python/servo/mutation/init.py index be618920613..f1a2c5ee077 100644 --- a/python/servo/mutation/init.py +++ b/python/servo/mutation/init.py @@ -25,9 +25,10 @@ def get_folders_list(path): folder_name = join(path, filename) folder_list.append(folder_name) return folder_list + return folder_list -def mutation_test_for(mutation_path): +def mutation_test_for(mutation_path: str) -> None: test_mapping_file = join(mutation_path, "test_mapping.json") if isfile(test_mapping_file): json_data = open(test_mapping_file).read() diff --git a/python/servo/mutation/mutator.py b/python/servo/mutation/mutator.py index 3ae0083f75a..aa61ef677ea 100644 --- a/python/servo/mutation/mutator.py +++ b/python/servo/mutation/mutator.py @@ -9,10 +9,12 @@ import fileinput import re +from re import Match import random +from typing import Iterator -def is_comment(line): +def is_comment(line: str) -> Match[str] | None: return re.search(r"\/\/.*", line) @@ -25,14 +27,14 @@ def init_variables(if_blocks): return random_index, start_counter, end_counter, lines_to_delete, line_to_mutate -def deleteStatements(file_name, line_numbers): +def deleteStatements(file_name, line_numbers) -> None: for line in fileinput.input(file_name, inplace=True): if fileinput.lineno() not in line_numbers: print(line.rstrip()) class Strategy: - def __init__(self): + def __init__(self) -> None: self._strategy_name = "" self._replace_strategy = {} @@ -53,28 +55,28 @@ class Strategy: class AndOr(Strategy): - def __init__(self): + def __init__(self) -> None: Strategy.__init__(self) logical_and = r"(?<=\s)&&(?=\s)" self._replace_strategy = {"regex": logical_and, "replaceString": "||"} class IfTrue(Strategy): - def __init__(self): + def __init__(self) -> None: Strategy.__init__(self) if_condition = r"(?<=if\s)\s*(?!let\s)(.*)(?=\s\{)" self._replace_strategy = {"regex": if_condition, "replaceString": "true"} class IfFalse(Strategy): - def __init__(self): + def __init__(self) -> None: Strategy.__init__(self) if_condition = r"(?<=if\s)\s*(?!let\s)(.*)(?=\s\{)" self._replace_strategy = {"regex": if_condition, "replaceString": "false"} class ModifyComparision(Strategy): - def __init__(self): + def __init__(self) -> None: Strategy.__init__(self) less_than_equals = r"(?<=\s)(\<)\=(?=\s)" greater_than_equals = r"(?<=\s)(\<)\=(?=\s)" @@ -82,7 +84,7 @@ class ModifyComparision(Strategy): class MinusToPlus(Strategy): - def __init__(self): + def __init__(self) -> None: Strategy.__init__(self) arithmetic_minus = r"(?<=\s)\-(?=\s.+)" minus_in_shorthand = r"(?<=\s)\-(?=\=)" @@ -90,7 +92,7 @@ class MinusToPlus(Strategy): class PlusToMinus(Strategy): - def __init__(self): + def __init__(self) -> None: Strategy.__init__(self) arithmetic_plus = r"(?<=[^\"]\s)\+(?=\s[^A-Z\'?\":\{]+)" plus_in_shorthand = r"(?<=\s)\+(?=\=)" @@ -98,14 +100,14 @@ class PlusToMinus(Strategy): class AtomicString(Strategy): - def __init__(self): + def __init__(self) -> None: Strategy.__init__(self) string_literal = r"(?<=\").+(?=\")" self._replace_strategy = {"regex": string_literal, "replaceString": " "} class DuplicateLine(Strategy): - def __init__(self): + def __init__(self) -> None: Strategy.__init__(self) self._strategy_name = "duplicate" append_statement = r".+?append\(.+?\).*?;" @@ -133,7 +135,7 @@ class DuplicateLine(Strategy): class DeleteIfBlock(Strategy): - def __init__(self): + def __init__(self) -> None: Strategy.__init__(self) self.if_block = r"^\s+if\s(.+)\s\{" self.else_block = r"\selse(.+)\{" @@ -175,7 +177,7 @@ class DeleteIfBlock(Strategy): line_to_mutate += 1 -def get_strategies(): +def get_strategies() -> Iterator[Strategy]: return ( AndOr, IfTrue, @@ -190,7 +192,7 @@ def get_strategies(): class Mutator: - def __init__(self, strategy): + def __init__(self, strategy) -> None: self._strategy = strategy def mutate(self, file_name): diff --git a/python/servo/package_commands.py b/python/servo/package_commands.py index d769e29e5dd..9921877c6eb 100644 --- a/python/servo/package_commands.py +++ b/python/servo/package_commands.py @@ -42,6 +42,7 @@ from servo.command_base import ( from servo.util import delete, get_target_dir from python.servo.platform.build_target import SanitizerKind +from servo.platform.build_target import is_android, is_openharmony PACKAGES = { "android": [ @@ -74,11 +75,11 @@ def packages_for_platform(platform): yield path.join(target_dir, package) -def listfiles(directory): +def listfiles(directory) -> list[str]: return [f for f in os.listdir(directory) if path.isfile(path.join(directory, f))] -def copy_windows_dependencies(binary_path, destination): +def copy_windows_dependencies(binary_path: str, destination: str) -> None: for f in os.listdir(binary_path): if os.path.isfile(path.join(binary_path, f)) and f.endswith(".dll"): shutil.copy(path.join(binary_path, f), destination) @@ -110,12 +111,12 @@ class PackageCommands(CommandBase): @CommandArgument("--target", "-t", default=None, help="Package for given target platform") @CommandBase.common_command_arguments(build_configuration=False, build_type=True, package_configuration=True) @CommandBase.allow_target_configuration - def package(self, build_type: BuildType, flavor=None, sanitizer: SanitizerKind = SanitizerKind.NONE): + def package(self, build_type: BuildType, flavor=None, sanitizer: SanitizerKind = SanitizerKind.NONE) -> int | None: env = self.build_env() binary_path = self.get_binary_path(build_type, sanitizer=sanitizer) dir_to_root = self.get_top_dir() target_dir = path.dirname(binary_path) - if self.is_android(): + if is_android(self.target): target_triple = self.target.triple() if "aarch64" in target_triple: arch_string = "Arm64" @@ -155,7 +156,7 @@ class PackageCommands(CommandBase): except subprocess.CalledProcessError as e: print("Packaging Android exited with return value %d" % e.returncode) return e.returncode - elif self.is_openharmony(): + elif is_openharmony(self.target): # hvigor doesn't support an option to place output files in a specific directory # so copy the source files into the target/openharmony directory first. ohos_app_dir = path.join(self.get_top_dir(), "support", "openharmony") @@ -197,7 +198,7 @@ class PackageCommands(CommandBase): try: with cd(ohos_target_dir): version = check_output(["hvigorw", "--version", "--no-daemon"]) - print(f"Found `hvigorw` with version {str(version, 'utf-8').strip()} in system PATH") + print(f"Found `hvigorw` with version {version.strip()} in system PATH") hvigor_command[0:0] = ["hvigorw"] except FileNotFoundError: print( @@ -216,7 +217,6 @@ class PackageCommands(CommandBase): env["NODE_PATH"] = env["HVIGOR_PATH"] + "/node_modules" hvigor_script = f"{env['HVIGOR_PATH']}/node_modules/@ohos/hvigor/bin/hvigor.js" hvigor_command[0:0] = ["node", hvigor_script] - abi_string = self.target.abi_string() ohos_libs_dir = path.join(ohos_target_dir, "entry", "libs", abi_string) os.makedirs(ohos_libs_dir) @@ -402,7 +402,7 @@ class PackageCommands(CommandBase): usb=False, sanitizer: SanitizerKind = SanitizerKind.NONE, flavor=None, - ): + ) -> int: env = self.build_env() try: binary_path = self.get_binary_path(build_type, sanitizer=sanitizer) @@ -417,7 +417,7 @@ class PackageCommands(CommandBase): print("Rebuilding Servo did not solve the missing build problem.") return 1 - if self.is_android(): + if is_android(self.target): pkg_path = self.target.get_package_path(build_type.directory_name()) exec_command = [self.android_adb_path(env)] if emulator and usb: @@ -428,7 +428,7 @@ class PackageCommands(CommandBase): if usb: exec_command += ["-d"] exec_command += ["install", "-r", pkg_path] - elif self.is_openharmony(): + elif is_openharmony(self.target): pkg_path = self.target.get_package_path(build_type.directory_name(), flavor=flavor) hdc_path = path.join(env["OHOS_SDK_NATIVE"], "../", "toolchains", "hdc") exec_command = [hdc_path, "install", "-r", pkg_path] @@ -453,7 +453,7 @@ class PackageCommands(CommandBase): @CommandArgument( "--github-release-id", default=None, type=int, help="The github release to upload the nightly builds." ) - def upload_nightly(self, platform, secret_from_environment, github_release_id): + def upload_nightly(self, platform, secret_from_environment, github_release_id) -> int: import boto3 def get_s3_secret(): @@ -465,13 +465,13 @@ class PackageCommands(CommandBase): aws_secret_access_key = secret["aws_secret_access_key"] return (aws_access_key, aws_secret_access_key) - def nightly_filename(package, timestamp): + def nightly_filename(package, timestamp) -> str: return "{}-{}".format( timestamp.isoformat() + "Z", # The `Z` denotes UTC path.basename(package), ) - def upload_to_github_release(platform, package, package_hash): + def upload_to_github_release(platform, package: str, package_hash: str) -> None: if not github_release_id: return @@ -483,11 +483,12 @@ class PackageCommands(CommandBase): asset_name = f"servo-latest.{extension}" release.upload_asset(package, name=asset_name) + # pyrefly: ignore[missing-attribute] release.upload_asset_from_memory( package_hash_fileobj, package_hash_fileobj.getbuffer().nbytes, name=f"{asset_name}.sha256" ) - def upload_to_s3(platform, package, package_hash, timestamp): + def upload_to_s3(platform, package: str, package_hash: str, timestamp: datetime) -> None: (aws_access_key, aws_secret_access_key) = get_s3_secret() s3 = boto3.client("s3", aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_access_key) diff --git a/python/servo/platform/__init__.py b/python/servo/platform/__init__.py index f47a3dd561c..e996ac2ccdd 100644 --- a/python/servo/platform/__init__.py +++ b/python/servo/platform/__init__.py @@ -14,7 +14,7 @@ from .windows import Windows __platform__ = None -def host_platform(): +def host_platform() -> str: os_type = platform.system().lower() if os_type == "linux": os_type = "unknown-linux-gnu" @@ -31,7 +31,7 @@ def host_platform(): return os_type -def host_triple(): +def host_triple() -> str: os_type = host_platform() cpu_type = platform.machine().lower() if cpu_type in ["i386", "i486", "i686", "i768", "x86"]: diff --git a/python/servo/platform/base.py b/python/servo/platform/base.py index 3ba59c955bd..e1569868d82 100644 --- a/python/servo/platform/base.py +++ b/python/servo/platform/base.py @@ -16,7 +16,7 @@ from .build_target import BuildTarget class Base: - def __init__(self, triple: str): + def __init__(self, triple: str) -> None: self.environ = os.environ.copy() self.triple = triple self.is_windows = False @@ -29,10 +29,10 @@ class Base: def executable_suffix(self) -> str: return "" - def _platform_bootstrap(self, _force: bool) -> bool: + def _platform_bootstrap(self, force: bool) -> bool: raise NotImplementedError("Bootstrap installation detection not yet available.") - def _platform_bootstrap_gstreamer(self, _target: BuildTarget, _force: bool) -> bool: + def _platform_bootstrap_gstreamer(self, target: BuildTarget, force: bool) -> bool: raise NotImplementedError("GStreamer bootstrap support is not yet available for your OS.") def is_gstreamer_installed(self, target: BuildTarget) -> bool: @@ -54,7 +54,7 @@ class Base: except FileNotFoundError: return False - def bootstrap(self, force: bool, skip_platform: bool, skip_lints: bool): + def bootstrap(self, force: bool, skip_platform: bool, skip_lints: bool) -> None: installed_something = False if not skip_platform: installed_something |= self._platform_bootstrap(force) @@ -67,7 +67,7 @@ class Base: if not installed_something: print("Dependencies were already installed!") - def install_rust_toolchain(self): + def install_rust_toolchain(self) -> None: # rustup 1.28.0, and rustup 1.28.1+ with RUSTUP_AUTO_INSTALL=0, require us to explicitly # install the Rust toolchain before trying to use it. print(" * Installing Rust toolchain...") @@ -86,7 +86,7 @@ class Base: return True def install_cargo_deny(self, force: bool) -> bool: - def cargo_deny_installed(): + def cargo_deny_installed() -> bool: if force or not shutil.which("cargo-deny"): return False # Tidy needs at least version 0.18.1 installed. @@ -115,7 +115,7 @@ class Base: as fast as possible.""" return False - def bootstrap_gstreamer(self, force: bool): + def bootstrap_gstreamer(self, force: bool) -> None: target = BuildTarget.from_triple(self.triple) if not self._platform_bootstrap_gstreamer(target, force): root = self.gstreamer_root(target) diff --git a/python/servo/platform/build_target.py b/python/servo/platform/build_target.py index 9c21363d12b..0acc02e5acc 100644 --- a/python/servo/platform/build_target.py +++ b/python/servo/platform/build_target.py @@ -7,6 +7,7 @@ # option. This file may not be copied, modified, or distributed # except according to those terms. +from typing import TypeGuard import errno import json import os @@ -48,7 +49,7 @@ class SanitizerKind(Enum): class BuildTarget(object): - def __init__(self, target_triple: str): + def __init__(self, target_triple: str) -> None: self.target_triple = target_triple @staticmethod @@ -69,7 +70,7 @@ class BuildTarget(object): def binary_name(self) -> str: return f"servo{servo.platform.get().executable_suffix()}" - def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path): + def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path) -> None: pass def is_cross_build(self) -> bool: @@ -124,7 +125,7 @@ class AndroidTarget(CrossBuildTarget): return config - def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path): + def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path) -> None: # Paths to Android build tools: if config["android"]["sdk"]: env["ANDROID_SDK_ROOT"] = config["android"]["sdk"] @@ -201,7 +202,7 @@ class AndroidTarget(CrossBuildTarget): llvm_toolchain = path.join(llvm_prebuilt, host) env["PATH"] = env["PATH"] + ":" + path.join(llvm_toolchain, "bin") - def to_ndk_bin(prog): + def to_ndk_bin(prog: str) -> str: return path.join(llvm_toolchain, "bin", prog) # This workaround is due to an issue in the x86_64 Android NDK that introduces @@ -217,6 +218,9 @@ class AndroidTarget(CrossBuildTarget): env["RUSTFLAGS"] = env.get("RUSTFLAGS", "") env["RUSTFLAGS"] += f"-C link-arg={libclangrt_filename}" + assert host_cc + assert host_cxx + env["RUST_TARGET"] = self.triple() env["HOST_CC"] = host_cc env["HOST_CXX"] = host_cxx @@ -289,7 +293,7 @@ class AndroidTarget(CrossBuildTarget): class OpenHarmonyTarget(CrossBuildTarget): DEFAULT_TRIPLE = "aarch64-unknown-linux-ohos" - def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path): + def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path) -> None: # Paths to OpenHarmony SDK and build tools: # Note: `OHOS_SDK_NATIVE` is the CMake variable name the `hvigor` build-system # uses for the native directory of the SDK, so we use the same name to be consistent. @@ -343,7 +347,7 @@ class OpenHarmonyTarget(CrossBuildTarget): # on windows, depending on how the wrapper is called. # Instead, we ensure that all the necessary flags for the c-compiler are set # via environment variables such as `TARGET_CFLAGS`. - def to_sdk_llvm_bin(prog: str): + def to_sdk_llvm_bin(prog: str) -> str: if sys.platform == "win32": prog = prog + ".exe" llvm_prog = llvm_bin.joinpath(prog) @@ -498,3 +502,11 @@ class OpenHarmonyTarget(CrossBuildTarget): def abi_string(self) -> str: abi_map = {"aarch64-unknown-linux-ohos": "arm64-v8a", "x86_64-unknown-linux-ohos": "x86_64"} return abi_map[self.triple()] + + +def is_android(target: BuildTarget) -> TypeGuard[AndroidTarget]: + return isinstance(target, AndroidTarget) + + +def is_openharmony(target: BuildTarget) -> TypeGuard[OpenHarmonyTarget]: + return isinstance(target, OpenHarmonyTarget) diff --git a/python/servo/platform/linux.py b/python/servo/platform/linux.py index 1261d98ed3d..da5023066f7 100644 --- a/python/servo/platform/linux.py +++ b/python/servo/platform/linux.py @@ -164,7 +164,7 @@ GSTREAMER_URL = ( class Linux(Base): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.is_linux = True (self.distro, self.version) = Linux.get_distro_and_version() @@ -265,16 +265,16 @@ class Linux(Base): install = True elif self.distro in ["CentOS", "CentOS Linux", "Fedora", "Fedora Linux", "Fedora Linux Asahi Remix"]: command = ["dnf", "install"] - installed_pkgs: [str] = subprocess.check_output( + installed_pkgs: list[str] = subprocess.check_output( ["rpm", "--query", "--all", "--queryformat", "%{NAME}\n"], encoding="utf-8" - ).split("\n") + ).splitlines() pkgs = DNF_PKGS for pkg in pkgs: if pkg not in installed_pkgs: install = True break elif self.distro == "void": - installed_pkgs = str(subprocess.check_output(["xbps-query", "-l"])) + installed_pkgs = subprocess.check_output(["xbps-query", "-l"], text=True).splitlines() pkgs = XBPS_PKGS for pkg in pkgs: command = ["xbps-install", "-A"] @@ -285,13 +285,13 @@ class Linux(Base): if not install: return False - def check_sudo(): + def check_sudo() -> bool: if os.geteuid() != 0: if shutil.which("sudo") is None: return False return True - def run_as_root(command, force=False): + def run_as_root(command: list[str], force: bool = False) -> int: if os.geteuid() != 0: command.insert(0, "sudo") if force: @@ -312,10 +312,10 @@ class Linux(Base): raise EnvironmentError("Installation of dependencies failed.") return True - def gstreamer_root(self, _target: BuildTarget) -> Optional[str]: + def gstreamer_root(self, target: BuildTarget) -> Optional[str]: return None - def _platform_bootstrap_gstreamer(self, _target: BuildTarget, _force: bool) -> bool: + def _platform_bootstrap_gstreamer(self, target: BuildTarget, force: bool) -> bool: raise EnvironmentError( "Bootstrapping GStreamer on Linux is not supported. " + "Please install it using your distribution package manager." diff --git a/python/servo/platform/macos.py b/python/servo/platform/macos.py index 204d07c486f..89a95b985b5 100644 --- a/python/servo/platform/macos.py +++ b/python/servo/platform/macos.py @@ -24,7 +24,7 @@ GSTREAMER_ROOT = "/Library/Frameworks/GStreamer.framework/Versions/1.0" class MacOS(Base): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.is_macos = True @@ -38,7 +38,7 @@ class MacOS(Base): # Servo only supports the official GStreamer distribution on MacOS. return not target.is_cross_build() and os.path.exists(GSTREAMER_ROOT) - def _platform_bootstrap(self, _force: bool) -> bool: + def _platform_bootstrap(self, force: bool) -> bool: installed_something = False try: brewfile = os.path.join(util.SERVO_ROOT, "support", "macos", "Brewfile") diff --git a/python/servo/platform/windows.py b/python/servo/platform/windows.py index d1c35871d26..15b7489dd4e 100644 --- a/python/servo/platform/windows.py +++ b/python/servo/platform/windows.py @@ -11,7 +11,7 @@ import os import subprocess import tempfile from typing import Optional -import urllib +import urllib.parse import zipfile import shutil @@ -32,12 +32,12 @@ DEPENDENCIES_DIR = os.path.join(util.get_target_dir(), "dependencies") WINGET_DEPENDENCIES = ["Kitware.CMake", "LLVM.LLVM", "Ninja-build.Ninja", "WiXToolset.WiXToolset"] -def get_dependency_dir(package): +def get_dependency_dir(package: str) -> str: """Get the directory that a given Windows dependency should extract to.""" return os.path.join(DEPENDENCIES_DIR, package, DEPENDENCIES[package]) -def _winget_import(force: bool = False): +def _winget_import(force: bool = False) -> None: try: # We install tools like LLVM / CMake, so we probably don't want to force-upgrade # a user installed version without good reason. @@ -56,7 +56,7 @@ def _winget_import(force: bool = False): raise e -def _choco_install(force: bool = False): +def _choco_install(force: bool = False) -> None: try: choco_config = os.path.join(util.SERVO_ROOT, "support", "windows", "chocolatey.config") @@ -75,15 +75,15 @@ def _choco_install(force: bool = False): class Windows(Base): - def __init__(self, triple: str): + def __init__(self, triple: str) -> None: super().__init__(triple) self.is_windows = True - def executable_suffix(self): + def executable_suffix(self) -> str: return ".exe" @classmethod - def download_and_extract_dependency(cls, zip_path: str, full_spec: str): + def download_and_extract_dependency(cls, zip_path: str, full_spec: str) -> None: if not os.path.isfile(zip_path): zip_url = f"{DEPS_URL}/{urllib.parse.quote(full_spec)}.zip" util.download_file(full_spec, zip_url, zip_path) diff --git a/python/servo/post_build_commands.py b/python/servo/post_build_commands.py index 85bd358922f..40bb09038ab 100644 --- a/python/servo/post_build_commands.py +++ b/python/servo/post_build_commands.py @@ -11,6 +11,7 @@ import json import os import os.path as path import subprocess +from subprocess import CompletedProcess from shutil import copy2 from typing import List @@ -30,12 +31,12 @@ from servo.command_base import ( check_call, is_linux, ) - +from servo.platform.build_target import is_android ANDROID_APP_NAME = "org.servo.servoshell" -def read_file(filename, if_exists=False): +def read_file(filename, if_exists=False) -> str | None: if if_exists and not path.exists(filename): return None with open(filename) as f: @@ -43,7 +44,7 @@ def read_file(filename, if_exists=False): # Copied from Python 3.3+'s shlex.quote() -def shell_quote(arg): +def shell_quote(arg: str): # use single quotes, and put single quotes into double quotes # the string $'b is then quoted as '$'"'"'b' return "'" + arg.replace("'", "'\"'\"'") + "'" @@ -81,7 +82,7 @@ class PostBuildCommands(CommandBase): software=False, emulator=False, usb=False, - ): + ) -> int | None: return self._run(servo_binary, params, debugger, debugger_cmd, headless, software, emulator, usb) def _run( @@ -94,7 +95,7 @@ class PostBuildCommands(CommandBase): software=False, emulator=False, usb=False, - ): + ) -> int | None: env = self.build_env() env["RUST_BACKTRACE"] = "1" if software: @@ -109,7 +110,7 @@ class PostBuildCommands(CommandBase): if debugger_cmd: debugger = True - if self.is_android(): + if is_android(self.target): if debugger: print("Android on-device debugging is not supported by mach yet. See") print("https://github.com/servo/servo/wiki/Building-for-Android#debugging-on-device") @@ -141,7 +142,7 @@ class PostBuildCommands(CommandBase): if usb: args += ["-d"] shell = subprocess.Popen(args + ["shell"], stdin=subprocess.PIPE) - shell.communicate(bytes("\n".join(script) + "\n", "utf8")) + shell.communicate("\n".join(script) + "\n") return shell.wait() args = [servo_binary] @@ -193,8 +194,9 @@ class PostBuildCommands(CommandBase): @Command("android-emulator", description="Run the Android emulator", category="post-build") @CommandArgument("args", nargs="...", help="Command-line arguments to be passed through to the emulator") - def android_emulator(self, args=None): + def android_emulator(self, args=None) -> int: if not args: + args = [] print("AVDs created by `./mach bootstrap-android` are servo-arm and servo-x86.") emulator = self.android_emulator_path(self.build_env()) return subprocess.call([emulator] + args) @@ -202,7 +204,7 @@ class PostBuildCommands(CommandBase): @Command("rr-record", description="Run Servo whilst recording execution with rr", category="post-build") @CommandArgument("params", nargs="...", help="Command-line arguments to be passed through to Servo") @CommandBase.common_command_arguments(binary_selection=True) - def rr_record(self, servo_binary: str, params=[]): + def rr_record(self, servo_binary: str, params=[]) -> None: env = self.build_env() env["RUST_BACKTRACE"] = "1" @@ -221,7 +223,7 @@ class PostBuildCommands(CommandBase): description="Replay the most recent execution of Servo that was recorded with rr", category="post-build", ) - def rr_replay(self): + def rr_replay(self) -> None: try: check_call(["rr", "--fatal-errors", "replay"]) except OSError as e: @@ -233,7 +235,7 @@ class PostBuildCommands(CommandBase): @Command("doc", description="Generate documentation", category="post-build") @CommandArgument("params", nargs="...", help="Command-line arguments to be passed through to cargo doc") @CommandBase.common_command_arguments(build_configuration=True, build_type=False) - def doc(self, params: List[str], **kwargs): + def doc(self, params: List[str], **kwargs) -> CompletedProcess[bytes] | int | None: self.ensure_bootstrapped() docs = path.join(servo.util.get_target_dir(), "doc") diff --git a/python/servo/testing_commands.py b/python/servo/testing_commands.py index f2002499e96..1fa0da5ff87 100644 --- a/python/servo/testing_commands.py +++ b/python/servo/testing_commands.py @@ -8,6 +8,7 @@ # except according to those terms. import argparse +from argparse import ArgumentParser import json import logging import os @@ -19,6 +20,7 @@ import subprocess import sys import textwrap from time import sleep +from typing import Any import tidy import wpt @@ -116,7 +118,7 @@ class MachCommands(CommandBase): DEFAULT_RENDER_MODE = "cpu" HELP_RENDER_MODE = "Value can be 'cpu', 'gpu' or 'both' (default " + DEFAULT_RENDER_MODE + ")" - def __init__(self, context): + def __init__(self, context) -> None: CommandBase.__init__(self, context) if not hasattr(self.context, "built_tests"): self.context.built_tests = False @@ -125,7 +127,7 @@ class MachCommands(CommandBase): @CommandArgument("--base", default=None, help="the base URL for testcases") @CommandArgument("--date", default=None, help="the datestamp for the data") @CommandArgument("--submit", "-a", default=False, action="store_true", help="submit the data to perfherder") - def test_perf(self, base=None, date=None, submit=False): + def test_perf(self, base=None, date=None, submit=False) -> int: env = self.build_env() cmd = ["bash", "test_perf.sh"] if base: @@ -144,7 +146,9 @@ class MachCommands(CommandBase): "--nocapture", default=False, action="store_true", help="Run tests with nocapture ( show test stdout )" ) @CommandBase.common_command_arguments(build_configuration=True, build_type=True) - def test_unit(self, build_type: BuildType, test_name=None, package=None, bench=False, nocapture=False, **kwargs): + def test_unit( + self, build_type: BuildType, test_name=None, package=None, bench=False, nocapture=False, **kwargs + ) -> int: if test_name is None: test_name = [] @@ -215,7 +219,7 @@ class MachCommands(CommandBase): return 0 # Gather Cargo build timings (https://doc.rust-lang.org/cargo/reference/timings.html). - args = ["--timings"] + args: list[str] = ["--timings"] if build_type.is_release(): args += ["--release"] @@ -237,10 +241,12 @@ class MachCommands(CommandBase): result = call(["cargo", "bench" if bench else "test"], cwd="support/crown") if result != 0: return result - return self.run_cargo_build_like_command("bench" if bench else "test", args, env=env, **kwargs) + result = self.run_cargo_build_like_command("bench" if bench else "test", args, env=env, **kwargs) + assert isinstance(result, int) + return result @Command("test-content", description="Run the content tests", category="testing") - def test_content(self): + def test_content(self) -> int: print("Content tests have been replaced by web-platform-tests under tests/wpt/mozilla/.") return 0 @@ -259,7 +265,7 @@ class MachCommands(CommandBase): action="store_true", help="Emit tidy warnings in the Github Actions annotations format", ) - def test_tidy(self, all_files, no_progress, github_annotations): + def test_tidy(self, all_files, no_progress, github_annotations) -> int: tidy_failed = tidy.scan(not all_files, not no_progress, github_annotations) print("\r ➤ Checking formatting of Rust files...") @@ -293,7 +299,7 @@ class MachCommands(CommandBase): @CommandArgument( "tests", default=None, nargs="...", help="Specific WebIDL tests to run, relative to the tests directory" ) - def test_scripts(self, verbose, very_verbose, all, tests): + def test_scripts(self, verbose, very_verbose, all, tests) -> int: if very_verbose: logging.getLogger().level = logging.DEBUG elif verbose: @@ -342,7 +348,7 @@ class MachCommands(CommandBase): # For the `import WebIDL` in runtests.py sys.path.insert(0, test_file_dir) run_file = path.abspath(path.join(test_file_dir, "runtests.py")) - run_globals = {"__file__": run_file} + run_globals: dict[str, Any] = {"__file__": run_file} exec(compile(open(run_file).read(), run_file, "exec"), run_globals) passed = run_globals["run_tests"](tests, verbose or very_verbose) and passed @@ -350,7 +356,7 @@ class MachCommands(CommandBase): @Command("test-devtools", description="Run tests for devtools.", category="testing") @CommandBase.common_command_arguments(build_type=True) - def test_devtools(self, build_type: BuildType, **kwargs): + def test_devtools(self, build_type: BuildType, **kwargs) -> int: print("Running devtools tests...") passed = servo.devtools_tests.run_tests(SCRIPT_PATH, build_type) return 0 if passed else 1 @@ -362,7 +368,7 @@ class MachCommands(CommandBase): parser=wpt.create_parser, ) @CommandBase.common_command_arguments(build_configuration=False, build_type=True) - def test_wpt_failure(self, build_type: BuildType, **kwargs): + def test_wpt_failure(self, build_type: BuildType, **kwargs) -> bool: kwargs["pause_after_test"] = False kwargs["include"] = ["infrastructure/failing-test.html"] return not self._test_wpt(build_type=build_type, **kwargs) @@ -375,7 +381,7 @@ class MachCommands(CommandBase): return self._test_wpt(servo_binary, **kwargs) @CommandBase.allow_target_configuration - def _test_wpt(self, servo_binary: str, **kwargs): + def _test_wpt(self, servo_binary: str, **kwargs) -> int: # TODO(mrobinson): Why do we pass the wrong binary path in when running WPT on Android? return_value = wpt.run.run_tests(servo_binary, **kwargs) return return_value if not kwargs["always_succeed"] else 0 @@ -386,11 +392,11 @@ class MachCommands(CommandBase): category="testing", parser=wpt.manifestupdate.create_parser, ) - def update_manifest(self, **kwargs): + def update_manifest(self, **kwargs) -> int: return wpt.manifestupdate.update(check_clean=False) @Command("fmt", description="Format Rust, Python, and TOML files", category="testing") - def format_code(self): + def format_code(self) -> int: result = format_python_files_with_ruff(check_only=False) if result != 0: return result @@ -404,7 +410,7 @@ class MachCommands(CommandBase): @Command( "update-wpt", description="Update the web platform tests", category="testing", parser=wpt.update.create_parser ) - def update_wpt(self, **kwargs): + def update_wpt(self, **kwargs) -> int: patch = kwargs.get("patch", False) if not patch and kwargs["sync"]: print("Are you sure you don't want a patch?") @@ -413,32 +419,33 @@ class MachCommands(CommandBase): @Command("test-jquery", description="Run the jQuery test suite", category="testing") @CommandBase.common_command_arguments(binary_selection=True) - def test_jquery(self, servo_binary: str): + def test_jquery(self, servo_binary: str) -> int: return self.jquery_test_runner("test", servo_binary) @Command("test-dromaeo", description="Run the Dromaeo test suite", category="testing") @CommandArgument("tests", default=["recommended"], nargs="...", help="Specific tests to run") @CommandArgument("--bmf-output", default=None, help="Specify BMF JSON output file") @CommandBase.common_command_arguments(binary_selection=True) - def test_dromaeo(self, tests, servo_binary: str, bmf_output: str | None = None): + def test_dromaeo(self, tests, servo_binary: str, bmf_output: str | None = None) -> None: return self.dromaeo_test_runner(tests, servo_binary, bmf_output) @Command("test-speedometer", description="Run servo's speedometer", category="testing") @CommandArgument("--bmf-output", default=None, help="Specify BMF JSON output file") @CommandBase.common_command_arguments(binary_selection=True) - def test_speedometer(self, servo_binary: str, bmf_output: str | None = None): + def test_speedometer(self, servo_binary: str, bmf_output: str | None = None) -> None: return self.speedometer_runner(servo_binary, bmf_output) @Command("test-speedometer-ohos", description="Run servo's speedometer on a ohos device", category="testing") @CommandArgument("--bmf-output", default=None, help="Specifcy BMF JSON output file") @CommandArgument("--profile", default=None, help="Specify a profile which will be prepended to the output") # This needs to be a separate command because we do not need a binary locally - def test_speedometer_ohos(self, bmf_output: str | None = None, profile: str | None = None): + + def test_speedometer_ohos(self, bmf_output: str | None = None, profile: str | None = None) -> None: return self.speedometer_runner_ohos(bmf_output, profile) @Command("update-jquery", description="Update the jQuery test suite expected results", category="testing") @CommandBase.common_command_arguments(binary_selection=True) - def update_jquery(self, servo_binary: str): + def update_jquery(self, servo_binary: str) -> int: return self.jquery_test_runner("update", servo_binary) @Command( @@ -447,7 +454,7 @@ class MachCommands(CommandBase): @CommandArgument( "params", default=None, nargs="...", help=" filepaths of output files of two runs of dromaeo test " ) - def compare_dromaeo(self, params): + def compare_dromaeo(self, params) -> None: prev_op_filename = params[0] cur_op_filename = params[1] result = {"Test": [], "Prev_Time": [], "Cur_Time": [], "Difference(%)": []} @@ -527,7 +534,7 @@ class MachCommands(CommandBase): ) ) - def jquery_test_runner(self, cmd, binary: str): + def jquery_test_runner(self, cmd: str, binary: str) -> int: base_dir = path.abspath(path.join("tests", "jquery")) jquery_dir = path.join(base_dir, "jquery") run_file = path.join(base_dir, "run_jquery.py") @@ -544,7 +551,7 @@ class MachCommands(CommandBase): return call([run_file, cmd, bin_path, base_dir]) - def dromaeo_test_runner(self, tests, binary: str, bmf_output: str | None): + def dromaeo_test_runner(self, tests, binary: str, bmf_output: str | None) -> None: base_dir = path.abspath(path.join("tests", "dromaeo")) dromaeo_dir = path.join(base_dir, "dromaeo") run_file = path.join(base_dir, "run_dromaeo.py") @@ -570,11 +577,11 @@ class MachCommands(CommandBase): return check_call([run_file, "|".join(tests), bin_path, base_dir, bmf_output]) - def speedometer_to_bmf(self, speedometer: str, bmf_output: str | None = None, profile: str | None = None): + def speedometer_to_bmf(self, speedometer: dict[str, Any], bmf_output: str, profile: str | None = None) -> None: output = dict() profile = "" if profile is None else profile + "/" - def parse_speedometer_result(result): + def parse_speedometer_result(result) -> None: if result["unit"] == "ms": output[profile + f"Speedometer/{result['name']}"] = { "latency": { # speedometer has ms we need to convert to ns @@ -592,7 +599,7 @@ class MachCommands(CommandBase): } } else: - raise "Unknown unit!" + raise Exception("Unknown unit!") for child in result["children"]: parse_speedometer_result(child) @@ -602,7 +609,7 @@ class MachCommands(CommandBase): with open(bmf_output, "w", encoding="utf-8") as f: json.dump(output, f, indent=4) - def speedometer_runner(self, binary: str, bmf_output: str | None): + def speedometer_runner(self, binary: str, bmf_output: str | None) -> None: speedometer = json.loads( subprocess.check_output( [ @@ -622,13 +629,16 @@ class MachCommands(CommandBase): if bmf_output: self.speedometer_to_bmf(speedometer, bmf_output) - def speedometer_runner_ohos(self, bmf_output: str | None, profile: str | None): - hdc_path: str = shutil.which("hdc") + def speedometer_runner_ohos(self, bmf_output: str | None, profile: str | None) -> None: + hdc_path = shutil.which("hdc") log_path: str = "/data/app/el2/100/base/org.servo.servo/cache/servo.log" - if hdc_path is None: - hdc_path = path.join(os.getenv("OHOS_SDK_NATIVE"), "../", "toolchains", "hdc") - def read_log_file() -> str: + if hdc_path is None: + ohos_sdk_native = os.getenv("OHOS_SDK_NATIVE") + assert ohos_sdk_native + hdc_path = path.join(ohos_sdk_native, "../", "toolchains", "hdc") + + def read_log_file(hdc_path: str) -> str: subprocess.call([hdc_path, "file", "recv", log_path]) file = "" try: @@ -670,12 +680,12 @@ class MachCommands(CommandBase): whole_file: str = "" for i in range(10): sleep(30) - whole_file = read_log_file() + whole_file = read_log_file(hdc_path) if "[INFO script::dom::console]" in whole_file: # technically the file could not have been written completely yet # on devices with slow flash, we might want to wait a bit more sleep(2) - whole_file = read_log_file() + whole_file = read_log_file(hdc_path) break start_index: int = whole_file.index("[INFO script::dom::console]") + len("[INFO script::dom::console]") + 1 json_string = whole_file[start_index:] @@ -694,7 +704,7 @@ class MachCommands(CommandBase): run_file = path.abspath( path.join(PROJECT_TOPLEVEL_PATH, "components", "net", "tests", "cookie_http_state_utils.py") ) - run_globals = {"__file__": run_file} + run_globals: dict[str, Any] = {"__file__": run_file} exec(compile(open(run_file).read(), run_file, "exec"), run_globals) return run_globals["update_test_file"](cache_dir) @@ -711,7 +721,7 @@ class MachCommands(CommandBase): if os.path.exists(dest_folder): shutil.rmtree(dest_folder) - run_globals = {"__file__": run_file} + run_globals: dict[str, Any] = {"__file__": run_file} exec(compile(open(run_file).read(), run_file, "exec"), run_globals) return run_globals["update_conformance"](version, dest_folder, None, patches_dir) @@ -775,7 +785,7 @@ class MachCommands(CommandBase): ) @CommandArgument("params", nargs="...", help="Command-line arguments to be passed through to Servo") @CommandBase.common_command_arguments(binary_selection=True) - def smoketest(self, servo_binary: str, params, **kwargs): + def smoketest(self, servo_binary: str, params, **kwargs) -> int | None: # We pass `-f` here so that any thread panic will cause Servo to exit, # preventing a panic from hanging execution. This means that these kind # of panics won't cause timeouts on CI. @@ -786,7 +796,7 @@ class MachCommands(CommandBase): @CommandArgument( "try_strings", default=["full"], nargs="...", help="A list of try strings specifying what kind of job to run." ) - def try_command(self, remote: str, try_strings: list[str]): + def try_command(self, remote: str, try_strings: list[str]) -> int: if subprocess.check_output(["git", "diff", "--cached", "--name-only"]).strip(): print("Cannot run `try` with staged and uncommited changes. ") print("Please either commit or stash them before running `try`.") @@ -836,7 +846,7 @@ class MachCommands(CommandBase): return result -def create_parser_create(): +def create_parser_create() -> ArgumentParser: import argparse p = argparse.ArgumentParser() diff --git a/python/servo/try_parser.py b/python/servo/try_parser.py index c05a6f6ed8f..5dea810667c 100644 --- a/python/servo/try_parser.py +++ b/python/servo/try_parser.py @@ -61,7 +61,7 @@ class JobConfig(object): self.update_name() return True - def update_name(self): + def update_name(self) -> None: if self.workflow is Workflow.LINUX: self.name = "Linux" elif self.workflow is Workflow.MACOS: @@ -158,7 +158,7 @@ def handle_preset(s: str) -> Optional[JobConfig]: return None -def handle_modifier(config: JobConfig, s: str) -> Optional[JobConfig]: +def handle_modifier(config: Optional[JobConfig], s: str) -> Optional[JobConfig]: if config is None: return None s = s.lower() @@ -184,13 +184,13 @@ class Encoder(json.JSONEncoder): class Config(object): - def __init__(self, s: Optional[str] = None): + def __init__(self, s: Optional[str] = None) -> None: self.fail_fast: bool = False self.matrix: list[JobConfig] = list() if s is not None: self.parse(s) - def parse(self, input: str): + def parse(self, input: str) -> None: input = input.lower().strip() if not input: @@ -224,7 +224,7 @@ class Config(object): else: self.add_or_merge_job_to_matrix(job) - def add_or_merge_job_to_matrix(self, job: JobConfig): + def add_or_merge_job_to_matrix(self, job: JobConfig) -> None: for existing_job in self.matrix: if existing_job.merge(job): return @@ -234,7 +234,7 @@ class Config(object): return json.dumps(self, cls=Encoder, **kwargs) -def main(): +def main() -> None: conf = Config(" ".join(sys.argv[1:])) print(conf.to_json()) @@ -244,7 +244,7 @@ if __name__ == "__main__": class TestParser(unittest.TestCase): - def test_string(self): + def test_string(self) -> None: self.assertDictEqual( json.loads(Config("linux-unit-tests fail-fast").to_json()), { @@ -266,7 +266,7 @@ class TestParser(unittest.TestCase): }, ) - def test_empty(self): + def test_empty(self) -> None: self.assertDictEqual( json.loads(Config("").to_json()), { @@ -348,7 +348,7 @@ class TestParser(unittest.TestCase): }, ) - def test_job_merging(self): + def test_job_merging(self) -> None: self.assertDictEqual( json.loads(Config("linux-wpt").to_json()), { @@ -379,6 +379,8 @@ class TestParser(unittest.TestCase): a = handle_modifier(a, "linux-unit-tests") b = handle_preset("linux-wpt") b = handle_modifier(b, "linux-wpt") + assert a is not None + assert b is not None self.assertTrue(a.merge(b), "Should merge jobs that have different unit test configurations.") self.assertEqual(a, JobConfig("Linux (Unit Tests, WPT)", Workflow.LINUX, unit_tests=True, wpt=True)) @@ -402,14 +404,14 @@ class TestParser(unittest.TestCase): self.assertFalse(a.merge(b), "Should not merge jobs with different build arguments.") self.assertEqual(a, JobConfig("Linux (Unit Tests)", Workflow.LINUX, unit_tests=True)) - def test_full(self): + def test_full(self) -> None: self.assertDictEqual(json.loads(Config("full").to_json()), json.loads(Config("").to_json())) - def test_wpt_alias(self): + def test_wpt_alias(self) -> None: self.assertDictEqual(json.loads(Config("wpt").to_json()), json.loads(Config("linux-wpt").to_json())) -def run_tests(): +def run_tests() -> bool: verbosity = 1 if logging.getLogger().level >= logging.WARN else 2 suite = unittest.TestLoader().loadTestsFromTestCase(TestParser) return unittest.TextTestRunner(verbosity=verbosity).run(suite).wasSuccessful() diff --git a/python/servo/util.py b/python/servo/util.py index 2dd670da970..03be77e005e 100644 --- a/python/servo/util.py +++ b/python/servo/util.py @@ -14,9 +14,10 @@ import shutil import stat import sys import time -import urllib +import urllib.error import urllib.request import zipfile +from zipfile import ZipInfo from typing import Dict, List, Union from io import BufferedIOBase, BytesIO @@ -26,20 +27,20 @@ SCRIPT_PATH = os.path.abspath(os.path.dirname(__file__)) SERVO_ROOT = os.path.abspath(os.path.join(SCRIPT_PATH, "..", "..")) -def remove_readonly(func, path, _): +def remove_readonly(func, path, _) -> None: "Clear the readonly bit and reattempt the removal" os.chmod(path, stat.S_IWRITE) func(path) -def delete(path): +def delete(path) -> None: if os.path.isdir(path) and not os.path.islink(path): shutil.rmtree(path, onerror=remove_readonly) else: os.remove(path) -def download(description: str, url: str, writer: BufferedIOBase, start_byte: int = 0): +def download(description: str, url: str, writer: BufferedIOBase, start_byte: int = 0) -> None: if start_byte: print("Resuming download of {} ...".format(url)) else: @@ -101,13 +102,13 @@ def download(description: str, url: str, writer: BufferedIOBase, start_byte: int raise -def download_bytes(description: str, url: str): +def download_bytes(description: str, url: str) -> bytes: content_writer = BytesIO() download(description, url, content_writer) return content_writer.getvalue() -def download_file(description: str, url: str, destination_path: str): +def download_file(description: str, url: str, destination_path: str) -> None: tmp_path = destination_path + ".part" try: start_byte = os.path.getsize(tmp_path) @@ -122,7 +123,7 @@ def download_file(description: str, url: str, destination_path: str): # https://stackoverflow.com/questions/39296101/python-zipfile-removes-execute-permissions-from-binaries # In particular, we want the executable bit for executable files. class ZipFileWithUnixPermissions(zipfile.ZipFile): - def extract(self, member, path=None, pwd=None): + def extract(self, member, path=None, pwd=None) -> str: if not isinstance(member, zipfile.ZipInfo): member = self.getinfo(member) @@ -136,11 +137,12 @@ class ZipFileWithUnixPermissions(zipfile.ZipFile): return extracted # For Python 3.x - def _extract_member(self, member, targetpath, pwd): - if sys.version_info[0] >= 3: + def _extract_member(self, member: ZipInfo, targetpath, pwd) -> str: + if int(sys.version_info[0]) >= 3: if not isinstance(member, zipfile.ZipInfo): member = self.getinfo(member) + # pyrefly: ignore # missing-attribute targetpath = super()._extract_member(member, targetpath, pwd) attr = member.external_attr >> 16 @@ -148,10 +150,11 @@ class ZipFileWithUnixPermissions(zipfile.ZipFile): os.chmod(targetpath, attr) return targetpath else: + # pyrefly: ignore # missing-attribute return super(ZipFileWithUnixPermissions, self)._extract_member(member, targetpath, pwd) -def extract(src, dst, movedir=None, remove=True): +def extract(src, dst, movedir=None, remove=True) -> None: assert src.endswith(".zip") ZipFileWithUnixPermissions(src).extractall(dst) @@ -166,7 +169,7 @@ def extract(src, dst, movedir=None, remove=True): os.remove(src) -def check_hash(filename, expected, algorithm): +def check_hash(filename, expected, algorithm) -> None: hasher = hashlib.new(algorithm) with open(filename, "rb") as f: while True: @@ -179,11 +182,11 @@ def check_hash(filename, expected, algorithm): sys.exit(1) -def get_default_cache_dir(topdir): +def get_default_cache_dir(topdir) -> str: return os.environ.get("SERVO_CACHE_DIR", os.path.join(topdir, ".servo")) -def append_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[str]]): +def append_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[str]]) -> None: if isinstance(paths, list): paths = os.pathsep.join(paths) @@ -195,7 +198,7 @@ def append_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[st env[key] = new_value -def prepend_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[str]]): +def prepend_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[str]]) -> None: if isinstance(paths, list): paths = os.pathsep.join(paths) @@ -206,5 +209,5 @@ def prepend_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[s env[key] = new_value -def get_target_dir(): +def get_target_dir() -> str: return os.environ.get("CARGO_TARGET_DIR", os.path.join(SERVO_ROOT, "target")) diff --git a/python/servo/visual_studio.py b/python/servo/visual_studio.py index 84b73b832e7..6a615b531c6 100644 --- a/python/servo/visual_studio.py +++ b/python/servo/visual_studio.py @@ -30,11 +30,11 @@ class VisualStudioInstallation: installation_path: str vc_install_path: str - def __lt__(self, other): + def __lt__(self, other) -> bool: return self.version_number < other.version_number -def find_vswhere(): +def find_vswhere() -> str | None: for path in [PROGRAM_FILES, PROGRAM_FILES_X86]: if not path: continue @@ -145,11 +145,11 @@ def find_msvc_redist_dirs(vs_platform: str) -> Generator[str, None, None]: path1 = os.path.join(vs_platform, "Microsoft.{}.CRT".format(redist_version)) path2 = os.path.join("onecore", vs_platform, "Microsoft.{}.CRT".format(redist_version)) for path in [path1, path2]: - path = os.path.join(redist_path, path) - if os.path.isdir(path): - yield path + full_path = os.path.join(redist_path, path) + if os.path.isdir(full_path): + yield full_path else: - tried.append(path) + tried.append(full_path) print("Couldn't locate MSVC redistributable directory. Tried:", file=sys.stderr) for path in tried: @@ -169,7 +169,10 @@ def find_windows_sdk_installation_path() -> str: # https://stackoverflow.com/questions/35119223/how-to-programmatically-detect-and-locate-the-windows-10-sdk key_path = r"SOFTWARE\Wow6432Node\Microsoft\Microsoft SDKs\Windows\v10.0" try: - with winreg.OpenKeyEx(winreg.HKEY_LOCAL_MACHINE, key_path) as key: - return str(winreg.QueryValueEx(key, "InstallationFolder")[0]) + if sys.platform == "win32": + with winreg.OpenKeyEx(winreg.HKEY_LOCAL_MACHINE, key_path) as key: + return str(winreg.QueryValueEx(key, "InstallationFolder")[0]) + else: + raise EnvironmentError("Couldn't locate HKEY_LOCAL_MACHINE because it's only available on Windows system") except FileNotFoundError: raise Exception(f"Couldn't find Windows SDK installation path in registry at path ({key_path})")