Update web-platform-tests and CSS tests.

- Update CSS tests to revision e05bfd5e30ed662c2f8a353577003f8eed230180.
- Update web-platform-tests to revision a052787dd5c069a340031011196b73affbd68cd9.
This commit is contained in:
Ms2ger 2017-02-06 11:06:12 +01:00
parent fb4f421c8b
commit 296fa2512b
21852 changed files with 2080936 additions and 892894 deletions

View file

@ -0,0 +1,24 @@
language: python
sudo: false
cache:
directories:
- $HOME/.cache/pip
matrix:
include:
- python: 2.7
env: TOXENV=py27
- python: pypy
env: TOXENV=pypy
install:
- pip install -U tox codecov
script:
- tox
after_success:
- coverage combine
- codecov

View file

@ -1,6 +1,6 @@
from setuptools import setup
PACKAGE_VERSION = '1.3.0'
PACKAGE_VERSION = '1.4.0'
deps = []
setup(name='wptserve',

View file

@ -1,3 +1,5 @@
from __future__ import print_function
import base64
import logging
import os
@ -10,6 +12,8 @@ import wptserve
logging.basicConfig()
wptserve.logger.set_logger(logging.getLogger())
here = os.path.split(__file__)[0]
doc_root = os.path.join(here, "docroot")
@ -24,7 +28,7 @@ class Request(urllib2.Request):
def add_data(self, data):
if hasattr(data, "iteritems"):
data = urllib.urlencode(data)
print data
print(data)
self.add_header("Content-Length", str(len(data)))
urllib2.Request.add_data(self, data)
@ -56,6 +60,6 @@ class TestUsingServer(unittest.TestCase):
req.add_data(body)
if auth is not None:
req.add_header("Authorization", "Basic %s" % base64.encodestring('%s:%s' % auth))
req.add_header("Authorization", "Basic %s" % base64.b64encode('%s:%s' % auth))
return urllib2.urlopen(req)

View file

@ -0,0 +1,3 @@
# Oops...
def main(request, response
return "FAIL"

View file

@ -0,0 +1,3 @@
# Oops...
def mian(request, response):
return "FAIL"

View file

@ -0,0 +1 @@
{{host}} {{domains[]}} {{ports[http][0]}}

View file

@ -0,0 +1 @@
{{headers[X-Test]}}

View file

@ -0,0 +1 @@
{{GET[test]}}

View file

@ -0,0 +1 @@
I am here to ensure that my containing directory exists.

View file

@ -1,3 +1,6 @@
Custom-Header: PASS
Another-Header: {{$id:uuid()}}
Same-Value-Header: {{$id}}
Double-Header: PA
Double-Header: SS
Content-Type: text/html

View file

@ -1,10 +1,7 @@
import os
import unittest
import urllib2
import json
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer
class TestResponseSetCookie(TestUsingServer):
def test_name_value(self):
@ -17,7 +14,7 @@ class TestResponseSetCookie(TestUsingServer):
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(resp.info()["Set-Cookie"], "name=value; Path=/")
self.assertEqual(resp.info()["Set-Cookie"], "name=value; Path=/")
def test_unset(self):
@wptserve.handlers.handler
@ -45,8 +42,8 @@ class TestResponseSetCookie(TestUsingServer):
parts = dict(item.split("=") for
item in resp.info()["Set-Cookie"].split("; ") if item)
self.assertEquals(parts["name"], "")
self.assertEquals(parts["Path"], "/")
self.assertEqual(parts["name"], "")
self.assertEqual(parts["Path"], "/")
#Should also check that expires is in the past
class TestRequestCookies(TestUsingServer):
@ -58,8 +55,7 @@ class TestRequestCookies(TestUsingServer):
route = ("GET", "/test/set_cookie", handler)
self.server.router.register(*route)
resp = self.request(route[1], headers={"Cookie": "name=value"})
self.assertEquals(resp.read(), "value")
self.assertEqual(resp.read(), b"value")
if __name__ == '__main__':
unittest.main()

View file

@ -1,85 +1,102 @@
import json
import os
import pytest
import unittest
import urllib2
import uuid
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer, doc_root
class TestFileHandler(TestUsingServer):
def test_GET(self):
resp = self.request("/document.txt")
self.assertEquals(200, resp.getcode())
self.assertEquals("text/plain", resp.info()["Content-Type"])
self.assertEquals(open(os.path.join(doc_root, "document.txt")).read(), resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("text/plain", resp.info()["Content-Type"])
self.assertEqual(open(os.path.join(doc_root, "document.txt"), 'rb').read(), resp.read())
def test_headers(self):
resp = self.request("/with_headers.txt")
self.assertEquals(200, resp.getcode())
self.assertEquals("PASS", resp.info()["Custom-Header"])
self.assertEqual(200, resp.getcode())
self.assertEqual("text/html", resp.info()["Content-Type"])
self.assertEqual("PASS", resp.info()["Custom-Header"])
# This will fail if it isn't a valid uuid
uuid.UUID(resp.info()["Another-Header"])
self.assertEquals(resp.info()["Same-Value-Header"], resp.info()["Another-Header"])
self.assertEqual(resp.info()["Same-Value-Header"], resp.info()["Another-Header"])
self.assertEqual(resp.info()["Double-Header"], "PA, SS")
def test_range(self):
resp = self.request("/document.txt", headers={"Range":"bytes=10-19"})
self.assertEquals(206, resp.getcode())
self.assertEqual(206, resp.getcode())
data = resp.read()
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(10, len(data))
self.assertEquals("bytes 10-19/%i" % len(expected), resp.info()['Content-Range'])
self.assertEquals("10", resp.info()['Content-Length'])
self.assertEquals(expected[10:20], data)
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(10, len(data))
self.assertEqual("bytes 10-19/%i" % len(expected), resp.info()['Content-Range'])
self.assertEqual("10", resp.info()['Content-Length'])
self.assertEqual(expected[10:20], data)
def test_range_no_end(self):
resp = self.request("/document.txt", headers={"Range":"bytes=10-"})
self.assertEquals(206, resp.getcode())
self.assertEqual(206, resp.getcode())
data = resp.read()
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(len(expected) - 10, len(data))
self.assertEquals("bytes 10-%i/%i" % (len(expected) - 1, len(expected)), resp.info()['Content-Range'])
self.assertEquals(expected[10:], data)
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(len(expected) - 10, len(data))
self.assertEqual("bytes 10-%i/%i" % (len(expected) - 1, len(expected)), resp.info()['Content-Range'])
self.assertEqual(expected[10:], data)
def test_range_no_start(self):
resp = self.request("/document.txt", headers={"Range":"bytes=-10"})
self.assertEquals(206, resp.getcode())
self.assertEqual(206, resp.getcode())
data = resp.read()
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(10, len(data))
self.assertEquals("bytes %i-%i/%i" % (len(expected) - 10,
len(expected) - 1,
len(expected)), resp.info()['Content-Range'])
self.assertEquals(expected[-10:], data)
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(10, len(data))
self.assertEqual("bytes %i-%i/%i" % (len(expected) - 10, len(expected) - 1, len(expected)),
resp.info()['Content-Range'])
self.assertEqual(expected[-10:], data)
def test_multiple_ranges(self):
resp = self.request("/document.txt", headers={"Range":"bytes=1-2,5-7,6-10"})
self.assertEquals(206, resp.getcode())
self.assertEqual(206, resp.getcode())
data = resp.read()
expected = open(os.path.join(doc_root, "document.txt")).read()
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertTrue(resp.info()["Content-Type"].startswith("multipart/byteranges; boundary="))
boundary = resp.info()["Content-Type"].split("boundary=")[1]
parts = data.split("--" + boundary)
self.assertEquals("\r\n", parts[0])
self.assertEquals("--", parts[-1])
self.assertEqual("\r\n", parts[0])
self.assertEqual("--", parts[-1])
expected_parts = [("1-2", expected[1:3]), ("5-10", expected[5:11])]
for expected_part, part in zip(expected_parts, parts[1:-1]):
header_string, body = part.split("\r\n\r\n")
headers = dict(item.split(": ", 1) for item in header_string.split("\r\n") if item.strip())
self.assertEquals(headers["Content-Type"], "text/plain")
self.assertEquals(headers["Content-Range"], "bytes %s/%i" % (expected_part[0], len(expected)))
self.assertEquals(expected_part[1] + "\r\n", body)
self.assertEqual(headers["Content-Type"], "text/plain")
self.assertEqual(headers["Content-Range"], "bytes %s/%i" % (expected_part[0], len(expected)))
self.assertEqual(expected_part[1] + "\r\n", body)
def test_range_invalid(self):
with self.assertRaises(urllib2.HTTPError) as cm:
self.request("/document.txt", headers={"Range":"bytes=11-10"})
self.assertEquals(cm.exception.code, 416)
self.assertEqual(cm.exception.code, 416)
expected = open(os.path.join(doc_root, "document.txt")).read()
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
with self.assertRaises(urllib2.HTTPError) as cm:
self.request("/document.txt", headers={"Range":"bytes=%i-%i" % (len(expected), len(expected) + 10)})
self.assertEquals(cm.exception.code, 416)
self.assertEqual(cm.exception.code, 416)
def test_sub_config(self):
resp = self.request("/sub.sub.txt")
expected = b"localhost localhost %i" % self.server.port
assert resp.read().rstrip() == expected
def test_sub_headers(self):
resp = self.request("/sub_headers.sub.txt", headers={"X-Test": "PASS"})
expected = b"PASS"
assert resp.read().rstrip() == expected
def test_sub_params(self):
resp = self.request("/sub_params.sub.txt", query="test=PASS")
expected = b"PASS"
assert resp.read().rstrip() == expected
class TestFunctionHandler(TestUsingServer):
@ -91,9 +108,22 @@ class TestFunctionHandler(TestUsingServer):
route = ("GET", "/test/test_string_rv", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(200, resp.getcode())
self.assertEquals("9", resp.info()["Content-Length"])
self.assertEquals("test data", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("9", resp.info()["Content-Length"])
self.assertEqual("test data", resp.read())
def test_tuple_1_rv(self):
@wptserve.handlers.handler
def handler(request, response):
return ()
route = ("GET", "/test/test_tuple_1_rv", handler)
self.server.router.register(*route)
with pytest.raises(urllib2.HTTPError) as cm:
self.request(route[1])
assert cm.value.code == 500
def test_tuple_2_rv(self):
@wptserve.handlers.handler
@ -103,10 +133,10 @@ class TestFunctionHandler(TestUsingServer):
route = ("GET", "/test/test_tuple_2_rv", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(200, resp.getcode())
self.assertEquals("4", resp.info()["Content-Length"])
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals("test", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("4", resp.info()["Content-Length"])
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual("test", resp.read())
def test_tuple_3_rv(self):
@wptserve.handlers.handler
@ -116,9 +146,9 @@ class TestFunctionHandler(TestUsingServer):
route = ("GET", "/test/test_tuple_3_rv", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(202, resp.getcode())
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals("test data", resp.read())
self.assertEqual(202, resp.getcode())
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual("test data", resp.read())
def test_tuple_3_rv_1(self):
@wptserve.handlers.handler
@ -128,10 +158,36 @@ class TestFunctionHandler(TestUsingServer):
route = ("GET", "/test/test_tuple_3_rv_1", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(202, resp.getcode())
self.assertEquals("Some Status", resp.msg)
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals("test data", resp.read())
self.assertEqual(202, resp.getcode())
self.assertEqual("Some Status", resp.msg)
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual("test data", resp.read())
def test_tuple_4_rv(self):
@wptserve.handlers.handler
def handler(request, response):
return 202, [("test-header", "test-value")], "test data", "garbage"
route = ("GET", "/test/test_tuple_1_rv", handler)
self.server.router.register(*route)
with pytest.raises(urllib2.HTTPError) as cm:
self.request(route[1])
assert cm.value.code == 500
def test_none_rv(self):
@wptserve.handlers.handler
def handler(request, response):
return None
route = ("GET", "/test/test_none_rv", handler)
self.server.router.register(*route)
resp = self.request(route[1])
assert resp.getcode() == 200
assert "Content-Length" not in resp.info()
assert resp.read() == b""
class TestJSONHandler(TestUsingServer):
def test_json_0(self):
@ -142,8 +198,8 @@ class TestJSONHandler(TestUsingServer):
route = ("GET", "/test/test_json_0", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(200, resp.getcode())
self.assertEquals({"data": "test data"}, json.load(resp))
self.assertEqual(200, resp.getcode())
self.assertEqual({"data": "test data"}, json.load(resp))
def test_json_tuple_2(self):
@wptserve.handlers.json_handler
@ -153,9 +209,9 @@ class TestJSONHandler(TestUsingServer):
route = ("GET", "/test/test_json_tuple_2", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(200, resp.getcode())
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals({"data": "test data"}, json.load(resp))
self.assertEqual(200, resp.getcode())
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual({"data": "test data"}, json.load(resp))
def test_json_tuple_3(self):
@wptserve.handlers.json_handler
@ -165,47 +221,78 @@ class TestJSONHandler(TestUsingServer):
route = ("GET", "/test/test_json_tuple_2", handler)
self.server.router.register(*route)
resp = self.request(route[1])
self.assertEquals(202, resp.getcode())
self.assertEquals("Giraffe", resp.msg)
self.assertEquals("test-value", resp.info()["test-header"])
self.assertEquals({"data": "test data"}, json.load(resp))
self.assertEqual(202, resp.getcode())
self.assertEqual("Giraffe", resp.msg)
self.assertEqual("test-value", resp.info()["test-header"])
self.assertEqual({"data": "test data"}, json.load(resp))
class TestPythonHandler(TestUsingServer):
def test_string(self):
resp = self.request("/test_string.py")
self.assertEquals(200, resp.getcode())
self.assertEquals("text/plain", resp.info()["Content-Type"])
self.assertEquals("PASS", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("text/plain", resp.info()["Content-Type"])
self.assertEqual("PASS", resp.read())
def test_tuple_2(self):
resp = self.request("/test_tuple_2.py")
self.assertEquals(200, resp.getcode())
self.assertEquals("text/html", resp.info()["Content-Type"])
self.assertEquals("PASS", resp.info()["X-Test"])
self.assertEquals("PASS", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("text/html", resp.info()["Content-Type"])
self.assertEqual("PASS", resp.info()["X-Test"])
self.assertEqual("PASS", resp.read())
def test_tuple_3(self):
resp = self.request("/test_tuple_3.py")
self.assertEquals(202, resp.getcode())
self.assertEquals("Giraffe", resp.msg)
self.assertEquals("text/html", resp.info()["Content-Type"])
self.assertEquals("PASS", resp.info()["X-Test"])
self.assertEquals("PASS", resp.read())
self.assertEqual(202, resp.getcode())
self.assertEqual("Giraffe", resp.msg)
self.assertEqual("text/html", resp.info()["Content-Type"])
self.assertEqual("PASS", resp.info()["X-Test"])
self.assertEqual("PASS", resp.read())
def test_no_main(self):
with pytest.raises(urllib2.HTTPError) as cm:
self.request("/no_main.py")
assert cm.value.code == 500
def test_invalid(self):
with pytest.raises(urllib2.HTTPError) as cm:
self.request("/invalid.py")
assert cm.value.code == 500
def test_missing(self):
with pytest.raises(urllib2.HTTPError) as cm:
self.request("/missing.py")
assert cm.value.code == 404
class TestDirectoryHandler(TestUsingServer):
def test_directory(self):
resp = self.request("/")
self.assertEquals(200, resp.getcode())
self.assertEquals("text/html", resp.info()["Content-Type"])
self.assertEqual(200, resp.getcode())
self.assertEqual("text/html", resp.info()["Content-Type"])
#Add a check that the response is actually sane
def test_subdirectory_trailing_slash(self):
resp = self.request("/subdir/")
assert resp.getcode() == 200
assert resp.info()["Content-Type"] == "text/html"
def test_subdirectory_no_trailing_slash(self):
with pytest.raises(urllib2.HTTPError) as cm:
self.request("/subdir")
assert cm.value.code == 404
class TestAsIsHandler(TestUsingServer):
def test_as_is(self):
resp = self.request("/test.asis")
self.assertEquals(202, resp.getcode())
self.assertEquals("Giraffe", resp.msg)
self.assertEquals("PASS", resp.info()["X-Test"])
self.assertEquals("Content", resp.read())
self.assertEqual(202, resp.getcode())
self.assertEqual("Giraffe", resp.msg)
self.assertEqual("PASS", resp.info()["X-Test"])
self.assertEqual("Content", resp.read())
#Add a check that the response is actually sane
if __name__ == '__main__':

View file

@ -1,70 +1,67 @@
import os
import unittest
import urllib2
import json
import time
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer, doc_root
class TestStatus(TestUsingServer):
def test_status(self):
resp = self.request("/document.txt", query="pipe=status(202)")
self.assertEquals(resp.getcode(), 202)
self.assertEqual(resp.getcode(), 202)
class TestHeader(TestUsingServer):
def test_not_set(self):
resp = self.request("/document.txt", query="pipe=header(X-TEST,PASS)")
self.assertEquals(resp.info()["X-TEST"], "PASS")
self.assertEqual(resp.info()["X-TEST"], "PASS")
def test_set(self):
resp = self.request("/document.txt", query="pipe=header(Content-Type,text/html)")
self.assertEquals(resp.info()["Content-Type"], "text/html")
self.assertEqual(resp.info()["Content-Type"], "text/html")
def test_multiple(self):
resp = self.request("/document.txt", query="pipe=header(X-Test,PASS)|header(Content-Type,text/html)")
self.assertEquals(resp.info()["X-TEST"], "PASS")
self.assertEquals(resp.info()["Content-Type"], "text/html")
self.assertEqual(resp.info()["X-TEST"], "PASS")
self.assertEqual(resp.info()["Content-Type"], "text/html")
def test_multiple_same(self):
resp = self.request("/document.txt", query="pipe=header(Content-Type,FAIL)|header(Content-Type,text/html)")
self.assertEquals(resp.info()["Content-Type"], "text/html")
self.assertEqual(resp.info()["Content-Type"], "text/html")
def test_multiple_append(self):
resp = self.request("/document.txt", query="pipe=header(X-Test,1)|header(X-Test,2,True)")
self.assertEquals(resp.info()["X-Test"], "1, 2")
self.assertEqual(resp.info()["X-Test"], "1, 2")
class TestSlice(TestUsingServer):
def test_both_bounds(self):
resp = self.request("/document.txt", query="pipe=slice(1,10)")
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(resp.read(), expected[1:10])
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(resp.read(), expected[1:10])
def test_no_upper(self):
resp = self.request("/document.txt", query="pipe=slice(1)")
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(resp.read(), expected[1:])
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(resp.read(), expected[1:])
def test_no_lower(self):
resp = self.request("/document.txt", query="pipe=slice(null,10)")
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(resp.read(), expected[:10])
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(resp.read(), expected[:10])
class TestSub(TestUsingServer):
def test_sub_config(self):
resp = self.request("/sub.txt", query="pipe=sub")
expected = "localhost localhost %i\n" % self.server.port
self.assertEquals(resp.read(), expected)
expected = "localhost localhost %i" % self.server.port
self.assertEqual(resp.read().rstrip(), expected)
def test_sub_headers(self):
resp = self.request("/sub_headers.txt", query="pipe=sub", headers={"X-Test": "PASS"})
expected = "PASS\n"
self.assertEquals(resp.read(), expected)
expected = "PASS"
self.assertEqual(resp.read().rstrip(), expected)
def test_sub_params(self):
resp = self.request("/sub_params.txt", query="test=PASS&pipe=sub")
expected = "PASS\n"
self.assertEquals(resp.read(), expected)
expected = "PASS"
self.assertEqual(resp.read().rstrip(), expected)
class TestTrickle(TestUsingServer):
def test_trickle(self):
@ -72,8 +69,8 @@ class TestTrickle(TestUsingServer):
t0 = time.time()
resp = self.request("/document.txt", query="pipe=trickle(1:d2:5:d1:r2)")
t1 = time.time()
expected = open(os.path.join(doc_root, "document.txt")).read()
self.assertEquals(resp.read(), expected)
expected = open(os.path.join(doc_root, "document.txt"), 'rb').read()
self.assertEqual(resp.read(), expected)
self.assertGreater(6, t1-t0)
if __name__ == '__main__':

View file

@ -1,11 +1,7 @@
import os
import unittest
import urllib2
import json
import time
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer
class TestInputFile(TestUsingServer):
def test_seek(self):
@ -31,10 +27,10 @@ class TestInputFile(TestUsingServer):
route = ("POST", "/test/test_seek", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="POST", body="12345ab\ncdef")
self.assertEquals(200, resp.getcode())
self.assertEquals(["ab", "7", "12345ab\n", "8", "cdef", "12",
"12345ab\ncdef", "12345ab\n", "cdef"],
resp.read().split(" "))
self.assertEqual(200, resp.getcode())
self.assertEqual(["ab", "7", "12345ab\n", "8", "cdef", "12",
"12345ab\ncdef", "12345ab\n", "cdef"],
resp.read().split(" "))
def test_iter(self):
@wptserve.handlers.handler
@ -45,8 +41,8 @@ class TestInputFile(TestUsingServer):
route = ("POST", "/test/test_iter", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="POST", body="12345\nabcdef\r\nzyxwv")
self.assertEquals(200, resp.getcode())
self.assertEquals(["12345\n", "abcdef\r\n", "zyxwv"], resp.read().split(" "))
self.assertEqual(200, resp.getcode())
self.assertEqual(["12345\n", "abcdef\r\n", "zyxwv"], resp.read().split(" "))
class TestRequest(TestUsingServer):
def test_body(self):
@ -58,7 +54,7 @@ class TestRequest(TestUsingServer):
route = ("POST", "/test/test_body", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="POST", body="12345ab\ncdef")
self.assertEquals("12345ab\ncdef", resp.read())
self.assertEqual("12345ab\ncdef", resp.read())
def test_route_match(self):
@wptserve.handlers.handler
@ -68,7 +64,7 @@ class TestRequest(TestUsingServer):
route = ("GET", "/test/{match}_*", handler)
self.server.router.register(*route)
resp = self.request("/test/some_route")
self.assertEquals("some route", resp.read())
self.assertEqual("some route", resp.read())
class TestAuth(TestUsingServer):
def test_auth(self):
@ -79,8 +75,8 @@ class TestAuth(TestUsingServer):
route = ("GET", "/test/test_auth", handler)
self.server.router.register(*route)
resp = self.request(route[1], auth=("test", "PASS"))
self.assertEquals(200, resp.getcode())
self.assertEquals(["test", "PASS"], resp.read().split(" "))
self.assertEqual(200, resp.getcode())
self.assertEqual(["test", "PASS"], resp.read().split(" "))
if __name__ == '__main__':
unittest.main()

View file

@ -1,12 +1,8 @@
import os
import unittest
import urllib2
import json
import time
from types import MethodType
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer
def send_body_as_header(self):
if self._response.add_required_headers:
@ -27,9 +23,9 @@ class TestResponse(TestUsingServer):
route = ("GET", "/test/test_head_without_body", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="HEAD")
self.assertEquals("6", resp.info()['Content-Length'])
self.assertEquals("TEST", resp.info()['x-Test'])
self.assertEquals("", resp.info()['x-body'])
self.assertEqual("6", resp.info()['Content-Length'])
self.assertEqual("TEST", resp.info()['x-Test'])
self.assertEqual("", resp.info()['x-body'])
def test_head_with_body(self):
@wptserve.handlers.handler
@ -43,9 +39,9 @@ class TestResponse(TestUsingServer):
route = ("GET", "/test/test_head_with_body", handler)
self.server.router.register(*route)
resp = self.request(route[1], method="HEAD")
self.assertEquals("6", resp.info()['Content-Length'])
self.assertEquals("TEST", resp.info()['x-Test'])
self.assertEquals("body", resp.info()['X-Body'])
self.assertEqual("6", resp.info()['Content-Length'])
self.assertEqual("TEST", resp.info()['x-Test'])
self.assertEqual("body", resp.info()['X-Body'])
if __name__ == '__main__':
unittest.main()

View file

@ -1,17 +1,15 @@
import os
import unittest
import urllib2
import json
import wptserve
from base import TestUsingServer, doc_root
from .base import TestUsingServer
class TestFileHandler(TestUsingServer):
def test_not_handled(self):
with self.assertRaises(urllib2.HTTPError) as cm:
resp = self.request("/not_existing")
self.assertEquals(cm.exception.code, 404)
self.assertEqual(cm.exception.code, 404)
class TestRewriter(TestUsingServer):
def test_rewrite(self):
@ -23,8 +21,8 @@ class TestRewriter(TestUsingServer):
self.server.rewriter.register("GET", "/test/original", route[1])
self.server.router.register(*route)
resp = self.request("/test/original")
self.assertEquals(200, resp.getcode())
self.assertEquals("/test/rewritten", resp.read())
self.assertEqual(200, resp.getcode())
self.assertEqual("/test/rewritten", resp.read())
class TestRequestHandler(TestUsingServer):
def test_exception(self):
@ -37,7 +35,7 @@ class TestRequestHandler(TestUsingServer):
with self.assertRaises(urllib2.HTTPError) as cm:
resp = self.request("/test/raises")
self.assertEquals(cm.exception.code, 500)
self.assertEqual(cm.exception.code, 500)
if __name__ == "__main__":
unittest.main()

View file

@ -1,14 +1,16 @@
import os
import unittest
import urllib2
import json
import uuid
import wptserve
from wptserve.router import any_method
from base import TestUsingServer, doc_root
from wptserve.stash import StashServer
from .base import TestUsingServer
class TestResponseSetCookie(TestUsingServer):
def run(self, result=None):
with StashServer(None, authkey=str(uuid.uuid4())):
super(TestResponseSetCookie, self).run(result)
def test_put_take(self):
@wptserve.handlers.handler
def handler(request, response):
@ -26,13 +28,13 @@ class TestResponseSetCookie(TestUsingServer):
self.server.router.register(*route)
resp = self.request(route[1], method="POST", body={"id": id, "data": "Sample data"})
self.assertEquals(resp.read(), "OK")
self.assertEqual(resp.read(), "OK")
resp = self.request(route[1], query="id=" + id)
self.assertEquals(resp.read(), "Sample data")
self.assertEqual(resp.read(), "Sample data")
resp = self.request(route[1], query="id=" + id)
self.assertEquals(resp.read(), "NOT FOUND")
self.assertEqual(resp.read(), "NOT FOUND")
if __name__ == '__main__':

View file

@ -0,0 +1,17 @@
[tox]
envlist = py27,pypy
[testenv]
deps =
coverage
flake8
pytest
commands =
coverage run -m pytest tests/functional
flake8
[flake8]
ignore = E128,E129,E221,E226,E231,E251,E265,E302,E303,E402,E901,F821,F841
max-line-length = 141
exclude=docs,.git,__pycache__,.tox,.eggs,*.egg,tests/functional/docroot/

View file

@ -1,3 +1,3 @@
from server import WebTestHttpd, WebTestServer, Router
from request import Request
from response import Response
from .server import WebTestHttpd, WebTestServer, Router # noqa: F401
from .request import Request # noqa: F401
from .response import Response # noqa: F401

View file

@ -1,4 +1,4 @@
import utils
from . import utils
content_types = utils.invert_dict({"text/html": ["htm", "html"],
"application/json": ["json"],
@ -89,4 +89,4 @@ response_codes = {
504: ('Gateway Timeout',
'The gateway server did not receive a timely response'),
505: ('HTTP Version Not Supported', 'Cannot fulfill request.'),
}
}

View file

@ -5,12 +5,12 @@ import traceback
import urllib
import urlparse
from constants import content_types
from pipes import Pipeline, template
from ranges import RangeParser
from request import Authentication
from response import MultipartContent
from utils import HTTPException
from .constants import content_types
from .pipes import Pipeline, template
from .ranges import RangeParser
from .request import Authentication
from .response import MultipartContent
from .utils import HTTPException
__all__ = ["file_handler", "python_script_handler",
"FunctionHandler", "handler", "json_handler",
@ -55,13 +55,14 @@ class DirectoryHandler(object):
return "<%s base_path:%s url_base:%s>" % (self.__class__.__name__, self.base_path, self.url_base)
def __call__(self, request, response):
if not request.url_parts.path.endswith("/"):
url_path = request.url_parts.path
if not url_path.endswith("/"):
raise HTTPException(404)
path = filesystem_path(self.base_path, request, self.url_base)
if not os.path.isdir(path):
raise HTTPException(404, "%s is not a directory" % path)
assert os.path.isdir(path)
response.headers = [("Content-Type", "text/html")]
response.content = """<!doctype html>
@ -71,19 +72,18 @@ class DirectoryHandler(object):
<ul>
%(items)s
</ul>
""" % {"path": cgi.escape(request.url_parts.path),
"items": "\n".join(self.list_items(request, path))}
""" % {"path": cgi.escape(url_path),
"items": "\n".join(self.list_items(url_path, path))} # flake8: noqa
def list_items(self, base_path, path):
assert base_path.endswith("/")
def list_items(self, request, path):
# TODO: this won't actually list all routes, only the
# ones that correspond to a real filesystem path. It's
# not possible to list every route that will match
# something, but it should be possible to at least list the
# statically defined ones
base_path = request.url_parts.path
if not base_path.endswith("/"):
base_path += "/"
if base_path != "/":
link = urlparse.urljoin(base_path, "..")
yield ("""<li class="dir"><a href="%(link)s">%(name)s</a></li>""" %
@ -99,9 +99,6 @@ class DirectoryHandler(object):
{"link": link, "name": cgi.escape(item), "class": class_})
directory_handler = DirectoryHandler()
class FileHandler(object):
def __init__(self, base_path=None, url_base="/"):
self.base_path = base_path
@ -140,6 +137,7 @@ class FileHandler(object):
ml_extensions = {".html", ".htm", ".xht", ".xhtml", ".xml", ".svg"}
escape_type = "html" if os.path.splitext(path)[1] in ml_extensions else "none"
pipeline = Pipeline("sub(%s)" % escape_type)
if pipeline is not None:
response = pipeline(request, response)
@ -149,9 +147,12 @@ class FileHandler(object):
raise HTTPException(404)
def get_headers(self, request, path):
rv = self.default_headers(path)
rv.extend(self.load_headers(request, os.path.join(os.path.split(path)[0], "__dir__")))
rv.extend(self.load_headers(request, path))
rv = (self.load_headers(request, os.path.join(os.path.split(path)[0], "__dir__")) +
self.load_headers(request, path))
if not any(key.lower() == "content-type" for (key, _) in rv):
rv.insert(0, ("Content-Type", guess_content_type(path)))
return rv
def load_headers(self, request, path):
@ -174,10 +175,12 @@ class FileHandler(object):
for line in data.splitlines() if line]
def get_data(self, response, path, byte_ranges):
with open(path, 'rb') as f:
if byte_ranges is None:
return f.read()
else:
"""Return either the handle to a file, or a string containing
the content of a chunk of the file, if we have a range request."""
if byte_ranges is None:
return open(path, 'rb')
else:
with open(path, 'rb') as f:
response.status = 206
if len(byte_ranges) > 1:
parts_content_type, content = self.set_response_multipart(response,
@ -206,9 +209,6 @@ class FileHandler(object):
f.seek(byte_range.lower)
return f.read(byte_range.upper - byte_range.lower)
def default_headers(self, path):
return [("Content-Type", guess_content_type(path))]
file_handler = FileHandler()

View file

@ -8,7 +8,7 @@ from cStringIO import StringIO
def resolve_content(response):
rv = "".join(item for item in response.iter_content())
rv = "".join(item for item in response.iter_content(read_file=True))
if type(rv) == unicode:
rv = rv.encode(response.encoding)
return rv
@ -232,33 +232,26 @@ def trickle(request, response, delays):
if not delays:
return response
content = resolve_content(response)
modified_content = []
offset = [0]
def sleep(seconds):
def inner():
time.sleep(seconds)
return ""
return inner
def add_content(delays, repeat=False):
for i, (item_type, value) in enumerate(delays):
if item_type == "bytes":
modified_content.append(content[offset[0]:offset[0] + value])
yield content[offset[0]:offset[0] + value]
offset[0] += value
elif item_type == "delay":
modified_content.append(sleep(value))
time.sleep(value)
elif item_type == "repeat":
assert i == len(delays) - 1
if i != len(delays) - 1:
continue
while offset[0] < len(content):
add_content(delays[-(value + 1):-1], True)
for item in add_content(delays[-(value + 1):-1], True):
yield item
if not repeat and offset[0] < len(content):
modified_content.append(content[offset[0]:])
yield content[offset[0]:]
add_content(delays)
response.content = modified_content
response.content = add_content(delays)
return response
@ -406,6 +399,7 @@ def template(request, content, escape_type="html"):
"hostname": request.url_parts.hostname,
"port": request.url_parts.port,
"path": request.url_parts.path,
"pathname": request.url_parts.path,
"query": "?%s" % request.url_parts.query}
elif field == "uuid()":
value = str(uuid.uuid4())
@ -430,7 +424,7 @@ def template(request, content, escape_type="html"):
return escape_func(unicode(value)).encode("utf-8")
template_regexp = re.compile(r"{{([^}]*)}}")
new_content, count = template_regexp.subn(config_replacement, content)
new_content = template_regexp.sub(config_replacement, content)
return new_content

View file

@ -1,4 +1,4 @@
from utils import HTTPException
from .utils import HTTPException
class RangeParser(object):

View file

@ -1,13 +1,12 @@
import base64
import cgi
import Cookie
import os
import StringIO
import tempfile
import urlparse
import stash
from utils import HTTPException
from . import stash
from .utils import HTTPException
missing = object()

View file

@ -6,8 +6,8 @@ import types
import uuid
import socket
from constants import response_codes
from logger import get_logger
from .constants import response_codes
from .logger import get_logger
missing = object()
@ -168,13 +168,25 @@ class Response(object):
self.set_cookie(name, None, path=path, domain=domain, max_age=0,
expires=timedelta(days=-1))
def iter_content(self):
def iter_content(self, read_file=False):
"""Iterator returning chunks of response body content.
If any part of the content is a function, this will be called
and the resulting value (if any) returned."""
if type(self.content) in types.StringTypes:
and the resulting value (if any) returned.
:param read_file: - boolean controlling the behaviour when content
is a file handle. When set to False the handle will be returned directly
allowing the file to be passed to the output in small chunks. When set to
True, the entire content of the file will be returned as a string facilitating
non-streaming operations like template substitution.
"""
if isinstance(self.content, types.StringTypes):
yield self.content
elif hasattr(self.content, "read"):
if read_file:
yield self.content.read()
else:
yield self.content
else:
for item in self.content:
if hasattr(item, "__call__"):
@ -332,7 +344,7 @@ class ResponseHeaders(object):
def update(self, items_iter):
for name, value in items_iter:
self.set(name, value)
self.append(name, value)
def __repr__(self):
return repr(self.data)
@ -355,6 +367,7 @@ class ResponseWriter(object):
self._headers_complete = False
self.content_written = False
self.request = response.request
self.file_chunk_size = 32 * 1024
def write_status(self, code, message=None):
"""Write out the status line of a response.
@ -411,7 +424,10 @@ class ResponseWriter(object):
def write_content(self, data):
"""Write the body of the response."""
self.write(self.encode(data))
if isinstance(data, types.StringTypes):
self.write(data)
else:
self.write_content_file(data)
if not self._response.explicit_flush:
self.flush()
@ -425,6 +441,20 @@ class ResponseWriter(object):
# This can happen if the socket got closed by the remote end
pass
def write_content_file(self, data):
"""Write a file-like object directly to the response in chunks.
Does not flush."""
self.content_written = True
while True:
buf = data.read(self.file_chunk_size)
if not buf:
break
try:
self._wfile.write(buf)
except socket.error:
break
data.close()
def encode(self, data):
"""Convert unicode to bytes according to response.encoding."""
if isinstance(data, str):

View file

@ -2,7 +2,7 @@ import itertools
import re
import types
from logger import get_logger
from .logger import get_logger
any_method = object()
@ -78,7 +78,7 @@ def compile_path_match(route_pattern):
tokenizer = RouteTokenizer()
tokens, unmatched = tokenizer.scan(route_pattern)
assert unmatched is "", unmatched
assert unmatched == "", unmatched
compiler = RouteCompiler()

View file

@ -1,5 +1,5 @@
import handlers
from router import any_method
from . import handlers
from .router import any_method
routes = [(any_method, "*.py", handlers.python_script_handler),
("GET", "*.asis", handlers.as_is_handler),
("GET", "*", handlers.file_handler),

View file

@ -1,7 +1,6 @@
import BaseHTTPServer
import errno
import os
import re
import socket
from SocketServer import ThreadingMixIn
import ssl
@ -12,12 +11,12 @@ import traceback
import types
import urlparse
import routes as default_routes
from logger import get_logger
from request import Server, Request
from response import Response
from router import Router
from utils import HTTPException
from . import routes as default_routes
from .logger import get_logger
from .request import Server, Request
from .response import Response
from .router import Router
from .utils import HTTPException
"""HTTP server designed for testing purposes.
@ -183,12 +182,11 @@ class WebTestServer(ThreadingMixIn, BaseHTTPServer.HTTPServer):
server_side=True)
def handle_error(self, request, client_address):
error = sys.exc_value
error = sys.exc_info()[1]
if ((isinstance(error, socket.error) and
isinstance(error.args, tuple) and
error.args[0] in self.acceptable_errors)
or
error.args[0] in self.acceptable_errors) or
(isinstance(error, IOError) and
error.errno in self.acceptable_errors)):
pass # remote hang up before the result is sent
@ -282,7 +280,7 @@ class WebTestRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# Ensure that the whole request has been read from the socket
request.raw_input.read()
except socket.timeout, e:
except socket.timeout as e:
self.log_error("Request timed out: %r", e)
self.close_connection = True
return
@ -419,7 +417,7 @@ class WebTestHttpd(object):
_host, self.port = self.httpd.socket.getsockname()
except Exception:
self.logger.error('Init failed! You may need to modify your hosts file. Refer to README.md.');
self.logger.error('Init failed! You may need to modify your hosts file. Refer to README.md.')
raise
def start(self, block=False):

View file

@ -2,7 +2,6 @@ import base64
import json
import os
import uuid
from multiprocessing import Process
from multiprocessing.managers import BaseManager, DictProxy
class ServerDictManager(BaseManager):
@ -131,7 +130,7 @@ class Stash(object):
the current request path)"""
internal_key = self._wrap_key(key, path)
value = self.data.get(internal_key, None)
if not value is None:
if value is not None:
try:
self.data.pop(internal_key)
except KeyError: