Mirror of https://github.com/servo/servo.git (synced 2025-06-06 16:45:39 +00:00)
net: Refactor Decoder (#33611)
* Refactor Decoder to be fully async
* Update WPT results
* Fix deflate unit test
* Add compressed response update count test
* Fix typo
* Source error check without conversion to String
* Simplify error check
* Fix variable name
* Added TODO note for network.tls.ignore_unexpected_eof

Signed-off-by: crbrz <cristianb@gmail.com>
Parent: c682172440
Commit: c7a4e4f627
11 changed files with 278 additions and 457 deletions
Cargo.lock (generated, 58 changed lines)
@@ -48,12 +48,6 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"

[[package]]
name = "adler32"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"

[[package]]
name = "ahash"
version = "0.8.11"

@@ -233,6 +227,20 @@ dependencies = [
"libloading",
]

[[package]]
name = "async-compression"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fec134f64e2bc57411226dfc4e52dec859ddfc7e711fc5e07b612584f000e4aa"
dependencies = [
"brotli",
"flate2",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
]

[[package]]
name = "async-recursion"
version = "1.1.1"

@@ -533,9 +541,9 @@ dependencies = [

[[package]]
name = "brotli"
version = "3.5.0"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391"
checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",

@@ -544,9 +552,9 @@ dependencies = [

[[package]]
name = "brotli-decompressor"
version = "2.5.1"
version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f"
checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",

@@ -4024,18 +4032,6 @@ dependencies = [
"pkg-config",
]

[[package]]
name = "libflate"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9135df43b1f5d0e333385cb6e7897ecd1a43d7d11b91ac003f4d2c2d2401fdd"
dependencies = [
"adler32",
"crc32fast",
"rle-decode-fast",
"take_mut",
]

[[package]]
name = "libgit2-sys"
version = "0.17.0+1.8.1"

@@ -4635,11 +4631,11 @@ dependencies = [
name = "net"
version = "0.0.1"
dependencies = [
"async-compression",
"async-recursion",
"async-tungstenite",
"base",
"base64",
"brotli",
"bytes",
"chrono",
"content-security-policy",

@@ -4650,6 +4646,8 @@ dependencies = [
"embedder_traits",
"flate2",
"futures 0.3.30",
"futures-core",
"futures-util",
"generic-array",
"headers",
"http",

@@ -4658,7 +4656,6 @@ dependencies = [
"hyper_serde",
"imsz",
"ipc-channel",
"libflate",
"log",
"malloc_size_of",
"malloc_size_of_derive",

@@ -4683,6 +4680,7 @@ dependencies = [
"tokio-rustls",
"tokio-stream",
"tokio-test",
"tokio-util",
"tungstenite",
"url",
"uuid",

@@ -5728,12 +5726,6 @@ dependencies = [
"windows-sys 0.52.0",
]

[[package]]
name = "rle-decode-fast"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"

[[package]]
name = "ron"
version = "0.8.1"

@@ -6977,12 +6969,6 @@ dependencies = [
"version-compare",
]

[[package]]
name = "take_mut"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"

[[package]]
name = "tar"
version = "0.4.42"

@@ -577,7 +577,15 @@ mod gen {
},
mime: {
sniff: bool,
}
},
tls: {
/// Ignore `std::io::Error` with `ErrorKind::UnexpectedEof` received when a TLS connection
/// is closed without a close_notify.
///
/// Used for tests because WPT server doesn't properly close the TLS connection.
// TODO: remove this when WPT server is updated to use a proper TLS implementation.
ignore_unexpected_eof: bool,
},
},
session_history: {
#[serde(rename = "session-history.max-length")]

@@ -15,11 +15,11 @@ test = false
doctest = false

[dependencies]
async-compression = { version = "0.4.12", default-features = false, features = ["tokio", "brotli", "gzip", "zlib"] }
async-recursion = "1.1"
async-tungstenite = { workspace = true }
base = { workspace = true }
base64 = { workspace = true }
brotli = "3"
bytes = "1"
content-security-policy = { workspace = true }
cookie = { workspace = true }

@@ -27,8 +27,9 @@ crossbeam-channel = { workspace = true }
data-url = { workspace = true }
devtools_traits = { workspace = true }
embedder_traits = { workspace = true }
flate2 = "1"
futures = { version = "0.3", package = "futures" }
futures-core = { version = "0.3.30", default-features = false }
futures-util = { version = "0.3.30", default-features = false }
generic-array = "0.14"
headers = { workspace = true }
http = { workspace = true }

@@ -37,7 +38,6 @@ hyper-rustls = { workspace = true }
hyper_serde = { workspace = true }
imsz = { workspace = true }
ipc-channel = { workspace = true }
libflate = "0.1"
log = { workspace = true }
malloc_size_of = { workspace = true }
malloc_size_of_derive = { workspace = true }

@@ -60,6 +60,7 @@ sha2 = "0.10"
chrono = { workspace = true }
time_03 = { workspace = true }
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
tokio-util = { version = "0.7.12", default-features = false, features = ["codec", "io"] }
tokio-rustls = { workspace = true }
tokio-stream = "0.1"
tungstenite = { workspace = true }

@@ -70,6 +71,7 @@ webrender_traits = { workspace = true }
webpki-roots = { workspace = true }

[dev-dependencies]
flate2 = "1"
futures = { version = "0.3", features = ["compat"] }
tokio-test = "0.4"
tokio-stream = { version = "0.1", features = ["net"] }

@@ -5,67 +5,38 @@
//! Adapted from an implementation in reqwest.

/*!
A potentially non-blocking response decoder.
A non-blocking response decoder.

The decoder wraps a stream of chunks and produces a new stream of decompressed chunks.
The decompressed chunks aren't guaranteed to align to the compressed ones.
The decoder wraps a stream of bytes and produces a new stream of decompressed bytes.
The decompressed bytes aren't guaranteed to align to the compressed ones.

If the response is plaintext then no additional work is carried out.
Chunks are just passed along.
Bytes are just passed along.

If the response is gzip, then the chunks are decompressed into a buffer.
Slices of that buffer are emitted as new chunks.

This module consists of a few main types:

- `ReadableChunks` is a `Read`-like wrapper around a stream
- `Decoder` is a layer over `ReadableChunks` that applies the right decompression

The following types directly support the gzip compression case:

- `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF
- `Peeked` is a buffer that keeps a few bytes available so `libflate`s `read_exact` calls won't fail
If the response is gzip, deflate or brotli then the bytes are decompressed.
*/

use std::io::{self, Read};
use std::error::Error;
use std::fmt;
use std::io::{self};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Waker;
use std::{cmp, fmt, mem};

use brotli::Decompressor;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use flate2::read::DeflateDecoder;
use async_compression::tokio::bufread::{BrotliDecoder, GzipDecoder, ZlibDecoder};
use bytes::Bytes;
use futures::stream::Peekable;
use futures::task::{Context, Poll};
use futures::{Future, Stream};
use futures_util::StreamExt;
use headers::{ContentLength, HeaderMapExt};
use hyper::header::{HeaderValue, CONTENT_ENCODING, TRANSFER_ENCODING};
use hyper::{self, Body, Response};
use libflate::non_blocking::gzip;
use hyper::{Body, Response};
use servo_config::pref;
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_util::io::StreamReader;

use crate::connector::BUF_SIZE;
pub const DECODER_BUFFER_SIZE: usize = 8192;

#[derive(Debug)]
#[allow(dead_code)]
pub enum Error {
Io(io::Error),
Hyper(hyper::Error),
}

impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::Io(err)
}
}

impl From<hyper::Error> for Error {
fn from(err: hyper::Error) -> Error {
Error::Hyper(err)
}
}

const INIT_BUFFER_SIZE: usize = 8192;

/// A response decompressor over a non-blocking stream of chunks.
/// A response decompressor over a non-blocking stream of bytes.
///
/// The inner decoder may be constructed asynchronously.
pub struct Decoder {
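The rewritten module docs above describe the new approach: instead of the blocking `Read` pipeline built on `libflate`, the response body is treated as an asynchronous byte stream and decompressed through `async-compression` behind `tokio-util` adapters. Below is a minimal, standalone sketch of that pattern (not Servo code): a `StreamReader` turns a `Stream` of bytes into an `AsyncBufRead`, a `GzipDecoder` decompresses it, and `FramedRead` with `BytesCodec` re-frames the output as a new stream of chunks. The program, its payload, and the 8192-byte capacity are illustrative; it assumes `tokio` (macros, rt), `async-compression` (tokio + gzip features), `tokio-util` (codec + io features), `futures`, `bytes`, and `flate2` are available.

```rust
// Minimal, standalone sketch (not Servo code) of the decoding pattern used here:
// Stream<Bytes> -> StreamReader (AsyncBufRead) -> GzipDecoder -> FramedRead<BytesCodec>,
// which yields the decompressed data as a new stream of byte chunks.
use async_compression::tokio::bufread::GzipDecoder;
use bytes::Bytes;
use futures::stream::{self, StreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-in for a gzip-compressed response body arriving from the network.
    let compressed: Vec<u8> = {
        use flate2::{write::GzEncoder, Compression};
        use std::io::Write;
        let mut e = GzEncoder::new(Vec::new(), Compression::default());
        e.write_all(b"hello from an async decoder")?;
        e.finish()?
    };
    let body = stream::iter(vec![Ok::<Bytes, std::io::Error>(Bytes::from(compressed))]);

    // Wrap the stream as an AsyncBufRead, decompress it, and re-frame the output.
    let reader = StreamReader::new(body);
    let decoder = GzipDecoder::new(reader);
    let mut frames = FramedRead::with_capacity(decoder, BytesCodec::new(), 8192);

    let mut out = Vec::new();
    while let Some(chunk) = frames.next().await {
        out.extend_from_slice(&chunk?); // each chunk is a BytesMut frame
    }
    assert_eq!(out, b"hello from an async decoder".to_vec());
    Ok(())
}
```
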
@@ -81,31 +52,23 @@ enum DecoderType {

enum Inner {
/// A `PlainText` decoder just returns the response content as is.
PlainText(Body),
PlainText(BodyStream),
/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
Gzip(Gzip),
Gzip(FramedRead<GzipDecoder<StreamReader<Peekable<BodyStream>, Bytes>>, BytesCodec>),
/// A `Delfate` decoder will uncompress the inflated response content before returning it.
Deflate(Deflate),
Deflate(FramedRead<ZlibDecoder<StreamReader<Peekable<BodyStream>, Bytes>>, BytesCodec>),
/// A `Brotli` decoder will uncompress the brotli-encoded response content before returning it.
Brotli(Brotli),
Brotli(FramedRead<BrotliDecoder<StreamReader<Peekable<BodyStream>, Bytes>>, BytesCodec>),
/// A decoder that doesn't have a value yet.
Pending(Pending),
}

/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
struct Pending {
body: ReadableChunks<Body>,
body: Peekable<BodyStream>,
type_: DecoderType,
}

/// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results
/// as a `Bytes`.
struct Gzip {
inner: Box<gzip::Decoder<Peeked<ReadableChunks<Body>>>>,
buf: BytesMut,
reader: Arc<Mutex<ReadableChunks<Body>>>,
}

impl fmt::Debug for Decoder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Decoder").finish()
@@ -115,34 +78,43 @@ impl fmt::Debug for Decoder {
impl Decoder {
/// A plain text decoder.
///
/// This decoder will emit the underlying chunks as-is.
/// This decoder will emit the underlying bytes as-is.
#[inline]
fn plain_text(body: Body) -> Decoder {
fn plain_text(
body: Body,
is_secure_scheme: bool,
content_length: Option<ContentLength>,
) -> Decoder {
Decoder {
inner: Inner::PlainText(body),
inner: Inner::PlainText(BodyStream::new(body, is_secure_scheme, content_length)),
}
}

/// A pending decoder.
///
/// This decoder will buffer and decompress chunks that are encoded in the expected format.
/// This decoder will buffer and decompress bytes that are encoded in the expected format.
#[inline]
fn pending(body: Body, type_: DecoderType) -> Decoder {
fn pending(
body: Body,
type_: DecoderType,
is_secure_scheme: bool,
content_length: Option<ContentLength>,
) -> Decoder {
Decoder {
inner: Inner::Pending(Pending {
body: ReadableChunks::new(body),
body: BodyStream::new(body, is_secure_scheme, content_length).peekable(),
type_,
}),
}
}

/// Constructs a Decoder from a hyper request.
/// Constructs a Decoder from a hyper response.
///
/// A decoder is just a wrapper around the hyper request that knows
/// how to decode the content body of the request.
/// A decoder is just a wrapper around the hyper response that knows
/// how to decode the content body of the response.
///
/// Uses the correct variant by inspecting the Content-Encoding header.
pub fn detect(response: Response<Body>) -> Response<Decoder> {
pub fn detect(response: Response<Body>, is_secure_scheme: bool) -> Response<Decoder> {
let values = response
.headers()
.get_all(CONTENT_ENCODING)
@@ -161,365 +133,157 @@ impl Decoder {
}
})
});
let content_length = response.headers().typed_get::<ContentLength>();
match decoder {
Some(type_) => response.map(|r| Decoder::pending(r, type_)),
None => response.map(Decoder::plain_text),
Some(type_) => {
response.map(|r| Decoder::pending(r, type_, is_secure_scheme, content_length))
},
None => response.map(|r| Decoder::plain_text(r, is_secure_scheme, content_length)),
}
}
}

impl Stream for Decoder {
type Item = Result<Bytes, Error>;
type Item = Result<Bytes, io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Do a read or poll for a pending decoder value.
let new_value = match self.inner {
Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
Poll::Ready(inner) => inner,
Poll::Pending => return Poll::Pending,
match self.inner {
Inner::Pending(ref mut future) => match futures_core::ready!(Pin::new(future).poll(cx))
{
Ok(inner) => {
self.inner = inner;
self.poll_next(cx)
},
Err(e) => Poll::Ready(Some(Err(e))),
},
Inner::PlainText(ref mut body) => {
return Pin::new(body).poll_next(cx).map_err(|e| e.into())
Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx),
Inner::Gzip(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
}
},
Inner::Gzip(ref mut decoder) => return Pin::new(decoder).poll_next(cx),
Inner::Brotli(ref mut decoder) => return Pin::new(decoder).poll_next(cx),
Inner::Deflate(ref mut decoder) => return Pin::new(decoder).poll_next(cx),
};

//
self.inner = new_value;
self.poll_next(cx)
Inner::Brotli(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
}
},
Inner::Deflate(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
}
},
}
}
}

impl Future for Pending {
type Output = Inner;
type Output = Result<Inner, io::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let body_state = match self.body.poll_stream(cx) {
Poll::Ready(state) => state,
Poll::Pending => return Poll::Pending,
match futures_core::ready!(Pin::new(&mut self.body).poll_peek(cx)) {
Some(Ok(_)) => {
// fallthrough
},
Some(Err(_e)) => {
// error was just a ref, so we need to really poll to move it
return Poll::Ready(Err(futures_core::ready!(
Pin::new(&mut self.body).poll_next(cx)
)
.expect("just peeked Some")
.unwrap_err()));
},
None => return Poll::Ready(Ok(Inner::PlainText(BodyStream::empty()))),
};

let body = mem::replace(&mut self.body, ReadableChunks::new(Body::empty()));
// libflate does a read_exact([0; 2]), so its impossible to tell
// if the stream was empty, or truly had an UnexpectedEof.
// Therefore, we need to check for EOF first.
match body_state {
StreamState::Eof => Poll::Ready(Inner::PlainText(Body::empty())),
StreamState::HasMore => Poll::Ready(match self.type_ {
DecoderType::Gzip => Inner::Gzip(Gzip::new(body)),
DecoderType::Brotli => Inner::Brotli(Brotli::new(body)),
DecoderType::Deflate => Inner::Deflate(Deflate::new(body)),
}),
let body = std::mem::replace(&mut self.body, BodyStream::empty().peekable());

match self.type_ {
DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(FramedRead::with_capacity(
BrotliDecoder::new(StreamReader::new(body)),
BytesCodec::new(),
DECODER_BUFFER_SIZE,
)))),
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(FramedRead::with_capacity(
GzipDecoder::new(StreamReader::new(body)),
BytesCodec::new(),
DECODER_BUFFER_SIZE,
)))),
DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(FramedRead::with_capacity(
ZlibDecoder::new(StreamReader::new(body)),
BytesCodec::new(),
DECODER_BUFFER_SIZE,
)))),
}
}
}

impl Gzip {
fn new(stream: ReadableChunks<Body>) -> Self {
let stream = Arc::new(Mutex::new(stream));
let reader = stream.clone();
Gzip {
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
inner: Box::new(gzip::Decoder::new(Peeked::new(stream))),
reader,
struct BodyStream {
body: Body,
is_secure_scheme: bool,
content_length: Option<ContentLength>,
total_read: u64,
}

impl BodyStream {
fn empty() -> Self {
BodyStream {
body: Body::empty(),
is_secure_scheme: false,
content_length: None,
total_read: 0,
}
}

fn new(body: Body, is_secure_scheme: bool, content_length: Option<ContentLength>) -> Self {
BodyStream {
body,
is_secure_scheme,
content_length,
total_read: 0,
}
}
}

#[allow(unsafe_code)]
fn poll_with_read(reader: &mut dyn Read, buf: &mut BytesMut) -> Poll<Option<Result<Bytes, Error>>> {
// Ensure a full size buffer is available.
// `reserve` is optimized to reclaim space over allocating.
buf.reserve(INIT_BUFFER_SIZE);
impl Stream for BodyStream {
type Item = Result<Bytes, io::Error>;

// The buffer contains uninitialised memory so getting a readable slice is unsafe.
// We trust the reader not to read from the memory given.
//
// To be safe, this memory could be zeroed before passing to the reader.
// Otherwise we might need to deal with the case where the reader panics.

let read = {
let buf = unsafe {
let ptr = buf.chunk_mut().as_mut_ptr();
std::slice::from_raw_parts_mut(ptr, buf.capacity())
};
reader.read(&mut *buf)
};

match read {
Ok(0) => Poll::Ready(None),
Ok(read) => {
unsafe { buf.advance_mut(read) };
let chunk = buf.split_to(read).freeze();
Poll::Ready(Some(Ok(chunk)))
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
Err(e) => Poll::Ready(Some(Err(e.into()))),
}
}

impl Stream for Gzip {
type Item = Result<Bytes, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buf = self.buf.clone();
if let Ok(mut reader) = self.reader.lock() {
reader.waker = Some(cx.waker().clone());
}
poll_with_read(&mut self.inner, &mut buf)
}
}

/// A brotli decoder that reads from a `brotli::Decompressor` into a `BytesMut` and emits the results
/// as a `Bytes`.
struct Brotli {
inner: Box<Decompressor<Peeked<ReadableChunks<Body>>>>,
buf: BytesMut,
reader: Arc<Mutex<ReadableChunks<Body>>>,
}

impl Brotli {
fn new(stream: ReadableChunks<Body>) -> Self {
let stream = Arc::new(Mutex::new(stream));
let reader = stream.clone();
Self {
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
inner: Box::new(Decompressor::new(Peeked::new(stream), BUF_SIZE)),
reader,
}
}
}

impl Stream for Brotli {
type Item = Result<Bytes, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buf = self.buf.clone();
if let Ok(mut reader) = self.reader.lock() {
reader.waker = Some(cx.waker().clone());
}
poll_with_read(&mut self.inner, &mut buf)
}
}

/// A deflate decoder that reads from a `deflate::Decoder` into a `BytesMut` and emits the results
/// as a `Bytes`.
struct Deflate {
inner: Box<DeflateDecoder<Peeked<ReadableChunks<Body>>>>,
buf: BytesMut,
reader: Arc<Mutex<ReadableChunks<Body>>>,
}

impl Deflate {
fn new(stream: ReadableChunks<Body>) -> Self {
let stream = Arc::new(Mutex::new(stream));
let reader = stream.clone();
Self {
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
inner: Box::new(DeflateDecoder::new(Peeked::new(stream))),
reader,
}
}
}

impl Stream for Deflate {
type Item = Result<Bytes, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buf = self.buf.clone();
if let Ok(mut reader) = self.reader.lock() {
reader.waker = Some(cx.waker().clone());
}
poll_with_read(&mut self.inner, &mut buf)
}
}

/// A `Read`able wrapper over a stream of chunks.
pub struct ReadableChunks<S> {
state: ReadState,
stream: S,
waker: Option<Waker>,
}

enum ReadState {
/// A chunk is ready to be read from.
Ready(Bytes),
/// The next chunk isn't ready yet.
NotReady,
/// The stream has finished.
Eof,
/// Stream is in err
Error(hyper::Error),
}

#[derive(Debug)]
enum StreamState {
/// More bytes can be read from the stream.
HasMore,
/// No more bytes can be read from the stream.
Eof,
}

/// A buffering reader that ensures `Read`s return at least a few bytes.
struct Peeked<R> {
state: PeekedState,
peeked_buf: [u8; 10],
pos: usize,
inner: Arc<Mutex<R>>,
}

enum PeekedState {
/// The internal buffer hasn't filled yet.
NotReady,
/// The internal buffer can be read.
Ready(usize),
}

impl<R> Peeked<R> {
#[inline]
fn new(inner: Arc<Mutex<R>>) -> Self {
Peeked {
state: PeekedState::NotReady,
peeked_buf: [0; 10],
inner,
pos: 0,
}
}

#[inline]
fn ready(&mut self) {
self.state = PeekedState::Ready(self.pos);
self.pos = 0;
}

#[inline]
fn not_ready(&mut self) {
self.state = PeekedState::NotReady;
self.pos = 0;
}
}

impl<R: Read> Read for Peeked<R> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
match self.state {
PeekedState::Ready(peeked_buf_len) => {
let len = cmp::min(buf.len(), peeked_buf_len - self.pos);
let start = self.pos;
let end = self.pos + len;

buf[..len].copy_from_slice(&self.peeked_buf[start..end]);
self.pos += len;
if self.pos == peeked_buf_len {
self.not_ready();
}
return Ok(len);
},
PeekedState::NotReady => {
let buf = &mut self.peeked_buf[self.pos..];
let stream = self.inner.clone();
let mut reader = stream.lock().unwrap();
let read = reader.read(buf);

match read {
Ok(0) => self.ready(),
Ok(read) => {
self.pos += read;
if self.pos == self.peeked_buf.len() {
self.ready();
}
},
Err(e) => return Err(e),
}
},
};
}
}
}

impl<S> ReadableChunks<S> {
#[inline]
fn new(stream: S) -> Self {
ReadableChunks {
state: ReadState::NotReady,
stream,
waker: None,
}
}
}

impl<S> fmt::Debug for ReadableChunks<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ReadableChunks").finish()
}
}

impl<S> Read for ReadableChunks<S>
where
S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let waker = self.waker.as_ref().unwrap().clone();
let mut cx = Context::from_waker(&waker);

loop {
let ret;
match self.state {
ReadState::Ready(ref mut chunk) => {
let len = cmp::min(buf.len(), chunk.remaining());

buf[..len].copy_from_slice(&chunk[..len]);
chunk.advance(len);
if chunk.is_empty() {
ret = len;
} else {
return Ok(len);
}
},
ReadState::NotReady => match self.poll_stream(&mut cx) {
Poll::Ready(StreamState::HasMore) => continue,
Poll::Ready(StreamState::Eof) => return Ok(0),
Poll::Pending => return Err(io::ErrorKind::WouldBlock.into()),
},
ReadState::Eof => return Ok(0),
ReadState::Error(ref err) => {
return Err(io::Error::new(io::ErrorKind::Other, err.to_string()))
},
}
self.state = ReadState::NotReady;
return Ok(ret);
}
}
}

impl<S> ReadableChunks<S>
where
S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin,
{
/// Poll the readiness of the inner reader.
///
/// This function will update the internal state and return a simplified
/// version of the `ReadState`.
fn poll_stream(&mut self, cx: &mut Context<'_>) -> Poll<StreamState> {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
self.state = ReadState::Ready(chunk);

Poll::Ready(StreamState::HasMore)
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match futures_core::ready!(Pin::new(&mut self.body).poll_next(cx)) {
Some(Ok(bytes)) => {
self.total_read += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes)))
},
Poll::Ready(Some(Err(err))) => {
self.state = ReadState::Error(err);

Poll::Ready(StreamState::Eof)
Some(Err(err)) => {
// To prevent truncation attacks rustls treats close connection without a close_notify as
// an error of type std::io::Error with ErrorKind::UnexpectedEof.
// https://docs.rs/rustls/latest/rustls/manual/_03_howto/index.html#unexpected-eof
//
// The error can be safely ignored if we known that all content was received or is explicitly
// set in preferences.
let all_content_read = self
.content_length
.map_or(false, |c| c.0 == self.total_read);
if self.is_secure_scheme &&
(all_content_read || pref!(network.tls.ignore_unexpected_eof))
{
let source = err.source();
let is_unexpected_eof = source
.and_then(|e| e.downcast_ref::<io::Error>())
.map_or(false, |e| e.kind() == io::ErrorKind::UnexpectedEof);
if is_unexpected_eof {
return Poll::Ready(None);
}
}
Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, err))))
},
Poll::Ready(None) => {
self.state = ReadState::Eof;
Poll::Ready(StreamState::Eof)
},
Poll::Pending => Poll::Pending,
None => Poll::Ready(None),
}
}
}

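The `BodyStream` error path above ("Source error check without conversion to String" in the commit message) walks the transport error's `source()` chain and downcasts it to `std::io::Error` instead of matching on a formatted message, so a rustls `UnexpectedEof` after a missing close_notify can be ignored when it is known to be safe. A small standalone sketch of that check; `TransportError` here is a hypothetical stand-in for `hyper::Error`:

```rust
use std::error::Error;
use std::fmt;
use std::io;

// Hypothetical stand-in for a transport error (e.g. hyper::Error) that exposes an
// io::Error through its `source()` chain.
#[derive(Debug)]
struct TransportError(io::Error);

impl fmt::Display for TransportError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "transport error: {}", self.0)
    }
}

impl Error for TransportError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.0)
    }
}

// The check itself: inspect the source, downcast to io::Error, compare the kind.
fn is_unexpected_eof(err: &dyn Error) -> bool {
    err.source()
        .and_then(|e| e.downcast_ref::<io::Error>())
        .map_or(false, |e| e.kind() == io::ErrorKind::UnexpectedEof)
}

fn main() {
    let eof = TransportError(io::Error::new(
        io::ErrorKind::UnexpectedEof,
        "TLS stream closed without close_notify",
    ));
    assert!(is_unexpected_eof(&eof));
    assert!(!is_unexpected_eof(&TransportError(io::ErrorKind::BrokenPipe.into())));
}
```
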
@@ -648,6 +648,7 @@ async fn obtain_response(
let host = request.uri().host().unwrap_or("").to_owned();
let override_manager = context.state.override_manager.clone();
let headers = headers.clone();
let is_secure_scheme = url.is_secure_scheme();

client
.request(request)

@@ -681,7 +682,7 @@ async fn obtain_response(
debug!("Not notifying devtools (no request_id)");
None
};
future::ready(Ok((Decoder::detect(res), msg)))
future::ready(Ok((Decoder::detect(res, is_secure_scheme), msg)))
})
.map_err(move |error| {
NetworkError::from_hyper_error(

@@ -32,6 +32,7 @@ pub mod fetch {

/// A module for re-exports of items used in unit tests.
pub mod test {
pub use crate::decoder::DECODER_BUFFER_SIZE;
pub use crate::hosts::{parse_hostsfile, replace_host_table};
pub use crate::http_loader::HttpState;
}

@@ -13,12 +13,12 @@ use std::time::Duration;

use base::id::TEST_PIPELINE_ID;
use cookie::Cookie as CookiePair;
use crossbeam_channel::{unbounded, Receiver};
use crossbeam_channel::{unbounded, Receiver, Sender};
use devtools_traits::{
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
HttpResponse as DevtoolsHttpResponse, NetworkEvent,
};
use flate2::write::{DeflateEncoder, GzEncoder};
use flate2::write::{GzEncoder, ZlibEncoder};
use flate2::Compression;
use headers::authorization::Basic;
use headers::{

@@ -32,17 +32,19 @@ use ipc_channel::ipc;
use ipc_channel::router::ROUTER;
use net::cookie::ServoCookie;
use net::cookie_storage::CookieStorage;
use net::fetch::methods::{self};
use net::http_loader::determine_requests_referrer;
use net::resource_thread::AuthCacheEntry;
use net::test::replace_host_table;
use net::test::{replace_host_table, DECODER_BUFFER_SIZE};
use net_traits::http_status::HttpStatus;
use net_traits::request::{
BodyChunkRequest, BodyChunkResponse, BodySource, CredentialsMode, Destination, Referrer,
RequestBody, RequestBuilder,
Request, RequestBody, RequestBuilder,
};
use net_traits::response::ResponseBody;
use net_traits::{CookieSource, NetworkError, ReferrerPolicy};
use net_traits::response::{Response, ResponseBody};
use net_traits::{CookieSource, FetchTaskTarget, NetworkError, ReferrerPolicy};
use servo_url::{ImmutableOrigin, ServoUrl};
use tokio_test::block_on;

use crate::{fetch, fetch_with_context, make_server, new_fetch_context};

@@ -437,7 +439,7 @@ fn test_load_should_decode_the_response_as_deflate_when_response_headers_have_co
header::CONTENT_ENCODING,
HeaderValue::from_static("deflate"),
);
let mut e = DeflateEncoder::new(Vec::new(), Compression::default());
let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
e.write(b"Yay!").unwrap();
let encoded_content = e.finish().unwrap();
*response.body_mut() = encoded_content.into();

@@ -1356,3 +1358,66 @@ fn test_determine_requests_referrer_longer_than_4k() {

assert_eq!(referer.unwrap().as_str(), "http://example.com/");
}

#[test]
fn test_fetch_compressed_response_update_count() {
// contents of ../../tests/wpt/tests/fetch/content-encoding/br/resources/foo.text.br
const DATA_BROTLI_COMPRESSED: [u8; 15] = [
0xe1, 0x18, 0x48, 0xc1, 0x2f, 0x65, 0xf6, 0x16, 0x9f, 0x05, 0x01, 0xbb, 0x20, 0x00, 0x06,
];
const DATA_DECOMPRESSED_LEN: usize = 10500;

let handler = move |_: HyperRequest<Body>, response: &mut HyperResponse<Body>| {
response
.headers_mut()
.insert(header::CONTENT_ENCODING, HeaderValue::from_static("br"));
*response.body_mut() = DATA_BROTLI_COMPRESSED.to_vec().into();
};
let (server, url) = make_server(handler);

let mut request = RequestBuilder::new(url.clone(), Referrer::NoReferrer)
.method(Method::GET)
.body(None)
.destination(Destination::Document)
.origin(mock_origin())
.pipeline_id(Some(TEST_PIPELINE_ID))
.build();

struct FetchResponseCollector {
sender: Sender<usize>,
update_count: usize,
}
impl FetchTaskTarget for FetchResponseCollector {
fn process_request_body(&mut self, _: &Request) {}
fn process_request_eof(&mut self, _: &Request) {}
fn process_response(&mut self, _: &Response) {}
fn process_response_chunk(&mut self, _: Vec<u8>) {
self.update_count += 1;
}
/// Fired when the response is fully fetched
fn process_response_eof(&mut self, _: &Response) {
let _ = self.sender.send(self.update_count);
}
}

let (sender, receiver) = unbounded();
let mut target = FetchResponseCollector {
sender: sender,
update_count: 0,
};
let response_update_count = block_on(async move {
methods::fetch(
&mut request,
&mut target,
&mut new_fetch_context(None, None, None),
)
.await;
receiver.recv().unwrap()
});

server.close();

const EXPECTED_UPDATE_COUNT: usize =
(DATA_DECOMPRESSED_LEN + DECODER_BUFFER_SIZE - 1) / DECODER_BUFFER_SIZE;
assert_eq!(response_update_count, EXPECTED_UPDATE_COUNT);
}

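The expected update count in the new test is a ceiling division of the decompressed payload length by the decoder's frame capacity, which for the constants above works out to two chunk updates. A tiny standalone check of that arithmetic (constants copied from the test, everything else illustrative):

```rust
fn main() {
    const DATA_DECOMPRESSED_LEN: usize = 10500; // decompressed size of foo.text.br
    const DECODER_BUFFER_SIZE: usize = 8192; // frame capacity used by the decoder
    // Ceiling division: every full or partial 8192-byte frame becomes one chunk update.
    let expected = (DATA_DECOMPRESSED_LEN + DECODER_BUFFER_SIZE - 1) / DECODER_BUFFER_SIZE;
    assert_eq!(expected, 2); // 10500 / 8192 rounds up to 2
}
```
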
@@ -110,6 +110,7 @@
"network.http-cache.disabled": false,
"network.local_directory_listing.enabled": false,
"network.mime.sniff": false,
"network.tls.ignore_unexpected_eof": false,
"session-history.max-length": 20,
"shell.background-color.rgba": [1.0, 1.0, 1.0, 1.0],
"shell.crash_reporter.enabled": false,

@@ -1,3 +1,4 @@
{
"dom.webxr.test": true
"dom.webxr.test": true,
"network.tls.ignore_unexpected_eof": true
}

@@ -1,5 +0,0 @@
[response-data-deflate.htm]
  type: testharness
  [XMLHttpRequest: content-encoding:deflate response was correctly inflated]
    expected: FAIL

@@ -1,3 +0,0 @@
[response-data-deflate.htm]
  [XMLHttpRequest: content-encoding:deflate response was correctly inflated]
    expected: FAIL