Measure cache memory usage (#19251):

Made the memory cache data structure derive MallocSizeOf, along with
manual size_of() implementations in malloc_size_of.

Added a Measurable struct that acts as a container for fields size_of() can be called for.

Added a new IpcReceiver used for listening to messages from the memory profiler,
and used run_with_memory reporting to register a memory reporter in the thread.
Now when a message from the memory profiler arrives, report includes sizes of public and private http caches.

Updated test file.
This commit is contained in:
modal-d17 2018-02-20 16:00:16 -05:00 committed by modal17
parent d232705106
commit af445a357d
13 changed files with 312 additions and 76 deletions

View file

@ -16,6 +16,7 @@ use http_cache::HttpCache;
use http_loader::{HttpState, http_redirect_fetch};
use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcReceiver, IpcReceiverSet, IpcSender};
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use net_traits::{CookieSource, CoreResourceThread};
use net_traits::{CoreResourceMsg, CustomResponseMediator, FetchChannels};
use net_traits::{FetchResponseMsg, ResourceThreads, WebSocketDomAction};
@ -23,9 +24,12 @@ use net_traits::WebSocketNetworkEvent;
use net_traits::request::{Request, RequestInit};
use net_traits::response::{Response, ResponseInit};
use net_traits::storage_thread::StorageThreadMsg;
use profile_traits::mem::{Report, ReportsChan, ReportKind};
use profile_traits::mem::ProfilerChan as MemProfilerChan;
use profile_traits::time::ProfilerChan;
use serde::{Deserialize, Serialize};
use serde_json;
use servo_allocator;
use servo_config::opts;
use servo_config::resource_files::resources_dir_path;
use servo_url::ServoUrl;
@ -47,13 +51,15 @@ const TFD_PROVIDER: &'static TFDProvider = &TFDProvider;
/// Returns a tuple of (public, private) senders to the new threads.
pub fn new_resource_threads(user_agent: Cow<'static, str>,
devtools_chan: Option<Sender<DevtoolsControlMsg>>,
profiler_chan: ProfilerChan,
time_profiler_chan: ProfilerChan,
mem_profiler_chan: MemProfilerChan,
config_dir: Option<PathBuf>)
-> (ResourceThreads, ResourceThreads) {
let (public_core, private_core) = new_core_resource_thread(
user_agent,
devtools_chan,
profiler_chan,
time_profiler_chan,
mem_profiler_chan,
config_dir.clone());
let storage: IpcSender<StorageThreadMsg> = StorageThreadFactory::new(config_dir);
(ResourceThreads::new(public_core, storage.clone()),
@ -64,22 +70,34 @@ pub fn new_resource_threads(user_agent: Cow<'static, str>,
/// Create a CoreResourceThread
pub fn new_core_resource_thread(user_agent: Cow<'static, str>,
devtools_chan: Option<Sender<DevtoolsControlMsg>>,
profiler_chan: ProfilerChan,
time_profiler_chan: ProfilerChan,
mem_profiler_chan: MemProfilerChan,
config_dir: Option<PathBuf>)
-> (CoreResourceThread, CoreResourceThread) {
let (public_setup_chan, public_setup_port) = ipc::channel().unwrap();
let (private_setup_chan, private_setup_port) = ipc::channel().unwrap();
let (report_chan, report_port) = ipc::channel().unwrap();
thread::Builder::new().name("ResourceManager".to_owned()).spawn(move || {
let resource_manager = CoreResourceManager::new(
user_agent, devtools_chan, profiler_chan
user_agent, devtools_chan, time_profiler_chan
);
let mut channel_manager = ResourceChannelManager {
resource_manager: resource_manager,
config_dir: config_dir,
};
channel_manager.start(public_setup_port,
private_setup_port);
mem_profiler_chan.run_with_memory_reporting(|| (
channel_manager.start(
public_setup_port,
private_setup_port,
report_port)
),
String::from("network-cache-reporter"),
report_chan,
|report_chan| report_chan);
}).expect("Thread spawning failed");
(public_setup_chan, private_setup_chan)
}
@ -127,31 +145,69 @@ impl ResourceChannelManager {
#[allow(unsafe_code)]
fn start(&mut self,
public_receiver: IpcReceiver<CoreResourceMsg>,
private_receiver: IpcReceiver<CoreResourceMsg>) {
private_receiver: IpcReceiver<CoreResourceMsg>,
memory_reporter: IpcReceiver<ReportsChan>) {
let (public_http_state, private_http_state) =
create_http_states(self.config_dir.as_ref().map(Deref::deref));
let mut rx_set = IpcReceiverSet::new().unwrap();
let private_id = rx_set.add(private_receiver).unwrap();
let public_id = rx_set.add(public_receiver).unwrap();
let reporter_id = rx_set.add(memory_reporter).unwrap();
loop {
for (id, data) in rx_set.select().unwrap().into_iter().map(|m| m.unwrap()) {
let group = if id == private_id {
&private_http_state
for receiver in rx_set.select().unwrap().into_iter() {
// Handles case where profiler thread shuts down before resource thread.
match receiver {
ipc::IpcSelectionResult::ChannelClosed(..) => continue,
_ => {}
}
let (id, data) = receiver.unwrap();
// If message is memory report, get the size_of of public and private http caches
if id == reporter_id {
if let Ok(msg) = data.to() {
self.process_report(msg, &private_http_state, &public_http_state);
continue;
}
} else {
assert_eq!(id, public_id);
&public_http_state
};
if let Ok(msg) = data.to() {
if !self.process_msg(msg, group) {
return;
let group = if id == private_id {
&private_http_state
} else {
assert_eq!(id, public_id);
&public_http_state
};
if let Ok(msg) = data.to() {
if !self.process_msg(msg, group) {
return;
}
}
}
}
}
}
fn process_report(&mut self,
msg: ReportsChan,
public_http_state: &Arc<HttpState>,
private_http_state: &Arc<HttpState>) {
let mut ops = MallocSizeOfOps::new(servo_allocator::usable_size, None, None);
let public_cache = public_http_state.http_cache.read().unwrap();
let private_cache = private_http_state.http_cache.read().unwrap();
let public_report = Report {
path: path!["memory-cache", "public"],
kind: ReportKind::ExplicitJemallocHeapSize,
size: public_cache.size_of(&mut ops)
};
let private_report = Report {
path: path!["memory-cache", "private"],
kind: ReportKind::ExplicitJemallocHeapSize,
size: private_cache.size_of(&mut ops)
};
msg.send(vec!(public_report, private_report));
}
/// Returns false if the thread should exit.
fn process_msg(&mut self,