net: Remove CoreResourceThread from FetchThread state (#38422)

In single process mode, there is a race condition on the initialization
of the global fetch thread: once initialized the global fetch thread
will always use a given core resource thread, and this will be
determined by the component who first initializes it. For example, if
the canvas paint thread first does an async fetch, then this will set
the public core resource as used for all future fetches, including those
coming from a pipeline in private mode.

In multi-process mode, there is a race condition per window event-loop:
the first pipeline to use the fetch will set the core resource thread
for all others.

To ensure the fetch thread uses the correct core resource thread(private
vs public), we need to
pass the core resource thread to each fetch thread operation for which
is it needed.

Testing: It should not break existing fetch WPT tests. The race
condition is not something that can be tested reliably, but it seems to
be based on solid logic.
Fixes: follow-up from
https://github.com/servo/servo/pull/38421/files#r2248950924

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
Gregory Terzian 2025-08-14 01:40:10 +08:00 committed by GitHub
parent 5ff084a688
commit 70be996a29
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 87 additions and 50 deletions

View file

@ -733,7 +733,7 @@ where
// Start a fetch thread.
// In single-process mode this will be the global fetch thread;
// in multi-process mode this will be used only by the canvas paint thread.
let join_handle = start_fetch_thread(&self.public_resource_threads.core_thread);
let join_handle = start_fetch_thread();
while !self.shutting_down || !self.pipelines.is_empty() {
// Randomly close a pipeline if --random-pipeline-closure-probability is set

View file

@ -23,7 +23,7 @@ mod font_context {
SystemFontServiceProxySender, fallback_font_families,
};
use ipc_channel::ipc::{self, IpcReceiver};
use net_traits::{ResourceThreads, exit_fetch_thread, start_fetch_thread};
use net_traits::{ResourceThreads, start_fetch_thread};
use parking_lot::Mutex;
use servo_arc::Arc as ServoArc;
use style::ArcSlice;
@ -56,7 +56,7 @@ mod font_context {
let proxy_clone = Arc::new(system_font_service_proxy.to_sender().to_proxy());
INIT.call_once(|| {
start_fetch_thread(&mock_resource_threads.core_thread);
start_fetch_thread();
});
Self {
context: FontContext::new(proxy_clone, mock_compositor_api, mock_resource_threads),

View file

@ -137,7 +137,10 @@ impl DocumentLoader {
request: RequestBuilder,
callback: BoxedFetchCallback,
) {
self.cancellers.push(FetchCanceller::new(request.id));
self.cancellers.push(FetchCanceller::new(
request.id,
self.resource_threads.core_thread.clone(),
));
fetch_async(&self.resource_threads.core_thread, request, None, callback);
}

View file

@ -648,7 +648,10 @@ impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
listener.notify_fetch(message.unwrap());
}),
);
ev.droppable.set_canceller(FetchCanceller::new(request.id));
ev.droppable.set_canceller(FetchCanceller::new(
request.id,
global.core_resource_thread(),
));
global
.core_resource_thread()
.send(CoreResourceMsg::Fetch(

View file

@ -24,8 +24,8 @@ use layout_api::MediaFrame;
use media::{GLPlayerMsg, GLPlayerMsgForward, WindowGLContext};
use net_traits::request::{Destination, RequestId};
use net_traits::{
FetchMetadata, FetchResponseListener, FilteredMetadata, Metadata, NetworkError,
ResourceFetchTiming, ResourceTimingType,
CoreResourceThread, FetchMetadata, FetchResponseListener, FilteredMetadata, Metadata,
NetworkError, ResourceFetchTiming, ResourceTimingType,
};
use pixels::RasterImage;
use script_bindings::codegen::GenericBindings::TimeRangesBinding::TimeRangesMethods;
@ -945,7 +945,10 @@ impl HTMLMediaElement {
current_fetch_context.cancel(CancelReason::Overridden);
}
*current_fetch_context = Some(HTMLMediaElementFetchContext::new(request.id));
*current_fetch_context = Some(HTMLMediaElementFetchContext::new(
request.id,
global.core_resource_thread(),
));
let listener =
HTMLMediaElementFetchListener::new(self, request.id, url.clone(), offset.unwrap_or(0));
@ -2848,14 +2851,17 @@ pub(crate) struct HTMLMediaElementFetchContext {
}
impl HTMLMediaElementFetchContext {
fn new(request_id: RequestId) -> HTMLMediaElementFetchContext {
fn new(
request_id: RequestId,
core_resource_thread: CoreResourceThread,
) -> HTMLMediaElementFetchContext {
HTMLMediaElementFetchContext {
request_id,
cancel_reason: None,
is_seekable: false,
origin_clean: true,
data_source: DomRefCell::new(BufferedDataSource::new()),
fetch_canceller: FetchCanceller::new(request_id),
fetch_canceller: FetchCanceller::new(request_id, core_resource_thread.clone()),
}
}

View file

@ -16,8 +16,8 @@ use net_traits::image_cache::{
};
use net_traits::request::{CredentialsMode, Destination, RequestBuilder, RequestId};
use net_traits::{
FetchMetadata, FetchResponseListener, FetchResponseMsg, NetworkError, ResourceFetchTiming,
ResourceTimingType,
CoreResourceThread, FetchMetadata, FetchResponseListener, FetchResponseMsg, NetworkError,
ResourceFetchTiming, ResourceTimingType,
};
use pixels::{Snapshot, SnapshotAlphaMode, SnapshotPixelFormat};
use servo_media::player::video::VideoFrame;
@ -271,7 +271,13 @@ impl HTMLVideoElement {
LoadType::Image(poster_url.clone()),
));
let context = PosterFrameFetchContext::new(self, poster_url, id, request.id);
let context = PosterFrameFetchContext::new(
self,
poster_url,
id,
request.id,
self.global().core_resource_thread(),
);
self.owner_document().fetch_background(request, context);
}
@ -498,6 +504,7 @@ impl PosterFrameFetchContext {
url: ServoUrl,
id: PendingImageId,
request_id: RequestId,
core_resource_thread: CoreResourceThread,
) -> PosterFrameFetchContext {
let window = elem.owner_window();
PosterFrameFetchContext {
@ -507,7 +514,7 @@ impl PosterFrameFetchContext {
cancelled: false,
resource_timing: ResourceFetchTiming::new(ResourceTimingType::Resource),
url,
fetch_canceller: FetchCanceller::new(request_id),
fetch_canceller: FetchCanceller::new(request_id, core_resource_thread),
}
}
}

View file

@ -1582,7 +1582,8 @@ impl XMLHttpRequest {
)
};
*self.canceller.borrow_mut() = FetchCanceller::new(request_builder.id);
*self.canceller.borrow_mut() =
FetchCanceller::new(request_builder.id, global.core_resource_thread());
global.fetch(request_builder, context.clone(), task_source);
if let Some(script_port) = script_port {

View file

@ -48,21 +48,27 @@ struct FetchContext {
resource_timing: ResourceFetchTiming,
}
/// RAII fetch canceller object. By default initialized to not having a canceller
/// in it, however you can ask it for a cancellation receiver to send to Fetch
/// in which case it will store the sender. You can manually cancel it
/// or let it cancel on Drop in that case.
/// RAII fetch canceller object.
/// By default initialized to having a
/// request associated with it, which can be manually cancelled with `cancel`,
/// or automatically cancelled on drop.
/// Calling `ignore` will sever the relationship with the request,
/// meaning it cannot be cancelled through this canceller from that point on.
#[derive(Default, JSTraceable, MallocSizeOf)]
pub(crate) struct FetchCanceller {
#[no_trace]
request_id: Option<RequestId>,
#[no_trace]
core_resource_thread: Option<CoreResourceThread>,
}
impl FetchCanceller {
/// Create an empty FetchCanceller
pub(crate) fn new(request_id: RequestId) -> Self {
/// Create a FetchCanceller associated with a request,
// and a particular(public vs private) resource thread.
pub(crate) fn new(request_id: RequestId, core_resource_thread: CoreResourceThread) -> Self {
Self {
request_id: Some(request_id),
core_resource_thread: Some(core_resource_thread),
}
}
@ -72,9 +78,11 @@ impl FetchCanceller {
// stop trying to make fetch happen
// it's not going to happen
// No error handling here. Cancellation is a courtesy call,
// we don't actually care if the other side heard.
cancel_async_fetch(vec![request_id]);
if let Some(ref core_resource_thread) = self.core_resource_thread {
// No error handling here. Cancellation is a courtesy call,
// we don't actually care if the other side heard.
cancel_async_fetch(vec![request_id], core_resource_thread);
}
}
}

View file

@ -3638,7 +3638,10 @@ impl ScriptThread {
.push((incomplete.pipeline_id, context));
let request_builder = incomplete.request_builder();
incomplete.canceller = FetchCanceller::new(request_builder.id);
incomplete.canceller = FetchCanceller::new(
request_builder.id,
self.resource_threads.core_thread.clone(),
);
NavigationListener::new(request_builder, self.senders.self_sender.clone())
.initiate_fetch(&self.resource_threads.core_thread, None);
self.incomplete_loads.borrow_mut().push(incomplete);
@ -3771,7 +3774,10 @@ impl ScriptThread {
.unwrap_or(200),
});
incomplete_load.canceller = FetchCanceller::new(request_builder.id);
incomplete_load.canceller = FetchCanceller::new(
request_builder.id,
self.resource_threads.core_thread.clone(),
);
NavigationListener::new(request_builder, self.senders.self_sender.clone())
.initiate_fetch(&self.resource_threads.core_thread, response_init);
}

View file

@ -1259,7 +1259,7 @@ pub fn run_content_process(token: String) {
media_platform::init();
// Start the fetch thread for this content process.
let fetch_thread_join_handle = start_fetch_thread(content.core_resource_thread());
let fetch_thread_join_handle = start_fetch_thread();
set_logger(content.script_to_constellation_chan().clone());

View file

@ -556,11 +556,12 @@ pub enum CoreResourceMsg {
// FIXME: https://github.com/servo/servo/issues/34591
#[expect(clippy::large_enum_variant)]
enum ToFetchThreadMessage {
Cancel(Vec<RequestId>),
Cancel(Vec<RequestId>, CoreResourceThread),
StartFetch(
/* request_builder */ RequestBuilder,
/* response_init */ Option<ResponseInit>,
/* callback */ BoxedFetchCallback,
/* core resource thread channel */ CoreResourceThread,
),
FetchResponse(FetchResponseMsg),
/// Stop the background thread.
@ -576,8 +577,6 @@ struct FetchThread {
/// A list of active fetches. A fetch is no longer active once the
/// [`FetchResponseMsg::ProcessResponseEOF`] is received.
active_fetches: HashMap<RequestId, BoxedFetchCallback>,
/// A reference to the [`CoreResourceThread`] used to kick off fetch requests.
core_resource_thread: CoreResourceThread,
/// A crossbeam receiver attached to the router proxy which converts incoming fetch
/// updates from IPC messages to crossbeam messages as well as another sender which
/// handles requests from clients wanting to do fetches.
@ -588,9 +587,7 @@ struct FetchThread {
}
impl FetchThread {
fn spawn(
core_resource_thread: &CoreResourceThread,
) -> (Sender<ToFetchThreadMessage>, JoinHandle<()>) {
fn spawn() -> (Sender<ToFetchThreadMessage>, JoinHandle<()>) {
let (sender, receiver) = unbounded();
let (to_fetch_sender, from_fetch_sender) = ipc::channel().unwrap();
@ -602,14 +599,11 @@ impl FetchThread {
let _ = sender_clone.send(ToFetchThreadMessage::FetchResponse(message));
}),
);
let core_resource_thread = core_resource_thread.clone();
let join_handle = thread::Builder::new()
.name("FetchThread".to_owned())
.spawn(move || {
let mut fetch_thread = FetchThread {
active_fetches: HashMap::new(),
core_resource_thread,
receiver,
to_fetch_sender,
};
@ -622,8 +616,13 @@ impl FetchThread {
fn run(&mut self) {
loop {
match self.receiver.recv().unwrap() {
ToFetchThreadMessage::StartFetch(request_builder, response_init, callback) => {
self.active_fetches.insert(request_builder.id, callback);
ToFetchThreadMessage::StartFetch(
request_builder,
response_init,
callback,
core_resource_thread,
) => {
let request_builder_id = request_builder.id;
// Only redirects have a `response_init` field.
let message = match response_init {
@ -638,7 +637,9 @@ impl FetchThread {
),
};
self.core_resource_thread.send(message).unwrap();
core_resource_thread.send(message).unwrap();
self.active_fetches.insert(request_builder_id, callback);
},
ToFetchThreadMessage::FetchResponse(fetch_response_msg) => {
let request_id = fetch_response_msg.request_id();
@ -655,13 +656,11 @@ impl FetchThread {
self.active_fetches.remove(&request_id);
}
},
ToFetchThreadMessage::Cancel(request_ids) => {
ToFetchThreadMessage::Cancel(request_ids, core_resource_thread) => {
// Errors are ignored here, because Servo sends many cancellation requests when shutting down.
// At this point the networking task might be shut down completely, so just ignore errors
// during this time.
let _ = self
.core_resource_thread
.send(CoreResourceMsg::Cancel(request_ids));
let _ = core_resource_thread.send(CoreResourceMsg::Cancel(request_ids));
},
ToFetchThreadMessage::Exit => break,
}
@ -671,10 +670,10 @@ impl FetchThread {
static FETCH_THREAD: OnceLock<Sender<ToFetchThreadMessage>> = OnceLock::new();
/// Starts a fetch thread,
/// and returns the join handle to it.
pub fn start_fetch_thread(core_resource_thread: &CoreResourceThread) -> JoinHandle<()> {
let (sender, join_handle) = FetchThread::spawn(core_resource_thread);
/// Start the fetch thread,
/// and returns the join handle to the background thread.
pub fn start_fetch_thread() -> JoinHandle<()> {
let (sender, join_handle) = FetchThread::spawn();
FETCH_THREAD
.set(sender)
.expect("Fetch thread should be set only once on start-up");
@ -694,7 +693,7 @@ pub fn exit_fetch_thread() {
/// Instruct the resource thread to make a new fetch request.
pub fn fetch_async(
_core_resource_thread: &CoreResourceThread,
core_resource_thread: &CoreResourceThread,
request: RequestBuilder,
response_init: Option<ResponseInit>,
callback: BoxedFetchCallback,
@ -706,16 +705,20 @@ pub fn fetch_async(
request,
response_init,
callback,
core_resource_thread.clone(),
));
}
/// Instruct the resource thread to cancel an existing request. Does nothing if the
/// request has already completed or has not been fetched yet.
pub fn cancel_async_fetch(request_ids: Vec<RequestId>) {
pub fn cancel_async_fetch(request_ids: Vec<RequestId>, core_resource_thread: &CoreResourceThread) {
let _ = FETCH_THREAD
.get()
.expect("Fetch thread should always be initialized on start-up")
.send(ToFetchThreadMessage::Cancel(request_ids));
.send(ToFetchThreadMessage::Cancel(
request_ids,
core_resource_thread.clone(),
));
}
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]