mirror of
https://github.com/servo/servo.git
synced 2025-07-22 23:03:42 +01:00
Interpret event stream
This commit is contained in:
parent
c198bfa388
commit
0b32b624a7
1 changed files with 229 additions and 26 deletions
|
@ -11,25 +11,39 @@ use dom::bindings::js::Root;
|
||||||
use dom::bindings::refcounted::Trusted;
|
use dom::bindings::refcounted::Trusted;
|
||||||
use dom::bindings::reflector::{Reflectable, reflect_dom_object};
|
use dom::bindings::reflector::{Reflectable, reflect_dom_object};
|
||||||
use dom::bindings::str::DOMString;
|
use dom::bindings::str::DOMString;
|
||||||
|
use dom::event::Event;
|
||||||
use dom::eventtarget::EventTarget;
|
use dom::eventtarget::EventTarget;
|
||||||
use dom::globalscope::GlobalScope;
|
use dom::globalscope::GlobalScope;
|
||||||
|
use dom::messageevent::MessageEvent;
|
||||||
|
use encoding::Encoding;
|
||||||
|
use encoding::all::UTF_8;
|
||||||
use hyper::header::{Accept, qitem};
|
use hyper::header::{Accept, qitem};
|
||||||
use ipc_channel::ipc;
|
use ipc_channel::ipc;
|
||||||
use ipc_channel::router::ROUTER;
|
use ipc_channel::router::ROUTER;
|
||||||
|
use js::conversions::ToJSValConvertible;
|
||||||
|
use js::jsapi::JSAutoCompartment;
|
||||||
|
use js::jsval::UndefinedValue;
|
||||||
use mime::{Mime, TopLevel, SubLevel};
|
use mime::{Mime, TopLevel, SubLevel};
|
||||||
use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseListener, NetworkError};
|
use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseListener, NetworkError};
|
||||||
use net_traits::request::{CacheMode, CorsSettings, CredentialsMode};
|
use net_traits::request::{CacheMode, CorsSettings, CredentialsMode};
|
||||||
use net_traits::request::{RequestInit, RequestMode};
|
use net_traits::request::{RequestInit, RequestMode};
|
||||||
use network_listener::{NetworkListener, PreInvoke};
|
use network_listener::{NetworkListener, PreInvoke};
|
||||||
use script_thread::Runnable;
|
use script_thread::Runnable;
|
||||||
|
use servo_atoms::Atom;
|
||||||
use std::cell::Cell;
|
use std::cell::Cell;
|
||||||
|
use std::mem;
|
||||||
|
use std::str::{Chars, FromStr};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::sync::mpsc::{Sender, channel};
|
use std::sync::mpsc::{Sender, channel};
|
||||||
|
use std::thread;
|
||||||
|
use std::time::Duration;
|
||||||
use task_source::TaskSource;
|
use task_source::TaskSource;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
header! { (LastEventId, "Last-Event-ID") => [String] }
|
header! { (LastEventId, "Last-Event-ID") => [String] }
|
||||||
|
|
||||||
|
const DEFAULT_RECONNECTION_TIME: u64 = 5000;
|
||||||
|
|
||||||
#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
|
#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
|
||||||
struct GenerationId(u32);
|
struct GenerationId(u32);
|
||||||
|
|
||||||
|
@ -44,18 +58,34 @@ enum ReadyState {
|
||||||
#[dom_struct]
|
#[dom_struct]
|
||||||
pub struct EventSource {
|
pub struct EventSource {
|
||||||
eventtarget: EventTarget,
|
eventtarget: EventTarget,
|
||||||
url: DOMRefCell<Option<Url>>,
|
url: Url,
|
||||||
request: DOMRefCell<Option<RequestInit>>,
|
request: DOMRefCell<Option<RequestInit>>,
|
||||||
last_event_id: DOMRefCell<DOMString>,
|
last_event_id: DOMRefCell<DOMString>,
|
||||||
|
reconnection_time: Cell<u64>,
|
||||||
generation_id: Cell<GenerationId>,
|
generation_id: Cell<GenerationId>,
|
||||||
|
|
||||||
ready_state: Cell<ReadyState>,
|
ready_state: Cell<ReadyState>,
|
||||||
with_credentials: bool,
|
with_credentials: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum ParserState {
|
||||||
|
Field,
|
||||||
|
Comment,
|
||||||
|
Value,
|
||||||
|
Eol
|
||||||
|
}
|
||||||
|
|
||||||
struct EventSourceContext {
|
struct EventSourceContext {
|
||||||
event_source: Trusted<EventSource>,
|
event_source: Trusted<EventSource>,
|
||||||
gen_id: GenerationId
|
gen_id: GenerationId,
|
||||||
|
parser_state: ParserState,
|
||||||
|
field: String,
|
||||||
|
value: String,
|
||||||
|
origin: String,
|
||||||
|
|
||||||
|
event_type: String,
|
||||||
|
data: String,
|
||||||
|
last_event_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventSourceContext {
|
impl EventSourceContext {
|
||||||
|
@ -94,6 +124,9 @@ impl EventSourceContext {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
|
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
|
||||||
|
// Step 2
|
||||||
|
thread::sleep(Duration::from_millis(event_source.reconnection_time.get()));
|
||||||
|
// TODO Step 3: Optionally wait some more
|
||||||
// Step 4
|
// Step 4
|
||||||
if self.gen_id != self.event_source.root().generation_id.get() {
|
if self.gen_id != self.event_source.root().generation_id.get() {
|
||||||
return;
|
return;
|
||||||
|
@ -102,13 +135,146 @@ impl EventSourceContext {
|
||||||
// Step 5
|
// Step 5
|
||||||
let runnable = box RefetchRequestRunnable {
|
let runnable = box RefetchRequestRunnable {
|
||||||
event_source: self.event_source.clone(),
|
event_source: self.event_source.clone(),
|
||||||
gen_id: self.gen_id
|
gen_id: self.gen_id,
|
||||||
|
|
||||||
|
event_type: self.event_type.clone(),
|
||||||
|
data: self.data.clone(),
|
||||||
|
last_event_id: self.last_event_id.clone(),
|
||||||
};
|
};
|
||||||
if self.gen_id != self.event_source.root().generation_id.get() {
|
if self.gen_id != self.event_source.root().generation_id.get() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
|
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// https://html.spec.whatwg.org/multipage/#processField
|
||||||
|
fn process_field(&mut self) {
|
||||||
|
match &*self.field {
|
||||||
|
"event" => mem::swap(&mut self.event_type, &mut self.value),
|
||||||
|
"data" => {
|
||||||
|
self.data.push_str(&self.value);
|
||||||
|
self.data.push('\n');
|
||||||
|
}
|
||||||
|
"id" => mem::swap(&mut self.last_event_id, &mut self.value),
|
||||||
|
"retry" => if let Ok(time) = u64::from_str(&self.value) {
|
||||||
|
self.event_source.root().reconnection_time.set(time);
|
||||||
|
},
|
||||||
|
_ => ()
|
||||||
|
}
|
||||||
|
|
||||||
|
self.field.clear();
|
||||||
|
self.value.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
// https://html.spec.whatwg.org/multipage/#dispatchMessage
|
||||||
|
#[allow(unsafe_code)]
|
||||||
|
fn dispatch_event(&mut self) {
|
||||||
|
let event_source = self.event_source.root();
|
||||||
|
// Step 1
|
||||||
|
*event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
|
||||||
|
// Step 2
|
||||||
|
if self.data.is_empty() {
|
||||||
|
self.data.clear();
|
||||||
|
self.event_type.clear();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Step 3
|
||||||
|
if let Some(last) = self.data.pop() {
|
||||||
|
if last != '\n' {
|
||||||
|
self.data.push(last);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Step 6
|
||||||
|
let type_ = if !self.event_type.is_empty() {
|
||||||
|
Atom::from(self.event_type.clone())
|
||||||
|
} else {
|
||||||
|
atom!("message")
|
||||||
|
};
|
||||||
|
// Steps 4-5
|
||||||
|
let event = {
|
||||||
|
let _ac = JSAutoCompartment::new(event_source.global().get_cx(),
|
||||||
|
event_source.reflector().get_jsobject().get());
|
||||||
|
rooted!(in(event_source.global().get_cx()) let mut data = UndefinedValue());
|
||||||
|
unsafe { self.data.to_jsval(event_source.global().get_cx(), data.handle_mut()) };
|
||||||
|
MessageEvent::new(&*event_source.global(), type_, false, false, data.handle(),
|
||||||
|
DOMString::from(self.origin.clone()),
|
||||||
|
event_source.last_event_id.borrow().clone())
|
||||||
|
};
|
||||||
|
// Step 7
|
||||||
|
self.event_type.clear();
|
||||||
|
self.data.clear();
|
||||||
|
// Step 8
|
||||||
|
let runnable = box DispatchEventRunnable {
|
||||||
|
event_source: self.event_source.clone(),
|
||||||
|
event: Trusted::new(&event)
|
||||||
|
};
|
||||||
|
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
|
||||||
|
}
|
||||||
|
|
||||||
|
// https://html.spec.whatwg.org/multipage/#event-stream-interpretation
|
||||||
|
fn parse(&mut self, stream: Chars) {
|
||||||
|
let mut stream = stream.peekable();
|
||||||
|
|
||||||
|
while let Some(ch) = stream.next() {
|
||||||
|
match (ch, &self.parser_state) {
|
||||||
|
(':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
|
||||||
|
(':', &ParserState::Field) => {
|
||||||
|
self.parser_state = ParserState::Value;
|
||||||
|
if let Some(&' ') = stream.peek() {
|
||||||
|
stream.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
('\n', &ParserState::Value) => {
|
||||||
|
self.parser_state = ParserState::Eol;
|
||||||
|
self.process_field();
|
||||||
|
}
|
||||||
|
('\r', &ParserState::Value) => {
|
||||||
|
if let Some(&'\n') = stream.peek() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
self.parser_state = ParserState::Eol;
|
||||||
|
self.process_field();
|
||||||
|
}
|
||||||
|
|
||||||
|
('\n', &ParserState::Field) => {
|
||||||
|
self.parser_state = ParserState::Eol;
|
||||||
|
self.process_field();
|
||||||
|
}
|
||||||
|
('\r', &ParserState::Field) => {
|
||||||
|
if let Some(&'\n') = stream.peek() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
self.parser_state = ParserState::Eol;
|
||||||
|
self.process_field();
|
||||||
|
}
|
||||||
|
|
||||||
|
('\n', &ParserState::Eol) => self.dispatch_event(),
|
||||||
|
('\r', &ParserState::Eol) => {
|
||||||
|
if let Some(&'\n') = stream.peek() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
self.dispatch_event();
|
||||||
|
}
|
||||||
|
|
||||||
|
('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
|
||||||
|
('\r', &ParserState::Comment) => {
|
||||||
|
if let Some(&'\n') = stream.peek() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
self.parser_state = ParserState::Eol;
|
||||||
|
}
|
||||||
|
|
||||||
|
(_, &ParserState::Field) => self.field.push(ch),
|
||||||
|
(_, &ParserState::Value) => self.value.push(ch),
|
||||||
|
(_, &ParserState::Eol) => {
|
||||||
|
self.parser_state = ParserState::Field;
|
||||||
|
self.field.push(ch);
|
||||||
|
}
|
||||||
|
(_, &ParserState::Comment) => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FetchResponseListener for EventSourceContext {
|
impl FetchResponseListener for EventSourceContext {
|
||||||
|
@ -130,8 +296,10 @@ impl FetchResponseListener for EventSourceContext {
|
||||||
match meta.content_type {
|
match meta.content_type {
|
||||||
None => self.fail_the_connection(),
|
None => self.fail_the_connection(),
|
||||||
Some(ct) => match ct.into_inner().0 {
|
Some(ct) => match ct.into_inner().0 {
|
||||||
Mime(TopLevel::Text, SubLevel::EventStream, _) =>
|
Mime(TopLevel::Text, SubLevel::EventStream, _) => {
|
||||||
self.announce_the_connection(),
|
self.origin = meta.final_url.origin().unicode_serialization();
|
||||||
|
self.announce_the_connection();
|
||||||
|
}
|
||||||
_ => self.fail_the_connection()
|
_ => self.fail_the_connection()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -142,12 +310,14 @@ impl FetchResponseListener for EventSourceContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_response_chunk(&mut self, mut _chunk: Vec<u8>) {
|
fn process_response_chunk(&mut self, chunk: Vec<u8>) {
|
||||||
// TODO
|
let mut stream = String::new();
|
||||||
|
UTF_8.raw_decoder().raw_feed(&chunk, &mut stream);
|
||||||
|
self.parse(stream.chars())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_response_eof(&mut self, _response: Result<(), NetworkError>) {
|
fn process_response_eof(&mut self, _response: Result<(), NetworkError>) {
|
||||||
// TODO
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,12 +328,13 @@ impl PreInvoke for EventSourceContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventSource {
|
impl EventSource {
|
||||||
fn new_inherited(with_credentials: bool) -> EventSource {
|
fn new_inherited(url: Url, with_credentials: bool) -> EventSource {
|
||||||
EventSource {
|
EventSource {
|
||||||
eventtarget: EventTarget::new_inherited(),
|
eventtarget: EventTarget::new_inherited(),
|
||||||
url: DOMRefCell::new(None),
|
url: url,
|
||||||
request: DOMRefCell::new(None),
|
request: DOMRefCell::new(None),
|
||||||
last_event_id: DOMRefCell::new(DOMString::from("")),
|
last_event_id: DOMRefCell::new(DOMString::from("")),
|
||||||
|
reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
|
||||||
generation_id: Cell::new(GenerationId(0)),
|
generation_id: Cell::new(GenerationId(0)),
|
||||||
|
|
||||||
ready_state: Cell::new(ReadyState::Connecting),
|
ready_state: Cell::new(ReadyState::Connecting),
|
||||||
|
@ -171,8 +342,8 @@ impl EventSource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new(global: &GlobalScope, with_credentials: bool) -> Root<EventSource> {
|
fn new(global: &GlobalScope, url: Url, with_credentials: bool) -> Root<EventSource> {
|
||||||
reflect_dom_object(box EventSource::new_inherited(with_credentials),
|
reflect_dom_object(box EventSource::new_inherited(url, with_credentials),
|
||||||
global,
|
global,
|
||||||
Wrap)
|
Wrap)
|
||||||
}
|
}
|
||||||
|
@ -181,15 +352,9 @@ impl EventSource {
|
||||||
self.request.borrow().clone().unwrap()
|
self.request.borrow().clone().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn last_event_id(&self) -> DOMString {
|
|
||||||
self.last_event_id.borrow().clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn Constructor(global: &GlobalScope,
|
pub fn Constructor(global: &GlobalScope,
|
||||||
url: DOMString,
|
url: DOMString,
|
||||||
event_source_init: &EventSourceInit) -> Fallible<Root<EventSource>> {
|
event_source_init: &EventSourceInit) -> Fallible<Root<EventSource>> {
|
||||||
// Step 1
|
|
||||||
let ev = EventSource::new(global, event_source_init.withCredentials);
|
|
||||||
// TODO: Step 2 relevant settings object
|
// TODO: Step 2 relevant settings object
|
||||||
// Step 3
|
// Step 3
|
||||||
let base_url = global.api_base_url();
|
let base_url = global.api_base_url();
|
||||||
|
@ -198,8 +363,8 @@ impl EventSource {
|
||||||
// Step 4
|
// Step 4
|
||||||
Err(_) => return Err(Error::Syntax)
|
Err(_) => return Err(Error::Syntax)
|
||||||
};
|
};
|
||||||
// Step 5
|
// Step 1, 5
|
||||||
*ev.url.borrow_mut() = Some(url_record.clone());
|
let ev = EventSource::new(global, url_record.clone(), event_source_init.withCredentials);
|
||||||
// Steps 6-7
|
// Steps 6-7
|
||||||
let cors_attribute_state = if event_source_init.withCredentials {
|
let cors_attribute_state = if event_source_init.withCredentials {
|
||||||
CorsSettings::UseCredentials
|
CorsSettings::UseCredentials
|
||||||
|
@ -231,7 +396,15 @@ impl EventSource {
|
||||||
// Step 14
|
// Step 14
|
||||||
let context = EventSourceContext {
|
let context = EventSourceContext {
|
||||||
event_source: Trusted::new(&ev),
|
event_source: Trusted::new(&ev),
|
||||||
gen_id: ev.generation_id.get()
|
gen_id: ev.generation_id.get(),
|
||||||
|
parser_state: ParserState::Eol,
|
||||||
|
field: String::new(),
|
||||||
|
value: String::new(),
|
||||||
|
origin: String::new(),
|
||||||
|
|
||||||
|
event_type: String::new(),
|
||||||
|
data: String::new(),
|
||||||
|
last_event_id: String::new(),
|
||||||
};
|
};
|
||||||
let listener = NetworkListener {
|
let listener = NetworkListener {
|
||||||
context: Arc::new(Mutex::new(context)),
|
context: Arc::new(Mutex::new(context)),
|
||||||
|
@ -260,7 +433,7 @@ impl EventSourceMethods for EventSource {
|
||||||
|
|
||||||
// https://html.spec.whatwg.org/multipage/#dom-eventsource-url
|
// https://html.spec.whatwg.org/multipage/#dom-eventsource-url
|
||||||
fn Url(&self) -> DOMString {
|
fn Url(&self) -> DOMString {
|
||||||
DOMString::from(self.url.borrow().clone().map_or("".to_owned(), Url::into_string))
|
DOMString::from(self.url.as_str())
|
||||||
}
|
}
|
||||||
|
|
||||||
// https://html.spec.whatwg.org/multipage/#dom-eventsource-withcredentials
|
// https://html.spec.whatwg.org/multipage/#dom-eventsource-withcredentials
|
||||||
|
@ -341,7 +514,11 @@ impl Runnable for ReestablishConnectionRunnable {
|
||||||
|
|
||||||
pub struct RefetchRequestRunnable {
|
pub struct RefetchRequestRunnable {
|
||||||
event_source: Trusted<EventSource>,
|
event_source: Trusted<EventSource>,
|
||||||
gen_id: GenerationId
|
gen_id: GenerationId,
|
||||||
|
|
||||||
|
event_type: String,
|
||||||
|
data: String,
|
||||||
|
last_event_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Runnable for RefetchRequestRunnable {
|
impl Runnable for RefetchRequestRunnable {
|
||||||
|
@ -358,13 +535,21 @@ impl Runnable for RefetchRequestRunnable {
|
||||||
// Step 5.2
|
// Step 5.2
|
||||||
let mut request = event_source.request();
|
let mut request = event_source.request();
|
||||||
// Step 5.3
|
// Step 5.3
|
||||||
if !event_source.last_event_id().is_empty() {
|
if !event_source.last_event_id.borrow().is_empty() {
|
||||||
request.headers.set(LastEventId(String::from(event_source.last_event_id())));
|
request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone())));
|
||||||
}
|
}
|
||||||
// Step 5.4
|
// Step 5.4
|
||||||
let context = EventSourceContext {
|
let context = EventSourceContext {
|
||||||
event_source: self.event_source.clone(),
|
event_source: self.event_source.clone(),
|
||||||
gen_id: self.gen_id
|
gen_id: self.gen_id,
|
||||||
|
parser_state: ParserState::Eol,
|
||||||
|
field: String::new(),
|
||||||
|
value: String::new(),
|
||||||
|
origin: String::new(),
|
||||||
|
|
||||||
|
event_type: self.event_type.clone(),
|
||||||
|
data: self.data.clone(),
|
||||||
|
last_event_id: self.last_event_id.clone()
|
||||||
};
|
};
|
||||||
let listener = NetworkListener {
|
let listener = NetworkListener {
|
||||||
context: Arc::new(Mutex::new(context)),
|
context: Arc::new(Mutex::new(context)),
|
||||||
|
@ -378,3 +563,21 @@ impl Runnable for RefetchRequestRunnable {
|
||||||
global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
|
global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct DispatchEventRunnable {
|
||||||
|
event_source: Trusted<EventSource>,
|
||||||
|
event: Trusted<MessageEvent>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Runnable for DispatchEventRunnable {
|
||||||
|
fn name(&self) -> &'static str { "EventSource DispatchEventRunnable" }
|
||||||
|
|
||||||
|
// https://html.spec.whatwg.org/multipage/#dispatchMessage
|
||||||
|
fn handler(self: Box<DispatchEventRunnable>) {
|
||||||
|
let event_source = self.event_source.root();
|
||||||
|
// Step 8
|
||||||
|
if event_source.ready_state.get() != ReadyState::Closed {
|
||||||
|
self.event.root().upcast::<Event>().fire(&event_source.upcast());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue