Auto merge of #6635 - Ms2ger:ws-task, r=jdm

Spawn a thread for WebSocket messages.



<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/servo/servo/6635)
<!-- Reviewable:end -->
This commit is contained in:
bors-servo 2015-07-22 14:26:21 -06:00
commit 6fd31867ba

View file

@ -24,6 +24,7 @@ use script_task::ScriptMsg;
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::borrow::ToOwned; use std::borrow::ToOwned;
use util::str::DOMString; use util::str::DOMString;
use util::task::spawn_named;
use hyper::header::Host; use hyper::header::Host;
use websocket::Message; use websocket::Message;
@ -46,7 +47,6 @@ enum WebSocketRequestState {
} }
no_jsmanaged_fields!(Sender<WebSocketStream>); no_jsmanaged_fields!(Sender<WebSocketStream>);
no_jsmanaged_fields!(Receiver<WebSocketStream>);
#[dom_struct] #[dom_struct]
pub struct WebSocket { pub struct WebSocket {
@ -55,7 +55,6 @@ pub struct WebSocket {
global: GlobalField, global: GlobalField,
ready_state: Cell<WebSocketRequestState>, ready_state: Cell<WebSocketRequestState>,
sender: RefCell<Option<Sender<WebSocketStream>>>, sender: RefCell<Option<Sender<WebSocketStream>>>,
receiver: RefCell<Option<Receiver<WebSocketStream>>>,
failed: Cell<bool>, //Flag to tell if websocket was closed due to failure failed: Cell<bool>, //Flag to tell if websocket was closed due to failure
full: Cell<bool>, //Flag to tell if websocket queue is full full: Cell<bool>, //Flag to tell if websocket queue is full
clean_close: Cell<bool>, //Flag to tell if the websocket closed cleanly (not due to full or fail) clean_close: Cell<bool>, //Flag to tell if the websocket closed cleanly (not due to full or fail)
@ -86,7 +85,6 @@ impl WebSocket {
ready_state: Cell::new(WebSocketRequestState::Connecting), ready_state: Cell::new(WebSocketRequestState::Connecting),
failed: Cell::new(false), failed: Cell::new(false),
sender: RefCell::new(None), sender: RefCell::new(None),
receiver: RefCell::new(None),
full: Cell::new(false), full: Cell::new(false),
clean_close: Cell::new(true), clean_close: Cell::new(true),
code: Cell::new(0), code: Cell::new(0),
@ -104,6 +102,9 @@ impl WebSocket {
let parsed_url = try!(Url::parse(&url).map_err(|_| Error::Syntax)); let parsed_url = try!(Url::parse(&url).map_err(|_| Error::Syntax));
let url = try!(parse_url(&parsed_url).map_err(|_| Error::Syntax)); let url = try!(parse_url(&parsed_url).map_err(|_| Error::Syntax));
// Step 2: Disallow https -> ws connections.
// Step 3: Potentially block access to some ports.
// Step 4. // Step 4.
let protocols = protocols.as_slice(); let protocols = protocols.as_slice();
@ -124,52 +125,41 @@ impl WebSocket {
} }
} }
/*TODO: This constructor is only a prototype, it does not accomplish the specs // Step 6: Origin.
defined here:
http://html.spec.whatwg.org // Step 7.
The remaining 8 items must be satisfied.
TODO: This constructor should be responsible for spawning a thread for the
receive loop after ws.r().Open() - See comment
*/
let ws = reflect_dom_object(box WebSocket::new_inherited(global, parsed_url), let ws = reflect_dom_object(box WebSocket::new_inherited(global, parsed_url),
global, global,
WebSocketBinding::Wrap); WebSocketBinding::Wrap);
let address = Trusted::new(global.get_cx(), ws.r(), global.script_chan());
let channel = establish_a_websocket_connection(url, global.get_url().serialize()); let origin = global.get_url().serialize();
let (temp_sender, temp_receiver) = match channel { let sender = global.script_chan();
spawn_named(format!("WebSocket connection to {}", ws.Url()), move || {
// Step 8: Protocols.
// Step 9.
let channel = establish_a_websocket_connection(url, origin);
let (temp_sender, _temp_receiver) = match channel {
Ok(channel) => channel, Ok(channel) => channel,
Err(e) => { Err(e) => {
debug!("Failed to establish a WebSocket connection: {:?}", e); debug!("Failed to establish a WebSocket connection: {:?}", e);
let global_root = ws.r().global.root(); let task = box CloseTask {
let address = Trusted::new(global_root.r().get_cx(), ws.r(), global_root.r().script_chan().clone()); addr: address,
let task = box WebSocketTaskHandler::new(address, WebSocketTask::Close); };
global_root.r().script_chan().send(ScriptMsg::RunnableMsg(task)).unwrap(); sender.send(ScriptMsg::RunnableMsg(task)).unwrap();
return Ok(ws); return;
} }
}; };
*ws.r().sender.borrow_mut() = Some(temp_sender); let open_task = box ConnectionEstablishedTask {
*ws.r().receiver.borrow_mut() = Some(temp_receiver); addr: address,
sender: temp_sender,
};
sender.send(ScriptMsg::RunnableMsg(open_task)).unwrap();
});
//Create everything necessary for starting the open asynchronous task, then begin the task. // Step 7.
let global_root = ws.r().global.root();
let addr: Trusted<WebSocket> =
Trusted::new(global_root.r().get_cx(), ws.r(), global_root.r().script_chan().clone());
let open_task = box WebSocketTaskHandler::new(addr, WebSocketTask::ConnectionEstablished);
global_root.r().script_chan().send(ScriptMsg::RunnableMsg(open_task)).unwrap();
//TODO: Spawn thread here for receive loop
/*TODO: Add receive loop here and make new thread run this
Receive is an infinite loop "similiar" the one shown here:
https://github.com/cyderize/rust-websocket/blob/master/examples/client.rs#L64
TODO: The receive loop however does need to follow the spec. These are outlined here
under "WebSocket message has been received" items 1-5:
https://github.com/cyderize/rust-websocket/blob/master/examples/client.rs#L64
TODO: The receive loop also needs to dispatch an asynchronous event as stated here:
https://github.com/cyderize/rust-websocket/blob/master/examples/client.rs#L64
TODO: When the receive loop receives a close message from the server,
it confirms the websocket is now closed. This requires the close event
to be fired (dispatch_close fires the close event - see implementation below)
*/
Ok(ws) Ok(ws)
} }
@ -273,31 +263,18 @@ impl<'a> WebSocketMethods for &'a WebSocket {
} }
pub enum WebSocketTask {
/// Task queued when *the WebSocket connection is established*. /// Task queued when *the WebSocket connection is established*.
ConnectionEstablished, struct ConnectionEstablishedTask {
Close,
}
pub struct WebSocketTaskHandler {
addr: Trusted<WebSocket>, addr: Trusted<WebSocket>,
task: WebSocketTask, sender: Sender<WebSocketStream>,
} }
impl WebSocketTaskHandler { impl Runnable for ConnectionEstablishedTask {
pub fn new(addr: Trusted<WebSocket>, task: WebSocketTask) -> WebSocketTaskHandler { fn handler(self: Box<Self>) {
WebSocketTaskHandler {
addr: addr,
task: task,
}
}
fn connection_established(&self) {
/*TODO: Items 1, 3, 4, & 5 under "WebSocket connection is established" as specified here:
https://html.spec.whatwg.org/multipage/#feedback-from-the-protocol
*/
let ws = self.addr.root(); let ws = self.addr.root();
*ws.r().sender.borrow_mut() = Some(self.sender);
// Step 1: Protocols. // Step 1: Protocols.
// Step 2. // Step 2.
@ -314,8 +291,14 @@ impl WebSocketTaskHandler {
EventCancelable::NotCancelable); EventCancelable::NotCancelable);
event.fire(EventTargetCast::from_ref(ws.r())); event.fire(EventTargetCast::from_ref(ws.r()));
} }
}
fn dispatch_close(&self) { struct CloseTask {
addr: Trusted<WebSocket>,
}
impl Runnable for CloseTask {
fn handler(self: Box<Self>) {
let ws = self.addr.root(); let ws = self.addr.root();
let ws = ws.r(); let ws = ws.r();
let global = ws.global.root(); let global = ws.global.root();
@ -350,17 +333,3 @@ impl WebSocketTaskHandler {
event.fire(target); event.fire(target);
} }
} }
impl Runnable for WebSocketTaskHandler {
fn handler(self: Box<WebSocketTaskHandler>) {
match self.task {
WebSocketTask::ConnectionEstablished => {
self.connection_established();
}
WebSocketTask::Close => {
self.dispatch_close();
}
}
}
}