Update web-platform-tests to revision 3b3585e368841b77caea8576fa56cef91c3fbdf0

This commit is contained in:
Ms2ger 2016-09-26 10:52:50 +02:00
parent d00639c55f
commit 3b4f0ec0bb
541 changed files with 14609 additions and 3288 deletions

View file

@ -0,0 +1 @@
from . import localpaths as _localpaths

View file

@ -12,10 +12,10 @@ import sys
from collections import defaultdict
from .. import localpaths
from ..localpaths import repo_root
from manifest.sourcefile import SourceFile
from six import iteritems
from six import iteritems, itervalues
from six.moves import range
here = os.path.abspath(os.path.split(__file__)[0])
@ -52,6 +52,7 @@ def parse_whitelist(f):
"""
data = defaultdict(lambda:defaultdict(set))
ignored_files = set()
for line in f:
line = line.strip()
@ -64,9 +65,13 @@ def parse_whitelist(f):
parts[-1] = int(parts[-1])
error_type, file_match, line_number = parts
data[file_match][error_type].add(line_number)
return data
if error_type == "*":
ignored_files.add(file_match)
else:
data[file_match][error_type].add(line_number)
return data, ignored_files
def filter_whitelist_errors(data, path, errors):
@ -79,9 +84,7 @@ def filter_whitelist_errors(data, path, errors):
for file_match, whitelist_errors in iteritems(data):
if fnmatch.fnmatch(path, file_match):
for i, (error_type, msg, path, line) in enumerate(errors):
if "*" in whitelist_errors:
whitelisted[i] = True
elif error_type in whitelist_errors:
if error_type in whitelist_errors:
allowed_lines = whitelist_errors[error_type]
if None in allowed_lines or line in allowed_lines:
whitelisted[i] = True
@ -240,6 +243,14 @@ def check_parsed(repo_root, path, f):
if all(seen_elements[name] for name in required_elements):
break
for element in source_file.root.findall(".//{http://www.w3.org/1999/xhtml}script[@src]"):
src = element.attrib["src"]
for name in ["testharness", "testharnessreport"]:
if "%s.js" % name == src or ("/%s.js" % name in src and src != "/resources/%s.js" % name):
errors.append(("%s-PATH" % name.upper(), "%s.js script seen with incorrect path" % name, path, None))
return errors
class ASTCheck(object):
@ -347,7 +358,6 @@ def parse_args():
return parser.parse_args()
def main():
repo_root = localpaths.repo_root
args = parse_args()
paths = args.paths if args.paths else all_git_paths(repo_root)
return lint(repo_root, paths, args.json)
@ -357,7 +367,7 @@ def lint(repo_root, paths, output_json):
last = None
with open(os.path.join(repo_root, "lint.whitelist")) as f:
whitelist = parse_whitelist(f)
whitelist, ignored_files = parse_whitelist(f)
if output_json:
output_errors = output_errors_json
@ -390,11 +400,14 @@ def lint(repo_root, paths, output_json):
if not os.path.exists(abs_path):
continue
if any(fnmatch.fnmatch(path, file_match) for file_match in ignored_files):
continue
errors = check_path(repo_root, path)
last = process_errors(path, errors) or last
if not os.path.isdir(abs_path):
with open(abs_path) as f:
with open(abs_path, 'rb') as f:
errors = check_file_contents(repo_root, path, f)
last = process_errors(path, errors) or last
@ -402,7 +415,7 @@ def lint(repo_root, paths, output_json):
output_error_count(error_count)
if error_count:
print(ERROR_MSG % (last[0], last[1], last[0], last[1]))
return sum(error_count.itervalues())
return sum(itervalues(error_count))
path_lints = [check_path_length]
file_lints = [check_regexp_line, check_parsed, check_python_ast]

View file

@ -0,0 +1 @@
THIS LINE HAS TRAILING WHITESPACE

View file

@ -0,0 +1 @@
THIS LINE HAS TRAILING WHITESPACE

View file

@ -0,0 +1 @@
*:broken_ignored.html

View file

@ -0,0 +1 @@
THIS LINE HAS NO TRAILING WHITESPACE

View file

@ -221,6 +221,75 @@ def test_present_testharnesscss():
]
def test_testharness_path():
code = b"""\
<html xmlns="http://www.w3.org/1999/xhtml">
<script src="testharness.js"></script>
<script src="resources/testharness.js"></script>
<script src="../resources/testharness.js"></script>
<script src="http://w3c-test.org/resources/testharness.js"></script>
</html>
"""
error_map = check_with_files(code)
for (filename, (errors, kind)) in error_map.items():
expected = [("W3C-TEST.ORG", "External w3c-test.org domain used", filename, 5)]
if kind == "python":
expected.append(("PARSE-FAILED", "Unable to parse file", filename, 1))
elif kind in ["web-lax", "web-strict"]:
expected.extend([
("TESTHARNESS-PATH", "testharness.js script seen with incorrect path", filename, None),
("TESTHARNESS-PATH", "testharness.js script seen with incorrect path", filename, None),
("TESTHARNESS-PATH", "testharness.js script seen with incorrect path", filename, None),
("TESTHARNESS-PATH", "testharness.js script seen with incorrect path", filename, None),
])
assert errors == expected
def test_testharnessreport_path():
code = b"""\
<html xmlns="http://www.w3.org/1999/xhtml">
<script src="testharnessreport.js"></script>
<script src="resources/testharnessreport.js"></script>
<script src="../resources/testharnessreport.js"></script>
<script src="http://w3c-test.org/resources/testharnessreport.js"></script>
</html>
"""
error_map = check_with_files(code)
for (filename, (errors, kind)) in error_map.items():
expected = [("W3C-TEST.ORG", "External w3c-test.org domain used", filename, 5)]
if kind == "python":
expected.append(("PARSE-FAILED", "Unable to parse file", filename, 1))
elif kind in ["web-lax", "web-strict"]:
expected.extend([
("TESTHARNESSREPORT-PATH", "testharnessreport.js script seen with incorrect path", filename, None),
("TESTHARNESSREPORT-PATH", "testharnessreport.js script seen with incorrect path", filename, None),
("TESTHARNESSREPORT-PATH", "testharnessreport.js script seen with incorrect path", filename, None),
("TESTHARNESSREPORT-PATH", "testharnessreport.js script seen with incorrect path", filename, None),
])
assert errors == expected
def test_not_testharness_path():
code = b"""\
<html xmlns="http://www.w3.org/1999/xhtml">
<script src="/resources/testharness.js"></script>
<script src="/resources/testharnessreport.js"></script>
<script src="resources/webperftestharness.js"></script>
</html>
"""
error_map = check_with_files(code)
for (filename, (errors, kind)) in error_map.items():
if kind == "python":
assert errors == [
("PARSE-FAILED", "Unable to parse file", filename, 1),
]
else:
assert errors == []
@pytest.mark.skipif(six.PY3, reason="Cannot parse print statements from python 3")
def test_print_statement():
error_map = check_with_files(b"def foo():\n print 'statement'\n print\n")

View file

@ -1,9 +1,23 @@
from __future__ import unicode_literals
from ..lint import filter_whitelist_errors, parse_whitelist
import os
import mock
import pytest
import six
def test_lint():
from .. import lint as lint_mod
from ..lint import filter_whitelist_errors, parse_whitelist, lint
_dummy_repo = os.path.join(os.path.dirname(__file__), "dummy")
def _mock_lint(name):
wrapped = getattr(lint_mod, name)
return mock.patch(lint_mod.__name__ + "." + name, wraps=wrapped)
def test_filter_whitelist_errors():
filtered = filter_whitelist_errors({}, '', [])
assert filtered == []
@ -26,10 +40,7 @@ CONSOLE:streams/resources/test-utils.js: 12
*:resources/*
""")
expected = {
'*.pdf': {
'*': {None},
},
expected_data = {
'.gitmodules': {
'INDENT TABS': {None},
},
@ -37,9 +48,6 @@ CONSOLE:streams/resources/test-utils.js: 12
'TRAILING WHITESPACE': {None},
'INDENT TABS': {None},
},
'resources/*': {
'*': {None},
},
'streams/resources/test-utils.js': {
'CONSOLE': {12},
'CR AT EOL': {None},
@ -51,4 +59,80 @@ CONSOLE:streams/resources/test-utils.js: 12
'CR AT EOL': {None},
},
}
assert parse_whitelist(input_buffer) == expected
expected_ignored = {"*.pdf", "resources/*"}
data, ignored = parse_whitelist(input_buffer)
assert data == expected_data
assert ignored == expected_ignored
def test_lint_no_files(capsys):
rv = lint(_dummy_repo, [], False)
assert rv == 0
out, err = capsys.readouterr()
assert out == ""
assert err == ""
def test_lint_ignored_file(capsys):
with _mock_lint("check_path") as mocked_check_path:
with _mock_lint("check_file_contents") as mocked_check_file_contents:
rv = lint(_dummy_repo, ["broken_ignored.html"], False)
assert rv == 0
assert not mocked_check_path.called
assert not mocked_check_file_contents.called
out, err = capsys.readouterr()
assert out == ""
assert err == ""
def test_lint_not_existing_file(capsys):
with _mock_lint("check_path") as mocked_check_path:
with _mock_lint("check_file_contents") as mocked_check_file_contents:
# really long path-linted filename
name = "a" * 256 + ".html"
rv = lint(_dummy_repo, [name], False)
assert rv == 0
assert not mocked_check_path.called
assert not mocked_check_file_contents.called
out, err = capsys.readouterr()
assert out == ""
assert err == ""
def test_lint_passing(capsys):
with _mock_lint("check_path") as mocked_check_path:
with _mock_lint("check_file_contents") as mocked_check_file_contents:
rv = lint(_dummy_repo, ["okay.html"], False)
assert rv == 0
assert mocked_check_path.call_count == 1
assert mocked_check_file_contents.call_count == 1
out, err = capsys.readouterr()
assert out == ""
assert err == ""
def test_lint_failing(capsys):
with _mock_lint("check_path") as mocked_check_path:
with _mock_lint("check_file_contents") as mocked_check_file_contents:
rv = lint(_dummy_repo, ["broken.html"], False)
assert rv == 1
assert mocked_check_path.call_count == 1
assert mocked_check_file_contents.call_count == 1
out, err = capsys.readouterr()
assert "TRAILING WHITESPACE" in out
assert "broken.html 1 " in out
assert err == ""
def test_lint_passing_and_failing(capsys):
with _mock_lint("check_path") as mocked_check_path:
with _mock_lint("check_file_contents") as mocked_check_file_contents:
rv = lint(_dummy_repo, ["broken.html", "okay.html"], False)
assert rv == 1
assert mocked_check_path.call_count == 2
assert mocked_check_file_contents.call_count == 2
out, err = capsys.readouterr()
assert "TRAILING WHITESPACE" in out
assert "broken.html 1 " in out
assert "okay.html" not in out
assert err == ""

View file

@ -4,11 +4,11 @@ import sys
here = os.path.abspath(os.path.split(__file__)[0])
repo_root = os.path.abspath(os.path.join(here, os.pardir))
sys.path.insert(0, os.path.join(repo_root, "tools"))
sys.path.insert(0, os.path.join(repo_root, "tools", "six"))
sys.path.insert(0, os.path.join(repo_root, "tools", "html5lib"))
sys.path.insert(0, os.path.join(repo_root, "tools", "wptserve"))
sys.path.insert(0, os.path.join(repo_root, "tools", "pywebsocket", "src"))
sys.path.insert(0, os.path.join(repo_root, "tools", "py"))
sys.path.insert(0, os.path.join(repo_root, "tools", "pytest"))
sys.path.insert(0, os.path.join(repo_root, "tools", "webdriver"))
sys.path.insert(0, os.path.join(here))
sys.path.insert(0, os.path.join(here, "six"))
sys.path.insert(0, os.path.join(here, "html5lib"))
sys.path.insert(0, os.path.join(here, "wptserve"))
sys.path.insert(0, os.path.join(here, "pywebsocket", "src"))
sys.path.insert(0, os.path.join(here, "py"))
sys.path.insert(0, os.path.join(here, "pytest"))
sys.path.insert(0, os.path.join(here, "webdriver"))

View file

@ -46,6 +46,8 @@ class Manifest(object):
if item_type == "reftest":
for path, items in self.local_changes.iterdeletedreftests():
paths[path] -= items
if len(paths[path]) == 0:
del paths[path]
yield item_type, paths
@ -56,13 +58,10 @@ class Manifest(object):
if item is None:
return
is_reference = False
if isinstance(item, RefTest):
self.reftest_nodes[item.path].add(item)
self.reftest_nodes_by_url[item.url] = item
is_reference = item.is_reference
if not is_reference:
else:
self._add(item)
item.manifest = self
@ -281,6 +280,7 @@ class Manifest(object):
tests_root,
obj["local_changes"],
source_files=source_files)
self.update_reftests()
return self
@ -297,13 +297,10 @@ class LocalChanges(object):
if item is None:
return
is_reference = False
if isinstance(item, RefTest):
self.reftest_nodes[item.path].add(item)
self.reftest_nodes_by_url[item.url] = item
is_reference = item.is_reference
if not is_reference:
else:
self._add(item)
item.manifest = self.manifest

View file

@ -7,9 +7,6 @@ try:
except ImportError:
from xml.etree import ElementTree
here = os.path.dirname(__file__)
localpaths = imp.load_source("localpaths", os.path.abspath(os.path.join(here, os.pardir, "localpaths.py")))
import html5lib
from . import vcs

View file

@ -6,6 +6,7 @@ def test_local_reftest_add():
s = sourcefile.SourceFile("/", "test", "/")
test = manifestitem.RefTest(s, "/test", [("/ref", "==")])
m.local_changes.add(test)
m.update_reftests()
assert list(m) == [(test.path, {test})]
@ -15,6 +16,7 @@ def test_local_reftest_delete_path():
test = manifestitem.RefTest(s, "/test", [("/ref", "==")])
m.add(test)
m.local_changes.add_deleted(test.path)
m.update_reftests()
assert list(m) == []
@ -23,18 +25,19 @@ def test_local_reftest_adjusted():
s = sourcefile.SourceFile("/", "test", "/")
test = manifestitem.RefTest(s, "/test", [("/ref", "==")])
m.add(test)
assert list(m) == [(test.path, {test})]
m.update_reftests()
assert m.compute_reftests({test.path: {test}}) == {test}
test_1 = manifestitem.RefTest(s, "/test-1", [("/test", "==")])
assert list(m) == [(test.path, {test})]
s_1 = sourcefile.SourceFile("/", "test-1", "/")
test_1 = manifestitem.RefTest(s_1, "/test-1", [("/test", "==")])
m.local_changes.add(test_1)
m.update_reftests()
assert m.compute_reftests({test.path: {test}, test_1.path: {test_1}}) == {test_1}
m.local_changes._deleted_reftests[test.path] = {test}
assert list(m) == [(test_1.path, {test_1})]
@ -43,9 +46,11 @@ def test_manifest_to_json():
s = sourcefile.SourceFile("/", "test", "/")
test = manifestitem.RefTest(s, "/test", [("/ref", "==")])
m.add(test)
test_1 = manifestitem.RefTest(s, "/test-1", [("/test", "==")])
s_1 = sourcefile.SourceFile("/", "test-1", "/")
test_1 = manifestitem.RefTest(s_1, "/test-1", [("/test", "==")])
m.local_changes.add(test_1)
m.local_changes._deleted_reftests[test.path] = {test}
m.local_changes.add_deleted(test.path)
m.update_reftests()
json_str = m.to_json()
loaded = manifest.Manifest.from_json("/", json_str)
@ -53,3 +58,23 @@ def test_manifest_to_json():
assert list(loaded) == list(m)
assert loaded.to_json() == json_str
def test_reftest_computation_chain():
m = manifest.Manifest()
s1 = sourcefile.SourceFile("/", "test1", "/")
s2 = sourcefile.SourceFile("/", "test2", "/")
test1 = manifestitem.RefTest(s1, "/test1", [("/test3", "==")])
test2 = manifestitem.RefTest(s2, "/test2", [("/test1", "==")])
m.add(test1)
m.add(test2)
m.update_reftests()
assert m.reftest_nodes == {'test1': {test1},
'test2': {test2}}
assert list(m) == [("test2", {test2})]
assert list(m.local_changes.itertypes()) == []

View file

@ -130,6 +130,8 @@ class GitTree(TestTree):
elif staged == "?" and worktree == "?":
# A new file. If it's a directory, recurse into it
if os.path.isdir(os.path.join(self.tests_root, filename)):
if filename[-1] != '/':
filename += '/'
rv.update(self.local_changes(filename))
else:
rv[filename] = "modified"

View file

@ -10,7 +10,6 @@ from .log import get_logger
from .tree import GitTree, NoVCSTree
here = os.path.dirname(__file__)
localpaths = imp.load_source("localpaths", os.path.abspath(os.path.join(here, os.pardir, "localpaths.py")))
def update(tests_root, url_base, manifest, ignore_local=False):
if vcs.is_git_repo(tests_root):

View file

@ -16,7 +16,7 @@ import uuid
from collections import defaultdict, OrderedDict
from multiprocessing import Process, Event
from .. import localpaths
from ..localpaths import repo_root
import sslutils
from wptserve import server as wptserve, handlers
@ -24,8 +24,6 @@ from wptserve import stash
from wptserve.logger import set_logger
from mod_pywebsocket import standalone as pywebsocket
repo_root = localpaths.repo_root
def replace_end(s, old, new):
"""
Given a string `s` that ends with `old`, replace that occurrence of `old`

View file

@ -10,6 +10,7 @@ deps =
{toxinidir}/html5lib
pytest-travis-fold
coverage
mock
commands =
coverage run -m pytest

View file

@ -0,0 +1,24 @@
language: python
sudo: false
cache:
directories:
- $HOME/.cache/pip
matrix:
include:
- python: 2.7
env: TOXENV=py27
- python: pypy
env: TOXENV=pypy
install:
- pip install -U tox codecov
script:
- tox
after_success:
- coverage combine
- codecov

View file

@ -1,3 +1,5 @@
from __future__ import print_function
import base64
import logging
import os
@ -10,6 +12,8 @@ import wptserve
logging.basicConfig()
wptserve.logger.set_logger(logging.getLogger())
here = os.path.split(__file__)[0]
doc_root = os.path.join(here, "docroot")
@ -24,7 +28,7 @@ class Request(urllib2.Request):
def add_data(self, data):
if hasattr(data, "iteritems"):
data = urllib.urlencode(data)
print data
print(data)
self.add_header("Content-Length", str(len(data)))
urllib2.Request.add_data(self, data)
@ -56,6 +60,6 @@ class TestUsingServer(unittest.TestCase):
req.add_data(body)
if auth is not None:
req.add_header("Authorization", "Basic %s" % base64.encodestring('%s:%s' % auth))
req.add_header("Authorization", "Basic %s" % base64.b64encode('%s:%s' % auth))
return urllib2.urlopen(req)

View file

@ -0,0 +1,3 @@
# Oops...
def main(request, response
return "FAIL"

View file

@ -0,0 +1,3 @@
# Oops...
def mian(request, response):
return "FAIL"

View file

@ -0,0 +1 @@
{{host}} {{domains[]}} {{ports[http][0]}}

View file

@ -0,0 +1 @@
{{headers[X-Test]}}

View file

@ -0,0 +1 @@
I am here to ensure that my containing directory exists.

View file

@ -1,3 +1,5 @@
Custom-Header: PASS
Another-Header: {{$id:uuid()}}
Same-Value-Header: {{$id}}
Double-Header: PA
Double-Header: SS

View file

@ -1,10 +1,7 @@
import os
import unittest
import urllib2
import json
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer
class TestResponseSetCookie(TestUsingServer):
def test_name_value(self):
@ -17,7 +14,7 @@ class TestResponseSetCookie(TestUsingServer):
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(resp.info()["Set-Cookie"], "name=value; Path=/")
self.assertEqual(resp.info()["Set-Cookie"], "name=value; Path=/")
def test_unset(self):
@wptserve.handlers.handler
@ -45,8 +42,8 @@ class TestResponseSetCookie(TestUsingServer):
parts = dict(item.split("=") for
item in resp.info()["Set-Cookie"].split("; ") if item)
self.assertEquals(parts["name"], "")
self.assertEquals(parts["Path"], "/")
self.assertEqual(parts["name"], "")
self.assertEqual(parts["Path"], "/")
#Should also check that expires is in the past
class TestRequestCookies(TestUsingServer):
@ -58,8 +55,7 @@ class TestRequestCookies(TestUsingServer):
route = ("GET", "/test/set_cookie", handler)
self.server.router.register(*route)
resp = self.request(route[1], headers={"Cookie": "name=value"})
self.assertEquals(resp.read(), "value")
self.assertEqual(resp.read(), b"value")
if __name__ == '__main__':
unittest.main()

View file

@ -1,85 +1,101 @@
import json
import os
import pytest
import unittest
import urllib2
import uuid
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer, doc_root
class TestFileHandler(TestUsingServer):
def test_GET(self):
resp = self.request("/document.txt")
self.assertEquals(200, resp.getcode())
self.assertEquals("text/plain", resp.info()["Content-Type"])
self.assertEquals(open(os.path.join(doc_root, "document.txt")).read(), resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("text/plain", resp.info()["Content-Type"])
self.assertEqual(open(os.path.join(doc_root, "document.txt"), 'rb').read(), resp.read())
def test_headers(self):
resp = self.request("/with_headers.txt")
self.assertEquals(200, resp.getcode())
self.assertEquals("PASS", resp.info()["Custom-Header"])
self.assertEqual(200, resp.getcode())
self.assertEqual("PASS", resp.info()["Custom-Header"])
# This will fail if it isn't a valid uuid
uuid.UUID(resp.info()["Another-Header"])
self.assertEquals(resp.info()["Same-Value-Header"], resp.info()["Another-Header"])
self.assertEqual(resp.info()["Same-Value-Header"], resp.info()["Another-Header"])
self.assertEqual(resp.info()["Double-Header"], "PA, SS")
def test_range(self):
resp = self.request("/document.txt", headers={"Range":"bytes=10-19"})
self.assertEquals(206, resp.getcode())
self.assertEqual(206, resp.getcode())
data = resp.read()
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(10, len(data))
self.assertEquals("bytes 10-19/%i" % len(expected), resp.info()['Content-Range'])
self.assertEquals("10", resp.info()['Content-Length'])
self.assertEquals(expected[10:20], data)
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(10, len(data))
self.assertEqual("bytes 10-19/%i" % len(expected), resp.info()['Content-Range'])
self.assertEqual("10", resp.info()['Content-Length'])
self.assertEqual(expected[10:20], data)
def test_range_no_end(self):
resp = self.request("/document.txt", headers={"Range":"bytes=10-"})
self.assertEquals(206, resp.getcode())
self.assertEqual(206, resp.getcode())
data = resp.read()
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(len(expected) - 10, len(data))
self.assertEquals("bytes 10-%i/%i" % (len(expected) - 1, len(expected)), resp.info()['Content-Range'])
self.assertEquals(expected[10:], data)
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(len(expected) - 10, len(data))
self.assertEqual("bytes 10-%i/%i" % (len(expected) - 1, len(expected)), resp.info()['Content-Range'])
self.assertEqual(expected[10:], data)
def test_range_no_start(self):
resp = self.request("/document.txt", headers={"Range":"bytes=-10"})
self.assertEquals(206, resp.getcode())
self.assertEqual(206, resp.getcode())
data = resp.read()
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(10, len(data))
self.assertEquals("bytes %i-%i/%i" % (len(expected) - 10,
len(expected) - 1,
len(expected)), resp.info()['Content-Range'])
self.assertEquals(expected[-10:], data)
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(10, len(data))
self.assertEqual("bytes %i-%i/%i" % (len(expected) - 10, len(expected) - 1, len(expected)),
resp.info()['Content-Range'])
self.assertEqual(expected[-10:], data)
def test_multiple_ranges(self):
resp = self.request("/document.txt", headers={"Range":"bytes=1-2,5-7,6-10"})
self.assertEquals(206, resp.getcode())
self.assertEqual(206, resp.getcode())
data = resp.read()
expected = open(os.path.join(doc_root, "document.txt")).read()
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertTrue(resp.info()["Content-Type"].startswith("multipart/byteranges; boundary="))
boundary = resp.info()["Content-Type"].split("boundary=")[1]
parts = data.split("--" + boundary)
self.assertEquals("\r\n", parts[0])
self.assertEquals("--", parts[-1])
self.assertEqual("\r\n", parts[0])
self.assertEqual("--", parts[-1])
expected_parts = [("1-2", expected[1:3]), ("5-10", expected[5:11])]
for expected_part, part in zip(expected_parts, parts[1:-1]):
header_string, body = part.split("\r\n\r\n")
headers = dict(item.split(": ", 1) for item in header_string.split("\r\n") if item.strip())
self.assertEquals(headers["Content-Type"], "text/plain")
self.assertEquals(headers["Content-Range"], "bytes %s/%i" % (expected_part[0], len(expected)))
self.assertEquals(expected_part[1] + "\r\n", body)
self.assertEqual(headers["Content-Type"], "text/plain")
self.assertEqual(headers["Content-Range"], "bytes %s/%i" % (expected_part[0], len(expected)))
self.assertEqual(expected_part[1] + "\r\n", body)
def test_range_invalid(self):
with self.assertRaises(urllib2.HTTPError) as cm:
self.request("/document.txt", headers={"Range":"bytes=11-10"})
self.assertEquals(cm.exception.code, 416)
self.assertEqual(cm.exception.code, 416)
expected = open(os.path.join(doc_root, "document.txt")).read()
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
with self.assertRaises(urllib2.HTTPError) as cm:
self.request("/document.txt", headers={"Range":"bytes=%i-%i" % (len(expected), len(expected) + 10)})
self.assertEquals(cm.exception.code, 416)
self.assertEqual(cm.exception.code, 416)
def test_sub_config(self):
resp = self.request("/sub.sub.txt")
expected = b"localhost localhost %i" % self.server.port
assert resp.read().rstrip() == expected
def test_sub_headers(self):
resp = self.request("/sub_headers.sub.txt", headers={"X-Test": "PASS"})
expected = b"PASS"
assert resp.read().rstrip() == expected
def test_sub_params(self):
resp = self.request("/sub_params.sub.txt", query="test=PASS")
expected = b"PASS"
assert resp.read().rstrip() == expected
class TestFunctionHandler(TestUsingServer):
@ -91,9 +107,22 @@ class TestFunctionHandler(TestUsingServer):
route = ("GET", "/test/test_string_rv", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(200, resp.getcode())
self.assertEquals("9", resp.info()["Content-Length"])
self.assertEquals("test data", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("9", resp.info()["Content-Length"])
self.assertEqual("test data", resp.read())
def test_tuple_1_rv(self):
@wptserve.handlers.handler
def handler(request, response):
return ()
route = ("GET", "/test/test_tuple_1_rv", handler)
self.server.router.register(*route)
with pytest.raises(urllib2.HTTPError) as cm:
self.request(route[1])
assert cm.value.code == 500
def test_tuple_2_rv(self):
@wptserve.handlers.handler
@ -103,10 +132,10 @@ class TestFunctionHandler(TestUsingServer):
route = ("GET", "/test/test_tuple_2_rv", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(200, resp.getcode())
self.assertEquals("4", resp.info()["Content-Length"])
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals("test", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("4", resp.info()["Content-Length"])
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual("test", resp.read())
def test_tuple_3_rv(self):
@wptserve.handlers.handler
@ -116,9 +145,9 @@ class TestFunctionHandler(TestUsingServer):
route = ("GET", "/test/test_tuple_3_rv", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(202, resp.getcode())
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals("test data", resp.read())
self.assertEqual(202, resp.getcode())
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual("test data", resp.read())
def test_tuple_3_rv_1(self):
@wptserve.handlers.handler
@ -128,10 +157,36 @@ class TestFunctionHandler(TestUsingServer):
route = ("GET", "/test/test_tuple_3_rv_1", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(202, resp.getcode())
self.assertEquals("Some Status", resp.msg)
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals("test data", resp.read())
self.assertEqual(202, resp.getcode())
self.assertEqual("Some Status", resp.msg)
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual("test data", resp.read())
def test_tuple_4_rv(self):
@wptserve.handlers.handler
def handler(request, response):
return 202, [("test-header", "test-value")], "test data", "garbage"
route = ("GET", "/test/test_tuple_1_rv", handler)
self.server.router.register(*route)
with pytest.raises(urllib2.HTTPError) as cm:
self.request(route[1])
assert cm.value.code == 500
def test_none_rv(self):
@wptserve.handlers.handler
def handler(request, response):
return None
route = ("GET", "/test/test_none_rv", handler)
self.server.router.register(*route)
resp = self.request(route[1])
assert resp.getcode() == 200
assert "Content-Length" not in resp.info()
assert resp.read() == b""
class TestJSONHandler(TestUsingServer):
def test_json_0(self):
@ -142,8 +197,8 @@ class TestJSONHandler(TestUsingServer):
route = ("GET", "/test/test_json_0", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(200, resp.getcode())
self.assertEquals({"data": "test data"}, json.load(resp))
self.assertEqual(200, resp.getcode())
self.assertEqual({"data": "test data"}, json.load(resp))
def test_json_tuple_2(self):
@wptserve.handlers.json_handler
@ -153,9 +208,9 @@ class TestJSONHandler(TestUsingServer):
route = ("GET", "/test/test_json_tuple_2", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(200, resp.getcode())
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals({"data": "test data"}, json.load(resp))
self.assertEqual(200, resp.getcode())
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual({"data": "test data"}, json.load(resp))
def test_json_tuple_3(self):
@wptserve.handlers.json_handler
@ -165,47 +220,78 @@ class TestJSONHandler(TestUsingServer):
route = ("GET", "/test/test_json_tuple_2", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(202, resp.getcode())
self.assertEquals("Giraffe", resp.msg)
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals({"data": "test data"}, json.load(resp))
self.assertEqual(202, resp.getcode())
self.assertEqual("Giraffe", resp.msg)
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual({"data": "test data"}, json.load(resp))
class TestPythonHandler(TestUsingServer):
def test_string(self):
resp = self.request("/test_string.py")
self.assertEquals(200, resp.getcode())
self.assertEquals("text/plain", resp.info()["Content-Type"])
self.assertEquals("PASS", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("text/plain", resp.info()["Content-Type"])
self.assertEqual("PASS", resp.read())
def test_tuple_2(self):
resp = self.request("/test_tuple_2.py")
self.assertEquals(200, resp.getcode())
self.assertEquals("text/html", resp.info()["Content-Type"])
self.assertEquals("PASS", resp.info()["X-Test"])
self.assertEquals("PASS", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("text/html", resp.info()["Content-Type"])
self.assertEqual("PASS", resp.info()["X-Test"])
self.assertEqual("PASS", resp.read())
def test_tuple_3(self):
resp = self.request("/test_tuple_3.py")
self.assertEquals(202, resp.getcode())
self.assertEquals("Giraffe", resp.msg)
self.assertEquals("text/html", resp.info()["Content-Type"])
self.assertEquals("PASS", resp.info()["X-Test"])
self.assertEquals("PASS", resp.read())
self.assertEqual(202, resp.getcode())
self.assertEqual("Giraffe", resp.msg)
self.assertEqual("text/html", resp.info()["Content-Type"])
self.assertEqual("PASS", resp.info()["X-Test"])
self.assertEqual("PASS", resp.read())
def test_no_main(self):
with pytest.raises(urllib2.HTTPError) as cm:
self.request("/no_main.py")
assert cm.value.code == 500
def test_invalid(self):
with pytest.raises(urllib2.HTTPError) as cm:
self.request("/invalid.py")
assert cm.value.code == 500
def test_missing(self):
with pytest.raises(urllib2.HTTPError) as cm:
self.request("/missing.py")
assert cm.value.code == 404
class TestDirectoryHandler(TestUsingServer):
def test_directory(self):
resp = self.request("/")
self.assertEquals(200, resp.getcode())
self.assertEquals("text/html", resp.info()["Content-Type"])
self.assertEqual(200, resp.getcode())
self.assertEqual("text/html", resp.info()["Content-Type"])
#Add a check that the response is actually sane
def test_subdirectory_trailing_slash(self):
resp = self.request("/subdir/")
assert resp.getcode() == 200
assert resp.info()["Content-Type"] == "text/html"
def test_subdirectory_no_trailing_slash(self):
with pytest.raises(urllib2.HTTPError) as cm:
self.request("/subdir")
assert cm.value.code == 404
class TestAsIsHandler(TestUsingServer):
def test_as_is(self):
resp = self.request("/test.asis")
self.assertEquals(202, resp.getcode())
self.assertEquals("Giraffe", resp.msg)
self.assertEquals("PASS", resp.info()["X-Test"])
self.assertEquals("Content", resp.read())
self.assertEqual(202, resp.getcode())
self.assertEqual("Giraffe", resp.msg)
self.assertEqual("PASS", resp.info()["X-Test"])
self.assertEqual("Content", resp.read())
#Add a check that the response is actually sane
if __name__ == '__main__':

View file

@ -1,70 +1,67 @@
import os
import unittest
import urllib2
import json
import time
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer, doc_root
class TestStatus(TestUsingServer):
def test_status(self):
resp = self.request("/document.txt", query="pipe=status(202)")
self.assertEquals(resp.getcode(), 202)
self.assertEqual(resp.getcode(), 202)
class TestHeader(TestUsingServer):
def test_not_set(self):
resp = self.request("/document.txt", query="pipe=header(X-TEST,PASS)")
self.assertEquals(resp.info()["X-TEST"], "PASS")
self.assertEqual(resp.info()["X-TEST"], "PASS")
def test_set(self):
resp = self.request("/document.txt", query="pipe=header(Content-Type,text/html)")
self.assertEquals(resp.info()["Content-Type"], "text/html")
self.assertEqual(resp.info()["Content-Type"], "text/html")
def test_multiple(self):
resp = self.request("/document.txt", query="pipe=header(X-Test,PASS)|header(Content-Type,text/html)")
self.assertEquals(resp.info()["X-TEST"], "PASS")
self.assertEquals(resp.info()["Content-Type"], "text/html")
self.assertEqual(resp.info()["X-TEST"], "PASS")
self.assertEqual(resp.info()["Content-Type"], "text/html")
def test_multiple_same(self):
resp = self.request("/document.txt", query="pipe=header(Content-Type,FAIL)|header(Content-Type,text/html)")
self.assertEquals(resp.info()["Content-Type"], "text/html")
self.assertEqual(resp.info()["Content-Type"], "text/html")
def test_multiple_append(self):
resp = self.request("/document.txt", query="pipe=header(X-Test,1)|header(X-Test,2,True)")
self.assertEquals(resp.info()["X-Test"], "1, 2")
self.assertEqual(resp.info()["X-Test"], "1, 2")
class TestSlice(TestUsingServer):
def test_both_bounds(self):
resp = self.request("/document.txt", query="pipe=slice(1,10)")
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(resp.read(), expected[1:10])
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(resp.read(), expected[1:10])
def test_no_upper(self):
resp = self.request("/document.txt", query="pipe=slice(1)")
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(resp.read(), expected[1:])
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(resp.read(), expected[1:])
def test_no_lower(self):
resp = self.request("/document.txt", query="pipe=slice(null,10)")
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(resp.read(), expected[:10])
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(resp.read(), expected[:10])
class TestSub(TestUsingServer):
def test_sub_config(self):
resp = self.request("/sub.txt", query="pipe=sub")
expected = "localhost localhost %i\n" % self.server.port
self.assertEquals(resp.read(), expected)
expected = "localhost localhost %i" % self.server.port
self.assertEqual(resp.read().rstrip(), expected)
def test_sub_headers(self):
resp = self.request("/sub_headers.txt", query="pipe=sub", headers={"X-Test": "PASS"})
expected = "PASS\n"
self.assertEquals(resp.read(), expected)
expected = "PASS"
self.assertEqual(resp.read().rstrip(), expected)
def test_sub_params(self):
resp = self.request("/sub_params.txt", query="test=PASS&pipe=sub")
expected = "PASS\n"
self.assertEquals(resp.read(), expected)
expected = "PASS"
self.assertEqual(resp.read().rstrip(), expected)
class TestTrickle(TestUsingServer):
def test_trickle(self):
@ -72,8 +69,8 @@ class TestTrickle(TestUsingServer):
t0 = time.time()
resp = self.request("/document.txt", query="pipe=trickle(1:d2:5:d1:r2)")
t1 = time.time()
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(resp.read(), expected)
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(resp.read(), expected)
self.assertGreater(6, t1-t0)
if __name__ == '__main__':

View file

@ -1,11 +1,7 @@
import os
import unittest
import urllib2
import json
import time
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer
class TestInputFile(TestUsingServer):
def test_seek(self):
@ -31,10 +27,10 @@ class TestInputFile(TestUsingServer):
route = ("POST", "/test/test_seek", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="POST", body="12345ab\ncdef")
self.assertEquals(200, resp.getcode())
self.assertEquals(["ab", "7", "12345ab\n", "8", "cdef", "12",
"12345ab\ncdef", "12345ab\n", "cdef"],
resp.read().split(" "))
self.assertEqual(200, resp.getcode())
self.assertEqual(["ab", "7", "12345ab\n", "8", "cdef", "12",
"12345ab\ncdef", "12345ab\n", "cdef"],
resp.read().split(" "))
def test_iter(self):
@wptserve.handlers.handler
@ -45,8 +41,8 @@ class TestInputFile(TestUsingServer):
route = ("POST", "/test/test_iter", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="POST", body="12345\nabcdef\r\nzyxwv")
self.assertEquals(200, resp.getcode())
self.assertEquals(["12345\n", "abcdef\r\n", "zyxwv"], resp.read().split(" "))
self.assertEqual(200, resp.getcode())
self.assertEqual(["12345\n", "abcdef\r\n", "zyxwv"], resp.read().split(" "))
class TestRequest(TestUsingServer):
def test_body(self):
@ -58,7 +54,7 @@ class TestRequest(TestUsingServer):
route = ("POST", "/test/test_body", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="POST", body="12345ab\ncdef")
self.assertEquals("12345ab\ncdef", resp.read())
self.assertEqual("12345ab\ncdef", resp.read())
def test_route_match(self):
@wptserve.handlers.handler
@ -68,7 +64,7 @@ class TestRequest(TestUsingServer):
route = ("GET", "/test/{match}_*", handler)
self.server.router.register(*route)
resp = self.request("/test/some_route")
self.assertEquals("some route", resp.read())
self.assertEqual("some route", resp.read())
class TestAuth(TestUsingServer):
def test_auth(self):
@ -79,8 +75,8 @@ class TestAuth(TestUsingServer):
route = ("GET", "/test/test_auth", handler)
self.server.router.register(*route)
resp = self.request(route[1], auth=("test", "PASS"))
self.assertEquals(200, resp.getcode())
self.assertEquals(["test", "PASS"], resp.read().split(" "))
self.assertEqual(200, resp.getcode())
self.assertEqual(["test", "PASS"], resp.read().split(" "))
if __name__ == '__main__':
unittest.main()

View file

@ -1,12 +1,8 @@
import os
import unittest
import urllib2
import json
import time
from types import MethodType
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer
def send_body_as_header(self):
if self._response.add_required_headers:
@ -27,9 +23,9 @@ class TestResponse(TestUsingServer):
route = ("GET", "/test/test_head_without_body", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="HEAD")
self.assertEquals("6", resp.info()['Content-Length'])
self.assertEquals("TEST", resp.info()['x-Test'])
self.assertEquals("", resp.info()['x-body'])
self.assertEqual("6", resp.info()['Content-Length'])
self.assertEqual("TEST", resp.info()['x-Test'])
self.assertEqual("", resp.info()['x-body'])
def test_head_with_body(self):
@wptserve.handlers.handler
@ -43,9 +39,9 @@ class TestResponse(TestUsingServer):
route = ("GET", "/test/test_head_with_body", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="HEAD")
self.assertEquals("6", resp.info()['Content-Length'])
self.assertEquals("TEST", resp.info()['x-Test'])
self.assertEquals("body", resp.info()['X-Body'])
self.assertEqual("6", resp.info()['Content-Length'])
self.assertEqual("TEST", resp.info()['x-Test'])
self.assertEqual("body", resp.info()['X-Body'])
if __name__ == '__main__':
unittest.main()

View file

@ -1,17 +1,15 @@
import os
import unittest
import urllib2
import json
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer
class TestFileHandler(TestUsingServer):
def test_not_handled(self):
with self.assertRaises(urllib2.HTTPError) as cm:
resp = self.request("/not_existing")
self.assertEquals(cm.exception.code, 404)
self.assertEqual(cm.exception.code, 404)
class TestRewriter(TestUsingServer):
def test_rewrite(self):
@ -23,8 +21,8 @@ class TestRewriter(TestUsingServer):
self.server.rewriter.register("GET", "/test/original", route[1])
self.server.router.register(*route)
resp = self.request("/test/original")
self.assertEquals(200, resp.getcode())
self.assertEquals("/test/rewritten", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("/test/rewritten", resp.read())
class TestRequestHandler(TestUsingServer):
def test_exception(self):
@ -37,7 +35,7 @@ class TestRequestHandler(TestUsingServer):
with self.assertRaises(urllib2.HTTPError) as cm:
resp = self.request("/test/raises")
self.assertEquals(cm.exception.code, 500)
self.assertEqual(cm.exception.code, 500)
if __name__ == "__main__":
unittest.main()

View file

@ -1,14 +1,16 @@
import os
import unittest
import urllib2
import json
import uuid
import wptserve
from wptserve.router import any_method
from base import TestUsingServer, doc_root
from wptserve.stash import StashServer
from .base import TestUsingServer
class TestResponseSetCookie(TestUsingServer):
def run(self, result=None):
with StashServer(None, authkey=str(uuid.uuid4())):
super(TestResponseSetCookie, self).run(result)
def test_put_take(self):
@wptserve.handlers.handler
def handler(request, response):
@ -26,13 +28,13 @@ class TestResponseSetCookie(TestUsingServer):
self.server.router.register(*route)
resp = self.request(route[1], method="POST", body={"id": id, "data": "Sample data"})
self.assertEquals(resp.read(), "OK")
self.assertEqual(resp.read(), "OK")
resp = self.request(route[1], query="id=" + id)
self.assertEquals(resp.read(), "Sample data")
self.assertEqual(resp.read(), "Sample data")
resp = self.request(route[1], query="id=" + id)
self.assertEquals(resp.read(), "NOT FOUND")
self.assertEqual(resp.read(), "NOT FOUND")
if __name__ == '__main__':

View file

@ -0,0 +1,17 @@
[tox]
envlist = py27,pypy
[testenv]
deps =
coverage
flake8
pytest
commands =
coverage run -m pytest tests/functional
flake8
[flake8]
ignore = E128,E129,E221,E226,E231,E251,E265,E302,E303,E402,E901,F821,F841
max-line-length = 141
exclude=docs,.git,__pycache__,.tox,.eggs,*.egg,tests/functional/docroot/

View file

@ -1,3 +1,3 @@
from server import WebTestHttpd, WebTestServer, Router
from request import Request
from response import Response
from .server import WebTestHttpd, WebTestServer, Router # noqa: F401
from .request import Request # noqa: F401
from .response import Response # noqa: F401

View file

@ -1,4 +1,4 @@
import utils
from . import utils
content_types = utils.invert_dict({"text/html": ["htm", "html"],
"application/json": ["json"],
@ -89,4 +89,4 @@ response_codes = {
504: ('Gateway Timeout',
'The gateway server did not receive a timely response'),
505: ('HTTP Version Not Supported', 'Cannot fulfill request.'),
}
}

View file

@ -5,12 +5,12 @@ import traceback
import urllib
import urlparse
from constants import content_types
from pipes import Pipeline, template
from ranges import RangeParser
from request import Authentication
from response import MultipartContent
from utils import HTTPException
from .constants import content_types
from .pipes import Pipeline, template
from .ranges import RangeParser
from .request import Authentication
from .response import MultipartContent
from .utils import HTTPException
__all__ = ["file_handler", "python_script_handler",
"FunctionHandler", "handler", "json_handler",
@ -55,13 +55,14 @@ class DirectoryHandler(object):
return "<%s base_path:%s url_base:%s>" % (self.__class__.__name__, self.base_path, self.url_base)
def __call__(self, request, response):
if not request.url_parts.path.endswith("/"):
url_path = request.url_parts.path
if not url_path.endswith("/"):
raise HTTPException(404)
path = filesystem_path(self.base_path, request, self.url_base)
if not os.path.isdir(path):
raise HTTPException(404, "%s is not a directory" % path)
assert os.path.isdir(path)
response.headers = [("Content-Type", "text/html")]
response.content = """<!doctype html>
@ -71,19 +72,18 @@ class DirectoryHandler(object):
<ul>
%(items)s
</ul>
""" % {"path": cgi.escape(request.url_parts.path),
"items": "\n".join(self.list_items(request, path))}
""" % {"path": cgi.escape(url_path),
"items": "\n".join(self.list_items(url_path, path))} # flake8: noqa
def list_items(self, base_path, path):
assert base_path.endswith("/")
def list_items(self, request, path):
# TODO: this won't actually list all routes, only the
# ones that correspond to a real filesystem path. It's
# not possible to list every route that will match
# something, but it should be possible to at least list the
# statically defined ones
base_path = request.url_parts.path
if not base_path.endswith("/"):
base_path += "/"
if base_path != "/":
link = urlparse.urljoin(base_path, "..")
yield ("""<li class="dir"><a href="%(link)s">%(name)s</a></li>""" %
@ -99,9 +99,6 @@ class DirectoryHandler(object):
{"link": link, "name": cgi.escape(item), "class": class_})
directory_handler = DirectoryHandler()
class FileHandler(object):
def __init__(self, base_path=None, url_base="/"):
self.base_path = base_path

View file

@ -424,7 +424,7 @@ def template(request, content, escape_type="html"):
return escape_func(unicode(value)).encode("utf-8")
template_regexp = re.compile(r"{{([^}]*)}}")
new_content, count = template_regexp.subn(config_replacement, content)
new_content = template_regexp.sub(config_replacement, content)
return new_content

View file

@ -1,4 +1,4 @@
from utils import HTTPException
from .utils import HTTPException
class RangeParser(object):

View file

@ -1,13 +1,12 @@
import base64
import cgi
import Cookie
import os
import StringIO
import tempfile
import urlparse
import stash
from utils import HTTPException
from . import stash
from .utils import HTTPException
missing = object()

View file

@ -6,8 +6,8 @@ import types
import uuid
import socket
from constants import response_codes
from logger import get_logger
from .constants import response_codes
from .logger import get_logger
missing = object()
@ -344,7 +344,7 @@ class ResponseHeaders(object):
def update(self, items_iter):
for name, value in items_iter:
self.set(name, value)
self.append(name, value)
def __repr__(self):
return repr(self.data)

View file

@ -2,7 +2,7 @@ import itertools
import re
import types
from logger import get_logger
from .logger import get_logger
any_method = object()
@ -78,7 +78,7 @@ def compile_path_match(route_pattern):
tokenizer = RouteTokenizer()
tokens, unmatched = tokenizer.scan(route_pattern)
assert unmatched is "", unmatched
assert unmatched == "", unmatched
compiler = RouteCompiler()

View file

@ -1,5 +1,5 @@
import handlers
from router import any_method
from . import handlers
from .router import any_method
routes = [(any_method, "*.py", handlers.python_script_handler),
("GET", "*.asis", handlers.as_is_handler),
("GET", "*", handlers.file_handler),

View file

@ -1,7 +1,6 @@
import BaseHTTPServer
import errno
import os
import re
import socket
from SocketServer import ThreadingMixIn
import ssl
@ -12,12 +11,12 @@ import traceback
import types
import urlparse
import routes as default_routes
from logger import get_logger
from request import Server, Request
from response import Response
from router import Router
from utils import HTTPException
from . import routes as default_routes
from .logger import get_logger
from .request import Server, Request
from .response import Response
from .router import Router
from .utils import HTTPException
"""HTTP server designed for testing purposes.
@ -183,12 +182,11 @@ class WebTestServer(ThreadingMixIn, BaseHTTPServer.HTTPServer):
server_side=True)
def handle_error(self, request, client_address):
error = sys.exc_value
error = sys.exc_info()[1]
if ((isinstance(error, socket.error) and
isinstance(error.args, tuple) and
error.args[0] in self.acceptable_errors)
or
error.args[0] in self.acceptable_errors) or
(isinstance(error, IOError) and
error.errno in self.acceptable_errors)):
pass # remote hang up before the result is sent
@ -282,7 +280,7 @@ class WebTestRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# Ensure that the whole request has been read from the socket
request.raw_input.read()
except socket.timeout, e:
except socket.timeout as e:
self.log_error("Request timed out: %r", e)
self.close_connection = True
return
@ -419,7 +417,7 @@ class WebTestHttpd(object):
_host, self.port = self.httpd.socket.getsockname()
except Exception:
self.logger.error('Init failed! You may need to modify your hosts file. Refer to README.md.');
self.logger.error('Init failed! You may need to modify your hosts file. Refer to README.md.')
raise
def start(self, block=False):

View file

@ -2,7 +2,6 @@ import base64
import json
import os
import uuid
from multiprocessing import Process
from multiprocessing.managers import BaseManager, DictProxy
class ServerDictManager(BaseManager):
@ -131,7 +130,7 @@ class Stash(object):
the current request path)"""
internal_key = self._wrap_key(key, path)
value = self.data.get(internal_key, None)
if not value is None:
if value is not None:
try:
self.data.pop(internal_key)
except KeyError: