devtools: Track multiple clients better, and cleanup streams when a client isn't reachable.

This commit is contained in:
Josh Matthews 2020-08-05 14:56:32 -04:00
parent 0b619bf920
commit f4915ef6c9
21 changed files with 137 additions and 51 deletions

View file

@ -3,6 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
/// General actor system infrastructure.
use crate::StreamId;
use devtools_traits::PreciseTime;
use serde_json::{Map, Value};
use std::any::Any;
@ -21,18 +22,20 @@ pub enum ActorMessageStatus {
/// A common trait for all devtools actors that encompasses an immutable name
/// and the ability to process messages that are directed to particular actors.
/// TODO: ensure the name is immutable
pub trait Actor: Any + ActorAsAny {
pub(crate) trait Actor: Any + ActorAsAny {
fn handle_message(
&self,
registry: &ActorRegistry,
msg_type: &str,
msg: &Map<String, Value>,
stream: &mut TcpStream,
id: StreamId,
) -> Result<ActorMessageStatus, ()>;
fn name(&self) -> String;
fn cleanup(&self, _id: StreamId) {}
}
pub trait ActorAsAny {
pub(crate) trait ActorAsAny {
fn actor_as_any(&self) -> &dyn Any;
fn actor_as_any_mut(&mut self) -> &mut dyn Any;
}
@ -71,6 +74,12 @@ impl ActorRegistry {
}
}
pub(crate) fn cleanup(&self, id: StreamId) {
for actor in self.actors.values() {
actor.cleanup(id);
}
}
/// Creating shareable registry
pub fn create_shareable(self) -> Arc<Mutex<ActorRegistry>> {
if let Some(shareable) = self.shareable {
@ -131,11 +140,11 @@ impl ActorRegistry {
}
/// Add an actor to the registry of known actors that can receive messages.
pub fn register(&mut self, actor: Box<dyn Actor + Send>) {
pub(crate) fn register(&mut self, actor: Box<dyn Actor + Send>) {
self.actors.insert(actor.name(), actor);
}
pub fn register_later(&self, actor: Box<dyn Actor + Send>) {
pub(crate) fn register_later(&self, actor: Box<dyn Actor + Send>) {
let mut actors = self.new_actors.borrow_mut();
actors.push(actor);
}
@ -154,10 +163,11 @@ impl ActorRegistry {
/// Attempt to process a message as directed by its `to` property. If the actor is not
/// found or does not indicate that it knew how to process the message, ignore the failure.
pub fn handle_message(
pub(crate) fn handle_message(
&mut self,
msg: &Map<String, Value>,
stream: &mut TcpStream,
id: StreamId,
) -> Result<(), ()> {
let to = match msg.get("to") {
Some(to) => to.as_str().unwrap(),
@ -171,7 +181,7 @@ impl ActorRegistry {
None => debug!("message received for unknown actor \"{}\"", to),
Some(actor) => {
let msg_type = msg.get("type").unwrap().as_str().unwrap();
if actor.handle_message(self, msg_type, msg, stream)? !=
if actor.handle_message(self, msg_type, msg, stream, id)? !=
ActorMessageStatus::Processed
{
debug!(