diff --git a/components/script/dom/eventsource.rs b/components/script/dom/eventsource.rs index 2bc10f244ac..2eabfcbc8b0 100644 --- a/components/script/dom/eventsource.rs +++ b/components/script/dom/eventsource.rs @@ -17,6 +17,7 @@ use dom::globalscope::GlobalScope; use dom::messageevent::MessageEvent; use encoding::Encoding; use encoding::all::UTF_8; +use euclid::length::Length; use hyper::header::{Accept, qitem}; use ipc_channel::ipc; use ipc_channel::router::ROUTER; @@ -24,7 +25,7 @@ use js::conversions::ToJSValConvertible; use js::jsapi::JSAutoCompartment; use js::jsval::UndefinedValue; use mime::{Mime, TopLevel, SubLevel}; -use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseListener, NetworkError}; +use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseMsg, FetchResponseListener, NetworkError}; use net_traits::request::{CacheMode, CorsSettings, CredentialsMode}; use net_traits::request::{RequestInit, RequestMode}; use network_listener::{NetworkListener, PreInvoke}; @@ -34,10 +35,8 @@ use std::cell::Cell; use std::mem; use std::str::{Chars, FromStr}; use std::sync::{Arc, Mutex}; -use std::sync::mpsc::{Sender, channel}; -use std::thread; -use std::time::Duration; use task_source::TaskSource; +use timers::OneshotTimerCallback; use url::Url; header! { (LastEventId, "Last-Event-ID") => [String] } @@ -78,6 +77,8 @@ enum ParserState { struct EventSourceContext { event_source: Trusted, gen_id: GenerationId, + action_sender: ipc::IpcSender, + parser_state: ParserState, field: String, value: String, @@ -114,36 +115,16 @@ impl EventSourceContext { // https://html.spec.whatwg.org/multipage/#reestablish-the-connection fn reestablish_the_connection(&self) { let event_source = self.event_source.root(); - let (sender, receiver) = channel(); + + if self.gen_id != event_source.generation_id.get() { + return; + } + // Step 1 let runnable = box ReestablishConnectionRunnable { event_source: self.event_source.clone(), - done_chan: sender + action_sender: self.action_sender.clone() }; - if self.gen_id != self.event_source.root().generation_id.get() { - return; - } - let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global()); - // Step 2 - thread::sleep(Duration::from_millis(event_source.reconnection_time.get())); - // TODO Step 3: Optionally wait some more - // Step 4 - if self.gen_id != self.event_source.root().generation_id.get() { - return; - } - let _ = receiver.recv(); - // Step 5 - let runnable = box RefetchRequestRunnable { - event_source: self.event_source.clone(), - gen_id: self.gen_id, - - event_type: self.event_type.clone(), - data: self.data.clone(), - last_event_id: self.last_event_id.clone(), - }; - if self.gen_id != self.event_source.root().generation_id.get() { - return; - } let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global()); } @@ -317,7 +298,7 @@ impl FetchResponseListener for EventSourceContext { } fn process_response_eof(&mut self, _response: Result<(), NetworkError>) { - + self.reestablish_the_connection(); } } @@ -394,9 +375,12 @@ impl EventSource { // Step 12 *ev.request.borrow_mut() = Some(request.clone()); // Step 14 + let (action_sender, action_receiver) = ipc::channel().unwrap(); let context = EventSourceContext { event_source: Trusted::new(&ev), gen_id: ev.generation_id.get(), + action_sender: action_sender.clone(), + parser_state: ParserState::Eol, field: String::new(), value: String::new(), @@ -411,7 +395,6 @@ impl EventSource { task_source: global.networking_task_source(), wrapper: Some(global.get_runnable_wrapper()) }; - let (action_sender, action_receiver) = ipc::channel().unwrap(); ROUTER.add_route(action_receiver.to_opaque(), box move |message| { listener.notify_fetch(message.to().unwrap()); }); @@ -490,7 +473,7 @@ impl Runnable for FailConnectionRunnable { pub struct ReestablishConnectionRunnable { event_source: Trusted, - done_chan: Sender<()> + action_sender: ipc::IpcSender, } impl Runnable for ReestablishConnectionRunnable { @@ -501,31 +484,35 @@ impl Runnable for ReestablishConnectionRunnable { let event_source = self.event_source.root(); // Step 1.1 if event_source.ready_state.get() == ReadyState::Closed { - self.done_chan.send(()).unwrap(); return; } // Step 1.2 event_source.ready_state.set(ReadyState::Connecting); // Step 1.3 event_source.upcast::().fire_event(atom!("error")); - self.done_chan.send(()).unwrap(); + // Step 2 + let duration = Length::new(event_source.reconnection_time.get()); + // TODO Step 3: Optionally wait some more + // Steps 4-5 + let callback = OneshotTimerCallback::EventSourceTimeout(EventSourceTimeoutCallback { + event_source: self.event_source.clone(), + action_sender: self.action_sender.clone() + }); + let _ = event_source.global().schedule_callback(callback, duration); } } -pub struct RefetchRequestRunnable { +#[derive(JSTraceable, HeapSizeOf)] +pub struct EventSourceTimeoutCallback { + #[ignore_heap_size_of = "Because it is non-owning"] event_source: Trusted, - gen_id: GenerationId, - - event_type: String, - data: String, - last_event_id: String, + #[ignore_heap_size_of = "Because it is non-owning"] + action_sender: ipc::IpcSender, } -impl Runnable for RefetchRequestRunnable { - fn name(&self) -> &'static str { "EventSource RefetchRequestRunnable" } - +impl EventSourceTimeoutCallback { // https://html.spec.whatwg.org/multipage/#reestablish-the-connection - fn handler(self: Box) { + pub fn invoke(self) { let event_source = self.event_source.root(); let global = event_source.global(); // Step 5.1 @@ -539,28 +526,7 @@ impl Runnable for RefetchRequestRunnable { request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone()))); } // Step 5.4 - let context = EventSourceContext { - event_source: self.event_source.clone(), - gen_id: self.gen_id, - parser_state: ParserState::Eol, - field: String::new(), - value: String::new(), - origin: String::new(), - - event_type: self.event_type.clone(), - data: self.data.clone(), - last_event_id: self.last_event_id.clone() - }; - let listener = NetworkListener { - context: Arc::new(Mutex::new(context)), - task_source: global.networking_task_source(), - wrapper: Some(global.get_runnable_wrapper()) - }; - let (action_sender, action_receiver) = ipc::channel().unwrap(); - ROUTER.add_route(action_receiver.to_opaque(), box move |message| { - listener.notify_fetch(message.to().unwrap()); - }); - global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap(); + global.core_resource_thread().send(CoreResourceMsg::Fetch(request, self.action_sender)).unwrap(); } } diff --git a/components/script/timers.rs b/components/script/timers.rs index 34ac02b6589..6b67d6959e3 100644 --- a/components/script/timers.rs +++ b/components/script/timers.rs @@ -7,6 +7,7 @@ use dom::bindings::cell::DOMRefCell; use dom::bindings::codegen::Bindings::FunctionBinding::Function; use dom::bindings::reflector::Reflectable; use dom::bindings::str::DOMString; +use dom::eventsource::EventSourceTimeoutCallback; use dom::globalscope::GlobalScope; use dom::testbinding::TestBindingCallback; use dom::xmlhttprequest::XHRTimeoutCallback; @@ -67,6 +68,7 @@ struct OneshotTimer { #[derive(JSTraceable, HeapSizeOf)] pub enum OneshotTimerCallback { XhrTimeout(XHRTimeoutCallback), + EventSourceTimeout(EventSourceTimeoutCallback), JsTimer(JsTimerTask), TestBindingCallback(TestBindingCallback), } @@ -75,6 +77,7 @@ impl OneshotTimerCallback { fn invoke(self, this: &T, js_timers: &JsTimers) { match self { OneshotTimerCallback::XhrTimeout(callback) => callback.invoke(), + OneshotTimerCallback::EventSourceTimeout(callback) => callback.invoke(), OneshotTimerCallback::JsTimer(task) => task.invoke(this, js_timers), OneshotTimerCallback::TestBindingCallback(callback) => callback.invoke(), }