Implement reestablishing the connection for EventSource

And remove unneeded fields in EventSourceContext
This commit is contained in:
Keith Yeung 2016-10-15 19:25:31 -07:00
parent 9afc17cd3a
commit 2924726b0a
2 changed files with 102 additions and 13 deletions

View file

@ -9,7 +9,7 @@ use dom::bindings::error::{Error, Fallible};
use dom::bindings::inheritance::Castable;
use dom::bindings::js::Root;
use dom::bindings::refcounted::Trusted;
use dom::bindings::reflector::reflect_dom_object;
use dom::bindings::reflector::{Reflectable, reflect_dom_object};
use dom::bindings::str::DOMString;
use dom::eventtarget::EventTarget;
use dom::globalscope::GlobalScope;
@ -21,13 +21,15 @@ use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseListener, NetworkE
use net_traits::request::{CacheMode, CorsSettings, CredentialsMode};
use net_traits::request::{RequestInit, RequestMode};
use network_listener::{NetworkListener, PreInvoke};
use script_thread::{Runnable, RunnableWrapper};
use script_thread::Runnable;
use std::cell::Cell;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, channel};
use task_source::TaskSource;
use task_source::networking::NetworkingTaskSource;
use url::Url;
header! { (LastEventId, "Last-Event-ID") => [String] }
#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
/// https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate
enum ReadyState {
@ -47,24 +49,43 @@ pub struct EventSource {
}
struct EventSourceContext {
event_source: Trusted<EventSource>,
networking_task_source: NetworkingTaskSource,
wrapper: RunnableWrapper
event_source: Trusted<EventSource>
}
impl EventSourceContext {
fn announce_the_connection(&self) {
let event_source = self.event_source.root();
let runnable = box AnnounceConnectionRunnable {
event_source: self.event_source.clone()
};
let _ = self.networking_task_source.queue_with_wrapper(runnable, &self.wrapper);
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
}
fn fail_the_connection(&self) {
let event_source = self.event_source.root();
let runnable = box FailConnectionRunnable {
event_source: self.event_source.clone()
};
let _ = self.networking_task_source.queue_with_wrapper(runnable, &self.wrapper);
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
}
// https://html.spec.whatwg.org/multipage/#reestablish-the-connection
fn reestablish_the_connection(&self) {
let event_source = self.event_source.root();
let (sender, receiver) = channel();
// Step 1
let runnable = box ReestablishConnectionRunnable {
event_source: self.event_source.clone(),
done_chan: sender
};
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
// Step 4
let _ = receiver.recv();
// Step 5
let runnable = box RefetchRequestRunnable {
event_source: self.event_source.clone(),
};
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
}
}
@ -94,8 +115,7 @@ impl FetchResponseListener for EventSourceContext {
}
}
Err(_) => {
// FIXME: Fail the connection for now, but it should really attempt to re-establish
self.fail_the_connection();
self.reestablish_the_connection();
}
}
}
@ -129,6 +149,14 @@ impl EventSource {
Wrap)
}
pub fn request(&self) -> RequestInit {
self.request.borrow().clone().unwrap()
}
pub fn last_event_id(&self) -> DOMString {
self.last_event_id.borrow().clone()
}
pub fn Constructor(global: &GlobalScope,
url: DOMString,
event_source_init: &EventSourceInit) -> Fallible<Root<EventSource>> {
@ -174,9 +202,7 @@ impl EventSource {
*ev.request.borrow_mut() = Some(request.clone());
// Step 14
let context = EventSourceContext {
event_source: Trusted::new(&ev),
networking_task_source: global.networking_task_source(),
wrapper: global.get_runnable_wrapper()
event_source: Trusted::new(&ev)
};
let listener = NetworkListener {
context: Arc::new(Mutex::new(context)),
@ -258,3 +284,65 @@ impl Runnable for FailConnectionRunnable {
}
}
}
pub struct ReestablishConnectionRunnable {
event_source: Trusted<EventSource>,
done_chan: Sender<()>
}
impl Runnable for ReestablishConnectionRunnable {
fn name(&self) -> &'static str { "EventSource ReestablishConnectionRunnable" }
// https://html.spec.whatwg.org/multipage/#reestablish-the-connection
fn handler(self: Box<ReestablishConnectionRunnable>) {
let event_source = self.event_source.root();
// Step 1.1
if event_source.ready_state.get() == ReadyState::Closed {
self.done_chan.send(()).unwrap();
return;
}
// Step 1.2
event_source.ready_state.set(ReadyState::Connecting);
// Step 1.3
event_source.upcast::<EventTarget>().fire_event(atom!("error"));
self.done_chan.send(()).unwrap();
}
}
pub struct RefetchRequestRunnable {
event_source: Trusted<EventSource>,
}
impl Runnable for RefetchRequestRunnable {
fn name(&self) -> &'static str { "EventSource RefetchRequestRunnable" }
// https://html.spec.whatwg.org/multipage/#reestablish-the-connection
fn handler(self: Box<RefetchRequestRunnable>) {
let event_source = self.event_source.root();
let global = event_source.global();
// Step 5.1
if event_source.ready_state.get() != ReadyState::Connecting {
return;
}
// Step 5.2
let mut request = event_source.request();
// Step 5.3
if !event_source.last_event_id().is_empty() {
request.headers.set(LastEventId(String::from(event_source.last_event_id())));
}
// Step 5.4
let context = EventSourceContext {
event_source: self.event_source.clone()
};
let listener = NetworkListener {
context: Arc::new(Mutex::new(context)),
task_source: global.networking_task_source(),
wrapper: Some(global.get_runnable_wrapper())
};
let (action_sender, action_receiver) = ipc::channel().unwrap();
ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
listener.notify_fetch(message.to().unwrap());
});
global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
}
}