Reduce right drift in scheme_fetch

Pyfisch 2018-11-03 16:15:48 +01:00
parent 2481ad25f8
commit f512e262a5


@@ -528,136 +528,120 @@ fn scheme_fetch(
         },
         "file" => {
-            if request.method == Method::GET {
-                match url.to_file_path() {
-                    Ok(file_path) => {
-                        match File::open(file_path.clone()) {
-                            Ok(mut file) => {
-                                let mime = guess_mime_type(file_path);
-                                let mut response = Response::new(url);
-                                response.headers.typed_insert(ContentType::from(mime));
-                                let (done_sender, done_receiver) = channel();
-                                *done_chan = Some((done_sender.clone(), done_receiver));
-                                *response.body.lock().unwrap() = ResponseBody::Receiving(vec![]);
-                                let mut res_body = response.body.clone();
-                                let cancellation_listener = context.cancellation_listener.clone();
-                                let (start, end) = if let Some(ref range) =
-                                    request.headers.typed_get::<Range>()
-                                {
-                                    match range
-                                        .iter()
-                                        .collect::<Vec<(Bound<u64>, Bound<u64>)>>()
-                                        .first()
-                                    {
-                                        Some(&(Bound::Included(start), Bound::Unbounded)) => {
-                                            (start, None)
-                                        },
-                                        Some(&(Bound::Included(start), Bound::Included(end))) => {
-                                            // `end` should be less or equal to `start`.
-                                            (start, Some(u64::max(start, end)))
-                                        },
-                                        Some(&(Bound::Unbounded, Bound::Included(offset))) => {
-                                            if let Ok(metadata) = file.metadata() {
-                                                // `offset` cannot be bigger than the file size.
-                                                (
-                                                    metadata.len() -
-                                                        u64::min(metadata.len(), offset),
-                                                    None,
-                                                )
-                                            } else {
-                                                (0, None)
-                                            }
-                                        },
-                                        _ => (0, None),
-                                    }
-                                } else {
-                                    (0, None)
-                                };
-                                thread::Builder::new()
-                                    .name("fetch file worker thread".to_string())
-                                    .spawn(move || {
-                                        let mut reader =
-                                            BufReader::with_capacity(FILE_CHUNK_SIZE, file);
-                                        if reader.seek(SeekFrom::Start(start)).is_err() {
-                                            warn!("Fetch - could not seek to {:?}", start);
-                                        }
-                                        loop {
-                                            if cancellation_listener.lock().unwrap().cancelled() {
-                                                *res_body.lock().unwrap() =
-                                                    ResponseBody::Done(vec![]);
-                                                let _ = done_sender.send(Data::Cancelled);
-                                                return;
-                                            }
-                                            let length = {
-                                                let mut buffer =
-                                                    reader.fill_buf().unwrap().to_vec();
-                                                let mut buffer_len = buffer.len();
-                                                if let ResponseBody::Receiving(ref mut body) =
-                                                    *res_body.lock().unwrap()
-                                                {
-                                                    let offset = usize::min(
-                                                        {
-                                                            if let Some(end) = end {
-                                                                let remaining_bytes = end as usize -
-                                                                    start as usize -
-                                                                    body.len();
-                                                                if remaining_bytes <=
-                                                                    FILE_CHUNK_SIZE
-                                                                {
-                                                                    // This is the last chunk so we set buffer len to 0 to break
-                                                                    // the reading loop.
-                                                                    buffer_len = 0;
-                                                                    remaining_bytes
-                                                                } else {
-                                                                    FILE_CHUNK_SIZE
-                                                                }
-                                                            } else {
-                                                                FILE_CHUNK_SIZE
-                                                            }
-                                                        },
-                                                        buffer.len(),
-                                                    );
-                                                    body.extend_from_slice(&buffer[0..offset]);
-                                                    let _ = done_sender.send(Data::Payload(buffer));
-                                                }
-                                                buffer_len
-                                            };
-                                            if length == 0 {
-                                                let mut body = res_body.lock().unwrap();
-                                                let completed_body = match *body {
-                                                    ResponseBody::Receiving(ref mut body) => {
-                                                        mem::replace(body, vec![])
-                                                    },
-                                                    _ => vec![],
-                                                };
-                                                *body = ResponseBody::Done(completed_body);
-                                                let _ = done_sender.send(Data::Done);
-                                                break;
-                                            }
-                                            reader.consume(length);
-                                        }
-                                    })
-                                    .expect("Failed to create fetch file worker thread");
-                                response
-                            },
-                            _ => Response::network_error(NetworkError::Internal(
-                                "Opening file failed".into(),
-                            )),
-                        }
-                    },
-                    _ => Response::network_error(NetworkError::Internal(
-                        "Constructing file path failed".into(),
-                    )),
-                }
-            } else {
-                Response::network_error(NetworkError::Internal("Unexpected method for file".into()))
-            }
+            if request.method != Method::GET {
+                return Response::network_error(NetworkError::Internal(
+                    "Unexpected method for file".into(),
+                ));
+            }
+            if let Ok(file_path) = url.to_file_path() {
+                if let Ok(file) = File::open(file_path.clone()) {
+                    let mime = guess_mime_type(file_path);
+                    let mut response = Response::new(url);
+                    response.headers.typed_insert(ContentType::from(mime));
+                    let (done_sender, done_receiver) = channel();
+                    *done_chan = Some((done_sender.clone(), done_receiver));
+                    *response.body.lock().unwrap() = ResponseBody::Receiving(vec![]);
+                    let mut res_body = response.body.clone();
+                    let cancellation_listener = context.cancellation_listener.clone();
+                    let (start, end) = if let Some(ref range) = request.headers.typed_get::<Range>()
+                    {
+                        match range
+                            .iter()
+                            .collect::<Vec<(Bound<u64>, Bound<u64>)>>()
+                            .first()
+                        {
+                            Some(&(Bound::Included(start), Bound::Unbounded)) => (start, None),
+                            Some(&(Bound::Included(start), Bound::Included(end))) => {
+                                // `end` should be less or equal to `start`.
+                                (start, Some(u64::max(start, end)))
+                            },
+                            Some(&(Bound::Unbounded, Bound::Included(offset))) => {
+                                if let Ok(metadata) = file.metadata() {
+                                    // `offset` cannot be bigger than the file size.
+                                    (metadata.len() - u64::min(metadata.len(), offset), None)
+                                } else {
+                                    (0, None)
+                                }
+                            },
+                            _ => (0, None),
+                        }
+                    } else {
+                        (0, None)
+                    };
+                    thread::Builder::new()
+                        .name("fetch file worker thread".to_string())
+                        .spawn(move || {
+                            let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file);
+                            if reader.seek(SeekFrom::Start(start)).is_err() {
+                                warn!("Fetch - could not seek to {:?}", start);
+                            }
+                            loop {
+                                if cancellation_listener.lock().unwrap().cancelled() {
+                                    *res_body.lock().unwrap() = ResponseBody::Done(vec![]);
+                                    let _ = done_sender.send(Data::Cancelled);
+                                    return;
+                                }
+                                let length = {
+                                    let mut buffer = reader.fill_buf().unwrap().to_vec();
+                                    let mut buffer_len = buffer.len();
+                                    if let ResponseBody::Receiving(ref mut body) =
+                                        *res_body.lock().unwrap()
+                                    {
+                                        let offset = usize::min(
+                                            {
+                                                if let Some(end) = end {
+                                                    let remaining_bytes =
+                                                        end as usize - start as usize - body.len();
+                                                    if remaining_bytes <= FILE_CHUNK_SIZE {
+                                                        // This is the last chunk so we set buffer
+                                                        // len to 0 to break the reading loop.
+                                                        buffer_len = 0;
+                                                        remaining_bytes
+                                                    } else {
+                                                        FILE_CHUNK_SIZE
+                                                    }
+                                                } else {
+                                                    FILE_CHUNK_SIZE
+                                                }
+                                            },
+                                            buffer.len(),
+                                        );
+                                        body.extend_from_slice(&buffer[0..offset]);
+                                        let _ = done_sender.send(Data::Payload(buffer));
+                                    }
+                                    buffer_len
+                                };
+                                if length == 0 {
+                                    let mut body = res_body.lock().unwrap();
+                                    let completed_body = match *body {
+                                        ResponseBody::Receiving(ref mut body) => {
+                                            mem::replace(body, vec![])
+                                        },
+                                        _ => vec![],
+                                    };
+                                    *body = ResponseBody::Done(completed_body);
+                                    let _ = done_sender.send(Data::Done);
+                                    break;
+                                }
+                                reader.consume(length);
+                            }
+                        })
+                        .expect("Failed to create fetch file worker thread");
+                    response
+                } else {
+                    Response::network_error(NetworkError::Internal("Opening file failed".into()))
+                }
+            } else {
+                Response::network_error(NetworkError::Internal(
+                    "Constructing file path failed".into(),
+                ))
+            }
             }
         },
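
The change is purely structural: instead of nesting a new match for every fallible step (method check, path conversion, file open), the rewritten arm rejects each failure case up front and keeps the long happy path at a shallow indent. Below is a minimal, self-contained sketch of that pattern, independent of the Servo code above; FetchError, read_file_nested, and read_file_flat are hypothetical names used only for illustration.

    use std::fs::File;
    use std::io::Read;

    // Stand-in error type for this sketch (hypothetical, not part of the commit).
    #[derive(Debug)]
    struct FetchError(String);

    // Before: every fallible step adds one level of nesting, pushing the
    // happy path further to the right.
    fn read_file_nested(path: &str) -> Result<Vec<u8>, FetchError> {
        match File::open(path) {
            Ok(mut file) => {
                let mut buf = Vec::new();
                match file.read_to_end(&mut buf) {
                    Ok(_) => Ok(buf),
                    Err(_) => Err(FetchError("Reading file failed".into())),
                }
            },
            Err(_) => Err(FetchError("Opening file failed".into())),
        }
    }

    // After: reject the failure case first and return early, so the rest of
    // the function stays at the original indentation.
    fn read_file_flat(path: &str) -> Result<Vec<u8>, FetchError> {
        let mut file = match File::open(path) {
            Ok(file) => file,
            Err(_) => return Err(FetchError("Opening file failed".into())),
        };
        let mut buf = Vec::new();
        if file.read_to_end(&mut buf).is_err() {
            return Err(FetchError("Reading file failed".into()));
        }
        Ok(buf)
    }

    fn main() {
        // Both variants behave identically; only the nesting differs.
        println!("{:?}", read_file_nested("Cargo.toml").map(|b| b.len()));
        println!("{:?}", read_file_flat("Cargo.toml").map(|b| b.len()));
    }

In the commit itself the same idea is applied with an early return of Response::network_error for non-GET requests and if let for the path and file lookups, which is what drops the worker-thread body several indentation levels.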