Rewrite decisionlib with a builder pattern to be more composable

This commit is contained in:
Simon Sapin 2018-10-06 16:10:27 +02:00
parent eaee801e64
commit c0b132a2e0
3 changed files with 585 additions and 506 deletions

View file

@ -19,270 +19,450 @@ import hashlib
import json
import os
import re
import subprocess
import sys
import taskcluster
class DecisionTask:
# Public API
__all__ = [
"CONFIG", "SHARED", "Task", "DockerWorkerTask",
"GenericWorkerTask", "WindowsGenericWorkerTask",
]
class Config:
"""
Holds some project-specific configuration and provides higher-level functionality
on top of the `taskcluster` package a.k.a. `taskcluster-client.py`.
Global configuration, for users of the library to modify.
"""
def __init__(self):
self.task_name_template = "%s"
self.index_prefix = "garbage.servo-decisionlib"
self.scopes_for_all_subtasks = []
self.routes_for_all_subtasks = []
self.docker_images_expire_in = "1 month"
self.repacked_msi_files_expire_in = "1 month"
DOCKER_IMAGE_ARTIFACT_FILENAME = "image.tar.lz4"
# Set by docker-worker:
# https://docs.taskcluster.net/docs/reference/workers/docker-worker/docs/environment
self.decision_task_id = os.environ.get("TASK_ID")
# https://github.com/servo/taskcluster-bootstrap-docker-images#image-builder
DOCKER_IMAGE_BUILDER_IMAGE = "servobrowser/taskcluster-bootstrap:image-builder@sha256:" \
"0a7d012ce444d62ffb9e7f06f0c52fedc24b68c2060711b313263367f7272d9d"
# Set in the decision tasks payload, such as defined in .taskcluster.yml
self.task_owner = os.environ.get("TASK_OWNER")
self.task_source = os.environ.get("TASK_SOURCE")
self.git_url = os.environ.get("GIT_URL")
self.git_ref = os.environ.get("GIT_REF")
self.git_sha = os.environ.get("GIT_SHA")
def __init__(self, *, index_prefix="garbage.servo-decisionlib", task_name_template="%s",
default_worker_type="github-worker", docker_image_cache_expiry="1 month",
routes_for_all_subtasks=None, scopes_for_all_subtasks=None):
self.task_name_template = task_name_template
self.index_prefix = index_prefix
self.default_worker_type = default_worker_type
self.docker_image_cache_expiry = docker_image_cache_expiry
self.routes_for_all_subtasks = routes_for_all_subtasks or []
self.scopes_for_all_subtasks = scopes_for_all_subtasks or []
def git_sha_is_current_head(self):
output = subprocess.check_output(["git", "rev-parse", "HEAD"])
self.git_sha = output.decode("utf8").strip()
# https://docs.taskcluster.net/docs/reference/workers/docker-worker/docs/features#feature-taskclusterproxy
class Shared:
"""
Global shared state.
"""
def __init__(self):
self.now = datetime.datetime.utcnow()
self.found_or_created_indexed_tasks = {}
# taskclusterProxy URLs:
# https://docs.taskcluster.net/docs/reference/workers/docker-worker/docs/features
self.queue_service = taskcluster.Queue(options={"baseUrl": "http://taskcluster/queue/v1/"})
self.index_service = taskcluster.Index(options={"baseUrl": "http://taskcluster/index/v1/"})
self.now = datetime.datetime.utcnow()
self.found_or_created_indices = {}
def from_now_json(self, offset):
"""
Same as `taskcluster.fromNowJSON`, but uses the creation time of `self` for now.
"""
return taskcluster.stringDate(taskcluster.fromNow(offset, dateObj=self.now))
def find_or_create_task(self, *, index_bucket, index_key, index_expiry, artifacts, **kwargs):
"""
Find a task indexed in the given bucket (kind, category, ) and cache key,
on schedule a new one if there isnt one yet.
Returns the task ID.
"""
index_path = "%s.%s.%s" % (self.index_prefix, index_bucket, index_key)
CONFIG = Config()
SHARED = Shared()
from_now_json = SHARED.from_now_json
now = SHARED.now
task_id = self.found_or_created_indices.get(index_path)
def chaining(op, attr):
def method(self, *args, **kwargs):
op(self, attr, *args, **kwargs)
return self
return method
def append_to_attr(self, attr, *args): getattr(self, attr).extend(args)
def prepend_to_attr(self, attr, *args): getattr(self, attr)[0:0] = list(args)
def update_attr(self, attr, **kwargs): getattr(self, attr).update(kwargs)
class Task:
def __init__(self, name):
self.name = name
self.description = ""
self.scheduler_id = "taskcluster-github"
self.provisioner_id = "aws-provisioner-v1"
self.worker_type = "github-worker"
self.deadline_in = "1 day"
self.expires_in = "1 year"
self.index_and_artifacts_expire_in = self.expires_in
self.dependencies = []
self.scopes = []
self.routes = []
self.extra = {}
with_description = chaining(setattr, "description")
with_scheduler_id = chaining(setattr, "scheduler_id")
with_provisioner_id = chaining(setattr, "provisioner_id")
with_worker_type = chaining(setattr, "worker_type")
with_deadline_in = chaining(setattr, "deadline_in")
with_expires_in = chaining(setattr, "expires_in")
with_index_and_artifacts_expire_in = chaining(setattr, "index_and_artifacts_expire_in")
with_dependencies = chaining(append_to_attr, "dependencies")
with_scopes = chaining(append_to_attr, "scopes")
with_routes = chaining(append_to_attr, "routes")
with_extra = chaining(update_attr, "extra")
def build_worker_payload(self):
raise NotImplementedError
def create(self):
worker_payload = self.build_worker_payload()
assert CONFIG.decision_task_id
assert CONFIG.task_owner
assert CONFIG.task_source
queue_payload = {
"taskGroupId": CONFIG.decision_task_id,
"dependencies": [CONFIG.decision_task_id] + self.dependencies,
"schedulerId": self.scheduler_id,
"provisionerId": self.provisioner_id,
"workerType": self.worker_type,
"created": SHARED.from_now_json(""),
"deadline": SHARED.from_now_json(self.deadline_in),
"expires": SHARED.from_now_json(self.expires_in),
"metadata": {
"name": CONFIG.task_name_template % self.name,
"description": self.description,
"owner": CONFIG.task_owner,
"source": CONFIG.task_source,
},
"payload": worker_payload,
}
scopes = self.scopes + CONFIG.scopes_for_all_subtasks
routes = self.routes + CONFIG.routes_for_all_subtasks
if any(r.startswith("index.") for r in routes):
self.extra.setdefault("index", {})["expires"] = \
SHARED.from_now_json(self.index_and_artifacts_expire_in)
dict_update_if_truthy(
queue_payload,
scopes=scopes,
routes=routes,
extra=self.extra,
)
task_id = taskcluster.slugId().decode("utf8")
SHARED.queue_service.createTask(task_id, queue_payload)
print("Scheduled %s" % self.name)
return task_id
def find_or_create(self, index_path=None):
if not index_path:
worker_type = self.worker_type
index_by = json.dumps([worker_type, self.build_worker_payload()]).encode("utf-8")
index_path = "by-task-definition." + hashlib.sha256(index_by).hexdigest()
index_path = "%s.%s" % (CONFIG.index_prefix, index_path)
task_id = SHARED.found_or_created_indexed_tasks.get(index_path)
if task_id is not None:
return task_id
try:
result = self.index_service.findTask(index_path)
result = SHARED.index_service.findTask(index_path)
task_id = result["taskId"]
except taskcluster.TaskclusterRestFailure as e:
if e.status_code == 404:
task_id = self.create_task(
routes=[
"index." + index_path,
],
extra={
"index": {
"expires": self.from_now_json(self.docker_image_cache_expiry),
},
},
artifacts=[
(artifact, index_expiry)
for artifact in artifacts
],
**kwargs
)
else:
if e.status_code != 404:
raise
self.routes.append("index." + index_path)
task_id = self.create()
self.found_or_created_indices[index_path] = task_id
SHARED.found_or_created_indexed_tasks[index_path] = task_id
return task_id
def find_or_build_docker_image(self, dockerfile):
"""
Find a task that built a Docker image based on this `dockerfile`,
or schedule a new image-building task if needed.
Returns the task ID.
class GenericWorkerTask(Task):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.max_run_time_minutes = 30
self.env = {}
self.mounts = []
self.artifacts = []
with_max_run_time_minutes = chaining(setattr, "max_run_time_minutes")
with_artifacts = chaining(append_to_attr, "artifacts")
with_mounts = chaining(append_to_attr, "mounts")
with_env = chaining(update_attr, "env")
def build_command(self):
raise NotImplementedError
def build_worker_payload(self):
worker_payload = {
"command": self.build_command(),
"maxRunTime": self.max_run_time_minutes * 60
}
return dict_update_if_truthy(
worker_payload,
env=self.env,
mounts=self.mounts,
artifacts=[
{
"type": "file",
"path": path,
"name": "public/" + url_basename(path),
"expires": SHARED.from_now_json(self.index_and_artifacts_expire_in),
}
for path in self.artifacts
],
)
def _mount_content(self, url_or_artifact_name, task_id, sha256):
if task_id:
content = {"taskId": task_id, "artifact": url_or_artifact_name}
else:
content = {"url": url_or_artifact_name}
if sha256:
content["sha256"] = sha256
return content
def with_file_mount(self, url_or_artifact_name, task_id=None, sha256=None, path=None):
return self.with_mounts({
"file": path or url_basename(url_or_artifact_name),
"content": self._mount_content(url_or_artifact_name, task_id, sha256),
})
def with_directory_mount(self, url_or_artifact_name, task_id=None, sha256=None, path=None):
supported_formats = ["rar", "tar.bz2", "tar.gz", "zip"]
for fmt in supported_formats:
suffix = "." + fmt
if url_or_artifact_name.endswith(suffix):
return self.with_mounts({
"directory": path or url_basename(url_or_artifact_name[:-len(suffix)]),
"content": self._mount_content(url_or_artifact_name, task_id, sha256),
"format": fmt,
})
raise ValueError(
"%r does not appear to be in one of the supported formats: %r"
% (url_or_artifact_name, ", ".join(supported_formats))
)
class WindowsGenericWorkerTask(GenericWorkerTask):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.scripts = []
with_script = chaining(append_to_attr, "scripts")
with_early_script = chaining(prepend_to_attr, "scripts")
def build_command(self):
return [deindent(s) for s in self.scripts]
def with_path_from_homedir(self, *paths):
for p in paths:
self.with_early_script("set PATH=%HOMEDRIVE%%HOMEPATH%\\{};%PATH%".format(p))
return self
def with_repo(self, sparse_checkout=None):
git = """
git init repo
cd repo
"""
if sparse_checkout:
git += """
git config core.sparsecheckout true
echo %SPARSE_CHECKOUT_BASE64% > .git\\info\\sparse.b64
certutil -decode .git\\info\\sparse.b64 .git\\info\\sparse-checkout
type .git\\info\\sparse-checkout
"""
self.env["SPARSE_CHECKOUT_BASE64"] = base64.b64encode(
"\n".join(sparse_checkout).encode("utf-8"))
git += """
git fetch --depth 1 %GIT_URL% %GIT_REF%
git reset --hard %GIT_SHA%
"""
return self \
.with_git() \
.with_script(git) \
.with_env(**git_env())
def with_git(self):
return self \
.with_path_from_homedir("git\\cmd") \
.with_directory_mount(
"https://github.com/git-for-windows/git/releases/download/" +
"v2.19.0.windows.1/MinGit-2.19.0-64-bit.zip",
sha256="424d24b5fc185a9c5488d7872262464f2facab4f1d4693ea8008196f14a3c19b",
path="git",
)
def with_rustup(self):
return self \
.with_path_from_homedir(".cargo\\bin") \
.with_early_script(
"%HOMEDRIVE%%HOMEPATH%\\rustup-init.exe --default-toolchain none -y"
) \
.with_file_mount(
"https://static.rust-lang.org/rustup/archive/" +
"1.13.0/i686-pc-windows-gnu/rustup-init.exe",
sha256="43072fbe6b38ab38cd872fa51a33ebd781f83a2d5e83013857fab31fc06e4bf0",
)
def with_repacked_msi(self, url, sha256, path):
repack_task = (
WindowsGenericWorkerTask("MSI repack: " + url)
.with_worker_type(self.worker_type)
.with_max_run_time_minutes(20)
.with_file_mount(url, sha256=sha256, path="input.msi")
.with_directory_mount(
"https://github.com/activescott/lessmsi/releases/download/" +
"v1.6.1/lessmsi-v1.6.1.zip",
sha256="540b8801e08ec39ba26a100c855898f455410cecbae4991afae7bb2b4df026c7",
path="lessmsi"
)
.with_directory_mount(
"https://www.7-zip.org/a/7za920.zip",
sha256="2a3afe19c180f8373fa02ff00254d5394fec0349f5804e0ad2f6067854ff28ac",
path="7zip",
)
.with_path_from_homedir("lessmsi", "7zip")
.with_script("""
lessmsi x input.msi extracted\\
cd extracted\\SourceDir
7za a repacked.zip *
""")
.with_artifacts("extracted/SourceDir/repacked.zip")
.with_index_and_artifacts_expire_in(CONFIG.repacked_msi_files_expire_in)
.find_or_create("repacked-msi." + sha256)
)
return self \
.with_dependencies(repack_task) \
.with_directory_mount("public/repacked.zip", task_id=repack_task, path=path)
def with_python2(self):
return self \
.with_repacked_msi(
"https://www.python.org/ftp/python/2.7.15/python-2.7.15.amd64.msi",
sha256="5e85f3c4c209de98480acbf2ba2e71a907fd5567a838ad4b6748c76deb286ad7",
path="python2"
) \
.with_early_script("""
python -m ensurepip
pip install virtualenv==16.0.0
""") \
.with_path_from_homedir("python2", "python2\\Scripts")
class DockerWorkerTask(Task):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.docker_image = "ubuntu:bionic-20180821"
self.max_run_time_minutes = 30
self.scripts = []
self.env = {}
self.caches = {}
self.features = {}
self.artifacts = []
with_docker_image = chaining(setattr, "docker_image")
with_max_run_time_minutes = chaining(setattr, "max_run_time_minutes")
with_artifacts = chaining(append_to_attr, "artifacts")
with_script = chaining(append_to_attr, "scripts")
with_early_script = chaining(prepend_to_attr, "scripts")
with_caches = chaining(update_attr, "caches")
with_env = chaining(update_attr, "env")
def build_worker_payload(self):
worker_payload = {
"image": self.docker_image,
"maxRunTime": self.max_run_time_minutes * 60,
"command": [
"/bin/bash", "--login", "-x", "-e", "-c",
deindent("\n".join(self.scripts))
],
}
return dict_update_if_truthy(
worker_payload,
env=self.env,
cache=self.caches,
features=self.features,
artifacts={
"public/" + url_basename(path): {
"type": "file",
"path": path,
"expires": SHARED.from_now_json(self.index_and_artifacts_expire_in),
}
for path in self.artifacts
},
)
def with_features(self, *names):
self.features.update({name: True for name in names})
return self
def with_repo(self):
return self \
.with_env(**git_env()) \
.with_early_script("""
git init repo
cd repo
git fetch --depth 1 "$GIT_URL" "$GIT_REF"
git reset --hard "$GIT_SHA"
""")
def with_dockerfile(self, dockerfile):
basename = os.path.basename(dockerfile)
suffix = ".dockerfile"
assert basename.endswith(suffix)
image_name = basename[:-len(suffix)]
dockerfile_contents = expand_dockerfile(dockerfile)
digest = hashlib.sha256(dockerfile_contents).hexdigest()
return self.find_or_create_task(
index_bucket="docker-image",
index_key=digest,
index_expiry=self.docker_image_cache_expiry,
task_name="Docker image: " + image_name(dockerfile),
script="""
image_build_task = (
DockerWorkerTask("Docker image: " + image_name)
.with_worker_type(self.worker_type)
.with_max_run_time_minutes(30)
.with_index_and_artifacts_expire_in(CONFIG.docker_images_expire_in)
.with_features("dind")
.with_env(DOCKERFILE=dockerfile_contents)
.with_artifacts("/image.tar.lz4")
.with_script("""
echo "$DOCKERFILE" | docker build -t taskcluster-built -
docker save taskcluster-built | lz4 > /%s
""" % self.DOCKER_IMAGE_ARTIFACT_FILENAME,
env={
"DOCKERFILE": dockerfile_contents,
},
artifacts=[
"/" + self.DOCKER_IMAGE_ARTIFACT_FILENAME,
],
max_run_time_minutes=20,
docker_image=self.DOCKER_IMAGE_BUILDER_IMAGE,
features={
"dind": True, # docker-in-docker
},
with_repo=False,
docker save taskcluster-built | lz4 > /image.tar.lz4
""")
.with_docker_image(
# https://github.com/servo/taskcluster-bootstrap-docker-images#image-builder
"servobrowser/taskcluster-bootstrap:image-builder@sha256:" \
"0a7d012ce444d62ffb9e7f06f0c52fedc24b68c2060711b313263367f7272d9d"
)
.find_or_create("docker-image." + digest)
)
def create_task(self, *, task_name, script, max_run_time_minutes,
docker_image=None, dockerfile=None, # One of these is required
artifacts=None, dependencies=None, env=None, cache=None, scopes=None,
routes=None, extra=None, features=None, mounts=None, homedir_path=None,
worker_type=None, with_repo=True, sparse_checkout=None):
"""
Schedule a new task. Returns the new task ID.
One of `docker_image` or `dockerfile` (but not both) must be given.
If `dockerfile` is given, the corresponding Docker image is built as needed and cached.
`with_repo` indicates whether `script` should start in a clone of the git repository.
"""
# https://docs.taskcluster.net/docs/reference/workers/docker-worker/docs/environment
decision_task_id = os.environ["TASK_ID"]
dependencies = [decision_task_id] + (dependencies or [])
# Set in .taskcluster.yml
task_owner = os.environ["TASK_OWNER"]
task_source = os.environ["TASK_SOURCE"]
env = env or {}
if with_repo:
# Set in .taskcluster.yml
for k in ["GIT_URL", "GIT_REF", "GIT_SHA"]:
env[k] = os.environ[k]
worker_type = worker_type or self.default_worker_type
if "docker" in worker_type:
if docker_image and dockerfile:
raise TypeError("cannot use both `docker_image` or `dockerfile`")
if not docker_image and not dockerfile:
raise TypeError("need one of `docker_image` or `dockerfile`")
if dockerfile:
image_build_task = self.find_or_build_docker_image(dockerfile)
dependencies.append(image_build_task)
docker_image = {
"type": "task-image",
"taskId": image_build_task,
"path": "public/" + self.DOCKER_IMAGE_ARTIFACT_FILENAME,
}
if with_repo:
git = """
git init repo
cd repo
git fetch --depth 1 "$GIT_URL" "$GIT_REF"
git reset --hard "$GIT_SHA"
"""
script = git + script
command = ["/bin/bash", "--login", "-x", "-e", "-c", deindent(script)]
else:
command = [
"set PATH=%CD%\\{};%PATH%".format(p)
for p in reversed(homedir_path or [])
]
if with_repo:
if with_repo:
git = """
git init repo
cd repo
"""
if sparse_checkout:
git += """
git config core.sparsecheckout true
echo %SPARSE_CHECKOUT_BASE64% > .git\\info\\sparse.b64
certutil -decode .git\\info\\sparse.b64 .git\\info\\sparse-checkout
type .git\\info\\sparse-checkout
"""
env["SPARSE_CHECKOUT_BASE64"] = base64.b64encode(
"\n".join(sparse_checkout).encode("utf-8"))
command.append(deindent(git + """
git fetch --depth 1 %GIT_URL% %GIT_REF%
git reset --hard %GIT_SHA%
"""))
command.append(deindent(script))
worker_payload = {
"maxRunTime": max_run_time_minutes * 60,
"command": command,
"env": env,
}
if docker_image:
worker_payload["image"] = docker_image
if cache:
worker_payload["cache"] = cache
if features:
worker_payload["features"] = features
if mounts:
worker_payload["mounts"] = mounts
if artifacts:
if "docker" in worker_type:
worker_payload["artifacts"] = {
"public/" + os.path.basename(path): {
"type": "file",
"path": path,
"expires": self.from_now_json(expires),
}
for path, expires in artifacts
}
else:
worker_payload["artifacts"] = [
{
"type": "file",
"name": "public/" + os.path.basename(path),
"path": path,
"expires": self.from_now_json(expires),
}
for path, expires in artifacts
]
payload = {
"taskGroupId": decision_task_id,
"dependencies": dependencies or [],
"schedulerId": "taskcluster-github",
"provisionerId": "aws-provisioner-v1",
"workerType": worker_type,
"created": self.from_now_json(""),
"deadline": self.from_now_json("1 day"),
"metadata": {
"name": self.task_name_template % task_name,
"description": "",
"owner": task_owner,
"source": task_source,
},
"scopes": (scopes or []) + self.scopes_for_all_subtasks,
"routes": (routes or []) + self.routes_for_all_subtasks,
"extra": extra or {},
"payload": worker_payload,
}
task_id = taskcluster.slugId().decode("utf8")
self.queue_service.createTask(task_id, payload)
print("Scheduled %s" % task_name)
return task_id
def image_name(dockerfile):
"""
Guess a short name based on the path `dockerfile`.
"""
basename = os.path.basename(dockerfile)
suffix = ".dockerfile"
if basename == "Dockerfile":
return os.path.basename(os.path.dirname(os.path.abspath(dockerfile)))
elif basename.endswith(suffix):
return basename[:-len(suffix)]
else:
return basename
return self \
.with_dependencies(image_build_task) \
.with_docker_image({
"type": "task-image",
"path": "public/image.tar.lz4",
"taskId": image_build_task,
})
def expand_dockerfile(dockerfile):
@ -303,5 +483,26 @@ def expand_dockerfile(dockerfile):
return b"\n".join([expand_dockerfile(path), rest])
def git_env():
assert CONFIG.git_url
assert CONFIG.git_ref
assert CONFIG.git_sha
return {
"GIT_URL": CONFIG.git_url,
"GIT_REF": CONFIG.git_ref,
"GIT_SHA": CONFIG.git_sha,
}
def dict_update_if_truthy(d, **kwargs):
for key, value in kwargs.items():
if value:
d[key] = value
return d
def deindent(string):
return re.sub("\n +", " \n ", string).strip()
return re.sub("\n +", "\n ", string).strip()
def url_basename(url):
return url.rpartition("/")[-1]