mach: Add type check on python/servo directory (#38085)

Include the `python/servo` folder in the pyrefly type checker

Testing: Manual testing via `./mach test-tidy` command
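
Most of the diff below simply adds parameter and return annotations so that pyrefly can check the `python/servo` modules. A minimal sketch of the style, using two signatures taken from the diff (the second body is elided here):

```python
import sys


def _is_windows() -> bool:
    return sys.platform == "win32"


def bootstrap_command_only(topdir: str) -> int:
    # Body elided; see the full diff below.
    ...
```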

---------

Signed-off-by: Jerens Lensun <jerensslensun@gmail.com>
Jerens Lensun 2025-07-28 12:16:08 +08:00 committed by GitHub
parent 056b1538c0
commit 93d234d270
GPG key ID: B5690EEEBB952194
21 changed files with 310 additions and 276 deletions

View file

@ -80,7 +80,7 @@ CATEGORIES = {
}
def _process_exec(args, cwd):
def _process_exec(args: list[str], cwd) -> None:
try:
subprocess.check_output(args, stderr=subprocess.STDOUT, cwd=cwd)
except subprocess.CalledProcessError as exception:
@ -89,7 +89,7 @@ def _process_exec(args, cwd):
sys.exit(1)
def install_virtual_env_requirements(project_path: str, marker_path: str):
def install_virtual_env_requirements(project_path: str, marker_path: str) -> None:
requirements_paths = [
os.path.join(project_path, "python", "requirements.txt"),
os.path.join(
@ -127,7 +127,7 @@ def install_virtual_env_requirements(project_path: str, marker_path: str):
marker_file.write(requirements_hash)
def _activate_virtualenv(topdir):
def _activate_virtualenv(topdir: str) -> None:
virtualenv_path = os.path.join(topdir, ".venv")
with open(".python-version", "r") as python_version_file:
@ -151,7 +151,7 @@ def _activate_virtualenv(topdir):
warnings.filterwarnings("ignore", category=SyntaxWarning, module=r".*.venv")
def _ensure_case_insensitive_if_windows():
def _ensure_case_insensitive_if_windows() -> None:
# The folder is called 'python'. By deliberately checking for it with the wrong case, we determine if the file
# system is case sensitive or not.
if _is_windows() and not os.path.exists("Python"):
@ -160,11 +160,11 @@ def _ensure_case_insensitive_if_windows():
sys.exit(1)
def _is_windows():
def _is_windows() -> bool:
return sys.platform == "win32"
def bootstrap_command_only(topdir):
def bootstrap_command_only(topdir: str) -> int:
# we should activate the venv before importing servo.bootstrap
# because the module requires non-standard python packages
_activate_virtualenv(topdir)
@ -188,7 +188,7 @@ def bootstrap_command_only(topdir):
return 0
def bootstrap(topdir):
def bootstrap(topdir: str):
_ensure_case_insensitive_if_windows()
topdir = os.path.abspath(topdir)
@ -215,6 +215,7 @@ def bootstrap(topdir):
import mach.main
mach = mach.main.Mach(os.getcwd())
# pyrefly: ignore[bad-assignment]
mach.populate_context_handler = populate_context
for category, meta in CATEGORIES.items():
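
Where a diagnostic is not worth restructuring the code for (for example, mach's dynamically populated context above), the commit suppresses it inline on the line before the offending statement. Both suppression spellings used in this diff are sketched below; the surrounding code is an illustrative stand-in, not taken from Servo:

```python
import os
from xml.etree.ElementTree import XML

# Bracketed form, as in the hunk just above:
# pyrefly: ignore[bad-assignment]
threshold: int = os.environ.get("HYPOTHETICAL_THRESHOLD")

# Hash-separated form, as in later hunks of this diff. Element.find() may
# return None, so accessing .text would otherwise be flagged:
tree = XML("<root><child>value</child></root>")
# pyrefly: ignore # missing-attribute
child_text = tree.find("child").text
```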

View file

@ -18,7 +18,7 @@ import subprocess
import sys
import tempfile
import traceback
import urllib
import urllib.error
import toml
@ -40,7 +40,7 @@ class MachCommands(CommandBase):
@CommandArgument("--force", "-f", action="store_true", help="Boostrap without confirmation")
@CommandArgument("--skip-platform", action="store_true", help="Skip platform bootstrapping.")
@CommandArgument("--skip-lints", action="store_true", help="Skip tool necessary for linting.")
def bootstrap(self, force=False, skip_platform=False, skip_lints=False):
def bootstrap(self, force=False, skip_platform=False, skip_lints=False) -> int:
# Note: This entry point isn't actually invoked by ./mach bootstrap.
# ./mach bootstrap calls mach_bootstrap.bootstrap_command_only so that
# it can install dependencies without needing mach's dependencies
@ -57,7 +57,7 @@ class MachCommands(CommandBase):
category="bootstrap",
)
@CommandArgument("--force", "-f", action="store_true", help="Boostrap without confirmation")
def bootstrap_gstreamer(self, force=False):
def bootstrap_gstreamer(self, force=False) -> int:
try:
servo.platform.get().bootstrap_gstreamer(force)
except NotImplementedError as exception:
@ -66,7 +66,7 @@ class MachCommands(CommandBase):
return 0
@Command("update-hsts-preload", description="Download the HSTS preload list", category="bootstrap")
def bootstrap_hsts_preload(self, force=False):
def bootstrap_hsts_preload(self, force=False) -> None:
preload_filename = "hsts_preload.fstmap"
preload_path = path.join(self.context.topdir, "resources")
@ -104,7 +104,7 @@ class MachCommands(CommandBase):
description="Download the public domains list and update resources/public_domains.txt",
category="bootstrap",
)
def bootstrap_pub_suffix(self, force=False):
def bootstrap_pub_suffix(self, force=False) -> None:
list_url = "https://publicsuffix.org/list/public_suffix_list.dat"
dst_filename = path.join(self.context.topdir, "resources", "public_domains.txt")
not_implemented_case = re.compile(r"^[^*]+\*")
@ -122,12 +122,12 @@ class MachCommands(CommandBase):
for suffix in suffixes:
if not_implemented_case.match(suffix):
print("Warning: the new list contains a case that servo can't handle: %s" % suffix)
fo.write(suffix.encode("idna") + "\n")
fo.write(suffix.encode("idna") + b"\n")
@Command("clean-nightlies", description="Clean unused nightly builds of Rust and Cargo", category="bootstrap")
@CommandArgument("--force", "-f", action="store_true", help="Actually remove stuff")
@CommandArgument("--keep", default="1", help="Keep up to this many most recent nightlies")
def clean_nightlies(self, force=False, keep=None):
def clean_nightlies(self, force: bool, keep: str | int) -> None:
print(f"Current Rust version for Servo: {self.rust_toolchain()}")
old_toolchains = []
keep = int(keep)
@ -158,8 +158,8 @@ class MachCommands(CommandBase):
@CommandArgument("--force", "-f", action="store_true", help="Actually remove stuff")
@CommandArgument("--show-size", "-s", action="store_true", help="Show packages size")
@CommandArgument("--keep", default="1", help="Keep up to this many most recent dependencies")
def clean_cargo_cache(self, force=False, show_size=False, keep=None):
def get_size(path):
def clean_cargo_cache(self, force: bool, show_size: bool, keep: str) -> None:
def get_size(path: str) -> float:
if os.path.isfile(path):
return os.path.getsize(path) / (1024 * 1024.0)
total_size = 0
@ -177,7 +177,7 @@ class MachCommands(CommandBase):
import toml
if os.environ.get("CARGO_HOME", ""):
cargo_dir = os.environ.get("CARGO_HOME")
cargo_dir = os.environ["CARGO_HOME"]
else:
home_dir = os.path.expanduser("~")
cargo_dir = path.join(home_dir, ".cargo")
@ -265,7 +265,7 @@ class MachCommands(CommandBase):
}
packages["crates"][crate_name]["exist"].append(d)
total_size = 0
total_size = 0.0
for packages_type in ["git", "crates"]:
sorted_packages = sorted(packages[packages_type])
for crate_name in sorted_packages:
@ -309,7 +309,7 @@ class MachCommands(CommandBase):
crate_paths.append(path.join(crates_cache_dir, "{}.crate".format(exist)))
crate_paths.append(path.join(crates_src_dir, exist))
size = sum(get_size(p) for p in crate_paths) if show_size else 0
size = sum((get_size(p) for p in crate_paths), 0.0) if show_size else 0.0
total_size += size
print_msg = (exist_name, " ({}MB)".format(round(size, 2)) if show_size else "", cargo_dir)
if force:

View file

@ -9,6 +9,7 @@
import datetime
import os
from os import PathLike
import os.path as path
import pathlib
import shutil
@ -17,7 +18,7 @@ import subprocess
import sys
from time import time
from typing import Optional, List, Dict
from typing import Optional, List, Dict, Union
from mach.decorators import (
CommandArgument,
@ -71,7 +72,7 @@ def get_rustc_llvm_version() -> Optional[List[int]]:
llvm_version = llvm_version.strip()
version = llvm_version.split(".")
print(f"Info: rustc is using LLVM version {'.'.join(version)}")
return version
return list(map(int, version))
else:
print(f"Error: Couldn't find LLVM version in output of `rustc --version --verbose`: `{result.stdout}`")
except Exception as e:
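
The early return above previously handed back the split (string) components even though the signature promises `Optional[List[int]]`; the fix converts each component to an integer. A standalone sketch of the same conversion:

```python
from typing import List


def parse_llvm_version(llvm_version: str) -> List[int]:
    # "17.0.6" -> [17, 0, 6], matching the Optional[List[int]] annotation above.
    return list(map(int, llvm_version.strip().split(".")))


assert parse_llvm_version("17.0.6") == [17, 0, 6]
```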
@ -101,7 +102,7 @@ class MachCommands(CommandBase):
sanitizer: SanitizerKind = SanitizerKind.NONE,
flavor=None,
**kwargs,
):
) -> int:
opts = params or []
if build_type.is_release():
@ -178,7 +179,7 @@ class MachCommands(CommandBase):
# On the Mac, set a lovely icon. This makes it easier to pick out the Servo binary in tools
# like Instruments.app.
try:
import Cocoa
import Cocoa # pyrefly: ignore[import-error]
icon_path = path.join(self.get_top_dir(), "resources", "servo_1024.png")
icon = Cocoa.NSImage.alloc().initWithContentsOfFile_(icon_path)
@ -192,14 +193,14 @@ class MachCommands(CommandBase):
elapsed_delta = datetime.timedelta(seconds=int(elapsed))
build_message = f"{'Succeeded' if status == 0 else 'Failed'} in {elapsed_delta}"
print(build_message)
assert isinstance(status, int)
return status
@Command("clean", description="Clean the target/ and Python virtual environment directories", category="build")
@CommandArgument("--manifest-path", default=None, help="Path to the manifest to the package to clean")
@CommandArgument("--verbose", "-v", action="store_true", help="Print verbose output")
@CommandArgument("params", nargs="...", help="Command-line arguments to be passed through to Cargo")
def clean(self, manifest_path=None, params=[], verbose=False):
def clean(self, manifest_path=None, params=[], verbose=False) -> None:
self.ensure_bootstrapped()
virtualenv_path = path.join(self.get_top_dir(), ".venv")
@ -214,8 +215,8 @@ class MachCommands(CommandBase):
return check_call(["cargo", "clean"] + opts, env=self.build_env(), verbose=verbose)
def build_sanitizer_env(
self, env: Dict, opts: List[str], kwargs, target_triple, sanitizer: SanitizerKind = SanitizerKind.NONE
):
self, env: Dict, opts: List[str], kwargs, target_triple: str, sanitizer: SanitizerKind = SanitizerKind.NONE
) -> None:
if sanitizer.is_none():
return
# do not use crown (clashes with different rust version)
@ -293,7 +294,7 @@ def copy_windows_dlls_to_build_directory(servo_binary: str, target: BuildTarget)
# Copy in the built EGL and GLES libraries from where they were built to
# the final build directory
def find_and_copy_built_dll(dll_name):
def find_and_copy_built_dll(dll_name: str) -> None:
try:
file_to_copy = next(pathlib.Path(build_path).rglob(dll_name))
shutil.copy(file_to_copy, servo_exe_dir)
@ -315,7 +316,7 @@ def copy_windows_dlls_to_build_directory(servo_binary: str, target: BuildTarget)
return True
def package_gstreamer_dlls(servo_exe_dir: str, target: BuildTarget):
def package_gstreamer_dlls(servo_exe_dir: str, target: BuildTarget) -> bool:
gst_root = servo.platform.get().gstreamer_root(target)
if not gst_root:
print("Could not find GStreamer installation directory.")
@ -354,8 +355,8 @@ def package_gstreamer_dlls(servo_exe_dir: str, target: BuildTarget):
return not missing
def package_msvc_dlls(servo_exe_dir: str, target: BuildTarget):
def copy_file(dll_path: Optional[str]) -> bool:
def package_msvc_dlls(servo_exe_dir: str, target: BuildTarget) -> bool:
def copy_file(dll_path: Union[PathLike[str], str]) -> bool:
if not dll_path or not os.path.exists(dll_path):
print(f"WARNING: Could not find DLL at {dll_path}", file=sys.stderr)
return False

View file

@ -15,20 +15,20 @@ import gzip
import itertools
import locale
import os
import re
import shutil
import subprocess
import sys
import tarfile
import urllib
import zipfile
import urllib.error
import urllib.request
from dataclasses import dataclass
from enum import Enum
from errno import ENOENT as NO_SUCH_FILE_OR_DIRECTORY
from glob import glob
from os import path
from subprocess import PIPE
from typing import Any, Dict, List, Optional
from subprocess import PIPE, CompletedProcess
from typing import Any, Dict, List, Optional, Union, LiteralString, cast
from xml.etree.ElementTree import XML
import toml
@ -54,13 +54,13 @@ class BuildType:
CUSTOM = 3
kind: Kind
profile: Optional[str]
profile: str
def dev() -> BuildType:
return BuildType(BuildType.Kind.DEV, None)
return BuildType(BuildType.Kind.DEV, "debug")
def release() -> BuildType:
return BuildType(BuildType.Kind.RELEASE, None)
return BuildType(BuildType.Kind.RELEASE, "release")
def prod() -> BuildType:
return BuildType(BuildType.Kind.CUSTOM, "production")
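
Making `profile` a plain `str` (with `dev()` and `release()` supplying "debug" and "release") means downstream code never has to special-case `None`. A hedged sketch of the resulting shape; `directory_name()` here is illustrative, not necessarily Servo's exact implementation:

```python
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class BuildTypeSketch:
    class Kind(Enum):
        DEV = 1
        RELEASE = 2
        CUSTOM = 3

    kind: Kind
    profile: str  # always set: "debug", "release", or a custom profile name

    @staticmethod
    def dev() -> "BuildTypeSketch":
        return BuildTypeSketch(BuildTypeSketch.Kind.DEV, "debug")

    @staticmethod
    def release() -> "BuildTypeSketch":
        return BuildTypeSketch(BuildTypeSketch.Kind.RELEASE, "release")

    def directory_name(self) -> str:
        # With a non-optional profile there is no None fallback to handle.
        return self.profile
```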
@ -93,7 +93,7 @@ class BuildType:
@contextlib.contextmanager
def cd(new_path):
def cd(new_path: str):
"""Context manager for changing the current working directory"""
previous_path = os.getcwd()
try:
@ -104,7 +104,7 @@ def cd(new_path):
@contextlib.contextmanager
def setlocale(name):
def setlocale(name: str):
"""Context manager for changing the current locale"""
saved_locale = locale.setlocale(locale.LC_ALL)
try:
@ -126,7 +126,7 @@ def find_dep_path_newest(package, bin_path):
return None
def archive_deterministically(dir_to_archive, dest_archive, prepend_path=None):
def archive_deterministically(dir_to_archive, dest_archive, prepend_path=None) -> None:
"""Create a .tar.gz archive in a deterministic (reproducible) manner.
See https://reproducible-builds.org/docs/archives/ for more details."""
@ -177,27 +177,29 @@ def archive_deterministically(dir_to_archive, dest_archive, prepend_path=None):
os.rename(temp_file, dest_archive)
def call(*args, **kwargs):
def call(*args, **kwargs) -> int:
"""Wrap `subprocess.call`, printing the command if verbose=True."""
verbose = kwargs.pop("verbose", False)
if verbose:
print(" ".join(args[0]))
# we have to use shell=True in order to get PATH handling
# when looking for the binary on Windows\
kwargs.setdefault("shell", sys.platform == "win32")
return subprocess.call(*args, **kwargs)
def check_output(*args, **kwargs) -> Union[str, bytes]:
"""Wrap `subprocess.call`, printing the command if verbose=True."""
verbose = kwargs.pop("verbose", False)
if verbose:
print(" ".join(args[0]))
# we have to use shell=True in order to get PATH handling
# when looking for the binary on Windows
return subprocess.call(*args, shell=sys.platform == "win32", **kwargs)
kwargs.setdefault("shell", sys.platform == "win32")
return subprocess.check_output(*args, **kwargs)
def check_output(*args, **kwargs) -> bytes:
"""Wrap `subprocess.call`, printing the command if verbose=True."""
verbose = kwargs.pop("verbose", False)
if verbose:
print(" ".join(args[0]))
# we have to use shell=True in order to get PATH handling
# when looking for the binary on Windows
return subprocess.check_output(*args, shell=sys.platform == "win32", **kwargs)
def check_call(*args, **kwargs):
def check_call(*args, **kwargs) -> None:
"""Wrap `subprocess.check_call`, printing the command if verbose=True.
Also fix any unicode-containing `env`, for subprocess"""
@ -207,8 +209,9 @@ def check_call(*args, **kwargs):
print(" ".join(args[0]))
# we have to use shell=True in order to get PATH handling
# when looking for the binary on Windows
proc = subprocess.Popen(*args, shell=sys.platform == "win32", **kwargs)
status = None
kwargs.setdefault("shell", sys.platform == "win32")
proc = subprocess.Popen(*args, **kwargs)
status: Optional[int] = None
# Leave it to the subprocess to handle Ctrl+C. If it terminates as
# a result of Ctrl+C, proc.wait() will return a status code, and,
# we get out of the loop. If it doesn't, like e.g. gdb, we continue
@ -223,20 +226,20 @@ def check_call(*args, **kwargs):
raise subprocess.CalledProcessError(status, " ".join(*args))
def is_windows():
def is_windows() -> bool:
return sys.platform == "win32"
def is_macosx():
def is_macosx() -> bool:
return sys.platform == "darwin"
def is_linux():
def is_linux() -> bool:
return sys.platform.startswith("linux")
class BuildNotFound(Exception):
def __init__(self, message):
def __init__(self, message) -> None:
self.message = message
def __str__(self):
@ -248,7 +251,9 @@ class CommandBase(object):
This mostly handles configuration management, such as .servobuild."""
def __init__(self, context):
target: BuildTarget
def __init__(self, context) -> None:
self.context = context
self.enable_media = False
self.features = []
@ -257,13 +262,13 @@ class CommandBase(object):
# by `configure_build_target`
self.target = BuildTarget.from_triple(None)
def get_env_bool(var, default):
def get_env_bool(var: str, default: bool) -> bool:
# Contents of env vars are strings by default. This returns the
# boolean value of the specified environment variable, or the
# specified default if the var doesn't contain True or False
return {"True": True, "False": False}.get(os.environ.get(var), default)
return {"True": True, "False": False}.get(os.environ.get(var, ""), default)
def resolverelative(category, key):
def resolverelative(category: str, key: str) -> None:
# Allow ~
self.config[category][key] = path.expanduser(self.config[category][key])
# Resolve relative paths
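
The added `""` default in `get_env_bool` above is what satisfies the checker: `os.environ.get(var)` alone is `str | None`, which strict checkers reject as a key for the str-keyed lookup. A standalone sketch of the same pattern (the environment variable name is hypothetical):

```python
import os


def get_env_bool(var: str, default: bool) -> bool:
    # os.environ.get(var) alone is str | None; the "" default keeps the key a
    # plain str, and unknown values still fall back to `default`.
    return {"True": True, "False": False}.get(os.environ.get(var, ""), default)


verbose = get_env_bool("HYPOTHETICAL_VERBOSE_FLAG", False)
```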
@ -327,7 +332,7 @@ class CommandBase(object):
def get_top_dir(self):
return self.context.topdir
def get_binary_path(self, build_type: BuildType, sanitizer: SanitizerKind = SanitizerKind.NONE):
def get_binary_path(self, build_type: BuildType, sanitizer: SanitizerKind = SanitizerKind.NONE) -> str:
base_path = util.get_target_dir()
if sanitizer.is_some() or self.target.is_cross_build():
base_path = path.join(base_path, self.target.triple())
@ -339,7 +344,7 @@ class CommandBase(object):
return binary_path
def detach_volume(self, mounted_volume):
def detach_volume(self, mounted_volume: str | bytes) -> None:
print("Detaching volume {}".format(mounted_volume))
try:
subprocess.check_call(["hdiutil", "detach", mounted_volume])
@ -347,11 +352,11 @@ class CommandBase(object):
print("Could not detach volume {} : {}".format(mounted_volume, e.returncode))
sys.exit(1)
def detach_volume_if_attached(self, mounted_volume):
def detach_volume_if_attached(self, mounted_volume: str | bytes) -> None:
if os.path.exists(mounted_volume):
self.detach_volume(mounted_volume)
def mount_dmg(self, dmg_path):
def mount_dmg(self, dmg_path) -> None:
print("Mounting dmg {}".format(dmg_path))
try:
subprocess.check_call(["hdiutil", "attach", dmg_path])
@ -359,7 +364,7 @@ class CommandBase(object):
print("Could not mount Servo dmg : {}".format(e.returncode))
sys.exit(1)
def extract_nightly(self, nightlies_folder, destination_folder, destination_file):
def extract_nightly(self, nightlies_folder: LiteralString, destination_folder: str, destination_file: str) -> None:
print("Extracting to {} ...".format(destination_folder))
if is_macosx():
mounted_volume = path.join(path.sep, "Volumes", "Servo")
@ -382,14 +387,14 @@ class CommandBase(object):
with tarfile.open(os.path.join(nightlies_folder, destination_file), "r") as tar:
tar.extractall(destination_folder)
def get_executable(self, destination_folder):
def get_executable(self, destination_folder: str) -> str:
if is_windows():
return path.join(destination_folder, "PFiles", "Mozilla research", "Servo Tech Demo")
if is_linux:
return path.join(destination_folder, "servo", "servo")
return path.join(destination_folder, "servo")
def get_nightly_binary_path(self, nightly_date):
def get_nightly_binary_path(self, nightly_date) -> str | None:
if nightly_date is None:
return
if not nightly_date:
@ -409,10 +414,18 @@ class CommandBase(object):
response = urllib.request.urlopen(req).read()
tree = XML(response)
namespaces = {"ns": tree.tag[1 : tree.tag.index("}")]}
# pyrefly: ignore # missing-attribute
file_to_download = tree.find("ns:Contents", namespaces).find("ns:Key", namespaces).text
except urllib.error.URLError as e:
print("Could not fetch the available nightly versions from the repository : {}".format(e.reason))
sys.exit(1)
except ValueError as e:
print(
"Could not fetch a nightly version for date {} and platform {} cause of {}".format(
nightly_date, os_prefix, e
)
)
sys.exit(1)
except AttributeError:
print("Could not fetch a nightly version for date {} and platform {}".format(nightly_date, os_prefix))
sys.exit(1)
@ -420,6 +433,7 @@ class CommandBase(object):
nightly_target_directory = path.join(self.context.topdir, "target")
# ':' is not an authorized character for a file name on Windows
# make sure the OS specific separator is used
# pyrefly: ignore # missing-attribute
target_file_path = file_to_download.replace(":", "-").split("/")
destination_file = os.path.join(nightly_target_directory, os.path.join(*target_file_path))
# Once extracted, the nightly folder name is the tar name without the extension
@ -437,6 +451,7 @@ class CommandBase(object):
print("The nightly file {} has already been downloaded.".format(destination_file))
else:
print("The nightly {} does not exist yet, downloading it.".format(destination_file))
# pyrefly: ignore # no-matching-overload
download_file(destination_file, NIGHTLY_REPOSITORY_URL + file_to_download, destination_file)
# Extract the downloaded nightly version
@ -447,10 +462,10 @@ class CommandBase(object):
return self.get_executable(destination_folder)
def msvc_package_dir(self, package):
def msvc_package_dir(self, package) -> str:
return servo.platform.windows.get_dependency_dir(package)
def build_env(self):
def build_env(self) -> dict[str, str]:
"""Return an extended environment dictionary."""
env = os.environ.copy()
@ -488,7 +503,7 @@ class CommandBase(object):
if not (self.config["build"]["ccache"] == ""):
env["CCACHE"] = self.config["build"]["ccache"]
env["CARGO_TARGET_DIR"] = servo.util.get_target_dir()
env["CARGO_TARGET_DIR"] = util.get_target_dir()
# Work around https://github.com/servo/servo/issues/24446
# Argument-less str.split normalizes leading, trailing, and double spaces
@ -694,7 +709,7 @@ class CommandBase(object):
return target_configuration_decorator
def configure_build_type(self, release: bool, dev: bool, prod: bool, profile: Optional[str]) -> BuildType:
def configure_build_type(self, release: bool, dev: bool, prod: bool, profile: str) -> BuildType:
option_count = release + dev + prod + (profile is not None)
if option_count > 1:
@ -726,7 +741,7 @@ class CommandBase(object):
else:
return BuildType.custom(profile)
def configure_build_target(self, kwargs: Dict[str, Any], suppress_log: bool = False):
def configure_build_target(self, kwargs: Dict[str, Any], suppress_log: bool = False) -> None:
if hasattr(self.context, "target"):
# This call is for a dispatched command and we've already configured
# the target, so just use it.
@ -763,12 +778,6 @@ class CommandBase(object):
if self.target.is_cross_build() and not suppress_log:
print(f"Targeting '{self.target.triple()}' for cross-compilation")
def is_android(self):
return isinstance(self.target, AndroidTarget)
def is_openharmony(self):
return isinstance(self.target, OpenHarmonyTarget)
def is_media_enabled(self, media_stack: Optional[str]):
"""Determine whether media is enabled based on the value of the build target
platform and the value of the '--media-stack' command-line argument.
@ -804,8 +813,8 @@ class CommandBase(object):
capture_output=False,
target_override: Optional[str] = None,
**_kwargs,
):
env = env or self.build_env()
) -> CompletedProcess[bytes] | int:
env = cast(dict[str, str], env or self.build_env())
# NB: On non-Linux platforms we cannot check whether GStreamer is installed until
# environment variables are set via `self.build_env()`.
@ -879,21 +888,21 @@ class CommandBase(object):
return call(["cargo", command] + args + cargo_args, env=env, verbose=verbose)
def android_adb_path(self, env):
def android_adb_path(self, env) -> LiteralString:
if "ANDROID_SDK_ROOT" in env:
sdk_adb = path.join(env["ANDROID_SDK_ROOT"], "platform-tools", "adb")
if path.exists(sdk_adb):
return sdk_adb
return "adb"
def android_emulator_path(self, env):
def android_emulator_path(self, env) -> LiteralString:
if "ANDROID_SDK_ROOT" in env:
sdk_adb = path.join(env["ANDROID_SDK_ROOT"], "emulator", "emulator")
if path.exists(sdk_adb):
return sdk_adb
return "emulator"
def ensure_bootstrapped(self):
def ensure_bootstrapped(self) -> None:
if self.context.bootstrapped:
return
@ -904,35 +913,13 @@ class CommandBase(object):
if not self.target.is_cross_build():
return
installed_targets = check_output(["rustup", "target", "list", "--installed"], cwd=self.context.topdir).decode()
installed_targets = check_output(["rustup", "target", "list", "--installed"], cwd=self.context.topdir)
if isinstance(installed_targets, bytes):
installed_targets = installed_targets.decode("utf-8")
if self.target.triple() not in installed_targets:
check_call(["rustup", "target", "add", self.target.triple()], cwd=self.context.topdir)
def ensure_rustup_version(self):
try:
version_line = subprocess.check_output(
["rustup" + servo.platform.get().executable_suffix(), "--version"],
# Silence "info: This is the version for the rustup toolchain manager,
# not the rustc compiler."
stderr=open(os.devnull, "wb"),
)
except OSError as e:
if e.errno == NO_SUCH_FILE_OR_DIRECTORY:
print(
"It looks like rustup is not installed. See instructions at "
"https://github.com/servo/servo/#setting-up-your-environment"
)
print()
sys.exit(1)
raise
version = tuple(map(int, re.match(rb"rustup (\d+)\.(\d+)\.(\d+)", version_line).groups()))
version_needed = (1, 23, 0)
if version < version_needed:
print("rustup is at version %s.%s.%s, Servo requires %s.%s.%s or more recent." % (version + version_needed))
print("Try running 'rustup self update'.")
sys.exit(1)
def ensure_clobbered(self, target_dir=None):
def ensure_clobbered(self, target_dir=None) -> None:
if target_dir is None:
target_dir = util.get_target_dir()
auto = True if os.environ.get("AUTOCLOBBER", False) else False
@ -955,6 +942,6 @@ class CommandBase(object):
Registrar.dispatch("clean", context=self.context, verbose=True)
print("Successfully completed auto clobber.")
except subprocess.CalledProcessError as error:
sys.exit(error)
sys.exit(error.returncode)
else:
print("Clobber not needed.")

View file

@ -7,6 +7,7 @@
# option. This file may not be copied, modified, or distributed
# except according to those terms.
from subprocess import CompletedProcess
import json
from mach.decorators import (
@ -26,13 +27,14 @@ class MachCommands(CommandBase):
"params", default=None, nargs="...", help="Command-line arguments to be passed through to cargo check"
)
@CommandBase.common_command_arguments(build_configuration=True, build_type=False)
def check(self, params, **kwargs):
def check(self, params, **kwargs) -> int:
if not params:
params = []
self.ensure_bootstrapped()
self.ensure_clobbered()
status = self.run_cargo_build_like_command("check", params, **kwargs)
assert isinstance(status, int)
if status == 0:
print("Finished checking, binary NOT updated. Consider ./mach build before ./mach run")
@ -40,7 +42,7 @@ class MachCommands(CommandBase):
@Command("rustc", description="Run the Rust compiler", category="devenv")
@CommandArgument("params", default=None, nargs="...", help="Command-line arguments to be passed through to rustc")
def rustc(self, params):
def rustc(self, params) -> int:
if params is None:
params = []
@ -52,13 +54,15 @@ class MachCommands(CommandBase):
"params", default=None, nargs="...", help="Command-line arguments to be passed through to cargo-fix"
)
@CommandBase.common_command_arguments(build_configuration=True, build_type=False)
def cargo_fix(self, params, **kwargs):
def cargo_fix(self, params, **kwargs) -> int:
if not params:
params = []
self.ensure_bootstrapped()
self.ensure_clobbered()
return self.run_cargo_build_like_command("fix", params, **kwargs)
status = self.run_cargo_build_like_command("fix", params, **kwargs)
assert isinstance(status, int)
return status
@Command("clippy", description='Run "cargo clippy"', category="devenv")
@CommandArgument("params", default=None, nargs="...", help="Command-line arguments to be passed through to clippy")
@ -69,7 +73,7 @@ class MachCommands(CommandBase):
help="Emit the clippy warnings in the Github Actions annotations format",
)
@CommandBase.common_command_arguments(build_configuration=True, build_type=False)
def cargo_clippy(self, params, github_annotations=False, **kwargs):
def cargo_clippy(self, params, github_annotations=False, **kwargs) -> int:
if not params:
params = []
@ -85,6 +89,7 @@ class MachCommands(CommandBase):
github_annotation_manager = GitHubAnnotationManager("clippy")
results = self.run_cargo_build_like_command("clippy", params, env=env, capture_output=True, **kwargs)
assert isinstance(results, CompletedProcess)
if results.returncode == 0:
return 0
try:
@ -94,9 +99,11 @@ class MachCommands(CommandBase):
except json.JSONDecodeError:
pass
return results.returncode
return self.run_cargo_build_like_command("clippy", params, env=env, **kwargs)
status = self.run_cargo_build_like_command("clippy", params, env=env, **kwargs)
assert isinstance(status, int)
return status
@Command("fetch", description="Fetch Rust, Cargo and Cargo dependencies", category="devenv")
def fetch(self):
def fetch(self) -> int:
self.ensure_bootstrapped()
return call(["cargo", "fetch"], env=self.build_env())

View file

@ -153,22 +153,22 @@ of using `dumpbin` and the errors that appear when starting Servo.
"""
def windows_dlls():
def windows_dlls() -> list[str]:
return GSTREAMER_WIN_DEPENDENCY_LIBS + [f"{lib}-1.0-0.dll" for lib in GSTREAMER_BASE_LIBS]
def windows_plugins():
def windows_plugins() -> list[str]:
libs = [*GSTREAMER_PLUGIN_LIBS, *GSTREAMER_WIN_PLUGIN_LIBS]
return [f"{lib}.dll" for lib in libs]
def macos_plugins():
def macos_plugins() -> list[str]:
plugins = [*GSTREAMER_PLUGIN_LIBS, *GSTREAMER_MAC_PLUGIN_LIBS]
return [f"lib{plugin}.dylib" for plugin in plugins]
def write_plugin_list(target):
def write_plugin_list(target: str) -> None:
plugins = []
if "apple-" in target:
plugins = macos_plugins()
@ -191,7 +191,7 @@ def is_macos_system_library(library_path: str) -> bool:
return library_path.startswith("/System/Library") or library_path.startswith("/usr/lib") or ".asan." in library_path
def rewrite_dependencies_to_be_relative(binary: str, dependency_lines: Set[str], relative_path: str):
def rewrite_dependencies_to_be_relative(binary: str, dependency_lines: Set[str], relative_path: str) -> None:
"""Given a path to a binary (either an executable or a dylib), rewrite the
the given dependency lines to be found at the given relative path to
the executable in which they are used. In our case, this is typically servoshell."""
@ -207,7 +207,7 @@ def rewrite_dependencies_to_be_relative(binary: str, dependency_lines: Set[str],
print(f"{arguments} install_name_tool exited with return value {exception.returncode}")
def make_rpath_path_absolute(dylib_path_from_otool: str, rpath: str):
def make_rpath_path_absolute(dylib_path_from_otool: str, rpath: str) -> str:
"""Given a dylib dependency from otool, resolve the path into a full path if it
contains `@rpath`."""
if not dylib_path_from_otool.startswith("@rpath/"):
@ -228,6 +228,7 @@ def find_non_system_dependencies_with_otool(binary_path: str) -> Set[str]:
"""Given a binary path, find all dylib dependency lines that do not refer to
system libraries."""
process = subprocess.Popen(["/usr/bin/otool", "-L", binary_path], stdout=subprocess.PIPE)
assert process.stdout is not None
output = set()
for line in map(lambda line: line.decode("utf8"), process.stdout):
@ -242,15 +243,17 @@ def find_non_system_dependencies_with_otool(binary_path: str) -> Set[str]:
return output
def package_gstreamer_dylibs(binary_path: str, library_target_directory: str, target: BuildTarget):
def package_gstreamer_dylibs(binary_path: str, library_target_directory: str, target: BuildTarget) -> bool:
"""Copy all GStreamer dependencies to the "lib" subdirectory of a built version of
Servo. Also update any transitive shared library paths so that they are relative to
this subdirectory."""
# This import only works when called from `mach`.
import servo.platform
import servo.platform.macos
gstreamer_root = servo.platform.get().gstreamer_root(target)
assert gstreamer_root is not None
gstreamer_version = servo.platform.macos.GSTREAMER_PLUGIN_VERSION
gstreamer_root_libs = os.path.join(gstreamer_root, "lib")

View file

@ -25,9 +25,10 @@ def get_folders_list(path):
folder_name = join(path, filename)
folder_list.append(folder_name)
return folder_list
return folder_list
def mutation_test_for(mutation_path):
def mutation_test_for(mutation_path: str) -> None:
test_mapping_file = join(mutation_path, "test_mapping.json")
if isfile(test_mapping_file):
json_data = open(test_mapping_file).read()

View file

@ -9,10 +9,12 @@
import fileinput
import re
from re import Match
import random
from typing import Iterator
def is_comment(line):
def is_comment(line: str) -> Match[str] | None:
return re.search(r"\/\/.*", line)
@ -25,14 +27,14 @@ def init_variables(if_blocks):
return random_index, start_counter, end_counter, lines_to_delete, line_to_mutate
def deleteStatements(file_name, line_numbers):
def deleteStatements(file_name, line_numbers) -> None:
for line in fileinput.input(file_name, inplace=True):
if fileinput.lineno() not in line_numbers:
print(line.rstrip())
class Strategy:
def __init__(self):
def __init__(self) -> None:
self._strategy_name = ""
self._replace_strategy = {}
@ -53,28 +55,28 @@ class Strategy:
class AndOr(Strategy):
def __init__(self):
def __init__(self) -> None:
Strategy.__init__(self)
logical_and = r"(?<=\s)&&(?=\s)"
self._replace_strategy = {"regex": logical_and, "replaceString": "||"}
class IfTrue(Strategy):
def __init__(self):
def __init__(self) -> None:
Strategy.__init__(self)
if_condition = r"(?<=if\s)\s*(?!let\s)(.*)(?=\s\{)"
self._replace_strategy = {"regex": if_condition, "replaceString": "true"}
class IfFalse(Strategy):
def __init__(self):
def __init__(self) -> None:
Strategy.__init__(self)
if_condition = r"(?<=if\s)\s*(?!let\s)(.*)(?=\s\{)"
self._replace_strategy = {"regex": if_condition, "replaceString": "false"}
class ModifyComparision(Strategy):
def __init__(self):
def __init__(self) -> None:
Strategy.__init__(self)
less_than_equals = r"(?<=\s)(\<)\=(?=\s)"
greater_than_equals = r"(?<=\s)(\<)\=(?=\s)"
@ -82,7 +84,7 @@ class ModifyComparision(Strategy):
class MinusToPlus(Strategy):
def __init__(self):
def __init__(self) -> None:
Strategy.__init__(self)
arithmetic_minus = r"(?<=\s)\-(?=\s.+)"
minus_in_shorthand = r"(?<=\s)\-(?=\=)"
@ -90,7 +92,7 @@ class MinusToPlus(Strategy):
class PlusToMinus(Strategy):
def __init__(self):
def __init__(self) -> None:
Strategy.__init__(self)
arithmetic_plus = r"(?<=[^\"]\s)\+(?=\s[^A-Z\'?\":\{]+)"
plus_in_shorthand = r"(?<=\s)\+(?=\=)"
@ -98,14 +100,14 @@ class PlusToMinus(Strategy):
class AtomicString(Strategy):
def __init__(self):
def __init__(self) -> None:
Strategy.__init__(self)
string_literal = r"(?<=\").+(?=\")"
self._replace_strategy = {"regex": string_literal, "replaceString": " "}
class DuplicateLine(Strategy):
def __init__(self):
def __init__(self) -> None:
Strategy.__init__(self)
self._strategy_name = "duplicate"
append_statement = r".+?append\(.+?\).*?;"
@ -133,7 +135,7 @@ class DuplicateLine(Strategy):
class DeleteIfBlock(Strategy):
def __init__(self):
def __init__(self) -> None:
Strategy.__init__(self)
self.if_block = r"^\s+if\s(.+)\s\{"
self.else_block = r"\selse(.+)\{"
@ -175,7 +177,7 @@ class DeleteIfBlock(Strategy):
line_to_mutate += 1
def get_strategies():
def get_strategies() -> Iterator[Strategy]:
return (
AndOr,
IfTrue,
@ -190,7 +192,7 @@ def get_strategies():
class Mutator:
def __init__(self, strategy):
def __init__(self, strategy) -> None:
self._strategy = strategy
def mutate(self, file_name):

View file

@ -42,6 +42,7 @@ from servo.command_base import (
from servo.util import delete, get_target_dir
from python.servo.platform.build_target import SanitizerKind
from servo.platform.build_target import is_android, is_openharmony
PACKAGES = {
"android": [
@ -74,11 +75,11 @@ def packages_for_platform(platform):
yield path.join(target_dir, package)
def listfiles(directory):
def listfiles(directory) -> list[str]:
return [f for f in os.listdir(directory) if path.isfile(path.join(directory, f))]
def copy_windows_dependencies(binary_path, destination):
def copy_windows_dependencies(binary_path: str, destination: str) -> None:
for f in os.listdir(binary_path):
if os.path.isfile(path.join(binary_path, f)) and f.endswith(".dll"):
shutil.copy(path.join(binary_path, f), destination)
@ -110,12 +111,12 @@ class PackageCommands(CommandBase):
@CommandArgument("--target", "-t", default=None, help="Package for given target platform")
@CommandBase.common_command_arguments(build_configuration=False, build_type=True, package_configuration=True)
@CommandBase.allow_target_configuration
def package(self, build_type: BuildType, flavor=None, sanitizer: SanitizerKind = SanitizerKind.NONE):
def package(self, build_type: BuildType, flavor=None, sanitizer: SanitizerKind = SanitizerKind.NONE) -> int | None:
env = self.build_env()
binary_path = self.get_binary_path(build_type, sanitizer=sanitizer)
dir_to_root = self.get_top_dir()
target_dir = path.dirname(binary_path)
if self.is_android():
if is_android(self.target):
target_triple = self.target.triple()
if "aarch64" in target_triple:
arch_string = "Arm64"
@ -155,7 +156,7 @@ class PackageCommands(CommandBase):
except subprocess.CalledProcessError as e:
print("Packaging Android exited with return value %d" % e.returncode)
return e.returncode
elif self.is_openharmony():
elif is_openharmony(self.target):
# hvigor doesn't support an option to place output files in a specific directory
# so copy the source files into the target/openharmony directory first.
ohos_app_dir = path.join(self.get_top_dir(), "support", "openharmony")
@ -197,7 +198,7 @@ class PackageCommands(CommandBase):
try:
with cd(ohos_target_dir):
version = check_output(["hvigorw", "--version", "--no-daemon"])
print(f"Found `hvigorw` with version {str(version, 'utf-8').strip()} in system PATH")
print(f"Found `hvigorw` with version {version.strip()} in system PATH")
hvigor_command[0:0] = ["hvigorw"]
except FileNotFoundError:
print(
@ -216,7 +217,6 @@ class PackageCommands(CommandBase):
env["NODE_PATH"] = env["HVIGOR_PATH"] + "/node_modules"
hvigor_script = f"{env['HVIGOR_PATH']}/node_modules/@ohos/hvigor/bin/hvigor.js"
hvigor_command[0:0] = ["node", hvigor_script]
abi_string = self.target.abi_string()
ohos_libs_dir = path.join(ohos_target_dir, "entry", "libs", abi_string)
os.makedirs(ohos_libs_dir)
@ -402,7 +402,7 @@ class PackageCommands(CommandBase):
usb=False,
sanitizer: SanitizerKind = SanitizerKind.NONE,
flavor=None,
):
) -> int:
env = self.build_env()
try:
binary_path = self.get_binary_path(build_type, sanitizer=sanitizer)
@ -417,7 +417,7 @@ class PackageCommands(CommandBase):
print("Rebuilding Servo did not solve the missing build problem.")
return 1
if self.is_android():
if is_android(self.target):
pkg_path = self.target.get_package_path(build_type.directory_name())
exec_command = [self.android_adb_path(env)]
if emulator and usb:
@ -428,7 +428,7 @@ class PackageCommands(CommandBase):
if usb:
exec_command += ["-d"]
exec_command += ["install", "-r", pkg_path]
elif self.is_openharmony():
elif is_openharmony(self.target):
pkg_path = self.target.get_package_path(build_type.directory_name(), flavor=flavor)
hdc_path = path.join(env["OHOS_SDK_NATIVE"], "../", "toolchains", "hdc")
exec_command = [hdc_path, "install", "-r", pkg_path]
@ -453,7 +453,7 @@ class PackageCommands(CommandBase):
@CommandArgument(
"--github-release-id", default=None, type=int, help="The github release to upload the nightly builds."
)
def upload_nightly(self, platform, secret_from_environment, github_release_id):
def upload_nightly(self, platform, secret_from_environment, github_release_id) -> int:
import boto3
def get_s3_secret():
@ -465,13 +465,13 @@ class PackageCommands(CommandBase):
aws_secret_access_key = secret["aws_secret_access_key"]
return (aws_access_key, aws_secret_access_key)
def nightly_filename(package, timestamp):
def nightly_filename(package, timestamp) -> str:
return "{}-{}".format(
timestamp.isoformat() + "Z", # The `Z` denotes UTC
path.basename(package),
)
def upload_to_github_release(platform, package, package_hash):
def upload_to_github_release(platform, package: str, package_hash: str) -> None:
if not github_release_id:
return
@ -483,11 +483,12 @@ class PackageCommands(CommandBase):
asset_name = f"servo-latest.{extension}"
release.upload_asset(package, name=asset_name)
# pyrefly: ignore[missing-attribute]
release.upload_asset_from_memory(
package_hash_fileobj, package_hash_fileobj.getbuffer().nbytes, name=f"{asset_name}.sha256"
)
def upload_to_s3(platform, package, package_hash, timestamp):
def upload_to_s3(platform, package: str, package_hash: str, timestamp: datetime) -> None:
(aws_access_key, aws_secret_access_key) = get_s3_secret()
s3 = boto3.client("s3", aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_access_key)

View file

@ -14,7 +14,7 @@ from .windows import Windows
__platform__ = None
def host_platform():
def host_platform() -> str:
os_type = platform.system().lower()
if os_type == "linux":
os_type = "unknown-linux-gnu"
@ -31,7 +31,7 @@ def host_platform():
return os_type
def host_triple():
def host_triple() -> str:
os_type = host_platform()
cpu_type = platform.machine().lower()
if cpu_type in ["i386", "i486", "i686", "i768", "x86"]:

View file

@ -16,7 +16,7 @@ from .build_target import BuildTarget
class Base:
def __init__(self, triple: str):
def __init__(self, triple: str) -> None:
self.environ = os.environ.copy()
self.triple = triple
self.is_windows = False
@ -29,10 +29,10 @@ class Base:
def executable_suffix(self) -> str:
return ""
def _platform_bootstrap(self, _force: bool) -> bool:
def _platform_bootstrap(self, force: bool) -> bool:
raise NotImplementedError("Bootstrap installation detection not yet available.")
def _platform_bootstrap_gstreamer(self, _target: BuildTarget, _force: bool) -> bool:
def _platform_bootstrap_gstreamer(self, target: BuildTarget, force: bool) -> bool:
raise NotImplementedError("GStreamer bootstrap support is not yet available for your OS.")
def is_gstreamer_installed(self, target: BuildTarget) -> bool:
@ -54,7 +54,7 @@ class Base:
except FileNotFoundError:
return False
def bootstrap(self, force: bool, skip_platform: bool, skip_lints: bool):
def bootstrap(self, force: bool, skip_platform: bool, skip_lints: bool) -> None:
installed_something = False
if not skip_platform:
installed_something |= self._platform_bootstrap(force)
@ -67,7 +67,7 @@ class Base:
if not installed_something:
print("Dependencies were already installed!")
def install_rust_toolchain(self):
def install_rust_toolchain(self) -> None:
# rustup 1.28.0, and rustup 1.28.1+ with RUSTUP_AUTO_INSTALL=0, require us to explicitly
# install the Rust toolchain before trying to use it.
print(" * Installing Rust toolchain...")
@ -86,7 +86,7 @@ class Base:
return True
def install_cargo_deny(self, force: bool) -> bool:
def cargo_deny_installed():
def cargo_deny_installed() -> bool:
if force or not shutil.which("cargo-deny"):
return False
# Tidy needs at least version 0.18.1 installed.
@ -115,7 +115,7 @@ class Base:
as fast as possible."""
return False
def bootstrap_gstreamer(self, force: bool):
def bootstrap_gstreamer(self, force: bool) -> None:
target = BuildTarget.from_triple(self.triple)
if not self._platform_bootstrap_gstreamer(target, force):
root = self.gstreamer_root(target)

View file

@ -7,6 +7,7 @@
# option. This file may not be copied, modified, or distributed
# except according to those terms.
from typing import TypeGuard
import errno
import json
import os
@ -48,7 +49,7 @@ class SanitizerKind(Enum):
class BuildTarget(object):
def __init__(self, target_triple: str):
def __init__(self, target_triple: str) -> None:
self.target_triple = target_triple
@staticmethod
@ -69,7 +70,7 @@ class BuildTarget(object):
def binary_name(self) -> str:
return f"servo{servo.platform.get().executable_suffix()}"
def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path):
def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path) -> None:
pass
def is_cross_build(self) -> bool:
@ -124,7 +125,7 @@ class AndroidTarget(CrossBuildTarget):
return config
def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path):
def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path) -> None:
# Paths to Android build tools:
if config["android"]["sdk"]:
env["ANDROID_SDK_ROOT"] = config["android"]["sdk"]
@ -201,7 +202,7 @@ class AndroidTarget(CrossBuildTarget):
llvm_toolchain = path.join(llvm_prebuilt, host)
env["PATH"] = env["PATH"] + ":" + path.join(llvm_toolchain, "bin")
def to_ndk_bin(prog):
def to_ndk_bin(prog: str) -> str:
return path.join(llvm_toolchain, "bin", prog)
# This workaround is due to an issue in the x86_64 Android NDK that introduces
@ -217,6 +218,9 @@ class AndroidTarget(CrossBuildTarget):
env["RUSTFLAGS"] = env.get("RUSTFLAGS", "")
env["RUSTFLAGS"] += f"-C link-arg={libclangrt_filename}"
assert host_cc
assert host_cxx
env["RUST_TARGET"] = self.triple()
env["HOST_CC"] = host_cc
env["HOST_CXX"] = host_cxx
@ -289,7 +293,7 @@ class AndroidTarget(CrossBuildTarget):
class OpenHarmonyTarget(CrossBuildTarget):
DEFAULT_TRIPLE = "aarch64-unknown-linux-ohos"
def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path):
def configure_build_environment(self, env: Dict[str, str], config: Dict[str, Any], topdir: pathlib.Path) -> None:
# Paths to OpenHarmony SDK and build tools:
# Note: `OHOS_SDK_NATIVE` is the CMake variable name the `hvigor` build-system
# uses for the native directory of the SDK, so we use the same name to be consistent.
@ -343,7 +347,7 @@ class OpenHarmonyTarget(CrossBuildTarget):
# on windows, depending on how the wrapper is called.
# Instead, we ensure that all the necessary flags for the c-compiler are set
# via environment variables such as `TARGET_CFLAGS`.
def to_sdk_llvm_bin(prog: str):
def to_sdk_llvm_bin(prog: str) -> str:
if sys.platform == "win32":
prog = prog + ".exe"
llvm_prog = llvm_bin.joinpath(prog)
@ -498,3 +502,11 @@ class OpenHarmonyTarget(CrossBuildTarget):
def abi_string(self) -> str:
abi_map = {"aarch64-unknown-linux-ohos": "arm64-v8a", "x86_64-unknown-linux-ohos": "x86_64"}
return abi_map[self.triple()]
def is_android(target: BuildTarget) -> TypeGuard[AndroidTarget]:
return isinstance(target, AndroidTarget)
def is_openharmony(target: BuildTarget) -> TypeGuard[OpenHarmonyTarget]:
return isinstance(target, OpenHarmonyTarget)
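
These two helpers replace the former `self.is_android()` / `self.is_openharmony()` methods removed from `CommandBase`. Because they return `TypeGuard[...]`, a passing check narrows the argument's static type, so target-specific methods type-check without casts. A small usage sketch (it assumes the helpers and target classes exactly as they appear in this diff, and only works inside the repo where `servo.platform.build_target` is importable):

```python
from servo.platform.build_target import BuildTarget, is_android, is_openharmony


def describe(target: BuildTarget) -> str:
    if is_android(target):
        # Narrowed to AndroidTarget: Android-only methods are visible here.
        # 'debug' stands in for build_type.directory_name().
        return f"Android package: {target.get_package_path('debug')}"
    if is_openharmony(target):
        # Narrowed to OpenHarmonyTarget.
        return f"OpenHarmony ABI: {target.abi_string()}"
    return f"Host build for {target.triple()}"
```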

View file

@ -164,7 +164,7 @@ GSTREAMER_URL = (
class Linux(Base):
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.is_linux = True
(self.distro, self.version) = Linux.get_distro_and_version()
@ -265,16 +265,16 @@ class Linux(Base):
install = True
elif self.distro in ["CentOS", "CentOS Linux", "Fedora", "Fedora Linux", "Fedora Linux Asahi Remix"]:
command = ["dnf", "install"]
installed_pkgs: [str] = subprocess.check_output(
installed_pkgs: list[str] = subprocess.check_output(
["rpm", "--query", "--all", "--queryformat", "%{NAME}\n"], encoding="utf-8"
).split("\n")
).splitlines()
pkgs = DNF_PKGS
for pkg in pkgs:
if pkg not in installed_pkgs:
install = True
break
elif self.distro == "void":
installed_pkgs = str(subprocess.check_output(["xbps-query", "-l"]))
installed_pkgs = subprocess.check_output(["xbps-query", "-l"], text=True).splitlines()
pkgs = XBPS_PKGS
for pkg in pkgs:
command = ["xbps-install", "-A"]
@ -285,13 +285,13 @@ class Linux(Base):
if not install:
return False
def check_sudo():
def check_sudo() -> bool:
if os.geteuid() != 0:
if shutil.which("sudo") is None:
return False
return True
def run_as_root(command, force=False):
def run_as_root(command: list[str], force: bool = False) -> int:
if os.geteuid() != 0:
command.insert(0, "sudo")
if force:
@ -312,10 +312,10 @@ class Linux(Base):
raise EnvironmentError("Installation of dependencies failed.")
return True
def gstreamer_root(self, _target: BuildTarget) -> Optional[str]:
def gstreamer_root(self, target: BuildTarget) -> Optional[str]:
return None
def _platform_bootstrap_gstreamer(self, _target: BuildTarget, _force: bool) -> bool:
def _platform_bootstrap_gstreamer(self, target: BuildTarget, force: bool) -> bool:
raise EnvironmentError(
"Bootstrapping GStreamer on Linux is not supported. "
+ "Please install it using your distribution package manager."

View file

@ -24,7 +24,7 @@ GSTREAMER_ROOT = "/Library/Frameworks/GStreamer.framework/Versions/1.0"
class MacOS(Base):
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.is_macos = True
@ -38,7 +38,7 @@ class MacOS(Base):
# Servo only supports the official GStreamer distribution on MacOS.
return not target.is_cross_build() and os.path.exists(GSTREAMER_ROOT)
def _platform_bootstrap(self, _force: bool) -> bool:
def _platform_bootstrap(self, force: bool) -> bool:
installed_something = False
try:
brewfile = os.path.join(util.SERVO_ROOT, "support", "macos", "Brewfile")

View file

@ -11,7 +11,7 @@ import os
import subprocess
import tempfile
from typing import Optional
import urllib
import urllib.parse
import zipfile
import shutil
@ -32,12 +32,12 @@ DEPENDENCIES_DIR = os.path.join(util.get_target_dir(), "dependencies")
WINGET_DEPENDENCIES = ["Kitware.CMake", "LLVM.LLVM", "Ninja-build.Ninja", "WiXToolset.WiXToolset"]
def get_dependency_dir(package):
def get_dependency_dir(package: str) -> str:
"""Get the directory that a given Windows dependency should extract to."""
return os.path.join(DEPENDENCIES_DIR, package, DEPENDENCIES[package])
def _winget_import(force: bool = False):
def _winget_import(force: bool = False) -> None:
try:
# We install tools like LLVM / CMake, so we probably don't want to force-upgrade
# a user installed version without good reason.
@ -56,7 +56,7 @@ def _winget_import(force: bool = False):
raise e
def _choco_install(force: bool = False):
def _choco_install(force: bool = False) -> None:
try:
choco_config = os.path.join(util.SERVO_ROOT, "support", "windows", "chocolatey.config")
@ -75,15 +75,15 @@ def _choco_install(force: bool = False):
class Windows(Base):
def __init__(self, triple: str):
def __init__(self, triple: str) -> None:
super().__init__(triple)
self.is_windows = True
def executable_suffix(self):
def executable_suffix(self) -> str:
return ".exe"
@classmethod
def download_and_extract_dependency(cls, zip_path: str, full_spec: str):
def download_and_extract_dependency(cls, zip_path: str, full_spec: str) -> None:
if not os.path.isfile(zip_path):
zip_url = f"{DEPS_URL}/{urllib.parse.quote(full_spec)}.zip"
util.download_file(full_spec, zip_url, zip_path)

View file

@ -11,6 +11,7 @@ import json
import os
import os.path as path
import subprocess
from subprocess import CompletedProcess
from shutil import copy2
from typing import List
@ -30,12 +31,12 @@ from servo.command_base import (
check_call,
is_linux,
)
from servo.platform.build_target import is_android
ANDROID_APP_NAME = "org.servo.servoshell"
def read_file(filename, if_exists=False):
def read_file(filename, if_exists=False) -> str | None:
if if_exists and not path.exists(filename):
return None
with open(filename) as f:
@ -43,7 +44,7 @@ def read_file(filename, if_exists=False):
# Copied from Python 3.3+'s shlex.quote()
def shell_quote(arg):
def shell_quote(arg: str):
# use single quotes, and put single quotes into double quotes
# the string $'b is then quoted as '$'"'"'b'
return "'" + arg.replace("'", "'\"'\"'") + "'"
@ -81,7 +82,7 @@ class PostBuildCommands(CommandBase):
software=False,
emulator=False,
usb=False,
):
) -> int | None:
return self._run(servo_binary, params, debugger, debugger_cmd, headless, software, emulator, usb)
def _run(
@ -94,7 +95,7 @@ class PostBuildCommands(CommandBase):
software=False,
emulator=False,
usb=False,
):
) -> int | None:
env = self.build_env()
env["RUST_BACKTRACE"] = "1"
if software:
@ -109,7 +110,7 @@ class PostBuildCommands(CommandBase):
if debugger_cmd:
debugger = True
if self.is_android():
if is_android(self.target):
if debugger:
print("Android on-device debugging is not supported by mach yet. See")
print("https://github.com/servo/servo/wiki/Building-for-Android#debugging-on-device")
@ -141,7 +142,7 @@ class PostBuildCommands(CommandBase):
if usb:
args += ["-d"]
shell = subprocess.Popen(args + ["shell"], stdin=subprocess.PIPE)
shell.communicate(bytes("\n".join(script) + "\n", "utf8"))
shell.communicate("\n".join(script) + "\n")
return shell.wait()
args = [servo_binary]
@ -193,8 +194,9 @@ class PostBuildCommands(CommandBase):
@Command("android-emulator", description="Run the Android emulator", category="post-build")
@CommandArgument("args", nargs="...", help="Command-line arguments to be passed through to the emulator")
def android_emulator(self, args=None):
def android_emulator(self, args=None) -> int:
if not args:
args = []
print("AVDs created by `./mach bootstrap-android` are servo-arm and servo-x86.")
emulator = self.android_emulator_path(self.build_env())
return subprocess.call([emulator] + args)
@ -202,7 +204,7 @@ class PostBuildCommands(CommandBase):
@Command("rr-record", description="Run Servo whilst recording execution with rr", category="post-build")
@CommandArgument("params", nargs="...", help="Command-line arguments to be passed through to Servo")
@CommandBase.common_command_arguments(binary_selection=True)
def rr_record(self, servo_binary: str, params=[]):
def rr_record(self, servo_binary: str, params=[]) -> None:
env = self.build_env()
env["RUST_BACKTRACE"] = "1"
@ -221,7 +223,7 @@ class PostBuildCommands(CommandBase):
description="Replay the most recent execution of Servo that was recorded with rr",
category="post-build",
)
def rr_replay(self):
def rr_replay(self) -> None:
try:
check_call(["rr", "--fatal-errors", "replay"])
except OSError as e:
@ -233,7 +235,7 @@ class PostBuildCommands(CommandBase):
@Command("doc", description="Generate documentation", category="post-build")
@CommandArgument("params", nargs="...", help="Command-line arguments to be passed through to cargo doc")
@CommandBase.common_command_arguments(build_configuration=True, build_type=False)
def doc(self, params: List[str], **kwargs):
def doc(self, params: List[str], **kwargs) -> CompletedProcess[bytes] | int | None:
self.ensure_bootstrapped()
docs = path.join(servo.util.get_target_dir(), "doc")

View file

@ -8,6 +8,7 @@
# except according to those terms.
import argparse
from argparse import ArgumentParser
import json
import logging
import os
@ -19,6 +20,7 @@ import subprocess
import sys
import textwrap
from time import sleep
from typing import Any
import tidy
import wpt
@ -116,7 +118,7 @@ class MachCommands(CommandBase):
DEFAULT_RENDER_MODE = "cpu"
HELP_RENDER_MODE = "Value can be 'cpu', 'gpu' or 'both' (default " + DEFAULT_RENDER_MODE + ")"
def __init__(self, context):
def __init__(self, context) -> None:
CommandBase.__init__(self, context)
if not hasattr(self.context, "built_tests"):
self.context.built_tests = False
@ -125,7 +127,7 @@ class MachCommands(CommandBase):
@CommandArgument("--base", default=None, help="the base URL for testcases")
@CommandArgument("--date", default=None, help="the datestamp for the data")
@CommandArgument("--submit", "-a", default=False, action="store_true", help="submit the data to perfherder")
def test_perf(self, base=None, date=None, submit=False):
def test_perf(self, base=None, date=None, submit=False) -> int:
env = self.build_env()
cmd = ["bash", "test_perf.sh"]
if base:
@ -144,7 +146,9 @@ class MachCommands(CommandBase):
"--nocapture", default=False, action="store_true", help="Run tests with nocapture ( show test stdout )"
)
@CommandBase.common_command_arguments(build_configuration=True, build_type=True)
def test_unit(self, build_type: BuildType, test_name=None, package=None, bench=False, nocapture=False, **kwargs):
def test_unit(
self, build_type: BuildType, test_name=None, package=None, bench=False, nocapture=False, **kwargs
) -> int:
if test_name is None:
test_name = []
@ -215,7 +219,7 @@ class MachCommands(CommandBase):
return 0
# Gather Cargo build timings (https://doc.rust-lang.org/cargo/reference/timings.html).
args = ["--timings"]
args: list[str] = ["--timings"]
if build_type.is_release():
args += ["--release"]
@ -237,10 +241,12 @@ class MachCommands(CommandBase):
result = call(["cargo", "bench" if bench else "test"], cwd="support/crown")
if result != 0:
return result
return self.run_cargo_build_like_command("bench" if bench else "test", args, env=env, **kwargs)
result = self.run_cargo_build_like_command("bench" if bench else "test", args, env=env, **kwargs)
assert isinstance(result, int)
return result
@Command("test-content", description="Run the content tests", category="testing")
def test_content(self):
def test_content(self) -> int:
print("Content tests have been replaced by web-platform-tests under tests/wpt/mozilla/.")
return 0
@ -259,7 +265,7 @@ class MachCommands(CommandBase):
action="store_true",
help="Emit tidy warnings in the Github Actions annotations format",
)
def test_tidy(self, all_files, no_progress, github_annotations):
def test_tidy(self, all_files, no_progress, github_annotations) -> int:
tidy_failed = tidy.scan(not all_files, not no_progress, github_annotations)
print("\r ➤ Checking formatting of Rust files...")
@ -293,7 +299,7 @@ class MachCommands(CommandBase):
@CommandArgument(
"tests", default=None, nargs="...", help="Specific WebIDL tests to run, relative to the tests directory"
)
def test_scripts(self, verbose, very_verbose, all, tests):
def test_scripts(self, verbose, very_verbose, all, tests) -> int:
if very_verbose:
logging.getLogger().level = logging.DEBUG
elif verbose:
@@ -342,7 +348,7 @@ class MachCommands(CommandBase):
# For the `import WebIDL` in runtests.py
sys.path.insert(0, test_file_dir)
run_file = path.abspath(path.join(test_file_dir, "runtests.py"))
run_globals = {"__file__": run_file}
run_globals: dict[str, Any] = {"__file__": run_file}
exec(compile(open(run_file).read(), run_file, "exec"), run_globals)
passed = run_globals["run_tests"](tests, verbose or very_verbose) and passed
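
Annotating the `exec` globals as `dict[str, Any]` is what lets the checker accept the dynamic lookup and call of `run_tests` afterwards. A minimal sketch of the pattern, with a hypothetical inline script standing in for `runtests.py`:

    from typing import Any

    source = "def run_tests(verbose):\n    return bool(verbose)"
    run_globals: dict[str, Any] = {"__file__": "runtests.py"}
    exec(compile(source, "runtests.py", "exec"), run_globals)
    passed = run_globals["run_tests"](True)  # the value is Any, so the call type-checks
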
@@ -350,7 +356,7 @@ class MachCommands(CommandBase):
@Command("test-devtools", description="Run tests for devtools.", category="testing")
@CommandBase.common_command_arguments(build_type=True)
def test_devtools(self, build_type: BuildType, **kwargs):
def test_devtools(self, build_type: BuildType, **kwargs) -> int:
print("Running devtools tests...")
passed = servo.devtools_tests.run_tests(SCRIPT_PATH, build_type)
return 0 if passed else 1
@@ -362,7 +368,7 @@ class MachCommands(CommandBase):
parser=wpt.create_parser,
)
@CommandBase.common_command_arguments(build_configuration=False, build_type=True)
def test_wpt_failure(self, build_type: BuildType, **kwargs):
def test_wpt_failure(self, build_type: BuildType, **kwargs) -> bool:
kwargs["pause_after_test"] = False
kwargs["include"] = ["infrastructure/failing-test.html"]
return not self._test_wpt(build_type=build_type, **kwargs)
@@ -375,7 +381,7 @@ class MachCommands(CommandBase):
return self._test_wpt(servo_binary, **kwargs)
@CommandBase.allow_target_configuration
def _test_wpt(self, servo_binary: str, **kwargs):
def _test_wpt(self, servo_binary: str, **kwargs) -> int:
# TODO(mrobinson): Why do we pass the wrong binary path in when running WPT on Android?
return_value = wpt.run.run_tests(servo_binary, **kwargs)
return return_value if not kwargs["always_succeed"] else 0
@@ -386,11 +392,11 @@ class MachCommands(CommandBase):
category="testing",
parser=wpt.manifestupdate.create_parser,
)
def update_manifest(self, **kwargs):
def update_manifest(self, **kwargs) -> int:
return wpt.manifestupdate.update(check_clean=False)
@Command("fmt", description="Format Rust, Python, and TOML files", category="testing")
def format_code(self):
def format_code(self) -> int:
result = format_python_files_with_ruff(check_only=False)
if result != 0:
return result
@@ -404,7 +410,7 @@ class MachCommands(CommandBase):
@Command(
"update-wpt", description="Update the web platform tests", category="testing", parser=wpt.update.create_parser
)
def update_wpt(self, **kwargs):
def update_wpt(self, **kwargs) -> int:
patch = kwargs.get("patch", False)
if not patch and kwargs["sync"]:
print("Are you sure you don't want a patch?")
@@ -413,32 +419,33 @@ class MachCommands(CommandBase):
@Command("test-jquery", description="Run the jQuery test suite", category="testing")
@CommandBase.common_command_arguments(binary_selection=True)
def test_jquery(self, servo_binary: str):
def test_jquery(self, servo_binary: str) -> int:
return self.jquery_test_runner("test", servo_binary)
@Command("test-dromaeo", description="Run the Dromaeo test suite", category="testing")
@CommandArgument("tests", default=["recommended"], nargs="...", help="Specific tests to run")
@CommandArgument("--bmf-output", default=None, help="Specify BMF JSON output file")
@CommandBase.common_command_arguments(binary_selection=True)
def test_dromaeo(self, tests, servo_binary: str, bmf_output: str | None = None):
def test_dromaeo(self, tests, servo_binary: str, bmf_output: str | None = None) -> None:
return self.dromaeo_test_runner(tests, servo_binary, bmf_output)
@Command("test-speedometer", description="Run servo's speedometer", category="testing")
@CommandArgument("--bmf-output", default=None, help="Specify BMF JSON output file")
@CommandBase.common_command_arguments(binary_selection=True)
def test_speedometer(self, servo_binary: str, bmf_output: str | None = None):
def test_speedometer(self, servo_binary: str, bmf_output: str | None = None) -> None:
return self.speedometer_runner(servo_binary, bmf_output)
@Command("test-speedometer-ohos", description="Run servo's speedometer on a ohos device", category="testing")
@CommandArgument("--bmf-output", default=None, help="Specifcy BMF JSON output file")
@CommandArgument("--profile", default=None, help="Specify a profile which will be prepended to the output")
# This needs to be a separate command because we do not need a binary locally
def test_speedometer_ohos(self, bmf_output: str | None = None, profile: str | None = None):
def test_speedometer_ohos(self, bmf_output: str | None = None, profile: str | None = None) -> None:
return self.speedometer_runner_ohos(bmf_output, profile)
@Command("update-jquery", description="Update the jQuery test suite expected results", category="testing")
@CommandBase.common_command_arguments(binary_selection=True)
def update_jquery(self, servo_binary: str):
def update_jquery(self, servo_binary: str) -> int:
return self.jquery_test_runner("update", servo_binary)
@Command(
@@ -447,7 +454,7 @@ class MachCommands(CommandBase):
@CommandArgument(
"params", default=None, nargs="...", help=" filepaths of output files of two runs of dromaeo test "
)
def compare_dromaeo(self, params):
def compare_dromaeo(self, params) -> None:
prev_op_filename = params[0]
cur_op_filename = params[1]
result = {"Test": [], "Prev_Time": [], "Cur_Time": [], "Difference(%)": []}
@@ -527,7 +534,7 @@ class MachCommands(CommandBase):
)
)
def jquery_test_runner(self, cmd, binary: str):
def jquery_test_runner(self, cmd: str, binary: str) -> int:
base_dir = path.abspath(path.join("tests", "jquery"))
jquery_dir = path.join(base_dir, "jquery")
run_file = path.join(base_dir, "run_jquery.py")
@@ -544,7 +551,7 @@ class MachCommands(CommandBase):
return call([run_file, cmd, bin_path, base_dir])
def dromaeo_test_runner(self, tests, binary: str, bmf_output: str | None):
def dromaeo_test_runner(self, tests, binary: str, bmf_output: str | None) -> None:
base_dir = path.abspath(path.join("tests", "dromaeo"))
dromaeo_dir = path.join(base_dir, "dromaeo")
run_file = path.join(base_dir, "run_dromaeo.py")
@@ -570,11 +577,11 @@ class MachCommands(CommandBase):
return check_call([run_file, "|".join(tests), bin_path, base_dir, bmf_output])
def speedometer_to_bmf(self, speedometer: str, bmf_output: str | None = None, profile: str | None = None):
def speedometer_to_bmf(self, speedometer: dict[str, Any], bmf_output: str, profile: str | None = None) -> None:
output = dict()
profile = "" if profile is None else profile + "/"
def parse_speedometer_result(result):
def parse_speedometer_result(result) -> None:
if result["unit"] == "ms":
output[profile + f"Speedometer/{result['name']}"] = {
"latency": { # speedometer has ms we need to convert to ns
@@ -592,7 +599,7 @@ class MachCommands(CommandBase):
}
}
else:
raise "Unknown unit!"
raise Exception("Unknown unit!")
for child in result["children"]:
parse_speedometer_result(child)
@@ -602,7 +609,7 @@ class MachCommands(CommandBase):
with open(bmf_output, "w", encoding="utf-8") as f:
json.dump(output, f, indent=4)
def speedometer_runner(self, binary: str, bmf_output: str | None):
def speedometer_runner(self, binary: str, bmf_output: str | None) -> None:
speedometer = json.loads(
subprocess.check_output(
[
@@ -622,13 +629,16 @@ class MachCommands(CommandBase):
if bmf_output:
self.speedometer_to_bmf(speedometer, bmf_output)
def speedometer_runner_ohos(self, bmf_output: str | None, profile: str | None):
hdc_path: str = shutil.which("hdc")
def speedometer_runner_ohos(self, bmf_output: str | None, profile: str | None) -> None:
hdc_path = shutil.which("hdc")
log_path: str = "/data/app/el2/100/base/org.servo.servo/cache/servo.log"
if hdc_path is None:
hdc_path = path.join(os.getenv("OHOS_SDK_NATIVE"), "../", "toolchains", "hdc")
def read_log_file() -> str:
if hdc_path is None:
ohos_sdk_native = os.getenv("OHOS_SDK_NATIVE")
assert ohos_sdk_native
hdc_path = path.join(ohos_sdk_native, "../", "toolchains", "hdc")
def read_log_file(hdc_path: str) -> str:
subprocess.call([hdc_path, "file", "recv", log_path])
file = ""
try:
@@ -670,12 +680,12 @@ class MachCommands(CommandBase):
whole_file: str = ""
for i in range(10):
sleep(30)
whole_file = read_log_file()
whole_file = read_log_file(hdc_path)
if "[INFO script::dom::console]" in whole_file:
# technically the file might not have been written completely yet
# on devices with slow flash, we might want to wait a bit more
sleep(2)
whole_file = read_log_file()
whole_file = read_log_file(hdc_path)
break
start_index: int = whole_file.index("[INFO script::dom::console]") + len("[INFO script::dom::console]") + 1
json_string = whole_file[start_index:]
@@ -694,7 +704,7 @@ class MachCommands(CommandBase):
run_file = path.abspath(
path.join(PROJECT_TOPLEVEL_PATH, "components", "net", "tests", "cookie_http_state_utils.py")
)
run_globals = {"__file__": run_file}
run_globals: dict[str, Any] = {"__file__": run_file}
exec(compile(open(run_file).read(), run_file, "exec"), run_globals)
return run_globals["update_test_file"](cache_dir)
@@ -711,7 +721,7 @@ class MachCommands(CommandBase):
if os.path.exists(dest_folder):
shutil.rmtree(dest_folder)
run_globals = {"__file__": run_file}
run_globals: dict[str, Any] = {"__file__": run_file}
exec(compile(open(run_file).read(), run_file, "exec"), run_globals)
return run_globals["update_conformance"](version, dest_folder, None, patches_dir)
@@ -775,7 +785,7 @@ class MachCommands(CommandBase):
)
@CommandArgument("params", nargs="...", help="Command-line arguments to be passed through to Servo")
@CommandBase.common_command_arguments(binary_selection=True)
def smoketest(self, servo_binary: str, params, **kwargs):
def smoketest(self, servo_binary: str, params, **kwargs) -> int | None:
# We pass `-f` here so that any thread panic will cause Servo to exit,
# preventing a panic from hanging execution. This means that these kind
# of panics won't cause timeouts on CI.
@@ -786,7 +796,7 @@ class MachCommands(CommandBase):
@CommandArgument(
"try_strings", default=["full"], nargs="...", help="A list of try strings specifying what kind of job to run."
)
def try_command(self, remote: str, try_strings: list[str]):
def try_command(self, remote: str, try_strings: list[str]) -> int:
if subprocess.check_output(["git", "diff", "--cached", "--name-only"]).strip():
print("Cannot run `try` with staged and uncommited changes. ")
print("Please either commit or stash them before running `try`.")
@@ -836,7 +846,7 @@ class MachCommands(CommandBase):
return result
def create_parser_create():
def create_parser_create() -> ArgumentParser:
import argparse
p = argparse.ArgumentParser()

View file

@@ -61,7 +61,7 @@ class JobConfig(object):
self.update_name()
return True
def update_name(self):
def update_name(self) -> None:
if self.workflow is Workflow.LINUX:
self.name = "Linux"
elif self.workflow is Workflow.MACOS:
@@ -158,7 +158,7 @@ def handle_preset(s: str) -> Optional[JobConfig]:
return None
def handle_modifier(config: JobConfig, s: str) -> Optional[JobConfig]:
def handle_modifier(config: Optional[JobConfig], s: str) -> Optional[JobConfig]:
if config is None:
return None
s = s.lower()
@@ -184,13 +184,13 @@ class Encoder(json.JSONEncoder):
class Config(object):
def __init__(self, s: Optional[str] = None):
def __init__(self, s: Optional[str] = None) -> None:
self.fail_fast: bool = False
self.matrix: list[JobConfig] = list()
if s is not None:
self.parse(s)
def parse(self, input: str):
def parse(self, input: str) -> None:
input = input.lower().strip()
if not input:
@@ -224,7 +224,7 @@ class Config(object):
else:
self.add_or_merge_job_to_matrix(job)
def add_or_merge_job_to_matrix(self, job: JobConfig):
def add_or_merge_job_to_matrix(self, job: JobConfig) -> None:
for existing_job in self.matrix:
if existing_job.merge(job):
return
@@ -234,7 +234,7 @@ class Config(object):
return json.dumps(self, cls=Encoder, **kwargs)
def main():
def main() -> None:
conf = Config(" ".join(sys.argv[1:]))
print(conf.to_json())
@@ -244,7 +244,7 @@ if __name__ == "__main__":
class TestParser(unittest.TestCase):
def test_string(self):
def test_string(self) -> None:
self.assertDictEqual(
json.loads(Config("linux-unit-tests fail-fast").to_json()),
{
@@ -266,7 +266,7 @@ class TestParser(unittest.TestCase):
},
)
def test_empty(self):
def test_empty(self) -> None:
self.assertDictEqual(
json.loads(Config("").to_json()),
{
@@ -348,7 +348,7 @@ class TestParser(unittest.TestCase):
},
)
def test_job_merging(self):
def test_job_merging(self) -> None:
self.assertDictEqual(
json.loads(Config("linux-wpt").to_json()),
{
@@ -379,6 +379,8 @@ class TestParser(unittest.TestCase):
a = handle_modifier(a, "linux-unit-tests")
b = handle_preset("linux-wpt")
b = handle_modifier(b, "linux-wpt")
assert a is not None
assert b is not None
self.assertTrue(a.merge(b), "Should merge jobs that have different unit test configurations.")
self.assertEqual(a, JobConfig("Linux (Unit Tests, WPT)", Workflow.LINUX, unit_tests=True, wpt=True))
@@ -402,14 +404,14 @@ class TestParser(unittest.TestCase):
self.assertFalse(a.merge(b), "Should not merge jobs with different build arguments.")
self.assertEqual(a, JobConfig("Linux (Unit Tests)", Workflow.LINUX, unit_tests=True))
def test_full(self):
def test_full(self) -> None:
self.assertDictEqual(json.loads(Config("full").to_json()), json.loads(Config("").to_json()))
def test_wpt_alias(self):
def test_wpt_alias(self) -> None:
self.assertDictEqual(json.loads(Config("wpt").to_json()), json.loads(Config("linux-wpt").to_json()))
def run_tests():
def run_tests() -> bool:
verbosity = 1 if logging.getLogger().level >= logging.WARN else 2
suite = unittest.TestLoader().loadTestsFromTestCase(TestParser)
return unittest.TextTestRunner(verbosity=verbosity).run(suite).wasSuccessful()

View file

@@ -14,9 +14,10 @@ import shutil
import stat
import sys
import time
import urllib
import urllib.error
import urllib.request
import zipfile
from zipfile import ZipInfo
from typing import Dict, List, Union
from io import BufferedIOBase, BytesIO
@@ -26,20 +27,20 @@ SCRIPT_PATH = os.path.abspath(os.path.dirname(__file__))
SERVO_ROOT = os.path.abspath(os.path.join(SCRIPT_PATH, "..", ".."))
def remove_readonly(func, path, _):
def remove_readonly(func, path, _) -> None:
"Clear the readonly bit and reattempt the removal"
os.chmod(path, stat.S_IWRITE)
func(path)
def delete(path):
def delete(path) -> None:
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path, onerror=remove_readonly)
else:
os.remove(path)
def download(description: str, url: str, writer: BufferedIOBase, start_byte: int = 0):
def download(description: str, url: str, writer: BufferedIOBase, start_byte: int = 0) -> None:
if start_byte:
print("Resuming download of {} ...".format(url))
else:
@@ -101,13 +102,13 @@ def download(description: str, url: str, writer: BufferedIOBase, start_byte: int
raise
def download_bytes(description: str, url: str):
def download_bytes(description: str, url: str) -> bytes:
content_writer = BytesIO()
download(description, url, content_writer)
return content_writer.getvalue()
def download_file(description: str, url: str, destination_path: str):
def download_file(description: str, url: str, destination_path: str) -> None:
tmp_path = destination_path + ".part"
try:
start_byte = os.path.getsize(tmp_path)
@@ -122,7 +123,7 @@ def download_file(description: str, url: str, destination_path: str):
# https://stackoverflow.com/questions/39296101/python-zipfile-removes-execute-permissions-from-binaries
# In particular, we want the executable bit for executable files.
class ZipFileWithUnixPermissions(zipfile.ZipFile):
def extract(self, member, path=None, pwd=None):
def extract(self, member, path=None, pwd=None) -> str:
if not isinstance(member, zipfile.ZipInfo):
member = self.getinfo(member)
@@ -136,11 +137,12 @@ class ZipFileWithUnixPermissions(zipfile.ZipFile):
return extracted
# For Python 3.x
def _extract_member(self, member, targetpath, pwd):
if sys.version_info[0] >= 3:
def _extract_member(self, member: ZipInfo, targetpath, pwd) -> str:
if int(sys.version_info[0]) >= 3:
if not isinstance(member, zipfile.ZipInfo):
member = self.getinfo(member)
# pyrefly: ignore # missing-attribute
targetpath = super()._extract_member(member, targetpath, pwd)
attr = member.external_attr >> 16
@@ -148,10 +150,11 @@ class ZipFileWithUnixPermissions(zipfile.ZipFile):
os.chmod(targetpath, attr)
return targetpath
else:
# pyrefly: ignore # missing-attribute
return super(ZipFileWithUnixPermissions, self)._extract_member(member, targetpath, pwd)
def extract(src, dst, movedir=None, remove=True):
def extract(src, dst, movedir=None, remove=True) -> None:
assert src.endswith(".zip")
ZipFileWithUnixPermissions(src).extractall(dst)
@@ -166,7 +169,7 @@ def extract(src, dst, movedir=None, remove=True):
os.remove(src)
def check_hash(filename, expected, algorithm):
def check_hash(filename, expected, algorithm) -> None:
hasher = hashlib.new(algorithm)
with open(filename, "rb") as f:
while True:
@@ -179,11 +182,11 @@ def check_hash(filename, expected, algorithm):
sys.exit(1)
def get_default_cache_dir(topdir):
def get_default_cache_dir(topdir) -> str:
return os.environ.get("SERVO_CACHE_DIR", os.path.join(topdir, ".servo"))
def append_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[str]]):
def append_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[str]]) -> None:
if isinstance(paths, list):
paths = os.pathsep.join(paths)
@@ -195,7 +198,7 @@ def append_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[st
env[key] = new_value
def prepend_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[str]]):
def prepend_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[str]]) -> None:
if isinstance(paths, list):
paths = os.pathsep.join(paths)
@@ -206,5 +209,5 @@ def prepend_paths_to_env(env: Dict[str, str], key: str, paths: Union[str, List[s
env[key] = new_value
def get_target_dir():
def get_target_dir() -> str:
return os.environ.get("CARGO_TARGET_DIR", os.path.join(SERVO_ROOT, "target"))

View file

@@ -30,11 +30,11 @@ class VisualStudioInstallation:
installation_path: str
vc_install_path: str
def __lt__(self, other):
def __lt__(self, other) -> bool:
return self.version_number < other.version_number
def find_vswhere():
def find_vswhere() -> str | None:
for path in [PROGRAM_FILES, PROGRAM_FILES_X86]:
if not path:
continue
@@ -145,11 +145,11 @@ def find_msvc_redist_dirs(vs_platform: str) -> Generator[str, None, None]:
path1 = os.path.join(vs_platform, "Microsoft.{}.CRT".format(redist_version))
path2 = os.path.join("onecore", vs_platform, "Microsoft.{}.CRT".format(redist_version))
for path in [path1, path2]:
path = os.path.join(redist_path, path)
if os.path.isdir(path):
yield path
full_path = os.path.join(redist_path, path)
if os.path.isdir(full_path):
yield full_path
else:
tried.append(path)
tried.append(full_path)
print("Couldn't locate MSVC redistributable directory. Tried:", file=sys.stderr)
for path in tried:
@@ -169,7 +169,10 @@ def find_windows_sdk_installation_path() -> str:
# https://stackoverflow.com/questions/35119223/how-to-programmatically-detect-and-locate-the-windows-10-sdk
key_path = r"SOFTWARE\Wow6432Node\Microsoft\Microsoft SDKs\Windows\v10.0"
try:
with winreg.OpenKeyEx(winreg.HKEY_LOCAL_MACHINE, key_path) as key:
return str(winreg.QueryValueEx(key, "InstallationFolder")[0])
if sys.platform == "win32":
with winreg.OpenKeyEx(winreg.HKEY_LOCAL_MACHINE, key_path) as key:
return str(winreg.QueryValueEx(key, "InstallationFolder")[0])
else:
raise EnvironmentError("Couldn't locate HKEY_LOCAL_MACHINE because it's only available on Windows systems")
except FileNotFoundError:
raise Exception(f"Couldn't find Windows SDK installation path in registry at path ({key_path})")