Auto merge of #24664 - gterzian:fix_port_transfer, r=jdm

Fix loophole in messageport transfer

<!-- Please describe your changes on the following line: -->

---
<!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `___` with appropriate data: -->
- [ ] `./mach build -d` does not report any errors
- [ ] `./mach test-tidy` does not report any errors
- [ ] These changes fix #24600 (GitHub issue number if applicable)

<!-- Either: -->
- [ ] There are tests for these changes OR
- [ ] These changes do not require tests because ___

<!-- Also, please make sure that "Allow edits from maintainers" checkbox is checked, so that we can help you if you get stuck somewhere along the way.-->

<!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. -->
This commit is contained in:
bors-servo 2019-11-18 11:35:25 -05:00 committed by GitHub
commit 0d2c2045cc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 468 additions and 52 deletions

View file

@ -184,6 +184,18 @@ enum TransferState {
/// The port is currently in-transfer,
/// and incoming tasks should be buffered until it becomes managed again.
TransferInProgress(VecDeque<PortMessageTask>),
/// A global has requested the transfer to be completed,
/// it's pending a confirmation of either failure or success to complete the transfer.
CompletionInProgress(MessagePortRouterId),
/// While a completion of a transfer was in progress, the port was shipped,
/// hence the transfer failed to complete.
/// We start buffering incoming messages,
/// while awaiting the return of the previous buffer from the global
/// that failed to complete the transfer.
CompletionFailed(VecDeque<PortMessageTask>),
/// While a completion failed, another global requested to complete the transfer.
/// We are still buffering messages, and awaiting the return of the buffer from the global who failed.
CompletionRequested(MessagePortRouterId, VecDeque<PortMessageTask>),
/// The entangled port has been removed while the port was in-transfer,
/// the current port should be removed as well once it is managed again.
EntangledRemoved,
@ -1549,6 +1561,13 @@ where
};
match content {
FromScriptMsg::CompleteMessagePortTransfer(router_id, ports) => {
self.handle_complete_message_port_transfer(router_id, ports);
},
FromScriptMsg::MessagePortTransferResult(router_id, succeeded, failed) => {
self.handle_message_port_transfer_completed(router_id, succeeded);
self.handle_message_port_transfer_failed(failed);
},
FromScriptMsg::RerouteMessagePort(port_id, task) => {
self.handle_reroute_messageport(port_id, task);
},
@ -1775,6 +1794,224 @@ where
}
}
fn handle_message_port_transfer_completed(
&mut self,
router_id: Option<MessagePortRouterId>,
ports: Vec<MessagePortId>,
) {
let router_id = match router_id {
Some(router_id) => router_id,
None => {
if !ports.is_empty() {
warn!("Constellation unable to process port transfer successes, since no router id was received");
}
return;
},
};
for port_id in ports.into_iter() {
let mut entry = match self.message_ports.entry(port_id) {
Entry::Vacant(_) => {
warn!(
"Constellation received a port transfer completed msg for unknown messageport {:?}",
port_id
);
continue;
},
Entry::Occupied(entry) => entry,
};
match entry.get().state {
TransferState::EntangledRemoved => {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id));
} else {
warn!("No message-port sender for {:?}", router_id);
}
entry.remove_entry();
continue;
},
TransferState::CompletionInProgress(expected_router_id) => {
// Here, the transfer was normally completed.
if expected_router_id != router_id {
return warn!(
"Transfer completed by an unexpected router: {:?}",
router_id
);
}
// Update the state to managed.
let new_info = MessagePortInfo {
state: TransferState::Managed(router_id),
entangled_with: entry.get().entangled_with,
};
entry.insert(new_info);
},
_ => warn!("Constellation received unexpected port transfer completed message"),
}
}
}
fn handle_message_port_transfer_failed(
&mut self,
ports: HashMap<MessagePortId, VecDeque<PortMessageTask>>,
) {
for (port_id, mut previous_buffer) in ports.into_iter() {
let entry = match self.message_ports.remove(&port_id) {
None => {
warn!(
"Constellation received a port transfer completed msg for unknown messageport {:?}",
port_id
);
continue;
},
Some(entry) => entry,
};
let new_info = match entry.state {
TransferState::EntangledRemoved => {
// If the entangled port has been removed while this one was in-transfer,
// just drop it.
continue;
},
TransferState::CompletionFailed(mut current_buffer) => {
// The transfer failed,
// and now the global has returned us the buffer we previously sent.
// So the next update is back to a "normal" transfer in progress.
// Tasks in the previous buffer are older,
// hence need to be added to the front of the current one.
while let Some(task) = previous_buffer.pop_back() {
current_buffer.push_front(task);
}
// Update the state to transfer-in-progress.
MessagePortInfo {
state: TransferState::TransferInProgress(current_buffer),
entangled_with: entry.entangled_with,
}
},
TransferState::CompletionRequested(target_router_id, mut current_buffer) => {
// Here, before the global who failed the last transfer could return us the buffer,
// another global already sent us a request to complete a new transfer.
// So we use the returned buffer to update
// the current-buffer(of new incoming messages),
// and we send everything to the global
// who is waiting for completion of the current transfer.
// Tasks in the previous buffer are older,
// hence need to be added to the front of the current one.
while let Some(task) = previous_buffer.pop_back() {
current_buffer.push_front(task);
}
// Forward the buffered message-queue to complete the current transfer.
if let Some(sender) = self.message_port_routers.get(&target_router_id) {
if sender
.send(MessagePortMsg::CompletePendingTransfer(
port_id,
current_buffer,
))
.is_err()
{
warn!("Constellation failed to send complete port transfer response.");
}
} else {
warn!("No message-port sender for {:?}", target_router_id);
}
// Update the state to completion-in-progress.
MessagePortInfo {
state: TransferState::CompletionInProgress(target_router_id),
entangled_with: entry.entangled_with,
}
},
_ => {
warn!("Unexpected port transfer failed message received");
continue;
},
};
self.message_ports.insert(port_id, new_info);
}
}
fn handle_complete_message_port_transfer(
&mut self,
router_id: MessagePortRouterId,
ports: Vec<MessagePortId>,
) {
let mut response = HashMap::new();
for port_id in ports.into_iter() {
let entry = match self.message_ports.remove(&port_id) {
None => {
warn!(
"Constellation asked to complete transfer for unknown messageport {:?}",
port_id
);
continue;
},
Some(entry) => entry,
};
let new_info = match entry.state {
TransferState::EntangledRemoved => {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id));
} else {
warn!("No message-port sender for {:?}", router_id);
}
continue;
},
TransferState::TransferInProgress(buffer) => {
response.insert(port_id, buffer);
// If the port was in transfer, and a global is requesting completion,
// we note the start of the completion.
MessagePortInfo {
state: TransferState::CompletionInProgress(router_id),
entangled_with: entry.entangled_with,
}
},
TransferState::CompletionFailed(buffer) |
TransferState::CompletionRequested(_, buffer) => {
// If the completion had already failed,
// this is a request coming from a global to complete a new transfer,
// but we're still awaiting the return of the buffer
// from the first global who failed.
//
// So we note the request from the new global,
// and continue to buffer incoming messages
// and wait for the buffer used in the previous transfer to be returned.
//
// If another global requests completion in the CompletionRequested state,
// we simply swap the target router-id for the new one,
// keeping the buffer.
MessagePortInfo {
state: TransferState::CompletionRequested(router_id, buffer),
entangled_with: entry.entangled_with,
}
},
_ => {
warn!("Unexpected complete port transfer message received");
continue;
},
};
self.message_ports.insert(port_id, new_info);
}
if !response.is_empty() {
// Forward the buffered message-queue.
if let Some(sender) = self.message_port_routers.get(&router_id) {
if sender
.send(MessagePortMsg::CompleteTransfer(response))
.is_err()
{
warn!("Constellation failed to send complete port transfer response.");
}
} else {
warn!("No message-port sender for {:?}", router_id);
}
}
}
fn handle_reroute_messageport(&mut self, port_id: MessagePortId, task: PortMessageTask) {
let info = match self.message_ports.get_mut(&port_id) {
Some(info) => info,
@ -1786,7 +2023,10 @@ where
},
};
match &mut info.state {
TransferState::Managed(router_id) => {
TransferState::Managed(router_id) | TransferState::CompletionInProgress(router_id) => {
// In both the managed and completion of a transfer case, we forward the message.
// Note that in both cases, if the port is transferred before the message is handled,
// it will be sent back here and buffered while the transfer is ongoing.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::NewTask(port_id, task));
} else {
@ -1794,6 +2034,8 @@ where
}
},
TransferState::TransferInProgress(queue) => queue.push_back(task),
TransferState::CompletionFailed(queue) => queue.push_back(task),
TransferState::CompletionRequested(_, queue) => queue.push_back(task),
TransferState::EntangledRemoved => warn!(
"Messageport received a message, but entangled has alread been removed {:?}",
port_id
@ -1803,8 +2045,19 @@ where
fn handle_messageport_shipped(&mut self, port_id: MessagePortId) {
if let Some(info) = self.message_ports.get_mut(&port_id) {
if let TransferState::Managed(_) = info.state {
info.state = TransferState::TransferInProgress(VecDeque::new());
match info.state {
TransferState::Managed(_) => {
// If shipped while managed, note the start of a transfer.
info.state = TransferState::TransferInProgress(VecDeque::new());
},
TransferState::CompletionInProgress(_) => {
// If shipped while completion of a transfer was in progress,
// the completion failed.
// This will be followed by a MessagePortTransferFailed message,
// containing the buffer we previously sent.
info.state = TransferState::CompletionFailed(VecDeque::new());
},
_ => warn!("Unexpected messageport shipped received"),
}
} else {
warn!(
@ -1828,37 +2081,11 @@ where
fn handle_new_messageport(&mut self, router_id: MessagePortRouterId, port_id: MessagePortId) {
match self.message_ports.entry(port_id) {
// If we know about this port, it means it was transferred.
Entry::Occupied(mut entry) => {
if let TransferState::EntangledRemoved = entry.get().state {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id));
} else {
warn!("No message-port sender for {:?}", router_id);
}
entry.remove_entry();
return;
}
let new_info = MessagePortInfo {
state: TransferState::Managed(router_id),
entangled_with: entry.get().entangled_with.clone(),
};
let old_info = entry.insert(new_info);
let buffer = match old_info.state {
TransferState::TransferInProgress(buffer) => buffer,
_ => {
return warn!("Completing transfer of a port that did not have a transfer in progress.");
},
};
// Forward the buffered message-queue.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::CompleteTransfer(port_id.clone(), buffer));
} else {
warn!("No message-port sender for {:?}", router_id);
}
},
// If it's a new port, we should not know about it.
Entry::Occupied(_) => warn!(
"Constellation asked to start tracking an existing messageport {:?}",
port_id
),
Entry::Vacant(entry) => {
let info = MessagePortInfo {
state: TransferState::Managed(router_id),
@ -1897,7 +2124,10 @@ where
"Constellation asked to remove entangled messageport by a port that was already removed {:?}",
port_id
),
TransferState::TransferInProgress(_) => {
TransferState::TransferInProgress(_) |
TransferState::CompletionInProgress(_) |
TransferState::CompletionFailed(_) |
TransferState::CompletionRequested(_, _) => {
// Note: since the port is in-transer, we don't have a router to send it a message
// to let it know that its entangled port has been removed.
// Hence we mark it so that it will be messaged and removed once the transfer completes.