Use a timer callback when re-establishing a connection

This commit is contained in:
Keith Yeung 2016-10-28 03:09:05 -07:00
parent 0b32b624a7
commit a5c2c0ba4b
2 changed files with 36 additions and 67 deletions

View file

@ -17,6 +17,7 @@ use dom::globalscope::GlobalScope;
use dom::messageevent::MessageEvent;
use encoding::Encoding;
use encoding::all::UTF_8;
use euclid::length::Length;
use hyper::header::{Accept, qitem};
use ipc_channel::ipc;
use ipc_channel::router::ROUTER;
@ -24,7 +25,7 @@ use js::conversions::ToJSValConvertible;
use js::jsapi::JSAutoCompartment;
use js::jsval::UndefinedValue;
use mime::{Mime, TopLevel, SubLevel};
use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseListener, NetworkError};
use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseMsg, FetchResponseListener, NetworkError};
use net_traits::request::{CacheMode, CorsSettings, CredentialsMode};
use net_traits::request::{RequestInit, RequestMode};
use network_listener::{NetworkListener, PreInvoke};
@ -34,10 +35,8 @@ use std::cell::Cell;
use std::mem;
use std::str::{Chars, FromStr};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, channel};
use std::thread;
use std::time::Duration;
use task_source::TaskSource;
use timers::OneshotTimerCallback;
use url::Url;
header! { (LastEventId, "Last-Event-ID") => [String] }
@ -78,6 +77,8 @@ enum ParserState {
struct EventSourceContext {
event_source: Trusted<EventSource>,
gen_id: GenerationId,
action_sender: ipc::IpcSender<FetchResponseMsg>,
parser_state: ParserState,
field: String,
value: String,
@ -114,36 +115,16 @@ impl EventSourceContext {
// https://html.spec.whatwg.org/multipage/#reestablish-the-connection
fn reestablish_the_connection(&self) {
let event_source = self.event_source.root();
let (sender, receiver) = channel();
if self.gen_id != event_source.generation_id.get() {
return;
}
// Step 1
let runnable = box ReestablishConnectionRunnable {
event_source: self.event_source.clone(),
done_chan: sender
action_sender: self.action_sender.clone()
};
if self.gen_id != self.event_source.root().generation_id.get() {
return;
}
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
// Step 2
thread::sleep(Duration::from_millis(event_source.reconnection_time.get()));
// TODO Step 3: Optionally wait some more
// Step 4
if self.gen_id != self.event_source.root().generation_id.get() {
return;
}
let _ = receiver.recv();
// Step 5
let runnable = box RefetchRequestRunnable {
event_source: self.event_source.clone(),
gen_id: self.gen_id,
event_type: self.event_type.clone(),
data: self.data.clone(),
last_event_id: self.last_event_id.clone(),
};
if self.gen_id != self.event_source.root().generation_id.get() {
return;
}
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
}
@ -317,7 +298,7 @@ impl FetchResponseListener for EventSourceContext {
}
fn process_response_eof(&mut self, _response: Result<(), NetworkError>) {
self.reestablish_the_connection();
}
}
@ -394,9 +375,12 @@ impl EventSource {
// Step 12
*ev.request.borrow_mut() = Some(request.clone());
// Step 14
let (action_sender, action_receiver) = ipc::channel().unwrap();
let context = EventSourceContext {
event_source: Trusted::new(&ev),
gen_id: ev.generation_id.get(),
action_sender: action_sender.clone(),
parser_state: ParserState::Eol,
field: String::new(),
value: String::new(),
@ -411,7 +395,6 @@ impl EventSource {
task_source: global.networking_task_source(),
wrapper: Some(global.get_runnable_wrapper())
};
let (action_sender, action_receiver) = ipc::channel().unwrap();
ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
listener.notify_fetch(message.to().unwrap());
});
@ -490,7 +473,7 @@ impl Runnable for FailConnectionRunnable {
pub struct ReestablishConnectionRunnable {
event_source: Trusted<EventSource>,
done_chan: Sender<()>
action_sender: ipc::IpcSender<FetchResponseMsg>,
}
impl Runnable for ReestablishConnectionRunnable {
@ -501,31 +484,35 @@ impl Runnable for ReestablishConnectionRunnable {
let event_source = self.event_source.root();
// Step 1.1
if event_source.ready_state.get() == ReadyState::Closed {
self.done_chan.send(()).unwrap();
return;
}
// Step 1.2
event_source.ready_state.set(ReadyState::Connecting);
// Step 1.3
event_source.upcast::<EventTarget>().fire_event(atom!("error"));
self.done_chan.send(()).unwrap();
// Step 2
let duration = Length::new(event_source.reconnection_time.get());
// TODO Step 3: Optionally wait some more
// Steps 4-5
let callback = OneshotTimerCallback::EventSourceTimeout(EventSourceTimeoutCallback {
event_source: self.event_source.clone(),
action_sender: self.action_sender.clone()
});
let _ = event_source.global().schedule_callback(callback, duration);
}
}
pub struct RefetchRequestRunnable {
#[derive(JSTraceable, HeapSizeOf)]
pub struct EventSourceTimeoutCallback {
#[ignore_heap_size_of = "Because it is non-owning"]
event_source: Trusted<EventSource>,
gen_id: GenerationId,
event_type: String,
data: String,
last_event_id: String,
#[ignore_heap_size_of = "Because it is non-owning"]
action_sender: ipc::IpcSender<FetchResponseMsg>,
}
impl Runnable for RefetchRequestRunnable {
fn name(&self) -> &'static str { "EventSource RefetchRequestRunnable" }
impl EventSourceTimeoutCallback {
// https://html.spec.whatwg.org/multipage/#reestablish-the-connection
fn handler(self: Box<RefetchRequestRunnable>) {
pub fn invoke(self) {
let event_source = self.event_source.root();
let global = event_source.global();
// Step 5.1
@ -539,28 +526,7 @@ impl Runnable for RefetchRequestRunnable {
request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone())));
}
// Step 5.4
let context = EventSourceContext {
event_source: self.event_source.clone(),
gen_id: self.gen_id,
parser_state: ParserState::Eol,
field: String::new(),
value: String::new(),
origin: String::new(),
event_type: self.event_type.clone(),
data: self.data.clone(),
last_event_id: self.last_event_id.clone()
};
let listener = NetworkListener {
context: Arc::new(Mutex::new(context)),
task_source: global.networking_task_source(),
wrapper: Some(global.get_runnable_wrapper())
};
let (action_sender, action_receiver) = ipc::channel().unwrap();
ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
listener.notify_fetch(message.to().unwrap());
});
global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
global.core_resource_thread().send(CoreResourceMsg::Fetch(request, self.action_sender)).unwrap();
}
}

View file

@ -7,6 +7,7 @@ use dom::bindings::cell::DOMRefCell;
use dom::bindings::codegen::Bindings::FunctionBinding::Function;
use dom::bindings::reflector::Reflectable;
use dom::bindings::str::DOMString;
use dom::eventsource::EventSourceTimeoutCallback;
use dom::globalscope::GlobalScope;
use dom::testbinding::TestBindingCallback;
use dom::xmlhttprequest::XHRTimeoutCallback;
@ -67,6 +68,7 @@ struct OneshotTimer {
#[derive(JSTraceable, HeapSizeOf)]
pub enum OneshotTimerCallback {
XhrTimeout(XHRTimeoutCallback),
EventSourceTimeout(EventSourceTimeoutCallback),
JsTimer(JsTimerTask),
TestBindingCallback(TestBindingCallback),
}
@ -75,6 +77,7 @@ impl OneshotTimerCallback {
fn invoke<T: Reflectable>(self, this: &T, js_timers: &JsTimers) {
match self {
OneshotTimerCallback::XhrTimeout(callback) => callback.invoke(),
OneshotTimerCallback::EventSourceTimeout(callback) => callback.invoke(),
OneshotTimerCallback::JsTimer(task) => task.invoke(this, js_timers),
OneshotTimerCallback::TestBindingCallback(callback) => callback.invoke(),
}