diff --git a/Cargo.lock b/Cargo.lock
index b01de478a20..34a9c235a74 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -48,12 +48,6 @@ version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
 
-[[package]]
-name = "adler32"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
-
 [[package]]
 name = "ahash"
 version = "0.8.11"
@@ -233,6 +227,20 @@ dependencies = [
  "libloading",
 ]
 
+[[package]]
+name = "async-compression"
+version = "0.4.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fec134f64e2bc57411226dfc4e52dec859ddfc7e711fc5e07b612584f000e4aa"
+dependencies = [
+ "brotli",
+ "flate2",
+ "futures-core",
+ "memchr",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "async-recursion"
 version = "1.1.1"
@@ -533,9 +541,9 @@ dependencies = [
 
 [[package]]
 name = "brotli"
-version = "3.5.0"
+version = "6.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391"
+checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
 dependencies = [
  "alloc-no-stdlib",
  "alloc-stdlib",
@@ -544,9 +552,9 @@ dependencies = [
 
 [[package]]
 name = "brotli-decompressor"
-version = "2.5.1"
+version = "4.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f"
+checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362"
 dependencies = [
  "alloc-no-stdlib",
  "alloc-stdlib",
@@ -4024,18 +4032,6 @@ dependencies = [
  "pkg-config",
 ]
 
-[[package]]
-name = "libflate"
-version = "0.1.27"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d9135df43b1f5d0e333385cb6e7897ecd1a43d7d11b91ac003f4d2c2d2401fdd"
-dependencies = [
- "adler32",
- "crc32fast",
- "rle-decode-fast",
- "take_mut",
-]
-
 [[package]]
 name = "libgit2-sys"
 version = "0.17.0+1.8.1"
@@ -4635,11 +4631,11 @@ dependencies = [
 name = "net"
 version = "0.0.1"
 dependencies = [
+ "async-compression",
  "async-recursion",
  "async-tungstenite",
  "base",
  "base64",
- "brotli",
  "bytes",
  "chrono",
  "content-security-policy",
@@ -4650,6 +4646,8 @@ dependencies = [
  "embedder_traits",
  "flate2",
  "futures 0.3.30",
+ "futures-core",
+ "futures-util",
  "generic-array",
  "headers",
  "http",
@@ -4658,7 +4656,6 @@ dependencies = [
  "hyper_serde",
  "imsz",
  "ipc-channel",
- "libflate",
  "log",
  "malloc_size_of",
  "malloc_size_of_derive",
@@ -4683,6 +4680,7 @@ dependencies = [
  "tokio-rustls",
  "tokio-stream",
  "tokio-test",
+ "tokio-util",
  "tungstenite",
  "url",
  "uuid",
@@ -5728,12 +5726,6 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
-[[package]]
-name = "rle-decode-fast"
-version = "1.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
-
 [[package]]
 name = "ron"
 version = "0.8.1"
@@ -6977,12 +6969,6 @@ dependencies = [
  "version-compare",
 ]
 
-[[package]]
-name = "take_mut"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
-
 [[package]]
 name = "tar"
 version = "0.4.42"
diff --git a/components/config/prefs.rs b/components/config/prefs.rs
index 9e2c161a32c..d6da7d0e5d9 100644
--- a/components/config/prefs.rs
+++ b/components/config/prefs.rs
@@ -577,7 +577,15 @@ mod gen {
         },
         mime: {
             sniff: bool,
-        }
+        },
+        tls: {
+            /// Ignore `std::io::Error` with `ErrorKind::UnexpectedEof` received when a TLS connection
+            /// is closed without a close_notify.
+            ///
+            /// Used for tests because WPT server doesn't properly close the TLS connection.
+            // TODO: remove this when WPT server is updated to use a proper TLS implementation.
+            ignore_unexpected_eof: bool,
+        },
     },
     session_history: {
         #[serde(rename = "session-history.max-length")]
diff --git a/components/net/Cargo.toml b/components/net/Cargo.toml
index ea26c4c108e..d6d083d3dcb 100644
--- a/components/net/Cargo.toml
+++ b/components/net/Cargo.toml
@@ -15,11 +15,11 @@ test = false
 doctest = false
 
 [dependencies]
+async-compression = { version = "0.4.12", default-features = false, features = ["tokio", "brotli", "gzip", "zlib"] }
 async-recursion = "1.1"
 async-tungstenite = { workspace = true }
 base = { workspace = true }
 base64 = { workspace = true }
-brotli = "3"
 bytes = "1"
 content-security-policy = { workspace = true }
 cookie = { workspace = true }
@@ -27,8 +27,9 @@ crossbeam-channel = { workspace = true }
 data-url = { workspace = true }
 devtools_traits = { workspace = true }
 embedder_traits = { workspace = true }
-flate2 = "1"
 futures = { version = "0.3", package = "futures" }
+futures-core = { version = "0.3.30", default-features = false }
+futures-util = { version = "0.3.30", default-features = false }
 generic-array = "0.14"
 headers = { workspace = true }
 http = { workspace = true }
@@ -37,7 +38,6 @@ hyper-rustls = { workspace = true }
 hyper_serde = { workspace = true }
 imsz = { workspace = true }
 ipc-channel = { workspace = true }
-libflate = "0.1"
 log = { workspace = true }
 malloc_size_of = { workspace = true }
 malloc_size_of_derive = { workspace = true }
@@ -60,6 +60,7 @@ sha2 = "0.10"
 chrono = { workspace = true }
 time_03 = { workspace = true }
 tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
+tokio-util = { version = "0.7.12", default-features = false, features = ["codec", "io"] }
 tokio-rustls = { workspace = true }
 tokio-stream = "0.1"
 tungstenite = { workspace = true }
@@ -70,6 +71,7 @@ webrender_traits = { workspace = true }
 webpki-roots = { workspace = true }
 
 [dev-dependencies]
+flate2 = "1"
 futures = { version = "0.3", features = ["compat"] }
 tokio-test = "0.4"
 tokio-stream = { version = "0.1", features = ["net"] }
diff --git a/components/net/decoder.rs b/components/net/decoder.rs
index 7d72ee3d8b4..eaa1cc663e7 100644
--- a/components/net/decoder.rs
+++ b/components/net/decoder.rs
@@ -5,67 +5,38 @@
 //! Adapted from an implementation in reqwest.
 
 /*!
-A potentially non-blocking response decoder.
+A non-blocking response decoder.
 
-The decoder wraps a stream of chunks and produces a new stream of decompressed chunks.
-The decompressed chunks aren't guaranteed to align to the compressed ones.
+The decoder wraps a stream of bytes and produces a new stream of decompressed bytes.
+The decompressed bytes aren't guaranteed to align to the compressed ones.
 
 If the response is plaintext then no additional work is carried out.
-Chunks are just passed along.
+Bytes are just passed along.
 
-If the response is gzip, then the chunks are decompressed into a buffer.
-Slices of that buffer are emitted as new chunks.
-
-This module consists of a few main types:
-
-- `ReadableChunks` is a `Read`-like wrapper around a stream
-- `Decoder` is a layer over `ReadableChunks` that applies the right decompression
-
-The following types directly support the gzip compression case:
-
-- `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF
-- `Peeked` is a buffer that keeps a few bytes available so `libflate`s `read_exact` calls won't fail
+If the response is gzip, deflate or brotli then the bytes are decompressed.
 */
 
-use std::io::{self, Read};
+use std::error::Error;
+use std::fmt;
+use std::io::{self};
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::task::Waker;
-use std::{cmp, fmt, mem};
 
-use brotli::Decompressor;
-use bytes::{Buf, BufMut, Bytes, BytesMut};
-use flate2::read::DeflateDecoder;
+use async_compression::tokio::bufread::{BrotliDecoder, GzipDecoder, ZlibDecoder};
+use bytes::Bytes;
+use futures::stream::Peekable;
 use futures::task::{Context, Poll};
 use futures::{Future, Stream};
+use futures_util::StreamExt;
+use headers::{ContentLength, HeaderMapExt};
 use hyper::header::{HeaderValue, CONTENT_ENCODING, TRANSFER_ENCODING};
-use hyper::{self, Body, Response};
-use libflate::non_blocking::gzip;
+use hyper::{Body, Response};
+use servo_config::pref;
+use tokio_util::codec::{BytesCodec, FramedRead};
+use tokio_util::io::StreamReader;
 
-use crate::connector::BUF_SIZE;
+pub const DECODER_BUFFER_SIZE: usize = 8192;
 
-#[derive(Debug)]
-#[allow(dead_code)]
-pub enum Error {
-    Io(io::Error),
-    Hyper(hyper::Error),
-}
-
-impl From<io::Error> for Error {
-    fn from(err: io::Error) -> Error {
-        Error::Io(err)
-    }
-}
-
-impl From<hyper::Error> for Error {
-    fn from(err: hyper::Error) -> Error {
-        Error::Hyper(err)
-    }
-}
-
-const INIT_BUFFER_SIZE: usize = 8192;
-
-/// A response decompressor over a non-blocking stream of chunks.
+/// A response decompressor over a non-blocking stream of bytes.
 ///
 /// The inner decoder may be constructed asynchronously.
 pub struct Decoder {
@@ -81,31 +52,23 @@ enum DecoderType {
 
 enum Inner {
     /// A `PlainText` decoder just returns the response content as is.
-    PlainText(Body),
+    PlainText(BodyStream),
     /// A `Gzip` decoder will uncompress the gzipped response content before returning it.
-    Gzip(Gzip),
+    Gzip(FramedRead<GzipDecoder<StreamReader<Peekable<BodyStream>, Bytes>>, BytesCodec>),
     /// A `Delfate` decoder will uncompress the inflated response content before returning it.
-    Deflate(Deflate),
+    Deflate(FramedRead<ZlibDecoder<StreamReader<Peekable<BodyStream>, Bytes>>, BytesCodec>),
     /// A `Brotli` decoder will uncompress the brotli-encoded response content before returning it.
-    Brotli(Brotli),
+    Brotli(FramedRead<BrotliDecoder<StreamReader<Peekable<BodyStream>, Bytes>>, BytesCodec>),
     /// A decoder that doesn't have a value yet.
     Pending(Pending),
 }
 
 /// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
 struct Pending {
-    body: ReadableChunks<Body>,
+    body: Peekable<BodyStream>,
     type_: DecoderType,
 }
 
-/// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results
-/// as a `Bytes`.
-struct Gzip {
-    inner: Box<gzip::Decoder<Peeked<ReadableChunks<Body>>>>,
-    buf: BytesMut,
-    reader: Arc<Mutex<ReadableChunks<Body>>>,
-}
-
 impl fmt::Debug for Decoder {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("Decoder").finish()
@@ -115,34 +78,43 @@ impl fmt::Debug for Decoder {
 impl Decoder {
     /// A plain text decoder.
     ///
-    /// This decoder will emit the underlying chunks as-is.
+    /// This decoder will emit the underlying bytes as-is.
     #[inline]
-    fn plain_text(body: Body) -> Decoder {
+    fn plain_text(
+        body: Body,
+        is_secure_scheme: bool,
+        content_length: Option<ContentLength>,
+    ) -> Decoder {
         Decoder {
-            inner: Inner::PlainText(body),
+            inner: Inner::PlainText(BodyStream::new(body, is_secure_scheme, content_length)),
         }
     }
 
     /// A pending decoder.
     ///
-    /// This decoder will buffer and decompress chunks that are encoded in the expected format.
+    /// This decoder will buffer and decompress bytes that are encoded in the expected format.
     #[inline]
-    fn pending(body: Body, type_: DecoderType) -> Decoder {
+    fn pending(
+        body: Body,
+        type_: DecoderType,
+        is_secure_scheme: bool,
+        content_length: Option<ContentLength>,
+    ) -> Decoder {
         Decoder {
             inner: Inner::Pending(Pending {
-                body: ReadableChunks::new(body),
+                body: BodyStream::new(body, is_secure_scheme, content_length).peekable(),
                 type_,
             }),
         }
     }
 
-    /// Constructs a Decoder from a hyper request.
+    /// Constructs a Decoder from a hyper response.
     ///
-    /// A decoder is just a wrapper around the hyper request that knows
-    /// how to decode the content body of the request.
+    /// A decoder is just a wrapper around the hyper response that knows
+    /// how to decode the content body of the response.
     ///
     /// Uses the correct variant by inspecting the Content-Encoding header.
-    pub fn detect(response: Response<Body>) -> Response<Decoder> {
+    pub fn detect(response: Response<Body>, is_secure_scheme: bool) -> Response<Decoder> {
         let values = response
             .headers()
             .get_all(CONTENT_ENCODING)
             .iter()
@@ -161,365 +133,157 @@ impl Decoder {
                 }
             })
         });
+        let content_length = response.headers().typed_get::<ContentLength>();
         match decoder {
-            Some(type_) => response.map(|r| Decoder::pending(r, type_)),
-            None => response.map(Decoder::plain_text),
+            Some(type_) => {
+                response.map(|r| Decoder::pending(r, type_, is_secure_scheme, content_length))
+            },
+            None => response.map(|r| Decoder::plain_text(r, is_secure_scheme, content_length)),
         }
     }
 }
 
 impl Stream for Decoder {
-    type Item = Result<Bytes, Error>;
+    type Item = Result<Bytes, io::Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         // Do a read or poll for a pending decoder value.
-        let new_value = match self.inner {
-            Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
-                Poll::Ready(inner) => inner,
-                Poll::Pending => return Poll::Pending,
+        match self.inner {
+            Inner::Pending(ref mut future) => match futures_core::ready!(Pin::new(future).poll(cx))
+            {
+                Ok(inner) => {
+                    self.inner = inner;
+                    self.poll_next(cx)
+                },
+                Err(e) => Poll::Ready(Some(Err(e))),
             },
-            Inner::PlainText(ref mut body) => {
-                return Pin::new(body).poll_next(cx).map_err(|e| e.into())
+            Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx),
+            Inner::Gzip(ref mut decoder) => {
+                match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
+                    Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
+                    Some(Err(err)) => Poll::Ready(Some(Err(err))),
+                    None => Poll::Ready(None),
+                }
             },
-            Inner::Gzip(ref mut decoder) => return Pin::new(decoder).poll_next(cx),
-            Inner::Brotli(ref mut decoder) => return Pin::new(decoder).poll_next(cx),
-            Inner::Deflate(ref mut decoder) => return Pin::new(decoder).poll_next(cx),
-        };
-
-        //
-        self.inner = new_value;
-        self.poll_next(cx)
+            Inner::Brotli(ref mut decoder) => {
+                match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
+                    Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
+                    Some(Err(err)) => Poll::Ready(Some(Err(err))),
+                    None => Poll::Ready(None),
+                }
+            },
+            Inner::Deflate(ref mut decoder) => {
+                match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
+                    Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
+                    Some(Err(err)) => Poll::Ready(Some(Err(err))),
+                    None => Poll::Ready(None),
+                }
+            },
+        }
     }
 }
 
 impl Future for Pending {
-    type Output = Inner;
+    type Output = Result<Inner, io::Error>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        let body_state = match self.body.poll_stream(cx) {
-            Poll::Ready(state) => state,
-            Poll::Pending => return Poll::Pending,
+        match futures_core::ready!(Pin::new(&mut self.body).poll_peek(cx)) {
+            Some(Ok(_)) => {
+                // fallthrough
+            },
+            Some(Err(_e)) => {
+                // error was just a ref, so we need to really poll to move it
+                return Poll::Ready(Err(futures_core::ready!(
+                    Pin::new(&mut self.body).poll_next(cx)
+                )
+                .expect("just peeked Some")
+                .unwrap_err()));
+            },
+            None => return Poll::Ready(Ok(Inner::PlainText(BodyStream::empty()))),
         };
-        let body = mem::replace(&mut self.body, ReadableChunks::new(Body::empty()));
-        // libflate does a read_exact([0; 2]), so its impossible to tell
-        // if the stream was empty, or truly had an UnexpectedEof.
-        // Therefore, we need to check for EOF first.
-        match body_state {
-            StreamState::Eof => Poll::Ready(Inner::PlainText(Body::empty())),
-            StreamState::HasMore => Poll::Ready(match self.type_ {
-                DecoderType::Gzip => Inner::Gzip(Gzip::new(body)),
-                DecoderType::Brotli => Inner::Brotli(Brotli::new(body)),
-                DecoderType::Deflate => Inner::Deflate(Deflate::new(body)),
-            }),
+        let body = std::mem::replace(&mut self.body, BodyStream::empty().peekable());
+
+        match self.type_ {
+            DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(FramedRead::with_capacity(
+                BrotliDecoder::new(StreamReader::new(body)),
+                BytesCodec::new(),
+                DECODER_BUFFER_SIZE,
+            )))),
+            DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(FramedRead::with_capacity(
+                GzipDecoder::new(StreamReader::new(body)),
+                BytesCodec::new(),
+                DECODER_BUFFER_SIZE,
+            )))),
+            DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(FramedRead::with_capacity(
+                ZlibDecoder::new(StreamReader::new(body)),
+                BytesCodec::new(),
+                DECODER_BUFFER_SIZE,
+            )))),
         }
     }
 }
 
-impl Gzip {
-    fn new(stream: ReadableChunks<Body>) -> Self {
-        let stream = Arc::new(Mutex::new(stream));
-        let reader = stream.clone();
-        Gzip {
-            buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
-            inner: Box::new(gzip::Decoder::new(Peeked::new(stream))),
-            reader,
+struct BodyStream {
+    body: Body,
+    is_secure_scheme: bool,
+    content_length: Option<ContentLength>,
+    total_read: u64,
+}
+
+impl BodyStream {
+    fn empty() -> Self {
+        BodyStream {
+            body: Body::empty(),
+            is_secure_scheme: false,
+            content_length: None,
+            total_read: 0,
+        }
+    }
+
+    fn new(body: Body, is_secure_scheme: bool, content_length: Option<ContentLength>) -> Self {
+        BodyStream {
+            body,
+            is_secure_scheme,
+            content_length,
+            total_read: 0,
         }
     }
 }
 
-#[allow(unsafe_code)]
-fn poll_with_read(reader: &mut dyn Read, buf: &mut BytesMut) -> Poll<Option<Result<Bytes, Error>>> {
-    // Ensure a full size buffer is available.
-    // `reserve` is optimized to reclaim space over allocating.
-    buf.reserve(INIT_BUFFER_SIZE);
+impl Stream for BodyStream {
+    type Item = Result<Bytes, io::Error>;
 
-    // The buffer contains uninitialised memory so getting a readable slice is unsafe.
-    // We trust the reader not to read from the memory given.
-    //
-    // To be safe, this memory could be zeroed before passing to the reader.
-    // Otherwise we might need to deal with the case where the reader panics.
-
-    let read = {
-        let buf = unsafe {
-            let ptr = buf.chunk_mut().as_mut_ptr();
-            std::slice::from_raw_parts_mut(ptr, buf.capacity())
-        };
-        reader.read(&mut *buf)
-    };
-
-    match read {
-        Ok(0) => Poll::Ready(None),
-        Ok(read) => {
-            unsafe { buf.advance_mut(read) };
-            let chunk = buf.split_to(read).freeze();
-            Poll::Ready(Some(Ok(chunk)))
-        },
-        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
-        Err(e) => Poll::Ready(Some(Err(e.into()))),
-    }
-}
-
-impl Stream for Gzip {
-    type Item = Result<Bytes, Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let mut buf = self.buf.clone();
-        if let Ok(mut reader) = self.reader.lock() {
-            reader.waker = Some(cx.waker().clone());
-        }
-        poll_with_read(&mut self.inner, &mut buf)
-    }
-}
-
-/// A brotli decoder that reads from a `brotli::Decompressor` into a `BytesMut` and emits the results
-/// as a `Bytes`.
-struct Brotli {
-    inner: Box<Decompressor<Peeked<ReadableChunks<Body>>>>,
-    buf: BytesMut,
-    reader: Arc<Mutex<ReadableChunks<Body>>>,
-}
-
-impl Brotli {
-    fn new(stream: ReadableChunks<Body>) -> Self {
-        let stream = Arc::new(Mutex::new(stream));
-        let reader = stream.clone();
-        Self {
-            buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
-            inner: Box::new(Decompressor::new(Peeked::new(stream), BUF_SIZE)),
-            reader,
-        }
-    }
-}
-
-impl Stream for Brotli {
-    type Item = Result<Bytes, Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let mut buf = self.buf.clone();
-        if let Ok(mut reader) = self.reader.lock() {
-            reader.waker = Some(cx.waker().clone());
-        }
-        poll_with_read(&mut self.inner, &mut buf)
-    }
-}
-
-/// A deflate decoder that reads from a `deflate::Decoder` into a `BytesMut` and emits the results
-/// as a `Bytes`.
-struct Deflate {
-    inner: Box<DeflateDecoder<Peeked<ReadableChunks<Body>>>>,
-    buf: BytesMut,
-    reader: Arc<Mutex<ReadableChunks<Body>>>,
-}
-
-impl Deflate {
-    fn new(stream: ReadableChunks<Body>) -> Self {
-        let stream = Arc::new(Mutex::new(stream));
-        let reader = stream.clone();
-        Self {
-            buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
-            inner: Box::new(DeflateDecoder::new(Peeked::new(stream))),
-            reader,
-        }
-    }
-}
-
-impl Stream for Deflate {
-    type Item = Result<Bytes, Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let mut buf = self.buf.clone();
-        if let Ok(mut reader) = self.reader.lock() {
-            reader.waker = Some(cx.waker().clone());
-        }
-        poll_with_read(&mut self.inner, &mut buf)
-    }
-}
-
-/// A `Read`able wrapper over a stream of chunks.
-pub struct ReadableChunks<S> {
-    state: ReadState,
-    stream: S,
-    waker: Option<Waker>,
-}
-
-enum ReadState {
-    /// A chunk is ready to be read from.
-    Ready(Bytes),
-    /// The next chunk isn't ready yet.
-    NotReady,
-    /// The stream has finished.
-    Eof,
-    /// Stream is in err
-    Error(hyper::Error),
-}
-
-#[derive(Debug)]
-enum StreamState {
-    /// More bytes can be read from the stream.
-    HasMore,
-    /// No more bytes can be read from the stream.
-    Eof,
-}
-
-/// A buffering reader that ensures `Read`s return at least a few bytes.
-struct Peeked<R> {
-    state: PeekedState,
-    peeked_buf: [u8; 10],
-    pos: usize,
-    inner: Arc<Mutex<R>>,
-}
-
-enum PeekedState {
-    /// The internal buffer hasn't filled yet.
-    NotReady,
-    /// The internal buffer can be read.
-    Ready(usize),
-}
-
-impl<R> Peeked<R> {
-    #[inline]
-    fn new(inner: Arc<Mutex<R>>) -> Self {
-        Peeked {
-            state: PeekedState::NotReady,
-            peeked_buf: [0; 10],
-            inner,
-            pos: 0,
-        }
-    }
-
-    #[inline]
-    fn ready(&mut self) {
-        self.state = PeekedState::Ready(self.pos);
-        self.pos = 0;
-    }
-
-    #[inline]
-    fn not_ready(&mut self) {
-        self.state = PeekedState::NotReady;
-        self.pos = 0;
-    }
-}
-
-impl<R: Read> Read for Peeked<R> {
-    #[inline]
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        loop {
-            match self.state {
-                PeekedState::Ready(peeked_buf_len) => {
-                    let len = cmp::min(buf.len(), peeked_buf_len - self.pos);
-                    let start = self.pos;
-                    let end = self.pos + len;
-
-                    buf[..len].copy_from_slice(&self.peeked_buf[start..end]);
-                    self.pos += len;
-                    if self.pos == peeked_buf_len {
-                        self.not_ready();
-                    }
-                    return Ok(len);
-                },
-                PeekedState::NotReady => {
-                    let buf = &mut self.peeked_buf[self.pos..];
-                    let stream = self.inner.clone();
-                    let mut reader = stream.lock().unwrap();
-                    let read = reader.read(buf);
-
-                    match read {
-                        Ok(0) => self.ready(),
-                        Ok(read) => {
-                            self.pos += read;
-                            if self.pos == self.peeked_buf.len() {
-                                self.ready();
-                            }
-                        },
-                        Err(e) => return Err(e),
-                    }
-                },
-            };
-        }
-    }
-}
-
-impl<S> ReadableChunks<S> {
-    #[inline]
-    fn new(stream: S) -> Self {
-        ReadableChunks {
-            state: ReadState::NotReady,
-            stream,
-            waker: None,
-        }
-    }
-}
-
-impl<S> fmt::Debug for ReadableChunks<S> {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.debug_struct("ReadableChunks").finish()
-    }
-}
-
-impl<S> Read for ReadableChunks<S>
-where
-    S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin,
-{
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let waker = self.waker.as_ref().unwrap().clone();
-        let mut cx = Context::from_waker(&waker);
-
-        loop {
-            let ret;
-            match self.state {
-                ReadState::Ready(ref mut chunk) => {
-                    let len = cmp::min(buf.len(), chunk.remaining());
-
-                    buf[..len].copy_from_slice(&chunk[..len]);
-                    chunk.advance(len);
-                    if chunk.is_empty() {
-                        ret = len;
-                    } else {
-                        return Ok(len);
-                    }
-                },
-                ReadState::NotReady => match self.poll_stream(&mut cx) {
-                    Poll::Ready(StreamState::HasMore) => continue,
-                    Poll::Ready(StreamState::Eof) => return Ok(0),
-                    Poll::Pending => return Err(io::ErrorKind::WouldBlock.into()),
-                },
-                ReadState::Eof => return Ok(0),
-                ReadState::Error(ref err) => {
-                    return Err(io::Error::new(io::ErrorKind::Other, err.to_string()))
-                },
-            }
-            self.state = ReadState::NotReady;
-            return Ok(ret);
-        }
-    }
-}
-
-impl<S> ReadableChunks<S>
-where
-    S: Stream<Item = Result<Bytes, hyper::Error>> + std::marker::Unpin,
-{
-    /// Poll the readiness of the inner reader.
-    ///
-    /// This function will update the internal state and return a simplified
-    /// version of the `ReadState`.
-    fn poll_stream(&mut self, cx: &mut Context<'_>) -> Poll<StreamState> {
-        match Pin::new(&mut self.stream).poll_next(cx) {
-            Poll::Ready(Some(Ok(chunk))) => {
-                self.state = ReadState::Ready(chunk);
-
-                Poll::Ready(StreamState::HasMore)
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        match futures_core::ready!(Pin::new(&mut self.body).poll_next(cx)) {
+            Some(Ok(bytes)) => {
+                self.total_read += bytes.len() as u64;
+                Poll::Ready(Some(Ok(bytes)))
             },
-            Poll::Ready(Some(Err(err))) => {
-                self.state = ReadState::Error(err);
-
-                Poll::Ready(StreamState::Eof)
+            Some(Err(err)) => {
+                // To prevent truncation attacks rustls treats close connection without a close_notify as
+                // an error of type std::io::Error with ErrorKind::UnexpectedEof.
+                // https://docs.rs/rustls/latest/rustls/manual/_03_howto/index.html#unexpected-eof
+                //
+                // The error can be safely ignored if we know that all content was received or it is explicitly
+                // allowed in preferences.
+                let all_content_read = self
+                    .content_length
+                    .map_or(false, |c| c.0 == self.total_read);
+                if self.is_secure_scheme &&
+                    (all_content_read || pref!(network.tls.ignore_unexpected_eof))
+                {
+                    let source = err.source();
+                    let is_unexpected_eof = source
+                        .and_then(|e| e.downcast_ref::<io::Error>())
+                        .map_or(false, |e| e.kind() == io::ErrorKind::UnexpectedEof);
+                    if is_unexpected_eof {
+                        return Poll::Ready(None);
+                    }
+                }
+                Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, err))))
             },
-            Poll::Ready(None) => {
-                self.state = ReadState::Eof;
-                Poll::Ready(StreamState::Eof)
-            },
-            Poll::Pending => Poll::Pending,
+            None => Poll::Ready(None),
         }
     }
 }
diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs
index 4ddba4203bb..2950fd8e85b 100644
--- a/components/net/http_loader.rs
+++ b/components/net/http_loader.rs
@@ -648,6 +648,7 @@ async fn obtain_response(
     let host = request.uri().host().unwrap_or("").to_owned();
     let override_manager = context.state.override_manager.clone();
     let headers = headers.clone();
+    let is_secure_scheme = url.is_secure_scheme();
 
     client
         .request(request)
@@ -681,7 +682,7 @@ async fn obtain_response(
                     debug!("Not notifying devtools (no request_id)");
                     None
                 };
-                future::ready(Ok((Decoder::detect(res), msg)))
+                future::ready(Ok((Decoder::detect(res, is_secure_scheme), msg)))
             })
             .map_err(move |error| {
                 NetworkError::from_hyper_error(
diff --git a/components/net/lib.rs b/components/net/lib.rs
index 98a70391c8c..ccbb7e0d43a 100644
--- a/components/net/lib.rs
+++ b/components/net/lib.rs
@@ -32,6 +32,7 @@ pub mod fetch {
 
 /// A module for re-exports of items used in unit tests.
 pub mod test {
+    pub use crate::decoder::DECODER_BUFFER_SIZE;
     pub use crate::hosts::{parse_hostsfile, replace_host_table};
     pub use crate::http_loader::HttpState;
 }
diff --git a/components/net/tests/http_loader.rs b/components/net/tests/http_loader.rs
index c56618833aa..11b31c87606 100644
--- a/components/net/tests/http_loader.rs
+++ b/components/net/tests/http_loader.rs
@@ -13,12 +13,12 @@ use std::time::Duration;
 
 use base::id::TEST_PIPELINE_ID;
 use cookie::Cookie as CookiePair;
-use crossbeam_channel::{unbounded, Receiver};
+use crossbeam_channel::{unbounded, Receiver, Sender};
 use devtools_traits::{
     ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
     HttpResponse as DevtoolsHttpResponse, NetworkEvent,
 };
-use flate2::write::{DeflateEncoder, GzEncoder};
+use flate2::write::{GzEncoder, ZlibEncoder};
 use flate2::Compression;
 use headers::authorization::Basic;
 use headers::{
@@ -32,17 +32,19 @@ use ipc_channel::ipc;
 use ipc_channel::router::ROUTER;
 use net::cookie::ServoCookie;
 use net::cookie_storage::CookieStorage;
+use net::fetch::methods::{self};
 use net::http_loader::determine_requests_referrer;
 use net::resource_thread::AuthCacheEntry;
-use net::test::replace_host_table;
+use net::test::{replace_host_table, DECODER_BUFFER_SIZE};
 use net_traits::http_status::HttpStatus;
 use net_traits::request::{
     BodyChunkRequest, BodyChunkResponse, BodySource, CredentialsMode, Destination, Referrer,
-    RequestBody, RequestBuilder,
+    Request, RequestBody, RequestBuilder,
 };
-use net_traits::response::ResponseBody;
-use net_traits::{CookieSource, NetworkError, ReferrerPolicy};
+use net_traits::response::{Response, ResponseBody};
+use net_traits::{CookieSource, FetchTaskTarget, NetworkError, ReferrerPolicy};
 use servo_url::{ImmutableOrigin, ServoUrl};
+use tokio_test::block_on;
 
 use crate::{fetch, fetch_with_context, make_server, new_fetch_context};
 
@@ -437,7 +439,7 @@ fn test_load_should_decode_the_response_as_deflate_when_response_headers_have_co
         header::CONTENT_ENCODING,
         HeaderValue::from_static("deflate"),
     );
-    let mut e = DeflateEncoder::new(Vec::new(), Compression::default());
+    let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
     e.write(b"Yay!").unwrap();
     let encoded_content = e.finish().unwrap();
     *response.body_mut() = encoded_content.into();
@@ -1356,3 +1358,66 @@ fn test_determine_requests_referrer_longer_than_4k() {
 
     assert_eq!(referer.unwrap().as_str(), "http://example.com/");
 }
+
+#[test]
+fn test_fetch_compressed_response_update_count() {
+    // contents of ../../tests/wpt/tests/fetch/content-encoding/br/resources/foo.text.br
+    const DATA_BROTLI_COMPRESSED: [u8; 15] = [
+        0xe1, 0x18, 0x48, 0xc1, 0x2f, 0x65, 0xf6, 0x16, 0x9f, 0x05, 0x01, 0xbb, 0x20, 0x00, 0x06,
+    ];
+    const DATA_DECOMPRESSED_LEN: usize = 10500;
+
+    let handler = move |_: HyperRequest, response: &mut HyperResponse| {
+        response
+            .headers_mut()
+            .insert(header::CONTENT_ENCODING, HeaderValue::from_static("br"));
+        *response.body_mut() = DATA_BROTLI_COMPRESSED.to_vec().into();
+    };
+    let (server, url) = make_server(handler);
+
+    let mut request = RequestBuilder::new(url.clone(), Referrer::NoReferrer)
+        .method(Method::GET)
+        .body(None)
+        .destination(Destination::Document)
+        .origin(mock_origin())
+        .pipeline_id(Some(TEST_PIPELINE_ID))
+        .build();
+
+    struct FetchResponseCollector {
+        sender: Sender<usize>,
+        update_count: usize,
+    }
+    impl FetchTaskTarget for FetchResponseCollector {
+        fn process_request_body(&mut self, _: &Request) {}
+        fn process_request_eof(&mut self, _: &Request) {}
+        fn process_response(&mut self, _: &Response) {}
+        fn process_response_chunk(&mut self, _: Vec<u8>) {
+            self.update_count += 1;
+        }
+        /// Fired when the response is fully fetched
+        fn process_response_eof(&mut self, _: &Response) {
+            let _ = self.sender.send(self.update_count);
+        }
+    }
+
+    let (sender, receiver) = unbounded();
+    let mut target = FetchResponseCollector {
+        sender: sender,
+        update_count: 0,
+    };
+    let response_update_count = block_on(async move {
+        methods::fetch(
+            &mut request,
+            &mut target,
+            &mut new_fetch_context(None, None, None),
+        )
+        .await;
+        receiver.recv().unwrap()
+    });
+
+    server.close();
+
+    const EXPECTED_UPDATE_COUNT: usize =
+        (DATA_DECOMPRESSED_LEN + DECODER_BUFFER_SIZE - 1) / DECODER_BUFFER_SIZE;
+    assert_eq!(response_update_count, EXPECTED_UPDATE_COUNT);
+}
diff --git a/resources/prefs.json b/resources/prefs.json
index 475e344fea8..38594aee2fd 100644
--- a/resources/prefs.json
+++ b/resources/prefs.json
@@ -110,6 +110,7 @@
   "network.http-cache.disabled": false,
   "network.local_directory_listing.enabled": false,
   "network.mime.sniff": false,
+  "network.tls.ignore_unexpected_eof": false,
   "session-history.max-length": 20,
   "shell.background-color.rgba": [1.0, 1.0, 1.0, 1.0],
   "shell.crash_reporter.enabled": false,
diff --git a/resources/wpt-prefs.json b/resources/wpt-prefs.json
index c9b43ee5c3b..bb3185e2630 100644
--- a/resources/wpt-prefs.json
+++ b/resources/wpt-prefs.json
@@ -1,3 +1,4 @@
 {
-  "dom.webxr.test": true
+  "dom.webxr.test": true,
+  "network.tls.ignore_unexpected_eof": true
 }
diff --git a/tests/wpt/meta-legacy-layout/xhr/response-data-deflate.htm.ini b/tests/wpt/meta-legacy-layout/xhr/response-data-deflate.htm.ini
deleted file mode 100644
index db9d0e4931e..00000000000
--- a/tests/wpt/meta-legacy-layout/xhr/response-data-deflate.htm.ini
+++ /dev/null
@@ -1,5 +0,0 @@
-[response-data-deflate.htm]
-  type: testharness
-  [XMLHttpRequest: content-encoding:deflate response was correctly inflated]
-    expected: FAIL
-
diff --git a/tests/wpt/meta/xhr/response-data-deflate.htm.ini b/tests/wpt/meta/xhr/response-data-deflate.htm.ini
deleted file mode 100644
index d92c58d79f8..00000000000
--- a/tests/wpt/meta/xhr/response-data-deflate.htm.ini
+++ /dev/null
@@ -1,3 +0,0 @@
-[response-data-deflate.htm]
-  [XMLHttpRequest: content-encoding:deflate response was correctly inflated]
-    expected: FAIL
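
For reviewers, here is a minimal standalone sketch (not part of the patch) of how the pieces added in `decoder.rs` fit together: a compressed body stream is adapted into an `AsyncBufRead`, decompressed on the fly by `async-compression`, and re-chunked by `FramedRead`. The types come from the crates this change adds (`async-compression`, `tokio-util`); the function name and the stream parameter are illustrative assumptions, not code from the patch.

```rust
// Illustrative sketch only: mirrors the shape used in decoder.rs, but
// `decode_gzip_body` and its parameter are hypothetical.
use std::io;

use async_compression::tokio::bufread::GzipDecoder;
use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_util::io::StreamReader;

const DECODER_BUFFER_SIZE: usize = 8192;

/// Wrap a stream of gzip-compressed body bytes so that it yields
/// decompressed chunks of at most `DECODER_BUFFER_SIZE` bytes each.
fn decode_gzip_body(
    body: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
) -> impl Stream<Item = Result<Bytes, io::Error>> {
    // StreamReader adapts the byte stream into an AsyncBufRead,
    // GzipDecoder decompresses it on the fly, and FramedRead with a
    // BytesCodec re-chunks the decompressed output into Bytes frames.
    FramedRead::with_capacity(
        GzipDecoder::new(StreamReader::new(body)),
        BytesCodec::new(),
        DECODER_BUFFER_SIZE,
    )
    .map_ok(|chunk| chunk.freeze())
}
```

Because back-pressure and wakeups are handled by the tokio `AsyncRead`/`Stream` adapters in this arrangement, the old `ReadableChunks`, `Peeked`, and manual waker plumbing removed in this diff are no longer needed, and the fixed `DECODER_BUFFER_SIZE` is what the new `test_fetch_compressed_response_update_count` test relies on when counting response chunks.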