script: Move TaskManager to GlobalScope (#34827)

This is a simplification of the internal `TaskQueue` API that moves the
`TaskManager` to the `GlobalScope` itself. In addition, the handling of
cancellers is moved to the `TaskManager` as well. This means that no
arguments other than the `task` are necessary for queueing tasks, which
makes the API a lot easier to use and cleaner.

`TaskSource` now also keeps a copy of the canceller with it, so that
they always know the proper way to cancel any tasks queued on them.

There is one complication here. The event loop `sender` for dedicated
workers is constantly changing as it is set to `None` when not handling
messages. This is because this sender keeps a handle to the main
thread's `Worker` object, preventing garbage collection while any
messages are still in flight or being handled. This change allows
setting the `sender` on the `TaskManager` to `None` to allow proper
garbabge collection.

Signed-off-by: Martin Robinson <mrobinson@igalia.com>
This commit is contained in:
Martin Robinson 2025-01-04 09:41:50 +01:00 committed by GitHub
parent 75a22cfe2e
commit b2eda71952
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
54 changed files with 1060 additions and 1516 deletions

View file

@ -70,6 +70,7 @@ use super::bindings::codegen::Bindings::MessagePortBinding::StructuredSerializeO
use super::bindings::codegen::Bindings::WebGPUBinding::GPUDeviceLostReason;
use super::bindings::error::Fallible;
use super::bindings::trace::{HashMapTracedValues, RootedTraceableBox};
use super::serviceworkerglobalscope::ServiceWorkerGlobalScope;
use crate::dom::bindings::cell::{DomRefCell, RefMut};
use crate::dom::bindings::codegen::Bindings::BroadcastChannelBinding::BroadcastChannelMethods;
use crate::dom::bindings::codegen::Bindings::EventSourceBinding::EventSource_Binding::EventSourceMethods;
@ -127,7 +128,6 @@ use crate::dom::webgpu::identityhub::IdentityHub;
use crate::dom::window::Window;
use crate::dom::workerglobalscope::WorkerGlobalScope;
use crate::dom::workletglobalscope::WorkletGlobalScope;
use crate::messaging::MainThreadScriptChan;
use crate::microtask::{Microtask, MicrotaskQueue, UserMicrotask};
use crate::network_listener::{NetworkListener, PreInvoke};
use crate::realms::{enter_realm, AlreadyInRealm, InRealm};
@ -137,9 +137,8 @@ use crate::script_runtime::{
};
use crate::script_thread::{with_script_thread, ScriptThread};
use crate::security_manager::CSPViolationReporter;
use crate::task::TaskCanceller;
use crate::task_manager::TaskManager;
use crate::task_source::{TaskSource, TaskSourceName};
use crate::task_source::TaskSource;
use crate::timers::{
IsInterval, OneshotTimerCallback, OneshotTimerHandle, OneshotTimers, TimerCallback,
};
@ -195,6 +194,9 @@ pub struct GlobalScope {
eventtarget: EventTarget,
crypto: MutNullableDom<Crypto>,
/// A [`TaskManager`] for this [`GlobalScope`].
task_manager: OnceCell<TaskManager>,
/// The message-port router id for this global, if it is managing ports.
message_port_state: DomRefCell<MessagePortState>,
@ -374,14 +376,12 @@ pub struct GlobalScope {
/// A wrapper for glue-code between the ipc router and the event-loop.
struct MessageListener {
canceller: TaskCanceller,
task_source: TaskSource,
context: Trusted<GlobalScope>,
}
/// A wrapper for broadcasts coming in over IPC, and the event-loop.
struct BroadcastListener {
canceller: TaskCanceller,
task_source: TaskSource,
context: Trusted<GlobalScope>,
}
@ -389,7 +389,6 @@ struct BroadcastListener {
/// A wrapper between timer events coming in over IPC, and the event-loop.
#[derive(Clone)]
pub(crate) struct TimerListener {
canceller: TaskCanceller,
task_source: TaskSource,
context: Trusted<GlobalScope>,
}
@ -403,7 +402,6 @@ struct FileListener {
/// - Some(Empty) => None
state: Option<FileListenerState>,
task_source: TaskSource,
task_canceller: TaskCanceller,
}
enum FileListenerTarget {
@ -516,15 +514,14 @@ impl BroadcastListener {
// This however seems to be hard to avoid in the light of the IPC.
// One can imagine queueing tasks directly,
// for channels that would be in the same script-thread.
let _ = self.task_source.queue_with_canceller(
task!(broadcast_message_event: move || {
let _ = self
.task_source
.queue(task!(broadcast_message_event: move || {
let global = context.root();
// Step 10 of https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage,
// For each BroadcastChannel object destination in destinations, queue a task.
global.broadcast_message_event(event, None);
}),
&self.canceller,
);
}));
}
}
@ -535,7 +532,7 @@ impl TimerListener {
let context = self.context.clone();
// Step 18, queue a task,
// https://html.spec.whatwg.org/multipage/#timer-initialisation-steps
let _ = self.task_source.queue_with_canceller(
let _ = self.task_source.queue(
task!(timer_event: move || {
let global = context.root();
let TimerEvent(source, id) = event;
@ -550,8 +547,7 @@ impl TimerListener {
};
// Step 7, substeps run in a task.
global.fire_timer(id, CanGc::note());
}),
&self.canceller,
})
);
}
@ -568,7 +564,7 @@ impl MessageListener {
match msg {
MessagePortMsg::CompleteTransfer(ports) => {
let context = self.context.clone();
let _ = self.task_source.queue_with_canceller(
let _ = self.task_source.queue(
task!(process_complete_transfer: move || {
let global = context.root();
@ -598,39 +594,31 @@ impl MessageListener {
let _ = global.script_to_constellation_chan().send(
ScriptMsg::MessagePortTransferResult(Some(router_id), succeeded, failed),
);
}),
&self.canceller,
})
);
},
MessagePortMsg::CompletePendingTransfer(port_id, buffer) => {
let context = self.context.clone();
let _ = self.task_source.queue_with_canceller(
task!(complete_pending: move || {
let global = context.root();
global.complete_port_transfer(port_id, buffer);
}),
&self.canceller,
);
let _ = self.task_source.queue(task!(complete_pending: move || {
let global = context.root();
global.complete_port_transfer(port_id, buffer);
}));
},
MessagePortMsg::NewTask(port_id, task) => {
let context = self.context.clone();
let _ = self.task_source.queue_with_canceller(
task!(process_new_task: move || {
let global = context.root();
global.route_task_to_port(port_id, task, CanGc::note());
}),
&self.canceller,
);
let _ = self.task_source.queue(task!(process_new_task: move || {
let global = context.root();
global.route_task_to_port(port_id, task, CanGc::note());
}));
},
MessagePortMsg::RemoveMessagePort(port_id) => {
let context = self.context.clone();
let _ = self.task_source.queue_with_canceller(
task!(process_remove_message_port: move || {
let _ = self
.task_source
.queue(task!(process_remove_message_port: move || {
let global = context.root();
global.note_entangled_port_removed(&port_id);
}),
&self.canceller,
);
}));
},
}
}
@ -665,10 +653,8 @@ impl FileListener {
let stream = trusted.root();
stream_handle_incoming(&stream, Ok(blob_buf.bytes));
});
let _ = self.task_source.queue(task);
let _ = self
.task_source
.queue_with_canceller(task, &self.task_canceller);
Vec::with_capacity(0)
} else {
blob_buf.bytes
@ -690,9 +676,7 @@ impl FileListener {
stream_handle_incoming(&stream, Ok(bytes_in));
});
let _ = self
.task_source
.queue_with_canceller(task, &self.task_canceller);
let _ = self.task_source.queue(task);
} else {
bytes.append(&mut bytes_in);
};
@ -712,9 +696,7 @@ impl FileListener {
callback(promise, Ok(bytes));
});
let _ = self
.task_source
.queue_with_canceller(task, &self.task_canceller);
let _ = self.task_source.queue(task);
},
FileListenerTarget::Stream(trusted_stream) => {
let trusted = trusted_stream.clone();
@ -724,9 +706,7 @@ impl FileListener {
stream_handle_eof(&stream);
});
let _ = self
.task_source
.queue_with_canceller(task, &self.task_canceller);
let _ = self.task_source.queue(task);
},
},
_ => {
@ -740,23 +720,17 @@ impl FileListener {
match target {
FileListenerTarget::Promise(trusted_promise, callback) => {
let _ = self.task_source.queue_with_canceller(
task!(reject_promise: move || {
let promise = trusted_promise.root();
let _ac = enter_realm(&*promise.global());
callback(promise, error);
}),
&self.task_canceller,
);
let _ = self.task_source.queue(task!(reject_promise: move || {
let promise = trusted_promise.root();
let _ac = enter_realm(&*promise.global());
callback(promise, error);
}));
},
FileListenerTarget::Stream(trusted_stream) => {
let _ = self.task_source.queue_with_canceller(
task!(error_stream: move || {
let stream = trusted_stream.root();
stream_handle_incoming(&stream, error);
}),
&self.task_canceller,
);
let _ = self.task_source.queue(task!(error_stream: move || {
let stream = trusted_stream.root();
stream_handle_incoming(&stream, error);
}));
},
}
},
@ -785,6 +759,7 @@ impl GlobalScope {
unminify_js: bool,
) -> Self {
Self {
task_manager: Default::default(),
message_port_state: DomRefCell::new(MessagePortState::UnManaged),
broadcast_channel_state: DomRefCell::new(BroadcastChannelState::UnManaged),
blob_state: DomRefCell::new(BlobState::UnManaged),
@ -851,16 +826,11 @@ impl GlobalScope {
fn timers(&self) -> &OneshotTimers {
self.timers.get_or_init(|| {
let (task_source, canceller) = (
self.task_manager().timer_task_source(),
self.task_canceller(TaskSourceName::Timer),
);
OneshotTimers::new(
self,
TimerListener {
context: Trusted::new(self),
task_source,
canceller,
task_source: self.task_manager().timer_task_source(),
},
)
})
@ -1099,7 +1069,6 @@ impl GlobalScope {
let target_global = this.root();
target_global.route_task_to_port(port_id, task, CanGc::note());
}),
self,
);
}
}
@ -1148,15 +1117,15 @@ impl GlobalScope {
if let Some(entangled_id) = entangled_port {
// Step 7
let this = Trusted::new(self);
let _ = self.task_manager().port_message_queue().queue(
task!(post_message: move || {
let global = this.root();
// Note: we do this in a task, as this will ensure the global and constellation
// are aware of any transfer that might still take place in the current task.
global.route_task_to_port(entangled_id, task, CanGc::note());
}),
self,
);
let _ =
self.task_manager()
.port_message_queue()
.queue(task!(post_message: move || {
let global = this.root();
// Note: we do this in a task, as this will ensure the global and constellation
// are aware of any transfer that might still take place in the current task.
global.route_task_to_port(entangled_id, task, CanGc::note());
}));
}
} else {
warn!("post_messageport_msg called on a global not managing any ports.");
@ -1273,8 +1242,7 @@ impl GlobalScope {
// Step 10.3, fire an event named messageerror at destination.
MessageEvent::dispatch_error(destination.upcast(), &global, CanGc::note());
}
}),
self,
})
);
});
}
@ -1438,13 +1406,8 @@ impl GlobalScope {
let (broadcast_control_sender, broadcast_control_receiver) =
ipc::channel().expect("ipc channel failure");
let context = Trusted::new(self);
let (task_source, canceller) = (
self.task_manager().dom_manipulation_task_source(),
self.task_canceller(TaskSourceName::DOMManipulation),
);
let listener = BroadcastListener {
canceller,
task_source,
task_source: self.task_manager().dom_manipulation_task_source(),
context,
};
ROUTER.add_typed_route(
@ -1491,13 +1454,8 @@ impl GlobalScope {
let (port_control_sender, port_control_receiver) =
ipc::channel().expect("ipc channel failure");
let context = Trusted::new(self);
let (task_source, canceller) = (
self.task_manager().port_message_queue(),
self.task_canceller(TaskSourceName::PortMessage),
);
let listener = MessageListener {
canceller,
task_source,
task_source: self.task_manager().port_message_queue(),
context,
};
ROUTER.add_typed_route(
@ -1540,7 +1498,6 @@ impl GlobalScope {
let target_global = this.root();
target_global.maybe_add_pending_ports();
}),
self,
);
} else {
// If this is a newly-created port, let the constellation immediately know.
@ -2017,15 +1974,11 @@ impl GlobalScope {
let recv = self.send_msg(file_id);
let trusted_stream = Trusted::new(&*stream.clone());
let task_canceller = self.task_canceller(TaskSourceName::FileReading);
let task_source = self.task_manager().file_reading_task_source();
let mut file_listener = FileListener {
state: Some(FileListenerState::Empty(FileListenerTarget::Stream(
trusted_stream,
))),
task_source,
task_canceller,
task_source: self.task_manager().file_reading_task_source(),
};
ROUTER.add_typed_route(
@ -2042,16 +1995,12 @@ impl GlobalScope {
let recv = self.send_msg(id);
let trusted_promise = TrustedPromise::new(promise);
let task_canceller = self.task_canceller(TaskSourceName::FileReading);
let task_source = self.task_manager().file_reading_task_source();
let mut file_listener = FileListener {
state: Some(FileListenerState::Empty(FileListenerTarget::Promise(
trusted_promise,
callback,
))),
task_source,
task_canceller,
task_source: self.task_manager().file_reading_task_source(),
};
ROUTER.add_typed_route(
@ -2574,28 +2523,36 @@ impl GlobalScope {
self.resource_threads().sender()
}
/// `ScriptChan` to send messages to the event loop of this global scope.
pub fn script_chan(&self) -> Box<dyn ScriptChan + Send> {
/// A sender to the event loop of this global scope. This either sends to the Worker event loop
/// or the ScriptThread event loop in the case of a `Window`. This can be `None` for dedicated
/// workers that are not currently handling a message.
pub(crate) fn event_loop_sender(&self) -> Option<Box<dyn ScriptChan + Send>> {
if let Some(window) = self.downcast::<Window>() {
return Box::new(
MainThreadScriptChan(window.main_thread_script_chan().clone()).clone(),
Some(window.event_loop_sender())
} else if let Some(dedicated) = self.downcast::<DedicatedWorkerGlobalScope>() {
dedicated.event_loop_sender()
} else if let Some(service_worker) = self.downcast::<ServiceWorkerGlobalScope>() {
Some(service_worker.event_loop_sender())
} else {
unreachable!(
"Tried to access event loop sender for incompatible \
GlobalScope (PaintWorklet or DissimilarOriginWindow)"
);
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.script_chan();
}
unreachable!();
}
/// The [`TaskManager`] used to schedule tasks for this [`GlobalScope`].
pub fn task_manager(&self) -> &TaskManager {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.task_manager();
}
unreachable!();
/// A reference to the [`TaskManager`] used to schedule tasks for this [`GlobalScope`].
pub(crate) fn task_manager(&self) -> &TaskManager {
let shared_canceller = self
.downcast::<WorkerGlobalScope>()
.map(WorkerGlobalScope::shared_task_canceller);
self.task_manager.get_or_init(|| {
TaskManager::new(
self.event_loop_sender(),
self.pipeline_id(),
shared_canceller,
)
})
}
/// Evaluate JS code on this global scope.
@ -2777,7 +2734,7 @@ impl GlobalScope {
);
self.task_manager()
.dom_manipulation_task_source()
.queue(task, self)
.queue(task)
.unwrap();
}
}
@ -2894,21 +2851,6 @@ impl GlobalScope {
true
}
/// Returns the task canceller of this global to ensure that everything is
/// properly cancelled when the global scope is destroyed.
pub fn task_canceller(&self, name: TaskSourceName) -> TaskCanceller {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().task_canceller(name);
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
// Note: the "name" is not passed to the worker,
// because 'closing' it only requires one task canceller for all task sources.
// https://html.spec.whatwg.org/multipage/#dom-workerglobalscope-closing
return worker.task_canceller();
}
unreachable!();
}
/// Perform a microtask checkpoint.
pub fn perform_a_microtask_checkpoint(&self, can_gc: CanGc) {
// Only perform the checkpoint if we're not shutting down.
@ -3163,29 +3105,26 @@ impl GlobalScope {
let this = Trusted::new(self);
self.task_manager()
.gamepad_task_source()
.queue_with_canceller(
task!(gamepad_connected: move || {
let global = this.root();
.queue(task!(gamepad_connected: move || {
let global = this.root();
if let Some(window) = global.downcast::<Window>() {
let navigator = window.Navigator();
let selected_index = navigator.select_gamepad_index();
let gamepad = Gamepad::new(
&global,
selected_index,
name,
"standard".into(),
axis_bounds,
button_bounds,
supported_haptic_effects,
false,
CanGc::note(),
);
navigator.set_gamepad(selected_index as usize, &gamepad, CanGc::note());
}
}),
&self.task_canceller(TaskSourceName::Gamepad),
)
if let Some(window) = global.downcast::<Window>() {
let navigator = window.Navigator();
let selected_index = navigator.select_gamepad_index();
let gamepad = Gamepad::new(
&global,
selected_index,
name,
"standard".into(),
axis_bounds,
button_bounds,
supported_haptic_effects,
false,
CanGc::note(),
);
navigator.set_gamepad(selected_index as usize, &gamepad, CanGc::note());
}
}))
.expect("Failed to queue gamepad connected task.");
}
@ -3194,21 +3133,18 @@ impl GlobalScope {
let this = Trusted::new(self);
self.task_manager()
.gamepad_task_source()
.queue_with_canceller(
task!(gamepad_disconnected: move || {
let global = this.root();
if let Some(window) = global.downcast::<Window>() {
let navigator = window.Navigator();
if let Some(gamepad) = navigator.get_gamepad(index) {
if window.Document().is_fully_active() {
gamepad.update_connected(false, gamepad.exposed(), CanGc::note());
navigator.remove_gamepad(index);
}
.queue(task!(gamepad_disconnected: move || {
let global = this.root();
if let Some(window) = global.downcast::<Window>() {
let navigator = window.Navigator();
if let Some(gamepad) = navigator.get_gamepad(index) {
if window.Document().is_fully_active() {
gamepad.update_connected(false, gamepad.exposed(), CanGc::note());
navigator.remove_gamepad(index);
}
}
}),
&self.task_canceller(TaskSourceName::Gamepad),
)
}
}))
.expect("Failed to queue gamepad disconnected task.");
}
@ -3217,9 +3153,7 @@ impl GlobalScope {
let this = Trusted::new(self);
// <https://w3c.github.io/gamepad/#dfn-update-gamepad-state>
self.task_manager()
.gamepad_task_source()
.queue_with_canceller(
self.task_manager().gamepad_task_source().queue(
task!(update_gamepad_state: move || {
let global = this.root();
if let Some(window) = global.downcast::<Window>() {
@ -3245,13 +3179,11 @@ impl GlobalScope {
gamepad.update_timestamp(*current_time);
let new_gamepad = Trusted::new(&**gamepad);
if window.Document().is_fully_active() {
window.task_manager().gamepad_task_source().queue_with_canceller(
global.task_manager().gamepad_task_source().queue(
task!(update_gamepad_connect: move || {
let gamepad = new_gamepad.root();
gamepad.notify_event(GamepadEventType::Connected, CanGc::note());
}),
&window.upcast::<GlobalScope>()
.task_canceller(TaskSourceName::Gamepad),
})
)
.expect("Failed to queue update gamepad connect task.");
}
@ -3259,8 +3191,7 @@ impl GlobalScope {
}
}
}
}),
&self.task_canceller(TaskSourceName::Gamepad),
})
)
.expect("Failed to queue update gamepad state task.");
}
@ -3333,11 +3264,9 @@ impl GlobalScope {
task_source: TaskSource,
cancellation_sender: Option<ipc::IpcReceiver<()>>,
) {
let canceller = Some(self.task_canceller(TaskSourceName::Networking));
let network_listener = NetworkListener {
context,
task_source,
canceller,
};
self.fetch_with_network_listener(request_builder, network_listener, cancellation_sender);
}