move websocket creation to resource task

This commit is contained in:
Nova Fallen 2015-12-02 23:54:24 -05:00
parent 9f9d3570fc
commit e8c8277f34
10 changed files with 234 additions and 101 deletions

View file

@ -21,32 +21,25 @@ use dom::closeevent::CloseEvent;
use dom::event::{Event, EventBubbles, EventCancelable};
use dom::eventtarget::EventTarget;
use dom::messageevent::MessageEvent;
use hyper::header::Host;
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use js::jsapi::{JSAutoCompartment, JSAutoRequest, RootedValue};
use js::jsapi::{JS_GetArrayBufferData, JS_NewArrayBuffer};
use js::jsval::UndefinedValue;
use libc::{uint32_t, uint8_t};
use net_traits::ControlMsg::WebsocketConnect;
use net_traits::MessageData;
use net_traits::hosts::replace_hosts;
use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent};
use ref_slice::ref_slice;
use script_task::ScriptTaskEventCategory::WebSocketEvent;
use script_task::{CommonScriptMsg, Runnable};
use std::borrow::ToOwned;
use std::cell::Cell;
use std::ptr;
use std::sync::{Arc, Mutex};
use std::thread;
use util::str::DOMString;
use util::task::spawn_named;
use websocket::client::receiver::Receiver;
use websocket::client::request::Url;
use websocket::client::sender::Sender;
use websocket::header::Origin;
use websocket::message::Type;
use websocket::result::WebSocketResult;
use websocket::stream::WebSocketStream;
use websocket::ws::receiver::Receiver as WSReceiver;
use websocket::ws::sender::Sender as Sender_Object;
use websocket::ws::util::url::parse_url;
use websocket::{Client, Message};
#[derive(JSTraceable, PartialEq, Copy, Clone, Debug, HeapSizeOf)]
enum WebSocketRequestState {
@ -56,14 +49,6 @@ enum WebSocketRequestState {
Closed = 3,
}
no_jsmanaged_fields!(Sender<WebSocketStream>);
#[derive(HeapSizeOf)]
enum MessageData {
Text(String),
Binary(Vec<u8>),
}
// list of blacklist ports according to
// http://mxr.mozilla.org/mozilla-central/source/netwerk/base/nsIOService.cpp#87
const BLOCKED_PORTS_LIST: &'static [u16] = &[
@ -154,7 +139,7 @@ pub struct WebSocket {
buffered_amount: Cell<u32>,
clearing_buffer: Cell<bool>, //Flag to tell if there is a running task to clear buffered_amount
#[ignore_heap_size_of = "Defined in std"]
sender: DOMRefCell<Option<Arc<Mutex<Sender<WebSocketStream>>>>>,
sender: DOMRefCell<Option<IpcSender<WebSocketDomAction>>>,
failed: Cell<bool>, //Flag to tell if websocket was closed due to failure
full: Cell<bool>, //Flag to tell if websocket queue is full
clean_close: Cell<bool>, //Flag to tell if the websocket closed cleanly (not due to full or fail)
@ -163,29 +148,6 @@ pub struct WebSocket {
binary_type: Cell<BinaryType>,
}
/// *Establish a WebSocket Connection* as defined in RFC 6455.
fn establish_a_websocket_connection(resource_url: &Url, net_url: (Host, String, bool),
origin: String)
-> WebSocketResult<(Sender<WebSocketStream>, Receiver<WebSocketStream>)> {
// URL that we actually fetch from the network, after applying the replacements
// specified in the hosts file.
let host = Host {
hostname: resource_url.serialize_host().unwrap(),
port: resource_url.port_or_default()
};
let mut request = try!(Client::connect(net_url));
request.headers.set(Origin(origin));
request.headers.set(host);
let response = try!(request.send());
try!(response.validate());
Ok(response.begin().split())
}
impl WebSocket {
fn new_inherited(global: GlobalRef, url: Url) -> WebSocket {
WebSocket {
@ -217,8 +179,9 @@ impl WebSocket {
-> Fallible<Root<WebSocket>> {
// Step 1.
let resource_url = try!(Url::parse(&url).map_err(|_| Error::Syntax));
let net_url = try!(parse_url(&replace_hosts(&resource_url)).map_err(|_| Error::Syntax));
// Although we do this replace and parse operation again in the resource task,
// we try here to be able to immediately throw a syntax error on failure.
let _ = try!(parse_url(&replace_hosts(&resource_url)).map_err(|_| Error::Syntax));
// Step 2: Disallow https -> ws connections.
// Step 3: Potentially block access to some ports.
@ -257,62 +220,58 @@ impl WebSocket {
let address = Trusted::new(global.get_cx(), ws.r(), global.networking_task_source());
let origin = global.get_url().serialize();
let connect_data = WebSocketConnectData {
resource_url: resource_url.clone(),
origin: origin,
};
// Create the interface for communication with the resource task
let (dom_action_sender, resource_action_receiver):
(IpcSender<WebSocketDomAction>,
IpcReceiver<WebSocketDomAction>) = ipc::channel().unwrap();
let (resource_event_sender, dom_event_receiver):
(IpcSender<WebSocketNetworkEvent>,
IpcReceiver<WebSocketNetworkEvent>) = ipc::channel().unwrap();
let connect = WebSocketCommunicate {
event_sender: resource_event_sender,
action_receiver: resource_action_receiver,
};
let resource_task = global.resource_task();
let _ = resource_task.send(WebsocketConnect(connect, connect_data));
*ws.sender.borrow_mut() = Some(dom_action_sender);
let moved_address = address.clone();
let sender = global.networking_task_source();
spawn_named(format!("WebSocket connection to {}", ws.Url()), move || {
// Step 8: Protocols.
// Step 9.
let channel = establish_a_websocket_connection(&resource_url, net_url, origin);
let (ws_sender, mut receiver) = match channel {
Ok(channel) => channel,
Err(e) => {
debug!("Failed to establish a WebSocket connection: {:?}", e);
let task = box CloseTask {
addr: address,
};
sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, task)).unwrap();
return;
}
};
let ws_sender = Arc::new(Mutex::new(ws_sender));
let open_task = box ConnectionEstablishedTask {
addr: address.clone(),
sender: ws_sender.clone(),
};
sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_task)).unwrap();
for message in receiver.incoming_messages() {
let message: Message = match message {
Ok(m) => m,
Err(_) => break,
};
let message = match message.opcode {
Type::Text => MessageData::Text(String::from_utf8_lossy(&message.payload).into_owned()),
Type::Binary => MessageData::Binary(message.payload.into_owned()),
Type::Ping => {
let pong = Message::pong(message.payload);
ws_sender.lock().unwrap().send_message(&pong).unwrap();
continue;
thread::spawn(move || {
while let Ok(event) = dom_event_receiver.recv() {
match event {
WebSocketNetworkEvent::ConnectionEstablished => {
let open_task = box ConnectionEstablishedTask {
addr: moved_address.clone(),
};
sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, open_task)).unwrap();
},
Type::Pong => continue,
Type::Close => {
ws_sender.lock().unwrap().send_message(&message).unwrap();
WebSocketNetworkEvent::MessageReceived(message) => {
let message_task = box MessageReceivedTask {
address: moved_address.clone(),
message: message,
};
sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_task)).unwrap();
},
WebSocketNetworkEvent::Close => {
let task = box CloseTask {
addr: address,
addr: moved_address.clone(),
};
sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, task)).unwrap();
break;
},
};
let message_task = box MessageReceivedTask {
address: address.clone(),
message: message,
};
sender.send(CommonScriptMsg::RunnableMsg(WebSocketEvent, message_task)).unwrap();
}
}
});
// Step 7.
Ok(ws)
}
@ -408,7 +367,7 @@ impl WebSocketMethods for WebSocket {
if send_data {
let mut other_sender = self.sender.borrow_mut();
let my_sender = other_sender.as_mut().unwrap();
let _ = my_sender.lock().unwrap().send_message(&Message::text(data.0));
let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Text(data.0)));
}
Ok(())
@ -427,7 +386,7 @@ impl WebSocketMethods for WebSocket {
if send_data {
let mut other_sender = self.sender.borrow_mut();
let my_sender = other_sender.as_mut().unwrap();
let _ = my_sender.lock().unwrap().send_message(&Message::binary(data.clone_bytes()));
let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Binary(data.clone_bytes())));
}
Ok(())
@ -443,11 +402,10 @@ impl WebSocketMethods for WebSocket {
if let Some(sender) = sender.as_mut() {
let code: u16 = this.code.get();
let reason = this.reason.borrow().clone();
let _ = sender.lock().unwrap().send_message(&Message::close_because(code, reason));
let _ = sender.send(WebSocketDomAction::Close(code, reason));
}
}
if let Some(code) = code {
//Fail if the supplied code isn't normal and isn't reserved for libraries, frameworks, and applications
if code != close_code::NORMAL && (code < 3000 || code > 4999) {
@ -490,15 +448,11 @@ impl WebSocketMethods for WebSocket {
/// Task queued when *the WebSocket connection is established*.
struct ConnectionEstablishedTask {
addr: Trusted<WebSocket>,
sender: Arc<Mutex<Sender<WebSocketStream>>>,
}
impl Runnable for ConnectionEstablishedTask {
fn handler(self: Box<Self>) {
let ws = self.addr.root();
*ws.sender.borrow_mut() = Some(self.sender);
// Step 1: Protocols.
// Step 2.