From 77ff351cde2369c8e95af460a6670b728399b005 Mon Sep 17 00:00:00 2001 From: Gregory Terzian <2792687+gterzian@users.noreply.github.com> Date: Tue, 5 Aug 2025 05:42:47 +0800 Subject: [PATCH] net: clean shutdown of the async runtime (#38425) The previous use of a static variable for the runtime prevented it from shutting down cleanly, because shutdown requires dropping or taking ownership of it. This PR switches the static variable to a handle only, and introduces a new trait to pass a handle to the async runtime to the constellation, where it can be shut-down along with other components and help reduce our count of still running threads after shutdown. Testing: manual testing, and covered by unit-test in net, and wpt tests. Fixes: part of - https://github.com/servo/servo/issues/30849 --------- Signed-off-by: gterzian <2792687+gterzian@users.noreply.github.com> --- components/constellation/constellation.rs | 12 +++- components/net/async_runtime.rs | 75 ++++++++++++++++++++--- components/net/connector.rs | 4 +- components/net/http_loader.rs | 6 +- components/net/resource_thread.rs | 16 +++-- components/net/tests/fetch.rs | 3 +- components/net/tests/http_loader.rs | 4 +- components/net/tests/main.rs | 48 +++++++-------- components/net/websocket_loader.rs | 4 +- components/servo/lib.rs | 3 +- components/shared/net/lib.rs | 6 ++ 11 files changed, 132 insertions(+), 49 deletions(-) diff --git a/components/constellation/constellation.rs b/components/constellation/constellation.rs index c979ec01c95..c9e75476974 100644 --- a/components/constellation/constellation.rs +++ b/components/constellation/constellation.rs @@ -149,7 +149,7 @@ use media::WindowGLContext; use net_traits::pub_domains::reg_host; use net_traits::request::Referrer; use net_traits::storage_thread::{StorageThreadMsg, StorageType}; -use net_traits::{self, IpcSend, ReferrerPolicy, ResourceThreads}; +use net_traits::{self, AsyncRuntime, IpcSend, ReferrerPolicy, ResourceThreads}; use profile_traits::mem::ProfilerMsg; use profile_traits::{mem, time}; use script_traits::{ @@ -458,6 +458,9 @@ pub struct Constellation { /// The process manager. process_manager: ProcessManager, + + /// The async runtime. + async_runtime: Box, } /// State needed to construct a constellation. @@ -510,6 +513,9 @@ pub struct InitialConstellationState { /// User content manager pub user_content_manager: UserContentManager, + + /// The async runtime. + pub async_runtime: Box, } /// When we are running reftests, we save an image to compare against a reference. @@ -704,6 +710,7 @@ where rippy_data, user_content_manager: state.user_content_manager, process_manager: ProcessManager::new(state.mem_profiler_chan), + async_runtime: state.async_runtime, }; constellation.run(); @@ -2651,6 +2658,9 @@ where debug!("Shutting-down IPC router thread in constellation."); ROUTER.shutdown(); + + debug!("Shutting-down the async runtime in constellation."); + self.async_runtime.shutdown(); } fn handle_pipeline_exited(&mut self, pipeline_id: PipelineId) { diff --git a/components/net/async_runtime.rs b/components/net/async_runtime.rs index 909bdef8fb0..ad366bfc674 100644 --- a/components/net/async_runtime.rs +++ b/components/net/async_runtime.rs @@ -1,15 +1,45 @@ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use std::cmp::Ord; -use std::sync::LazyLock; +use std::sync::OnceLock; use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; +use std::time::Duration; -use tokio::runtime::{Builder, Runtime}; +use futures::Future; +use net_traits::AsyncRuntime; +use tokio::runtime::{Builder, Handle, Runtime}; -pub static HANDLE: LazyLock = LazyLock::new(|| { - Builder::new_multi_thread() +/// The actual runtime, +/// to be used as part of shut-down. +pub struct AsyncRuntimeHolder { + runtime: Option, +} + +impl AsyncRuntimeHolder { + pub(crate) fn new(runtime: Runtime) -> Self { + Self { + runtime: Some(runtime), + } + } +} + +impl AsyncRuntime for AsyncRuntimeHolder { + fn shutdown(&mut self) { + self.runtime + .take() + .expect("Runtime should have been initialized on start-up.") + .shutdown_timeout(Duration::from_millis(100)) + } +} + +/// A shared handle to the runtime, +/// to be initialized on start-up. +static ASYNC_RUNTIME_HANDLE: OnceLock = OnceLock::new(); + +pub fn init_async_runtime() -> Box { + // Initialize a tokio runtime. + let runtime = Builder::new_multi_thread() .thread_name_fn(|| { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed); @@ -24,5 +54,36 @@ pub static HANDLE: LazyLock = LazyLock::new(|| { .enable_io() .enable_time() .build() - .expect("Unable to build tokio-runtime runtime") -}); + .expect("Unable to build tokio-runtime runtime"); + + // Make the runtime available to users inside this crate. + ASYNC_RUNTIME_HANDLE + .set(runtime.handle().clone()) + .expect("Runtime handle should be initialized once on start-up"); + + // Return an async runtime for use in shutdown. + Box::new(AsyncRuntimeHolder::new(runtime)) +} + +/// Spawn a task using the handle to the runtime. +pub fn spawn_task(task: F) +where + F: Future + 'static + std::marker::Send, + F::Output: Send + 'static, +{ + ASYNC_RUNTIME_HANDLE + .get() + .expect("Runtime handle should be initialized on start-up") + .spawn(task); +} + +/// Spawn a blocking task using the handle to the runtime. +pub fn spawn_blocking_task(task: F) -> F::Output +where + F: Future, +{ + ASYNC_RUNTIME_HANDLE + .get() + .expect("Runtime handle should be initialized on start-up") + .block_on(task) +} diff --git a/components/net/connector.rs b/components/net/connector.rs index e02ff8971e3..aadce10cba7 100644 --- a/components/net/connector.rs +++ b/components/net/connector.rs @@ -21,7 +21,7 @@ use rustls::{ClientConfig, RootCertStore}; use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; use tower_service::Service; -use crate::async_runtime::HANDLE; +use crate::async_runtime::spawn_task; use crate::hosts::replace_host; pub const BUF_SIZE: usize = 32768; @@ -165,7 +165,7 @@ where F: Future + 'static + std::marker::Send, { fn execute(&self, fut: F) { - HANDLE.spawn(fut); + spawn_task(fut); } } diff --git a/components/net/http_loader.rs b/components/net/http_loader.rs index 56d709e07ca..4d0a07177d4 100644 --- a/components/net/http_loader.rs +++ b/components/net/http_loader.rs @@ -66,7 +66,7 @@ use tokio::sync::mpsc::{ }; use tokio_stream::wrappers::ReceiverStream; -use crate::async_runtime::HANDLE; +use crate::async_runtime::spawn_task; use crate::connector::{CertificateErrorOverrideManager, Connector}; use crate::cookie::ServoCookie; use crate::cookie_storage::CookieStorage; @@ -574,7 +574,7 @@ impl BodySink { match self { BodySink::Chunked(sender) => { let sender = sender.clone(); - HANDLE.spawn(async move { + spawn_task(async move { let _ = sender .send(Ok(Frame::data(Bytes::copy_from_slice(&bytes)))) .await; @@ -2090,7 +2090,7 @@ async fn http_network_fetch( let headers = response.headers.clone(); let devtools_chan = context.devtools_chan.clone(); - HANDLE.spawn( + spawn_task( res.into_body() .map_err(|e| { warn!("Error streaming response body: {:?}", e); diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs index a150c9d4f41..e8afa6e2c13 100644 --- a/components/net/resource_thread.rs +++ b/components/net/resource_thread.rs @@ -29,9 +29,9 @@ use net_traits::request::{Destination, RequestBuilder, RequestId}; use net_traits::response::{Response, ResponseInit}; use net_traits::storage_thread::StorageThreadMsg; use net_traits::{ - CookieSource, CoreResourceMsg, CoreResourceThread, CustomResponseMediator, DiscardFetch, - FetchChannels, FetchTaskTarget, ResourceFetchTiming, ResourceThreads, ResourceTimingType, - WebSocketDomAction, WebSocketNetworkEvent, + AsyncRuntime, CookieSource, CoreResourceMsg, CoreResourceThread, CustomResponseMediator, + DiscardFetch, FetchChannels, FetchTaskTarget, ResourceFetchTiming, ResourceThreads, + ResourceTimingType, WebSocketDomAction, WebSocketNetworkEvent, }; use profile_traits::mem::{ ProcessReports, ProfilerChan as MemProfilerChan, Report, ReportKind, ReportsChan, @@ -44,7 +44,7 @@ use serde::{Deserialize, Serialize}; use servo_arc::Arc as ServoArc; use servo_url::{ImmutableOrigin, ServoUrl}; -use crate::async_runtime::HANDLE; +use crate::async_runtime::{init_async_runtime, spawn_task}; use crate::connector::{ CACertificates, CertificateErrorOverrideManager, create_http_client, create_tls_config, }; @@ -84,7 +84,10 @@ pub fn new_resource_threads( certificate_path: Option, ignore_certificate_errors: bool, protocols: Arc, -) -> (ResourceThreads, ResourceThreads) { +) -> (ResourceThreads, ResourceThreads, Box) { + // Initialize the async runtime, and get a handle to it for use in clean shutdown. + let async_runtime = init_async_runtime(); + let ca_certificates = match certificate_path { Some(path) => match load_root_cert_store_from_file(path) { Ok(root_cert_store) => CACertificates::Override(root_cert_store), @@ -112,6 +115,7 @@ pub fn new_resource_threads( ( ResourceThreads::new(public_core, storage.clone(), idb.clone()), ResourceThreads::new(private_core, storage, idb), + async_runtime, ) } @@ -781,7 +785,7 @@ impl CoreResourceManager { _ => (FileTokenCheck::NotRequired, None), }; - HANDLE.spawn(async move { + spawn_task(async move { // XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed) // todo load context / mimesniff in fetch // todo referrer policy? diff --git a/components/net/tests/fetch.rs b/components/net/tests/fetch.rs index 2bf7768deea..0540d0538cf 100644 --- a/components/net/tests/fetch.rs +++ b/components/net/tests/fetch.rs @@ -26,6 +26,7 @@ use http_body_util::combinators::BoxBody; use hyper::body::{Bytes, Incoming}; use hyper::{Request as HyperRequest, Response as HyperResponse}; use mime::{self, Mime}; +use net::async_runtime::spawn_blocking_task; use net::fetch::cors_cache::CorsCache; use net::fetch::methods::{self, FetchContext}; use net::filemanager_thread::FileManager; @@ -200,7 +201,7 @@ fn test_fetch_blob() { expected: bytes.to_vec(), }; - crate::HANDLE.block_on(methods::fetch(request, &mut target, &context)); + spawn_blocking_task::<_, Response>(methods::fetch(request, &mut target, &context)); let fetch_response = receiver.recv().unwrap(); assert!(!fetch_response.is_network_error()); diff --git a/components/net/tests/http_loader.rs b/components/net/tests/http_loader.rs index 76ab3b7197c..49b8b1b706b 100644 --- a/components/net/tests/http_loader.rs +++ b/components/net/tests/http_loader.rs @@ -49,7 +49,7 @@ use url::Url; use crate::{ create_embedder_proxy_and_receiver, fetch, fetch_with_context, make_body, make_server, - new_fetch_context, receive_credential_prompt_msgs, + new_fetch_context, receive_credential_prompt_msgs, spawn_blocking_task, }; fn mock_origin() -> ImmutableOrigin { @@ -1611,7 +1611,7 @@ fn test_fetch_compressed_response_update_count() { sender: Some(sender), update_count: 0, }; - let response_update_count = crate::HANDLE.block_on(async move { + let response_update_count = spawn_blocking_task::<_, Response>(async move { methods::fetch( request, &mut target, diff --git a/components/net/tests/main.rs b/components/net/tests/main.rs index 77bc11d4c61..0e7cb35a1dd 100644 --- a/components/net/tests/main.rs +++ b/components/net/tests/main.rs @@ -37,6 +37,7 @@ use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Request as HyperRequest, Response as HyperResponse}; use hyper_util::rt::tokio::TokioIo; +use net::async_runtime::{init_async_runtime, spawn_blocking_task, spawn_task}; use net::connector::{create_http_client, create_tls_config}; use net::fetch::cors_cache::CorsCache; use net::fetch::methods::{self, FetchContext}; @@ -48,25 +49,19 @@ use net::test::HttpState; use net_traits::filemanager_thread::FileTokenCheck; use net_traits::request::Request; use net_traits::response::Response; -use net_traits::{FetchTaskTarget, ResourceFetchTiming, ResourceTimingType}; +use net_traits::{AsyncRuntime, FetchTaskTarget, ResourceFetchTiming, ResourceTimingType}; use rustls_pemfile::{certs, pkcs8_private_keys}; use rustls_pki_types::{CertificateDer, PrivateKeyDer}; use servo_arc::Arc as ServoArc; use servo_url::ServoUrl; use tokio::net::{TcpListener, TcpStream}; -use tokio::runtime::{Builder, Runtime}; use tokio_rustls::{self, TlsAcceptor}; -pub static HANDLE: LazyLock = LazyLock::new(|| { - Builder::new_multi_thread() - .enable_io() - .worker_threads(10) - .build() - .unwrap() -}); - const DEFAULT_USER_AGENT: &'static str = "Such Browser. Very Layout. Wow."; +static ASYNC_RUNTIME: LazyLock>>> = + LazyLock::new(|| Arc::new(Mutex::new(init_async_runtime()))); + struct FetchResponseCollector { sender: Option>, } @@ -168,6 +163,8 @@ fn new_fetch_context( fc: Option, pool_handle: Option>, ) -> FetchContext { + let _ = &*ASYNC_RUNTIME; + let sender = fc.unwrap_or_else(|| create_embedder_proxy()); FetchContext { @@ -208,7 +205,7 @@ fn fetch_with_context(request: Request, mut context: &mut FetchContext) -> Respo let mut target = FetchResponseCollector { sender: Some(sender), }; - HANDLE.block_on(async move { + spawn_blocking_task::<_, Response>(async move { methods::fetch(request, &mut target, &mut context).await; receiver.await.unwrap() }) @@ -219,14 +216,9 @@ fn fetch_with_cors_cache(request: Request, cache: &mut CorsCache) -> Response { let mut target = FetchResponseCollector { sender: Some(sender), }; - HANDLE.block_on(async move { - methods::fetch_with_cors_cache( - request, - cache, - &mut target, - &mut new_fetch_context(None, None, None), - ) - .await; + let mut fetch_context = new_fetch_context(None, None, None); + spawn_blocking_task::<_, Response>(async move { + methods::fetch_with_cors_cache(request, cache, &mut target, &mut fetch_context).await; receiver.await.unwrap() }) } @@ -249,11 +241,15 @@ where + Sync + 'static, { + let _ = &*ASYNC_RUNTIME; let handler = Arc::new(handler); let listener = StdTcpListener::bind("0.0.0.0:0").unwrap(); listener.set_nonblocking(true).unwrap(); - let listener = HANDLE.block_on(async move { TcpListener::from_std(listener).unwrap() }); + let listener = + spawn_blocking_task::<_, TcpListener>( + async move { TcpListener::from_std(listener).unwrap() }, + ); let url_string = format!("http://localhost:{}", listener.local_addr().unwrap().port()); let url = ServoUrl::parse(&url_string).unwrap(); @@ -290,13 +286,13 @@ where }), ); let conn = graceful.watch(conn); - HANDLE.spawn(async move { + spawn_task(async move { let _ = conn.await; }); } }; - let _ = HANDLE.spawn(server); + let _ = spawn_task(server); ( Server { close_channel: tx, @@ -337,10 +333,14 @@ where + Sync + 'static, { + let _ = &*ASYNC_RUNTIME; let handler = Arc::new(handler); let listener = StdTcpListener::bind("[::0]:0").unwrap(); listener.set_nonblocking(true).unwrap(); - let listener = HANDLE.block_on(async move { TcpListener::from_std(listener).unwrap() }); + let listener = + spawn_blocking_task::<_, TcpListener>( + async move { TcpListener::from_std(listener).unwrap() }, + ); let url_string = format!("http://localhost:{}", listener.local_addr().unwrap().port()); let url = ServoUrl::parse(&url_string).unwrap(); @@ -400,7 +400,7 @@ where } }; - HANDLE.spawn(server); + spawn_task(server); ( Server { diff --git a/components/net/websocket_loader.rs b/components/net/websocket_loader.rs index 0b5a81f47ae..26cdf9bc50f 100644 --- a/components/net/websocket_loader.rs +++ b/components/net/websocket_loader.rs @@ -39,7 +39,7 @@ use tungstenite::handshake::client::{Request, Response}; use tungstenite::protocol::CloseFrame; use url::Url; -use crate::async_runtime::HANDLE; +use crate::async_runtime::spawn_task; use crate::connector::{CACertificates, TlsConfig, create_tls_config}; use crate::cookie::ServoCookie; use crate::fetch::methods::{ @@ -423,7 +423,7 @@ fn connect( tls_config.alpn_protocols = vec!["http/1.1".to_string().into()]; let resource_event_sender2 = resource_event_sender.clone(); - HANDLE.spawn( + spawn_task( start_websocket( http_state, req_url.clone(), diff --git a/components/servo/lib.rs b/components/servo/lib.rs index 04bfd820efb..72ec27a4c5c 100644 --- a/components/servo/lib.rs +++ b/components/servo/lib.rs @@ -1141,7 +1141,7 @@ fn create_constellation( let bluetooth_thread: IpcSender = BluetoothThreadFactory::new(embedder_proxy.clone()); - let (public_resource_threads, private_resource_threads) = new_resource_threads( + let (public_resource_threads, private_resource_threads, async_runtime) = new_resource_threads( devtools_sender.clone(), time_profiler_chan.clone(), mem_profiler_chan.clone(), @@ -1182,6 +1182,7 @@ fn create_constellation( #[cfg(feature = "webgpu")] wgpu_image_map, user_content_manager, + async_runtime, }; let layout_factory = Arc::new(LayoutFactoryImpl()); diff --git a/components/shared/net/lib.rs b/components/shared/net/lib.rs index a4bd72f419c..382554b147e 100644 --- a/components/shared/net/lib.rs +++ b/components/shared/net/lib.rs @@ -389,6 +389,12 @@ impl Action for FetchResponseMsg { } } +/// Handle to an async runtime, +/// only used to shut it down for now. +pub trait AsyncRuntime: Send { + fn shutdown(&mut self); +} + /// Handle to a resource thread pub type CoreResourceThread = IpcSender;