Port StorageThreadMsg to GenericChannel (#38932)

This change includes the following additions to GenericChannel:
- Add a GenericSend trait which is meant to replace the `IpcSend` trait
over time, as channels are migrated. For the time being this means, that
we often need to use `GenericSend::send()` to disambiguate from the
`IpcSend::send` function, until all usages of `IpcSend` have been
replaced.
- Add an OpaqueSender impl for GenericSender
- Add a profiled version of GenericChannel. The profiling is 1:1 the
same as for the existing profiled IPC channel, namely that only the
blocked time during `recv` is measured.


Testing: No functional changes, covered by existing tests
Part of #38912

---------

Signed-off-by: Jonathan Schwender <schwenderjonathan@gmail.com>
This commit is contained in:
Jonathan Schwender 2025-08-27 03:58:43 +02:00 committed by GitHub
parent c4dcd17214
commit 32aba08be7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 113 additions and 27 deletions

View file

@ -0,0 +1,53 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use base::generic_channel;
use serde::{Deserialize, Serialize};
use crate::time::{ProfilerCategory, ProfilerChan};
use crate::time_profile;
pub struct GenericReceiver<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
receiver: generic_channel::GenericReceiver<T>,
time_profile_chan: ProfilerChan,
}
impl<T> GenericReceiver<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
pub fn recv(&self) -> Result<T, generic_channel::ReceiveError> {
time_profile!(
ProfilerCategory::IpcReceiver,
None,
self.time_profile_chan.clone(),
move || self.receiver.recv(),
)
}
pub fn try_recv(&self) -> Result<T, generic_channel::TryReceiveError> {
self.receiver.try_recv()
}
pub fn into_inner(self) -> generic_channel::GenericReceiver<T> {
self.receiver
}
}
pub fn channel<T>(
time_profile_chan: ProfilerChan,
) -> Option<(generic_channel::GenericSender<T>, GenericReceiver<T>)>
where
T: for<'de> Deserialize<'de> + Serialize,
{
let (sender, receiver) = generic_channel::channel()?;
let profiled_receiver = GenericReceiver {
receiver,
time_profile_chan,
};
Some((sender, profiled_receiver))
}

View file

@ -8,6 +8,7 @@
#![deny(unsafe_code)]
pub mod generic_channel;
pub mod ipc;
pub mod mem;
pub mod time;

View file

@ -11,6 +11,7 @@ use std::collections::HashSet;
use std::ffi::c_void;
use std::marker::Send;
use base::generic_channel::GenericSender;
use crossbeam_channel::Sender;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
@ -49,6 +50,20 @@ where
}
}
impl<T> OpaqueSender<T> for GenericSender<T>
where
T: serde::Serialize,
{
fn send(&self, message: T) {
if let Err(e) = GenericSender::send(self, message) {
warn!(
"Error communicating with the target thread from the profiler: {}",
e
);
}
}
}
/// Front-end representation of the profiler used to communicate with the
/// profiler.
#[derive(Clone, Debug, Deserialize, Serialize)]