mirror of
https://github.com/servo/servo.git
synced 2025-08-03 12:40:06 +01:00
Redesign network response decoding to avoid creating decoders before some content is present.
This commit is contained in:
parent
2cf9a00c99
commit
6404a0ef53
11 changed files with 12998 additions and 12606 deletions
|
@ -22,6 +22,7 @@ crossbeam-channel = "0.3"
|
|||
devtools_traits = {path = "../devtools_traits"}
|
||||
embedder_traits = { path = "../embedder_traits" }
|
||||
flate2 = "1"
|
||||
futures = "0.1"
|
||||
headers-core = "0.0.1"
|
||||
headers-ext = "0.0.3"
|
||||
http = "0.1"
|
||||
|
@ -31,6 +32,7 @@ hyper-openssl = "0.7"
|
|||
immeta = "0.4"
|
||||
ipc-channel = "0.11"
|
||||
lazy_static = "1"
|
||||
libflate = "0.1"
|
||||
log = "0.4"
|
||||
malloc_size_of = { path = "../malloc_size_of" }
|
||||
malloc_size_of_derive = "0.1"
|
||||
|
|
|
@ -3,9 +3,6 @@
|
|||
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
|
||||
|
||||
use crate::hosts::replace_host;
|
||||
use crate::http_loader::Decoder;
|
||||
use flate2::read::GzDecoder;
|
||||
use hyper::body::Payload;
|
||||
use hyper::client::connect::{Connect, Destination};
|
||||
use hyper::client::HttpConnector as HyperHttpConnector;
|
||||
use hyper::rt::Future;
|
||||
|
@ -13,9 +10,7 @@ use hyper::{Body, Client};
|
|||
use hyper_openssl::HttpsConnector;
|
||||
use openssl::ssl::{SslConnector, SslConnectorBuilder, SslMethod, SslOptions};
|
||||
use openssl::x509;
|
||||
use std::io::{Cursor, Read};
|
||||
use tokio::prelude::future::Executor;
|
||||
use tokio::prelude::{Async, Stream};
|
||||
|
||||
pub const BUF_SIZE: usize = 32768;
|
||||
|
||||
|
@ -47,105 +42,6 @@ impl Connect for HttpConnector {
|
|||
}
|
||||
|
||||
pub type Connector = HttpsConnector<HttpConnector>;
|
||||
pub struct WrappedBody {
|
||||
pub body: Body,
|
||||
pub decoder: Decoder,
|
||||
}
|
||||
|
||||
impl WrappedBody {
|
||||
pub fn new(body: Body) -> Self {
|
||||
Self::new_with_decoder(body, Decoder::Plain)
|
||||
}
|
||||
|
||||
pub fn new_with_decoder(body: Body, decoder: Decoder) -> Self {
|
||||
WrappedBody { body, decoder }
|
||||
}
|
||||
}
|
||||
|
||||
impl Payload for WrappedBody {
|
||||
type Data = <Body as Payload>::Data;
|
||||
type Error = <Body as Payload>::Error;
|
||||
fn poll_data(&mut self) -> Result<Async<Option<Self::Data>>, Self::Error> {
|
||||
self.body.poll_data()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for WrappedBody {
|
||||
type Item = <Body as Stream>::Item;
|
||||
type Error = <Body as Stream>::Error;
|
||||
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
|
||||
self.body.poll().map(|res| {
|
||||
res.map(|maybe_chunk| {
|
||||
if let Some(chunk) = maybe_chunk {
|
||||
match self.decoder {
|
||||
Decoder::Plain => Some(chunk),
|
||||
Decoder::Gzip(Some(ref mut decoder)) => {
|
||||
let mut buf = vec![0; BUF_SIZE];
|
||||
decoder.get_mut().get_mut().extend(chunk.as_ref());
|
||||
let len = decoder.read(&mut buf).ok()?;
|
||||
buf.truncate(len);
|
||||
Some(buf.into())
|
||||
},
|
||||
Decoder::Gzip(None) => {
|
||||
let mut buf = vec![0; BUF_SIZE];
|
||||
let mut decoder = GzDecoder::new(Cursor::new(chunk.into_bytes()));
|
||||
let len = decoder.read(&mut buf).ok()?;
|
||||
buf.truncate(len);
|
||||
self.decoder = Decoder::Gzip(Some(decoder));
|
||||
Some(buf.into())
|
||||
},
|
||||
Decoder::Deflate(ref mut decoder) => {
|
||||
let mut buf = vec![0; BUF_SIZE];
|
||||
decoder.get_mut().get_mut().extend(chunk.as_ref());
|
||||
let len = decoder.read(&mut buf).ok()?;
|
||||
buf.truncate(len);
|
||||
Some(buf.into())
|
||||
},
|
||||
Decoder::Brotli(ref mut decoder) => {
|
||||
let mut buf = vec![0; BUF_SIZE];
|
||||
decoder.get_mut().get_mut().extend(chunk.as_ref());
|
||||
let len = decoder.read(&mut buf).ok()?;
|
||||
buf.truncate(len);
|
||||
Some(buf.into())
|
||||
},
|
||||
}
|
||||
} else {
|
||||
// Hyper is done downloading but we still have uncompressed data
|
||||
match self.decoder {
|
||||
Decoder::Gzip(Some(ref mut decoder)) => {
|
||||
let mut buf = vec![0; BUF_SIZE];
|
||||
let len = decoder.read(&mut buf).ok()?;
|
||||
if len == 0 {
|
||||
return None;
|
||||
}
|
||||
buf.truncate(len);
|
||||
Some(buf.into())
|
||||
},
|
||||
Decoder::Deflate(ref mut decoder) => {
|
||||
let mut buf = vec![0; BUF_SIZE];
|
||||
let len = decoder.read(&mut buf).ok()?;
|
||||
if len == 0 {
|
||||
return None;
|
||||
}
|
||||
buf.truncate(len);
|
||||
Some(buf.into())
|
||||
},
|
||||
Decoder::Brotli(ref mut decoder) => {
|
||||
let mut buf = vec![0; BUF_SIZE];
|
||||
let len = decoder.read(&mut buf).ok()?;
|
||||
if len == 0 {
|
||||
return None;
|
||||
}
|
||||
buf.truncate(len);
|
||||
Some(buf.into())
|
||||
},
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_ssl_connector_builder(certs: &str) -> SslConnectorBuilder {
|
||||
// certs include multiple certificates. We could add all of them at once,
|
||||
|
@ -189,7 +85,7 @@ pub fn create_ssl_connector_builder(certs: &str) -> SslConnectorBuilder {
|
|||
pub fn create_http_client<E>(
|
||||
ssl_connector_builder: SslConnectorBuilder,
|
||||
executor: E,
|
||||
) -> Client<Connector, WrappedBody>
|
||||
) -> Client<Connector, Body>
|
||||
where
|
||||
E: Executor<Box<dyn Future<Error = (), Item = ()> + Send + 'static>> + Sync + Send + 'static,
|
||||
{
|
||||
|
|
483
components/net/decoder.rs
Normal file
483
components/net/decoder.rs
Normal file
|
@ -0,0 +1,483 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
|
||||
|
||||
//! Adapted from an implementation in reqwest.
|
||||
|
||||
/*!
|
||||
A potentially non-blocking response decoder.
|
||||
|
||||
The decoder wraps a stream of chunks and produces a new stream of decompressed chunks.
|
||||
The decompressed chunks aren't guaranteed to align to the compressed ones.
|
||||
|
||||
If the response is plaintext then no additional work is carried out.
|
||||
Chunks are just passed along.
|
||||
|
||||
If the response is gzip, then the chunks are decompressed into a buffer.
|
||||
Slices of that buffer are emitted as new chunks.
|
||||
|
||||
This module consists of a few main types:
|
||||
|
||||
- `ReadableChunks` is a `Read`-like wrapper around a stream
|
||||
- `Decoder` is a layer over `ReadableChunks` that applies the right decompression
|
||||
|
||||
The following types directly support the gzip compression case:
|
||||
|
||||
- `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF
|
||||
- `Peeked` is a buffer that keeps a few bytes available so `libflate`s `read_exact` calls won't fail
|
||||
*/
|
||||
|
||||
use crate::connector::BUF_SIZE;
|
||||
use brotli::Decompressor;
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use flate2::read::DeflateDecoder;
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use hyper::header::{HeaderValue, CONTENT_ENCODING, TRANSFER_ENCODING};
|
||||
use hyper::{self, Body, Chunk, Response};
|
||||
use libflate::non_blocking::gzip;
|
||||
use std::cmp;
|
||||
use std::fmt;
|
||||
use std::io::{self, Read};
|
||||
use std::mem;
|
||||
|
||||
pub enum Error {
|
||||
Io(io::Error),
|
||||
Hyper(hyper::error::Error),
|
||||
}
|
||||
|
||||
impl From<io::Error> for Error {
|
||||
fn from(err: io::Error) -> Error {
|
||||
Error::Io(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<hyper::error::Error> for Error {
|
||||
fn from(err: hyper::error::Error) -> Error {
|
||||
Error::Hyper(err)
|
||||
}
|
||||
}
|
||||
|
||||
const INIT_BUFFER_SIZE: usize = 8192;
|
||||
|
||||
/// A response decompressor over a non-blocking stream of chunks.
|
||||
///
|
||||
/// The inner decoder may be constructed asynchronously.
|
||||
pub struct Decoder {
|
||||
inner: Inner,
|
||||
}
|
||||
|
||||
#[derive(PartialEq)]
|
||||
enum DecoderType {
|
||||
Gzip,
|
||||
Brotli,
|
||||
Deflate,
|
||||
}
|
||||
|
||||
enum Inner {
|
||||
/// A `PlainText` decoder just returns the response content as is.
|
||||
PlainText(Body),
|
||||
/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
|
||||
Gzip(Gzip),
|
||||
/// A `Delfate` decoder will uncompress the inflated response content before returning it.
|
||||
Deflate(Deflate),
|
||||
/// A `Brotli` decoder will uncompress the brotli-encoded response content before returning it.
|
||||
Brotli(Brotli),
|
||||
/// A decoder that doesn't have a value yet.
|
||||
Pending(Pending),
|
||||
}
|
||||
|
||||
/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
|
||||
struct Pending {
|
||||
body: ReadableChunks<Body>,
|
||||
type_: DecoderType,
|
||||
}
|
||||
|
||||
/// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results
|
||||
/// as a `Chunk`.
|
||||
struct Gzip {
|
||||
inner: Box<gzip::Decoder<Peeked<ReadableChunks<Body>>>>,
|
||||
buf: BytesMut,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Decoder {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Decoder").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Decoder {
|
||||
/// A plain text decoder.
|
||||
///
|
||||
/// This decoder will emit the underlying chunks as-is.
|
||||
#[inline]
|
||||
fn plain_text(body: Body) -> Decoder {
|
||||
Decoder {
|
||||
inner: Inner::PlainText(body),
|
||||
}
|
||||
}
|
||||
|
||||
/// A pending decoder.
|
||||
///
|
||||
/// This decoder will buffer and decompress chunks that are encoded in the expected format.
|
||||
#[inline]
|
||||
fn pending(body: Body, type_: DecoderType) -> Decoder {
|
||||
Decoder {
|
||||
inner: Inner::Pending(Pending {
|
||||
body: ReadableChunks::new(body),
|
||||
type_: type_,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Constructs a Decoder from a hyper request.
|
||||
///
|
||||
/// A decoder is just a wrapper around the hyper request that knows
|
||||
/// how to decode the content body of the request.
|
||||
///
|
||||
/// Uses the correct variant by inspecting the Content-Encoding header.
|
||||
pub fn detect(response: Response<Body>) -> Response<Decoder> {
|
||||
let values = response
|
||||
.headers()
|
||||
.get_all(CONTENT_ENCODING)
|
||||
.iter()
|
||||
.chain(response.headers().get_all(TRANSFER_ENCODING).iter());
|
||||
let decoder = values.fold(None, |acc, enc| {
|
||||
acc.or_else(|| {
|
||||
if enc == HeaderValue::from_static("gzip") {
|
||||
Some(DecoderType::Gzip)
|
||||
} else if enc == HeaderValue::from_static("br") {
|
||||
Some(DecoderType::Brotli)
|
||||
} else if enc == HeaderValue::from_static("deflate") {
|
||||
Some(DecoderType::Deflate)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
});
|
||||
match decoder {
|
||||
Some(type_) => response.map(|r| Decoder::pending(r, type_)),
|
||||
None => response.map(Decoder::plain_text),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Decoder {
|
||||
type Item = Chunk;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
// Do a read or poll for a pending decoder value.
|
||||
let new_value = match self.inner {
|
||||
Inner::Pending(ref mut future) => match future.poll() {
|
||||
Ok(Async::Ready(inner)) => inner,
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(e) => return Err(e.into()),
|
||||
},
|
||||
Inner::PlainText(ref mut body) => return body.poll().map_err(|e| e.into()),
|
||||
Inner::Gzip(ref mut decoder) => return decoder.poll(),
|
||||
Inner::Brotli(ref mut decoder) => return decoder.poll(),
|
||||
Inner::Deflate(ref mut decoder) => return decoder.poll(),
|
||||
};
|
||||
|
||||
self.inner = new_value;
|
||||
self.poll()
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Pending {
|
||||
type Item = Inner;
|
||||
type Error = hyper::error::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let body_state = match self.body.poll_stream() {
|
||||
Ok(Async::Ready(state)) => state,
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
let body = mem::replace(&mut self.body, ReadableChunks::new(Body::empty()));
|
||||
// libflate does a read_exact([0; 2]), so its impossible to tell
|
||||
// if the stream was empty, or truly had an UnexpectedEof.
|
||||
// Therefore, we need to check for EOF first.
|
||||
match body_state {
|
||||
StreamState::Eof => Ok(Async::Ready(Inner::PlainText(Body::empty()))),
|
||||
StreamState::HasMore => Ok(Async::Ready(match self.type_ {
|
||||
DecoderType::Gzip => Inner::Gzip(Gzip::new(body)),
|
||||
DecoderType::Brotli => Inner::Brotli(Brotli::new(body)),
|
||||
DecoderType::Deflate => Inner::Deflate(Deflate::new(body)),
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Gzip {
|
||||
fn new(stream: ReadableChunks<Body>) -> Self {
|
||||
Gzip {
|
||||
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
|
||||
inner: Box::new(gzip::Decoder::new(Peeked::new(stream))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unsafe_code)]
|
||||
fn poll_with_read(reader: &mut Read, buf: &mut BytesMut) -> Poll<Option<Chunk>, Error> {
|
||||
if buf.remaining_mut() == 0 {
|
||||
buf.reserve(INIT_BUFFER_SIZE);
|
||||
}
|
||||
|
||||
// The buffer contains uninitialised memory so getting a readable slice is unsafe.
|
||||
// We trust the reader not to read from the memory given.
|
||||
//
|
||||
// To be safe, this memory could be zeroed before passing to the reader.
|
||||
// Otherwise we might need to deal with the case where the reader panics.
|
||||
let read = {
|
||||
let mut buf = unsafe { buf.bytes_mut() };
|
||||
reader.read(&mut buf)
|
||||
};
|
||||
|
||||
match read {
|
||||
Ok(read) if read == 0 => Ok(Async::Ready(None)),
|
||||
Ok(read) => {
|
||||
unsafe { buf.advance_mut(read) };
|
||||
let chunk = Chunk::from(buf.split_to(read).freeze());
|
||||
|
||||
Ok(Async::Ready(Some(chunk)))
|
||||
},
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Gzip {
|
||||
type Item = Chunk;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
poll_with_read(&mut self.inner, &mut self.buf)
|
||||
}
|
||||
}
|
||||
|
||||
/// A brotli decoder that reads from a `brotli::Decompressor` into a `BytesMut` and emits the results
|
||||
/// as a `Chunk`.
|
||||
struct Brotli {
|
||||
inner: Box<Decompressor<Peeked<ReadableChunks<Body>>>>,
|
||||
buf: BytesMut,
|
||||
}
|
||||
|
||||
impl Brotli {
|
||||
fn new(stream: ReadableChunks<Body>) -> Self {
|
||||
Self {
|
||||
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
|
||||
inner: Box::new(Decompressor::new(Peeked::new(stream), BUF_SIZE)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Brotli {
|
||||
type Item = Chunk;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
poll_with_read(&mut self.inner, &mut self.buf)
|
||||
}
|
||||
}
|
||||
|
||||
/// A deflate decoder that reads from a `deflate::Decoder` into a `BytesMut` and emits the results
|
||||
/// as a `Chunk`.
|
||||
struct Deflate {
|
||||
inner: Box<DeflateDecoder<Peeked<ReadableChunks<Body>>>>,
|
||||
buf: BytesMut,
|
||||
}
|
||||
|
||||
impl Deflate {
|
||||
fn new(stream: ReadableChunks<Body>) -> Self {
|
||||
Self {
|
||||
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
|
||||
inner: Box::new(DeflateDecoder::new(Peeked::new(stream))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Deflate {
|
||||
type Item = Chunk;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
poll_with_read(&mut self.inner, &mut self.buf)
|
||||
}
|
||||
}
|
||||
|
||||
/// A `Read`able wrapper over a stream of chunks.
|
||||
pub struct ReadableChunks<S> {
|
||||
state: ReadState,
|
||||
stream: S,
|
||||
}
|
||||
|
||||
enum ReadState {
|
||||
/// A chunk is ready to be read from.
|
||||
Ready(Chunk),
|
||||
/// The next chunk isn't ready yet.
|
||||
NotReady,
|
||||
/// The stream has finished.
|
||||
Eof,
|
||||
}
|
||||
|
||||
enum StreamState {
|
||||
/// More bytes can be read from the stream.
|
||||
HasMore,
|
||||
/// No more bytes can be read from the stream.
|
||||
Eof,
|
||||
}
|
||||
|
||||
/// A buffering reader that ensures `Read`s return at least a few bytes.
|
||||
struct Peeked<R> {
|
||||
state: PeekedState,
|
||||
peeked_buf: [u8; 10],
|
||||
pos: usize,
|
||||
inner: R,
|
||||
}
|
||||
|
||||
enum PeekedState {
|
||||
/// The internal buffer hasn't filled yet.
|
||||
NotReady,
|
||||
/// The internal buffer can be read.
|
||||
Ready(usize),
|
||||
}
|
||||
|
||||
impl<R> Peeked<R> {
|
||||
#[inline]
|
||||
fn new(inner: R) -> Self {
|
||||
Peeked {
|
||||
state: PeekedState::NotReady,
|
||||
peeked_buf: [0; 10],
|
||||
inner: inner,
|
||||
pos: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn ready(&mut self) {
|
||||
self.state = PeekedState::Ready(self.pos);
|
||||
self.pos = 0;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn not_ready(&mut self) {
|
||||
self.state = PeekedState::NotReady;
|
||||
self.pos = 0;
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read> Read for Peeked<R> {
|
||||
#[inline]
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
loop {
|
||||
match self.state {
|
||||
PeekedState::Ready(peeked_buf_len) => {
|
||||
let len = cmp::min(buf.len(), peeked_buf_len - self.pos);
|
||||
let start = self.pos;
|
||||
let end = self.pos + len;
|
||||
|
||||
buf[..len].copy_from_slice(&self.peeked_buf[start..end]);
|
||||
self.pos += len;
|
||||
if self.pos == peeked_buf_len {
|
||||
self.not_ready();
|
||||
}
|
||||
|
||||
return Ok(len);
|
||||
},
|
||||
PeekedState::NotReady => {
|
||||
let read = self.inner.read(&mut self.peeked_buf[self.pos..]);
|
||||
|
||||
match read {
|
||||
Ok(0) => self.ready(),
|
||||
Ok(read) => {
|
||||
self.pos += read;
|
||||
if self.pos == self.peeked_buf.len() {
|
||||
self.ready();
|
||||
}
|
||||
},
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> ReadableChunks<S> {
|
||||
#[inline]
|
||||
fn new(stream: S) -> Self {
|
||||
ReadableChunks {
|
||||
state: ReadState::NotReady,
|
||||
stream: stream,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> fmt::Debug for ReadableChunks<S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("ReadableChunks").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Read for ReadableChunks<S>
|
||||
where
|
||||
S: Stream<Item = Chunk, Error = hyper::error::Error>,
|
||||
{
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
loop {
|
||||
let ret;
|
||||
match self.state {
|
||||
ReadState::Ready(ref mut chunk) => {
|
||||
let len = cmp::min(buf.len(), chunk.remaining());
|
||||
|
||||
buf[..len].copy_from_slice(&chunk[..len]);
|
||||
chunk.advance(len);
|
||||
if chunk.is_empty() {
|
||||
ret = len;
|
||||
} else {
|
||||
return Ok(len);
|
||||
}
|
||||
},
|
||||
ReadState::NotReady => match self.poll_stream() {
|
||||
Ok(Async::Ready(StreamState::HasMore)) => continue,
|
||||
Ok(Async::Ready(StreamState::Eof)) => return Ok(0),
|
||||
Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
|
||||
Err(e) => {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, e));
|
||||
},
|
||||
},
|
||||
ReadState::Eof => return Ok(0),
|
||||
}
|
||||
self.state = ReadState::NotReady;
|
||||
return Ok(ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> ReadableChunks<S>
|
||||
where
|
||||
S: Stream<Item = Chunk, Error = hyper::error::Error>,
|
||||
{
|
||||
/// Poll the readiness of the inner reader.
|
||||
///
|
||||
/// This function will update the internal state and return a simplified
|
||||
/// version of the `ReadState`.
|
||||
fn poll_stream(&mut self) -> Poll<StreamState, hyper::error::Error> {
|
||||
match self.stream.poll() {
|
||||
Ok(Async::Ready(Some(chunk))) => {
|
||||
self.state = ReadState::Ready(chunk);
|
||||
|
||||
Ok(Async::Ready(StreamState::HasMore))
|
||||
},
|
||||
Ok(Async::Ready(None)) => {
|
||||
self.state = ReadState::Eof;
|
||||
|
||||
Ok(Async::Ready(StreamState::Eof))
|
||||
},
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -2,9 +2,10 @@
|
|||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
|
||||
|
||||
use crate::connector::{create_http_client, Connector, WrappedBody, BUF_SIZE};
|
||||
use crate::connector::{create_http_client, Connector};
|
||||
use crate::cookie;
|
||||
use crate::cookie_storage::CookieStorage;
|
||||
use crate::decoder::Decoder;
|
||||
use crate::fetch::cors_cache::CorsCache;
|
||||
use crate::fetch::methods::{
|
||||
is_cors_safelisted_method, is_cors_safelisted_request_header, main_fetch,
|
||||
|
@ -13,14 +14,11 @@ use crate::fetch::methods::{Data, DoneChannel, FetchContext, Target};
|
|||
use crate::hsts::HstsList;
|
||||
use crate::http_cache::HttpCache;
|
||||
use crate::resource_thread::AuthCache;
|
||||
use brotli::Decompressor;
|
||||
use bytes::Bytes;
|
||||
use crossbeam_channel::{unbounded, Sender};
|
||||
use devtools_traits::{
|
||||
ChromeToDevtoolsControlMsg, DevtoolsControlMsg, HttpRequest as DevtoolsHttpRequest,
|
||||
};
|
||||
use devtools_traits::{HttpResponse as DevtoolsHttpResponse, NetworkEvent};
|
||||
use flate2::read::{DeflateDecoder, GzDecoder};
|
||||
use headers_core::HeaderMapExt;
|
||||
use headers_ext::{AccessControlAllowCredentials, AccessControlAllowHeaders};
|
||||
use headers_ext::{
|
||||
|
@ -49,7 +47,6 @@ use openssl::ssl::SslConnectorBuilder;
|
|||
use servo_url::{ImmutableOrigin, ServoUrl};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::error::Error;
|
||||
use std::io::Cursor;
|
||||
use std::iter::FromIterator;
|
||||
use std::mem;
|
||||
use std::ops::Deref;
|
||||
|
@ -71,7 +68,7 @@ pub struct HttpState {
|
|||
pub http_cache: RwLock<HttpCache>,
|
||||
pub auth_cache: RwLock<AuthCache>,
|
||||
pub history_states: RwLock<HashMap<HistoryStateId, Vec<u8>>>,
|
||||
pub client: Client<Connector, WrappedBody>,
|
||||
pub client: Client<Connector, Body>,
|
||||
}
|
||||
|
||||
impl HttpState {
|
||||
|
@ -266,31 +263,6 @@ fn set_cookies_from_headers(
|
|||
}
|
||||
}
|
||||
|
||||
impl Decoder {
|
||||
fn from_http_response(response: &HyperResponse<Body>) -> Decoder {
|
||||
if let Some(encoding) = response.headers().typed_get::<ContentEncoding>() {
|
||||
if encoding.contains("gzip") {
|
||||
Decoder::Gzip(None)
|
||||
} else if encoding.contains("deflate") {
|
||||
Decoder::Deflate(DeflateDecoder::new(Cursor::new(Bytes::new())))
|
||||
} else if encoding.contains("br") {
|
||||
Decoder::Brotli(Decompressor::new(Cursor::new(Bytes::new()), BUF_SIZE))
|
||||
} else {
|
||||
Decoder::Plain
|
||||
}
|
||||
} else {
|
||||
Decoder::Plain
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum Decoder {
|
||||
Gzip(Option<GzDecoder<Cursor<Bytes>>>),
|
||||
Deflate(DeflateDecoder<Cursor<Bytes>>),
|
||||
Brotli(Decompressor<Cursor<Bytes>>),
|
||||
Plain,
|
||||
}
|
||||
|
||||
fn prepare_devtools_request(
|
||||
request_id: String,
|
||||
url: ServoUrl,
|
||||
|
@ -367,7 +339,7 @@ fn auth_from_cache(
|
|||
}
|
||||
|
||||
fn obtain_response(
|
||||
client: &Client<Connector, WrappedBody>,
|
||||
client: &Client<Connector, Body>,
|
||||
url: &ServoUrl,
|
||||
method: &Method,
|
||||
request_headers: &HeaderMap,
|
||||
|
@ -379,10 +351,7 @@ fn obtain_response(
|
|||
is_xhr: bool,
|
||||
) -> Box<
|
||||
dyn Future<
|
||||
Item = (
|
||||
HyperResponse<WrappedBody>,
|
||||
Option<ChromeToDevtoolsControlMsg>,
|
||||
),
|
||||
Item = (HyperResponse<Decoder>, Option<ChromeToDevtoolsControlMsg>),
|
||||
Error = NetworkError,
|
||||
>,
|
||||
> {
|
||||
|
@ -423,7 +392,7 @@ fn obtain_response(
|
|||
.replace("{", "%7B")
|
||||
.replace("}", "%7D"),
|
||||
)
|
||||
.body(WrappedBody::new(request_body.clone().into()));
|
||||
.body(request_body.clone().into());
|
||||
|
||||
let mut request = match request {
|
||||
Ok(request) => request,
|
||||
|
@ -474,11 +443,7 @@ fn obtain_response(
|
|||
debug!("Not notifying devtools (no request_id)");
|
||||
None
|
||||
};
|
||||
let decoder = Decoder::from_http_response(&res);
|
||||
Ok((
|
||||
res.map(move |r| WrappedBody::new_with_decoder(r, decoder)),
|
||||
msg,
|
||||
))
|
||||
Ok((Decoder::detect(res), msg))
|
||||
})
|
||||
.map_err(move |e| NetworkError::from_hyper_error(&e)),
|
||||
)
|
||||
|
@ -1265,6 +1230,7 @@ fn http_network_fetch(
|
|||
}
|
||||
|
||||
*res_body.lock().unwrap() = ResponseBody::Receiving(vec![]);
|
||||
let res_body2 = res_body.clone();
|
||||
|
||||
if let Some(ref sender) = devtools_sender {
|
||||
if let Some(m) = msg {
|
||||
|
@ -1285,6 +1251,7 @@ fn http_network_fetch(
|
|||
}
|
||||
|
||||
let done_sender2 = done_sender.clone();
|
||||
let done_sender3 = done_sender.clone();
|
||||
HANDLE.lock().unwrap().spawn(
|
||||
res.into_body()
|
||||
.map_err(|_| ())
|
||||
|
@ -1311,7 +1278,15 @@ fn http_network_fetch(
|
|||
let _ = done_sender2.send(Data::Done);
|
||||
future::ok(())
|
||||
})
|
||||
.map_err(|_| ()),
|
||||
.map_err(move |_| {
|
||||
let mut body = res_body2.lock().unwrap();
|
||||
let completed_body = match *body {
|
||||
ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]),
|
||||
_ => vec![],
|
||||
};
|
||||
*body = ResponseBody::Done(completed_body);
|
||||
let _ = done_sender3.send(Data::Done);
|
||||
}),
|
||||
);
|
||||
|
||||
// TODO these substeps aren't possible yet
|
||||
|
|
|
@ -21,6 +21,7 @@ pub mod connector;
|
|||
pub mod cookie;
|
||||
pub mod cookie_storage;
|
||||
mod data_loader;
|
||||
mod decoder;
|
||||
pub mod filemanager_thread;
|
||||
mod hosts;
|
||||
pub mod hsts;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue