mirror of
https://github.com/servo/servo.git
synced 2025-07-23 15:23:42 +01:00
fix: ReadableStream::get_in_memory_bytes
too large (#36914)
Fix a IPC hang due to `ReadableStream::get_in_memory_bytes` could return really huge chunk. Testing: WPT on ReadableStream should pass Fixes: IPC hang when transferring huge chunk bytes from `ReadableStream` cc @gterzian @Taym95 since this is also related to ReadableStream. --------- Signed-off-by: Yu Wei Wu <yuweiwu@YunoMacBook-Air.local> Co-authored-by: Yu Wei Wu <yuweiwu@YunoMacBook-Air.local>
This commit is contained in:
parent
c37d5572fd
commit
aa4ad0f2be
5 changed files with 29 additions and 20 deletions
|
@ -37,7 +37,7 @@ use hyper::ext::ReasonPhrase;
|
||||||
use hyper::header::{HeaderName, TRANSFER_ENCODING};
|
use hyper::header::{HeaderName, TRANSFER_ENCODING};
|
||||||
use hyper_serde::Serde;
|
use hyper_serde::Serde;
|
||||||
use hyper_util::client::legacy::Client;
|
use hyper_util::client::legacy::Client;
|
||||||
use ipc_channel::ipc::{self, IpcSender};
|
use ipc_channel::ipc::{self, IpcSender, IpcSharedMemory};
|
||||||
use ipc_channel::router::ROUTER;
|
use ipc_channel::router::ROUTER;
|
||||||
use log::{debug, error, info, log_enabled, warn};
|
use log::{debug, error, info, log_enabled, warn};
|
||||||
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
|
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
|
||||||
|
@ -462,7 +462,7 @@ fn auth_from_cache(
|
||||||
/// used to fill the body with bytes coming-in over IPC.
|
/// used to fill the body with bytes coming-in over IPC.
|
||||||
enum BodyChunk {
|
enum BodyChunk {
|
||||||
/// A chunk of bytes.
|
/// A chunk of bytes.
|
||||||
Chunk(Vec<u8>),
|
Chunk(IpcSharedMemory),
|
||||||
/// Body is done.
|
/// Body is done.
|
||||||
Done,
|
Done,
|
||||||
}
|
}
|
||||||
|
@ -489,12 +489,14 @@ enum BodySink {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BodySink {
|
impl BodySink {
|
||||||
fn transmit_bytes(&self, bytes: Vec<u8>) {
|
fn transmit_bytes(&self, bytes: IpcSharedMemory) {
|
||||||
match self {
|
match self {
|
||||||
BodySink::Chunked(sender) => {
|
BodySink::Chunked(sender) => {
|
||||||
let sender = sender.clone();
|
let sender = sender.clone();
|
||||||
HANDLE.spawn(async move {
|
HANDLE.spawn(async move {
|
||||||
let _ = sender.send(Ok(Frame::data(bytes.into()))).await;
|
let _ = sender
|
||||||
|
.send(Ok(Frame::data(Bytes::copy_from_slice(&bytes))))
|
||||||
|
.await;
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
BodySink::Buffered(sender) => {
|
BodySink::Buffered(sender) => {
|
||||||
|
@ -577,7 +579,7 @@ async fn obtain_response(
|
||||||
body_port,
|
body_port,
|
||||||
Box::new(move |message| {
|
Box::new(move |message| {
|
||||||
info!("Received message");
|
info!("Received message");
|
||||||
let bytes: Vec<u8> = match message.unwrap() {
|
let bytes = match message.unwrap() {
|
||||||
BodyChunkResponse::Chunk(bytes) => bytes,
|
BodyChunkResponse::Chunk(bytes) => bytes,
|
||||||
BodyChunkResponse::Done => {
|
BodyChunkResponse::Done => {
|
||||||
// Step 3, abort these parallel steps.
|
// Step 3, abort these parallel steps.
|
||||||
|
@ -622,8 +624,8 @@ async fn obtain_response(
|
||||||
let mut body = vec![];
|
let mut body = vec![];
|
||||||
loop {
|
loop {
|
||||||
match receiver.recv().await {
|
match receiver.recv().await {
|
||||||
Some(BodyChunk::Chunk(mut bytes)) => {
|
Some(BodyChunk::Chunk(bytes)) => {
|
||||||
body.append(&mut bytes);
|
body.extend_from_slice(&bytes);
|
||||||
},
|
},
|
||||||
Some(BodyChunk::Done) => break,
|
Some(BodyChunk::Done) => break,
|
||||||
None => warn!("Failed to read all chunks from request body."),
|
None => warn!("Failed to read all chunks from request body."),
|
||||||
|
|
|
@ -31,7 +31,7 @@ use http::{HeaderName, Method, StatusCode};
|
||||||
use http_body_util::combinators::BoxBody;
|
use http_body_util::combinators::BoxBody;
|
||||||
use hyper::body::{Body, Bytes, Incoming};
|
use hyper::body::{Body, Bytes, Incoming};
|
||||||
use hyper::{Request as HyperRequest, Response as HyperResponse};
|
use hyper::{Request as HyperRequest, Response as HyperResponse};
|
||||||
use ipc_channel::ipc;
|
use ipc_channel::ipc::{self, IpcSharedMemory};
|
||||||
use ipc_channel::router::ROUTER;
|
use ipc_channel::router::ROUTER;
|
||||||
use net::cookie::ServoCookie;
|
use net::cookie::ServoCookie;
|
||||||
use net::cookie_storage::CookieStorage;
|
use net::cookie_storage::CookieStorage;
|
||||||
|
@ -100,7 +100,7 @@ pub fn expect_devtools_http_response(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_request_body_with_content(content: Vec<u8>) -> RequestBody {
|
fn create_request_body_with_content(content: IpcSharedMemory) -> RequestBody {
|
||||||
let content_len = content.len();
|
let content_len = content.len();
|
||||||
|
|
||||||
let (chunk_request_sender, chunk_request_receiver) = ipc::channel().unwrap();
|
let (chunk_request_sender, chunk_request_receiver) = ipc::channel().unwrap();
|
||||||
|
@ -592,7 +592,7 @@ fn test_load_doesnt_send_request_body_on_any_redirect() {
|
||||||
let (pre_server, pre_url) = make_server(pre_handler);
|
let (pre_server, pre_url) = make_server(pre_handler);
|
||||||
|
|
||||||
let content = b"Body on POST!";
|
let content = b"Body on POST!";
|
||||||
let request_body = create_request_body_with_content(content.to_vec());
|
let request_body = create_request_body_with_content(IpcSharedMemory::from_bytes(content));
|
||||||
|
|
||||||
let request = RequestBuilder::new(None, pre_url.clone(), Referrer::NoReferrer)
|
let request = RequestBuilder::new(None, pre_url.clone(), Referrer::NoReferrer)
|
||||||
.body(Some(request_body))
|
.body(Some(request_body))
|
||||||
|
@ -904,7 +904,7 @@ fn test_load_sets_content_length_to_length_of_request_body() {
|
||||||
};
|
};
|
||||||
let (server, url) = make_server(handler);
|
let (server, url) = make_server(handler);
|
||||||
|
|
||||||
let request_body = create_request_body_with_content(content.to_vec());
|
let request_body = create_request_body_with_content(IpcSharedMemory::from_bytes(content));
|
||||||
|
|
||||||
let request = RequestBuilder::new(None, url.clone(), Referrer::NoReferrer)
|
let request = RequestBuilder::new(None, url.clone(), Referrer::NoReferrer)
|
||||||
.method(Method::POST)
|
.method(Method::POST)
|
||||||
|
|
|
@ -7,7 +7,7 @@ use std::{ptr, slice, str};
|
||||||
|
|
||||||
use constellation_traits::BlobImpl;
|
use constellation_traits::BlobImpl;
|
||||||
use encoding_rs::{Encoding, UTF_8};
|
use encoding_rs::{Encoding, UTF_8};
|
||||||
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
|
use ipc_channel::ipc::{self, IpcReceiver, IpcSender, IpcSharedMemory};
|
||||||
use ipc_channel::router::ROUTER;
|
use ipc_channel::router::ROUTER;
|
||||||
use js::jsapi::{Heap, JS_ClearPendingException, JSObject, Value as JSValue};
|
use js::jsapi::{Heap, JS_ClearPendingException, JSObject, Value as JSValue};
|
||||||
use js::jsval::{JSVal, UndefinedValue};
|
use js::jsval::{JSVal, UndefinedValue};
|
||||||
|
@ -73,7 +73,7 @@ struct TransmitBodyConnectHandler {
|
||||||
task_source: SendableTaskSource,
|
task_source: SendableTaskSource,
|
||||||
bytes_sender: Option<IpcSender<BodyChunkResponse>>,
|
bytes_sender: Option<IpcSender<BodyChunkResponse>>,
|
||||||
control_sender: IpcSender<BodyChunkRequest>,
|
control_sender: IpcSender<BodyChunkRequest>,
|
||||||
in_memory: Option<Vec<u8>>,
|
in_memory: Option<IpcSharedMemory>,
|
||||||
in_memory_done: bool,
|
in_memory_done: bool,
|
||||||
source: BodySource,
|
source: BodySource,
|
||||||
}
|
}
|
||||||
|
@ -83,7 +83,7 @@ impl TransmitBodyConnectHandler {
|
||||||
stream: Trusted<ReadableStream>,
|
stream: Trusted<ReadableStream>,
|
||||||
task_source: SendableTaskSource,
|
task_source: SendableTaskSource,
|
||||||
control_sender: IpcSender<BodyChunkRequest>,
|
control_sender: IpcSender<BodyChunkRequest>,
|
||||||
in_memory: Option<Vec<u8>>,
|
in_memory: Option<IpcSharedMemory>,
|
||||||
source: BodySource,
|
source: BodySource,
|
||||||
) -> TransmitBodyConnectHandler {
|
) -> TransmitBodyConnectHandler {
|
||||||
TransmitBodyConnectHandler {
|
TransmitBodyConnectHandler {
|
||||||
|
@ -160,7 +160,7 @@ impl TransmitBodyConnectHandler {
|
||||||
.bytes_sender
|
.bytes_sender
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("No bytes sender to transmit source.")
|
.expect("No bytes sender to transmit source.")
|
||||||
.send(BodyChunkResponse::Chunk(bytes.clone()));
|
.send(BodyChunkResponse::Chunk(bytes));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
warn!("Re-directs for file-based Blobs not supported yet.");
|
warn!("Re-directs for file-based Blobs not supported yet.");
|
||||||
|
@ -310,7 +310,11 @@ impl Callback for TransmitBodyPromiseHandler {
|
||||||
// Step 5.1 and 5.2, transmit chunk.
|
// Step 5.1 and 5.2, transmit chunk.
|
||||||
// Send the chunk to the body transmitter in net::http_loader::obtain_response.
|
// Send the chunk to the body transmitter in net::http_loader::obtain_response.
|
||||||
// TODO: queue a fetch task on request to process request body for request.
|
// TODO: queue a fetch task on request to process request body for request.
|
||||||
let _ = self.bytes_sender.send(BodyChunkResponse::Chunk(chunk));
|
let _ = self
|
||||||
|
.bytes_sender
|
||||||
|
.send(BodyChunkResponse::Chunk(IpcSharedMemory::from_bytes(
|
||||||
|
&chunk,
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ use std::rc::Rc;
|
||||||
use base::id::{MessagePortId, MessagePortIndex};
|
use base::id::{MessagePortId, MessagePortIndex};
|
||||||
use constellation_traits::MessagePortImpl;
|
use constellation_traits::MessagePortImpl;
|
||||||
use dom_struct::dom_struct;
|
use dom_struct::dom_struct;
|
||||||
|
use ipc_channel::ipc::IpcSharedMemory;
|
||||||
use js::conversions::ToJSValConvertible;
|
use js::conversions::ToJSValConvertible;
|
||||||
use js::jsapi::{Heap, JSObject};
|
use js::jsapi::{Heap, JSObject};
|
||||||
use js::jsval::{JSVal, NullValue, ObjectValue, UndefinedValue};
|
use js::jsval::{JSVal, NullValue, ObjectValue, UndefinedValue};
|
||||||
|
@ -1131,12 +1132,14 @@ impl ReadableStream {
|
||||||
|
|
||||||
/// Return bytes for synchronous use, if the stream has all data in memory.
|
/// Return bytes for synchronous use, if the stream has all data in memory.
|
||||||
/// Useful for native source integration only.
|
/// Useful for native source integration only.
|
||||||
pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
|
pub(crate) fn get_in_memory_bytes(&self) -> Option<IpcSharedMemory> {
|
||||||
match self.controller.borrow().as_ref() {
|
match self.controller.borrow().as_ref() {
|
||||||
Some(ControllerType::Default(controller)) => controller
|
Some(ControllerType::Default(controller)) => controller
|
||||||
.get()
|
.get()
|
||||||
.expect("Stream should have controller.")
|
.expect("Stream should have controller.")
|
||||||
.get_in_memory_bytes(),
|
.get_in_memory_bytes()
|
||||||
|
.as_deref()
|
||||||
|
.map(IpcSharedMemory::from_bytes),
|
||||||
_ => {
|
_ => {
|
||||||
unreachable!("Getting in-memory bytes for a stream with a non-default controller")
|
unreachable!("Getting in-memory bytes for a stream with a non-default controller")
|
||||||
},
|
},
|
||||||
|
|
|
@ -8,7 +8,7 @@ use base::id::{PipelineId, WebViewId};
|
||||||
use content_security_policy::{self as csp};
|
use content_security_policy::{self as csp};
|
||||||
use http::header::{AUTHORIZATION, HeaderName};
|
use http::header::{AUTHORIZATION, HeaderName};
|
||||||
use http::{HeaderMap, Method};
|
use http::{HeaderMap, Method};
|
||||||
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
|
use ipc_channel::ipc::{self, IpcReceiver, IpcSender, IpcSharedMemory};
|
||||||
use malloc_size_of_derive::MallocSizeOf;
|
use malloc_size_of_derive::MallocSizeOf;
|
||||||
use mime::Mime;
|
use mime::Mime;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -156,7 +156,7 @@ pub enum BodySource {
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
pub enum BodyChunkResponse {
|
pub enum BodyChunkResponse {
|
||||||
/// A chunk of bytes.
|
/// A chunk of bytes.
|
||||||
Chunk(Vec<u8>),
|
Chunk(IpcSharedMemory),
|
||||||
/// The body is done.
|
/// The body is done.
|
||||||
Done,
|
Done,
|
||||||
/// There was an error streaming the body,
|
/// There was an error streaming the body,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue