mirror of
https://github.com/servo/servo.git
synced 2025-08-02 04:00:32 +01:00
317 lines
11 KiB
Python
317 lines
11 KiB
Python
import abc
|
|
import json
|
|
import os
|
|
import stat
|
|
from collections import deque
|
|
from collections import MutableMapping
|
|
|
|
from six import with_metaclass, PY2
|
|
|
|
from .sourcefile import SourceFile
|
|
from .utils import git
|
|
|
|
try:
|
|
from ..gitignore import gitignore
|
|
except ValueError:
|
|
# relative import beyond toplevel throws *ValueError*!
|
|
from gitignore import gitignore # type: ignore
|
|
|
|
|
|
MYPY = False
|
|
if MYPY:
|
|
# MYPY is set to True when run under Mypy.
|
|
from typing import Dict, Optional, List, Set, Text, Iterable, Any, Tuple, Union, Iterator
|
|
from .manifest import Manifest # cyclic import under MYPY guard
|
|
if PY2:
|
|
stat_result = Any
|
|
else:
|
|
stat_result = os.stat_result
|
|
|
|
|
|
def get_tree(tests_root, manifest, manifest_path, cache_root,
|
|
working_copy=True, rebuild=False):
|
|
# type: (bytes, Manifest, Optional[bytes], Optional[bytes], bool, bool) -> FileSystem
|
|
tree = None
|
|
if cache_root is None:
|
|
cache_root = os.path.join(tests_root, b".wptcache")
|
|
if not os.path.exists(cache_root):
|
|
try:
|
|
os.makedirs(cache_root)
|
|
except IOError:
|
|
cache_root = None
|
|
|
|
if not working_copy:
|
|
raise ValueError("working_copy=False unsupported")
|
|
|
|
if tree is None:
|
|
tree = FileSystem(tests_root,
|
|
manifest.url_base,
|
|
manifest_path=manifest_path,
|
|
cache_path=cache_root,
|
|
rebuild=rebuild)
|
|
return tree
|
|
|
|
|
|
class GitHasher(object):
|
|
def __init__(self, path):
|
|
# type: (bytes) -> None
|
|
self.git = git(path)
|
|
|
|
def _local_changes(self):
|
|
# type: () -> Set[bytes]
|
|
"""get a set of files which have changed between HEAD and working copy"""
|
|
assert self.git is not None
|
|
# note that git runs the command with tests_root as the cwd, which may
|
|
# not be the root of the git repo (e.g., within a browser repo)
|
|
cmd = [b"diff-index", b"--relative", b"--no-renames", b"--name-only", b"-z", b"HEAD"]
|
|
data = self.git(*cmd)
|
|
return set(data.split(b"\0"))
|
|
|
|
def hash_cache(self):
|
|
# type: () -> Dict[bytes, Optional[bytes]]
|
|
"""
|
|
A dict of rel_path -> current git object id if the working tree matches HEAD else None
|
|
"""
|
|
hash_cache = {} # type: Dict[bytes, Optional[bytes]]
|
|
|
|
if self.git is None:
|
|
return hash_cache
|
|
|
|
# note that git runs the command with tests_root as the cwd, which may
|
|
# not be the root of the git repo (e.g., within a browser repo)
|
|
cmd = ["ls-tree", "-r", "-z", "HEAD"]
|
|
local_changes = self._local_changes()
|
|
for result in self.git(*cmd).split(b"\0")[:-1]: # type: bytes
|
|
data, rel_path = result.rsplit(b"\t", 1)
|
|
hash_cache[rel_path] = None if rel_path in local_changes else data.split(b" ", 3)[2]
|
|
|
|
return hash_cache
|
|
|
|
|
|
|
|
class FileSystem(object):
|
|
def __init__(self, root, url_base, cache_path, manifest_path=None, rebuild=False):
|
|
# type: (bytes, Text, Optional[bytes], Optional[bytes], bool) -> None
|
|
self.root = os.path.abspath(root)
|
|
self.url_base = url_base
|
|
self.ignore_cache = None
|
|
self.mtime_cache = None
|
|
if cache_path is not None:
|
|
if manifest_path is not None:
|
|
self.mtime_cache = MtimeCache(cache_path, root, manifest_path, rebuild)
|
|
if gitignore.has_ignore(root):
|
|
self.ignore_cache = GitIgnoreCache(cache_path, root, rebuild)
|
|
self.path_filter = gitignore.PathFilter(self.root,
|
|
extras=[".git/"],
|
|
cache=self.ignore_cache)
|
|
git = GitHasher(root)
|
|
if git is not None:
|
|
self.hash_cache = git.hash_cache()
|
|
else:
|
|
self.hash_cache = {}
|
|
|
|
def __iter__(self):
|
|
# type: () -> Iterator[Tuple[Union[bytes, SourceFile], bool]]
|
|
mtime_cache = self.mtime_cache
|
|
for dirpath, dirnames, filenames in self.path_filter(walk(self.root)):
|
|
for filename, path_stat in filenames:
|
|
path = os.path.join(dirpath, filename)
|
|
if mtime_cache is None or mtime_cache.updated(path, path_stat):
|
|
hash = self.hash_cache.get(path, None)
|
|
yield SourceFile(self.root, path, self.url_base, hash), True
|
|
else:
|
|
yield path, False
|
|
|
|
def dump_caches(self):
|
|
# type: () -> None
|
|
for cache in [self.mtime_cache, self.ignore_cache]:
|
|
if cache is not None:
|
|
cache.dump()
|
|
|
|
|
|
class CacheFile(with_metaclass(abc.ABCMeta)):
|
|
def __init__(self, cache_root, tests_root, rebuild=False):
|
|
# type: (bytes, bytes, bool) -> None
|
|
self.tests_root = tests_root
|
|
if not os.path.exists(cache_root):
|
|
os.makedirs(cache_root)
|
|
self.path = os.path.join(cache_root, self.file_name)
|
|
self.modified = False
|
|
self.data = self.load(rebuild)
|
|
|
|
@abc.abstractproperty
|
|
def file_name(self):
|
|
# type: () -> bytes
|
|
pass
|
|
|
|
def dump(self):
|
|
# type: () -> None
|
|
if not self.modified:
|
|
return
|
|
with open(self.path, 'w') as f:
|
|
json.dump(self.data, f, indent=1)
|
|
|
|
def load(self, rebuild=False):
|
|
# type: (bool) -> Dict[Any, Any]
|
|
data = {} # type: Dict[Any, Any]
|
|
try:
|
|
if not rebuild:
|
|
with open(self.path, 'r') as f:
|
|
try:
|
|
data = json.load(f)
|
|
except ValueError:
|
|
pass
|
|
data = self.check_valid(data)
|
|
except IOError:
|
|
pass
|
|
return data
|
|
|
|
def check_valid(self, data):
|
|
# type: (Dict[Any, Any]) -> Dict[Any, Any]
|
|
"""Check if the cached data is valid and return an updated copy of the
|
|
cache containing only data that can be used."""
|
|
return data
|
|
|
|
|
|
class MtimeCache(CacheFile):
|
|
file_name = b"mtime.json"
|
|
|
|
def __init__(self, cache_root, tests_root, manifest_path, rebuild=False):
|
|
# type: (bytes, bytes, bytes, bool) -> None
|
|
self.manifest_path = manifest_path
|
|
super(MtimeCache, self).__init__(cache_root, tests_root, rebuild)
|
|
|
|
def updated(self, rel_path, stat):
|
|
# type: (bytes, stat_result) -> bool
|
|
"""Return a boolean indicating whether the file changed since the cache was last updated.
|
|
|
|
This implicitly updates the cache with the new mtime data."""
|
|
mtime = stat.st_mtime
|
|
if mtime != self.data.get(rel_path):
|
|
self.modified = True
|
|
self.data[rel_path] = mtime
|
|
return True
|
|
return False
|
|
|
|
def check_valid(self, data):
|
|
# type: (Dict[Any, Any]) -> Dict[Any, Any]
|
|
if data.get("/tests_root") != self.tests_root:
|
|
self.modified = True
|
|
else:
|
|
if self.manifest_path is not None and os.path.exists(self.manifest_path):
|
|
mtime = os.path.getmtime(self.manifest_path)
|
|
if data.get("/manifest_path") != [self.manifest_path, mtime]:
|
|
self.modified = True
|
|
else:
|
|
self.modified = True
|
|
if self.modified:
|
|
data = {}
|
|
data["/tests_root"] = self.tests_root
|
|
return data
|
|
|
|
def dump(self):
|
|
# type: () -> None
|
|
if self.manifest_path is None:
|
|
raise ValueError
|
|
if not os.path.exists(self.manifest_path):
|
|
return
|
|
mtime = os.path.getmtime(self.manifest_path)
|
|
self.data["/manifest_path"] = [self.manifest_path, mtime]
|
|
self.data["/tests_root"] = self.tests_root
|
|
super(MtimeCache, self).dump()
|
|
|
|
|
|
class GitIgnoreCache(CacheFile, MutableMapping): # type: ignore
|
|
file_name = b"gitignore.json"
|
|
|
|
def check_valid(self, data):
|
|
# type: (Dict[Any, Any]) -> Dict[Any, Any]
|
|
ignore_path = os.path.join(self.tests_root, b".gitignore")
|
|
mtime = os.path.getmtime(ignore_path)
|
|
if data.get(b"/gitignore_file") != [ignore_path, mtime]:
|
|
self.modified = True
|
|
data = {}
|
|
data[b"/gitignore_file"] = [ignore_path, mtime]
|
|
return data
|
|
|
|
def __contains__(self, key):
|
|
# type: (Any) -> bool
|
|
return key in self.data
|
|
|
|
def __getitem__(self, key):
|
|
# type: (bytes) -> bool
|
|
v = self.data[key]
|
|
assert isinstance(v, bool)
|
|
return v
|
|
|
|
def __setitem__(self, key, value):
|
|
# type: (bytes, bool) -> None
|
|
if self.data.get(key) != value:
|
|
self.modified = True
|
|
self.data[key] = value
|
|
|
|
def __delitem__(self, key):
|
|
# type: (bytes) -> None
|
|
del self.data[key]
|
|
|
|
def __iter__(self):
|
|
# type: () -> Iterator[bytes]
|
|
return iter(self.data)
|
|
|
|
def __len__(self):
|
|
# type: () -> int
|
|
return len(self.data)
|
|
|
|
|
|
def walk(root):
|
|
# type: (bytes) -> Iterable[Tuple[bytes, List[Tuple[bytes, stat_result]], List[Tuple[bytes, stat_result]]]]
|
|
"""Re-implementation of os.walk. Returns an iterator over
|
|
(dirpath, dirnames, filenames), with some semantic differences
|
|
to os.walk.
|
|
|
|
This has a similar interface to os.walk, with the important difference
|
|
that instead of lists of filenames and directory names, it yields
|
|
lists of tuples of the form [(name, stat)] where stat is the result of
|
|
os.stat for the file. That allows reusing the same stat data in the
|
|
caller. It also always returns the dirpath relative to the root, with
|
|
the root iself being returned as the empty string.
|
|
|
|
Unlike os.walk the implementation is not recursive."""
|
|
|
|
listdir = os.listdir
|
|
get_stat = os.stat
|
|
listdir = os.listdir
|
|
join = os.path.join
|
|
is_dir = stat.S_ISDIR
|
|
is_link = stat.S_ISLNK
|
|
relpath = os.path.relpath
|
|
|
|
root = os.path.abspath(root)
|
|
stack = deque([(root, b"")])
|
|
|
|
while stack:
|
|
dir_path, rel_path = stack.popleft()
|
|
try:
|
|
# Note that listdir and error are globals in this module due
|
|
# to earlier import-*.
|
|
names = listdir(dir_path)
|
|
except OSError:
|
|
continue
|
|
|
|
dirs, non_dirs = [], []
|
|
for name in names:
|
|
path = join(dir_path, name)
|
|
try:
|
|
path_stat = get_stat(path)
|
|
except OSError:
|
|
continue
|
|
if is_dir(path_stat.st_mode):
|
|
dirs.append((name, path_stat))
|
|
else:
|
|
non_dirs.append((name, path_stat))
|
|
|
|
yield rel_path, dirs, non_dirs
|
|
for name, path_stat in dirs:
|
|
new_path = join(dir_path, name)
|
|
if not is_link(path_stat.st_mode):
|
|
stack.append((new_path, relpath(new_path, root)))
|