Auto merge of #7066 - Ms2ger:ws-event, r=metajack

Dispatch message events for WebSocket.



<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/servo/servo/7066)
<!-- Reviewable:end -->
This commit is contained in:
bors-servo 2015-08-08 03:00:50 -06:00
commit d8b4611a79
87 changed files with 127 additions and 424 deletions

View file

@ -26,7 +26,7 @@ interface WebSocket : EventTarget {
[Throws] void close([Clamp] optional unsigned short code, optional USVString reason);
//messaging
//attribute EventHandler onmessage;
attribute EventHandler onmessage;
//attribute BinaryType binaryType;
[Throws] void send(optional USVString data);
//void send(Blob data);

View file

@ -8,6 +8,7 @@ use dom::bindings::codegen::Bindings::WebSocketBinding::WebSocketMethods;
use dom::bindings::codegen::Bindings::EventHandlerBinding::EventHandlerNonNull;
use dom::bindings::codegen::InheritTypes::EventTargetCast;
use dom::bindings::codegen::InheritTypes::EventCast;
use dom::bindings::conversions::ToJSValConvertible;
use dom::bindings::error::{Error, Fallible};
use dom::bindings::error::Error::{InvalidAccess, Syntax};
use dom::bindings::global::{GlobalField, GlobalRef};
@ -15,20 +16,22 @@ use dom::bindings::js::Root;
use dom::bindings::refcounted::Trusted;
use dom::bindings::str::USVString;
use dom::bindings::trace::JSTraceable;
use dom::bindings::utils::reflect_dom_object;
use dom::bindings::utils::{reflect_dom_object, Reflectable};
use dom::blob::Blob;
use dom::closeevent::CloseEvent;
use dom::event::{Event, EventBubbles, EventCancelable, EventHelpers};
use dom::eventtarget::{EventTarget, EventTargetHelpers, EventTargetTypeId};
use net_traits::hosts::replace_hosts;
use dom::messageevent::MessageEvent;
use script_task::Runnable;
use script_task::ScriptMsg;
use std::cell::{Cell, RefCell};
use std::borrow::ToOwned;
use net_traits::hosts::replace_hosts;
use util::str::DOMString;
use util::task::spawn_named;
use js::jsapi::{RootedValue, JSAutoRequest, JSAutoCompartment};
use js::jsval::UndefinedValue;
use hyper::header::Host;
use websocket::Message;
use websocket::ws::sender::Sender as Sender_Object;
use websocket::client::sender::Sender;
@ -38,9 +41,14 @@ use websocket::client::request::Url;
use websocket::Client;
use websocket::header::Origin;
use websocket::result::WebSocketResult;
use websocket::ws::receiver::Receiver as WSReceiver;
use websocket::ws::util::url::parse_url;
#[derive(JSTraceable, PartialEq, Copy, Clone)]
use std::borrow::ToOwned;
use std::cell::{Cell, RefCell};
use std::sync::{Arc, Mutex};
#[derive(JSTraceable, PartialEq, Copy, Clone, Debug)]
enum WebSocketRequestState {
Connecting = 0,
Open = 1,
@ -50,13 +58,18 @@ enum WebSocketRequestState {
no_jsmanaged_fields!(Sender<WebSocketStream>);
enum MessageData {
Text(String),
Binary(Vec<u8>),
}
#[dom_struct]
pub struct WebSocket {
eventtarget: EventTarget,
url: Url,
global: GlobalField,
ready_state: Cell<WebSocketRequestState>,
sender: RefCell<Option<Sender<WebSocketStream>>>,
sender: RefCell<Option<Arc<Mutex<Sender<WebSocketStream>>>>>,
failed: Cell<bool>, //Flag to tell if websocket was closed due to failure
full: Cell<bool>, //Flag to tell if websocket queue is full
clean_close: Cell<bool>, //Flag to tell if the websocket closed cleanly (not due to full or fail)
@ -156,7 +169,7 @@ impl WebSocket {
// Step 9.
let channel = establish_a_websocket_connection(&resource_url, net_url, origin);
let (temp_sender, _temp_receiver) = match channel {
let (ws_sender, mut receiver) = match channel {
Ok(channel) => channel,
Err(e) => {
debug!("Failed to establish a WebSocket connection: {:?}", e);
@ -167,12 +180,39 @@ impl WebSocket {
return;
}
};
let ws_sender = Arc::new(Mutex::new(ws_sender));
let open_task = box ConnectionEstablishedTask {
addr: address,
sender: temp_sender,
addr: address.clone(),
sender: ws_sender.clone(),
};
sender.send(ScriptMsg::RunnableMsg(open_task)).unwrap();
for message in receiver.incoming_messages() {
let message = match message {
Ok(Message::Text(text)) => MessageData::Text(text),
Ok(Message::Binary(data)) => MessageData::Binary(data),
Ok(Message::Ping(data)) => {
ws_sender.lock().unwrap().send_message(Message::Pong(data)).unwrap();
continue;
},
Ok(Message::Pong(_)) => continue,
Ok(Message::Close(data)) => {
ws_sender.lock().unwrap().send_message(Message::Close(data)).unwrap();
let task = box CloseTask {
addr: address,
};
sender.send(ScriptMsg::RunnableMsg(task)).unwrap();
break;
},
Err(_) => break,
};
let message_task = box MessageReceivedTask {
address: address.clone(),
message: message,
};
sender.send(ScriptMsg::RunnableMsg(message_task)).unwrap();
}
});
// Step 7.
@ -184,6 +224,7 @@ impl<'a> WebSocketMethods for &'a WebSocket {
event_handler!(open, GetOnopen, SetOnopen);
event_handler!(close, GetOnclose, SetOnclose);
event_handler!(error, GetOnerror, SetOnerror);
event_handler!(message, GetOnmessage, SetOnmessage);
// https://html.spec.whatwg.org/multipage/#dom-websocket-url
fn Url(self) -> DOMString {
@ -218,7 +259,7 @@ impl<'a> WebSocketMethods for &'a WebSocket {
*/
let mut other_sender = self.sender.borrow_mut();
let my_sender = other_sender.as_mut().unwrap();
let _ = my_sender.send_message(Message::Text(data.unwrap().0));
let _ = my_sender.lock().unwrap().send_message(Message::Text(data.unwrap().0));
return Ok(())
}
@ -230,7 +271,7 @@ impl<'a> WebSocketMethods for &'a WebSocket {
let mut sender = this.sender.borrow_mut();
//TODO: Also check if the buffer is full
if let Some(sender) = sender.as_mut() {
let _ = sender.send_message(Message::Close(None));
let _ = sender.lock().unwrap().send_message(Message::Close(None));
}
}
@ -279,7 +320,7 @@ impl<'a> WebSocketMethods for &'a WebSocket {
/// Task queued when *the WebSocket connection is established*.
struct ConnectionEstablishedTask {
addr: Trusted<WebSocket>,
sender: Sender<WebSocketStream>,
sender: Arc<Mutex<Sender<WebSocketStream>>>,
}
impl Runnable for ConnectionEstablishedTask {
@ -346,3 +387,38 @@ impl Runnable for CloseTask {
event.fire(target);
}
}
struct MessageReceivedTask {
address: Trusted<WebSocket>,
message: MessageData,
}
impl Runnable for MessageReceivedTask {
fn handler(self: Box<Self>) {
let ws = self.address.root();
debug!("MessageReceivedTask::handler({:p}): readyState={:?}", &*ws,
ws.ready_state.get());
// Step 1.
if ws.ready_state.get() != WebSocketRequestState::Open {
return;
}
// Step 2-5.
let global = ws.global.root();
let cx = global.r().get_cx();
let _ar = JSAutoRequest::new(cx);
let _ac = JSAutoCompartment::new(cx, ws.reflector().get_jsobject().get());
let mut message = RootedValue::new(cx, UndefinedValue());
match self.message {
MessageData::Text(text) => text.to_jsval(cx, message.handle_mut()),
MessageData::Binary(data) => {
let blob = Blob::new(global.r(), Some(data), "");
blob.to_jsval(cx, message.handle_mut());
},
}
let target = EventTargetCast::from_ref(ws.r());
MessageEvent::dispatch_jsval(target, global.r(), message.handle());
}
}