Auto merge of #27512 - jdm:devtools-panic, r=Manishearth

Handle devtools clients disconnecting better

These changes fix many opportunities for the devtools server to panic if a client disconnects early. Now that it's easy to have multiple clients connected simultaneously, this also makes it easier for server code to distinguish between different clients and ensure that one client disconnecting doesn't affect another client that's still connected.

---
- [x] `./mach build -d` does not report any errors
- [x] `./mach test-tidy` does not report any errors
- [x] These changes fix #27509
- [x] These changes do not require tests because the devtools server is a wild west.

<!-- Also, please make sure that "Allow edits from maintainers" checkbox is checked, so that we can help you if you get stuck somewhere along the way.-->

<!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. -->
This commit is contained in:
bors-servo 2020-08-06 14:02:17 -04:00 committed by GitHub
commit 6a7e9ff438
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 244 additions and 136 deletions

View file

@ -3,6 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
/// General actor system infrastructure.
use crate::StreamId;
use devtools_traits::PreciseTime;
use serde_json::{Map, Value};
use std::any::Any;
@ -21,18 +22,20 @@ pub enum ActorMessageStatus {
/// A common trait for all devtools actors that encompasses an immutable name
/// and the ability to process messages that are directed to particular actors.
/// TODO: ensure the name is immutable
pub trait Actor: Any + ActorAsAny {
pub(crate) trait Actor: Any + ActorAsAny {
fn handle_message(
&self,
registry: &ActorRegistry,
msg_type: &str,
msg: &Map<String, Value>,
stream: &mut TcpStream,
id: StreamId,
) -> Result<ActorMessageStatus, ()>;
fn name(&self) -> String;
fn cleanup(&self, _id: StreamId) {}
}
pub trait ActorAsAny {
pub(crate) trait ActorAsAny {
fn actor_as_any(&self) -> &dyn Any;
fn actor_as_any_mut(&mut self) -> &mut dyn Any;
}
@ -71,6 +74,12 @@ impl ActorRegistry {
}
}
pub(crate) fn cleanup(&self, id: StreamId) {
for actor in self.actors.values() {
actor.cleanup(id);
}
}
/// Creating shareable registry
pub fn create_shareable(self) -> Arc<Mutex<ActorRegistry>> {
if let Some(shareable) = self.shareable {
@ -131,11 +140,11 @@ impl ActorRegistry {
}
/// Add an actor to the registry of known actors that can receive messages.
pub fn register(&mut self, actor: Box<dyn Actor + Send>) {
pub(crate) fn register(&mut self, actor: Box<dyn Actor + Send>) {
self.actors.insert(actor.name(), actor);
}
pub fn register_later(&self, actor: Box<dyn Actor + Send>) {
pub(crate) fn register_later(&self, actor: Box<dyn Actor + Send>) {
let mut actors = self.new_actors.borrow_mut();
actors.push(actor);
}
@ -154,10 +163,11 @@ impl ActorRegistry {
/// Attempt to process a message as directed by its `to` property. If the actor is not
/// found or does not indicate that it knew how to process the message, ignore the failure.
pub fn handle_message(
pub(crate) fn handle_message(
&mut self,
msg: &Map<String, Value>,
stream: &mut TcpStream,
id: StreamId,
) -> Result<(), ()> {
let to = match msg.get("to") {
Some(to) => to.as_str().unwrap(),
@ -171,7 +181,7 @@ impl ActorRegistry {
None => debug!("message received for unknown actor \"{}\"", to),
Some(actor) => {
let msg_type = msg.get("type").unwrap().as_str().unwrap();
if actor.handle_message(self, msg_type, msg, stream)? !=
if actor.handle_message(self, msg_type, msg, stream, id)? !=
ActorMessageStatus::Processed
{
debug!(

View file

@ -17,6 +17,7 @@ use crate::actors::tab::TabDescriptorActor;
use crate::actors::thread::ThreadActor;
use crate::actors::timeline::TimelineActor;
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use devtools_traits::DevtoolScriptControlMsg::{self, WantsLiveNotifications};
use devtools_traits::DevtoolsPageInfo;
use devtools_traits::NavigationState;
@ -24,6 +25,7 @@ use ipc_channel::ipc::IpcSender;
use msg::constellation_msg::{BrowsingContextId, PipelineId};
use serde_json::{Map, Value};
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::net::TcpStream;
#[derive(Serialize)]
@ -118,7 +120,7 @@ pub struct BrowsingContextActorMsg {
manifestActor: String,*/
}
pub struct BrowsingContextActor {
pub(crate) struct BrowsingContextActor {
pub name: String,
pub title: RefCell<String>,
pub url: RefCell<String>,
@ -131,7 +133,7 @@ pub struct BrowsingContextActor {
pub styleSheets: String,
pub thread: String,
pub tab: String,
pub streams: RefCell<Vec<TcpStream>>,
pub streams: RefCell<HashMap<StreamId, TcpStream>>,
pub browsing_context_id: BrowsingContextId,
pub active_pipeline: Cell<PipelineId>,
pub script_chan: IpcSender<DevtoolScriptControlMsg>,
@ -148,6 +150,7 @@ impl Actor for BrowsingContextActor {
msg_type: &str,
msg: &Map<String, Value>,
stream: &mut TcpStream,
id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"reconfigure" => {
@ -160,7 +163,7 @@ impl Actor for BrowsingContextActor {
}
}
}
stream.write_json_packet(&ReconfigureReply { from: self.name() });
let _ = stream.write_json_packet(&ReconfigureReply { from: self.name() });
ActorMessageStatus::Processed
},
@ -181,26 +184,26 @@ impl Actor for BrowsingContextActor {
watchpoints: false,
},
};
self.streams.borrow_mut().push(stream.try_clone().unwrap());
stream.write_json_packet(&msg);
if stream.write_json_packet(&msg).is_err() {
return Ok(ActorMessageStatus::Processed);
}
self.streams
.borrow_mut()
.insert(id, stream.try_clone().unwrap());
self.script_chan
.send(WantsLiveNotifications(self.active_pipeline.get(), true))
.unwrap();
ActorMessageStatus::Processed
},
//FIXME: The current implementation won't work for multiple connections. Need to ensure
// that the correct stream is removed.
"detach" => {
let msg = BrowsingContextDetachedReply {
from: self.name(),
type_: "detached".to_owned(),
};
self.streams.borrow_mut().pop();
stream.write_json_packet(&msg);
self.script_chan
.send(WantsLiveNotifications(self.active_pipeline.get(), false))
.unwrap();
let _ = stream.write_json_packet(&msg);
self.cleanup(id);
ActorMessageStatus::Processed
},
@ -215,7 +218,7 @@ impl Actor for BrowsingContextActor {
title: self.title.borrow().clone(),
}],
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -224,13 +227,22 @@ impl Actor for BrowsingContextActor {
from: self.name(),
workers: vec![],
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
_ => ActorMessageStatus::Ignored,
})
}
fn cleanup(&self, id: StreamId) {
self.streams.borrow_mut().remove(&id);
if self.streams.borrow().is_empty() {
self.script_chan
.send(WantsLiveNotifications(self.active_pipeline.get(), false))
.unwrap();
}
}
}
impl BrowsingContextActor {
@ -284,7 +296,7 @@ impl BrowsingContextActor {
styleSheets: styleSheets.name(),
tab: tabdesc.name(),
thread: thread.name(),
streams: RefCell::new(Vec::new()),
streams: RefCell::new(HashMap::new()),
browsing_context_id: id,
active_pipeline: Cell::new(pipeline),
};
@ -347,8 +359,8 @@ impl BrowsingContextActor {
state: state.to_owned(),
isFrameSwitching: false,
};
for stream in &mut *self.streams.borrow_mut() {
stream.write_json_packet(&msg);
for stream in self.streams.borrow_mut().values_mut() {
let _ = stream.write_json_packet(&msg);
}
}

View file

@ -12,7 +12,7 @@ use crate::actors::browsing_context::BrowsingContextActor;
use crate::actors::object::ObjectActor;
use crate::actors::worker::WorkerActor;
use crate::protocol::JsonPacketStream;
use crate::UniqueId;
use crate::{StreamId, UniqueId};
use devtools_traits::CachedConsoleMessage;
use devtools_traits::ConsoleMessage;
use devtools_traits::EvaluateJSReply::{ActorValue, BooleanValue, StringValue};
@ -23,7 +23,7 @@ use devtools_traits::{
use ipc_channel::ipc::{self, IpcSender};
use msg::constellation_msg::TEST_PIPELINE_ID;
use serde_json::{self, Map, Number, Value};
use std::cell::{RefCell, RefMut};
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::TcpStream;
use time::precise_time_ns;
@ -130,15 +130,20 @@ impl ConsoleActor {
}
}
fn streams_mut<'a>(&self, registry: &'a ActorRegistry) -> RefMut<'a, Vec<TcpStream>> {
fn streams_mut<'a>(&self, registry: &'a ActorRegistry, cb: impl Fn(&mut TcpStream)) {
match &self.root {
Root::BrowsingContext(bc) => registry
.find::<BrowsingContextActor>(bc)
.streams
.borrow_mut(),
Root::DedicatedWorker(worker) => {
registry.find::<WorkerActor>(worker).streams.borrow_mut()
},
.borrow_mut()
.values_mut()
.for_each(cb),
Root::DedicatedWorker(worker) => registry
.find::<WorkerActor>(worker)
.streams
.borrow_mut()
.values_mut()
.for_each(cb),
}
}
@ -255,9 +260,9 @@ impl ConsoleActor {
type_: "pageError".to_owned(),
pageError: page_error,
};
for stream in &mut *self.streams_mut(registry) {
stream.write_json_packet(&msg);
}
self.streams_mut(registry, |stream| {
let _ = stream.write_json_packet(&msg);
});
}
}
@ -303,9 +308,9 @@ impl ConsoleActor {
columnNumber: console_message.columnNumber,
},
};
for stream in &mut *self.streams_mut(registry) {
stream.write_json_packet(&msg);
}
self.streams_mut(registry, |stream| {
let _ = stream.write_json_packet(&msg);
});
}
}
}
@ -321,6 +326,7 @@ impl Actor for ConsoleActor {
msg_type: &str,
msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"clearMessagesCache" => {
@ -380,7 +386,7 @@ impl Actor for ConsoleActor {
from: self.name(),
messages: messages,
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -396,7 +402,7 @@ impl Actor for ConsoleActor {
.collect(),
traits: StartedListenersTraits,
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -413,7 +419,7 @@ impl Actor for ConsoleActor {
.map(|listener| listener.as_str().unwrap().to_owned())
.collect(),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -425,13 +431,13 @@ impl Actor for ConsoleActor {
matches: vec![],
matchProp: "".to_owned(),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"evaluateJS" => {
let msg = self.evaluateJS(&registry, &msg);
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -443,7 +449,9 @@ impl Actor for ConsoleActor {
};
// Emit an eager reply so that the client starts listening
// for an async event with the resultID
stream.write_json_packet(&early_reply);
if stream.write_json_packet(&early_reply).is_err() {
return Ok(ActorMessageStatus::Processed);
}
if msg.get("eager").and_then(|v| v.as_bool()).unwrap_or(false) {
// We don't support the side-effect free evaluation that eager evalaution
@ -464,7 +472,7 @@ impl Actor for ConsoleActor {
helperResult: reply.helperResult,
};
// Send the data from evaluateJS along with a resultID
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -473,7 +481,7 @@ impl Actor for ConsoleActor {
from: self.name(),
updated: vec![],
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},

View file

@ -5,6 +5,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::protocol::JsonPacketStream;
use crate::protocol::{ActorDescription, Method};
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -34,6 +35,7 @@ impl Actor for DeviceActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"getDescription" => {
@ -44,7 +46,7 @@ impl Actor for DeviceActor {
platformVersion: "71.0".to_string(),
},
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},

View file

@ -3,6 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -21,6 +22,7 @@ impl Actor for EmulationActor {
msg_type: &str,
_msg: &Map<String, Value>,
_stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
_ => ActorMessageStatus::Ignored,

View file

@ -4,6 +4,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::actors::timeline::HighResolutionStamp;
use crate::StreamId;
use devtools_traits::DevtoolScriptControlMsg;
use ipc_channel::ipc::IpcSender;
use msg::constellation_msg::PipelineId;
@ -32,6 +33,7 @@ impl Actor for FramerateActor {
_msg_type: &str,
_msg: &Map<String, Value>,
_stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(ActorMessageStatus::Ignored)
}

View file

@ -8,6 +8,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::actors::browsing_context::BrowsingContextActor;
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use devtools_traits::DevtoolScriptControlMsg::{GetChildren, GetDocumentElement, GetRootNode};
use devtools_traits::DevtoolScriptControlMsg::{GetLayout, ModifyAttribute};
use devtools_traits::{ComputedNodeLayout, DevtoolScriptControlMsg, NodeInfo};
@ -68,17 +69,18 @@ impl Actor for HighlighterActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"showBoxModel" => {
let msg = ShowBoxModelReply { from: self.name() };
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"hideBoxModel" => {
let msg = HideBoxModelReply { from: self.name() };
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -103,6 +105,7 @@ impl Actor for NodeActor {
msg_type: &str,
msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"modifyAttributes" => {
@ -123,7 +126,7 @@ impl Actor for NodeActor {
))
.unwrap();
let reply = ModifyAttributeReply { from: self.name() };
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},
@ -289,11 +292,12 @@ impl Actor for WalkerActor {
msg_type: &str,
msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"querySelector" => {
let msg = QuerySelectorReply { from: self.name() };
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -310,13 +314,13 @@ impl Actor for WalkerActor {
from: self.name(),
node: node,
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"clearPseudoClassLocks" => {
let msg = ClearPseudoclassesReply { from: self.name() };
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -343,7 +347,7 @@ impl Actor for WalkerActor {
.collect(),
from: self.name(),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -471,6 +475,7 @@ impl Actor for PageStyleActor {
msg_type: &str,
msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"getApplied" => {
@ -481,7 +486,7 @@ impl Actor for PageStyleActor {
sheets: vec![],
from: self.name(),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -491,7 +496,7 @@ impl Actor for PageStyleActor {
computed: vec![],
from: self.name(),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -576,7 +581,7 @@ impl Actor for PageStyleActor {
};
let msg = serde_json::to_string(&msg).unwrap();
let msg = serde_json::from_str::<Value>(&msg).unwrap();
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -596,6 +601,7 @@ impl Actor for InspectorActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
let browsing_context = registry.find::<BrowsingContextActor>(&self.browsing_context);
let pipeline = browsing_context.active_pipeline.get();
@ -625,7 +631,7 @@ impl Actor for InspectorActor {
root: node,
},
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -647,7 +653,7 @@ impl Actor for InspectorActor {
actor: self.pageStyle.borrow().clone().unwrap(),
},
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -670,7 +676,7 @@ impl Actor for InspectorActor {
actor: self.highlighter.borrow().clone().unwrap(),
},
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},

View file

@ -3,6 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -34,6 +35,7 @@ impl Actor for MemoryActor {
_msg_type: &str,
_msg: &Map<String, Value>,
_stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(ActorMessageStatus::Ignored)
}

View file

@ -8,6 +8,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use devtools_traits::HttpRequest as DevtoolsHttpRequest;
use devtools_traits::HttpResponse as DevtoolsHttpResponse;
use headers::{ContentType, Cookie, HeaderMapExt};
@ -180,6 +181,7 @@ impl Actor for NetworkEventActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"getRequestHeaders" => {
@ -201,7 +203,7 @@ impl Actor for NetworkEventActor {
headerSize: headersSize,
rawHeaders: rawHeadersString,
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"getRequestCookies" => {
@ -217,7 +219,7 @@ impl Actor for NetworkEventActor {
from: self.name(),
cookies: cookies,
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"getRequestPostData" => {
@ -226,7 +228,7 @@ impl Actor for NetworkEventActor {
postData: self.request.body.clone(),
postDataDiscarded: false,
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"getResponseHeaders" => {
@ -251,7 +253,7 @@ impl Actor for NetworkEventActor {
headerSize: headersSize,
rawHeaders: rawHeadersString,
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
}
ActorMessageStatus::Processed
},
@ -268,7 +270,7 @@ impl Actor for NetworkEventActor {
from: self.name(),
cookies: cookies,
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"getResponseContent" => {
@ -277,7 +279,7 @@ impl Actor for NetworkEventActor {
content: self.response.body.clone(),
contentDiscarded: self.response.body.is_none(),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"getEventTimings" => {
@ -297,7 +299,7 @@ impl Actor for NetworkEventActor {
timings: timingsObj,
totalTime: total,
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"getSecurityInfo" => {
@ -308,7 +310,7 @@ impl Actor for NetworkEventActor {
state: "insecure".to_owned(),
},
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
_ => ActorMessageStatus::Ignored,

View file

@ -3,6 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -21,6 +22,7 @@ impl Actor for ObjectActor {
_: &str,
_: &Map<String, Value>,
_: &mut TcpStream,
_: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(ActorMessageStatus::Ignored)
}

View file

@ -4,6 +4,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::protocol::{ActorDescription, JsonPacketStream, Method};
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -57,6 +58,7 @@ impl Actor for PerformanceActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"connect" => {
@ -72,7 +74,7 @@ impl Actor for PerformanceActor {
},
},
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"canCurrentlyRecord" => {
@ -83,7 +85,7 @@ impl Actor for PerformanceActor {
errors: vec![],
},
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
_ => ActorMessageStatus::Ignored,

View file

@ -4,6 +4,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -28,6 +29,7 @@ impl Actor for PreferenceActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"getBoolPref" => {
@ -35,7 +37,7 @@ impl Actor for PreferenceActor {
from: self.name(),
value: false,
};
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},
@ -44,7 +46,7 @@ impl Actor for PreferenceActor {
from: self.name(),
value: "".to_owned(),
};
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},
@ -53,7 +55,7 @@ impl Actor for PreferenceActor {
from: self.name(),
value: 0,
};
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},

View file

@ -4,6 +4,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -34,6 +35,7 @@ impl Actor for ProcessActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"listWorkers" => {
@ -41,7 +43,7 @@ impl Actor for ProcessActor {
from: self.name(),
workers: vec![],
};
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},

View file

@ -3,6 +3,7 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -21,6 +22,7 @@ impl Actor for ProfilerActor {
_msg_type: &str,
_msg: &Map<String, Value>,
_stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(ActorMessageStatus::Ignored)
}

View file

@ -12,6 +12,7 @@ use crate::actors::performance::PerformanceActor;
use crate::actors::tab::{TabDescriptorActor, TabDescriptorActorMsg};
use crate::actors::worker::{WorkerActor, WorkerMsg};
use crate::protocol::{ActorDescription, JsonPacketStream};
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -124,6 +125,7 @@ impl Actor for RootActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"listAddons" => {
@ -131,7 +133,7 @@ impl Actor for RootActor {
from: "root".to_owned(),
addons: vec![],
};
stream.write_json_packet(&actor);
let _ = stream.write_json_packet(&actor);
ActorMessageStatus::Processed
},
@ -144,7 +146,7 @@ impl Actor for RootActor {
isParent: true,
}],
};
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},
@ -157,7 +159,7 @@ impl Actor for RootActor {
isParent: true,
},
};
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},
@ -169,7 +171,7 @@ impl Actor for RootActor {
deviceActor: self.device.clone(),
preferenceActor: self.preference.clone(),
};
stream.write_json_packet(&actor);
let _ = stream.write_json_packet(&actor);
ActorMessageStatus::Processed
},
@ -188,7 +190,7 @@ impl Actor for RootActor {
})
.collect(),
};
stream.write_json_packet(&actor);
let _ = stream.write_json_packet(&actor);
ActorMessageStatus::Processed
},
@ -197,7 +199,7 @@ impl Actor for RootActor {
from: self.name(),
registrations: vec![],
};
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},
@ -210,7 +212,7 @@ impl Actor for RootActor {
.map(|name| registry.find::<WorkerActor>(name).encodable())
.collect(),
};
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},
@ -220,7 +222,7 @@ impl Actor for RootActor {
from: self.name(),
tab: tab.encodable(&registry),
};
stream.write_json_packet(&reply);
let _ = stream.write_json_packet(&reply);
ActorMessageStatus::Processed
},
@ -232,7 +234,7 @@ impl Actor for RootActor {
device: DeviceActor::description(),
},
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},

View file

@ -4,6 +4,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -27,6 +28,7 @@ impl Actor for StyleSheetsActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"getStyleSheets" => {
@ -34,7 +36,7 @@ impl Actor for StyleSheetsActor {
from: self.name(),
styleSheets: vec![],
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},

View file

@ -6,6 +6,7 @@ use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::actors::browsing_context::{BrowsingContextActor, BrowsingContextActorMsg};
use crate::actors::root::RootActor;
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -48,13 +49,14 @@ impl Actor for TabDescriptorActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"getTarget" => {
let frame = registry
.find::<BrowsingContextActor>(&self.browsing_context_actor)
.encodable();
stream.write_json_packet(&GetTargetReply {
let _ = stream.write_json_packet(&GetTargetReply {
from: self.name(),
frame,
});

View file

@ -4,6 +4,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use serde_json::{Map, Value};
use std::net::TcpStream;
@ -84,6 +85,7 @@ impl Actor for ThreadActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"attach" => {
@ -100,8 +102,8 @@ impl Actor for ThreadActor {
type_: "attached".to_owned(),
},
};
stream.write_json_packet(&msg);
stream.write_json_packet(&VoidAttachedReply { from: self.name() });
let _ = stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&VoidAttachedReply { from: self.name() });
ActorMessageStatus::Processed
},
@ -110,8 +112,8 @@ impl Actor for ThreadActor {
from: self.name(),
type_: "resumed".to_owned(),
};
stream.write_json_packet(&msg);
stream.write_json_packet(&VoidAttachedReply { from: self.name() });
let _ = stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&VoidAttachedReply { from: self.name() });
ActorMessageStatus::Processed
},
@ -120,12 +122,12 @@ impl Actor for ThreadActor {
from: self.name(),
type_: "interrupted".to_owned(),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
"reconfigure" => {
stream.write_json_packet(&ReconfigureReply { from: self.name() });
let _ = stream.write_json_packet(&ReconfigureReply { from: self.name() });
ActorMessageStatus::Processed
},
@ -134,7 +136,7 @@ impl Actor for ThreadActor {
from: self.name(),
sources: vec![],
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},

View file

@ -6,6 +6,7 @@ use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::actors::framerate::FramerateActor;
use crate::actors::memory::{MemoryActor, TimelineMemoryReply};
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use devtools_traits::DevtoolScriptControlMsg;
use devtools_traits::DevtoolScriptControlMsg::{DropTimelineMarkers, SetTimelineMarkers};
use devtools_traits::{PreciseTime, TimelineMarker, TimelineMarkerType};
@ -14,6 +15,7 @@ use msg::constellation_msg::PipelineId;
use serde::{Serialize, Serializer};
use serde_json::{Map, Value};
use std::cell::RefCell;
use std::error::Error;
use std::net::TcpStream;
use std::sync::{Arc, Mutex};
use std::thread;
@ -166,7 +168,9 @@ impl TimelineActor {
while let Ok(Some(marker)) = receiver.try_recv() {
markers.push(emitter.marker(marker));
}
emitter.send(markers);
if emitter.send(markers).is_err() {
break;
}
thread::sleep(Duration::from_millis(DEFAULT_TIMELINE_DATA_PULL_TIMEOUT));
})
@ -185,6 +189,7 @@ impl Actor for TimelineActor {
msg_type: &str,
msg: &Map<String, Value>,
stream: &mut TcpStream,
_id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"start" => {
@ -199,6 +204,7 @@ impl Actor for TimelineActor {
))
.unwrap();
//TODO: support multiple connections by using root actor's streams instead.
*self.stream.borrow_mut() = stream.try_clone().ok();
// init memory actor
@ -235,7 +241,7 @@ impl Actor for TimelineActor {
from: self.name(),
value: HighResolutionStamp::new(registry.start_stamp(), PreciseTime::now()),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -245,7 +251,7 @@ impl Actor for TimelineActor {
value: HighResolutionStamp::new(registry.start_stamp(), PreciseTime::now()),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
self.script_sender
.send(DropTimelineMarkers(
self.pipeline,
@ -253,6 +259,7 @@ impl Actor for TimelineActor {
))
.unwrap();
//TODO: move this to the cleanup method.
if let Some(ref actor_name) = *self.framerate_actor.borrow() {
registry.drop_actor_later(actor_name.clone());
}
@ -272,7 +279,7 @@ impl Actor for TimelineActor {
value: self.is_recording.lock().unwrap().clone(),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -311,7 +318,7 @@ impl Emitter {
}
}
fn send(&mut self, markers: Vec<TimelineMarkerReply>) {
fn send(&mut self, markers: Vec<TimelineMarkerReply>) -> Result<(), Box<dyn Error>> {
let end_time = PreciseTime::now();
let reply = MarkersEmitterReply {
type_: "markers".to_owned(),
@ -319,7 +326,7 @@ impl Emitter {
from: self.from.clone(),
endTime: HighResolutionStamp::new(self.start_stamp, end_time),
};
self.stream.write_json_packet(&reply);
self.stream.write_json_packet(&reply)?;
if let Some(ref actor_name) = self.framerate_actor {
let mut lock = self.registry.lock();
@ -331,7 +338,7 @@ impl Emitter {
delta: HighResolutionStamp::new(self.start_stamp, end_time),
timestamps: framerate_actor.take_pending_ticks(),
};
self.stream.write_json_packet(&framerateReply);
self.stream.write_json_packet(&framerateReply)?;
}
if let Some(ref actor_name) = self.memory_actor {
@ -343,7 +350,9 @@ impl Emitter {
delta: HighResolutionStamp::new(self.start_stamp, end_time),
measurement: memory_actor.measure(),
};
self.stream.write_json_packet(&memoryReply);
self.stream.write_json_packet(&memoryReply)?;
}
Ok(())
}
}

View file

@ -4,6 +4,7 @@
use crate::actor::{Actor, ActorMessageStatus, ActorRegistry};
use crate::protocol::JsonPacketStream;
use crate::StreamId;
use devtools_traits::DevtoolScriptControlMsg::WantsLiveNotifications;
use devtools_traits::{DevtoolScriptControlMsg, WorkerId};
use ipc_channel::ipc::IpcSender;
@ -11,6 +12,7 @@ use msg::constellation_msg::TEST_PIPELINE_ID;
use serde_json::{Map, Value};
use servo_url::ServoUrl;
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::TcpStream;
#[derive(Clone, Copy)]
@ -21,7 +23,7 @@ pub enum WorkerType {
Service = 2,
}
pub struct WorkerActor {
pub(crate) struct WorkerActor {
pub name: String,
pub console: String,
pub thread: String,
@ -29,7 +31,7 @@ pub struct WorkerActor {
pub url: ServoUrl,
pub type_: WorkerType,
pub script_chan: IpcSender<DevtoolScriptControlMsg>,
pub streams: RefCell<Vec<TcpStream>>,
pub streams: RefCell<HashMap<StreamId, TcpStream>>,
}
impl WorkerActor {
@ -58,6 +60,7 @@ impl Actor for WorkerActor {
msg_type: &str,
_msg: &Map<String, Value>,
stream: &mut TcpStream,
id: StreamId,
) -> Result<ActorMessageStatus, ()> {
Ok(match msg_type {
"attach" => {
@ -66,8 +69,12 @@ impl Actor for WorkerActor {
type_: "attached".to_owned(),
url: self.url.as_str().to_owned(),
};
self.streams.borrow_mut().push(stream.try_clone().unwrap());
stream.write_json_packet(&msg);
if stream.write_json_packet(&msg).is_err() {
return Ok(ActorMessageStatus::Processed);
}
self.streams
.borrow_mut()
.insert(id, stream.try_clone().unwrap());
// FIXME: fix messages to not require forging a pipeline for worker messages
self.script_chan
.send(WantsLiveNotifications(TEST_PIPELINE_ID, true))
@ -82,7 +89,7 @@ impl Actor for WorkerActor {
threadActor: self.thread.clone(),
consoleActor: self.console.clone(),
};
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
ActorMessageStatus::Processed
},
@ -91,18 +98,23 @@ impl Actor for WorkerActor {
from: self.name(),
type_: "detached".to_string(),
};
// FIXME: we should ensure we're removing the correct stream.
self.streams.borrow_mut().pop();
stream.write_json_packet(&msg);
self.script_chan
.send(WantsLiveNotifications(TEST_PIPELINE_ID, false))
.unwrap();
let _ = stream.write_json_packet(&msg);
self.cleanup(id);
ActorMessageStatus::Processed
},
_ => ActorMessageStatus::Ignored,
})
}
fn cleanup(&self, id: StreamId) {
self.streams.borrow_mut().remove(&id);
if self.streams.borrow().is_empty() {
self.script_chan
.send(WantsLiveNotifications(TEST_PIPELINE_ID, false))
.unwrap();
}
}
}
#[derive(Serialize)]

View file

@ -127,6 +127,9 @@ pub fn start_server(port: u16, embedder: EmbedderProxy) -> Sender<DevtoolsContro
sender
}
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) struct StreamId(u32);
fn run_server(
sender: Sender<DevtoolsControlMsg>,
receiver: Receiver<DevtoolsControlMsg>,
@ -188,22 +191,25 @@ fn run_server(
let mut actor_workers: HashMap<WorkerId, String> = HashMap::new();
/// Process the input from a single devtools client until EOF.
fn handle_client(actors: Arc<Mutex<ActorRegistry>>, mut stream: TcpStream) {
fn handle_client(actors: Arc<Mutex<ActorRegistry>>, mut stream: TcpStream, id: StreamId) {
debug!("connection established to {}", stream.peer_addr().unwrap());
{
let actors = actors.lock().unwrap();
let msg = actors.find::<RootActor>("root").encodable();
stream.write_json_packet(&msg);
if let Err(e) = stream.write_json_packet(&msg) {
warn!("Error writing response: {:?}", e);
return;
}
}
'outer: loop {
match stream.read_json_packet() {
Ok(Some(json_packet)) => {
if let Err(()) = actors
.lock()
.unwrap()
.handle_message(json_packet.as_object().unwrap(), &mut stream)
{
if let Err(()) = actors.lock().unwrap().handle_message(
json_packet.as_object().unwrap(),
&mut stream,
id,
) {
debug!("error: devtools actor stopped responding");
let _ = stream.shutdown(Shutdown::Both);
break 'outer;
@ -219,6 +225,8 @@ fn run_server(
},
}
}
actors.lock().unwrap().cleanup(id);
}
fn handle_framerate_tick(actors: Arc<Mutex<ActorRegistry>>, actor_name: String, tick: f64) {
@ -451,7 +459,7 @@ fn run_server(
eventActor: actor.event_actor(),
};
for stream in &mut connections {
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
}
},
NetworkEvent::HttpResponse(httpresponse) => {
@ -464,7 +472,7 @@ fn run_server(
updateType: "requestHeaders".to_owned(),
};
for stream in &mut connections {
stream.write_merged_json_packet(&msg, &actor.request_headers());
let _ = stream.write_merged_json_packet(&msg, &actor.request_headers());
}
let msg = NetworkEventUpdateMsg {
@ -473,7 +481,7 @@ fn run_server(
updateType: "requestCookies".to_owned(),
};
for stream in &mut connections {
stream.write_merged_json_packet(&msg, &actor.request_cookies());
let _ = stream.write_merged_json_packet(&msg, &actor.request_cookies());
}
//Send a networkEventUpdate (responseStart) to the client
@ -485,7 +493,7 @@ fn run_server(
};
for stream in &mut connections {
stream.write_json_packet(&msg);
let _ = stream.write_json_packet(&msg);
}
let msg = NetworkEventUpdateMsg {
from: netevent_actor_name.clone(),
@ -496,7 +504,7 @@ fn run_server(
totalTime: actor.total_time(),
};
for stream in &mut connections {
stream.write_merged_json_packet(&msg, &extra);
let _ = stream.write_merged_json_packet(&msg, &extra);
}
let msg = NetworkEventUpdateMsg {
@ -508,7 +516,7 @@ fn run_server(
state: "insecure".to_owned(),
};
for stream in &mut connections {
stream.write_merged_json_packet(&msg, &extra);
let _ = stream.write_merged_json_packet(&msg, &extra);
}
let msg = NetworkEventUpdateMsg {
@ -517,7 +525,7 @@ fn run_server(
updateType: "responseContent".to_owned(),
};
for stream in &mut connections {
stream.write_merged_json_packet(&msg, &actor.response_content());
let _ = stream.write_merged_json_packet(&msg, &actor.response_content());
}
let msg = NetworkEventUpdateMsg {
@ -526,7 +534,7 @@ fn run_server(
updateType: "responseCookies".to_owned(),
};
for stream in &mut connections {
stream.write_merged_json_packet(&msg, &actor.response_cookies());
let _ = stream.write_merged_json_packet(&msg, &actor.response_cookies());
}
let msg = NetworkEventUpdateMsg {
@ -535,7 +543,7 @@ fn run_server(
updateType: "responseHeaders".to_owned(),
};
for stream in &mut connections {
stream.write_merged_json_packet(&msg, &actor.response_headers());
let _ = stream.write_merged_json_packet(&msg, &actor.response_headers());
}
},
}
@ -583,15 +591,18 @@ fn run_server(
})
.expect("Thread spawning failed");
let mut next_id = StreamId(0);
while let Ok(msg) = receiver.recv() {
debug!("{:?}", msg);
match msg {
DevtoolsControlMsg::FromChrome(ChromeToDevtoolsControlMsg::AddClient(stream)) => {
let actors = actors.clone();
let id = next_id;
next_id = StreamId(id.0 + 1);
accepted_connections.push(stream.try_clone().unwrap());
thread::Builder::new()
.name("DevtoolsClientHandler".to_owned())
.spawn(move || handle_client(actors, stream.try_clone().unwrap()))
.spawn(move || handle_client(actors, stream.try_clone().unwrap(), id))
.expect("Thread spawning failed");
},
DevtoolsControlMsg::FromScript(ScriptToDevtoolsControlMsg::FramerateTick(

View file

@ -8,6 +8,7 @@
use serde::Serialize;
use serde_json::{self, Value};
use std::error::Error;
use std::io::{Read, Write};
use std::net::TcpStream;
@ -26,29 +27,38 @@ pub struct Method {
}
pub trait JsonPacketStream {
fn write_json_packet<T: Serialize>(&mut self, obj: &T);
fn write_merged_json_packet<T: Serialize, U: Serialize>(&mut self, base: &T, extra: &U);
fn write_json_packet<T: Serialize>(&mut self, obj: &T) -> Result<(), Box<dyn Error>>;
fn write_merged_json_packet<T: Serialize, U: Serialize>(
&mut self,
base: &T,
extra: &U,
) -> Result<(), Box<dyn Error>>;
fn read_json_packet(&mut self) -> Result<Option<Value>, String>;
}
impl JsonPacketStream for TcpStream {
fn write_json_packet<T: Serialize>(&mut self, obj: &T) {
let s = serde_json::to_string(obj).unwrap();
fn write_json_packet<T: Serialize>(&mut self, obj: &T) -> Result<(), Box<dyn Error>> {
let s = serde_json::to_string(obj)?;
debug!("<- {}", s);
write!(self, "{}:{}", s.len(), s).unwrap();
write!(self, "{}:{}", s.len(), s)?;
Ok(())
}
fn write_merged_json_packet<T: Serialize, U: Serialize>(&mut self, base: &T, extra: &U) {
let mut obj = serde_json::to_value(base).unwrap();
fn write_merged_json_packet<T: Serialize, U: Serialize>(
&mut self,
base: &T,
extra: &U,
) -> Result<(), Box<dyn Error>> {
let mut obj = serde_json::to_value(base)?;
let obj = obj.as_object_mut().unwrap();
let extra = serde_json::to_value(extra).unwrap();
let extra = serde_json::to_value(extra)?;
let extra = extra.as_object().unwrap();
for (key, value) in extra {
obj.insert(key.to_owned(), value.to_owned());
}
self.write_json_packet(obj);
self.write_json_packet(obj)
}
fn read_json_packet(&mut self) -> Result<Option<Value>, String> {
@ -74,7 +84,9 @@ impl JsonPacketStream for TcpStream {
Err(_) => return Err("packet length missing / not parsable".to_owned()),
};
let mut packet = String::new();
self.take(packet_len).read_to_string(&mut packet).unwrap();
self.take(packet_len)
.read_to_string(&mut packet)
.map_err(|e| e.to_string())?;
debug!("{}", packet);
return match serde_json::from_str(&packet) {
Ok(json) => Ok(Some(json)),