script: Eliminate code duplication in the task queue (#34798)

Instead of creating a type for each `TaskSource` variety have each `TaskSource`
hold the same kind of sender (this was inconsistent before, but each
sender was effectively the same trait object), a pipeline, and a
`TaskSourceName`. This elminates the need to reimplement the same
queuing code for every task source.

In addition, have workers hold their own `TaskManager`. This allows just
exposing the manager on the `GlobalScope`. Currently the `TaskCanceller`
is different, but this will also be eliminated in a followup change.

This is a the first step toward having a shared set of `Sender`s on
`GlobalScope`.

Signed-off-by: Martin Robinson <mrobinson@igalia.com>
This commit is contained in:
Martin Robinson 2025-01-01 14:50:52 +01:00 committed by GitHub
parent deb819f233
commit 77cfca65c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
67 changed files with 432 additions and 1200 deletions

View file

@ -138,15 +138,7 @@ use crate::script_runtime::{
use crate::script_thread::{with_script_thread, ScriptThread};
use crate::security_manager::CSPViolationReporter;
use crate::task::TaskCanceller;
use crate::task_source::dom_manipulation::DOMManipulationTaskSource;
use crate::task_source::file_reading::FileReadingTaskSource;
use crate::task_source::gamepad::GamepadTaskSource;
use crate::task_source::networking::NetworkingTaskSource;
use crate::task_source::performance_timeline::PerformanceTimelineTaskSource;
use crate::task_source::port_message::PortMessageQueue;
use crate::task_source::remote_event::RemoteEventTaskSource;
use crate::task_source::timer::TimerTaskSource;
use crate::task_source::websocket::WebsocketTaskSource;
use crate::task_manager::TaskManager;
use crate::task_source::{TaskSource, TaskSourceName};
use crate::timers::{
IsInterval, OneshotTimerCallback, OneshotTimerHandle, OneshotTimers, TimerCallback,
@ -383,14 +375,14 @@ pub struct GlobalScope {
/// A wrapper for glue-code between the ipc router and the event-loop.
struct MessageListener {
canceller: TaskCanceller,
task_source: PortMessageQueue,
task_source: TaskSource,
context: Trusted<GlobalScope>,
}
/// A wrapper for broadcasts coming in over IPC, and the event-loop.
struct BroadcastListener {
canceller: TaskCanceller,
task_source: DOMManipulationTaskSource,
task_source: TaskSource,
context: Trusted<GlobalScope>,
}
@ -398,7 +390,7 @@ struct BroadcastListener {
#[derive(Clone)]
pub(crate) struct TimerListener {
canceller: TaskCanceller,
task_source: TimerTaskSource,
task_source: TaskSource,
context: Trusted<GlobalScope>,
}
@ -410,7 +402,7 @@ struct FileListener {
/// - Some(Empty) => Some(Receiving) => None
/// - Some(Empty) => None
state: Option<FileListenerState>,
task_source: FileReadingTaskSource,
task_source: TaskSource,
task_canceller: TaskCanceller,
}
@ -860,7 +852,7 @@ impl GlobalScope {
fn timers(&self) -> &OneshotTimers {
self.timers.get_or_init(|| {
let (task_source, canceller) = (
self.timer_task_source(),
self.task_manager().timer_task_source(),
self.task_canceller(TaskSourceName::Timer),
);
OneshotTimers::new(
@ -1102,7 +1094,7 @@ impl GlobalScope {
for task in message_buffer {
let port_id = *port_id;
let this = Trusted::new(self);
let _ = self.port_message_queue().queue(
let _ = self.task_manager().port_message_queue().queue(
task!(process_pending_port_messages: move || {
let target_global = this.root();
target_global.route_task_to_port(port_id, task, CanGc::note());
@ -1156,7 +1148,7 @@ impl GlobalScope {
if let Some(entangled_id) = entangled_port {
// Step 7
let this = Trusted::new(self);
let _ = self.port_message_queue().queue(
let _ = self.task_manager().port_message_queue().queue(
task!(post_message: move || {
let global = this.root();
// Note: we do this in a task, as this will ensure the global and constellation
@ -1253,7 +1245,7 @@ impl GlobalScope {
// to fire the message event
let channel = Trusted::new(&*channel);
let global = Trusted::new(self);
let _ = self.dom_manipulation_task_source().queue(
let _ = self.task_manager().dom_manipulation_task_source().queue(
task!(process_pending_port_messages: move || {
let destination = channel.root();
let global = global.root();
@ -1447,7 +1439,7 @@ impl GlobalScope {
ipc::channel().expect("ipc channel failure");
let context = Trusted::new(self);
let (task_source, canceller) = (
self.dom_manipulation_task_source(),
self.task_manager().dom_manipulation_task_source(),
self.task_canceller(TaskSourceName::DOMManipulation),
);
let listener = BroadcastListener {
@ -1500,7 +1492,7 @@ impl GlobalScope {
ipc::channel().expect("ipc channel failure");
let context = Trusted::new(self);
let (task_source, canceller) = (
self.port_message_queue(),
self.task_manager().port_message_queue(),
self.task_canceller(TaskSourceName::PortMessage),
);
let listener = MessageListener {
@ -1543,7 +1535,7 @@ impl GlobalScope {
// Queue a task to complete the transfer,
// unless the port is re-transferred in the current task.
let this = Trusted::new(self);
let _ = self.port_message_queue().queue(
let _ = self.task_manager().port_message_queue().queue(
task!(process_pending_port_messages: move || {
let target_global = this.root();
target_global.maybe_add_pending_ports();
@ -2026,7 +2018,7 @@ impl GlobalScope {
let trusted_stream = Trusted::new(&*stream.clone());
let task_canceller = self.task_canceller(TaskSourceName::FileReading);
let task_source = self.file_reading_task_source();
let task_source = self.task_manager().file_reading_task_source();
let mut file_listener = FileListener {
state: Some(FileListenerState::Empty(FileListenerTarget::Stream(
@ -2051,7 +2043,7 @@ impl GlobalScope {
let trusted_promise = TrustedPromise::new(promise);
let task_canceller = self.task_canceller(TaskSourceName::FileReading);
let task_source = self.file_reading_task_source();
let task_source = self.task_manager().file_reading_task_source();
let mut file_listener = FileListener {
state: Some(FileListenerState::Empty(FileListenerTarget::Promise(
@ -2595,72 +2587,13 @@ impl GlobalScope {
unreachable!();
}
/// `TaskSource` to send messages to the gamepad task source of
/// this global scope.
/// <https://w3c.github.io/gamepad/#dfn-gamepad-task-source>
pub fn gamepad_task_source(&self) -> GamepadTaskSource {
/// The [`TaskManager`] used to schedule tasks for this [`GlobalScope`].
pub fn task_manager(&self) -> &TaskManager {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().gamepad_task_source();
}
unreachable!();
}
/// `TaskSource` to send messages to the networking task source of
/// this global scope.
pub fn networking_task_source(&self) -> NetworkingTaskSource {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().networking_task_source();
return window.task_manager();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.networking_task_source();
}
unreachable!();
}
/// `TaskSource` to send messages to the port message queue of
/// this global scope.
pub fn port_message_queue(&self) -> PortMessageQueue {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().port_message_queue();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.port_message_queue();
}
unreachable!();
}
/// `TaskSource` to send messages to the timer queue of
/// this global scope.
pub fn timer_task_source(&self) -> TimerTaskSource {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().timer_task_source();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.timer_task_source();
}
unreachable!();
}
/// `TaskSource` to send messages to the remote-event task source of
/// this global scope.
pub fn remote_event_task_source(&self) -> RemoteEventTaskSource {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().remote_event_task_source();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.remote_event_task_source();
}
unreachable!();
}
/// `TaskSource` to send messages to the websocket task source of
/// this global scope.
pub fn websocket_task_source(&self) -> WebsocketTaskSource {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().websocket_task_source();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.websocket_task_source();
return worker.task_manager();
}
unreachable!();
}
@ -2842,7 +2775,8 @@ impl GlobalScope {
scripted_caller.line,
scripted_caller.col,
);
self.dom_manipulation_task_source()
self.task_manager()
.dom_manipulation_task_source()
.queue(task, self)
.unwrap();
}
@ -3024,28 +2958,6 @@ impl GlobalScope {
unreachable!();
}
pub fn dom_manipulation_task_source(&self) -> DOMManipulationTaskSource {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().dom_manipulation_task_source();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.dom_manipulation_task_source();
}
unreachable!();
}
/// Channel to send messages to the file reading task source of
/// this of this global scope.
pub fn file_reading_task_source(&self) -> FileReadingTaskSource {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().file_reading_task_source();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.file_reading_task_source();
}
unreachable!();
}
pub fn runtime_handle(&self) -> ParentRuntime {
if self.is::<Window>() {
ScriptThread::runtime_handle()
@ -3096,18 +3008,6 @@ impl GlobalScope {
unreachable!();
}
/// Channel to send messages to the performance timeline task source
/// of this global scope.
pub fn performance_timeline_task_source(&self) -> PerformanceTimelineTaskSource {
if let Some(window) = self.downcast::<Window>() {
return window.task_manager().performance_timeline_task_source();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.performance_timeline_task_source();
}
unreachable!();
}
/// <https://w3c.github.io/performance-timeline/#supportedentrytypes-attribute>
pub fn supported_performance_entry_types(&self, cx: SafeJSContext, retval: MutableHandleValue) {
self.frozen_supported_performance_entry_types.get_or_init(
@ -3261,7 +3161,8 @@ impl GlobalScope {
// TODO: 2. If document is not null and is not allowed to use the "gamepad" permission,
// then abort these steps.
let this = Trusted::new(self);
self.gamepad_task_source()
self.task_manager()
.gamepad_task_source()
.queue_with_canceller(
task!(gamepad_connected: move || {
let global = this.root();
@ -3291,7 +3192,8 @@ impl GlobalScope {
/// <https://www.w3.org/TR/gamepad/#dfn-gamepaddisconnected>
pub fn handle_gamepad_disconnect(&self, index: usize) {
let this = Trusted::new(self);
self.gamepad_task_source()
self.task_manager()
.gamepad_task_source()
.queue_with_canceller(
task!(gamepad_disconnected: move || {
let global = this.root();
@ -3315,7 +3217,8 @@ impl GlobalScope {
let this = Trusted::new(self);
// <https://w3c.github.io/gamepad/#dfn-update-gamepad-state>
self.gamepad_task_source()
self.task_manager()
.gamepad_task_source()
.queue_with_canceller(
task!(update_gamepad_state: move || {
let global = this.root();
@ -3427,7 +3330,7 @@ impl GlobalScope {
&self,
request_builder: RequestBuilder,
context: Arc<Mutex<Listener>>,
task_source: NetworkingTaskSource,
task_source: TaskSource,
cancellation_sender: Option<ipc::IpcReceiver<()>>,
) {
let canceller = Some(self.task_canceller(TaskSourceName::Networking));