mirror of
https://github.com/servo/servo.git
synced 2025-08-06 22:15:33 +01:00
add servo_channel crate
This commit is contained in:
parent
704f7a06b1
commit
b977b4994c
12 changed files with 252 additions and 11 deletions
21
components/channel/Cargo.toml
Normal file
21
components/channel/Cargo.toml
Normal file
|
@ -0,0 +1,21 @@
|
|||
[package]
|
||||
name = "servo_channel"
|
||||
version = "0.0.1"
|
||||
authors = ["The Servo Project Developers"]
|
||||
license = "MPL-2.0"
|
||||
publish = false
|
||||
|
||||
[lib]
|
||||
name = "servo_channel"
|
||||
path = "lib.rs"
|
||||
test = false
|
||||
doctest = false
|
||||
|
||||
[dependencies]
|
||||
crossbeam-channel = "0.2.5"
|
||||
ipc-channel = "0.11"
|
||||
serde = "1.0"
|
||||
|
||||
[[test]]
|
||||
name = "main"
|
||||
path = "tests/disconnect.rs"
|
163
components/channel/lib.rs
Normal file
163
components/channel/lib.rs
Normal file
|
@ -0,0 +1,163 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
extern crate crossbeam_channel;
|
||||
extern crate ipc_channel;
|
||||
extern crate serde;
|
||||
|
||||
pub mod base_channel {
|
||||
pub use crossbeam_channel::*;
|
||||
}
|
||||
// Needed to re-export the select macro.
|
||||
pub use crossbeam_channel::*;
|
||||
|
||||
use ipc_channel::ipc::IpcReceiver;
|
||||
use ipc_channel::router::ROUTER;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
|
||||
pub fn route_ipc_receiver_to_new_servo_receiver<T>(ipc_receiver: IpcReceiver<T>) -> Receiver<T>
|
||||
where
|
||||
T: for<'de> Deserialize<'de> + Serialize + Send + 'static
|
||||
{
|
||||
let (servo_sender, servo_receiver) = channel();
|
||||
ROUTER.add_route(
|
||||
ipc_receiver.to_opaque(),
|
||||
Box::new(move |message| {
|
||||
drop(servo_sender.send(message.to::<T>().unwrap()))
|
||||
}),
|
||||
);
|
||||
servo_receiver
|
||||
}
|
||||
|
||||
pub fn route_ipc_receiver_to_new_servo_sender<T>(ipc_receiver: IpcReceiver<T>, servo_sender: Sender<T>)
|
||||
where
|
||||
T: for<'de> Deserialize<'de> + Serialize + Send + 'static
|
||||
{
|
||||
ROUTER.add_route(
|
||||
ipc_receiver.to_opaque(),
|
||||
Box::new(move |message| {
|
||||
drop(servo_sender.send(message.to::<T>().unwrap()))
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
|
||||
let (base_sender, base_receiver) = crossbeam_channel::unbounded::<T>();
|
||||
let is_disconnected = Arc::new(AtomicBool::new(false));
|
||||
(Sender::new(base_sender, is_disconnected.clone()),
|
||||
Receiver::new(base_receiver, is_disconnected))
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ChannelError {
|
||||
ChannelClosedError
|
||||
}
|
||||
|
||||
pub struct Receiver<T> {
|
||||
receiver: crossbeam_channel::Receiver<T>,
|
||||
is_disconnected: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<T> Drop for Receiver<T> {
|
||||
fn drop(&mut self) {
|
||||
self.is_disconnected.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for Receiver<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Receiver {
|
||||
receiver: self.receiver.clone(),
|
||||
is_disconnected: self.is_disconnected.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Receiver<T> {
|
||||
pub fn new(receiver: crossbeam_channel::Receiver<T>, is_disconnected: Arc<AtomicBool>) -> Receiver<T> {
|
||||
Receiver {
|
||||
receiver,
|
||||
is_disconnected,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn recv(&self) -> Option<T> {
|
||||
self.receiver.recv()
|
||||
}
|
||||
|
||||
pub fn try_recv(&self) -> Option<T> {
|
||||
self.receiver.try_recv()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.receiver.len()
|
||||
}
|
||||
|
||||
pub fn select(&self) -> &crossbeam_channel::Receiver<T> {
|
||||
&self.receiver
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Iterator for Receiver<T> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.receiver.recv()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> IntoIterator for &'a Receiver<T> {
|
||||
type Item = T;
|
||||
type IntoIter = crossbeam_channel::Receiver<T>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.receiver.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Sender<T> {
|
||||
sender: crossbeam_channel::Sender<T>,
|
||||
is_disconnected: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<T> Clone for Sender<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Sender {
|
||||
sender: self.sender.clone(),
|
||||
is_disconnected: self.is_disconnected.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Sender<T> {
|
||||
pub fn new(sender: crossbeam_channel::Sender<T>, is_disconnected: Arc<AtomicBool>) -> Sender<T> {
|
||||
Sender {
|
||||
sender,
|
||||
is_disconnected,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: T) -> Result<(), ChannelError> {
|
||||
if self.is_disconnected.load(Ordering::SeqCst) {
|
||||
Err(ChannelError::ChannelClosedError)
|
||||
} else {
|
||||
Ok(self.sender.send(msg))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.sender.len()
|
||||
}
|
||||
|
||||
pub fn select(&self) -> Option<&crossbeam_channel::Sender<T>> {
|
||||
if self.is_disconnected.load(Ordering::SeqCst) {
|
||||
None
|
||||
} else {
|
||||
Some(&self.sender)
|
||||
}
|
||||
}
|
||||
}
|
31
components/channel/tests/disconnect.rs
Normal file
31
components/channel/tests/disconnect.rs
Normal file
|
@ -0,0 +1,31 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#[macro_use]
|
||||
extern crate servo_channel;
|
||||
|
||||
use servo_channel::{channel, ChannelError};
|
||||
|
||||
#[test]
|
||||
fn send_after_receiver_dropped() {
|
||||
let (sender, receiver) = channel();
|
||||
drop(receiver);
|
||||
assert_eq!(sender.send(1), Err(ChannelError::ChannelClosedError));
|
||||
let sent = select! {
|
||||
send(sender.select(), 1) => true,
|
||||
default => false
|
||||
};
|
||||
assert_eq!(sent, false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_with_receiver_connected() {
|
||||
let (sender, _receiver) = channel();
|
||||
assert_eq!(sender.send(1), Ok(()));
|
||||
let sent = select! {
|
||||
send(sender.select(), 1) => true,
|
||||
default => false
|
||||
};
|
||||
assert_eq!(sent, true);
|
||||
}
|
|
@ -34,6 +34,8 @@ extern crate profile_traits;
|
|||
extern crate script_traits;
|
||||
#[macro_use]
|
||||
extern crate serde;
|
||||
#[macro_use]
|
||||
extern crate servo_channel;
|
||||
extern crate servo_config;
|
||||
extern crate servo_rand;
|
||||
extern crate servo_remutex;
|
||||
|
|
|
@ -6,8 +6,7 @@ use ipc_channel::ipc::{self, IpcSender};
|
|||
use script_traits::{TimerEvent, TimerEventRequest, TimerSchedulerMsg};
|
||||
use std::cmp::{self, Ord};
|
||||
use std::collections::BinaryHeap;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::TryRecvError::{Disconnected, Empty};
|
||||
use servo_channel::base_channel;
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
|
@ -40,7 +39,7 @@ impl PartialEq for ScheduledEvent {
|
|||
impl TimerScheduler {
|
||||
pub fn start() -> IpcSender<TimerSchedulerMsg> {
|
||||
let (req_ipc_sender, req_ipc_receiver) = ipc::channel().expect("Channel creation failed.");
|
||||
let (req_sender, req_receiver) = mpsc::sync_channel(1);
|
||||
let (req_sender, req_receiver) = base_channel::bounded(1);
|
||||
|
||||
// We could do this much more directly with recv_timeout
|
||||
// (https://github.com/rust-lang/rfcs/issues/962).
|
||||
|
|
|
@ -44,6 +44,8 @@ extern crate serde_json;
|
|||
extern crate servo_allocator;
|
||||
extern crate servo_arc;
|
||||
extern crate servo_atoms;
|
||||
#[macro_use]
|
||||
extern crate servo_channel;
|
||||
extern crate servo_config;
|
||||
extern crate servo_geometry;
|
||||
extern crate servo_url;
|
||||
|
|
|
@ -49,6 +49,8 @@ use profile_traits::ipc;
|
|||
use script_traits::{DrawAPaintImageResult, PaintWorkletError};
|
||||
use script_traits::Painter;
|
||||
use servo_atoms::Atom;
|
||||
use servo_channel::{channel, Sender};
|
||||
use servo_channel::base_channel;
|
||||
use servo_config::prefs::PREFS;
|
||||
use servo_url::ServoUrl;
|
||||
use std::cell::Cell;
|
||||
|
@ -364,9 +366,12 @@ impl PaintWorkletGlobalScope {
|
|||
.as_u64()
|
||||
.unwrap_or(10u64);
|
||||
|
||||
let timeout_duration = Duration::from_millis(timeout);
|
||||
receiver.recv_timeout(timeout_duration)
|
||||
.map_err(|e| PaintWorkletError::from(e))
|
||||
select! {
|
||||
recv(base_channel::after(Duration::from_millis(timeout))) => {
|
||||
Err(PaintWorkletError::Timeout)
|
||||
}
|
||||
recv(receiver.select(), msg) => msg.ok_or(PaintWorkletError::Timeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
Box::new(WorkletPainter {
|
||||
|
|
|
@ -84,6 +84,7 @@ extern crate serde_bytes;
|
|||
extern crate servo_allocator;
|
||||
extern crate servo_arc;
|
||||
#[macro_use] extern crate servo_atoms;
|
||||
#[macro_use] extern crate servo_channel;
|
||||
extern crate servo_config;
|
||||
extern crate servo_geometry;
|
||||
extern crate servo_media;
|
||||
|
|
|
@ -51,6 +51,7 @@ profile_traits = {path = "../profile_traits"}
|
|||
script = {path = "../script"}
|
||||
script_layout_interface = {path = "../script_layout_interface"}
|
||||
script_traits = {path = "../script_traits"}
|
||||
servo_channel = {path = "../channel"}
|
||||
servo_config = {path = "../config"}
|
||||
servo_geometry = {path = "../geometry"}
|
||||
servo_url = {path = "../url"}
|
||||
|
|
|
@ -28,6 +28,7 @@ pub extern crate bluetooth;
|
|||
pub extern crate bluetooth_traits;
|
||||
pub extern crate canvas;
|
||||
pub extern crate canvas_traits;
|
||||
pub extern crate servo_channel;
|
||||
pub extern crate compositing;
|
||||
pub extern crate constellation;
|
||||
pub extern crate debugger;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue