mach: Enable ANN rules (type annotations) for ruff Python linter (#38531)

This change introduces [flake8-annotations
(ANN)](https://docs.astral.sh/ruff/rules/#flake8-annotations-ann) for
Python type annotations; it makes everything related to functions
strictly typed in Python.

These rules will start to affect the following directories from now on:
- /python -> Root directory
- /python/tidy
- /python/wpt

Testing: `./mach test-tidy`
Fixes: Not related to any issues

---------

Signed-off-by: Jerens Lensun <jerensslensun@gmail.com>
This commit is contained in:
Jerens Lensun 2025-08-14 18:36:17 +08:00 committed by GitHub
parent 9c1ee4be83
commit 797db25c4e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 122 additions and 70 deletions

View file

@ -17,14 +17,25 @@ select = [
"E",
"W",
"F",
# Type Annotation
"ANN",
]
ignore = [
# Trailing whitespace; the standard tidy process will enforce no trailing whitespace
"W291",
# 80 character line length; the standard tidy process will enforce line length
"E501",
# allow Any type
"ANN401",
]
[tool.ruff.lint.per-file-ignores]
"!python/**/**.py" = ["ANN"]
"python/servo/**.py" = ["ANN"]
"**/test.py" = ["ANN"]
"**/*_tests.py" = ["ANN"]
"**/tests/**/*.py" = ["ANN"]
[tool.pyrefly]
search-path = [
"python",

View file

@ -4,9 +4,11 @@
import hashlib
import os
from os import PathLike
import subprocess
import sys
import runpy
from typing import TYPE_CHECKING
SCRIPT_PATH = os.path.abspath(os.path.dirname(__file__))
TOP_DIR = os.path.abspath(os.path.join(SCRIPT_PATH, ".."))
@ -80,7 +82,11 @@ CATEGORIES = {
}
def _process_exec(args: list[str], cwd) -> None:
if TYPE_CHECKING:
from mach.main import Mach
def _process_exec(args: list[str], cwd: PathLike[bytes] | PathLike[str] | bytes | str) -> None:
try:
subprocess.check_output(args, stderr=subprocess.STDOUT, cwd=cwd)
except subprocess.CalledProcessError as exception:
@ -188,7 +194,7 @@ def bootstrap_command_only(topdir: str) -> int:
return 0
def bootstrap(topdir: str):
def bootstrap(topdir: str) -> "Mach":
_ensure_case_insensitive_if_windows()
topdir = os.path.abspath(topdir)
@ -202,7 +208,7 @@ def bootstrap(topdir: str):
_activate_virtualenv(topdir)
def populate_context(context, key=None):
def populate_context(context: None, key: None | str = None) -> str | None:
if key is None:
return
if key == "topdir":

View file

@ -9,7 +9,7 @@
import logging
import os
from typing import Iterable, Tuple
from collections.abc import Iterable
import unittest
from . import tidy
@ -180,7 +180,7 @@ class CheckTidiness(unittest.TestCase):
self.assertNoMoreErrors(errors)
def test_raw_url_in_rustdoc(self):
def assert_has_a_single_rustdoc_error(errors: Iterable[Tuple[int, str]]):
def assert_has_a_single_rustdoc_error(errors: Iterable[tuple[int, str]]):
self.assertEqual(tidy.ERROR_RAW_URL_IN_RUSTDOC, next(errors)[1])
self.assertNoMoreErrors(errors)

View file

@ -18,7 +18,7 @@ import re
import subprocess
import sys
from dataclasses import dataclass
from typing import Any, Dict, List, TypedDict, LiteralString
from typing import Any, TypedDict, LiteralString
from collections.abc import Iterator, Callable
import types
@ -174,7 +174,7 @@ def progress_wrapper(iterator: Iterator[str]) -> Iterator[str]:
yield thing
def git_changes_since_last_merge(path):
def git_changes_since_last_merge(path: str) -> list[str] | str:
args = ["git", "log", "-n1", "--committer", "noreply@github.com", "--format=%H"]
last_merge = subprocess.check_output(args, universal_newlines=True).strip()
if not last_merge:
@ -191,7 +191,9 @@ class FileList(object):
excluded: list[str]
generator: Iterator[str]
def __init__(self, directory, only_changed_files=False, exclude_dirs=[], progress=True) -> None:
def __init__(
self, directory: str, only_changed_files: bool = False, exclude_dirs: list[str] = [], progress: bool = True
) -> None:
self.directory = directory
self.excluded = exclude_dirs
self.generator = self._filter_excluded() if exclude_dirs else self._default_walk()
@ -314,7 +316,7 @@ def contains_url(line: bytes) -> bool:
return bool(URL_REGEX.search(line))
def is_unsplittable(file_name: str, line: bytes):
def is_unsplittable(file_name: str, line: bytes) -> bool:
return contains_url(line) or file_name.endswith(".rs") and line.startswith(b"use ") and b"{" not in line
@ -432,7 +434,7 @@ def run_python_type_checker() -> Iterator[tuple[str, int, str]]:
yield relative_path(diagnostic.path), diagnostic.line, diagnostic.concise_description
def run_cargo_deny_lints():
def run_cargo_deny_lints() -> Iterator[tuple[str, int, str]]:
print("\r ➤ Running `cargo-deny` checks...")
result = subprocess.run(
["cargo-deny", "--format=json", "--all-features", "check"], encoding="utf-8", capture_output=True
@ -633,10 +635,10 @@ def lint_wpt_test_files() -> Iterator[tuple[str, int, str]]:
# Override the logging function so that we can collect errors from
# the lint script, which doesn't allow configuration of the output.
messages: List[str] = []
messages: list[str] = []
assert lint.logger is not None
def collect_messages(_, message):
def collect_messages(_: None, message: str) -> None:
messages.append(message)
lint.logger.error = types.MethodType(collect_messages, lint.logger)
@ -875,7 +877,7 @@ def collect_errors_for_files(
yield (filename,) + error
def scan(only_changed_files=False, progress=False, github_annotations=False) -> int:
def scan(only_changed_files: bool = False, progress: bool = False, github_annotations: bool = False) -> int:
github_annotation_manager = GitHubAnnotationManager("test-tidy")
# check config file for errors
config_errors = check_config_file(CONFIG_FILE_PATH)
@ -922,7 +924,7 @@ def scan(only_changed_files=False, progress=False, github_annotations=False) ->
class CargoDenyKrate:
def __init__(self, data: Dict[Any, Any]) -> None:
def __init__(self, data: dict[Any, Any]) -> None:
crate = data["Krate"]
self.name = crate["name"]
self.version = crate["version"]

View file

@ -56,7 +56,7 @@ class LocalGitRepo:
# git in advance and run the subprocess by its absolute path.
self.git_path = shutil.which("git")
def run_without_encoding(self, *args, env: dict = {}) -> bytes:
def run_without_encoding(self, *args: str, env: dict = {}) -> bytes:
if self.git_path is None:
raise RuntimeError("Git executable not found in PATH")
command_line = [self.git_path] + list(args)
@ -75,7 +75,7 @@ class LocalGitRepo:
)
raise exception
def run(self, *args, env: dict = {}) -> str:
def run(self, *args: str, env: dict = {}) -> str:
return self.run_without_encoding(*args, env=env).decode("utf-8", errors="surrogateescape")
@ -164,7 +164,7 @@ class WPTSync:
self.local_servo_repo = LocalGitRepo(self.servo_path, self)
self.local_wpt_repo = LocalGitRepo(self.wpt_path, self)
def run(self, payload: dict, step_callback=None) -> bool:
def run(self, payload: dict, step_callback: Callable[[Step], None] | None = None) -> bool:
if "pull_request" not in payload:
return True
@ -184,7 +184,7 @@ class WPTSync:
try:
servo_pr = self.servo.get_pull_request(pull_data["number"])
downstream_wpt_branch = self.downstream_wpt.get_branch(
wpt_branch_name_from_servo_pr_number(servo_pr.number)
wpt_branch_name_from_servo_pr_number(str(servo_pr.number))
)
upstream_pr = self.wpt.get_open_pull_request_for_branch(self.github_username, downstream_wpt_branch)
if upstream_pr:

View file

@ -43,5 +43,5 @@ COULD_NOT_MERGE_CHANGES_UPSTREAM_COMMENT = (
)
def wpt_branch_name_from_servo_pr_number(servo_pr_number) -> str:
def wpt_branch_name_from_servo_pr_number(servo_pr_number: str) -> str:
return f"servo_export_{servo_pr_number}"

View file

@ -18,7 +18,7 @@ from __future__ import annotations
import logging
import urllib.parse
from typing import Optional, TYPE_CHECKING
from typing import Optional, TYPE_CHECKING, Any
import requests
@ -29,7 +29,7 @@ USER_AGENT = "Servo web-platform-test sync service"
TIMEOUT = 30 # 30 seconds
def authenticated(sync: WPTSync, method: str, url: str, json=None) -> requests.Response:
def authenticated(sync: WPTSync, method: str, url: str, json: dict[str, Any] | None = None) -> requests.Response:
logging.info(" → Request: %s %s", method, url)
if json:
logging.info(" → Request JSON: %s", json)
@ -138,7 +138,7 @@ class PullRequest:
def __str__(self) -> str:
return f"{self.repo}#{self.number}"
def api(self, *args, **kwargs) -> requests.Response:
def api(self, *args: Any, **kwargs: dict[str, Any]) -> requests.Response:
return authenticated(self.context, *args, **kwargs)
def leave_comment(self, comment: str) -> requests.Response:
@ -163,7 +163,8 @@ class PullRequest:
self.api("DELETE", f"{self.base_issues_url}/labels/{label}")
def add_labels(self, labels: list[str]) -> None:
self.api("POST", f"{self.base_issues_url}/labels", json=labels)
data = {"labels": labels}
self.api("POST", f"{self.base_issues_url}/labels", json=data)
def merge(self) -> None:
self.api("PUT", f"{self.base_url}/merge", json={"merge_method": "rebase"})

View file

@ -19,7 +19,7 @@ import logging
import os
import textwrap
from typing import TYPE_CHECKING, Generic, Optional, TypeVar
from typing import TYPE_CHECKING, Generic, Optional, TypeVar, Callable, Any
from .common import COULD_NOT_APPLY_CHANGES_DOWNSTREAM_COMMENT
from .common import COULD_NOT_APPLY_CHANGES_UPSTREAM_COMMENT
@ -36,7 +36,7 @@ PATCH_FILE_NAME = "tmp.patch"
class Step:
def __init__(self, name) -> None:
def __init__(self, name: str) -> None:
self.name = name
def provides(self) -> Optional[AsyncValue]:
@ -91,7 +91,9 @@ class CreateOrUpdateBranchForPRStep(Step):
if run.upstream_pr.has_value():
run.add_step(CommentStep(run.upstream_pr.value(), COULD_NOT_APPLY_CHANGES_UPSTREAM_COMMENT))
def _get_upstreamable_commits_from_local_servo_repo(self, sync: WPTSync):
def _get_upstreamable_commits_from_local_servo_repo(
self, sync: WPTSync
) -> list[dict[str, bytes | str] | dict[str, str]]:
local_servo_repo = sync.local_servo_repo
number_of_commits = self.pull_data["commits"]
pr_head = self.pull_data["head"]["sha"]
@ -143,7 +145,9 @@ class CreateOrUpdateBranchForPRStep(Step):
run.sync.local_wpt_repo.run("add", "--all")
run.sync.local_wpt_repo.run("commit", "--message", commit["message"], "--author", commit["author"])
def _create_or_update_branch_for_pr(self, run: SyncRun, commits: list[dict], pre_commit_callback=None) -> str:
def _create_or_update_branch_for_pr(
self, run: SyncRun, commits: list[dict], pre_commit_callback: Callable[[], None] | None = None
) -> str:
branch_name = wpt_branch_name_from_servo_pr_number(self.pull_data["number"])
try:
# Create a new branch with a unique name that is consistent between
@ -180,7 +184,7 @@ class CreateOrUpdateBranchForPRStep(Step):
class RemoveBranchForPRStep(Step):
def __init__(self, pull_request) -> None:
def __init__(self, pull_request: dict[str, Any]) -> None:
Step.__init__(self, "RemoveBranchForPRStep")
self.branch_name = wpt_branch_name_from_servo_pr_number(pull_request["number"])

View file

@ -13,7 +13,7 @@ import mozlog.formatters.base
import mozlog.reader
from dataclasses import dataclass, field
from typing import DefaultDict, Dict, Optional, NotRequired, Union, TypedDict, Literal
from typing import DefaultDict, Optional, NotRequired, Union, TypedDict, Literal
from six import itervalues
DEFAULT_MOVE_UP_CODE = "\x1b[A"
@ -44,12 +44,12 @@ class UnexpectedResult:
issues: list[str] = field(default_factory=list)
flaky: bool = False
def __str__(self):
def __str__(self) -> str:
output = UnexpectedResult.to_lines(self)
if self.unexpected_subtest_results:
def make_subtests_failure(subtest_results):
def make_subtests_failure(subtest_results: list[UnexpectedSubtestResult]) -> list[str]:
# Test names sometimes contain control characters, which we want
# to be printed in their raw form, and not their interpreted form.
lines = []
@ -74,7 +74,7 @@ class UnexpectedResult:
return UnexpectedResult.wrap_and_indent_lines(output, " ")
@staticmethod
def wrap_and_indent_lines(lines, indent: str):
def wrap_and_indent_lines(lines: list[str], indent: str) -> str:
if not lines:
return ""
@ -86,7 +86,7 @@ class UnexpectedResult:
return output
@staticmethod
def to_lines(result: Union[UnexpectedSubtestResult, UnexpectedResult], print_stack=True) -> list[str]:
def to_lines(result: Union[UnexpectedSubtestResult, UnexpectedResult], print_stack: bool = True) -> list[str]:
first_line = result.actual
if result.expected != result.actual:
first_line += f" [expected {result.expected}]"
@ -120,12 +120,15 @@ class GlobalTestData(TypedDict):
Status = Literal["PASS", "FAIL", "PRECONDITION_FAILED", "TIMEOUT", "CRASH", "ASSERT", "SKIP", "OK", "ERROR"]
LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]
class SuiteStartData(GlobalTestData):
tests: Dict
tests: dict
name: NotRequired[str]
run_info: NotRequired[Dict]
version_info: NotRequired[Dict]
device_info: NotRequired[Dict]
run_info: NotRequired[dict]
version_info: NotRequired[dict]
device_info: NotRequired[dict]
class TestStartData(GlobalTestData):
@ -152,6 +155,19 @@ class TestStatusData(TestEndData):
subtest: str
class ProcessOutputData(GlobalTestData):
process: int
data: str
command: NotRequired[str]
test: NotRequired[str]
subsuite: NotRequired[str]
class LogData(GlobalTestData):
level: LogLevel
message: NotRequired[str]
class ServoHandler(mozlog.reader.LogHandler):
"""LogHandler designed to collect unexpected results for use by
script or by the ServoFormatter output formatter."""
@ -159,16 +175,16 @@ class ServoHandler(mozlog.reader.LogHandler):
number_of_tests: int
completed_tests: int
need_to_erase_last_line: int
running_tests: Dict[str, str]
running_tests: dict[str, str]
test_output: DefaultDict[str, str]
subtest_failures: DefaultDict[str, list]
tests_with_failing_subtests: list
unexpected_results: list
expected: Dict[str, int]
unexpected_tests: Dict[str, list]
expected: dict[str, int]
unexpected_tests: dict[str, list]
suite_start_time: int
def __init__(self, detect_flakes=False) -> None:
def __init__(self, detect_flakes: bool = False) -> None:
"""
Flake detection assumes first suite is actual run
and rest of the suites are retry-unexpected for flakes detection.
@ -225,14 +241,14 @@ class ServoHandler(mozlog.reader.LogHandler):
self.number_of_tests = sum(len(tests) for tests in itervalues(data["tests"]))
self.suite_start_time = data["time"]
def suite_end(self, data) -> Optional[str]:
def suite_end(self, data: GlobalTestData) -> Optional[str]:
pass
def test_start(self, data: TestStartData) -> Optional[str]:
self.running_tests[data["thread"]] = data["test"]
@staticmethod
def data_was_for_expected_result(data):
def data_was_for_expected_result(data: TestEndData) -> bool:
if "expected" not in data:
return True
return "known_intermittent" in data and data["status"] in data["known_intermittent"]
@ -319,11 +335,11 @@ class ServoHandler(mozlog.reader.LogHandler):
)
)
def process_output(self, data) -> None:
def process_output(self, data: ProcessOutputData) -> None:
if "test" in data:
self.test_output[data["test"]] += data["data"] + "\n"
def log(self, data) -> str | None:
def log(self, data: LogData) -> str | None:
pass
@ -362,7 +378,7 @@ class ServoFormatter(mozlog.formatters.base.BaseFormatter, ServoHandler):
return ""
return (self.move_up + self.clear_eol) * self.current_display.count("\n")
def generate_output(self, text=None, new_display=None) -> str | None:
def generate_output(self, text: str | None = None, new_display: str | None = None) -> str | None:
if not self.interactive:
return text
@ -392,7 +408,7 @@ class ServoFormatter(mozlog.formatters.base.BaseFormatter, ServoHandler):
else:
return new_display + "No tests running.\n"
def suite_start(self, data) -> str:
def suite_start(self, data: SuiteStartData) -> str:
ServoHandler.suite_start(self, data)
maybe_flakes_msg = " to detect flaky tests" if self.currently_detecting_flakes else ""
if self.number_of_tests == 0:
@ -400,12 +416,12 @@ class ServoFormatter(mozlog.formatters.base.BaseFormatter, ServoHandler):
else:
return f"Running {self.number_of_tests} tests in {data['source']}{maybe_flakes_msg}\n\n"
def test_start(self, data) -> str | None:
def test_start(self, data: TestStartData) -> str | None:
ServoHandler.test_start(self, data)
if self.interactive:
return self.generate_output(new_display=self.build_status_line())
def test_end(self, data) -> str | None:
def test_end(self, data: TestEndData) -> str | None:
unexpected_result = ServoHandler.test_end(self, data)
if unexpected_result:
# Surround test output by newlines so that it is easier to read.
@ -424,10 +440,10 @@ class ServoFormatter(mozlog.formatters.base.BaseFormatter, ServoHandler):
else:
return self.generate_output(text="%s%s\n" % (self.test_counter(), data["test"]))
def test_status(self, data) -> None:
def test_status(self, data: TestStatusData) -> None:
ServoHandler.test_status(self, data)
def suite_end(self, data) -> str | None:
def suite_end(self, data: GlobalTestData) -> str | None:
ServoHandler.suite_end(self, data)
if not self.interactive:
output = "\n"
@ -472,10 +488,10 @@ class ServoFormatter(mozlog.formatters.base.BaseFormatter, ServoHandler):
return self.generate_output(text=output, new_display="")
def process_output(self, data) -> None:
def process_output(self, data: ProcessOutputData) -> None:
ServoHandler.process_output(self, data)
def log(self, data) -> str | None:
def log(self, data: LogData) -> str | None:
ServoHandler.log(self, data)
# We are logging messages that begin with STDERR, because that is how exceptions

View file

@ -3,7 +3,7 @@
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
from wptrunner.wptcommandline import TestRoot
from typing import Mapping
from typing import Mapping, Any
import argparse
from argparse import ArgumentParser
import os
@ -34,7 +34,12 @@ def create_parser() -> ArgumentParser:
return p
def update(check_clean=True, rebuild=False, logger=None, **kwargs) -> int:
def update(
check_clean: bool = True,
rebuild: bool = False,
logger: Any = None,
**kwargs: Any,
) -> int:
if not logger:
logger = wptlogging.setup(kwargs, {"mach": sys.stdout})
kwargs = {
@ -55,7 +60,7 @@ def update(check_clean=True, rebuild=False, logger=None, **kwargs) -> int:
return _update(logger, test_paths, rebuild)
def _update(logger, test_paths: Mapping[str, TestRoot], rebuild) -> int:
def _update(logger: Any, test_paths: Mapping[str, TestRoot], rebuild: bool) -> int:
for url_base, paths in iteritems(test_paths):
manifest_path = os.path.join(paths.metadata_path, "MANIFEST.json")
cache_subdir = os.path.relpath(os.path.dirname(manifest_path), os.path.dirname(__file__))
@ -70,7 +75,7 @@ def _update(logger, test_paths: Mapping[str, TestRoot], rebuild) -> int:
return 0
def _check_clean(logger, test_paths: Mapping[str, TestRoot]) -> int:
def _check_clean(logger: Any, test_paths: Mapping[str, TestRoot]) -> int:
manifests_by_path = {}
rv = 0
for url_base, paths in iteritems(test_paths):
@ -107,7 +112,7 @@ def _check_clean(logger, test_paths: Mapping[str, TestRoot]) -> int:
return rv
def diff_manifests(logger, manifest_path, old_manifest, new_manifest) -> bool:
def diff_manifests(logger: Any, manifest_path: Any, old_manifest: Any, new_manifest: Any) -> bool:
"""Lint the differences between old and new versions of a
manifest. Differences are considered significant (and so produce
lint errors) if they produce a meaningful difference in the actual
@ -164,11 +169,11 @@ def diff_manifests(logger, manifest_path, old_manifest, new_manifest) -> bool:
old_paths = old_manifest.to_json()["items"]
new_paths = new_manifest.to_json()["items"]
if old_paths != new_paths:
logger.warning("Manifest %s contains correct tests but file hashes changed." % manifest_path) # noqa
logger.warning("Manifest %s contains correct tests but file hashes changed." % manifest_path)
clean = False
return clean
def log_error(logger, manifest_path, msg: str) -> None:
def log_error(logger: Any, manifest_path: Any, msg: str) -> None:
logger.lint_error(path=manifest_path, message=msg, lineno=0, source="", linter="wpt-manifest")

View file

@ -14,7 +14,8 @@ import urllib.error
import urllib.parse
import urllib.request
from typing import List, NamedTuple, Optional, Union, cast, Callable
from typing import List, NamedTuple, Optional, Union, cast, Any
from collections.abc import Callable
import mozlog
import mozlog.formatters
@ -37,7 +38,7 @@ def set_if_none(args: dict, key: str, value: bool | int | str) -> None:
args[key] = value
def run_tests(default_binary_path: str, **kwargs) -> int:
def run_tests(default_binary_path: str, **kwargs: Any) -> int:
print(f"Running WPT tests with {default_binary_path}")
# By default, Rayon selects the number of worker threads based on the
@ -249,7 +250,12 @@ def filter_intermittents(unexpected_results: List[UnexpectedResult], output_path
print(f"Filtering {len(unexpected_results)} unexpected results for known intermittents via <{dashboard.url}>")
dashboard.report_failures(unexpected_results)
def add_result(output: list[str], text: str, results: List[UnexpectedResult], filter_func) -> None:
def add_result(
output: list[str],
text: str,
results: List[UnexpectedResult],
filter_func: Callable[[UnexpectedResult], bool],
) -> None:
filtered = [str(result) for result in filter(filter_func, results)]
if filtered:
output += [f"{text} ({len(filtered)}): ", *filtered]
@ -263,7 +269,7 @@ def filter_intermittents(unexpected_results: List[UnexpectedResult], output_path
output,
"Stable unexpected results that are known-intermittent",
unexpected_results,
lambda result: not result.flaky and result.issues,
lambda result: not result.flaky and bool(result.issues),
)
add_result(output, "Stable unexpected results", unexpected_results, is_stable_and_unexpected)
print("\n".join(output))

View file

@ -32,7 +32,7 @@ import time
import unittest
from functools import partial
from typing import Any, Optional, Tuple, Type
from typing import Any, Optional, Type
from wsgiref.simple_server import WSGIRequestHandler, make_server
import flask
@ -221,7 +221,7 @@ class TestApplyCommitsToWPT(unittest.TestCase):
pull_request = SYNC.servo.get_pull_request(pr_number)
step = CreateOrUpdateBranchForPRStep({"number": pr_number}, pull_request)
def get_applied_commits(num_commits: int, applied_commits: list[Tuple[str, str]]) -> None:
def get_applied_commits(num_commits: int, applied_commits: list[tuple[str, str]]) -> None:
assert SYNC is not None
repo = SYNC.local_wpt_repo
log = ["log", "--oneline", f"-{num_commits}"]

View file

@ -3,6 +3,7 @@
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# pylint: disable=missing-docstring
from typing import Any
import os
import subprocess
import shutil
@ -21,7 +22,7 @@ TEST_ROOT = os.path.join(WPT_PATH, "tests")
META_ROOTS = [os.path.join(WPT_PATH, "meta"), os.path.join(WPT_PATH, "meta-legacy")]
def do_sync(**kwargs) -> int:
def do_sync(**kwargs: str) -> int:
last_commit = subprocess.check_output(["git", "log", "-1"])
# Commits should always be authored by the GitHub Actions bot.
@ -94,7 +95,7 @@ def remove_unused_metadata() -> None:
shutil.rmtree(directory)
def update_tests(**kwargs) -> int:
def update_tests(**kwargs: Any) -> int:
def set_if_none(args: dict, key: str, value: str) -> None:
if key not in args or args[key] is None:
args[key] = value
@ -113,11 +114,11 @@ def update_tests(**kwargs) -> int:
return 0 if run_update(**kwargs) else 1
def run_update(**kwargs) -> bool:
def run_update(**kwargs: Any) -> bool:
"""Run the update process returning True if the process is successful."""
logger = setup_logging(kwargs, {"mach": sys.stdout})
return WPTUpdate(logger, **kwargs).run() != exit_unclean
def create_parser(**_kwargs) -> ArgumentParser:
def create_parser(**_kwargs: Any) -> ArgumentParser:
return wptcommandline.create_parser_update()