Make async XMLHttpRequest requests use async network events.

This commit is contained in:
Josh Matthews 2015-01-26 11:56:46 +00:00
parent 7517aac9e9
commit 5c7be5c9c3
4 changed files with 283 additions and 13 deletions

View file

@ -10,6 +10,7 @@
//! with CORSRequest being expanded into FetchRequest (etc)
use std::ascii::AsciiExt;
use std::borrow::ToOwned;
use time;
use time::{now, Timespec};
@ -24,6 +25,15 @@ use hyper::method::Method;
use hyper::status::StatusClass::Success;
use url::{SchemeData, Url};
use util::task::spawn_named;
pub trait AsyncCORSResponseListener {
fn response_available(&self, response: CORSResponse);
}
pub trait AsyncCORSResponseTarget {
fn invoke_with_listener(&self, response: CORSResponse);
}
#[derive(Clone)]
pub struct CORSRequest {
@ -88,7 +98,17 @@ impl CORSRequest {
}
}
/// https://fetch.spec.whatwg.org/#concept-http-fetch
pub fn http_fetch_async(&self, listener: Box<AsyncCORSResponseTarget + Send>) {
// TODO: this exists only to make preflight check non-blocking
// perhaps should be handled by the resource task?
let req = self.clone();
spawn_named("cors".to_owned(), move || {
let response = req.http_fetch();
listener.invoke_with_listener(response);
});
}
/// http://fetch.spec.whatwg.org/#concept-http-fetch
/// This method assumes that the CORS flag is set
/// This does not perform the full HTTP fetch, rather it handles part of the CORS filtering
/// if self.mode is ForcedPreflight, then the CORS-with-forced-preflight

View file

@ -16,6 +16,7 @@ use std::cell::{BorrowState, RefCell, Ref, RefMut};
///
/// This extends the API of `core::cell::RefCell` to allow unsafe access in
/// certain situations, with dynamic checking in debug builds.
#[derive(Clone)]
pub struct DOMRefCell<T> {
value: RefCell<T>,
}

View file

@ -207,6 +207,16 @@ impl<T: JSTraceable> JSTraceable for Option<T> {
}
}
impl<T: JSTraceable, U: JSTraceable> JSTraceable for Result<T, U> {
#[inline]
fn trace(&self, trc: *mut JSTracer) {
match *self {
Ok(ref inner) => inner.trace(trc),
Err(ref inner) => inner.trace(trc),
}
}
}
impl<K,V,S> JSTraceable for HashMap<K, V, S>
where K: Hash + Eq + JSTraceable,
V: JSTraceable,
@ -297,6 +307,12 @@ impl JSTraceable for Box<LayoutRPC+'static> {
}
}
impl JSTraceable for () {
#[inline]
fn trace(&self, _trc: *mut JSTracer) {
}
}
/// Holds a set of vectors that need to be rooted
pub struct RootedCollectionSet {
set: Vec<HashSet<*const RootedVec<()>>>

View file

@ -44,18 +44,21 @@ use js::jsval::{JSVal, NullValue, UndefinedValue};
use net_traits::ControlMsg::Load;
use net_traits::ProgressMsg::{Payload, Done};
use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer};
use cors::{allow_cross_origin_request, CORSRequest, RequestMode};
use net_traits::{ResourceTask, ResourceCORSData, LoadData, LoadConsumer, AsyncResponseTarget};
use net_traits::{AsyncResponseListener, ResponseAction, Metadata};
use cors::{allow_cross_origin_request, CORSRequest, RequestMode, AsyncCORSResponseListener};
use cors::{AsyncCORSResponseTarget, CORSResponse};
use util::str::DOMString;
use util::task::spawn_named;
use std::ascii::AsciiExt;
use std::borrow::ToOwned;
use std::cell::Cell;
use std::cell::{RefCell, Cell};
use std::sync::mpsc::{Sender, Receiver, channel};
use std::default::Default;
use std::old_io::Timer;
use std::str::FromStr;
use std::sync::{Mutex, Arc};
use std::time::duration::Duration;
use time;
use url::{Url, UrlParser};
@ -159,6 +162,7 @@ pub struct XMLHttpRequest {
fetch_time: Cell<i64>,
terminate_sender: DOMRefCell<Option<Sender<TerminateReason>>>,
generation_id: Cell<GenerationId>,
response_status: Cell<Result<(), ()>>,
}
impl XMLHttpRequest {
@ -191,7 +195,8 @@ impl XMLHttpRequest {
timer: DOMRefCell::new(Timer::new().unwrap()),
fetch_time: Cell::new(0),
terminate_sender: DOMRefCell::new(None),
generation_id: Cell::new(GenerationId(0))
generation_id: Cell::new(GenerationId(0)),
response_status: Cell::new(Ok(())),
}
}
pub fn new(global: GlobalRef) -> Temporary<XMLHttpRequest> {
@ -210,6 +215,197 @@ impl XMLHttpRequest {
xhr.r().process_partial_response(progress);
}
#[allow(unsafe_code)]
fn fetch2(xhr: TrustedXHRAddress, script_chan: Box<ScriptChan+Send>,
resource_task: ResourceTask, load_data: LoadData, sync: bool,
terminate_receiver: Receiver<TerminateReason>,
cors_request: Result<Option<CORSRequest>,()>, gen_id: GenerationId) {
let cors_request = match cors_request {
Err(_) => {
// Happens in case of cross-origin non-http URIs
//notify_error_and_return!(Network);
return; //XXXjdm
}
Ok(req) => req,
};
#[derive(Clone)]
struct XHRContext {
xhr: TrustedXHRAddress,
gen_id: GenerationId,
cors_request: Option<CORSRequest>,
buf: DOMRefCell<Vec<u8>>,
terminate_receiver: Arc<Mutex<Receiver<TerminateReason>>>,
got_response_complete: Cell<bool>,
}
let context = Arc::new(Mutex::new(XHRContext {
xhr: xhr,
cors_request: cors_request.clone(),
gen_id: gen_id,
terminate_receiver: Arc::new(Mutex::new(terminate_receiver)),
buf: DOMRefCell::new(vec!()),
got_response_complete: Cell::new(false),
}));
if let Some(req) = cors_request {
struct CORSContext {
xhr: Arc<Mutex<XHRContext>>,
load_data: RefCell<Option<LoadData>>,
req: CORSRequest,
script_chan: Box<ScriptChan+Send>,
resource_task: ResourceTask,
}
impl AsyncCORSResponseListener for CORSContext {
fn response_available(&self, response: CORSResponse) {
if response.network_error {
//notify_error_and_return!(Network);
return; //XXXjdm
}
let mut load_data = self.load_data.borrow_mut().take().unwrap();
load_data.cors = Some(ResourceCORSData {
preflight: self.req.preflight_flag,
origin: self.req.origin.clone()
});
initiate_async_xhr(self.xhr.clone(), self.script_chan.clone(),
self.resource_task.clone(), load_data);
}
}
struct CORSListener {
context: Arc<Mutex<CORSContext>>,
script_chan: Box<ScriptChan+Send>,
}
impl AsyncCORSResponseTarget for CORSListener {
fn invoke_with_listener(&self, response: CORSResponse) {
self.script_chan.send(ScriptMsg::RunnableMsg(box CORSRunnable {
context: self.context.clone(),
response: response,
})).unwrap();
}
}
struct CORSRunnable {
context: Arc<Mutex<CORSContext>>,
response: CORSResponse,
}
impl Runnable for CORSRunnable {
fn handler(self: Box<CORSRunnable>) {
let this = *self;
let context = this.context.lock().unwrap();
context.response_available(this.response);
}
}
let cors_context = Arc::new(Mutex::new(CORSContext {
xhr: context.clone(),
load_data: RefCell::new(Some(load_data)),
req: req.clone(),
script_chan: script_chan.clone(),
resource_task: resource_task,
}));
req.http_fetch_async(box CORSListener {
context: cors_context,
script_chan: script_chan
});
} else {
initiate_async_xhr(context.clone(), script_chan, resource_task, load_data);
}
impl AsyncResponseListener for XHRContext {
fn headers_available(&self, metadata: Metadata) {
let xhr = self.xhr.to_temporary().root();
let _decision = xhr.r().process_headers_available(self.cors_request.clone(),
self.gen_id,
metadata);
}
fn data_available(&self, payload: Vec<u8>) {
self.buf.borrow_mut().push_all(payload.as_slice());
let xhr = self.xhr.to_temporary().root();
xhr.r().process_data_available(self.gen_id, self.buf.borrow().clone());
}
fn response_complete(&self, status: Result<(), String>) {
let xhr = self.xhr.to_temporary().root();
xhr.r().process_response_complete(self.gen_id, status);
self.got_response_complete.set(true);
}
}
struct XHRRunnable {
context: Arc<Mutex<XHRContext>>,
action: ResponseAction,
}
impl Runnable for XHRRunnable {
fn handler(self: Box<XHRRunnable>) {
let this = *self;
let context = this.context.lock();
let context = context.unwrap();
let xhr = context.xhr.to_temporary().root();
if xhr.r().generation_id.get() != context.gen_id {
return;
}
{
let terminate_receiver = context.terminate_receiver.lock().unwrap();
if let Ok(reason) = terminate_receiver.try_recv() {
match reason {
TerminateReason::AbortedOrReopened => return, //Err(Abort)
TerminateReason::TimedOut => {
xhr.r().process_partial_response(
XHRProgress::Errored(context.gen_id, Network));
return; //Err(Network)
}
}
}
}
this.action.process(&*context);
}
}
struct XHRListener {
context: Arc<Mutex<XHRContext>>,
script_chan: Box<ScriptChan+Send>,
}
impl AsyncResponseTarget for XHRListener {
fn invoke_with_listener(&self, action: ResponseAction) {
self.script_chan.send(ScriptMsg::RunnableMsg(box XHRRunnable {
context: self.context.clone(),
action: action
})).unwrap();
}
}
fn initiate_async_xhr(context: Arc<Mutex<XHRContext>>,
script_chan: Box<ScriptChan+Send>,
resource_task: ResourceTask,
load_data: LoadData) {
let listener = box XHRListener {
context: context,
script_chan: script_chan,
};
resource_task.send(Load(load_data, LoadConsumer::Listener(listener))).unwrap();
}
if sync {
while !context.lock().unwrap().got_response_complete.get() {
//TODO: spin the event loop
panic!("don't know how to spin the event loop yet");
}
}
}
#[allow(unsafe_code)]
fn fetch(fetch_type: &SyncOrAsync, resource_task: ResourceTask,
mut load_data: LoadData, terminate_receiver: Receiver<TerminateReason>,
@ -659,14 +855,8 @@ impl<'a> XMLHttpRequestMethods for JSRef<'a, XMLHttpRequest> {
// inflight events queued up in the script task's port.
let addr = Trusted::new(self.global.root().r().get_cx(), self,
script_chan.clone());
spawn_named("XHRTask".to_owned(), move || {
let _ = XMLHttpRequest::fetch(&mut SyncOrAsync::Async(addr, script_chan),
resource_task,
load_data,
terminate_receiver,
cors_request,
gen_id);
});
XMLHttpRequest::fetch2(addr, script_chan, resource_task, load_data, self.sync.get(),
terminate_receiver, cors_request, gen_id);
let timeout = self.timeout.get();
if timeout > 0 {
self.set_timeout(timeout);
@ -811,6 +1001,10 @@ pub type TrustedXHRAddress = Trusted<XMLHttpRequest>;
trait PrivateXMLHttpRequestHelpers {
fn change_ready_state(self, XMLHttpRequestState);
fn process_headers_available(&self, cors_request: Option<CORSRequest>,
gen_id: GenerationId, metadata: Metadata) -> Result<(), Error>;
fn process_data_available(self, gen_id: GenerationId, payload: Vec<u8>);
fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>);
fn process_partial_response(self, progress: XHRProgress);
fn terminate_ongoing_fetch(self);
fn insert_trusted_header(self, name: String, value: String);
@ -836,6 +1030,38 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> {
event.r().fire(target);
}
fn process_headers_available(&self, cors_request: Option<CORSRequest>,
gen_id: GenerationId, metadata: Metadata) -> Result<(), Error> {
match cors_request {
Some(ref req) => {
match metadata.headers {
Some(ref h) if allow_cross_origin_request(req, h) => {},
_ => {
self.process_partial_response(XHRProgress::Errored(gen_id, Network));
return Err(Network);
}
}
},
_ => {}
};
// XXXManishearth Clear cache entries in case of a network error
self.process_partial_response(XHRProgress::HeadersReceived(gen_id,
metadata.headers, metadata.status));
Ok(())
}
fn process_data_available(self, gen_id: GenerationId, payload: Vec<u8>) {
self.process_partial_response(XHRProgress::Loading(gen_id, ByteString::new(payload)));
}
fn process_response_complete(self, gen_id: GenerationId, status: Result<(), String>) {
match status {
Ok(()) => self.process_partial_response(XHRProgress::Done(gen_id)),
Err(_) => self.process_partial_response(XHRProgress::Errored(gen_id, Network)),
}
}
fn process_partial_response(self, progress: XHRProgress) {
let msg_id = progress.generation_id();
@ -852,6 +1078,11 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> {
// Ignore message if it belongs to a terminated fetch
return_if_fetch_was_terminated!();
// Ignore messages coming from previously-errored responses
if self.response_status.get().is_err() {
return;
}
match progress {
XHRProgress::HeadersReceived(_, headers, status) => {
assert!(self.ready_state.get() == XMLHttpRequestState::Opened);
@ -918,6 +1149,7 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> {
self.dispatch_response_progress_event("loadend".to_owned());
},
XHRProgress::Errored(_, e) => {
self.response_status.set(Err(()));
self.send_flag.set(false);
// XXXManishearth set response to NetworkError
self.change_ready_state(XMLHttpRequestState::Done);
@ -952,6 +1184,7 @@ impl<'a> PrivateXMLHttpRequestHelpers for JSRef<'a, XMLHttpRequest> {
let GenerationId(prev_id) = self.generation_id.get();
self.generation_id.set(GenerationId(prev_id + 1));
self.terminate_sender.borrow().as_ref().map(|s| s.send(TerminateReason::AbortedOrReopened));
self.response_status.set(Ok(()));
}
fn insert_trusted_header(self, name: String, value: String) {