mirror of
https://github.com/servo/servo.git
synced 2025-06-06 16:45:39 +00:00
net: Start reducing number of IPCs channels used for fetch with a FetchThread
(#33863)
Instead of creating a `ROUTER` for each fetch, create a fetch thread which handles all incoming and outcoming fetch requests. Now messages involving fetches carry a "request id" which indicates which fetch is being addressed by the message. This greatly reduces the number of file descriptors used by fetch. In addition, the interface for kicking off fetches is simplified when using the `Listener` with `Document`s and the `GlobalScope`. This does not fix all leaked file descriptors / mach ports, but greatly eliminates the number used. Now tests can be run without limiting procesess on modern macOS systems. Followup work: 1. There are more instances where fetch is done using the old method. Some of these require more changes in order to be converted to the `FetchThread` approach. 2. Eliminate usage of IPC channels when doing redirects. 3. Also eliminate the IPC channel used for cancel handling. 4. This change opens up the possiblity of controlling the priority of fetch requests. Fixes #29834. Signed-off-by: Martin Robinson <mrobinson@igalia.com>
This commit is contained in:
parent
2115267328
commit
036e74524a
31 changed files with 761 additions and 766 deletions
|
@ -4,12 +4,15 @@
|
|||
|
||||
#![deny(unsafe_code)]
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::{LazyLock, OnceLock};
|
||||
use std::thread;
|
||||
|
||||
use base::cross_process_instant::CrossProcessInstant;
|
||||
use base::id::HistoryStateId;
|
||||
use cookie::Cookie;
|
||||
use crossbeam_channel::{unbounded, Receiver, Sender};
|
||||
use headers::{ContentType, HeaderMapExt, ReferrerPolicy as ReferrerPolicyHeader};
|
||||
use http::{Error as HttpError, HeaderMap, StatusCode};
|
||||
use hyper::Error as HyperError;
|
||||
|
@ -20,6 +23,7 @@ use ipc_channel::Error as IpcError;
|
|||
use malloc_size_of::malloc_size_of_is_0;
|
||||
use malloc_size_of_derive::MallocSizeOf;
|
||||
use mime::Mime;
|
||||
use request::RequestId;
|
||||
use rustls::Certificate;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servo_rand::RngCore;
|
||||
|
@ -177,12 +181,24 @@ impl From<ReferrerPolicy> for ReferrerPolicyHeader {
|
|||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub enum FetchResponseMsg {
|
||||
// todo: should have fields for transmitted/total bytes
|
||||
ProcessRequestBody,
|
||||
ProcessRequestEOF,
|
||||
ProcessRequestBody(RequestId),
|
||||
ProcessRequestEOF(RequestId),
|
||||
// todo: send more info about the response (or perhaps the entire Response)
|
||||
ProcessResponse(Result<FetchMetadata, NetworkError>),
|
||||
ProcessResponseChunk(Vec<u8>),
|
||||
ProcessResponseEOF(Result<ResourceFetchTiming, NetworkError>),
|
||||
ProcessResponse(RequestId, Result<FetchMetadata, NetworkError>),
|
||||
ProcessResponseChunk(RequestId, Vec<u8>),
|
||||
ProcessResponseEOF(RequestId, Result<ResourceFetchTiming, NetworkError>),
|
||||
}
|
||||
|
||||
impl FetchResponseMsg {
|
||||
fn request_id(&self) -> RequestId {
|
||||
match self {
|
||||
FetchResponseMsg::ProcessRequestBody(id) |
|
||||
FetchResponseMsg::ProcessRequestEOF(id) |
|
||||
FetchResponseMsg::ProcessResponse(id, ..) |
|
||||
FetchResponseMsg::ProcessResponseChunk(id, ..) |
|
||||
FetchResponseMsg::ProcessResponseEOF(id, ..) => *id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait FetchTaskTarget {
|
||||
|
@ -199,15 +215,15 @@ pub trait FetchTaskTarget {
|
|||
/// <https://fetch.spec.whatwg.org/#process-response>
|
||||
///
|
||||
/// Fired when headers are received
|
||||
fn process_response(&mut self, response: &Response);
|
||||
fn process_response(&mut self, request: &Request, response: &Response);
|
||||
|
||||
/// Fired when a chunk of response content is received
|
||||
fn process_response_chunk(&mut self, chunk: Vec<u8>);
|
||||
fn process_response_chunk(&mut self, request: &Request, chunk: Vec<u8>);
|
||||
|
||||
/// <https://fetch.spec.whatwg.org/#process-response-end-of-file>
|
||||
///
|
||||
/// Fired when the response is fully fetched
|
||||
fn process_response_eof(&mut self, response: &Response);
|
||||
fn process_response_eof(&mut self, request: &Request, response: &Response);
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
|
@ -228,43 +244,52 @@ pub enum FetchMetadata {
|
|||
}
|
||||
|
||||
pub trait FetchResponseListener {
|
||||
fn process_request_body(&mut self);
|
||||
fn process_request_eof(&mut self);
|
||||
fn process_response(&mut self, metadata: Result<FetchMetadata, NetworkError>);
|
||||
fn process_response_chunk(&mut self, chunk: Vec<u8>);
|
||||
fn process_response_eof(&mut self, response: Result<ResourceFetchTiming, NetworkError>);
|
||||
fn process_request_body(&mut self, request_id: RequestId);
|
||||
fn process_request_eof(&mut self, request_id: RequestId);
|
||||
fn process_response(
|
||||
&mut self,
|
||||
request_id: RequestId,
|
||||
metadata: Result<FetchMetadata, NetworkError>,
|
||||
);
|
||||
fn process_response_chunk(&mut self, request_id: RequestId, chunk: Vec<u8>);
|
||||
fn process_response_eof(
|
||||
&mut self,
|
||||
request_id: RequestId,
|
||||
response: Result<ResourceFetchTiming, NetworkError>,
|
||||
);
|
||||
fn resource_timing(&self) -> &ResourceFetchTiming;
|
||||
fn resource_timing_mut(&mut self) -> &mut ResourceFetchTiming;
|
||||
fn submit_resource_timing(&mut self);
|
||||
}
|
||||
|
||||
impl FetchTaskTarget for IpcSender<FetchResponseMsg> {
|
||||
fn process_request_body(&mut self, _: &Request) {
|
||||
let _ = self.send(FetchResponseMsg::ProcessRequestBody);
|
||||
fn process_request_body(&mut self, request: &Request) {
|
||||
let _ = self.send(FetchResponseMsg::ProcessRequestBody(request.id));
|
||||
}
|
||||
|
||||
fn process_request_eof(&mut self, _: &Request) {
|
||||
let _ = self.send(FetchResponseMsg::ProcessRequestEOF);
|
||||
fn process_request_eof(&mut self, request: &Request) {
|
||||
let _ = self.send(FetchResponseMsg::ProcessRequestEOF(request.id));
|
||||
}
|
||||
|
||||
fn process_response(&mut self, response: &Response) {
|
||||
let _ = self.send(FetchResponseMsg::ProcessResponse(response.metadata()));
|
||||
fn process_response(&mut self, request: &Request, response: &Response) {
|
||||
let _ = self.send(FetchResponseMsg::ProcessResponse(
|
||||
request.id,
|
||||
response.metadata(),
|
||||
));
|
||||
}
|
||||
|
||||
fn process_response_chunk(&mut self, chunk: Vec<u8>) {
|
||||
let _ = self.send(FetchResponseMsg::ProcessResponseChunk(chunk));
|
||||
fn process_response_chunk(&mut self, request: &Request, chunk: Vec<u8>) {
|
||||
let _ = self.send(FetchResponseMsg::ProcessResponseChunk(request.id, chunk));
|
||||
}
|
||||
|
||||
fn process_response_eof(&mut self, response: &Response) {
|
||||
if let Some(e) = response.get_network_error() {
|
||||
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Err(e.clone())));
|
||||
fn process_response_eof(&mut self, request: &Request, response: &Response) {
|
||||
let payload = if let Some(network_error) = response.get_network_error() {
|
||||
Err(network_error.clone())
|
||||
} else {
|
||||
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(Ok(response
|
||||
.get_resource_timing()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.clone())));
|
||||
}
|
||||
Ok(response.get_resource_timing().lock().unwrap().clone())
|
||||
};
|
||||
|
||||
let _ = self.send(FetchResponseMsg::ProcessResponseEOF(request.id, payload));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -275,14 +300,10 @@ pub struct DiscardFetch;
|
|||
|
||||
impl FetchTaskTarget for DiscardFetch {
|
||||
fn process_request_body(&mut self, _: &Request) {}
|
||||
|
||||
fn process_request_eof(&mut self, _: &Request) {}
|
||||
|
||||
fn process_response(&mut self, _: &Response) {}
|
||||
|
||||
fn process_response_chunk(&mut self, _: Vec<u8>) {}
|
||||
|
||||
fn process_response_eof(&mut self, _: &Response) {}
|
||||
fn process_response(&mut self, _: &Request, _: &Response) {}
|
||||
fn process_response_chunk(&mut self, _: &Request, _: Vec<u8>) {}
|
||||
fn process_response_eof(&mut self, _: &Request, _: &Response) {}
|
||||
}
|
||||
|
||||
pub trait Action<Listener> {
|
||||
|
@ -293,16 +314,25 @@ impl<T: FetchResponseListener> Action<T> for FetchResponseMsg {
|
|||
/// Execute the default action on a provided listener.
|
||||
fn process(self, listener: &mut T) {
|
||||
match self {
|
||||
FetchResponseMsg::ProcessRequestBody => listener.process_request_body(),
|
||||
FetchResponseMsg::ProcessRequestEOF => listener.process_request_eof(),
|
||||
FetchResponseMsg::ProcessResponse(meta) => listener.process_response(meta),
|
||||
FetchResponseMsg::ProcessResponseChunk(data) => listener.process_response_chunk(data),
|
||||
FetchResponseMsg::ProcessResponseEOF(data) => {
|
||||
FetchResponseMsg::ProcessRequestBody(request_id) => {
|
||||
listener.process_request_body(request_id)
|
||||
},
|
||||
FetchResponseMsg::ProcessRequestEOF(request_id) => {
|
||||
listener.process_request_eof(request_id)
|
||||
},
|
||||
FetchResponseMsg::ProcessResponse(request_id, meta) => {
|
||||
listener.process_response(request_id, meta)
|
||||
},
|
||||
FetchResponseMsg::ProcessResponseChunk(request_id, data) => {
|
||||
listener.process_response_chunk(request_id, data)
|
||||
},
|
||||
FetchResponseMsg::ProcessResponseEOF(request_id, data) => {
|
||||
match data {
|
||||
Ok(ref response_resource_timing) => {
|
||||
// update listener with values from response
|
||||
*listener.resource_timing_mut() = response_resource_timing.clone();
|
||||
listener.process_response_eof(Ok(response_resource_timing.clone()));
|
||||
listener
|
||||
.process_response_eof(request_id, Ok(response_resource_timing.clone()));
|
||||
// TODO timing check https://w3c.github.io/resource-timing/#dfn-timing-allow-check
|
||||
|
||||
listener.submit_resource_timing();
|
||||
|
@ -311,7 +341,7 @@ impl<T: FetchResponseListener> Action<T> for FetchResponseMsg {
|
|||
// (e.g. due to a network error) MAY be included as PerformanceResourceTiming
|
||||
// objects in the Performance Timeline and MUST contain initialized attribute
|
||||
// values for processed substeps of the processing model.
|
||||
Err(e) => listener.process_response_eof(Err(e)),
|
||||
Err(e) => listener.process_response_eof(request_id, Err(e)),
|
||||
}
|
||||
},
|
||||
}
|
||||
|
@ -467,22 +497,110 @@ pub enum CoreResourceMsg {
|
|||
Exit(IpcSender<()>),
|
||||
}
|
||||
|
||||
enum ToFetchThreadMessage {
|
||||
StartFetch(
|
||||
/* request_builder */ RequestBuilder,
|
||||
/* cancel_chan */ Option<IpcReceiver<()>>,
|
||||
/* callback */ BoxedFetchCallback,
|
||||
),
|
||||
FetchResponse(FetchResponseMsg),
|
||||
}
|
||||
|
||||
pub type BoxedFetchCallback = Box<dyn Fn(FetchResponseMsg) + Send + 'static>;
|
||||
|
||||
/// A thread to handle fetches in a Servo process. This thread is responsible for
|
||||
/// listening for new fetch requests as well as updates on those operations and forwarding
|
||||
/// them to crossbeam channels.
|
||||
struct FetchThread {
|
||||
/// A list of active fetches. A fetch is no longer active once the
|
||||
/// [`FetchResponseMsg::ProcessResponseEOF`] is received.
|
||||
active_fetches: HashMap<RequestId, BoxedFetchCallback>,
|
||||
/// A reference to the [`CoreResourceThread`] used to kick off fetch requests.
|
||||
core_resource_thread: CoreResourceThread,
|
||||
/// A crossbeam receiver attached to the router proxy which converts incoming fetch
|
||||
/// updates from IPC messages to crossbeam messages as well as another sender which
|
||||
/// handles requests from clients wanting to do fetches.
|
||||
receiver: Receiver<ToFetchThreadMessage>,
|
||||
/// An [`IpcSender`] that's sent with every fetch request and leads back to our
|
||||
/// router proxy.
|
||||
to_fetch_sender: IpcSender<FetchResponseMsg>,
|
||||
}
|
||||
|
||||
impl FetchThread {
|
||||
fn spawn(core_resource_thread: &CoreResourceThread) -> Sender<ToFetchThreadMessage> {
|
||||
let (sender, receiver) = unbounded();
|
||||
let (to_fetch_sender, from_fetch_sender) = ipc::channel().unwrap();
|
||||
|
||||
let sender_clone = sender.clone();
|
||||
ROUTER.add_route(
|
||||
from_fetch_sender.to_opaque(),
|
||||
Box::new(move |message| {
|
||||
let message: FetchResponseMsg = message.to().unwrap();
|
||||
let _ = sender_clone.send(ToFetchThreadMessage::FetchResponse(message));
|
||||
}),
|
||||
);
|
||||
|
||||
let core_resource_thread = core_resource_thread.clone();
|
||||
thread::Builder::new()
|
||||
.name("FetchThread".to_owned())
|
||||
.spawn(move || {
|
||||
let mut fetch_thread = FetchThread {
|
||||
active_fetches: HashMap::new(),
|
||||
core_resource_thread,
|
||||
receiver,
|
||||
to_fetch_sender,
|
||||
};
|
||||
fetch_thread.run();
|
||||
})
|
||||
.expect("Thread spawning failed");
|
||||
sender
|
||||
}
|
||||
|
||||
fn run(&mut self) {
|
||||
loop {
|
||||
match self.receiver.recv().unwrap() {
|
||||
ToFetchThreadMessage::StartFetch(request_builder, canceller, callback) => {
|
||||
self.active_fetches.insert(request_builder.id, callback);
|
||||
self.core_resource_thread
|
||||
.send(CoreResourceMsg::Fetch(
|
||||
request_builder,
|
||||
FetchChannels::ResponseMsg(self.to_fetch_sender.clone(), canceller),
|
||||
))
|
||||
.unwrap();
|
||||
},
|
||||
ToFetchThreadMessage::FetchResponse(fetch_response_msg) => {
|
||||
let request_id = fetch_response_msg.request_id();
|
||||
let fetch_finished =
|
||||
matches!(fetch_response_msg, FetchResponseMsg::ProcessResponseEOF(..));
|
||||
|
||||
self.active_fetches
|
||||
.get(&request_id)
|
||||
.expect("Got fetch response for unknown fetch")(
|
||||
fetch_response_msg
|
||||
);
|
||||
|
||||
if fetch_finished {
|
||||
self.active_fetches.remove(&request_id);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Instruct the resource thread to make a new request.
|
||||
pub fn fetch_async<F>(request: RequestBuilder, core_resource_thread: &CoreResourceThread, f: F)
|
||||
where
|
||||
F: Fn(FetchResponseMsg) + Send + 'static,
|
||||
{
|
||||
let (action_sender, action_receiver) = ipc::channel().unwrap();
|
||||
ROUTER.add_route(
|
||||
action_receiver.to_opaque(),
|
||||
Box::new(move |message| f(message.to().unwrap())),
|
||||
);
|
||||
core_resource_thread
|
||||
.send(CoreResourceMsg::Fetch(
|
||||
request,
|
||||
FetchChannels::ResponseMsg(action_sender, None),
|
||||
))
|
||||
.unwrap();
|
||||
pub fn fetch_async(
|
||||
core_resource_thread: &CoreResourceThread,
|
||||
request: RequestBuilder,
|
||||
canceller: Option<IpcReceiver<()>>,
|
||||
callback: BoxedFetchCallback,
|
||||
) {
|
||||
static FETCH_THREAD: OnceLock<Sender<ToFetchThreadMessage>> = OnceLock::new();
|
||||
let _ = FETCH_THREAD
|
||||
.get_or_init(|| FetchThread::spawn(core_resource_thread))
|
||||
.send(ToFetchThreadMessage::StartFetch(
|
||||
request, canceller, callback,
|
||||
));
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue