webgpu: Use WGPU poller thread for poll_all_devices (#32266)

* Use special WGPU poller thread for poll_all_devices

* Switch to latest wgpu

This is required to fix some deadlocks.

* non-blocking poll unconditionally

* small fixes
This commit is contained in:
Samson 2024-05-15 05:36:01 +02:00 committed by GitHub
parent bb5906eeec
commit 00f267e289
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 156 additions and 35 deletions

16
Cargo.lock generated
View file

@ -1135,8 +1135,7 @@ checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991"
[[package]]
name = "d3d12"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b28bfe653d79bd16c77f659305b195b82bb5ce0c0eb2a4846b82ddbd77586813"
source = "git+https://github.com/gfx-rs/wgpu?rev=d0a5e48aa7e84683114c3870051cc414ae92ac03#d0a5e48aa7e84683114c3870051cc414ae92ac03"
dependencies = [
"bitflags 2.5.0",
"libloading 0.7.4",
@ -3902,8 +3901,7 @@ checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0"
[[package]]
name = "naga"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e536ae46fcab0876853bd4a632ede5df4b1c2527a58f6c5a4150fe86be858231"
source = "git+https://github.com/gfx-rs/wgpu?rev=d0a5e48aa7e84683114c3870051cc414ae92ac03#d0a5e48aa7e84683114c3870051cc414ae92ac03"
dependencies = [
"arrayvec",
"bit-set",
@ -7203,14 +7201,12 @@ checksum = "53a85b86a771b1c87058196170769dd264f66c0782acf1ae6cc51bfd64b39082"
[[package]]
name = "wgpu-core"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac6a86eaa5e763e59c73cf9e97d55fffd4dfda69fd8bda19589fcf851ddfef1f"
source = "git+https://github.com/gfx-rs/wgpu?rev=d0a5e48aa7e84683114c3870051cc414ae92ac03#d0a5e48aa7e84683114c3870051cc414ae92ac03"
dependencies = [
"arrayvec",
"bit-vec",
"bitflags 2.5.0",
"cfg_aliases",
"codespan-reporting",
"document-features",
"indexmap 2.2.6",
"log",
@ -7231,8 +7227,7 @@ dependencies = [
[[package]]
name = "wgpu-hal"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d71c8ae05170583049b65ee562fd839fdc0b3e9ddb84f4e40c9d5f8ea0d4c8c"
source = "git+https://github.com/gfx-rs/wgpu?rev=d0a5e48aa7e84683114c3870051cc414ae92ac03#d0a5e48aa7e84683114c3870051cc414ae92ac03"
dependencies = [
"android_system_properties",
"arrayvec",
@ -7270,8 +7265,7 @@ dependencies = [
[[package]]
name = "wgpu-types"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1353d9a46bff7f955a680577f34c69122628cc2076e1d6f3a9be6ef00ae793ef"
source = "git+https://github.com/gfx-rs/wgpu?rev=d0a5e48aa7e84683114c3870051cc414ae92ac03#d0a5e48aa7e84683114c3870051cc414ae92ac03"
dependencies = [
"bitflags 2.5.0",
"js-sys",

View file

@ -125,8 +125,8 @@ webpki-roots = "0.25"
webrender = { git = "https://github.com/servo/webrender", branch = "0.64", features = ["capture"] }
webrender_api = { git = "https://github.com/servo/webrender", branch = "0.64" }
webrender_traits = { path = "components/shared/webrender" }
wgpu-core = "0.20"
wgpu-types = "0.20"
wgpu-core = { git = "https://github.com/gfx-rs/wgpu", rev = "d0a5e48aa7e84683114c3870051cc414ae92ac03" }
wgpu-types = { git = "https://github.com/gfx-rs/wgpu", rev = "d0a5e48aa7e84683114c3870051cc414ae92ac03" }
winapi = "0.3"
xi-unicode = "0.1.0"
xml5ever = "0.18"

View file

@ -8,6 +8,7 @@ use wgpu_thread::WGPU;
pub use {wgpu_core as wgc, wgpu_types as wgt};
pub mod identity;
mod poll_thread;
mod wgpu_thread;
use std::borrow::Cow;
@ -86,7 +87,7 @@ impl WebGPU {
.run();
})
{
warn!("Failed to spwan WGPU thread ({})", e);
warn!("Failed to spawn WGPU thread ({})", e);
return None;
}
Some((WebGPU(sender), script_recv))

View file

@ -0,0 +1,126 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
//! Data and main loop of WGPU poll thread.
//!
//! This is roughly based on <https://github.com/LucentFlux/wgpu-async/blob/1322c7e3fcdfc1865a472c7bbbf0e2e06dcf4da8/src/wgpu_future.rs>
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use log::warn;
use crate::wgc::global::Global;
/// Polls devices while there is something to poll.
///
/// This objects corresponds to a thread that parks itself when there is no work,
/// waiting on it, and then calls `poll_all_devices` repeatedly to block.
///
/// The thread dies when this object is dropped, and all work in submission is done.
///
/// ## Example
/// ```no_run
/// let token = self.poller.token(); // create a new token
/// let callback = SubmittedWorkDoneClosure::from_rust(Box::from(move || {
/// drop(token); // drop token as closure has been fired
/// // ...
/// }));
/// let result = gfx_select!(queue_id => global.queue_on_submitted_work_done(queue_id, callback));
/// self.poller.wake(); // wake poller thread to actually poll
/// ```
#[derive(Debug)]
pub(crate) struct Poller {
/// The number of closures that still needs to be fired.
/// When this is 0, the thread can park itself.
work_count: Arc<AtomicUsize>,
/// True if thread should die after all work in submission is done
is_done: Arc<AtomicBool>,
/// Handle to the WGPU poller thread (to be used for unparking the thread)
handle: Option<JoinHandle<()>>,
}
#[inline]
fn poll_all_devices(global: &Arc<Global>, more_work: &mut bool, force_wait: bool) {
match global.poll_all_devices(force_wait) {
Ok(all_queue_empty) => *more_work = !all_queue_empty,
Err(e) => warn!("Poller thread got `{e}` on poll_all_devices."),
}
}
impl Poller {
pub(crate) fn new(global: Arc<Global>) -> Self {
let work_count = Arc::new(AtomicUsize::new(0));
let is_done = Arc::new(AtomicBool::new(false));
let work = work_count.clone();
let done = is_done.clone();
Self {
work_count,
is_done,
handle: Some(
std::thread::Builder::new()
.name("WGPU poller".into())
.spawn(move || {
while !done.load(Ordering::Acquire) {
let mut more_work = false;
// Do non-blocking poll unconditionally
// so every `ẁake` (even spurious) will do at least one poll.
// this is mostly useful for stuff that is deferred
// to maintain calls in wgpu (device resource destruction)
poll_all_devices(&global, &mut more_work, false);
while more_work || work.load(Ordering::Acquire) != 0 {
poll_all_devices(&global, &mut more_work, true);
}
std::thread::park(); //TODO: should we use timeout here
}
})
.expect("Spawning thread should not fail"),
),
}
}
/// Creates a token of work
pub(crate) fn token(&self) -> WorkToken {
let prev = self.work_count.fetch_add(1, Ordering::AcqRel);
debug_assert!(
prev < usize::MAX,
"cannot have more than `usize::MAX` outstanding operations on the GPU"
);
WorkToken {
work_count: Arc::clone(&self.work_count),
}
}
/// Wakes the poller thread to start polling.
pub(crate) fn wake(&self) {
self.handle
.as_ref()
.expect("Poller thread does not exist!")
.thread()
.unpark();
}
}
impl Drop for Poller {
fn drop(&mut self) {
self.is_done.store(true, Ordering::Release);
let handle = self.handle.take().expect("Poller dropped twice");
handle.thread().unpark();
handle.join().expect("Poller thread panicked");
}
}
/// RAII indicating that there is some work enqueued (closure to be fired),
/// while this token is held.
pub(crate) struct WorkToken {
work_count: Arc<AtomicUsize>,
}
impl Drop for WorkToken {
fn drop(&mut self) {
self.work_count.fetch_sub(1, Ordering::AcqRel);
}
}

View file

@ -8,7 +8,6 @@ use std::cell::RefCell;
use std::collections::HashMap;
use std::slice;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use arrayvec::ArrayVec;
use euclid::default::Size2D;
@ -27,12 +26,12 @@ use wgc::{gfx_select, id};
use wgt::InstanceDescriptor;
pub use {wgpu_core as wgc, wgpu_types as wgt};
use crate::poll_thread::Poller;
use crate::{
ErrorScopeId, PresentationData, Transmute, WebGPU, WebGPUAdapter, WebGPUDevice, WebGPUMsg,
WebGPUOpResult, WebGPUQueue, WebGPURequest, WebGPUResponse,
};
const DEVICE_POLL_INTERVAL: Duration = Duration::from_millis(50);
pub const PRESENTATION_BUFFER_COUNT: usize = 10;
#[allow(clippy::upper_case_acronyms)] // Name of the library
@ -51,7 +50,7 @@ pub(crate) struct WGPU {
webrender_document: DocumentId,
external_images: Arc<Mutex<WebrenderExternalImageRegistry>>,
wgpu_image_map: Arc<Mutex<HashMap<u64, PresentationData>>>,
last_poll: Instant,
poller: Poller,
}
impl WGPU {
@ -64,17 +63,19 @@ impl WGPU {
external_images: Arc<Mutex<WebrenderExternalImageRegistry>>,
wgpu_image_map: Arc<Mutex<HashMap<u64, PresentationData>>>,
) -> Self {
let global = Arc::new(wgc::global::Global::new(
"wgpu-core",
InstanceDescriptor {
backends: wgt::Backends::PRIMARY,
..Default::default()
},
));
WGPU {
poller: Poller::new(Arc::clone(&global)),
receiver,
sender,
script_sender,
global: Arc::new(wgc::global::Global::new(
"wgpu-core",
InstanceDescriptor {
backends: wgt::Backends::PRIMARY,
..Default::default()
},
)),
global,
adapters: Vec::new(),
devices: HashMap::new(),
_invalid_adapters: Vec::new(),
@ -83,21 +84,12 @@ impl WGPU {
webrender_document,
external_images,
wgpu_image_map,
last_poll: Instant::now(),
}
}
pub(crate) fn run(&mut self) {
loop {
let diff = DEVICE_POLL_INTERVAL.checked_sub(self.last_poll.elapsed());
if diff.is_none() {
let _ = self.global.poll_all_devices(false);
self.last_poll = Instant::now();
}
if let Ok((scope_id, msg)) = self
.receiver
.try_recv_timeout(diff.unwrap_or(DEVICE_POLL_INTERVAL))
{
if let Ok((scope_id, msg)) = self.receiver.recv() {
match msg {
WebGPURequest::BufferMapAsync {
sender,
@ -109,7 +101,9 @@ impl WGPU {
} => {
let glob = Arc::clone(&self.global);
let resp_sender = sender.clone();
let token = self.poller.token();
let callback = BufferMapCallback::from_rust(Box::from(move |result| {
drop(token);
match result {
Ok(()) => {
let global = &glob;
@ -151,6 +145,7 @@ impl WGPU {
size,
operation
));
self.poller.wake();
if let Err(ref e) = result {
if let Err(w) = sender.send(Some(Err(format!("{:?}", e)))) {
warn!("Failed to send BufferMapAsync Response ({:?})", w);
@ -818,7 +813,9 @@ impl WGPU {
let wgpu_image_map = Arc::clone(&self.wgpu_image_map);
let webrender_api = Arc::clone(&self.webrender_api);
let webrender_document = self.webrender_document;
let token = self.poller.token();
let callback = BufferMapCallback::from_rust(Box::from(move |result| {
drop(token);
match result {
Ok(()) => {
let global = &glob;
@ -866,6 +863,7 @@ impl WGPU {
};
let _ = gfx_select!(buffer_id
=> global.buffer_map_async(buffer_id, 0, Some(buffer_size), map_op));
self.poller.wake();
},
WebGPURequest::UnmapBuffer {
buffer_id,
@ -928,14 +926,16 @@ impl WGPU {
},
WebGPURequest::QueueOnSubmittedWorkDone { sender, queue_id } => {
let global = &self.global;
let token = self.poller.token();
let callback = SubmittedWorkDoneClosure::from_rust(Box::from(move || {
drop(token);
if let Err(e) = sender.send(Some(Ok(WebGPUResponse::SubmittedWorkDone)))
{
warn!("Could not send SubmittedWorkDone Response ({})", e);
}
}));
let result = gfx_select!(queue_id => global.queue_on_submitted_work_done(queue_id, callback));
self.poller.wake();
self.send_result(queue_id.transmute(), scope_id, result);
},
WebGPURequest::DropTexture(id) => {