Split media fetch context and fetch listener to prevent deadlocks

This commit is contained in:
Fernando Jiménez Moreno 2019-01-11 13:58:48 +01:00
parent da32c8488c
commit 9a18074b88
2 changed files with 117 additions and 99 deletions

View file

@ -488,7 +488,7 @@ unsafe_no_jsmanaged_fields!(Mutex<MediaFrameRenderer>);
unsafe_no_jsmanaged_fields!(RenderApiSender); unsafe_no_jsmanaged_fields!(RenderApiSender);
unsafe_no_jsmanaged_fields!(ResourceFetchTiming); unsafe_no_jsmanaged_fields!(ResourceFetchTiming);
unsafe_no_jsmanaged_fields!(Timespec); unsafe_no_jsmanaged_fields!(Timespec);
unsafe_no_jsmanaged_fields!(Mutex<HTMLMediaElementFetchContext>); unsafe_no_jsmanaged_fields!(HTMLMediaElementFetchContext);
unsafe impl<'a> JSTraceable for &'a str { unsafe impl<'a> JSTraceable for &'a str {
#[inline] #[inline]

View file

@ -204,8 +204,7 @@ pub struct HTMLMediaElement {
#[ignore_malloc_size_of = "Defined in time"] #[ignore_malloc_size_of = "Defined in time"]
next_timeupdate_event: Cell<Timespec>, next_timeupdate_event: Cell<Timespec>,
/// Latest fetch request context. /// Latest fetch request context.
#[ignore_malloc_size_of = "Arc"] current_fetch_context: DomRefCell<Option<HTMLMediaElementFetchContext>>,
current_fetch_request: DomRefCell<Option<Arc<Mutex<HTMLMediaElementFetchContext>>>>,
} }
/// <https://html.spec.whatwg.org/multipage/#dom-media-networkstate> /// <https://html.spec.whatwg.org/multipage/#dom-media-networkstate>
@ -262,7 +261,7 @@ impl HTMLMediaElement {
played: Rc::new(DomRefCell::new(TimeRangesContainer::new())), played: Rc::new(DomRefCell::new(TimeRangesContainer::new())),
text_tracks_list: Default::default(), text_tracks_list: Default::default(),
next_timeupdate_event: Cell::new(time::get_time() + Duration::milliseconds(250)), next_timeupdate_event: Cell::new(time::get_time() + Duration::milliseconds(250)),
current_fetch_request: DomRefCell::new(None), current_fetch_context: DomRefCell::new(None),
} }
} }
@ -665,34 +664,31 @@ impl HTMLMediaElement {
..RequestInit::default() ..RequestInit::default()
}; };
let mut current_fetch_request = self.current_fetch_request.borrow_mut(); let mut current_fetch_context = self.current_fetch_context.borrow_mut();
if let Some(ref current_fetch_request) = *current_fetch_request { if let Some(ref mut current_fetch_context) = *current_fetch_context {
current_fetch_request current_fetch_context.cancel(CancelReason::Overridden);
.lock()
.unwrap()
.cancel(CancelReason::Overridden);
} }
let (context, cancel_receiver) = HTMLMediaElementFetchContext::new( let (fetch_context, cancel_receiver) = HTMLMediaElementFetchContext::new();
*current_fetch_context = Some(fetch_context);
let fetch_listener = Arc::new(Mutex::new(HTMLMediaElementFetchListener::new(
self, self,
self.resource_url.borrow().as_ref().unwrap().clone(), self.resource_url.borrow().as_ref().unwrap().clone(),
offset.unwrap_or(0), offset.unwrap_or(0),
); )));
let context = Arc::new(Mutex::new(context));
*current_fetch_request = Some(context.clone());
let (action_sender, action_receiver) = ipc::channel().unwrap(); let (action_sender, action_receiver) = ipc::channel().unwrap();
let window = window_from_node(self); let window = window_from_node(self);
let (task_source, canceller) = window let (task_source, canceller) = window
.task_manager() .task_manager()
.networking_task_source_with_canceller(); .networking_task_source_with_canceller();
let listener = NetworkListener { let network_listener = NetworkListener {
context, context: fetch_listener,
task_source, task_source,
canceller: Some(canceller), canceller: Some(canceller),
}; };
ROUTER.add_route( ROUTER.add_route(
action_receiver.to_opaque(), action_receiver.to_opaque(),
Box::new(move |message| { Box::new(move |message| {
listener.notify_fetch(message.to().unwrap()); network_listener.notify_fetch(message.to().unwrap());
}), }),
); );
let global = self.global(); let global = self.global();
@ -887,11 +883,8 @@ impl HTMLMediaElement {
task_source.queue_simple_event(self.upcast(), atom!("emptied"), &window); task_source.queue_simple_event(self.upcast(), atom!("emptied"), &window);
// Step 6.2. // Step 6.2.
if let Some(ref current_fetch_request) = *self.current_fetch_request.borrow() { if let Some(ref mut current_fetch_context) = *self.current_fetch_context.borrow_mut() {
current_fetch_request current_fetch_context.cancel(CancelReason::Error);
.lock()
.unwrap()
.cancel(CancelReason::Error);
} }
// Step 6.3. // Step 6.3.
@ -1045,8 +1038,8 @@ impl HTMLMediaElement {
// XXX(ferjm) seekable attribute: we need to get the information about // XXX(ferjm) seekable attribute: we need to get the information about
// what's been decoded and buffered so far from servo-media // what's been decoded and buffered so far from servo-media
// and add the seekable attribute as a TimeRange. // and add the seekable attribute as a TimeRange.
if let Some(ref current_fetch_request) = *self.current_fetch_request.borrow() { if let Some(ref current_fetch_context) = *self.current_fetch_context.borrow() {
if !current_fetch_request.lock().unwrap().is_seekable() { if !current_fetch_context.is_seekable() {
self.seeking.set(false); self.seeking.set(false);
return; return;
} }
@ -1208,9 +1201,8 @@ impl HTMLMediaElement {
// Otherwise, if we have no request and the previous request was // Otherwise, if we have no request and the previous request was
// cancelled because we got an EnoughData event, we restart // cancelled because we got an EnoughData event, we restart
// fetching where we left. // fetching where we left.
if let Some(ref current_fetch_request) = *self.current_fetch_request.borrow() { if let Some(ref current_fetch_context) = *self.current_fetch_context.borrow() {
let current_fetch_request = current_fetch_request.lock().unwrap(); match current_fetch_context.cancel_reason() {
match current_fetch_request.cancel_reason() {
Some(ref reason) if *reason == CancelReason::Backoff => { Some(ref reason) if *reason == CancelReason::Backoff => {
// XXX(ferjm) Ideally we should just create a fetch request from // XXX(ferjm) Ideally we should just create a fetch request from
// where we left. But keeping track of the exact next byte that the // where we left. But keeping track of the exact next byte that the
@ -1228,10 +1220,11 @@ impl HTMLMediaElement {
// bytes, so we cancel the ongoing fetch request iff we are able // bytes, so we cancel the ongoing fetch request iff we are able
// to restart it from where we left. Otherwise, we continue the // to restart it from where we left. Otherwise, we continue the
// current fetch request, assuming that some frames will be dropped. // current fetch request, assuming that some frames will be dropped.
if let Some(ref current_fetch_request) = *self.current_fetch_request.borrow() { if let Some(ref mut current_fetch_context) =
let mut current_fetch_request = current_fetch_request.lock().unwrap(); *self.current_fetch_context.borrow_mut()
if current_fetch_request.is_seekable() { {
current_fetch_request.cancel(CancelReason::Backoff); if current_fetch_context.is_seekable() {
current_fetch_context.cancel(CancelReason::Backoff);
} }
} }
}, },
@ -1685,7 +1678,7 @@ enum Resource {
} }
/// Indicates the reason why a fetch request was cancelled. /// Indicates the reason why a fetch request was cancelled.
#[derive(Debug, PartialEq)] #[derive(Debug, MallocSizeOf, PartialEq)]
enum CancelReason { enum CancelReason {
/// We were asked to stop pushing data to the player. /// We were asked to stop pushing data to the player.
Backoff, Backoff,
@ -1695,7 +1688,53 @@ enum CancelReason {
Overridden, Overridden,
} }
#[derive(MallocSizeOf)]
pub struct HTMLMediaElementFetchContext { pub struct HTMLMediaElementFetchContext {
/// Some if the request has been cancelled.
cancel_reason: Option<CancelReason>,
/// Indicates whether the fetched stream is seekable.
is_seekable: bool,
/// Fetch canceller. Allows cancelling the current fetch request by
/// manually calling its .cancel() method or automatically on Drop.
fetch_canceller: FetchCanceller,
}
impl HTMLMediaElementFetchContext {
fn new() -> (HTMLMediaElementFetchContext, ipc::IpcReceiver<()>) {
let mut fetch_canceller = FetchCanceller::new();
let cancel_receiver = fetch_canceller.initialize();
(
HTMLMediaElementFetchContext {
cancel_reason: None,
is_seekable: false,
fetch_canceller,
},
cancel_receiver,
)
}
fn is_seekable(&self) -> bool {
self.is_seekable
}
fn set_seekable(&mut self, seekable: bool) {
self.is_seekable = seekable;
}
fn cancel(&mut self, reason: CancelReason) {
if self.cancel_reason.is_some() {
return;
}
self.cancel_reason = Some(reason);
self.fetch_canceller.cancel();
}
fn cancel_reason(&self) -> &Option<CancelReason> {
&self.cancel_reason
}
}
struct HTMLMediaElementFetchListener {
/// The element that initiated the request. /// The element that initiated the request.
elem: Trusted<HTMLMediaElement>, elem: Trusted<HTMLMediaElement>,
/// The response metadata received to date. /// The response metadata received to date.
@ -1704,8 +1743,6 @@ pub struct HTMLMediaElementFetchContext {
generation_id: u32, generation_id: u32,
/// Time of last progress notification. /// Time of last progress notification.
next_progress_event: Timespec, next_progress_event: Timespec,
/// Some if the request has been cancelled.
cancel_reason: Option<CancelReason>,
/// Timing data for this resource. /// Timing data for this resource.
resource_timing: ResourceFetchTiming, resource_timing: ResourceFetchTiming,
/// Url for the resource. /// Url for the resource.
@ -1718,15 +1755,10 @@ pub struct HTMLMediaElementFetchContext {
/// EnoughData event uses this value to restart the download from /// EnoughData event uses this value to restart the download from
/// the last fetched position. /// the last fetched position.
latest_fetched_content: u64, latest_fetched_content: u64,
/// Indicates whether the stream is seekable.
is_seekable: bool,
/// Fetch canceller. Allows cancelling the current fetch request by
/// manually calling its .cancel() method or automatically on Drop.
fetch_canceller: FetchCanceller,
} }
// https://html.spec.whatwg.org/multipage/#media-data-processing-steps-list // https://html.spec.whatwg.org/multipage/#media-data-processing-steps-list
impl FetchResponseListener for HTMLMediaElementFetchContext { impl FetchResponseListener for HTMLMediaElementFetchListener {
fn process_request_body(&mut self) {} fn process_request_body(&mut self) {}
fn process_request_eof(&mut self) {} fn process_request_eof(&mut self) {}
@ -1780,7 +1812,9 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
if is_seekable { if is_seekable {
// The server supports range requests, // The server supports range requests,
self.is_seekable = true; if let Some(ref mut current_fetch_context) = *elem.current_fetch_context.borrow_mut() {
current_fetch_context.set_seekable(true);
}
// and we can safely set the type of stream to Seekable. // and we can safely set the type of stream to Seekable.
if let Err(e) = elem.player.set_stream_type(StreamType::Seekable) { if let Err(e) = elem.player.set_stream_type(StreamType::Seekable) {
warn!("Could not set stream type to Seekable. {:?}", e); warn!("Could not set stream type to Seekable. {:?}", e);
@ -1791,18 +1825,25 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
if !status_is_ok { if !status_is_ok {
// Ensure that the element doesn't receive any further notifications // Ensure that the element doesn't receive any further notifications
// of the aborted fetch. // of the aborted fetch.
self.cancel(CancelReason::Error); if let Some(ref mut current_fetch_context) = *elem.current_fetch_context.borrow_mut() {
current_fetch_context.cancel(CancelReason::Error);
}
elem.queue_dedicated_media_source_failure_steps(); elem.queue_dedicated_media_source_failure_steps();
} }
} }
fn process_response_chunk(&mut self, payload: Vec<u8>) { fn process_response_chunk(&mut self, payload: Vec<u8>) {
let elem = self.elem.root(); let elem = self.elem.root();
if self.cancel_reason.is_some() || elem.generation_id.get() != self.generation_id { // If an error was received previously or if we triggered a new fetch request,
// An error was received previously or we triggered a new fetch request, // we skip processing the payload.
// skip processing the payload. if elem.generation_id.get() != self.generation_id {
return; return;
} }
if let Some(ref current_fetch_context) = *elem.current_fetch_context.borrow() {
if current_fetch_context.cancel_reason().is_some() {
return;
}
}
let payload_len = payload.len() as u64; let payload_len = payload.len() as u64;
@ -1813,7 +1854,13 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
// the current request. Otherwise, we continue the request // the current request. Otherwise, we continue the request
// assuming that we may drop some frames. // assuming that we may drop some frames.
match e { match e {
PlayerError::EnoughData => self.cancel(CancelReason::Backoff), PlayerError::EnoughData => {
if let Some(ref mut current_fetch_context) =
*elem.current_fetch_context.borrow_mut()
{
current_fetch_context.cancel(CancelReason::Backoff);
}
},
_ => (), _ => (),
} }
warn!("Could not push input data to player {:?}", e); warn!("Could not push input data to player {:?}", e);
@ -1837,15 +1884,17 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
// https://html.spec.whatwg.org/multipage/#media-data-processing-steps-list // https://html.spec.whatwg.org/multipage/#media-data-processing-steps-list
fn process_response_eof(&mut self, status: Result<ResourceFetchTiming, NetworkError>) { fn process_response_eof(&mut self, status: Result<ResourceFetchTiming, NetworkError>) {
let elem = self.elem.root(); let elem = self.elem.root();
// If an error was previously received and no new fetch request was triggered,
// we skip processing the payload and notify the media backend that we are done
// pushing data.
if elem.generation_id.get() == self.generation_id { if elem.generation_id.get() == self.generation_id {
if let Some(CancelReason::Error) = self.cancel_reason { if let Some(ref current_fetch_context) = *elem.current_fetch_context.borrow() {
// An error was received previously and no new fetch request was triggered, so if let Some(CancelReason::Error) = current_fetch_context.cancel_reason() {
// we skip processing the payload and notify the media backend that we are done if let Err(e) = elem.player.end_of_stream() {
// pushing data. warn!("Could not signal EOS to player {:?}", e);
if let Err(e) = elem.player.end_of_stream() { }
warn!("Could not signal EOS to player {:?}", e); return;
} }
return;
} }
} }
@ -1868,11 +1917,8 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
// => "If the connection is interrupted after some media data has been received..." // => "If the connection is interrupted after some media data has been received..."
else if elem.ready_state.get() != ReadyState::HaveNothing { else if elem.ready_state.get() != ReadyState::HaveNothing {
// Step 1 // Step 1
if let Some(ref current_fetch_request) = *elem.current_fetch_request.borrow() { if let Some(ref mut current_fetch_context) = *elem.current_fetch_context.borrow_mut() {
current_fetch_request current_fetch_context.cancel(CancelReason::Error);
.lock()
.unwrap()
.cancel(CancelReason::Error);
} }
// Step 2 // Step 2
@ -1908,7 +1954,7 @@ impl FetchResponseListener for HTMLMediaElementFetchContext {
} }
} }
impl ResourceTimingListener for HTMLMediaElementFetchContext { impl ResourceTimingListener for HTMLMediaElementFetchListener {
fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) { fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
let initiator_type = InitiatorType::LocalName( let initiator_type = InitiatorType::LocalName(
self.elem self.elem
@ -1925,52 +1971,24 @@ impl ResourceTimingListener for HTMLMediaElementFetchContext {
} }
} }
impl PreInvoke for HTMLMediaElementFetchContext { impl PreInvoke for HTMLMediaElementFetchListener {
fn should_invoke(&self) -> bool { fn should_invoke(&self) -> bool {
//TODO: finish_load needs to run at some point if the generation changes. //TODO: finish_load needs to run at some point if the generation changes.
self.elem.root().generation_id.get() == self.generation_id self.elem.root().generation_id.get() == self.generation_id
} }
} }
impl HTMLMediaElementFetchContext { impl HTMLMediaElementFetchListener {
fn new( fn new(elem: &HTMLMediaElement, url: ServoUrl, offset: u64) -> Self {
elem: &HTMLMediaElement, Self {
url: ServoUrl, elem: Trusted::new(elem),
offset: u64, metadata: None,
) -> (HTMLMediaElementFetchContext, ipc::IpcReceiver<()>) { generation_id: elem.generation_id.get(),
let mut fetch_canceller = FetchCanceller::new(); next_progress_event: time::get_time() + Duration::milliseconds(350),
let cancel_receiver = fetch_canceller.initialize(); resource_timing: ResourceFetchTiming::new(ResourceTimingType::Resource),
( url,
HTMLMediaElementFetchContext { expected_content_length: None,
elem: Trusted::new(elem), latest_fetched_content: offset,
metadata: None,
generation_id: elem.generation_id.get(),
next_progress_event: time::get_time() + Duration::milliseconds(350),
cancel_reason: None,
resource_timing: ResourceFetchTiming::new(ResourceTimingType::Resource),
url,
expected_content_length: None,
latest_fetched_content: offset,
is_seekable: false,
fetch_canceller,
},
cancel_receiver,
)
}
fn is_seekable(&self) -> bool {
self.is_seekable
}
fn cancel(&mut self, reason: CancelReason) {
if self.cancel_reason.is_some() {
return;
} }
self.cancel_reason = Some(reason);
self.fetch_canceller.cancel();
}
fn cancel_reason(&self) -> &Option<CancelReason> {
&self.cancel_reason
} }
} }