net: clean shutdown of the async runtime (#38425)

The previous use of a static variable for the runtime prevented it from
shutting down cleanly, because shutdown requires dropping or taking
ownership of it. This PR switches the static variable to a handle only,
and introduces a new trait to pass a handle to the async runtime to the
constellation, where it can be shut-down along with other components and
help reduce our count of still running threads after shutdown.

Testing: manual testing, and covered by unit-test in net, and wpt tests.
Fixes: part of - https://github.com/servo/servo/issues/30849

---------

Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
Gregory Terzian 2025-08-05 05:42:47 +08:00 committed by GitHub
parent 7ad32f944f
commit 77ff351cde
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 132 additions and 49 deletions

View file

@ -149,7 +149,7 @@ use media::WindowGLContext;
use net_traits::pub_domains::reg_host;
use net_traits::request::Referrer;
use net_traits::storage_thread::{StorageThreadMsg, StorageType};
use net_traits::{self, IpcSend, ReferrerPolicy, ResourceThreads};
use net_traits::{self, AsyncRuntime, IpcSend, ReferrerPolicy, ResourceThreads};
use profile_traits::mem::ProfilerMsg;
use profile_traits::{mem, time};
use script_traits::{
@ -458,6 +458,9 @@ pub struct Constellation<STF, SWF> {
/// The process manager.
process_manager: ProcessManager,
/// The async runtime.
async_runtime: Box<dyn AsyncRuntime>,
}
/// State needed to construct a constellation.
@ -510,6 +513,9 @@ pub struct InitialConstellationState {
/// User content manager
pub user_content_manager: UserContentManager,
/// The async runtime.
pub async_runtime: Box<dyn AsyncRuntime>,
}
/// When we are running reftests, we save an image to compare against a reference.
@ -704,6 +710,7 @@ where
rippy_data,
user_content_manager: state.user_content_manager,
process_manager: ProcessManager::new(state.mem_profiler_chan),
async_runtime: state.async_runtime,
};
constellation.run();
@ -2651,6 +2658,9 @@ where
debug!("Shutting-down IPC router thread in constellation.");
ROUTER.shutdown();
debug!("Shutting-down the async runtime in constellation.");
self.async_runtime.shutdown();
}
fn handle_pipeline_exited(&mut self, pipeline_id: PipelineId) {

View file

@ -1,15 +1,45 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::cmp::Ord;
use std::sync::LazyLock;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
use tokio::runtime::{Builder, Runtime};
use futures::Future;
use net_traits::AsyncRuntime;
use tokio::runtime::{Builder, Handle, Runtime};
pub static HANDLE: LazyLock<Runtime> = LazyLock::new(|| {
Builder::new_multi_thread()
/// The actual runtime,
/// to be used as part of shut-down.
pub struct AsyncRuntimeHolder {
runtime: Option<Runtime>,
}
impl AsyncRuntimeHolder {
pub(crate) fn new(runtime: Runtime) -> Self {
Self {
runtime: Some(runtime),
}
}
}
impl AsyncRuntime for AsyncRuntimeHolder {
fn shutdown(&mut self) {
self.runtime
.take()
.expect("Runtime should have been initialized on start-up.")
.shutdown_timeout(Duration::from_millis(100))
}
}
/// A shared handle to the runtime,
/// to be initialized on start-up.
static ASYNC_RUNTIME_HANDLE: OnceLock<Handle> = OnceLock::new();
pub fn init_async_runtime() -> Box<dyn AsyncRuntime> {
// Initialize a tokio runtime.
let runtime = Builder::new_multi_thread()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
@ -24,5 +54,36 @@ pub static HANDLE: LazyLock<Runtime> = LazyLock::new(|| {
.enable_io()
.enable_time()
.build()
.expect("Unable to build tokio-runtime runtime")
});
.expect("Unable to build tokio-runtime runtime");
// Make the runtime available to users inside this crate.
ASYNC_RUNTIME_HANDLE
.set(runtime.handle().clone())
.expect("Runtime handle should be initialized once on start-up");
// Return an async runtime for use in shutdown.
Box::new(AsyncRuntimeHolder::new(runtime))
}
/// Spawn a task using the handle to the runtime.
pub fn spawn_task<F>(task: F)
where
F: Future + 'static + std::marker::Send,
F::Output: Send + 'static,
{
ASYNC_RUNTIME_HANDLE
.get()
.expect("Runtime handle should be initialized on start-up")
.spawn(task);
}
/// Spawn a blocking task using the handle to the runtime.
pub fn spawn_blocking_task<F, R>(task: F) -> F::Output
where
F: Future,
{
ASYNC_RUNTIME_HANDLE
.get()
.expect("Runtime handle should be initialized on start-up")
.block_on(task)
}

View file

@ -21,7 +21,7 @@ use rustls::{ClientConfig, RootCertStore};
use rustls_pki_types::{CertificateDer, ServerName, UnixTime};
use tower_service::Service;
use crate::async_runtime::HANDLE;
use crate::async_runtime::spawn_task;
use crate::hosts::replace_host;
pub const BUF_SIZE: usize = 32768;
@ -165,7 +165,7 @@ where
F: Future<Output = ()> + 'static + std::marker::Send,
{
fn execute(&self, fut: F) {
HANDLE.spawn(fut);
spawn_task(fut);
}
}

View file

@ -66,7 +66,7 @@ use tokio::sync::mpsc::{
};
use tokio_stream::wrappers::ReceiverStream;
use crate::async_runtime::HANDLE;
use crate::async_runtime::spawn_task;
use crate::connector::{CertificateErrorOverrideManager, Connector};
use crate::cookie::ServoCookie;
use crate::cookie_storage::CookieStorage;
@ -574,7 +574,7 @@ impl BodySink {
match self {
BodySink::Chunked(sender) => {
let sender = sender.clone();
HANDLE.spawn(async move {
spawn_task(async move {
let _ = sender
.send(Ok(Frame::data(Bytes::copy_from_slice(&bytes))))
.await;
@ -2090,7 +2090,7 @@ async fn http_network_fetch(
let headers = response.headers.clone();
let devtools_chan = context.devtools_chan.clone();
HANDLE.spawn(
spawn_task(
res.into_body()
.map_err(|e| {
warn!("Error streaming response body: {:?}", e);

View file

@ -29,9 +29,9 @@ use net_traits::request::{Destination, RequestBuilder, RequestId};
use net_traits::response::{Response, ResponseInit};
use net_traits::storage_thread::StorageThreadMsg;
use net_traits::{
CookieSource, CoreResourceMsg, CoreResourceThread, CustomResponseMediator, DiscardFetch,
FetchChannels, FetchTaskTarget, ResourceFetchTiming, ResourceThreads, ResourceTimingType,
WebSocketDomAction, WebSocketNetworkEvent,
AsyncRuntime, CookieSource, CoreResourceMsg, CoreResourceThread, CustomResponseMediator,
DiscardFetch, FetchChannels, FetchTaskTarget, ResourceFetchTiming, ResourceThreads,
ResourceTimingType, WebSocketDomAction, WebSocketNetworkEvent,
};
use profile_traits::mem::{
ProcessReports, ProfilerChan as MemProfilerChan, Report, ReportKind, ReportsChan,
@ -44,7 +44,7 @@ use serde::{Deserialize, Serialize};
use servo_arc::Arc as ServoArc;
use servo_url::{ImmutableOrigin, ServoUrl};
use crate::async_runtime::HANDLE;
use crate::async_runtime::{init_async_runtime, spawn_task};
use crate::connector::{
CACertificates, CertificateErrorOverrideManager, create_http_client, create_tls_config,
};
@ -84,7 +84,10 @@ pub fn new_resource_threads(
certificate_path: Option<String>,
ignore_certificate_errors: bool,
protocols: Arc<ProtocolRegistry>,
) -> (ResourceThreads, ResourceThreads) {
) -> (ResourceThreads, ResourceThreads, Box<dyn AsyncRuntime>) {
// Initialize the async runtime, and get a handle to it for use in clean shutdown.
let async_runtime = init_async_runtime();
let ca_certificates = match certificate_path {
Some(path) => match load_root_cert_store_from_file(path) {
Ok(root_cert_store) => CACertificates::Override(root_cert_store),
@ -112,6 +115,7 @@ pub fn new_resource_threads(
(
ResourceThreads::new(public_core, storage.clone(), idb.clone()),
ResourceThreads::new(private_core, storage, idb),
async_runtime,
)
}
@ -781,7 +785,7 @@ impl CoreResourceManager {
_ => (FileTokenCheck::NotRequired, None),
};
HANDLE.spawn(async move {
spawn_task(async move {
// XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed)
// todo load context / mimesniff in fetch
// todo referrer policy?

View file

@ -26,6 +26,7 @@ use http_body_util::combinators::BoxBody;
use hyper::body::{Bytes, Incoming};
use hyper::{Request as HyperRequest, Response as HyperResponse};
use mime::{self, Mime};
use net::async_runtime::spawn_blocking_task;
use net::fetch::cors_cache::CorsCache;
use net::fetch::methods::{self, FetchContext};
use net::filemanager_thread::FileManager;
@ -200,7 +201,7 @@ fn test_fetch_blob() {
expected: bytes.to_vec(),
};
crate::HANDLE.block_on(methods::fetch(request, &mut target, &context));
spawn_blocking_task::<_, Response>(methods::fetch(request, &mut target, &context));
let fetch_response = receiver.recv().unwrap();
assert!(!fetch_response.is_network_error());

View file

@ -49,7 +49,7 @@ use url::Url;
use crate::{
create_embedder_proxy_and_receiver, fetch, fetch_with_context, make_body, make_server,
new_fetch_context, receive_credential_prompt_msgs,
new_fetch_context, receive_credential_prompt_msgs, spawn_blocking_task,
};
fn mock_origin() -> ImmutableOrigin {
@ -1611,7 +1611,7 @@ fn test_fetch_compressed_response_update_count() {
sender: Some(sender),
update_count: 0,
};
let response_update_count = crate::HANDLE.block_on(async move {
let response_update_count = spawn_blocking_task::<_, Response>(async move {
methods::fetch(
request,
&mut target,

View file

@ -37,6 +37,7 @@ use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request as HyperRequest, Response as HyperResponse};
use hyper_util::rt::tokio::TokioIo;
use net::async_runtime::{init_async_runtime, spawn_blocking_task, spawn_task};
use net::connector::{create_http_client, create_tls_config};
use net::fetch::cors_cache::CorsCache;
use net::fetch::methods::{self, FetchContext};
@ -48,25 +49,19 @@ use net::test::HttpState;
use net_traits::filemanager_thread::FileTokenCheck;
use net_traits::request::Request;
use net_traits::response::Response;
use net_traits::{FetchTaskTarget, ResourceFetchTiming, ResourceTimingType};
use net_traits::{AsyncRuntime, FetchTaskTarget, ResourceFetchTiming, ResourceTimingType};
use rustls_pemfile::{certs, pkcs8_private_keys};
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use servo_arc::Arc as ServoArc;
use servo_url::ServoUrl;
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::{Builder, Runtime};
use tokio_rustls::{self, TlsAcceptor};
pub static HANDLE: LazyLock<Runtime> = LazyLock::new(|| {
Builder::new_multi_thread()
.enable_io()
.worker_threads(10)
.build()
.unwrap()
});
const DEFAULT_USER_AGENT: &'static str = "Such Browser. Very Layout. Wow.";
static ASYNC_RUNTIME: LazyLock<Arc<Mutex<Box<dyn AsyncRuntime>>>> =
LazyLock::new(|| Arc::new(Mutex::new(init_async_runtime())));
struct FetchResponseCollector {
sender: Option<tokio::sync::oneshot::Sender<Response>>,
}
@ -168,6 +163,8 @@ fn new_fetch_context(
fc: Option<EmbedderProxy>,
pool_handle: Option<Weak<CoreResourceThreadPool>>,
) -> FetchContext {
let _ = &*ASYNC_RUNTIME;
let sender = fc.unwrap_or_else(|| create_embedder_proxy());
FetchContext {
@ -208,7 +205,7 @@ fn fetch_with_context(request: Request, mut context: &mut FetchContext) -> Respo
let mut target = FetchResponseCollector {
sender: Some(sender),
};
HANDLE.block_on(async move {
spawn_blocking_task::<_, Response>(async move {
methods::fetch(request, &mut target, &mut context).await;
receiver.await.unwrap()
})
@ -219,14 +216,9 @@ fn fetch_with_cors_cache(request: Request, cache: &mut CorsCache) -> Response {
let mut target = FetchResponseCollector {
sender: Some(sender),
};
HANDLE.block_on(async move {
methods::fetch_with_cors_cache(
request,
cache,
&mut target,
&mut new_fetch_context(None, None, None),
)
.await;
let mut fetch_context = new_fetch_context(None, None, None);
spawn_blocking_task::<_, Response>(async move {
methods::fetch_with_cors_cache(request, cache, &mut target, &mut fetch_context).await;
receiver.await.unwrap()
})
}
@ -249,11 +241,15 @@ where
+ Sync
+ 'static,
{
let _ = &*ASYNC_RUNTIME;
let handler = Arc::new(handler);
let listener = StdTcpListener::bind("0.0.0.0:0").unwrap();
listener.set_nonblocking(true).unwrap();
let listener = HANDLE.block_on(async move { TcpListener::from_std(listener).unwrap() });
let listener =
spawn_blocking_task::<_, TcpListener>(
async move { TcpListener::from_std(listener).unwrap() },
);
let url_string = format!("http://localhost:{}", listener.local_addr().unwrap().port());
let url = ServoUrl::parse(&url_string).unwrap();
@ -290,13 +286,13 @@ where
}),
);
let conn = graceful.watch(conn);
HANDLE.spawn(async move {
spawn_task(async move {
let _ = conn.await;
});
}
};
let _ = HANDLE.spawn(server);
let _ = spawn_task(server);
(
Server {
close_channel: tx,
@ -337,10 +333,14 @@ where
+ Sync
+ 'static,
{
let _ = &*ASYNC_RUNTIME;
let handler = Arc::new(handler);
let listener = StdTcpListener::bind("[::0]:0").unwrap();
listener.set_nonblocking(true).unwrap();
let listener = HANDLE.block_on(async move { TcpListener::from_std(listener).unwrap() });
let listener =
spawn_blocking_task::<_, TcpListener>(
async move { TcpListener::from_std(listener).unwrap() },
);
let url_string = format!("http://localhost:{}", listener.local_addr().unwrap().port());
let url = ServoUrl::parse(&url_string).unwrap();
@ -400,7 +400,7 @@ where
}
};
HANDLE.spawn(server);
spawn_task(server);
(
Server {

View file

@ -39,7 +39,7 @@ use tungstenite::handshake::client::{Request, Response};
use tungstenite::protocol::CloseFrame;
use url::Url;
use crate::async_runtime::HANDLE;
use crate::async_runtime::spawn_task;
use crate::connector::{CACertificates, TlsConfig, create_tls_config};
use crate::cookie::ServoCookie;
use crate::fetch::methods::{
@ -423,7 +423,7 @@ fn connect(
tls_config.alpn_protocols = vec!["http/1.1".to_string().into()];
let resource_event_sender2 = resource_event_sender.clone();
HANDLE.spawn(
spawn_task(
start_websocket(
http_state,
req_url.clone(),

View file

@ -1141,7 +1141,7 @@ fn create_constellation(
let bluetooth_thread: IpcSender<BluetoothRequest> =
BluetoothThreadFactory::new(embedder_proxy.clone());
let (public_resource_threads, private_resource_threads) = new_resource_threads(
let (public_resource_threads, private_resource_threads, async_runtime) = new_resource_threads(
devtools_sender.clone(),
time_profiler_chan.clone(),
mem_profiler_chan.clone(),
@ -1182,6 +1182,7 @@ fn create_constellation(
#[cfg(feature = "webgpu")]
wgpu_image_map,
user_content_manager,
async_runtime,
};
let layout_factory = Arc::new(LayoutFactoryImpl());

View file

@ -389,6 +389,12 @@ impl<T: FetchResponseListener> Action<T> for FetchResponseMsg {
}
}
/// Handle to an async runtime,
/// only used to shut it down for now.
pub trait AsyncRuntime: Send {
fn shutdown(&mut self);
}
/// Handle to a resource thread
pub type CoreResourceThread = IpcSender<CoreResourceMsg>;