script: Unsilence all main thread TaskQueue errors (#34849)

No longer hide errors while queueing tasks on the main thread. This
requires creating two types of `TaskSource`s: one for the main thread
and one that can be sent to other threads. This makes queueing a bit
more efficient on the main thread and more importantly, no longer hides
task queue errors.

Fixes #25688.

Signed-off-by: Martin Robinson <mrobinson@igalia.com>
Co-authored-by: Mukilan Thiyagarajan <mukilan@igalia.com>
This commit is contained in:
Martin Robinson 2025-01-07 04:36:39 +01:00 committed by GitHub
parent d252a631d2
commit fe8a22b72c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
48 changed files with 628 additions and 571 deletions

View file

@ -40,7 +40,7 @@ use crate::dom::readablestream::{get_read_promise_bytes, get_read_promise_done,
use crate::dom::urlsearchparams::URLSearchParams; use crate::dom::urlsearchparams::URLSearchParams;
use crate::realms::{enter_realm, AlreadyInRealm, InRealm}; use crate::realms::{enter_realm, AlreadyInRealm, InRealm};
use crate::script_runtime::{CanGc, JSContext}; use crate::script_runtime::{CanGc, JSContext};
use crate::task_source::TaskSource; use crate::task_source::SendableTaskSource;
/// The Dom object, or ReadableStream, that is the source of a body. /// The Dom object, or ReadableStream, that is the source of a body.
/// <https://fetch.spec.whatwg.org/#concept-body-source> /// <https://fetch.spec.whatwg.org/#concept-body-source>
@ -70,7 +70,7 @@ enum StopReading {
#[derive(Clone)] #[derive(Clone)]
struct TransmitBodyConnectHandler { struct TransmitBodyConnectHandler {
stream: Trusted<ReadableStream>, stream: Trusted<ReadableStream>,
task_source: TaskSource, task_source: SendableTaskSource,
bytes_sender: Option<IpcSender<BodyChunkResponse>>, bytes_sender: Option<IpcSender<BodyChunkResponse>>,
control_sender: IpcSender<BodyChunkRequest>, control_sender: IpcSender<BodyChunkRequest>,
in_memory: Option<Vec<u8>>, in_memory: Option<Vec<u8>>,
@ -81,7 +81,7 @@ struct TransmitBodyConnectHandler {
impl TransmitBodyConnectHandler { impl TransmitBodyConnectHandler {
pub fn new( pub fn new(
stream: Trusted<ReadableStream>, stream: Trusted<ReadableStream>,
task_source: TaskSource, task_source: SendableTaskSource,
control_sender: IpcSender<BodyChunkRequest>, control_sender: IpcSender<BodyChunkRequest>,
in_memory: Option<Vec<u8>>, in_memory: Option<Vec<u8>>,
source: BodySource, source: BodySource,
@ -174,8 +174,7 @@ impl TransmitBodyConnectHandler {
// If we're using an actual ReadableStream, acquire a reader for it. // If we're using an actual ReadableStream, acquire a reader for it.
if self.source == BodySource::Null { if self.source == BodySource::Null {
let stream = self.stream.clone(); let stream = self.stream.clone();
let _ = self self.task_source
.task_source
.queue(task!(start_reading_request_body_stream: move || { .queue(task!(start_reading_request_body_stream: move || {
// Step 1, Let body be requests body. // Step 1, Let body be requests body.
let rooted_stream = stream.root(); let rooted_stream = stream.root();
@ -231,7 +230,7 @@ impl TransmitBodyConnectHandler {
return; return;
} }
let _ = self.task_source.queue( self.task_source.queue(
task!(setup_native_body_promise_handler: move || { task!(setup_native_body_promise_handler: move || {
let rooted_stream = stream.root(); let rooted_stream = stream.root();
let global = rooted_stream.global(); let global = rooted_stream.global();
@ -384,7 +383,7 @@ impl ExtractedBody {
let mut body_handler = TransmitBodyConnectHandler::new( let mut body_handler = TransmitBodyConnectHandler::new(
trusted_stream, trusted_stream,
task_source, task_source.into(),
chunk_request_sender.clone(), chunk_request_sender.clone(),
in_memory, in_memory,
source, source,

View file

@ -110,14 +110,17 @@ impl AnalyserNode {
) -> Fallible<DomRoot<AnalyserNode>> { ) -> Fallible<DomRoot<AnalyserNode>> {
let (node, recv) = AnalyserNode::new_inherited(window, context, options)?; let (node, recv) = AnalyserNode::new_inherited(window, context, options)?;
let object = reflect_dom_object_with_proto(Box::new(node), window, proto, can_gc); let object = reflect_dom_object_with_proto(Box::new(node), window, proto, can_gc);
let task_source = window.task_manager().dom_manipulation_task_source(); let task_source = window
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
let this = Trusted::new(&*object); let this = Trusted::new(&*object);
ROUTER.add_typed_route( ROUTER.add_typed_route(
recv, recv,
Box::new(move |block| { Box::new(move |block| {
let this = this.clone(); let this = this.clone();
let _ = task_source.queue(task!(append_analysis_block: move || { task_source.queue(task!(append_analysis_block: move || {
let this = this.root(); let this = this.root();
this.push_block(block.unwrap()) this.push_block(block.unwrap())
})); }));

View file

@ -157,13 +157,12 @@ impl AudioContextMethods<crate::DomTypeHolder> for AudioContext {
} }
// Steps 4 and 5. // Steps 4 and 5.
let task_source = self.global().task_manager().dom_manipulation_task_source();
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
match self.context.audio_context_impl().lock().unwrap().suspend() { match self.context.audio_context_impl().lock().unwrap().suspend() {
Ok(_) => { Ok(_) => {
let base_context = Trusted::new(&self.context); let base_context = Trusted::new(&self.context);
let context = Trusted::new(self); let context = Trusted::new(self);
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(suspend_ok: move || { task!(suspend_ok: move || {
let base_context = base_context.root(); let base_context = base_context.root();
let context = context.root(); let context = context.root();
@ -182,10 +181,13 @@ impl AudioContextMethods<crate::DomTypeHolder> for AudioContext {
Err(_) => { Err(_) => {
// The spec does not define the error case and `suspend` should // The spec does not define the error case and `suspend` should
// never fail, but we handle the case here for completion. // never fail, but we handle the case here for completion.
let _ = task_source.queue(task!(suspend_error: move || { self.global()
let promise = trusted_promise.root(); .task_manager()
promise.reject_error(Error::Type("Something went wrong".to_owned())); .dom_manipulation_task_source()
})); .queue(task!(suspend_error: move || {
let promise = trusted_promise.root();
promise.reject_error(Error::Type("Something went wrong".to_owned()));
}));
}, },
}; };
@ -211,13 +213,12 @@ impl AudioContextMethods<crate::DomTypeHolder> for AudioContext {
} }
// Steps 4 and 5. // Steps 4 and 5.
let task_source = self.global().task_manager().dom_manipulation_task_source();
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
match self.context.audio_context_impl().lock().unwrap().close() { match self.context.audio_context_impl().lock().unwrap().close() {
Ok(_) => { Ok(_) => {
let base_context = Trusted::new(&self.context); let base_context = Trusted::new(&self.context);
let context = Trusted::new(self); let context = Trusted::new(self);
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(suspend_ok: move || { task!(suspend_ok: move || {
let base_context = base_context.root(); let base_context = base_context.root();
let context = context.root(); let context = context.root();
@ -236,10 +237,13 @@ impl AudioContextMethods<crate::DomTypeHolder> for AudioContext {
Err(_) => { Err(_) => {
// The spec does not define the error case and `suspend` should // The spec does not define the error case and `suspend` should
// never fail, but we handle the case here for completion. // never fail, but we handle the case here for completion.
let _ = task_source.queue(task!(suspend_error: move || { self.global()
let promise = trusted_promise.root(); .task_manager()
promise.reject_error(Error::Type("Something went wrong".to_owned())); .dom_manipulation_task_source()
})); .queue(task!(suspend_error: move || {
let promise = trusted_promise.root();
promise.reject_error(Error::Type("Something went wrong".to_owned()));
}));
}, },
}; };

View file

@ -71,9 +71,13 @@ impl AudioScheduledSourceNodeMethods<crate::DomTypeHolder> for AudioScheduledSou
} }
let this = Trusted::new(self); let this = Trusted::new(self);
let task_source = self.global().task_manager().dom_manipulation_task_source(); let task_source = self
.global()
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
let callback = OnEndedCallback::new(move || { let callback = OnEndedCallback::new(move || {
let _ = task_source.queue(task!(ended: move || { task_source.queue(task!(ended: move || {
let this = this.root(); let this = this.root();
this.global().task_manager().dom_manipulation_task_source().queue_simple_event( this.global().task_manager().dom_manipulation_task_source().queue_simple_event(
this.upcast(), this.upcast(),

View file

@ -90,7 +90,7 @@ impl AudioTrackList {
let global = &self.global(); let global = &self.global();
let this = Trusted::new(self); let this = Trusted::new(self);
let task_source = global.task_manager().media_element_task_source(); let task_source = global.task_manager().media_element_task_source();
let _ = task_source.queue(task!(media_track_change: move || { task_source.queue(task!(media_track_change: move || {
let this = this.root(); let this = this.root();
this.upcast::<EventTarget>().fire_event(atom!("change"), CanGc::note()); this.upcast::<EventTarget>().fire_event(atom!("change"), CanGc::note());
})); }));

View file

@ -229,14 +229,13 @@ impl BaseAudioContext {
} }
pub fn resume(&self) { pub fn resume(&self) {
let task_source = self.global().task_manager().dom_manipulation_task_source();
let this = Trusted::new(self); let this = Trusted::new(self);
// Set the rendering thread state to 'running' and start // Set the rendering thread state to 'running' and start
// rendering the audio graph. // rendering the audio graph.
match self.audio_context_impl.lock().unwrap().resume() { match self.audio_context_impl.lock().unwrap().resume() {
Ok(()) => { Ok(()) => {
self.take_pending_resume_promises(Ok(())); self.take_pending_resume_promises(Ok(()));
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(resume_success: move || { task!(resume_success: move || {
let this = this.root(); let this = this.root();
this.fulfill_in_flight_resume_promises(|| { this.fulfill_in_flight_resume_promises(|| {
@ -255,9 +254,12 @@ impl BaseAudioContext {
self.take_pending_resume_promises(Err(Error::Type( self.take_pending_resume_promises(Err(Error::Type(
"Something went wrong".to_owned(), "Something went wrong".to_owned(),
))); )));
let _ = task_source.queue(task!(resume_error: move || { self.global()
this.root().fulfill_in_flight_resume_promises(|| {}) .task_manager()
})); .dom_manipulation_task_source()
.queue(task!(resume_error: move || {
this.root().fulfill_in_flight_resume_promises(|| {})
}));
}, },
} }
} }
@ -501,7 +503,11 @@ impl BaseAudioContextMethods<crate::DomTypeHolder> for BaseAudioContext {
let channels = Arc::new(Mutex::new(HashMap::new())); let channels = Arc::new(Mutex::new(HashMap::new()));
let this = Trusted::new(self); let this = Trusted::new(self);
let this_ = this.clone(); let this_ = this.clone();
let task_source = self.global().task_manager().dom_manipulation_task_source(); let task_source = self
.global()
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
let task_source_clone = task_source.clone(); let task_source_clone = task_source.clone();
let callbacks = AudioDecoderCallbacks::new() let callbacks = AudioDecoderCallbacks::new()
.ready(move |channel_count| { .ready(move |channel_count| {
@ -523,7 +529,7 @@ impl BaseAudioContextMethods<crate::DomTypeHolder> for BaseAudioContext {
decoded_audio[channel].extend_from_slice((*buffer).as_ref()); decoded_audio[channel].extend_from_slice((*buffer).as_ref());
}) })
.eos(move || { .eos(move || {
let _ = task_source.queue(task!(audio_decode_eos: move || { task_source.queue(task!(audio_decode_eos: move || {
let this = this.root(); let this = this.root();
let decoded_audio = decoded_audio__.lock().unwrap(); let decoded_audio = decoded_audio__.lock().unwrap();
let length = if decoded_audio.len() >= 1 { let length = if decoded_audio.len() >= 1 {
@ -548,7 +554,7 @@ impl BaseAudioContextMethods<crate::DomTypeHolder> for BaseAudioContext {
})); }));
}) })
.error(move |error| { .error(move |error| {
let _ = task_source_clone.queue(task!(audio_decode_eos: move || { task_source_clone.queue(task!(audio_decode_eos: move || {
let this = this_.root(); let this = this_.root();
let mut resolvers = this.decode_resolvers.borrow_mut(); let mut resolvers = this.decode_resolvers.borrow_mut();
assert!(resolvers.contains_key(&uuid)); assert!(resolvers.contains_key(&uuid));

View file

@ -244,7 +244,11 @@ pub fn response_async<T: AsyncBluetoothListener + DomObject + 'static>(
receiver: &T, receiver: &T,
) -> IpcSender<BluetoothResponseResult> { ) -> IpcSender<BluetoothResponseResult> {
let (action_sender, action_receiver) = ipc::channel().unwrap(); let (action_sender, action_receiver) = ipc::channel().unwrap();
let task_source = receiver.global().task_manager().networking_task_source(); let task_source = receiver
.global()
.task_manager()
.networking_task_source()
.to_sendable();
let context = Arc::new(Mutex::new(BluetoothContext { let context = Arc::new(Mutex::new(BluetoothContext {
promise: Some(TrustedPromise::new(promise.clone())), promise: Some(TrustedPromise::new(promise.clone())),
receiver: Trusted::new(receiver), receiver: Trusted::new(receiver),
@ -272,10 +276,7 @@ pub fn response_async<T: AsyncBluetoothListener + DomObject + 'static>(
action: message.unwrap(), action: message.unwrap(),
}; };
let result = task_source.queue_unconditionally(task); task_source.queue_unconditionally(task);
if let Err(err) = result {
warn!("failed to deliver network data: {:?}", err);
}
}), }),
); );
action_sender action_sender

View file

@ -59,7 +59,7 @@ use crate::script_runtime::{
ThreadSafeJSContext, ThreadSafeJSContext,
}; };
use crate::task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue}; use crate::task_queue::{QueuedTask, QueuedTaskConversion, TaskQueue};
use crate::task_source::{TaskSource, TaskSourceName}; use crate::task_source::{SendableTaskSource, TaskSourceName};
/// Set the `worker` field of a related DedicatedWorkerGlobalScope object to a particular /// Set the `worker` field of a related DedicatedWorkerGlobalScope object to a particular
/// value for the duration of this object's lifetime. This ensures that the related Worker /// value for the duration of this object's lifetime. This ensures that the related Worker
@ -378,7 +378,7 @@ impl DedicatedWorkerGlobalScope {
.origin(origin); .origin(origin);
let runtime = unsafe { let runtime = unsafe {
let task_source = TaskSource { let task_source = SendableTaskSource {
sender: Box::new(WorkerThreadWorkerChan { sender: Box::new(WorkerThreadWorkerChan {
sender: own_sender.clone(), sender: own_sender.clone(),
worker: worker.clone(), worker: worker.clone(),

View file

@ -724,7 +724,6 @@ impl Document {
event.set_trusted(true); event.set_trusted(true);
window.dispatch_event_with_target_override(event, CanGc::note()); window.dispatch_event_with_target_override(event, CanGc::note());
})) }))
.unwrap();
} }
pub fn origin(&self) -> &MutableOrigin { pub fn origin(&self) -> &MutableOrigin {
@ -2128,7 +2127,7 @@ impl Document {
) { ) {
let callback = NetworkListener { let callback = NetworkListener {
context: std::sync::Arc::new(Mutex::new(listener)), context: std::sync::Arc::new(Mutex::new(listener)),
task_source: self.window().task_manager().networking_task_source(), task_source: self.window().task_manager().networking_task_source().into(),
} }
.into_callback(); .into_callback();
self.loader_mut() self.loader_mut()
@ -2143,7 +2142,7 @@ impl Document {
) { ) {
let callback = NetworkListener { let callback = NetworkListener {
context: std::sync::Arc::new(Mutex::new(listener)), context: std::sync::Arc::new(Mutex::new(listener)),
task_source: self.window().task_manager().networking_task_source(), task_source: self.window().task_manager().networking_task_source().into(),
} }
.into_callback(); .into_callback();
self.loader_mut() self.loader_mut()
@ -2409,8 +2408,7 @@ impl Document {
if let Some(fragment) = document.url().fragment() { if let Some(fragment) = document.url().fragment() {
document.check_and_scroll_fragment(fragment, CanGc::note()); document.check_and_scroll_fragment(fragment, CanGc::note());
} }
})) }));
.unwrap();
// Step 8. // Step 8.
let document = Trusted::new(self); let document = Trusted::new(self);
@ -2439,8 +2437,7 @@ impl Document {
event.set_trusted(true); event.set_trusted(true);
window.dispatch_event_with_target_override(event, CanGc::note()); window.dispatch_event_with_target_override(event, CanGc::note());
})) }));
.unwrap();
} }
// Step 9. // Step 9.
@ -2489,8 +2486,7 @@ impl Document {
// Note: this will, among others, result in the "iframe-load-event-steps" being run. // Note: this will, among others, result in the "iframe-load-event-steps" being run.
// https://html.spec.whatwg.org/multipage/#iframe-load-event-steps // https://html.spec.whatwg.org/multipage/#iframe-load-event-steps
document.notify_constellation_load(); document.notify_constellation_load();
})) }));
.unwrap();
} }
} }
@ -2645,8 +2641,7 @@ impl Document {
document.upcast::<EventTarget>().fire_bubbling_event(atom!("DOMContentLoaded"), CanGc::note()); document.upcast::<EventTarget>().fire_bubbling_event(atom!("DOMContentLoaded"), CanGc::note());
update_with_current_instant(&document.dom_content_loaded_event_end); update_with_current_instant(&document.dom_content_loaded_event_end);
}) })
) );
.unwrap();
// html parsing has finished - set dom content loaded // html parsing has finished - set dom content loaded
self.interactive_time self.interactive_time

View file

@ -111,8 +111,7 @@ impl EventSourceContext {
} }
let global = event_source.global(); let global = event_source.global();
let event_source = self.event_source.clone(); let event_source = self.event_source.clone();
// FIXME(nox): Why are errors silenced here? global.task_manager().remote_event_task_source().queue(
let _ = global.task_manager().remote_event_task_source().queue(
task!(announce_the_event_source_connection: move || { task!(announce_the_event_source_connection: move || {
let event_source = event_source.root(); let event_source = event_source.root();
if event_source.ready_state.get() != ReadyState::Closed { if event_source.ready_state.get() != ReadyState::Closed {
@ -143,8 +142,7 @@ impl EventSourceContext {
let trusted_event_source = self.event_source.clone(); let trusted_event_source = self.event_source.clone();
let action_sender = self.action_sender.clone(); let action_sender = self.action_sender.clone();
let global = event_source.global(); let global = event_source.global();
// FIXME(nox): Why are errors silenced here? global.task_manager().remote_event_task_source().queue(
let _ = global.task_manager().remote_event_task_source().queue(
task!(reestablish_the_event_source_onnection: move || { task!(reestablish_the_event_source_onnection: move || {
let event_source = trusted_event_source.root(); let event_source = trusted_event_source.root();
@ -172,8 +170,7 @@ impl EventSourceContext {
action_sender, action_sender,
} }
); );
// FIXME(nox): Why are errors silenced here? event_source.global().schedule_callback(callback, duration);
let _ = event_source.global().schedule_callback(callback, duration);
}), }),
); );
} }
@ -255,8 +252,7 @@ impl EventSourceContext {
let global = event_source.global(); let global = event_source.global();
let event_source = self.event_source.clone(); let event_source = self.event_source.clone();
let event = Trusted::new(&*event); let event = Trusted::new(&*event);
// FIXME(nox): Why are errors silenced here? global.task_manager().remote_event_task_source().queue(
let _ = global.task_manager().remote_event_task_source().queue(
task!(dispatch_the_event_source_event: move || { task!(dispatch_the_event_source_event: move || {
let event_source = event_source.root(); let event_source = event_source.root();
if event_source.ready_state.get() != ReadyState::Closed { if event_source.ready_state.get() != ReadyState::Closed {
@ -495,8 +491,7 @@ impl EventSource {
pub fn fail_the_connection(&self) { pub fn fail_the_connection(&self) {
let global = self.global(); let global = self.global();
let event_source = Trusted::new(self); let event_source = Trusted::new(self);
// FIXME(nox): Why are errors silenced here? global.task_manager().remote_event_task_source().queue(
let _ = global.task_manager().remote_event_task_source().queue(
task!(fail_the_event_source_connection: move || { task!(fail_the_event_source_connection: move || {
let event_source = event_source.root(); let event_source = event_source.root();
if event_source.ready_state.get() != ReadyState::Closed { if event_source.ready_state.get() != ReadyState::Closed {
@ -600,7 +595,7 @@ impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
}; };
let listener = NetworkListener { let listener = NetworkListener {
context: Arc::new(Mutex::new(context)), context: Arc::new(Mutex::new(context)),
task_source: global.task_manager().networking_task_source(), task_source: global.task_manager().networking_task_source().into(),
}; };
ROUTER.add_typed_route( ROUTER.add_typed_route(
action_receiver, action_receiver,

View file

@ -502,27 +502,22 @@ impl FileReader {
let filereader = Trusted::new(self); let filereader = Trusted::new(self);
let global = self.global(); let global = self.global();
let task_source = global.task_manager().file_reading_task_source(); let task_manager = global.task_manager();
let task_source = task_manager.file_reading_task_source();
// Queue tasks as appropriate. // Queue tasks as appropriate.
task_source task_source.queue(FileReadingTask::ProcessRead(filereader.clone(), gen_id));
.queue(FileReadingTask::ProcessRead(filereader.clone(), gen_id))
.unwrap();
if !blob_contents.is_empty() { if !blob_contents.is_empty() {
task_source task_source.queue(FileReadingTask::ProcessReadData(filereader.clone(), gen_id));
.queue(FileReadingTask::ProcessReadData(filereader.clone(), gen_id))
.unwrap();
} }
task_source task_source.queue(FileReadingTask::ProcessReadEOF(
.queue(FileReadingTask::ProcessReadEOF( filereader,
filereader, gen_id,
gen_id, load_data,
load_data, blob_contents,
blob_contents, ));
))
.unwrap();
Ok(()) Ok(())
} }

View file

@ -27,18 +27,17 @@ use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise; use crate::dom::promise::Promise;
use crate::realms::InRealm; use crate::realms::InRealm;
use crate::script_runtime::{CanGc, JSContext}; use crate::script_runtime::{CanGc, JSContext};
use crate::task_source::TaskSource; use crate::task_source::SendableTaskSource;
struct HapticEffectListener { struct HapticEffectListener {
task_source: TaskSource, task_source: SendableTaskSource,
context: Trusted<GamepadHapticActuator>, context: Trusted<GamepadHapticActuator>,
} }
impl HapticEffectListener { impl HapticEffectListener {
fn handle_stopped(&self, stopped_successfully: bool) { fn handle_stopped(&self, stopped_successfully: bool) {
let context = self.context.clone(); let context = self.context.clone();
let _ = self self.task_source
.task_source
.queue(task!(handle_haptic_effect_stopped: move || { .queue(task!(handle_haptic_effect_stopped: move || {
let actuator = context.root(); let actuator = context.root();
actuator.handle_haptic_effect_stopped(stopped_successfully); actuator.handle_haptic_effect_stopped(stopped_successfully);
@ -47,8 +46,7 @@ impl HapticEffectListener {
fn handle_completed(&self, completed_successfully: bool) { fn handle_completed(&self, completed_successfully: bool) {
let context = self.context.clone(); let context = self.context.clone();
let _ = self self.task_source
.task_source
.queue(task!(handle_haptic_effect_completed: move || { .queue(task!(handle_haptic_effect_completed: move || {
let actuator = context.root(); let actuator = context.root();
actuator.handle_haptic_effect_completed(completed_successfully); actuator.handle_haptic_effect_completed(completed_successfully);
@ -194,7 +192,7 @@ impl GamepadHapticActuatorMethods<crate::DomTypeHolder> for GamepadHapticActuato
if let Some(promise) = self.playing_effect_promise.borrow_mut().take() { if let Some(promise) = self.playing_effect_promise.borrow_mut().take() {
let trusted_promise = TrustedPromise::new(promise); let trusted_promise = TrustedPromise::new(promise);
let _ = self.global().task_manager().gamepad_task_source().queue( self.global().task_manager().gamepad_task_source().queue(
task!(preempt_promise: move || { task!(preempt_promise: move || {
let promise = trusted_promise.root(); let promise = trusted_promise.root();
let message = DOMString::from("preempted"); let message = DOMString::from("preempted");
@ -215,7 +213,7 @@ impl GamepadHapticActuatorMethods<crate::DomTypeHolder> for GamepadHapticActuato
let (effect_complete_sender, effect_complete_receiver) = let (effect_complete_sender, effect_complete_receiver) =
ipc::channel().expect("ipc channel failure"); ipc::channel().expect("ipc channel failure");
let listener = HapticEffectListener { let listener = HapticEffectListener {
task_source: self.global().task_manager().gamepad_task_source(), task_source: self.global().task_manager().gamepad_task_source().into(),
context, context,
}; };
@ -261,7 +259,7 @@ impl GamepadHapticActuatorMethods<crate::DomTypeHolder> for GamepadHapticActuato
if let Some(promise) = self.playing_effect_promise.borrow_mut().take() { if let Some(promise) = self.playing_effect_promise.borrow_mut().take() {
let trusted_promise = TrustedPromise::new(promise); let trusted_promise = TrustedPromise::new(promise);
let _ = self.global().task_manager().gamepad_task_source().queue( self.global().task_manager().gamepad_task_source().queue(
task!(preempt_promise: move || { task!(preempt_promise: move || {
let promise = trusted_promise.root(); let promise = trusted_promise.root();
let message = DOMString::from("preempted"); let message = DOMString::from("preempted");
@ -278,7 +276,7 @@ impl GamepadHapticActuatorMethods<crate::DomTypeHolder> for GamepadHapticActuato
let (effect_stop_sender, effect_stop_receiver) = let (effect_stop_sender, effect_stop_receiver) =
ipc::channel().expect("ipc channel failure"); ipc::channel().expect("ipc channel failure");
let listener = HapticEffectListener { let listener = HapticEffectListener {
task_source: self.global().task_manager().gamepad_task_source(), task_source: self.global().task_manager().gamepad_task_source().into(),
context, context,
}; };
@ -325,7 +323,7 @@ impl GamepadHapticActuator {
let trusted_promise = TrustedPromise::new(promise); let trusted_promise = TrustedPromise::new(promise);
let sequence_id = self.sequence_id.get(); let sequence_id = self.sequence_id.get();
let reset_sequence_id = self.reset_sequence_id.get(); let reset_sequence_id = self.reset_sequence_id.get();
let _ = self.global().task_manager().gamepad_task_source().queue( self.global().task_manager().gamepad_task_source().queue(
task!(complete_promise: move || { task!(complete_promise: move || {
if sequence_id != reset_sequence_id { if sequence_id != reset_sequence_id {
warn!("Mismatched sequence/reset sequence ids: {} != {}", sequence_id, reset_sequence_id); warn!("Mismatched sequence/reset sequence ids: {} != {}", sequence_id, reset_sequence_id);
@ -346,7 +344,7 @@ impl GamepadHapticActuator {
} }
let this = Trusted::new(self); let this = Trusted::new(self);
let _ = self.global().task_manager().gamepad_task_source().queue( self.global().task_manager().gamepad_task_source().queue(
task!(stop_playing_effect: move || { task!(stop_playing_effect: move || {
let actuator = this.root(); let actuator = this.root();
let Some(promise) = actuator.playing_effect_promise.borrow_mut().take() else { let Some(promise) = actuator.playing_effect_promise.borrow_mut().take() else {

View file

@ -138,7 +138,7 @@ use crate::script_runtime::{
use crate::script_thread::{with_script_thread, ScriptThread}; use crate::script_thread::{with_script_thread, ScriptThread};
use crate::security_manager::CSPViolationReporter; use crate::security_manager::CSPViolationReporter;
use crate::task_manager::TaskManager; use crate::task_manager::TaskManager;
use crate::task_source::TaskSource; use crate::task_source::SendableTaskSource;
use crate::timers::{ use crate::timers::{
IsInterval, OneshotTimerCallback, OneshotTimerHandle, OneshotTimers, TimerCallback, IsInterval, OneshotTimerCallback, OneshotTimerHandle, OneshotTimers, TimerCallback,
}; };
@ -376,13 +376,13 @@ pub struct GlobalScope {
/// A wrapper for glue-code between the ipc router and the event-loop. /// A wrapper for glue-code between the ipc router and the event-loop.
struct MessageListener { struct MessageListener {
task_source: TaskSource, task_source: SendableTaskSource,
context: Trusted<GlobalScope>, context: Trusted<GlobalScope>,
} }
/// A wrapper for broadcasts coming in over IPC, and the event-loop. /// A wrapper for broadcasts coming in over IPC, and the event-loop.
struct BroadcastListener { struct BroadcastListener {
task_source: TaskSource, task_source: SendableTaskSource,
context: Trusted<GlobalScope>, context: Trusted<GlobalScope>,
} }
@ -394,7 +394,7 @@ struct FileListener {
/// - Some(Empty) => Some(Receiving) => None /// - Some(Empty) => Some(Receiving) => None
/// - Some(Empty) => None /// - Some(Empty) => None
state: Option<FileListenerState>, state: Option<FileListenerState>,
task_source: TaskSource, task_source: SendableTaskSource,
} }
enum FileListenerTarget { enum FileListenerTarget {
@ -507,8 +507,7 @@ impl BroadcastListener {
// This however seems to be hard to avoid in the light of the IPC. // This however seems to be hard to avoid in the light of the IPC.
// One can imagine queueing tasks directly, // One can imagine queueing tasks directly,
// for channels that would be in the same script-thread. // for channels that would be in the same script-thread.
let _ = self self.task_source
.task_source
.queue(task!(broadcast_message_event: move || { .queue(task!(broadcast_message_event: move || {
let global = context.root(); let global = context.root();
// Step 10 of https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage, // Step 10 of https://html.spec.whatwg.org/multipage/#dom-broadcastchannel-postmessage,
@ -526,7 +525,7 @@ impl MessageListener {
match msg { match msg {
MessagePortMsg::CompleteTransfer(ports) => { MessagePortMsg::CompleteTransfer(ports) => {
let context = self.context.clone(); let context = self.context.clone();
let _ = self.task_source.queue( self.task_source.queue(
task!(process_complete_transfer: move || { task!(process_complete_transfer: move || {
let global = context.root(); let global = context.root();
@ -561,22 +560,21 @@ impl MessageListener {
}, },
MessagePortMsg::CompletePendingTransfer(port_id, buffer) => { MessagePortMsg::CompletePendingTransfer(port_id, buffer) => {
let context = self.context.clone(); let context = self.context.clone();
let _ = self.task_source.queue(task!(complete_pending: move || { self.task_source.queue(task!(complete_pending: move || {
let global = context.root(); let global = context.root();
global.complete_port_transfer(port_id, buffer); global.complete_port_transfer(port_id, buffer);
})); }));
}, },
MessagePortMsg::NewTask(port_id, task) => { MessagePortMsg::NewTask(port_id, task) => {
let context = self.context.clone(); let context = self.context.clone();
let _ = self.task_source.queue(task!(process_new_task: move || { self.task_source.queue(task!(process_new_task: move || {
let global = context.root(); let global = context.root();
global.route_task_to_port(port_id, task, CanGc::note()); global.route_task_to_port(port_id, task, CanGc::note());
})); }));
}, },
MessagePortMsg::RemoveMessagePort(port_id) => { MessagePortMsg::RemoveMessagePort(port_id) => {
let context = self.context.clone(); let context = self.context.clone();
let _ = self self.task_source
.task_source
.queue(task!(process_remove_message_port: move || { .queue(task!(process_remove_message_port: move || {
let global = context.root(); let global = context.root();
global.note_entangled_port_removed(&port_id); global.note_entangled_port_removed(&port_id);
@ -615,7 +613,7 @@ impl FileListener {
let stream = trusted.root(); let stream = trusted.root();
stream_handle_incoming(&stream, Ok(blob_buf.bytes)); stream_handle_incoming(&stream, Ok(blob_buf.bytes));
}); });
let _ = self.task_source.queue(task); self.task_source.queue(task);
Vec::with_capacity(0) Vec::with_capacity(0)
} else { } else {
@ -638,7 +636,7 @@ impl FileListener {
stream_handle_incoming(&stream, Ok(bytes_in)); stream_handle_incoming(&stream, Ok(bytes_in));
}); });
let _ = self.task_source.queue(task); self.task_source.queue(task);
} else { } else {
bytes.append(&mut bytes_in); bytes.append(&mut bytes_in);
}; };
@ -658,7 +656,7 @@ impl FileListener {
callback(promise, Ok(bytes)); callback(promise, Ok(bytes));
}); });
let _ = self.task_source.queue(task); self.task_source.queue(task);
}, },
FileListenerTarget::Stream(trusted_stream) => { FileListenerTarget::Stream(trusted_stream) => {
let trusted = trusted_stream.clone(); let trusted = trusted_stream.clone();
@ -668,7 +666,7 @@ impl FileListener {
stream_handle_eof(&stream); stream_handle_eof(&stream);
}); });
let _ = self.task_source.queue(task); self.task_source.queue(task);
}, },
}, },
_ => { _ => {
@ -682,14 +680,14 @@ impl FileListener {
match target { match target {
FileListenerTarget::Promise(trusted_promise, callback) => { FileListenerTarget::Promise(trusted_promise, callback) => {
let _ = self.task_source.queue(task!(reject_promise: move || { self.task_source.queue(task!(reject_promise: move || {
let promise = trusted_promise.root(); let promise = trusted_promise.root();
let _ac = enter_realm(&*promise.global()); let _ac = enter_realm(&*promise.global());
callback(promise, error); callback(promise, error);
})); }));
}, },
FileListenerTarget::Stream(trusted_stream) => { FileListenerTarget::Stream(trusted_stream) => {
let _ = self.task_source.queue(task!(error_stream: move || { self.task_source.queue(task!(error_stream: move || {
let stream = trusted_stream.root(); let stream = trusted_stream.root();
stream_handle_incoming(&stream, error); stream_handle_incoming(&stream, error);
})); }));
@ -1018,7 +1016,7 @@ impl GlobalScope {
for task in message_buffer { for task in message_buffer {
let port_id = *port_id; let port_id = *port_id;
let this = Trusted::new(self); let this = Trusted::new(self);
let _ = self.task_manager().port_message_queue().queue( self.task_manager().port_message_queue().queue(
task!(process_pending_port_messages: move || { task!(process_pending_port_messages: move || {
let target_global = this.root(); let target_global = this.root();
target_global.route_task_to_port(port_id, task, CanGc::note()); target_global.route_task_to_port(port_id, task, CanGc::note());
@ -1071,15 +1069,14 @@ impl GlobalScope {
if let Some(entangled_id) = entangled_port { if let Some(entangled_id) = entangled_port {
// Step 7 // Step 7
let this = Trusted::new(self); let this = Trusted::new(self);
let _ = self.task_manager()
self.task_manager() .port_message_queue()
.port_message_queue() .queue(task!(post_message: move || {
.queue(task!(post_message: move || { let global = this.root();
let global = this.root(); // Note: we do this in a task, as this will ensure the global and constellation
// Note: we do this in a task, as this will ensure the global and constellation // are aware of any transfer that might still take place in the current task.
// are aware of any transfer that might still take place in the current task. global.route_task_to_port(entangled_id, task, CanGc::note());
global.route_task_to_port(entangled_id, task, CanGc::note()); }));
}));
} }
} else { } else {
warn!("post_messageport_msg called on a global not managing any ports."); warn!("post_messageport_msg called on a global not managing any ports.");
@ -1168,7 +1165,7 @@ impl GlobalScope {
// to fire the message event // to fire the message event
let channel = Trusted::new(&*channel); let channel = Trusted::new(&*channel);
let global = Trusted::new(self); let global = Trusted::new(self);
let _ = self.task_manager().dom_manipulation_task_source().queue( self.task_manager().dom_manipulation_task_source().queue(
task!(process_pending_port_messages: move || { task!(process_pending_port_messages: move || {
let destination = channel.root(); let destination = channel.root();
let global = global.root(); let global = global.root();
@ -1361,7 +1358,7 @@ impl GlobalScope {
ipc::channel().expect("ipc channel failure"); ipc::channel().expect("ipc channel failure");
let context = Trusted::new(self); let context = Trusted::new(self);
let listener = BroadcastListener { let listener = BroadcastListener {
task_source: self.task_manager().dom_manipulation_task_source(), task_source: self.task_manager().dom_manipulation_task_source().into(),
context, context,
}; };
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -1409,7 +1406,7 @@ impl GlobalScope {
ipc::channel().expect("ipc channel failure"); ipc::channel().expect("ipc channel failure");
let context = Trusted::new(self); let context = Trusted::new(self);
let listener = MessageListener { let listener = MessageListener {
task_source: self.task_manager().port_message_queue(), task_source: self.task_manager().port_message_queue().into(),
context, context,
}; };
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -1447,7 +1444,7 @@ impl GlobalScope {
// Queue a task to complete the transfer, // Queue a task to complete the transfer,
// unless the port is re-transferred in the current task. // unless the port is re-transferred in the current task.
let this = Trusted::new(self); let this = Trusted::new(self);
let _ = self.task_manager().port_message_queue().queue( self.task_manager().port_message_queue().queue(
task!(process_pending_port_messages: move || { task!(process_pending_port_messages: move || {
let target_global = this.root(); let target_global = this.root();
target_global.maybe_add_pending_ports(); target_global.maybe_add_pending_ports();
@ -1935,7 +1932,7 @@ impl GlobalScope {
state: Some(FileListenerState::Empty(FileListenerTarget::Stream( state: Some(FileListenerState::Empty(FileListenerTarget::Stream(
trusted_stream, trusted_stream,
))), ))),
task_source: self.task_manager().file_reading_task_source(), task_source: self.task_manager().file_reading_task_source().into(),
}; };
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -1957,7 +1954,7 @@ impl GlobalScope {
trusted_promise, trusted_promise,
callback, callback,
))), ))),
task_source: self.task_manager().file_reading_task_source(), task_source: self.task_manager().file_reading_task_source().into(),
}; };
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -2691,8 +2688,7 @@ impl GlobalScope {
); );
self.task_manager() self.task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task) .queue(task);
.unwrap();
} }
} }
@ -3081,8 +3077,7 @@ impl GlobalScope {
); );
navigator.set_gamepad(selected_index as usize, &gamepad, CanGc::note()); navigator.set_gamepad(selected_index as usize, &gamepad, CanGc::note());
} }
})) }));
.expect("Failed to queue gamepad connected task.");
} }
/// <https://www.w3.org/TR/gamepad/#dfn-gamepaddisconnected> /// <https://www.w3.org/TR/gamepad/#dfn-gamepaddisconnected>
@ -3101,8 +3096,7 @@ impl GlobalScope {
} }
} }
} }
})) }));
.expect("Failed to queue gamepad disconnected task.");
} }
/// <https://www.w3.org/TR/gamepad/#receiving-inputs> /// <https://www.w3.org/TR/gamepad/#receiving-inputs>
@ -3141,16 +3135,14 @@ impl GlobalScope {
let gamepad = new_gamepad.root(); let gamepad = new_gamepad.root();
gamepad.notify_event(GamepadEventType::Connected, CanGc::note()); gamepad.notify_event(GamepadEventType::Connected, CanGc::note());
}) })
) );
.expect("Failed to queue update gamepad connect task.");
} }
}); });
} }
} }
} }
}) })
) );
.expect("Failed to queue update gamepad state task.");
} }
pub(crate) fn current_group_label(&self) -> Option<DOMString> { pub(crate) fn current_group_label(&self) -> Option<DOMString> {
@ -3218,7 +3210,7 @@ impl GlobalScope {
&self, &self,
request_builder: RequestBuilder, request_builder: RequestBuilder,
context: Arc<Mutex<Listener>>, context: Arc<Mutex<Listener>>,
task_source: TaskSource, task_source: SendableTaskSource,
cancellation_sender: Option<ipc::IpcReceiver<()>>, cancellation_sender: Option<ipc::IpcReceiver<()>>,
) { ) {
let network_listener = NetworkListener { let network_listener = NetworkListener {

View file

@ -84,8 +84,7 @@ impl VirtualMethods for HTMLDetailsElement {
let window = self.owner_window(); let window = self.owner_window();
let this = Trusted::new(self); let this = Trusted::new(self);
// FIXME(nox): Why are errors silenced here? window.task_manager().dom_manipulation_task_source().queue(
let _ = window.task_manager().dom_manipulation_task_source().queue(
task!(details_notification_task_steps: move || { task!(details_notification_task_steps: move || {
let this = this.root(); let this = this.root();
if counter == this.toggle_counter.get() { if counter == this.toggle_counter.get() {

View file

@ -1040,7 +1040,6 @@ impl HTMLFormElement {
.task_manager() .task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task) .queue(task)
.unwrap();
} }
/// Interactively validate the constraints of form elements /// Interactively validate the constraints of form elements

View file

@ -891,8 +891,7 @@ impl HTMLImageElement {
self.abort_request(State::Broken, ImageRequestPhase::Current, can_gc); self.abort_request(State::Broken, ImageRequestPhase::Current, can_gc);
self.abort_request(State::Broken, ImageRequestPhase::Pending, can_gc); self.abort_request(State::Broken, ImageRequestPhase::Pending, can_gc);
// Step 9. // Step 9.
// FIXME(nox): Why are errors silenced here? task_source.queue(task!(image_null_source_error: move || {
let _ = task_source.queue(task!(image_null_source_error: move || {
let this = this.root(); let this = this.root();
{ {
let mut current_request = let mut current_request =
@ -924,8 +923,7 @@ impl HTMLImageElement {
self.abort_request(State::Broken, ImageRequestPhase::Pending, can_gc); self.abort_request(State::Broken, ImageRequestPhase::Pending, can_gc);
// Step 12.1-12.5. // Step 12.1-12.5.
let src = src.0; let src = src.0;
// FIXME(nox): Why are errors silenced here? task_source.queue(task!(image_selected_source_error: move || {
let _ = task_source.queue(task!(image_selected_source_error: move || {
let this = this.root(); let this = this.root();
{ {
let mut current_request = let mut current_request =
@ -1016,7 +1014,7 @@ impl HTMLImageElement {
let this = Trusted::new(self); let this = Trusted::new(self);
let src = src.0; let src = src.0;
let _ = window.task_manager().dom_manipulation_task_source().queue( window.task_manager().dom_manipulation_task_source().queue(
task!(image_load_event: move || { task!(image_load_event: move || {
let this = this.root(); let this = this.root();
{ {
@ -1063,7 +1061,11 @@ impl HTMLImageElement {
) -> IpcSender<PendingImageResponse> { ) -> IpcSender<PendingImageResponse> {
let trusted_node = Trusted::new(elem); let trusted_node = Trusted::new(elem);
let (responder_sender, responder_receiver) = ipc::channel().unwrap(); let (responder_sender, responder_receiver) = ipc::channel().unwrap();
let task_source = elem.owner_window().task_manager().networking_task_source(); let task_source = elem
.owner_window()
.task_manager()
.networking_task_source()
.to_sendable();
let generation = elem.generation.get(); let generation = elem.generation.get();
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -1075,7 +1077,7 @@ impl HTMLImageElement {
let element = trusted_node.clone(); let element = trusted_node.clone();
let image: PendingImageResponse = message.unwrap(); let image: PendingImageResponse = message.unwrap();
let selected_source_clone = selected_source.clone(); let selected_source_clone = selected_source.clone();
let _ = task_source.queue( task_source.queue(
task!(process_image_response_for_environment_change: move || { task!(process_image_response_for_environment_change: move || {
let element = element.root(); let element = element.root();
// Ignore any image response for a previous request that has been discarded. // Ignore any image response for a previous request that has been discarded.
@ -1240,7 +1242,7 @@ impl HTMLImageElement {
let this = Trusted::new(self); let this = Trusted::new(self);
let window = self.owner_window(); let window = self.owner_window();
let src = src.0; let src = src.0;
let _ = window.task_manager().dom_manipulation_task_source().queue( window.task_manager().dom_manipulation_task_source().queue(
task!(image_load_event: move || { task!(image_load_event: move || {
let this = this.root(); let this = this.root();
let relevant_mutation = this.generation.get() != generation; let relevant_mutation = this.generation.get() != generation;

View file

@ -548,8 +548,7 @@ impl HTMLMediaElement {
// Step 2.3. // Step 2.3.
let this = Trusted::new(self); let this = Trusted::new(self);
let generation_id = self.generation_id.get(); let generation_id = self.generation_id.get();
let _ = self self.owner_window()
.owner_window()
.task_manager() .task_manager()
.media_element_task_source() .media_element_task_source()
.queue(task!(internal_pause_steps: move || { .queue(task!(internal_pause_steps: move || {
@ -595,9 +594,7 @@ impl HTMLMediaElement {
// Step 2. // Step 2.
let this = Trusted::new(self); let this = Trusted::new(self);
let generation_id = self.generation_id.get(); let generation_id = self.generation_id.get();
// FIXME(nox): Why are errors silenced here? self.owner_window()
let _ = self
.owner_window()
.task_manager() .task_manager()
.media_element_task_source() .media_element_task_source()
.queue(task!(notify_about_playing: move || { .queue(task!(notify_about_playing: move || {
@ -632,10 +629,9 @@ impl HTMLMediaElement {
return; return;
} }
let task_source = self let owner_window = self.owner_window();
.owner_window() let task_manager = owner_window.task_manager();
.task_manager() let task_source = task_manager.media_element_task_source();
.media_element_task_source();
// Step 1. // Step 1.
match (old_ready_state, ready_state) { match (old_ready_state, ready_state) {
@ -648,8 +644,7 @@ impl HTMLMediaElement {
if !self.fired_loadeddata_event.get() { if !self.fired_loadeddata_event.get() {
self.fired_loadeddata_event.set(true); self.fired_loadeddata_event.set(true);
let this = Trusted::new(self); let this = Trusted::new(self);
// FIXME(nox): Why are errors silenced here? task_source.queue(task!(media_reached_current_data: move || {
let _ = task_source.queue(task!(media_reached_current_data: move || {
let this = this.root(); let this = this.root();
this.upcast::<EventTarget>().fire_event(atom!("loadeddata"), CanGc::note()); this.upcast::<EventTarget>().fire_event(atom!("loadeddata"), CanGc::note());
this.delay_load_event(false, CanGc::note()); this.delay_load_event(false, CanGc::note());
@ -942,13 +937,11 @@ impl HTMLMediaElement {
// Step 4.remote.1.3. // Step 4.remote.1.3.
let this = Trusted::new(self); let this = Trusted::new(self);
window window.task_manager().media_element_task_source().queue(
.task_manager() task!(set_media_delay_load_event_flag_to_false: move || {
.media_element_task_source()
.queue(task!(set_media_delay_load_event_flag_to_false: move || {
this.root().delay_load_event(false, CanGc::note()); this.root().delay_load_event(false, CanGc::note());
})) }),
.unwrap(); );
// Steps 4.remote.1.4. // Steps 4.remote.1.4.
// FIXME(nox): Somehow we should wait for the task from previous // FIXME(nox): Somehow we should wait for the task from previous
@ -1004,9 +997,7 @@ impl HTMLMediaElement {
let this = Trusted::new(self); let this = Trusted::new(self);
let generation_id = self.generation_id.get(); let generation_id = self.generation_id.get();
self.take_pending_play_promises(Err(Error::NotSupported)); self.take_pending_play_promises(Err(Error::NotSupported));
// FIXME(nox): Why are errors silenced here? self.owner_window()
let _ = self
.owner_window()
.task_manager() .task_manager()
.media_element_task_source() .media_element_task_source()
.queue(task!(dedicated_media_source_failure_steps: move || { .queue(task!(dedicated_media_source_failure_steps: move || {
@ -1105,10 +1096,9 @@ impl HTMLMediaElement {
self.fulfill_in_flight_play_promises(|| ()); self.fulfill_in_flight_play_promises(|| ());
} }
let task_source = self let window = self.owner_window();
.owner_window() let task_manager = window.task_manager();
.task_manager() let task_source = task_manager.media_element_task_source();
.media_element_task_source();
// Step 5. // Step 5.
let network_state = self.network_state.get(); let network_state = self.network_state.get();
@ -1314,14 +1304,16 @@ impl HTMLMediaElement {
self.time_marches_on(); self.time_marches_on();
// Step 16. // Step 16.
let task_source = self let window = self.owner_window();
.owner_window() let task_manager = window.task_manager();
.task_manager() task_manager
.media_element_task_source(); .media_element_task_source()
task_source.queue_simple_event(self.upcast(), atom!("timeupdate")); .queue_simple_event(self.upcast(), atom!("timeupdate"));
// Step 17. // Step 17.
task_source.queue_simple_event(self.upcast(), atom!("seeked")); task_manager
.media_element_task_source()
.queue_simple_event(self.upcast(), atom!("seeked"));
} }
/// <https://html.spec.whatwg.org/multipage/#poster-frame> /// <https://html.spec.whatwg.org/multipage/#poster-frame>
@ -1386,18 +1378,19 @@ impl HTMLMediaElement {
*self.player.borrow_mut() = Some(player); *self.player.borrow_mut() = Some(player);
let trusted_node = Trusted::new(self); let trusted_node = Trusted::new(self);
let task_source = window.task_manager().media_element_task_source(); let task_source = window
.task_manager()
.media_element_task_source()
.to_sendable();
ROUTER.add_typed_route( ROUTER.add_typed_route(
action_receiver, action_receiver,
Box::new(move |message| { Box::new(move |message| {
let event = message.unwrap(); let event = message.unwrap();
trace!("Player event {:?}", event); trace!("Player event {:?}", event);
let this = trusted_node.clone(); let this = trusted_node.clone();
if let Err(err) = task_source.queue(task!(handle_player_event: move || { task_source.queue(task!(handle_player_event: move || {
this.root().handle_player_event(&event, CanGc::note()); this.root().handle_player_event(&event, CanGc::note());
})) { }));
warn!("Could not queue player event handler task {:?}", err);
}
}), }),
); );
@ -1424,13 +1417,16 @@ impl HTMLMediaElement {
if let Some(image_receiver) = image_receiver { if let Some(image_receiver) = image_receiver {
let trusted_node = Trusted::new(self); let trusted_node = Trusted::new(self);
let task_source = window.task_manager().media_element_task_source(); let task_source = window
.task_manager()
.media_element_task_source()
.to_sendable();
ROUTER.add_typed_route( ROUTER.add_typed_route(
image_receiver.to_ipc_receiver(), image_receiver.to_ipc_receiver(),
Box::new(move |message| { Box::new(move |message| {
let msg = message.unwrap(); let msg = message.unwrap();
let this = trusted_node.clone(); let this = trusted_node.clone();
if let Err(err) = task_source.queue(task!(handle_glplayer_message: move || { task_source.queue(task!(handle_glplayer_message: move || {
trace!("GLPlayer message {:?}", msg); trace!("GLPlayer message {:?}", msg);
let video_renderer = this.root().video_renderer.clone(); let video_renderer = this.root().video_renderer.clone();
@ -1454,9 +1450,7 @@ impl HTMLMediaElement {
}, },
_ => (), _ => (),
} }
})) { }));
warn!("Could not queue GL player message handler task {:?}", err);
}
}), }),
); );
} }
@ -1506,7 +1500,7 @@ impl HTMLMediaElement {
// Step 3. // Step 3.
let this = Trusted::new(self); let this = Trusted::new(self);
let _ = self.owner_window().task_manager().media_element_task_source().queue( self.owner_window().task_manager().media_element_task_source().queue(
task!(reaches_the_end_steps: move || { task!(reaches_the_end_steps: move || {
let this = this.root(); let this = this.root();
// Step 3.1. // Step 3.1.
@ -2216,10 +2210,9 @@ impl HTMLMediaElementMethods<crate::DomTypeHolder> for HTMLMediaElement {
let state = self.ready_state.get(); let state = self.ready_state.get();
let task_source = self let owner_window = self.owner_window();
.owner_window() let task_manager = owner_window.task_manager();
.task_manager() let task_source = task_manager.media_element_task_source();
.media_element_task_source();
if self.Paused() { if self.Paused() {
// Step 6.1. // Step 6.1.
self.paused.set(false); self.paused.set(false);
@ -2249,18 +2242,16 @@ impl HTMLMediaElementMethods<crate::DomTypeHolder> for HTMLMediaElement {
self.take_pending_play_promises(Ok(())); self.take_pending_play_promises(Ok(()));
let this = Trusted::new(self); let this = Trusted::new(self);
let generation_id = self.generation_id.get(); let generation_id = self.generation_id.get();
task_source task_source.queue(task!(resolve_pending_play_promises: move || {
.queue(task!(resolve_pending_play_promises: move || { let this = this.root();
let this = this.root(); if generation_id != this.generation_id.get() {
if generation_id != this.generation_id.get() { return;
return; }
}
this.fulfill_in_flight_play_promises(|| { this.fulfill_in_flight_play_promises(|| {
this.play_media(); this.play_media();
}); });
})) }));
.unwrap();
} }
// Step 8. // Step 8.

View file

@ -65,7 +65,7 @@ use crate::script_module::{
fetch_external_module_script, fetch_inline_module_script, ModuleOwner, ScriptFetchOptions, fetch_external_module_script, fetch_inline_module_script, ModuleOwner, ScriptFetchOptions,
}; };
use crate::script_runtime::CanGc; use crate::script_runtime::CanGc;
use crate::task_source::{TaskSource, TaskSourceName}; use crate::task_source::{SendableTaskSource, TaskSourceName};
use crate::unminify::{unminify_js, ScriptSource}; use crate::unminify::{unminify_js, ScriptSource};
use crate::HasParent; use crate::HasParent;

View file

@ -170,12 +170,16 @@ impl OfflineAudioContextMethods<crate::DomTypeHolder> for OfflineAudioContext {
})); }));
let this = Trusted::new(self); let this = Trusted::new(self);
let task_source = self.global().task_manager().dom_manipulation_task_source(); let task_source = self
.global()
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
Builder::new() Builder::new()
.name("OfflineACResolver".to_owned()) .name("OfflineACResolver".to_owned())
.spawn(move || { .spawn(move || {
let _ = receiver.recv(); let _ = receiver.recv();
let _ = task_source.queue( task_source.queue(
task!(resolve: move || { task!(resolve: move || {
let this = this.root(); let this = this.root();
let processed_audio = processed_audio.lock().unwrap(); let processed_audio = processed_audio.lock().unwrap();

View file

@ -238,16 +238,14 @@ impl Performance {
if !self.pending_notification_observers_task.get() { if !self.pending_notification_observers_task.get() {
self.pending_notification_observers_task.set(true); self.pending_notification_observers_task.set(true);
let task_source = self
.global()
.task_manager()
.performance_timeline_task_source();
let global = &self.global(); let global = &self.global();
let owner = Trusted::new(&*global.performance()); let owner = Trusted::new(&*global.performance());
let task = task!(notify_performance_observers: move || { self.global()
owner.root().notify_observers(); .task_manager()
}); .performance_timeline_task_source()
let _ = task_source.queue(task); .queue(task!(notify_performance_observers: move || {
owner.root().notify_observers();
}));
} }
} }
let mut observers = self.observers.borrow_mut(); let mut observers = self.observers.borrow_mut();
@ -324,17 +322,15 @@ impl Performance {
// Step 6. // Step 6.
// Queue a new notification task. // Queue a new notification task.
self.pending_notification_observers_task.set(true); self.pending_notification_observers_task.set(true);
let task_source = self
.global()
.task_manager()
.performance_timeline_task_source();
let global = &self.global(); let global = &self.global();
let owner = Trusted::new(&*global.performance()); let owner = Trusted::new(&*global.performance());
let task = task!(notify_performance_observers: move || { self.global()
owner.root().notify_observers(); .task_manager()
}); .performance_timeline_task_source()
let _ = task_source.queue(task); .queue(task!(notify_performance_observers: move || {
owner.root().notify_observers();
}));
Some(entry_last_index) Some(entry_last_index)
} }

View file

@ -52,7 +52,7 @@ use crate::dom::rtctrackevent::RTCTrackEvent;
use crate::dom::window::Window; use crate::dom::window::Window;
use crate::realms::{enter_realm, InRealm}; use crate::realms::{enter_realm, InRealm};
use crate::script_runtime::CanGc; use crate::script_runtime::CanGc;
use crate::task_source::TaskSource; use crate::task_source::SendableTaskSource;
#[dom_struct] #[dom_struct]
pub struct RTCPeerConnection { pub struct RTCPeerConnection {
@ -79,13 +79,13 @@ pub struct RTCPeerConnection {
struct RTCSignaller { struct RTCSignaller {
trusted: Trusted<RTCPeerConnection>, trusted: Trusted<RTCPeerConnection>,
task_source: TaskSource, task_source: SendableTaskSource,
} }
impl WebRtcSignaller for RTCSignaller { impl WebRtcSignaller for RTCSignaller {
fn on_ice_candidate(&self, _: &WebRtcController, candidate: IceCandidate) { fn on_ice_candidate(&self, _: &WebRtcController, candidate: IceCandidate) {
let this = self.trusted.clone(); let this = self.trusted.clone();
let _ = self.task_source.queue(task!(on_ice_candidate: move || { self.task_source.queue(task!(on_ice_candidate: move || {
let this = this.root(); let this = this.root();
this.on_ice_candidate(candidate, CanGc::note()); this.on_ice_candidate(candidate, CanGc::note());
})); }));
@ -93,8 +93,7 @@ impl WebRtcSignaller for RTCSignaller {
fn on_negotiation_needed(&self, _: &WebRtcController) { fn on_negotiation_needed(&self, _: &WebRtcController) {
let this = self.trusted.clone(); let this = self.trusted.clone();
let _ = self self.task_source
.task_source
.queue(task!(on_negotiation_needed: move || { .queue(task!(on_negotiation_needed: move || {
let this = this.root(); let this = this.root();
this.on_negotiation_needed(CanGc::note()); this.on_negotiation_needed(CanGc::note());
@ -103,8 +102,7 @@ impl WebRtcSignaller for RTCSignaller {
fn update_gathering_state(&self, state: GatheringState) { fn update_gathering_state(&self, state: GatheringState) {
let this = self.trusted.clone(); let this = self.trusted.clone();
let _ = self self.task_source
.task_source
.queue(task!(update_gathering_state: move || { .queue(task!(update_gathering_state: move || {
let this = this.root(); let this = this.root();
this.update_gathering_state(state, CanGc::note()); this.update_gathering_state(state, CanGc::note());
@ -113,8 +111,7 @@ impl WebRtcSignaller for RTCSignaller {
fn update_ice_connection_state(&self, state: IceConnectionState) { fn update_ice_connection_state(&self, state: IceConnectionState) {
let this = self.trusted.clone(); let this = self.trusted.clone();
let _ = self self.task_source
.task_source
.queue(task!(update_ice_connection_state: move || { .queue(task!(update_ice_connection_state: move || {
let this = this.root(); let this = this.root();
this.update_ice_connection_state(state, CanGc::note()); this.update_ice_connection_state(state, CanGc::note());
@ -123,8 +120,7 @@ impl WebRtcSignaller for RTCSignaller {
fn update_signaling_state(&self, state: SignalingState) { fn update_signaling_state(&self, state: SignalingState) {
let this = self.trusted.clone(); let this = self.trusted.clone();
let _ = self self.task_source
.task_source
.queue(task!(update_signaling_state: move || { .queue(task!(update_signaling_state: move || {
let this = this.root(); let this = this.root();
this.update_signaling_state(state, CanGc::note()); this.update_signaling_state(state, CanGc::note());
@ -134,7 +130,7 @@ impl WebRtcSignaller for RTCSignaller {
fn on_add_stream(&self, id: &MediaStreamId, ty: MediaStreamType) { fn on_add_stream(&self, id: &MediaStreamId, ty: MediaStreamType) {
let this = self.trusted.clone(); let this = self.trusted.clone();
let id = *id; let id = *id;
let _ = self.task_source.queue(task!(on_add_stream: move || { self.task_source.queue(task!(on_add_stream: move || {
let this = this.root(); let this = this.root();
this.on_add_stream(id, ty, CanGc::note()); this.on_add_stream(id, ty, CanGc::note());
})); }));
@ -148,8 +144,7 @@ impl WebRtcSignaller for RTCSignaller {
) { ) {
// XXX(ferjm) get label and options from channel properties. // XXX(ferjm) get label and options from channel properties.
let this = self.trusted.clone(); let this = self.trusted.clone();
let _ = self self.task_source
.task_source
.queue(task!(on_data_channel_event: move || { .queue(task!(on_data_channel_event: move || {
let this = this.root(); let this = this.root();
let global = this.global(); let global = this.global();
@ -224,10 +219,9 @@ impl RTCPeerConnection {
fn make_signaller(&self) -> Box<dyn WebRtcSignaller> { fn make_signaller(&self) -> Box<dyn WebRtcSignaller> {
let trusted = Trusted::new(self); let trusted = Trusted::new(self);
let task_source = self.global().task_manager().networking_task_source();
Box::new(RTCSignaller { Box::new(RTCSignaller {
trusted, trusted,
task_source, task_source: self.global().task_manager().networking_task_source().into(),
}) })
} }
@ -442,14 +436,18 @@ impl RTCPeerConnection {
fn create_offer(&self) { fn create_offer(&self) {
let generation = self.offer_answer_generation.get(); let generation = self.offer_answer_generation.get();
let task_source = self.global().task_manager().networking_task_source(); let task_source = self
.global()
.task_manager()
.networking_task_source()
.to_sendable();
let this = Trusted::new(self); let this = Trusted::new(self);
self.controller self.controller
.borrow_mut() .borrow_mut()
.as_ref() .as_ref()
.unwrap() .unwrap()
.create_offer(Box::new(move |desc: SessionDescription| { .create_offer(Box::new(move |desc: SessionDescription| {
let _ = task_source.queue(task!(offer_created: move || { task_source.queue(task!(offer_created: move || {
let this = this.root(); let this = this.root();
if this.offer_answer_generation.get() != generation { if this.offer_answer_generation.get() != generation {
// the state has changed since we last created the offer, // the state has changed since we last created the offer,
@ -467,14 +465,18 @@ impl RTCPeerConnection {
fn create_answer(&self) { fn create_answer(&self) {
let generation = self.offer_answer_generation.get(); let generation = self.offer_answer_generation.get();
let task_source = self.global().task_manager().networking_task_source(); let task_source = self
.global()
.task_manager()
.networking_task_source()
.to_sendable();
let this = Trusted::new(self); let this = Trusted::new(self);
self.controller self.controller
.borrow_mut() .borrow_mut()
.as_ref() .as_ref()
.unwrap() .unwrap()
.create_answer(Box::new(move |desc: SessionDescription| { .create_answer(Box::new(move |desc: SessionDescription| {
let _ = task_source.queue(task!(answer_created: move || { task_source.queue(task!(answer_created: move || {
let this = this.root(); let this = this.root();
if this.offer_answer_generation.get() != generation { if this.offer_answer_generation.get() != generation {
// the state has changed since we last created the offer, // the state has changed since we last created the offer,
@ -635,7 +637,11 @@ impl RTCPeerConnectionMethods<crate::DomTypeHolder> for RTCPeerConnection {
let this = Trusted::new(self); let this = Trusted::new(self);
let desc: SessionDescription = desc.convert(); let desc: SessionDescription = desc.convert();
let trusted_promise = TrustedPromise::new(p.clone()); let trusted_promise = TrustedPromise::new(p.clone());
let task_source = self.global().task_manager().networking_task_source(); let task_source = self
.global()
.task_manager()
.networking_task_source()
.to_sendable();
self.controller self.controller
.borrow_mut() .borrow_mut()
.as_ref() .as_ref()
@ -643,7 +649,7 @@ impl RTCPeerConnectionMethods<crate::DomTypeHolder> for RTCPeerConnection {
.set_local_description( .set_local_description(
desc.clone(), desc.clone(),
Box::new(move || { Box::new(move || {
let _ = task_source.queue(task!(local_description_set: move || { task_source.queue(task!(local_description_set: move || {
// XXXManishearth spec actually asks for an intricate // XXXManishearth spec actually asks for an intricate
// dance between pending/current local/remote descriptions // dance between pending/current local/remote descriptions
let this = this.root(); let this = this.root();
@ -674,7 +680,11 @@ impl RTCPeerConnectionMethods<crate::DomTypeHolder> for RTCPeerConnection {
let this = Trusted::new(self); let this = Trusted::new(self);
let desc: SessionDescription = desc.convert(); let desc: SessionDescription = desc.convert();
let trusted_promise = TrustedPromise::new(p.clone()); let trusted_promise = TrustedPromise::new(p.clone());
let task_source = self.global().task_manager().networking_task_source(); let task_source = self
.global()
.task_manager()
.networking_task_source()
.to_sendable();
self.controller self.controller
.borrow_mut() .borrow_mut()
.as_ref() .as_ref()
@ -682,7 +692,7 @@ impl RTCPeerConnectionMethods<crate::DomTypeHolder> for RTCPeerConnection {
.set_remote_description( .set_remote_description(
desc.clone(), desc.clone(),
Box::new(move || { Box::new(move || {
let _ = task_source.queue(task!(remote_description_set: move || { task_source.queue(task!(remote_description_set: move || {
// XXXManishearth spec actually asks for an intricate // XXXManishearth spec actually asks for an intricate
// dance between pending/current local/remote descriptions // dance between pending/current local/remote descriptions
let this = this.root(); let this = this.root();

View file

@ -98,8 +98,7 @@ impl Selection {
this.task_queued.set(false); this.task_queued.set(false);
this.document.upcast::<EventTarget>().fire_event(atom!("selectionchange"), CanGc::note()); this.document.upcast::<EventTarget>().fire_event(atom!("selectionchange"), CanGc::note());
}) })
) );
.expect("Couldn't queue selectionchange task!");
self.task_queued.set(true); self.task_queued.set(true);
} }

View file

@ -26,7 +26,7 @@ use crate::dom::serviceworker::ServiceWorker;
use crate::dom::serviceworkerregistration::ServiceWorkerRegistration; use crate::dom::serviceworkerregistration::ServiceWorkerRegistration;
use crate::realms::{enter_realm, InRealm}; use crate::realms::{enter_realm, InRealm};
use crate::script_runtime::CanGc; use crate::script_runtime::CanGc;
use crate::task_source::TaskSource; use crate::task_source::SendableTaskSource;
#[dom_struct] #[dom_struct]
pub struct ServiceWorkerContainer { pub struct ServiceWorkerContainer {
@ -140,10 +140,9 @@ impl ServiceWorkerContainerMethods<crate::DomTypeHolder> for ServiceWorkerContai
// Setup the callback for reject/resolve of the promise, // Setup the callback for reject/resolve of the promise,
// from steps running "in-parallel" from here in the serviceworker manager. // from steps running "in-parallel" from here in the serviceworker manager.
let task_source = global.task_manager().dom_manipulation_task_source();
let mut handler = RegisterJobResultHandler { let mut handler = RegisterJobResultHandler {
trusted_promise: Some(TrustedPromise::new(promise.clone())), trusted_promise: Some(TrustedPromise::new(promise.clone())),
task_source, task_source: global.task_manager().dom_manipulation_task_source().into(),
}; };
let (job_result_sender, job_result_receiver) = ipc::channel().expect("ipc channel failure"); let (job_result_sender, job_result_receiver) = ipc::channel().expect("ipc channel failure");
@ -183,7 +182,7 @@ impl ServiceWorkerContainerMethods<crate::DomTypeHolder> for ServiceWorkerContai
/// <https://w3c.github.io/ServiceWorker/#register> /// <https://w3c.github.io/ServiceWorker/#register>
struct RegisterJobResultHandler { struct RegisterJobResultHandler {
trusted_promise: Option<TrustedPromise>, trusted_promise: Option<TrustedPromise>,
task_source: TaskSource, task_source: SendableTaskSource,
} }
impl RegisterJobResultHandler { impl RegisterJobResultHandler {
@ -199,7 +198,7 @@ impl RegisterJobResultHandler {
.expect("No promise to resolve for SW Register job."); .expect("No promise to resolve for SW Register job.");
// Step 1 // Step 1
let _ = self.task_source.queue( self.task_source.queue(
task!(reject_promise_with_security_error: move || { task!(reject_promise_with_security_error: move || {
let promise = promise.root(); let promise = promise.root();
let _ac = enter_realm(&*promise.global()); let _ac = enter_realm(&*promise.global());
@ -224,7 +223,7 @@ impl RegisterJobResultHandler {
.expect("No promise to resolve for SW Register job."); .expect("No promise to resolve for SW Register job.");
// Step 1 // Step 1
let _ = self.task_source.queue(task!(resolve_promise: move || { self.task_source.queue(task!(resolve_promise: move || {
let promise = promise.root(); let promise = promise.root();
let global = promise.global(); let global = promise.global();
let _ac = enter_realm(&*global); let _ac = enter_realm(&*global);

View file

@ -208,10 +208,8 @@ impl Storage {
) { ) {
let global = self.global(); let global = self.global();
let this = Trusted::new(self); let this = Trusted::new(self);
global global.task_manager().dom_manipulation_task_source().queue(
.task_manager() task!(send_storage_notification: move || {
.dom_manipulation_task_source()
.queue(task!(send_storage_notification: move || {
let this = this.root(); let this = this.root();
let global = this.global(); let global = this.global();
let event = StorageEvent::new( let event = StorageEvent::new(
@ -227,7 +225,7 @@ impl Storage {
CanGc::note() CanGc::note()
); );
event.upcast::<Event>().fire(global.upcast(), CanGc::note()); event.upcast::<Event>().fire(global.upcast(), CanGc::note());
})) }),
.unwrap(); );
} }
} }

View file

@ -162,13 +162,12 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
ArrayBufferViewOrArrayBuffer::ArrayBuffer(buffer) => buffer.to_vec(), ArrayBufferViewOrArrayBuffer::ArrayBuffer(buffer) => buffer.to_vec(),
}; };
let task_source = self.global().task_manager().dom_manipulation_task_source();
let this = Trusted::new(self); let this = Trusted::new(self);
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let trusted_key = Trusted::new(key); let trusted_key = Trusted::new(key);
let key_alg = key.algorithm(); let key_alg = key.algorithm();
let valid_usage = key.usages().contains(&KeyUsage::Encrypt); let valid_usage = key.usages().contains(&KeyUsage::Encrypt);
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(encrypt: move || { task!(encrypt: move || {
let subtle = this.root(); let subtle = this.root();
let promise = trusted_promise.root(); let promise = trusted_promise.root();
@ -217,13 +216,12 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
ArrayBufferViewOrArrayBuffer::ArrayBuffer(buffer) => buffer.to_vec(), ArrayBufferViewOrArrayBuffer::ArrayBuffer(buffer) => buffer.to_vec(),
}; };
let task_source = self.global().task_manager().dom_manipulation_task_source();
let this = Trusted::new(self); let this = Trusted::new(self);
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let trusted_key = Trusted::new(key); let trusted_key = Trusted::new(key);
let key_alg = key.algorithm(); let key_alg = key.algorithm();
let valid_usage = key.usages().contains(&KeyUsage::Decrypt); let valid_usage = key.usages().contains(&KeyUsage::Decrypt);
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(decrypt: move || { task!(decrypt: move || {
let subtle = this.root(); let subtle = this.root();
let promise = trusted_promise.root(); let promise = trusted_promise.root();
@ -283,48 +281,50 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
// NOTE: We did that in preparation of Step 4. // NOTE: We did that in preparation of Step 4.
// Step 6. Return promise and perform the remaining steps in parallel. // Step 6. Return promise and perform the remaining steps in parallel.
let task_source = self.global().task_manager().dom_manipulation_task_source();
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let trusted_key = Trusted::new(key); let trusted_key = Trusted::new(key);
let _ = task_source.queue(task!(sign: move || { self.global()
// Step 7. If the following steps or referenced procedures say to throw an error, reject promise .task_manager()
// with the returned error and then terminate the algorithm. .dom_manipulation_task_source()
let promise = trusted_promise.root(); .queue(task!(sign: move || {
let key = trusted_key.root(); // Step 7. If the following steps or referenced procedures say to throw an error, reject promise
// with the returned error and then terminate the algorithm.
let promise = trusted_promise.root();
let key = trusted_key.root();
// Step 8. If the name member of normalizedAlgorithm is not equal to the name attribute of the // Step 8. If the name member of normalizedAlgorithm is not equal to the name attribute of the
// [[algorithm]] internal slot of key then throw an InvalidAccessError. // [[algorithm]] internal slot of key then throw an InvalidAccessError.
if normalized_algorithm.name() != key.algorithm() { if normalized_algorithm.name() != key.algorithm() {
promise.reject_error(Error::InvalidAccess); promise.reject_error(Error::InvalidAccess);
return;
}
// Step 9. If the [[usages]] internal slot of key does not contain an entry that is "sign",
// then throw an InvalidAccessError.
if !key.usages().contains(&KeyUsage::Sign) {
promise.reject_error(Error::InvalidAccess);
return;
}
// Step 10. Let result be the result of performing the sign operation specified by normalizedAlgorithm
// using key and algorithm and with data as message.
let cx = GlobalScope::get_cx();
let result = match normalized_algorithm.sign(cx, &key, &data) {
Ok(signature) => signature,
Err(e) => {
promise.reject_error(e);
return; return;
} }
};
rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>()); // Step 9. If the [[usages]] internal slot of key does not contain an entry that is "sign",
create_buffer_source::<ArrayBufferU8>(cx, &result, array_buffer_ptr.handle_mut()) // then throw an InvalidAccessError.
.expect("failed to create buffer source for exported key."); if !key.usages().contains(&KeyUsage::Sign) {
promise.reject_error(Error::InvalidAccess);
return;
}
// Step 9. Resolve promise with result. // Step 10. Let result be the result of performing the sign operation specified by normalizedAlgorithm
promise.resolve_native(&*array_buffer_ptr); // using key and algorithm and with data as message.
})); let cx = GlobalScope::get_cx();
let result = match normalized_algorithm.sign(cx, &key, &data) {
Ok(signature) => signature,
Err(e) => {
promise.reject_error(e);
return;
}
};
rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
create_buffer_source::<ArrayBufferU8>(cx, &result, array_buffer_ptr.handle_mut())
.expect("failed to create buffer source for exported key.");
// Step 9. Resolve promise with result.
promise.resolve_native(&*array_buffer_ptr);
}));
promise promise
} }
@ -373,44 +373,46 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
// NOTE: We did that in preparation of Step 6. // NOTE: We did that in preparation of Step 6.
// Step 7. Return promise and perform the remaining steps in parallel. // Step 7. Return promise and perform the remaining steps in parallel.
let task_source = self.global().task_manager().dom_manipulation_task_source();
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let trusted_key = Trusted::new(key); let trusted_key = Trusted::new(key);
let _ = task_source.queue(task!(sign: move || { self.global()
// Step 8. If the following steps or referenced procedures say to throw an error, reject promise .task_manager()
// with the returned error and then terminate the algorithm. .dom_manipulation_task_source()
let promise = trusted_promise.root(); .queue(task!(sign: move || {
let key = trusted_key.root(); // Step 8. If the following steps or referenced procedures say to throw an error, reject promise
// with the returned error and then terminate the algorithm.
let promise = trusted_promise.root();
let key = trusted_key.root();
// Step 9. If the name member of normalizedAlgorithm is not equal to the name attribute of the // Step 9. If the name member of normalizedAlgorithm is not equal to the name attribute of the
// [[algorithm]] internal slot of key then throw an InvalidAccessError. // [[algorithm]] internal slot of key then throw an InvalidAccessError.
if normalized_algorithm.name() != key.algorithm() { if normalized_algorithm.name() != key.algorithm() {
promise.reject_error(Error::InvalidAccess); promise.reject_error(Error::InvalidAccess);
return;
}
// Step 10. If the [[usages]] internal slot of key does not contain an entry that is "verify",
// then throw an InvalidAccessError.
if !key.usages().contains(&KeyUsage::Verify) {
promise.reject_error(Error::InvalidAccess);
return;
}
// Step 1. Let result be the result of performing the verify operation specified by normalizedAlgorithm
// using key, algorithm and signature and with data as message.
let cx = GlobalScope::get_cx();
let result = match normalized_algorithm.verify(cx, &key, &data, &signature) {
Ok(result) => result,
Err(e) => {
promise.reject_error(e);
return; return;
} }
};
// Step 9. Resolve promise with result. // Step 10. If the [[usages]] internal slot of key does not contain an entry that is "verify",
promise.resolve_native(&result); // then throw an InvalidAccessError.
})); if !key.usages().contains(&KeyUsage::Verify) {
promise.reject_error(Error::InvalidAccess);
return;
}
// Step 1. Let result be the result of performing the verify operation specified by normalizedAlgorithm
// using key, algorithm and signature and with data as message.
let cx = GlobalScope::get_cx();
let result = match normalized_algorithm.verify(cx, &key, &data, &signature) {
Ok(result) => result,
Err(e) => {
promise.reject_error(e);
return;
}
};
// Step 9. Resolve promise with result.
promise.resolve_native(&result);
}));
promise promise
} }
@ -449,10 +451,9 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
// NOTE: We did that in preparation of Step 4. // NOTE: We did that in preparation of Step 4.
// Step 6. Return promise and perform the remaining steps in parallel. // Step 6. Return promise and perform the remaining steps in parallel.
let task_source = self.global().task_manager().dom_manipulation_task_source();
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(generate_key: move || { task!(generate_key: move || {
// Step 7. If the following steps or referenced procedures say to throw an error, reject promise // Step 7. If the following steps or referenced procedures say to throw an error, reject promise
// with the returned error and then terminate the algorithm. // with the returned error and then terminate the algorithm.
@ -501,19 +502,21 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
}, },
}; };
let task_source = self.global().task_manager().dom_manipulation_task_source();
let this = Trusted::new(self); let this = Trusted::new(self);
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let _ = task_source.queue(task!(generate_key: move || { self.global()
let subtle = this.root(); .task_manager()
let promise = trusted_promise.root(); .dom_manipulation_task_source()
let key = normalized_algorithm.generate_key(&subtle, key_usages, extractable); .queue(task!(generate_key: move || {
let subtle = this.root();
let promise = trusted_promise.root();
let key = normalized_algorithm.generate_key(&subtle, key_usages, extractable);
match key { match key {
Ok(key) => promise.resolve_native(&key), Ok(key) => promise.resolve_native(&key),
Err(e) => promise.reject_error(e), Err(e) => promise.reject_error(e),
} }
})); }));
promise promise
} }
@ -573,11 +576,10 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
// NOTE: We created the promise earlier, after Step 1. // NOTE: We created the promise earlier, after Step 1.
// Step 9. Return promise and perform the remaining steps in parallel. // Step 9. Return promise and perform the remaining steps in parallel.
let task_source = self.global().task_manager().dom_manipulation_task_source();
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let trusted_base_key = Trusted::new(base_key); let trusted_base_key = Trusted::new(base_key);
let this = Trusted::new(self); let this = Trusted::new(self);
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(derive_key: move || { task!(derive_key: move || {
// Step 10. If the following steps or referenced procedures say to throw an error, reject promise // Step 10. If the following steps or referenced procedures say to throw an error, reject promise
// with the returned error and then terminate the algorithm. // with the returned error and then terminate the algorithm.
@ -677,44 +679,46 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
// NOTE: We did that in preparation of Step 3. // NOTE: We did that in preparation of Step 3.
// Step 5. Return promise and perform the remaining steps in parallel. // Step 5. Return promise and perform the remaining steps in parallel.
let task_source = self.global().task_manager().dom_manipulation_task_source();
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let trusted_base_key = Trusted::new(base_key); let trusted_base_key = Trusted::new(base_key);
let _ = task_source.queue(task!(import_key: move || { self.global()
// Step 6. If the following steps or referenced procedures say to throw an error, .task_manager()
// reject promise with the returned error and then terminate the algorithm. .dom_manipulation_task_source()
.queue(task!(import_key: move || {
// Step 6. If the following steps or referenced procedures say to throw an error,
// reject promise with the returned error and then terminate the algorithm.
// TODO Step 7. If the name member of normalizedAlgorithm is not equal to the name attribute // TODO Step 7. If the name member of normalizedAlgorithm is not equal to the name attribute
// of the [[algorithm]] internal slot of baseKey then throw an InvalidAccessError. // of the [[algorithm]] internal slot of baseKey then throw an InvalidAccessError.
let promise = trusted_promise.root(); let promise = trusted_promise.root();
let base_key = trusted_base_key.root(); let base_key = trusted_base_key.root();
// Step 8. If the [[usages]] internal slot of baseKey does not contain an entry that // Step 8. If the [[usages]] internal slot of baseKey does not contain an entry that
// is "deriveBits", then throw an InvalidAccessError. // is "deriveBits", then throw an InvalidAccessError.
if !base_key.usages().contains(&KeyUsage::DeriveBits) { if !base_key.usages().contains(&KeyUsage::DeriveBits) {
promise.reject_error(Error::InvalidAccess); promise.reject_error(Error::InvalidAccess);
return;
}
// Step 9. Let result be the result of creating an ArrayBuffer containing the result of performing the
// derive bits operation specified by normalizedAlgorithm using baseKey, algorithm and length.
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
let result = match normalized_algorithm.derive_bits(&base_key, length) {
Ok(derived_bits) => derived_bits,
Err(e) => {
promise.reject_error(e);
return; return;
} }
};
create_buffer_source::<ArrayBufferU8>(cx, &result, array_buffer_ptr.handle_mut()) // Step 9. Let result be the result of creating an ArrayBuffer containing the result of performing the
.expect("failed to create buffer source for derived bits."); // derive bits operation specified by normalizedAlgorithm using baseKey, algorithm and length.
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
let result = match normalized_algorithm.derive_bits(&base_key, length) {
Ok(derived_bits) => derived_bits,
Err(e) => {
promise.reject_error(e);
return;
}
};
// Step 10. Resolve promise with result. create_buffer_source::<ArrayBufferU8>(cx, &result, array_buffer_ptr.handle_mut())
promise.resolve_native(&*array_buffer_ptr); .expect("failed to create buffer source for derived bits.");
}));
// Step 10. Resolve promise with result.
promise.resolve_native(&*array_buffer_ptr);
}));
promise promise
} }
@ -766,10 +770,9 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
}, },
}; };
let task_source = self.global().task_manager().dom_manipulation_task_source();
let this = Trusted::new(self); let this = Trusted::new(self);
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(import_key: move || { task!(import_key: move || {
let subtle = this.root(); let subtle = this.root();
let promise = trusted_promise.root(); let promise = trusted_promise.root();
@ -794,11 +797,10 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
) -> Rc<Promise> { ) -> Rc<Promise> {
let promise = Promise::new_in_current_realm(comp, can_gc); let promise = Promise::new_in_current_realm(comp, can_gc);
let task_source = self.global().task_manager().dom_manipulation_task_source();
let this = Trusted::new(self); let this = Trusted::new(self);
let trusted_key = Trusted::new(key); let trusted_key = Trusted::new(key);
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(export_key: move || { task!(export_key: move || {
let subtle = this.root(); let subtle = this.root();
let promise = trusted_promise.root(); let promise = trusted_promise.root();
@ -861,12 +863,11 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
}, },
}; };
let task_source = self.global().task_manager().dom_manipulation_task_source();
let this = Trusted::new(self); let this = Trusted::new(self);
let trusted_key = Trusted::new(key); let trusted_key = Trusted::new(key);
let trusted_wrapping_key = Trusted::new(wrapping_key); let trusted_wrapping_key = Trusted::new(wrapping_key);
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(wrap_key: move || { task!(wrap_key: move || {
let subtle = this.root(); let subtle = this.root();
let promise = trusted_promise.root(); let promise = trusted_promise.root();
@ -999,11 +1000,10 @@ impl SubtleCryptoMethods<crate::DomTypeHolder> for SubtleCrypto {
}, },
}; };
let task_source = self.global().task_manager().dom_manipulation_task_source();
let this = Trusted::new(self); let this = Trusted::new(self);
let trusted_key = Trusted::new(unwrapping_key); let trusted_key = Trusted::new(unwrapping_key);
let trusted_promise = TrustedPromise::new(promise.clone()); let trusted_promise = TrustedPromise::new(promise.clone());
let _ = task_source.queue( self.global().task_manager().dom_manipulation_task_source().queue(
task!(unwrap_key: move || { task!(unwrap_key: move || {
let subtle = this.root(); let subtle = this.root();
let promise = trusted_promise.root(); let promise = trusted_promise.root();

View file

@ -62,31 +62,32 @@ impl TextTrackList {
if self.find(track).is_none() { if self.find(track).is_none() {
self.dom_tracks.borrow_mut().push(Dom::from_ref(track)); self.dom_tracks.borrow_mut().push(Dom::from_ref(track));
let this = Trusted::new(self);
let task_source = self.global().task_manager().media_element_task_source();
let Some(idx) = self.find(track) else { let Some(idx) = self.find(track) else {
return; return;
}; };
let _ = task_source.queue(task!(track_event_queue: move || { let this = Trusted::new(self);
let this = this.root(); self.global()
.task_manager()
.media_element_task_source()
.queue(task!(track_event_queue: move || {
let this = this.root();
if let Some(track) = this.item(idx) { if let Some(track) = this.item(idx) {
let event = TrackEvent::new( let event = TrackEvent::new(
&this.global(), &this.global(),
atom!("addtrack"), atom!("addtrack"),
false, false,
false, false,
&Some(VideoTrackOrAudioTrackOrTextTrack::TextTrack( &Some(VideoTrackOrAudioTrackOrTextTrack::TextTrack(
DomRoot::from_ref(&track) DomRoot::from_ref(&track)
)), )),
CanGc::note() CanGc::note()
); );
event.upcast::<Event>().fire(this.upcast::<EventTarget>(), CanGc::note()); event.upcast::<Event>().fire(this.upcast::<EventTarget>(), CanGc::note());
} }
})); }));
track.add_track_list(self); track.add_track_list(self);
} }
} }

View file

@ -81,9 +81,6 @@ impl VideoTrackList {
return; return;
} }
let this = Trusted::new(self);
let task_source = self.global().task_manager().media_element_task_source();
if let Some(current) = self.selected_index() { if let Some(current) = self.selected_index() {
self.tracks.borrow()[current].set_selected(false); self.tracks.borrow()[current].set_selected(false);
} }
@ -93,10 +90,14 @@ impl VideoTrackList {
media_element.set_video_track(idx, value); media_element.set_video_track(idx, value);
} }
let _ = task_source.queue(task!(media_track_change: move || { let this = Trusted::new(self);
let this = this.root(); self.global()
this.upcast::<EventTarget>().fire_event(atom!("change"), CanGc::note()); .task_manager()
})); .media_element_task_source()
.queue(task!(media_track_change: move || {
let this = this.root();
this.upcast::<EventTarget>().fire_event(atom!("change"), CanGc::note());
}));
} }
pub fn add(&self, track: &VideoTrack) { pub fn add(&self, track: &VideoTrack) {

View file

@ -174,8 +174,7 @@ impl WebGLQuery {
self.global() self.global()
.task_manager() .task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task) .queue(task);
.unwrap();
} }
match pname { match pname {

View file

@ -76,8 +76,7 @@ impl WebGLSync {
self.global() self.global()
.task_manager() .task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task) .queue(task);
.unwrap();
}, },
_ => {}, _ => {},
} }
@ -111,8 +110,7 @@ impl WebGLSync {
self.global() self.global()
.task_manager() .task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task) .queue(task);
.unwrap();
}, },
_ => {}, _ => {},
} }

View file

@ -71,7 +71,8 @@ pub fn response_async<T: AsyncWGPUListener + DomObject + 'static>(
let task_source = receiver let task_source = receiver
.global() .global()
.task_manager() .task_manager()
.dom_manipulation_task_source(); .dom_manipulation_task_source()
.to_sendable();
let mut trusted: Option<TrustedPromise> = Some(TrustedPromise::new(promise.clone())); let mut trusted: Option<TrustedPromise> = Some(TrustedPromise::new(promise.clone()));
let trusted_receiver = Trusted::new(receiver); let trusted_receiver = Trusted::new(receiver);
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -88,12 +89,9 @@ pub fn response_async<T: AsyncWGPUListener + DomObject + 'static>(
trusted, trusted,
receiver: trusted_receiver.clone(), receiver: trusted_receiver.clone(),
}; };
let result = task_source.queue(task!(process_webgpu_task: move|| { task_source.queue(task!(process_webgpu_task: move|| {
context.response(message.unwrap(), CanGc::note()); context.response(message.unwrap(), CanGc::note());
})); }));
if let Err(err) = result {
error!("Failed to queue GPU listener-task: {:?}", err);
}
}), }),
); );
action_sender action_sender

View file

@ -40,7 +40,7 @@ use crate::dom::globalscope::GlobalScope;
use crate::dom::messageevent::MessageEvent; use crate::dom::messageevent::MessageEvent;
use crate::script_runtime::CanGc; use crate::script_runtime::CanGc;
use crate::task::TaskOnce; use crate::task::TaskOnce;
use crate::task_source::TaskSource; use crate::task_source::SendableTaskSource;
#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)] #[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
enum WebSocketRequestState { enum WebSocketRequestState {
@ -70,27 +70,25 @@ mod close_code {
fn close_the_websocket_connection( fn close_the_websocket_connection(
address: Trusted<WebSocket>, address: Trusted<WebSocket>,
task_source: &TaskSource, task_source: &SendableTaskSource,
code: Option<u16>, code: Option<u16>,
reason: String, reason: String,
) { ) {
let close_task = CloseTask { task_source.queue(CloseTask {
address, address,
failed: false, failed: false,
code, code,
reason: Some(reason), reason: Some(reason),
}; });
let _ = task_source.queue(close_task);
} }
fn fail_the_websocket_connection(address: Trusted<WebSocket>, task_source: &TaskSource) { fn fail_the_websocket_connection(address: Trusted<WebSocket>, task_source: &SendableTaskSource) {
let close_task = CloseTask { task_source.queue(CloseTask {
address, address,
failed: true, failed: true,
code: Some(close_code::ABNORMAL), code: Some(close_code::ABNORMAL),
reason: None, reason: None,
}; });
let _ = task_source.queue(close_task);
} }
#[dom_struct] #[dom_struct]
@ -162,8 +160,7 @@ impl WebSocket {
self.clearing_buffer.set(true); self.clearing_buffer.set(true);
// TODO(mrobinson): Should this task be cancellable? // TODO(mrobinson): Should this task be cancellable?
let _ = self self.global()
.global()
.task_manager() .task_manager()
.websocket_task_source() .websocket_task_source()
.queue_unconditionally(BufferedAmountTask { address }); .queue_unconditionally(BufferedAmountTask { address });
@ -270,7 +267,7 @@ impl WebSocketMethods<crate::DomTypeHolder> for WebSocket {
.core_resource_thread() .core_resource_thread()
.send(CoreResourceMsg::Fetch(request, channels)); .send(CoreResourceMsg::Fetch(request, channels));
let task_source = global.task_manager().websocket_task_source(); let task_source = global.task_manager().websocket_task_source().to_sendable();
ROUTER.add_typed_route( ROUTER.add_typed_route(
dom_event_receiver.to_ipc_receiver(), dom_event_receiver.to_ipc_receiver(),
Box::new(move |message| match message.unwrap() { Box::new(move |message| match message.unwrap() {
@ -279,14 +276,14 @@ impl WebSocketMethods<crate::DomTypeHolder> for WebSocket {
address: address.clone(), address: address.clone(),
protocol_in_use, protocol_in_use,
}; };
let _ = task_source.queue(open_thread); task_source.queue(open_thread);
}, },
WebSocketNetworkEvent::MessageReceived(message) => { WebSocketNetworkEvent::MessageReceived(message) => {
let message_thread = MessageReceivedTask { let message_thread = MessageReceivedTask {
address: address.clone(), address: address.clone(),
message, message,
}; };
let _ = task_source.queue(message_thread); task_source.queue(message_thread);
}, },
WebSocketNetworkEvent::Fail => { WebSocketNetworkEvent::Fail => {
fail_the_websocket_connection(address.clone(), &task_source); fail_the_websocket_connection(address.clone(), &task_source);
@ -426,9 +423,14 @@ impl WebSocketMethods<crate::DomTypeHolder> for WebSocket {
will abort connecting the websocket*/ will abort connecting the websocket*/
self.ready_state.set(WebSocketRequestState::Closing); self.ready_state.set(WebSocketRequestState::Closing);
let address = Trusted::new(self); fail_the_websocket_connection(
let task_source = self.global().task_manager().websocket_task_source(); Trusted::new(self),
fail_the_websocket_connection(address, &task_source); &self
.global()
.task_manager()
.websocket_task_source()
.to_sendable(),
);
}, },
WebSocketRequestState::Open => { WebSocketRequestState::Open => {
self.ready_state.set(WebSocketRequestState::Closing); self.ready_state.set(WebSocketRequestState::Closing);

View file

@ -306,7 +306,10 @@ impl FakeXRDeviceMethods<crate::DomTypeHolder> for FakeXRDevice {
let global = self.global(); let global = self.global();
let p = Promise::new(&global, can_gc); let p = Promise::new(&global, can_gc);
let mut trusted = Some(TrustedPromise::new(p.clone())); let mut trusted = Some(TrustedPromise::new(p.clone()));
let task_source = global.task_manager().dom_manipulation_task_source(); let task_source = global
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap(); let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap();
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -315,7 +318,7 @@ impl FakeXRDeviceMethods<crate::DomTypeHolder> for FakeXRDevice {
let trusted = trusted let trusted = trusted
.take() .take()
.expect("disconnect callback called multiple times"); .expect("disconnect callback called multiple times");
let _ = task_source.queue(trusted.resolve_task(())); task_source.queue(trusted.resolve_task(()));
}), }),
); );
self.disconnect(sender); self.disconnect(sender);

View file

@ -201,14 +201,17 @@ impl XRSession {
fn setup_raf_loop(&self, frame_receiver: IpcReceiver<Frame>) { fn setup_raf_loop(&self, frame_receiver: IpcReceiver<Frame>) {
let this = Trusted::new(self); let this = Trusted::new(self);
let global = self.global(); let global = self.global();
let task_source = global.task_manager().dom_manipulation_task_source(); let task_source = global
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
ROUTER.add_typed_route( ROUTER.add_typed_route(
frame_receiver, frame_receiver,
Box::new(move |message| { Box::new(move |message| {
let frame: Frame = message.unwrap(); let frame: Frame = message.unwrap();
let time = CrossProcessInstant::now(); let time = CrossProcessInstant::now();
let this = this.clone(); let this = this.clone();
let _ = task_source.queue(task!(xr_raf_callback: move || { task_source.queue(task!(xr_raf_callback: move || {
this.root().raf_callback(frame, time); this.root().raf_callback(frame, time);
})); }));
}), }),
@ -224,14 +227,17 @@ impl XRSession {
fn attach_event_handler(&self) { fn attach_event_handler(&self) {
let this = Trusted::new(self); let this = Trusted::new(self);
let global = self.global(); let global = self.global();
let task_source = global.task_manager().dom_manipulation_task_source(); let task_source = global
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap(); let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap();
ROUTER.add_typed_route( ROUTER.add_typed_route(
receiver.to_ipc_receiver(), receiver.to_ipc_receiver(),
Box::new(move |message| { Box::new(move |message| {
let this = this.clone(); let this = this.clone();
let _ = task_source.queue(task!(xr_event_callback: move || { task_source.queue(task!(xr_event_callback: move || {
this.root().event_callback(message.unwrap(), CanGc::note()); this.root().event_callback(message.unwrap(), CanGc::note());
})); }));
}), }),
@ -254,14 +260,16 @@ impl XRSession {
return; return;
} }
let task_source = self.global().task_manager().dom_manipulation_task_source();
let this = Trusted::new(self); let this = Trusted::new(self);
// Queue a task so that it runs after resolve()'s microtasks complete // Queue a task so that it runs after resolve()'s microtasks complete
// so that content has a chance to attach a listener for inputsourceschange // so that content has a chance to attach a listener for inputsourceschange
let _ = task_source.queue(task!(session_initial_inputs: move || { self.global()
let this = this.root(); .task_manager()
this.input_sources.add_input_sources(&this, &initial_inputs, CanGc::note()); .dom_manipulation_task_source()
})); .queue(task!(session_initial_inputs: move || {
let this = this.root();
this.input_sources.add_input_sources(&this, &initial_inputs, CanGc::note());
}));
} }
fn event_callback(&self, event: XREvent, can_gc: CanGc) { fn event_callback(&self, event: XREvent, can_gc: CanGc) {
@ -1036,14 +1044,17 @@ impl XRSessionMethods<crate::DomTypeHolder> for XRSession {
let this = Trusted::new(self); let this = Trusted::new(self);
let global = self.global(); let global = self.global();
let task_source = global.task_manager().dom_manipulation_task_source(); let task_source = global
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap(); let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap();
ROUTER.add_typed_route( ROUTER.add_typed_route(
receiver.to_ipc_receiver(), receiver.to_ipc_receiver(),
Box::new(move |message| { Box::new(move |message| {
let this = this.clone(); let this = this.clone();
let _ = task_source.queue(task!(update_session_framerate: move || { task_source.queue(task!(update_session_framerate: move || {
let session = this.root(); let session = this.root();
session.apply_nominal_framerate(message.unwrap(), CanGc::note()); session.apply_nominal_framerate(message.unwrap(), CanGc::note());
if let Some(promise) = session.update_framerate_promise.borrow_mut().take() { if let Some(promise) = session.update_framerate_promise.borrow_mut().take() {

View file

@ -118,7 +118,10 @@ impl XRSystemMethods<crate::DomTypeHolder> for XRSystem {
let promise = Promise::new(&self.global(), can_gc); let promise = Promise::new(&self.global(), can_gc);
let mut trusted = Some(TrustedPromise::new(promise.clone())); let mut trusted = Some(TrustedPromise::new(promise.clone()));
let global = self.global(); let global = self.global();
let task_source = global.task_manager().dom_manipulation_task_source(); let task_source = global
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap(); let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap();
ROUTER.add_typed_route( ROUTER.add_typed_route(
receiver.to_ipc_receiver(), receiver.to_ipc_receiver(),
@ -137,9 +140,9 @@ impl XRSystemMethods<crate::DomTypeHolder> for XRSystem {
return; return;
}; };
if let Ok(()) = message { if let Ok(()) = message {
let _ = task_source.queue(trusted.resolve_task(true)); task_source.queue(trusted.resolve_task(true));
} else { } else {
let _ = task_source.queue(trusted.resolve_task(false)); task_source.queue(trusted.resolve_task(false));
}; };
}), }),
); );
@ -234,7 +237,10 @@ impl XRSystemMethods<crate::DomTypeHolder> for XRSystem {
let mut trusted = Some(TrustedPromise::new(promise.clone())); let mut trusted = Some(TrustedPromise::new(promise.clone()));
let this = Trusted::new(self); let this = Trusted::new(self);
let task_source = global.task_manager().dom_manipulation_task_source(); let task_source = global
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap(); let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap();
let (frame_sender, frame_receiver) = ipc_crate::channel().unwrap(); let (frame_sender, frame_receiver) = ipc_crate::channel().unwrap();
let mut frame_receiver = Some(frame_receiver); let mut frame_receiver = Some(frame_receiver);
@ -251,7 +257,7 @@ impl XRSystemMethods<crate::DomTypeHolder> for XRSystem {
error!("requestSession callback given incorrect payload"); error!("requestSession callback given incorrect payload");
return; return;
}; };
let _ = task_source.queue(task!(request_session: move || { task_source.queue(task!(request_session: move || {
this.root().session_obtained(message, trusted.root(), mode, frame_receiver); this.root().session_obtained(message, trusted.root(), mode, frame_receiver);
})); }));
}), }),
@ -316,7 +322,6 @@ impl XRSystem {
xr.upcast::<EventTarget>().fire_bubbling_event(atom!("sessionavailable"), CanGc::note()); xr.upcast::<EventTarget>().fire_bubbling_event(atom!("sessionavailable"), CanGc::note());
ScriptThread::set_user_interacting(interacting); ScriptThread::set_user_interacting(interacting);
}) })
) );
.unwrap();
} }
} }

View file

@ -150,7 +150,10 @@ impl XRTestMethods<crate::DomTypeHolder> for XRTest {
let this = Trusted::new(self); let this = Trusted::new(self);
let mut trusted = Some(TrustedPromise::new(p.clone())); let mut trusted = Some(TrustedPromise::new(p.clone()));
let task_source = global.task_manager().dom_manipulation_task_source(); let task_source = global
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap(); let (sender, receiver) = ipc::channel(global.time_profiler_chan().clone()).unwrap();
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -163,7 +166,7 @@ impl XRTestMethods<crate::DomTypeHolder> for XRTest {
let message = let message =
message.expect("SimulateDeviceConnection callback given incorrect payload"); message.expect("SimulateDeviceConnection callback given incorrect payload");
let _ = task_source.queue(task!(request_session: move || { task_source.queue(task!(request_session: move || {
this.root().device_obtained(message, trusted); this.root().device_obtained(message, trusted);
})); }));
}), }),
@ -200,7 +203,10 @@ impl XRTestMethods<crate::DomTypeHolder> for XRTest {
devices.clear(); devices.clear();
let mut trusted = Some(TrustedPromise::new(p.clone())); let mut trusted = Some(TrustedPromise::new(p.clone()));
let task_source = global.task_manager().dom_manipulation_task_source(); let task_source = global
.task_manager()
.dom_manipulation_task_source()
.to_sendable();
ROUTER.add_typed_route( ROUTER.add_typed_route(
receiver.to_ipc_receiver(), receiver.to_ipc_receiver(),
@ -210,7 +216,7 @@ impl XRTestMethods<crate::DomTypeHolder> for XRTest {
let trusted = trusted let trusted = trusted
.take() .take()
.expect("DisconnectAllDevices disconnected more devices than expected"); .expect("DisconnectAllDevices disconnected more devices than expected");
let _ = task_source.queue(trusted.resolve_task(())); task_source.queue(trusted.resolve_task(()));
} }
}), }),
); );

View file

@ -816,8 +816,7 @@ impl WindowMethods<crate::DomTypeHolder> for Window {
self.global() self.global()
.task_manager() .task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task) .queue(task);
.expect("Queuing window_close_browsing_context task to work");
} }
} }
} }
@ -2335,8 +2334,7 @@ impl Window {
CanGc::note()); CanGc::note());
event.upcast::<Event>().fire(this.upcast::<EventTarget>(), CanGc::note()); event.upcast::<Event>().fire(this.upcast::<EventTarget>(), CanGc::note());
}); });
let _ = self self.task_manager()
.task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task); .queue(task);
doc.set_url(load_data.url.clone()); doc.set_url(load_data.url.clone());
@ -2946,8 +2944,7 @@ impl Window {
} }
}); });
// TODO(#12718): Use the "posted message task source". // TODO(#12718): Use the "posted message task source".
let _ = self self.task_manager()
.task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task); .queue(task);
} }

View file

@ -73,7 +73,7 @@ use crate::dom::xmlhttprequestupload::XMLHttpRequestUpload;
use crate::fetch::FetchCanceller; use crate::fetch::FetchCanceller;
use crate::network_listener::{self, PreInvoke, ResourceTimingListener}; use crate::network_listener::{self, PreInvoke, ResourceTimingListener};
use crate::script_runtime::{CanGc, JSContext}; use crate::script_runtime::{CanGc, JSContext};
use crate::task_source::{TaskSource, TaskSourceName}; use crate::task_source::{SendableTaskSource, TaskSourceName};
use crate::timers::{OneshotTimerCallback, OneshotTimerHandle}; use crate::timers::{OneshotTimerCallback, OneshotTimerHandle};
#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)] #[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
@ -294,7 +294,7 @@ impl XMLHttpRequest {
fn initiate_async_xhr( fn initiate_async_xhr(
context: Arc<Mutex<XHRContext>>, context: Arc<Mutex<XHRContext>>,
task_source: TaskSource, task_source: SendableTaskSource,
global: &GlobalScope, global: &GlobalScope,
init: RequestBuilder, init: RequestBuilder,
cancellation_chan: ipc::IpcReceiver<()>, cancellation_chan: ipc::IpcReceiver<()>,
@ -1562,7 +1562,7 @@ impl XMLHttpRequest {
let (task_source, script_port) = if self.sync.get() { let (task_source, script_port) = if self.sync.get() {
let (sender, receiver) = global.new_script_pair(); let (sender, receiver) = global.new_script_pair();
( (
TaskSource { SendableTaskSource {
sender, sender,
pipeline_id: global.pipeline_id(), pipeline_id: global.pipeline_id(),
name: TaskSourceName::Networking, name: TaskSourceName::Networking,
@ -1571,7 +1571,10 @@ impl XMLHttpRequest {
Some(receiver), Some(receiver),
) )
} else { } else {
(global.task_manager().networking_task_source(), None) (
global.task_manager().networking_task_source().to_sendable(),
None,
)
}; };
let cancel_receiver = self.canceller.borrow_mut().initialize(); let cancel_receiver = self.canceller.borrow_mut().initialize();

View file

@ -196,7 +196,7 @@ pub fn Fetch(
global.fetch( global.fetch(
request_init, request_init,
fetch_context, fetch_context,
global.task_manager().networking_task_source(), global.task_manager().networking_task_source().to_sendable(),
None, None,
); );

View file

@ -26,7 +26,11 @@ pub fn generate_cache_listener_for_element<
let trusted_node = Trusted::new(elem); let trusted_node = Trusted::new(elem);
let (responder_sender, responder_receiver) = ipc::channel().unwrap(); let (responder_sender, responder_receiver) = ipc::channel().unwrap();
let task_source = elem.owner_window().task_manager().networking_task_source(); let task_source = elem
.owner_window()
.task_manager()
.networking_task_source()
.to_sendable();
let generation = elem.generation_id(); let generation = elem.generation_id();
ROUTER.add_typed_route( ROUTER.add_typed_route(
@ -35,7 +39,7 @@ pub fn generate_cache_listener_for_element<
let element = trusted_node.clone(); let element = trusted_node.clone();
let image: PendingImageResponse = message.unwrap(); let image: PendingImageResponse = message.unwrap();
debug!("Got image {:?}", image); debug!("Got image {:?}", image);
let _ = task_source.queue(task!(process_image_response: move || { task_source.queue(task!(process_image_response: move || {
let element = element.root(); let element = element.root();
// Ignore any image response for a previous request that has been discarded. // Ignore any image response for a previous request that has been discarded.
if generation == element.generation_id() { if generation == element.generation_id() {

View file

@ -434,7 +434,6 @@ pub fn follow_hyperlink(
target_window target_window
.task_manager() .task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task) .queue(task);
.unwrap();
}; };
} }

View file

@ -17,13 +17,13 @@ use crate::dom::performanceentry::PerformanceEntry;
use crate::dom::performanceresourcetiming::{InitiatorType, PerformanceResourceTiming}; use crate::dom::performanceresourcetiming::{InitiatorType, PerformanceResourceTiming};
use crate::script_runtime::CanGc; use crate::script_runtime::CanGc;
use crate::task::TaskOnce; use crate::task::TaskOnce;
use crate::task_source::TaskSource; use crate::task_source::SendableTaskSource;
/// An off-thread sink for async network event tasks. All such events are forwarded to /// An off-thread sink for async network event tasks. All such events are forwarded to
/// a target thread, where they are invoked on the provided context object. /// a target thread, where they are invoked on the provided context object.
pub struct NetworkListener<Listener: PreInvoke + Send + 'static> { pub struct NetworkListener<Listener: PreInvoke + Send + 'static> {
pub context: Arc<Mutex<Listener>>, pub context: Arc<Mutex<Listener>>,
pub task_source: TaskSource, pub task_source: SendableTaskSource,
} }
pub trait ResourceTimingListener { pub trait ResourceTimingListener {
@ -74,14 +74,10 @@ pub fn submit_timing_data(
impl<Listener: PreInvoke + Send + 'static> NetworkListener<Listener> { impl<Listener: PreInvoke + Send + 'static> NetworkListener<Listener> {
pub fn notify<A: Action<Listener> + Send + 'static>(&self, action: A) { pub fn notify<A: Action<Listener> + Send + 'static>(&self, action: A) {
let result = self.task_source.queue(ListenerTask { self.task_source.queue(ListenerTask {
context: self.context.clone(), context: self.context.clone(),
action, action,
}); });
if let Err(err) = result {
warn!("failed to deliver network data: {:?}", err);
}
} }
} }

View file

@ -1769,7 +1769,7 @@ fn fetch_single_module_script(
let network_listener = NetworkListener { let network_listener = NetworkListener {
context, context,
task_source: global.task_manager().networking_task_source(), task_source: global.task_manager().networking_task_source().to_sendable(),
}; };
match document { match document {
Some(document) => { Some(document) => {

View file

@ -85,7 +85,7 @@ use crate::script_module::EnsureModuleHooksInitialized;
use crate::script_thread::trace_thread; use crate::script_thread::trace_thread;
use crate::security_manager::CSPViolationReporter; use crate::security_manager::CSPViolationReporter;
use crate::task::TaskBox; use crate::task::TaskBox;
use crate::task_source::{TaskSource, TaskSourceName}; use crate::task_source::{SendableTaskSource, TaskSourceName};
static JOB_QUEUE_TRAPS: JobQueueTraps = JobQueueTraps { static JOB_QUEUE_TRAPS: JobQueueTraps = JobQueueTraps {
getIncumbentGlobal: Some(get_incumbent_global), getIncumbentGlobal: Some(get_incumbent_global),
@ -400,7 +400,7 @@ unsafe extern "C" fn promise_rejection_tracker(
event.upcast::<Event>().fire(&target, CanGc::note()); event.upcast::<Event>().fire(&target, CanGc::note());
}) })
).unwrap(); );
}, },
}; };
}) })
@ -454,8 +454,7 @@ unsafe extern "C" fn content_security_policy_allows(
global global
.task_manager() .task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task) .queue(task);
.unwrap();
} }
} }
}); });
@ -529,7 +528,7 @@ pub fn notify_about_rejected_promises(global: &GlobalScope) {
} }
} }
}) })
).unwrap(); );
} }
} }
} }
@ -539,11 +538,11 @@ pub struct Runtime {
rt: RustRuntime, rt: RustRuntime,
pub microtask_queue: Rc<MicrotaskQueue>, pub microtask_queue: Rc<MicrotaskQueue>,
job_queue: *mut JobQueue, job_queue: *mut JobQueue,
networking_task_src: Option<Box<TaskSource>>, networking_task_src: Option<Box<SendableTaskSource>>,
} }
impl Runtime { impl Runtime {
/// Create a new runtime, optionally with the given [`TaskSource`] for networking. /// Create a new runtime, optionally with the given [`SendableTaskSource`] for networking.
/// ///
/// # Safety /// # Safety
/// ///
@ -553,11 +552,11 @@ impl Runtime {
/// ///
/// This, like many calls to SpiderMoney API, is unsafe. /// This, like many calls to SpiderMoney API, is unsafe.
#[allow(unsafe_code)] #[allow(unsafe_code)]
pub(crate) fn new(networking_task_source: Option<TaskSource>) -> Runtime { pub(crate) fn new(networking_task_source: Option<SendableTaskSource>) -> Runtime {
unsafe { Self::new_with_parent(None, networking_task_source) } unsafe { Self::new_with_parent(None, networking_task_source) }
} }
/// Create a new runtime, optionally with the given [`ParentRuntime`] and [`TaskSource`] /// Create a new runtime, optionally with the given [`ParentRuntime`] and [`SendableTaskSource`]
/// for networking. /// for networking.
/// ///
/// # Safety /// # Safety
@ -572,7 +571,7 @@ impl Runtime {
#[allow(unsafe_code)] #[allow(unsafe_code)]
pub(crate) unsafe fn new_with_parent( pub(crate) unsafe fn new_with_parent(
parent: Option<ParentRuntime>, parent: Option<ParentRuntime>,
networking_task_source: Option<TaskSource>, networking_task_source: Option<SendableTaskSource>,
) -> Runtime { ) -> Runtime {
LiveDOMReferences::initialize(); LiveDOMReferences::initialize();
let (cx, runtime) = if let Some(parent) = parent { let (cx, runtime) = if let Some(parent) = parent {
@ -620,7 +619,7 @@ impl Runtime {
closure: *mut c_void, closure: *mut c_void,
dispatchable: *mut JSRunnable, dispatchable: *mut JSRunnable,
) -> bool { ) -> bool {
let networking_task_src: &TaskSource = &*(closure as *mut TaskSource); let networking_task_src: &SendableTaskSource = &*(closure as *mut SendableTaskSource);
let runnable = Runnable(dispatchable); let runnable = Runnable(dispatchable);
let task = task!(dispatch_to_event_loop_message: move || { let task = task!(dispatch_to_event_loop_message: move || {
if let Some(cx) = RustRuntime::get() { if let Some(cx) = RustRuntime::get() {
@ -628,7 +627,8 @@ impl Runtime {
} }
}); });
networking_task_src.queue_unconditionally(task).is_ok() networking_task_src.queue_unconditionally(task);
true
} }
let mut networking_task_src_ptr = std::ptr::null_mut(); let mut networking_task_src_ptr = std::ptr::null_mut();

View file

@ -153,7 +153,7 @@ use crate::script_runtime::{
ThreadSafeJSContext, ThreadSafeJSContext,
}; };
use crate::task_queue::TaskQueue; use crate::task_queue::TaskQueue;
use crate::task_source::{TaskSource, TaskSourceName}; use crate::task_source::{SendableTaskSource, TaskSourceName};
use crate::{devtools, webdriver_handlers}; use crate::{devtools, webdriver_handlers};
thread_local!(static SCRIPT_THREAD_ROOT: Cell<Option<*const ScriptThread>> = const { Cell::new(None) }); thread_local!(static SCRIPT_THREAD_ROOT: Cell<Option<*const ScriptThread>> = const { Cell::new(None) });
@ -692,8 +692,7 @@ impl ScriptThread {
global global
.task_manager() .task_manager()
.dom_manipulation_task_source() .dom_manipulation_task_source()
.queue(task) .queue(task);
.expect("Enqueing navigate js task on the DOM manipulation task source failed");
} else { } else {
if let Some(ref sender) = script_thread.senders.devtools_server_sender { if let Some(ref sender) = script_thread.senders.devtools_server_sender {
let _ = sender.send(ScriptToDevtoolsControlMsg::Navigate( let _ = sender.send(ScriptToDevtoolsControlMsg::Navigate(
@ -902,7 +901,7 @@ impl ScriptThread {
let (self_sender, self_receiver) = unbounded(); let (self_sender, self_receiver) = unbounded();
let self_sender = MainThreadScriptChan(self_sender.clone()); let self_sender = MainThreadScriptChan(self_sender.clone());
let runtime = Runtime::new(Some(TaskSource { let runtime = Runtime::new(Some(SendableTaskSource {
sender: self_sender.as_boxed(), sender: self_sender.as_boxed(),
pipeline_id: state.id, pipeline_id: state.id,
name: TaskSourceName::Networking, name: TaskSourceName::Networking,
@ -1410,9 +1409,11 @@ impl ScriptThread {
// This task is empty because any new IPC messages in the ScriptThread trigger a // This task is empty because any new IPC messages in the ScriptThread trigger a
// rendering update when animations are not running. // rendering update when animations are not running.
let _realm = enter_realm(&*document); let _realm = enter_realm(&*document);
let rendering_task_source = document.window().task_manager().rendering_task_source(); document
let _ = .window()
rendering_task_source.queue_unconditionally(task!(update_the_rendering: move || { })); .task_manager()
.rendering_task_source()
.queue_unconditionally(task!(update_the_rendering: move || { }));
} }
/// Handle incoming messages from other tasks and the task queue. /// Handle incoming messages from other tasks and the task queue.

View file

@ -75,42 +75,31 @@ pub struct TaskCanceller {
impl TaskCanceller { impl TaskCanceller {
/// Returns a wrapped `task` that will be cancelled if the `TaskCanceller` says so. /// Returns a wrapped `task` that will be cancelled if the `TaskCanceller` says so.
pub(crate) fn wrap_task<T>(&self, task: T) -> impl TaskOnce pub(crate) fn wrap_task(&self, task: impl TaskOnce) -> impl TaskOnce {
where
T: TaskOnce,
{
CancellableTask { CancellableTask {
cancelled: self.cancelled.clone(), canceller: self.clone(),
inner: task, inner: task,
} }
} }
pub(crate) fn cancelled(&self) -> bool {
self.cancelled.load(Ordering::SeqCst)
}
} }
/// A task that can be cancelled by toggling a shared flag. /// A task that can be cancelled by toggling a shared flag.
pub struct CancellableTask<T: TaskOnce> { pub struct CancellableTask<T: TaskOnce> {
cancelled: Arc<AtomicBool>, canceller: TaskCanceller,
inner: T, inner: T,
} }
impl<T> CancellableTask<T> impl<T: TaskOnce> TaskOnce for CancellableTask<T> {
where
T: TaskOnce,
{
fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::SeqCst)
}
}
impl<T> TaskOnce for CancellableTask<T>
where
T: TaskOnce,
{
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
self.inner.name() self.inner.name()
} }
fn run_once(self) { fn run_once(self) {
if !self.is_cancelled() { if !self.canceller.cancelled() {
self.inner.run_once() self.inner.run_once()
} }
} }

View file

@ -4,6 +4,7 @@
use core::cell::RefCell; use core::cell::RefCell;
use core::sync::atomic::Ordering; use core::sync::atomic::Ordering;
use std::cell::Ref;
use std::collections::HashMap; use std::collections::HashMap;
use base::id::PipelineId; use base::id::PipelineId;
@ -64,13 +65,16 @@ impl TaskCancellers {
macro_rules! task_source_functions { macro_rules! task_source_functions {
($self:ident, $task_source:ident, $task_source_name:ident) => { ($self:ident, $task_source:ident, $task_source_name:ident) => {
pub(crate) fn $task_source(&$self) -> TaskSource { pub(crate) fn $task_source(&$self) -> TaskSource {
$self.task_source_for_task_source_name(TaskSourceName::$task_source_name) TaskSource {
task_manager: $self,
name: TaskSourceName::$task_source_name,
}
} }
}; };
} }
#[derive(JSTraceable, MallocSizeOf)] #[derive(JSTraceable, MallocSizeOf)]
pub struct TaskManager { pub(crate) struct TaskManager {
#[ignore_malloc_size_of = "We need to push the measurement of this down into the ScriptChan trait"] #[ignore_malloc_size_of = "We need to push the measurement of this down into the ScriptChan trait"]
sender: RefCell<Option<Box<dyn ScriptChan + Send>>>, sender: RefCell<Option<Box<dyn ScriptChan + Send>>>,
#[no_trace] #[no_trace]
@ -97,22 +101,16 @@ impl TaskManager {
} }
} }
fn task_source_for_task_source_name(&self, name: TaskSourceName) -> TaskSource { pub(crate) fn pipeline_id(&self) -> PipelineId {
let Some(sender) = self self.pipeline_id
.sender }
.borrow()
.as_ref()
.map(|sender| sender.as_boxed())
else {
unreachable!("Tried to enqueue task for DedicatedWorker while not handling a message.")
};
TaskSource { pub(crate) fn sender(&self) -> Ref<Option<Box<dyn ScriptChan + Send + 'static>>> {
sender, self.sender.borrow()
pipeline_id: self.pipeline_id, }
name,
canceller: self.cancellers.get(name), pub(crate) fn canceller(&self, name: TaskSourceName) -> TaskCanceller {
} self.cancellers.get(name)
} }
/// Update the sender for this [`TaskSource`]. This is used by dedicated workers, which only have a /// Update the sender for this [`TaskSource`]. This is used by dedicated workers, which only have a

View file

@ -3,7 +3,6 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use std::fmt; use std::fmt;
use std::result::Result;
use base::id::PipelineId; use base::id::PipelineId;
use malloc_size_of_derive::MallocSizeOf; use malloc_size_of_derive::MallocSizeOf;
@ -14,6 +13,7 @@ use crate::dom::event::{EventBubbles, EventCancelable, EventTask, SimpleEventTas
use crate::dom::eventtarget::EventTarget; use crate::dom::eventtarget::EventTarget;
use crate::script_runtime::{CommonScriptMsg, ScriptChan, ScriptThreadEventCategory}; use crate::script_runtime::{CommonScriptMsg, ScriptChan, ScriptThreadEventCategory};
use crate::task::{TaskCanceller, TaskOnce}; use crate::task::{TaskCanceller, TaskOnce};
use crate::task_manager::TaskManager;
/// The names of all task sources, used to differentiate TaskCancellers. Note: When adding a task /// The names of all task sources, used to differentiate TaskCancellers. Note: When adding a task
/// source, update this enum. Note: The HistoryTraversalTaskSource is not part of this, because it /// source, update this enum. Note: The HistoryTraversalTaskSource is not part of this, because it
@ -82,46 +82,40 @@ impl TaskSourceName {
} }
} }
#[derive(JSTraceable, MallocSizeOf)] pub(crate) struct TaskSource<'task_manager> {
pub(crate) struct TaskSource { pub task_manager: &'task_manager TaskManager,
#[ignore_malloc_size_of = "Need to push MallocSizeOf down into the ScriptChan trait implementations"]
pub sender: Box<dyn ScriptChan + Send + 'static>,
#[no_trace]
pub pipeline_id: PipelineId,
pub name: TaskSourceName, pub name: TaskSourceName,
pub canceller: TaskCanceller,
} }
impl TaskSource { impl TaskSource<'_> {
pub(crate) fn queue<T>(&self, task: T) -> Result<(), ()> /// Queue a task with the default canceller for this [`TaskSource`].
where pub(crate) fn queue(&self, task: impl TaskOnce + 'static) {
T: TaskOnce + 'static, let canceller = self.task_manager.canceller(self.name);
{ if canceller.cancelled() {
let msg = CommonScriptMsg::Task( return;
self.name.into(), }
Box::new(self.canceller.wrap_task(task)),
Some(self.pipeline_id), self.queue_unconditionally(canceller.wrap_task(task))
self.name,
);
self.sender.send(msg).map_err(|_| ())
} }
/// This queues a task that will not be cancelled when its associated global scope gets destroyed. /// This queues a task that will not be cancelled when its associated global scope gets destroyed.
pub(crate) fn queue_unconditionally<T>(&self, task: T) -> Result<(), ()> pub(crate) fn queue_unconditionally(&self, task: impl TaskOnce + 'static) {
where let sender = self.task_manager.sender();
T: TaskOnce + 'static, sender
{ .as_ref()
self.sender.send(CommonScriptMsg::Task( .expect("Tried to enqueue task for DedicatedWorker while not handling a message.")
self.name.into(), .send(CommonScriptMsg::Task(
Box::new(task), self.name.into(),
Some(self.pipeline_id), Box::new(task),
self.name, Some(self.task_manager.pipeline_id()),
)) self.name,
))
.expect("Tried to send a task on a task queue after shutdown.");
} }
pub(crate) fn queue_simple_event(&self, target: &EventTarget, name: Atom) { pub(crate) fn queue_simple_event(&self, target: &EventTarget, name: Atom) {
let target = Trusted::new(target); let target = Trusted::new(target);
let _ = self.queue(SimpleEventTask { target, name }); self.queue(SimpleEventTask { target, name });
} }
pub(crate) fn queue_event( pub(crate) fn queue_event(
@ -132,16 +126,75 @@ impl TaskSource {
cancelable: EventCancelable, cancelable: EventCancelable,
) { ) {
let target = Trusted::new(target); let target = Trusted::new(target);
let _ = self.queue(EventTask { self.queue(EventTask {
target, target,
name, name,
bubbles, bubbles,
cancelable, cancelable,
}); });
} }
/// Convert this [`TaskSource`] into a [`SendableTaskSource`] suitable for sending
/// to different threads.
pub(crate) fn to_sendable(&self) -> SendableTaskSource {
let sender = self.task_manager.sender();
let sender = sender
.as_ref()
.expect("Tried to enqueue task for DedicatedWorker while not handling a message.")
.as_boxed();
SendableTaskSource {
sender,
pipeline_id: self.task_manager.pipeline_id(),
name: self.name,
canceller: self.task_manager.canceller(self.name),
}
}
} }
impl Clone for TaskSource { impl<'task_manager> From<TaskSource<'task_manager>> for SendableTaskSource {
fn from(task_source: TaskSource<'task_manager>) -> Self {
task_source.to_sendable()
}
}
#[derive(JSTraceable, MallocSizeOf)]
pub(crate) struct SendableTaskSource {
#[ignore_malloc_size_of = "Need to push MallocSizeOf down into the ScriptChan trait implementations"]
pub sender: Box<dyn ScriptChan + Send + 'static>,
#[no_trace]
pub pipeline_id: PipelineId,
pub name: TaskSourceName,
pub canceller: TaskCanceller,
}
impl SendableTaskSource {
pub(crate) fn queue(&self, task: impl TaskOnce + 'static) {
if !self.canceller.cancelled() {
self.queue_unconditionally(self.canceller.wrap_task(task))
}
}
/// This queues a task that will not be cancelled when its associated global scope gets destroyed.
pub(crate) fn queue_unconditionally(&self, task: impl TaskOnce + 'static) {
if self
.sender
.send(CommonScriptMsg::Task(
self.name.into(),
Box::new(task),
Some(self.pipeline_id),
self.name,
))
.is_err()
{
warn!(
"Could not queue non-main-thread task {:?}. Likely tried to queue during shutdown.",
self.name
);
}
}
}
impl Clone for SendableTaskSource {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
sender: self.sender.as_boxed(), sender: self.sender.as_boxed(),
@ -152,7 +205,7 @@ impl Clone for TaskSource {
} }
} }
impl fmt::Debug for TaskSource { impl fmt::Debug for SendableTaskSource {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}(...)", self.name) write!(f, "{:?}(...)", self.name)
} }

View file

@ -34,7 +34,7 @@ use crate::dom::xmlhttprequest::XHRTimeoutCallback;
use crate::script_module::ScriptFetchOptions; use crate::script_module::ScriptFetchOptions;
use crate::script_runtime::CanGc; use crate::script_runtime::CanGc;
use crate::script_thread::ScriptThread; use crate::script_thread::ScriptThread;
use crate::task_source::TaskSource; use crate::task_source::SendableTaskSource;
#[derive(Clone, Copy, Debug, Eq, Hash, JSTraceable, MallocSizeOf, Ord, PartialEq, PartialOrd)] #[derive(Clone, Copy, Debug, Eq, Hash, JSTraceable, MallocSizeOf, Ord, PartialEq, PartialOrd)]
pub struct OneshotTimerHandle(i32); pub struct OneshotTimerHandle(i32);
@ -285,7 +285,11 @@ impl OneshotTimers {
let callback = TimerListener { let callback = TimerListener {
context: Trusted::new(&*self.global_scope), context: Trusted::new(&*self.global_scope),
task_source: self.global_scope.task_manager().timer_task_source(), task_source: self
.global_scope
.task_manager()
.timer_task_source()
.to_sendable(),
} }
.into_callback(); .into_callback();
@ -584,7 +588,7 @@ impl JsTimerTask {
/// A wrapper between timer events coming in over IPC, and the event-loop. /// A wrapper between timer events coming in over IPC, and the event-loop.
#[derive(Clone)] #[derive(Clone)]
struct TimerListener { struct TimerListener {
task_source: TaskSource, task_source: SendableTaskSource,
context: Trusted<GlobalScope>, context: Trusted<GlobalScope>,
} }
@ -595,7 +599,7 @@ impl TimerListener {
let context = self.context.clone(); let context = self.context.clone();
// Step 18, queue a task, // Step 18, queue a task,
// https://html.spec.whatwg.org/multipage/#timer-initialisation-steps // https://html.spec.whatwg.org/multipage/#timer-initialisation-steps
let _ = self.task_source.queue(task!(timer_event: move || { self.task_source.queue(task!(timer_event: move || {
let global = context.root(); let global = context.root();
let TimerEvent(source, id) = event; let TimerEvent(source, id) = event;
match source { match source {