net: Use RequestId to cancel fetches instead of creating an IPC channel (#34883)

Instead of creating an IPC channel for every fetch, allow cancelling
fetches based on the `RequestId` of the original request. This requires
that `RequestId`s be UUIDs so that they are unique between processes
that might communicating with the resource process.

In addition, the resource process loop now keeps a `HashMap` or `Weak`
handles to cancellers and cleans them up.

This allows for creating mutiple `FetchCanceller`s in `script` for a
single fetch request, allowing integration of the media and video
elements to integrate with the `Document` canceller list -- meaning
these fetches also get cancelled when the `Document` unloads.

Signed-off-by: Martin Robinson <mrobinson@igalia.com>
This commit is contained in:
Martin Robinson 2025-01-11 12:49:22 +01:00 committed by GitHub
parent e2be55b873
commit 748954d610
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 179 additions and 226 deletions

View file

@ -3,7 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::borrow::Cow;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{io, mem, str};
@ -15,7 +15,7 @@ use devtools_traits::DevtoolsControlMsg;
use headers::{AccessControlExposeHeaders, ContentType, HeaderMapExt};
use http::header::{self, HeaderMap, HeaderName};
use http::{Method, StatusCode};
use ipc_channel::ipc::{self, IpcReceiver};
use ipc_channel::ipc;
use log::warn;
use mime::{self, Mime};
use net_traits::filemanager_thread::{FileTokenCheck, RelativePos};
@ -59,37 +59,23 @@ pub struct FetchContext {
pub devtools_chan: Option<Arc<Mutex<Sender<DevtoolsControlMsg>>>>,
pub filemanager: Arc<Mutex<FileManager>>,
pub file_token: FileTokenCheck,
pub cancellation_listener: Arc<Mutex<CancellationListener>>,
pub cancellation_listener: Arc<CancellationListener>,
pub timing: ServoArc<Mutex<ResourceFetchTiming>>,
pub protocols: Arc<ProtocolRegistry>,
}
#[derive(Default)]
pub struct CancellationListener {
cancel_chan: Option<IpcReceiver<()>>,
cancelled: bool,
cancelled: AtomicBool,
}
impl CancellationListener {
pub fn new(cancel_chan: Option<IpcReceiver<()>>) -> Self {
Self {
cancel_chan,
cancelled: false,
}
pub(crate) fn cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}
pub fn cancelled(&mut self) -> bool {
if let Some(ref cancel_chan) = self.cancel_chan {
if self.cancelled {
true
} else if cancel_chan.try_recv().is_ok() {
self.cancelled = true;
true
} else {
false
}
} else {
false
}
pub(crate) fn cancel(&self) {
self.cancelled.store(true, Ordering::Relaxed)
}
}
pub type DoneChannel = Option<(TokioSender<Data>, TokioReceiver<Data>)>;

View file

@ -128,7 +128,7 @@ impl FileManager {
pub fn fetch_file(
&self,
done_sender: &mut TokioSender<Data>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
cancellation_listener: Arc<CancellationListener>,
id: Uuid,
file_token: &FileTokenCheck,
origin: FileOrigin,
@ -211,7 +211,7 @@ impl FileManager {
done_sender: &mut TokioSender<Data>,
mut reader: BufReader<File>,
res_body: ServoArc<Mutex<ResponseBody>>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
cancellation_listener: Arc<CancellationListener>,
range: RelativePos,
) {
let done_sender = done_sender.clone();
@ -220,7 +220,7 @@ impl FileManager {
.map(|pool| {
pool.spawn(move || {
loop {
if cancellation_listener.lock().unwrap().cancelled() {
if cancellation_listener.cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);
let _ = done_sender.send(Data::Cancelled);
return;
@ -282,7 +282,7 @@ impl FileManager {
fn fetch_blob_buf(
&self,
done_sender: &mut TokioSender<Data>,
cancellation_listener: Arc<Mutex<CancellationListener>>,
cancellation_listener: Arc<CancellationListener>,
id: &Uuid,
file_token: &FileTokenCheck,
origin_in: &FileOrigin,

View file

@ -1944,7 +1944,7 @@ async fn http_network_fetch(
let meta_status = meta.status;
let meta_headers = meta.headers;
let cancellation_listener = context.cancellation_listener.clone();
if cancellation_listener.lock().unwrap().cancelled() {
if cancellation_listener.cancelled() {
return Response::network_error(NetworkError::Internal("Fetch aborted".into()));
}
@ -1983,7 +1983,7 @@ async fn http_network_fetch(
warn!("Error streaming response body: {:?}", e);
})
.try_fold(res_body, move |res_body, chunk| {
if cancellation_listener.lock().unwrap().cancelled() {
if cancellation_listener.cancelled() {
*res_body.lock().unwrap() = ResponseBody::Done(vec![]);
let _ = done_sender.send(Data::Cancelled);
return future::ready(Err(()));

View file

@ -10,7 +10,7 @@ use std::fs::File;
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::thread;
use std::time::Duration;
@ -24,7 +24,7 @@ use log::{debug, warn};
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use net_traits::blob_url_store::parse_blob_url;
use net_traits::filemanager_thread::FileTokenCheck;
use net_traits::request::{Destination, RequestBuilder};
use net_traits::request::{Destination, RequestBuilder, RequestId};
use net_traits::response::{Response, ResponseInit};
use net_traits::storage_thread::StorageThreadMsg;
use net_traits::{
@ -142,6 +142,7 @@ pub fn new_core_resource_thread(
config_dir,
ca_certificates,
ignore_certificate_errors,
cancellation_listeners: Default::default(),
};
mem_profiler_chan.run_with_memory_reporting(
@ -168,6 +169,7 @@ struct ResourceChannelManager {
config_dir: Option<PathBuf>,
ca_certificates: CACertificates,
ignore_certificate_errors: bool,
cancellation_listeners: HashMap<RequestId, Weak<CancellationListener>>,
}
fn create_http_states(
@ -300,6 +302,30 @@ impl ResourceChannelManager {
msg.send(vec![public_report, private_report]);
}
fn cancellation_listener(&self, request_id: RequestId) -> Option<Arc<CancellationListener>> {
self.cancellation_listeners
.get(&request_id)
.and_then(Weak::upgrade)
}
fn get_or_create_cancellation_listener(
&mut self,
request_id: RequestId,
) -> Arc<CancellationListener> {
if let Some(listener) = self.cancellation_listener(request_id) {
return listener;
}
// Clear away any cancellation listeners that are no longer valid.
self.cancellation_listeners
.retain(|_, listener| listener.strong_count() > 0);
let cancellation_listener = Arc::new(Default::default());
self.cancellation_listeners
.insert(request_id, Arc::downgrade(&cancellation_listener));
cancellation_listener
}
/// Returns false if the thread should exit.
fn process_msg(
&mut self,
@ -308,33 +334,45 @@ impl ResourceChannelManager {
protocols: Arc<ProtocolRegistry>,
) -> bool {
match msg {
CoreResourceMsg::Fetch(req_init, channels) => match channels {
FetchChannels::ResponseMsg(sender, cancel_chan) => self.resource_manager.fetch(
req_init,
None,
sender,
http_state,
cancel_chan,
protocols,
),
CoreResourceMsg::Fetch(request_builder, channels) => match channels {
FetchChannels::ResponseMsg(sender) => {
let cancellation_listener =
self.get_or_create_cancellation_listener(request_builder.id);
self.resource_manager.fetch(
request_builder,
None,
sender,
http_state,
cancellation_listener,
protocols,
);
},
FetchChannels::WebSocket {
event_sender,
action_receiver,
} => self.resource_manager.websocket_connect(
req_init,
request_builder,
event_sender,
action_receiver,
http_state,
),
FetchChannels::Prefetch => self.resource_manager.fetch(
req_init,
request_builder,
None,
DiscardFetch,
http_state,
None,
Arc::new(Default::default()),
protocols,
),
},
CoreResourceMsg::Cancel(request_ids) => {
for cancellation_listener in request_ids
.into_iter()
.filter_map(|request_id| self.cancellation_listener(request_id))
{
cancellation_listener.cancel();
}
},
CoreResourceMsg::DeleteCookies(request) => {
http_state
.cookie_jar
@ -343,13 +381,15 @@ impl ResourceChannelManager {
.clear_storage(&request);
return true;
},
CoreResourceMsg::FetchRedirect(req_init, res_init, sender, cancel_chan) => {
CoreResourceMsg::FetchRedirect(request_builder, res_init, sender) => {
let cancellation_listener =
self.get_or_create_cancellation_listener(request_builder.id);
self.resource_manager.fetch(
req_init,
request_builder,
Some(res_init),
sender,
http_state,
cancel_chan,
cancellation_listener,
protocols,
)
},
@ -698,7 +738,7 @@ impl CoreResourceManager {
res_init_: Option<ResponseInit>,
mut sender: Target,
http_state: &Arc<HttpState>,
cancel_chan: Option<IpcReceiver<()>>,
cancellation_listener: Arc<CancellationListener>,
protocols: Arc<ProtocolRegistry>,
) {
let http_state = http_state.clone();
@ -746,7 +786,7 @@ impl CoreResourceManager {
devtools_chan: dc.map(|dc| Arc::new(Mutex::new(dc))),
filemanager: Arc::new(Mutex::new(filemanager)),
file_token,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(cancel_chan))),
cancellation_listener,
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(request.timing_type()))),
protocols,
};

View file

@ -26,7 +26,7 @@ use hyper::body::{Bytes, Incoming};
use hyper::{Request as HyperRequest, Response as HyperResponse};
use mime::{self, Mime};
use net::fetch::cors_cache::CorsCache;
use net::fetch::methods::{self, CancellationListener, FetchContext};
use net::fetch::methods::{self, FetchContext};
use net::filemanager_thread::FileManager;
use net::hsts::HstsEntry;
use net::protocols::ProtocolRegistry;
@ -702,7 +702,7 @@ fn test_fetch_with_hsts() {
Weak::new(),
))),
file_token: FileTokenCheck::NotRequired,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
cancellation_listener: Arc::new(Default::default()),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(
ResourceTimingType::Navigation,
))),
@ -759,7 +759,7 @@ fn test_load_adds_host_to_hsts_list_when_url_is_https() {
Weak::new(),
))),
file_token: FileTokenCheck::NotRequired,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
cancellation_listener: Arc::new(Default::default()),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(
ResourceTimingType::Navigation,
))),
@ -818,7 +818,7 @@ fn test_fetch_self_signed() {
Weak::new(),
))),
file_token: FileTokenCheck::NotRequired,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
cancellation_listener: Arc::new(Default::default()),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(
ResourceTimingType::Navigation,
))),

View file

@ -39,7 +39,7 @@ use hyper::{Request as HyperRequest, Response as HyperResponse};
use hyper_util::rt::tokio::TokioIo;
use net::connector::{create_http_client, create_tls_config};
use net::fetch::cors_cache::CorsCache;
use net::fetch::methods::{self, CancellationListener, FetchContext};
use net::fetch::methods::{self, FetchContext};
use net::filemanager_thread::FileManager;
use net::protocols::ProtocolRegistry;
use net::resource_thread::CoreResourceThreadPool;
@ -183,7 +183,7 @@ fn new_fetch_context(
pool_handle.unwrap_or_else(|| Weak::new()),
))),
file_token: FileTokenCheck::NotRequired,
cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))),
cancellation_listener: Arc::new(Default::default()),
timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new(
ResourceTimingType::Navigation,
))),