diff --git a/ports/servoshell/egl/app_state.rs b/ports/servoshell/egl/app_state.rs index 0ea8495c425..0489948e24e 100644 --- a/ports/servoshell/egl/app_state.rs +++ b/ports/servoshell/egl/app_state.rs @@ -8,21 +8,26 @@ use std::rc::Rc; use crossbeam_channel::Receiver; use dpi::PhysicalSize; use embedder_traits::webdriver::WebDriverSenders; +use embedder_traits::{ + ContextMenuResult, InputMethodType, KeyboardEvent, MediaSessionActionType, MediaSessionEvent, + MouseButton, MouseButtonAction, ScreenGeometry, TouchEvent, TouchEventType, TouchId, + WebDriverJSResult, +}; +use euclid::{Point2D, Rect, Scale, Size2D, Vector2D}; +use keyboard_types::{CompositionEvent, CompositionState, Key, KeyState, NamedKey}; use log::{debug, error, info, warn}; use raw_window_handle::{RawWindowHandle, WindowHandle}; use servo::base::generic_channel::GenericSender; use servo::base::id::WebViewId; -use servo::euclid::{Point2D, Rect, Scale, Size2D, Vector2D}; +use servo::ipc_channel::ipc::IpcSender; use servo::servo_geometry::DeviceIndependentPixel; use servo::webrender_api::ScrollLocation; use servo::webrender_api::units::{DeviceIntRect, DeviceIntSize, DevicePixel}; use servo::{ - AllowOrDenyRequest, CompositionEvent, CompositionState, ContextMenuResult, ImeEvent, - InputEvent, InputMethodType, Key, KeyState, KeyboardEvent, LoadStatus, MediaSessionActionType, - MediaSessionEvent, MouseButton, MouseButtonAction, MouseButtonEvent, MouseMoveEvent, NamedKey, - NavigationRequest, PermissionRequest, RenderingContext, ScreenGeometry, Servo, ServoDelegate, - ServoError, SimpleDialog, TouchEvent, TouchEventType, TouchId, WebDriverCommandMsg, WebView, - WebViewBuilder, WebViewDelegate, WindowRenderingContext, + AllowOrDenyRequest, FocusId, ImeEvent, InputEvent, LoadStatus, MouseButtonEvent, + MouseMoveEvent, NavigationRequest, PermissionRequest, RenderingContext, Servo, ServoDelegate, + ServoError, SimpleDialog, TraversalId, WebDriverCommandMsg, WebDriverLoadStatus, + WebDriverScriptCommand, WebView, WebViewBuilder, WebViewDelegate, WindowRenderingContext, }; use url::Url; @@ -154,11 +159,22 @@ impl WebViewDelegate for RunningAppState { .on_url_changed(entries[current].clone().to_string()); } - fn notify_load_status_changed(&self, _webview: WebView, load_status: LoadStatus) { + fn notify_load_status_changed(&self, webview: WebView, load_status: LoadStatus) { self.callbacks .host_callbacks .notify_load_status_changed(load_status); + if load_status == LoadStatus::Complete { + if let Some(sender) = self + .webdriver_senders + .borrow_mut() + .load_status_senders + .remove(&webview.id()) + { + let _ = sender.send(WebDriverLoadStatus::Complete); + } + } + #[cfg(feature = "tracing")] if load_status == LoadStatus::Complete { #[cfg(feature = "tracing-hitrace")] @@ -202,6 +218,26 @@ impl WebViewDelegate for RunningAppState { } } + fn notify_focus_complete(&self, webview: servo::WebView, focus_id: FocusId) { + let mut webdriver_state = self.webdriver_senders.borrow_mut(); + if let std::collections::hash_map::Entry::Occupied(entry) = + webdriver_state.pending_focus.entry(focus_id) + { + let sender = entry.remove(); + let _ = sender.send(webview.focused()); + } + } + + fn notify_traversal_complete(&self, _webview: servo::WebView, traversal_id: TraversalId) { + let mut webdriver_state = self.webdriver_senders.borrow_mut(); + if let std::collections::hash_map::Entry::Occupied(entry) = + webdriver_state.pending_traversals.entry(traversal_id) + { + let sender = entry.remove(); + let _ = sender.send(WebDriverLoadStatus::Complete); + } + } + fn notify_media_session_event(&self, _webview: WebView, event: MediaSessionEvent) { match event { MediaSessionEvent::SetMetadata(metadata) => self @@ -348,7 +384,43 @@ impl RunningAppState { app_state } - pub(crate) fn create_and_focus_toplevel_webview(self: &Rc, url: Url) { + pub(crate) fn set_script_command_interrupt_sender( + &self, + sender: Option>, + ) { + self.webdriver_senders + .borrow_mut() + .script_evaluation_interrupt_sender = sender; + } + + pub(crate) fn set_pending_focus(&self, focus_id: FocusId, sender: IpcSender) { + self.webdriver_senders + .borrow_mut() + .pending_focus + .insert(focus_id, sender); + } + + pub(crate) fn set_pending_traversal( + &self, + traversal_id: TraversalId, + sender: GenericSender, + ) { + self.webdriver_senders + .borrow_mut() + .pending_traversals + .insert(traversal_id, sender); + } + + pub fn webviews(&self) -> Vec<(WebViewId, WebView)> { + let inner = self.inner(); + inner + .creation_order + .iter() + .map(|id| (*id, inner.webviews.get(id).unwrap().clone())) + .collect() + } + + pub(crate) fn create_and_focus_toplevel_webview(self: &Rc, url: Url) -> WebView { let webview = WebViewBuilder::new(&self.servo) .url(url) .hidpi_scale_factor(self.inner().hidpi_scale_factor) @@ -357,11 +429,18 @@ impl RunningAppState { webview.focus(); self.add(webview.clone()); + webview } pub(crate) fn add(&self, webview: WebView) { - self.inner_mut().creation_order.push(webview.id()); - self.inner_mut().webviews.insert(webview.id(), webview); + let webview_id = webview.id(); + self.inner_mut().creation_order.push(webview_id); + self.inner_mut().webviews.insert(webview_id, webview); + info!( + "Added webview with ID: {:?}, total webviews: {}", + webview_id, + self.inner().webviews.len() + ); } /// The focused webview will not be immediately valid via `active_webview()` @@ -381,6 +460,14 @@ impl RunningAppState { self.inner.borrow_mut() } + pub(crate) fn servo(&self) -> &Servo { + &self.servo + } + + pub(crate) fn webdriver_receiver(&self) -> Option<&Receiver> { + self.webdriver_receiver.as_ref() + } + fn get_browser_id(&self) -> Result { let webview_id = match self.inner().focused_webview_id { Some(id) => id, @@ -404,6 +491,27 @@ impl RunningAppState { .expect("Should always have an active WebView") } + fn handle_webdriver_script_command(&self, msg: &WebDriverScriptCommand) { + match msg { + WebDriverScriptCommand::ExecuteScript(_webview_id, response_sender) | + WebDriverScriptCommand::ExecuteAsyncScript(_webview_id, response_sender) => { + // Give embedder a chance to interrupt the script command. + // Webdriver only handles 1 script command at a time, so we can + // safely set a new interrupt sender and remove the previous one here. + self.set_script_command_interrupt_sender(Some(response_sender.clone())); + }, + WebDriverScriptCommand::AddLoadStatusSender(webview_id, load_status_sender) => { + self.set_load_status_sender(*webview_id, load_status_sender.clone()); + }, + WebDriverScriptCommand::RemoveLoadStatusSender(webview_id) => { + self.remove_load_status_sender(*webview_id); + }, + _ => { + self.set_script_command_interrupt_sender(None); + }, + } + } + /// Request shutdown. Will call on_shutdown_complete. pub fn request_shutdown(&self) { self.servo.start_shutting_down(); @@ -494,8 +602,8 @@ impl RunningAppState { } /// WebDriver message handling methods - pub(crate) fn webdriver_receiver(&self) -> Option<&Receiver> { - self.webdriver_receiver.as_ref() + pub fn webview_by_id(&self, id: WebViewId) -> Option { + self.inner().webviews.get(&id).cloned() } pub fn handle_webdriver_messages(self: &Rc) { @@ -503,25 +611,166 @@ impl RunningAppState { while let Ok(msg) = webdriver_receiver.try_recv() { match msg { WebDriverCommandMsg::LoadUrl(webview_id, url, load_status_sender) => { - info!( - "(Not Implemented) Loading URL in webview {}: {}", - webview_id, url - ); + info!("Loading URL in webview {}: {}", webview_id, url); + + if let Some(webview) = self.webview_by_id(webview_id) { + self.set_load_status_sender(webview_id, load_status_sender.clone()); + self.inner_mut().focused_webview_id = Some(webview_id); + webview.focus(); + let url_string = url.to_string(); + webview.load(url.into_url()); + info!( + "Successfully loaded URL {} in focused webview {}", + url_string, webview_id + ); + } else { + warn!("WebView {} not found for LoadUrl command", webview_id); + } }, WebDriverCommandMsg::NewWebView(response_sender, load_status_sender) => { - info!("(Not Implemented) Creating new webview"); + info!("Creating new webview via WebDriver"); + let new_webview = self + .create_and_focus_toplevel_webview(Url::parse("about:blank").unwrap()); + + if let Err(error) = response_sender.send(new_webview.id()) { + warn!("Failed to send response of NewWebview: {error}"); + } + if let Some(load_status_sender) = load_status_sender { + self.set_load_status_sender(new_webview.id(), load_status_sender); + } + }, + WebDriverCommandMsg::CloseWebView(webview_id, response_sender) => { + info!("(Not Implemented) Closing webview {}", webview_id); }, WebDriverCommandMsg::FocusWebView(webview_id, response_sender) => { - info!("(Not Implemented) Focusing webview {}", webview_id); + if self.inner().webviews.contains_key(&webview_id) { + if let Some(webview) = self.webview_by_id(webview_id) { + let focus_id = webview.focus(); + info!("Successfully focused webview {}", webview_id); + self.set_pending_focus(focus_id, response_sender.clone()); + } else { + warn!("Webview {} not found after cleanup", webview_id); + let _ = response_sender.send(false); + } + } else { + warn!("Webview {} not found for focusing", webview_id); + let _ = response_sender.send(false); + } + }, + WebDriverCommandMsg::IsWebViewOpen(webview_id, response_sender) => { + let context = self.webview_by_id(webview_id); + + if let Err(error) = response_sender.send(context.is_some()) { + warn!("Failed to send response of IsWebViewOpen: {error}"); + } + }, + WebDriverCommandMsg::IsBrowsingContextOpen(..) => { + self.servo().execute_webdriver_command(msg); + }, + WebDriverCommandMsg::GetFocusedWebView(response_sender) => { + let focused_id = self + .inner() + .focused_webview_id + .and_then(|id| self.inner().webviews.get(&id).cloned()); + + if let Err(error) = response_sender.send(focused_id.map(|w| w.id())) { + warn!("Failed to send response of GetFocusedWebView: {error}"); + } + }, + WebDriverCommandMsg::Refresh(webview_id, load_status_sender) => { + info!("Refreshing webview {}", webview_id); + if let Some(webview) = self.webview_by_id(webview_id) { + self.set_load_status_sender(webview_id, load_status_sender); + webview.reload(); + } else { + warn!("WebView {} not found for Refresh command", webview_id); + } + }, + WebDriverCommandMsg::GoBack(webview_id, load_status_sender) => { + info!("Going back in webview {}", webview_id); + if let Some(webview) = self.webview_by_id(webview_id) { + let traversal_id = webview.go_back(1); + self.set_pending_traversal(traversal_id, load_status_sender); + } else { + warn!("WebView {} not found for GoBack command", webview_id); + } + }, + WebDriverCommandMsg::GoForward(webview_id, load_status_sender) => { + info!("Going forward in webview {}", webview_id); + if let Some(webview) = self.webview_by_id(webview_id) { + let traversal_id = webview.go_forward(1); + self.set_pending_traversal(traversal_id, load_status_sender); + } else { + warn!("WebView {} not found for GoForward command", webview_id); + } + }, + WebDriverCommandMsg::GetAllWebViews(response_sender) => { + let webviews = self + .webviews() + .iter() + .map(|(id, _)| *id) + .collect::>(); + + if let Err(error) = response_sender.send(webviews) { + warn!("Failed to send response of GetAllWebViews: {error}"); + } + }, + WebDriverCommandMsg::ScriptCommand(_, ref webdriver_script_command) => { + info!("Handling ScriptCommand: {:?}", webdriver_script_command); + self.handle_webdriver_script_command(webdriver_script_command); + self.servo().execute_webdriver_command(msg); + }, + WebDriverCommandMsg::CurrentUserPrompt(webview_id, response_sender) => { + info!("Handling CurrentUserPrompt for webview {}", webview_id); + if let Err(error) = response_sender.send(None) { + warn!("Failed to send response of CurrentUserPrompt: {error}"); + }; + }, + WebDriverCommandMsg::HandleUserPrompt(webview_id, action, response_sender) => { + info!( + "Handling HandleUserPrompt for webview {} with action {:?}", + webview_id, action + ); + + if let Err(error) = response_sender.send(Err(())) { + warn!("Failed to send response of HandleUserPrompt: {error}"); + }; + }, + WebDriverCommandMsg::GetAlertText(webview_id, response_sender) => { + info!("Handling GetAlertText for webview {}", webview_id); + let _ = response_sender.send(Err(())); + }, + WebDriverCommandMsg::SendAlertText(webview_id, text) => { + info!( + "Handling SendAlertText for webview {} with text: {}", + webview_id, text + ); }, _ => { - info!("(Not Implemented) Received WebDriver command: {:?}", msg); + info!("Received WebDriver command: {:?}", msg); }, } } } } + pub(crate) fn set_load_status_sender( + &self, + webview_id: WebViewId, + sender: GenericSender, + ) { + self.webdriver_senders + .borrow_mut() + .load_status_senders + .insert(webview_id, sender); + } + + pub(crate) fn remove_load_status_sender(&self, webview_id: WebViewId) { + self.webdriver_senders + .borrow_mut() + .load_status_senders + .remove(&webview_id); + } /// Touch event: press down pub fn touch_down(&self, x: f32, y: f32, pointer_id: i32) { self.active_webview() diff --git a/python/servo/testing_commands.py b/python/servo/testing_commands.py index 216f429e326..339b99c1f02 100644 --- a/python/servo/testing_commands.py +++ b/python/servo/testing_commands.py @@ -21,6 +21,7 @@ import sys import textwrap from time import sleep from typing import Any +from pathlib import Path import tidy import wpt @@ -440,6 +441,37 @@ class MachCommands(CommandBase): return 1 return wpt.update.update_tests(**kwargs) + @Command("test-ohos-wpt", description="Run a single WPT test on OHOS device using WebDriver", category="testing") + @CommandArgument("--test", required=True, help="Path to WPT test (relative to tests/wpt/tests/)") + @CommandArgument("--webdriver-port", type=int, default=7000, help="WebDriver server port on OHOS device") + @CommandArgument("--wpt-server-port", type=int, default=8000, help="WPT server port on desktop") + @CommandArgument("--verbose", action="store_true", help="Enable verbose logging") + def test_ohos_wpt(self, **kwargs: Any) -> int: + """Run a single WPT test on OHOS device.""" + script_path = Path(__file__).parent.parent / "wpt" / "ohos_webdriver_test.py" + + cmd = [ + sys.executable, + str(script_path), + "--test", + kwargs["test"], + "--webdriver-port", + str(kwargs["webdriver_port"]), + "--wpt-server-port", + str(kwargs["wpt_server_port"]), + ] + + if kwargs.get("verbose"): + cmd.append("--verbose") + + print(f"Running OHOS WPT test: {kwargs['test']}") + print("Make sure:") + print("1. OHOS device is connected and running Servo with WebDriver enabled") + print("2. WPT server is running on desktop") + print("3. HDC is available in PATH") + + return subprocess.call(cmd) + @Command("test-jquery", description="Run the jQuery test suite", category="testing") @CommandBase.common_command_arguments(binary_selection=True) def test_jquery(self, servo_binary: str) -> int: diff --git a/python/wpt/ohos_test_parser.js b/python/wpt/ohos_test_parser.js new file mode 100644 index 00000000000..8e855476be9 --- /dev/null +++ b/python/wpt/ohos_test_parser.js @@ -0,0 +1,155 @@ +// OHOS WebDriver Test Result Parser +// Parses WPT (Web Platform Test) test results from the DOM and extracts info +// Executed in the browser context via WebDriver to analyze +// the test results displayed on the page after WPT tests complete. + +try { + var result = { + title: document.title, + readyState: document.readyState, + bodyText: document.body ? document.body.textContent : '' + }; + + var bodyText = result.bodyText || ''; + var titleText = result.title || ''; + + if (bodyText.includes('Harness status: OK')) { + // Look for test result patterns like "X Pass Y Fail" + var passMatch = bodyText.match(/(\d+)\s+Pass/i); + var failMatch = bodyText.match(/(\d+)\s+Fail/i); + + var passCount = passMatch ? parseInt(passMatch[1]) : 0; + var failCount = failMatch ? parseInt(failMatch[1]) : 0; + + result.passCount = passCount; + result.failCount = failCount; + result.failingTests = []; + + // Parse individual test results by splitting by "Fail" keyword + var testSections = bodyText.split('Fail'); + + for (var i = 1; i < testSections.length; i++) { + var section = testSections[i]; + if (!section || section.trim().length === 0) continue; + + // Find the end of this test section (next "Pass" or "Fail" or "Asserts run") + var endMarkers = ['Pass', 'Asserts run']; + var endIndex = section.length; + + for (var j = 0; j < endMarkers.length; j++) { + var markerIndex = section.indexOf(endMarkers[j]); + if (markerIndex !== -1 && markerIndex < endIndex) { + endIndex = markerIndex; + } + } + + var testContent = section.substring(0, endIndex).trim(); + if (!testContent) continue; + + // Error message patterns to split test name from error + var errorPatterns = [ + 'promise_test:', + 'assert_equals:', + 'assert_less_than:', + 'assert_greater_than:', + 'assert_true:', + 'assert_false:', + 'TypeError:', + 'ReferenceError:' + ]; + + var testName = ''; + var errorMessage = ''; + var splitIndex = -1; + + for (var k = 0; k < errorPatterns.length; k++) { + var patternIndex = testContent.indexOf(errorPatterns[k]); + if (patternIndex !== -1) { + if (splitIndex === -1 || patternIndex < splitIndex) { + splitIndex = patternIndex; + } + } + } + + if (splitIndex !== -1) { + testName = testContent.substring(0, splitIndex).trim(); + errorMessage = testContent.substring(splitIndex).trim(); + } else { + // No clear error pattern, use first line as test name and rest as error + var lines = testContent.split('\n'); + testName = lines[0] ? lines[0].trim() : ''; + errorMessage = lines.slice(1).join(' ').trim(); + } + + // Clean up test name + if (!testName || testName.length === 0) { + testName = 'Unnamed Test #' + result.failingTests.length; + } + + var isAssertionLine = false; + var isFilePathLine = false; + + // Check if it's an assertion line (starts with assert_ and has parentheses and file reference) + if (testName.indexOf('assert_') === 0 && + testName.indexOf('(') !== -1 && + testName.indexOf(')') !== -1 && + testName.indexOf('.html:') !== -1) { + isAssertionLine = true; + } + + // Check if it's a file path line (starts with /css/ or has only file reference) + if (testName.indexOf('/css/') === 0 || + (testName.indexOf('.html:') !== -1 && testName.length < 60 && testName.indexOf(' ') === -1)) { + isFilePathLine = true; + } + + // Additional check: if it looks like just an assertion call with file location + if (testName.indexOf('assert_') === 0 && + testName.indexOf('(') !== -1 && + testName.indexOf(')') !== -1 && + testName.indexOf(',') !== -1) { + isAssertionLine = true; + } + + if (errorMessage.length > 250) { + errorMessage = errorMessage.substring(0, 250) + '...'; + } + + // Only add if we have meaningful content, avoid assertion lines, and prevent duplicates + if (testName && errorMessage && !isAssertionLine && !isFilePathLine) { + var isDuplicate = false; + for (var m = 0; m < result.failingTests.length; m++) { + if (result.failingTests[m].name === testName) { + isDuplicate = true; + break; + } + } + + if (!isDuplicate) { + result.failingTests.push({ + name: testName, + error: errorMessage + }); + } + } + } + + if (failCount > 0) { + result.status = 'FAIL'; + } else if (passCount > 0) { + result.status = 'PASS'; + } else { + result.status = 'UNKNOWN'; + } + } else if (bodyText.includes('PASS') || titleText.includes('PASS')) { + result.status = 'PASS'; + } else if (bodyText.includes('FAIL') || titleText.includes('FAIL')) { + result.status = 'FAIL'; + } else { + result.status = 'UNKNOWN'; + } + + return result; +} catch (e) { + return {status: 'ERROR', title: document.title, error: e.message}; +} diff --git a/python/wpt/ohos_webdriver_test.py b/python/wpt/ohos_webdriver_test.py new file mode 100644 index 00000000000..49ec6a45722 --- /dev/null +++ b/python/wpt/ohos_webdriver_test.py @@ -0,0 +1,497 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +#!/usr/bin/env python3 + +import argparse +import json +import logging +import os +import subprocess +import time +import sys +import urllib.request +import urllib.error +from typing import Dict, Optional, Any + + +class OHOSWebDriverController: + """Controller for running WebDriver tests on OHOS devices using HTTP API.""" + + def __init__(self, webdriver_port: int = 7000, wpt_server_port: int = 8000) -> None: + self.webdriver_port = webdriver_port + self.wpt_server_port = wpt_server_port + self.session_id: Optional[str] = None + self.wpt_server_process: Optional[subprocess.Popen] = None + + def setup_wpt_server_access(self) -> bool: + """Set up access to WPT server for OHOS device.""" + try: + cmd = ["hdc", "rport", f"tcp:{self.wpt_server_port}", f"tcp:{self.wpt_server_port}"] + logging.info(f"Setting up HDC reverse port forwarding for WPT: {' '.join(cmd)}") + + subprocess.run(cmd, capture_output=True, text=True, timeout=10) + + logging.info(f"HDC reverse port forwarding established for WPT server on port {self.wpt_server_port}") + return True + + except FileNotFoundError: + logging.error("HDC command not found. Please install HDC and add it to PATH") + return False + except subprocess.TimeoutExpired: + logging.error("HDC reverse port forwarding command timed out") + return False + except Exception as e: + logging.error(f"Failed to set up WPT server access: {e}") + return False + + def setup_hdc_forwarding(self) -> bool: + """Set up HDC port forwarding for WebDriver communication.""" + try: + cmd = ["hdc", "fport", f"tcp:{self.webdriver_port}", f"tcp:{self.webdriver_port}"] + logging.info(f"Setting up HDC port forwarding: {' '.join(cmd)}") + + subprocess.run(cmd, capture_output=True, text=True, timeout=10) + + logging.info(f"HDC port forwarding established on port {self.webdriver_port}") + return True + + except FileNotFoundError: + logging.error("HDC command not found. Make sure OHOS SDK is installed and hdc is in PATH") + return False + except subprocess.TimeoutExpired: + logging.error("HDC port forwarding command timed out") + return False + except Exception as e: + logging.error(f"Failed to set up HDC forwarding: {e}") + return False + + def start_wpt_server(self) -> bool: + """Start the WPT server on desktop.""" + try: + # For now, assume WPT server is already running or started manually + # In a complete implementation, this would start the WPT server + logging.info(f"Assuming WPT server is running on port {self.wpt_server_port}") + return True + + except Exception as e: + logging.error(f"Failed to start WPT server: {e}") + return False + + def webdriver_request( + self, method: str, path: str, data: Optional[Dict] = None, timeout: Optional[int] = None + ) -> Dict[str, Any]: + """Make a WebDriver HTTP request.""" + url = f"http://127.0.0.1:{self.webdriver_port}{path}" + + headers = { + "Content-Type": "application/json", + "Host": f"127.0.0.1:{self.webdriver_port}", + } + request_data = json.dumps(data).encode("utf-8") if data else None + + request = urllib.request.Request(url, data=request_data, headers=headers, method=method) + + try: + with urllib.request.urlopen(request, timeout=timeout) as response: + response_data = response.read().decode("utf-8") + return json.loads(response_data) if response_data else {} + except urllib.error.HTTPError as e: + error_response = e.read().decode("utf-8") if e.fp else "No response body" + logging.error(f"WebDriver HTTP error {e.code}: {error_response}, {path}") + new_error = urllib.error.HTTPError(e.url, e.code, e.msg, e.hdrs, None) + # Set the error_response as an attribute for later access + setattr(new_error, "error_response", error_response) + raise new_error + except Exception as e: + logging.error(f"WebDriver request failed: {method} {path} - {e}") + raise + + def delete_session(self, session_id: str) -> bool: + """Delete a WebDriver session.""" + try: + self.webdriver_request("DELETE", f"/session/{session_id}") + logging.info(f"Deleted WebDriver session: {session_id}") + return True + except Exception as e: + logging.error(f"Failed to delete session {session_id}: {e}") + return False + + def create_session(self) -> bool: + """Create a new WebDriver session.""" + try: + capabilities = {"capabilities": {"alwaysMatch": {"browserName": "servo"}}} + + logging.debug(f"Sending session request: {json.dumps(capabilities, indent=2)}") + response = self.webdriver_request("POST", "/session", capabilities) + logging.debug(f"Session response: {json.dumps(response, indent=2)}") + + self.session_id = response.get("value", {}).get("sessionId") + + if self.session_id: + logging.info(f"WebDriver session created: {self.session_id}") + return True + else: + logging.error("Failed to create WebDriver session") + return False + + except urllib.error.HTTPError as e: + error_response = getattr(e, "error_response", "No error response available") + + logging.debug(f"HTTP error during session creation: {e.code} - {error_response}") + + if "session not created" in error_response: + raise RuntimeError(f"Session not created. Please restart the WebDriver server: {error_response}") + else: + raise + except Exception as e: + logging.error(f"Failed to create WebDriver session: {e}") + raise + + def create_window(self) -> bool: + """Create a new window/webview if needed.""" + try: + if not self.session_id: + raise RuntimeError("No WebDriver session") + + try: + handles_response = self.webdriver_request("GET", f"/session/{self.session_id}/window/handles") + handles = handles_response.get("value", []) + + if handles: + logging.info(f"Found existing windows: {handles}") + # Focus on the first window + self.webdriver_request("POST", f"/session/{self.session_id}/window", {"handle": handles[0]}) + return True + except Exception as e: + logging.debug(f"Could not get window handles: {e}") + + # Try to explicitly create a new window + try: + logging.info("Attempting to create new window via WebDriver") + new_window_response = self.webdriver_request( + "POST", f"/session/{self.session_id}/window/new", {"type": "tab"} + ) + if new_window_response: + logging.info(f"Created new window: {new_window_response}") + return True + except Exception as e: + logging.debug(f"New window creation failed: {e}") + + logging.info("No existing windows found, assuming window will be created on navigation") + return True + + except Exception as e: + logging.error(f"Failed to create window: {e}") + return False + + def navigate_to_url(self, url: str, timeout: int = 10) -> bool: + """Navigate to a URL with OHOS-specific handling.""" + if not self.session_id: + raise RuntimeError("No WebDriver session") + + logging.info(f"Attempting to navigate to: {url}") + data = {"url": url} + + try: + navigation_success = self.webdriver_request( + "POST", f"/session/{self.session_id}/url", data, timeout=timeout + ) + logging.info(f"Navigation completed successfully: {navigation_success}") + return True + except Exception as nav_error: + logging.debug(f"Navigation request failed: {nav_error}") + return False + + def run_test(self, test_path: str) -> Dict[str, Any]: + """Run a single WPT test.""" + try: + if not self.create_session(): + return { + "status": "ERROR", + "title": "", + "details": "Failed to create WebDriver session", + "passCount": 0, + "failCount": 0, + "failingTests": [], + } + + if not self.create_window(): + return { + "status": "ERROR", + "title": "", + "details": "Failed to create window", + "passCount": 0, + "failCount": 0, + "failingTests": [], + } + + test_url = f"http://localhost:{self.wpt_server_port}/{test_path}" + + logging.info(f"Navigating URL: {test_url}") + + navigation_result = self.navigate_to_url(test_url, timeout=5) + + if navigation_result: + logging.info("Navigation completed, proceeding to test completion check") + else: + logging.warning("Navigation may have failed, but continuing with test completion check") + + return self.wait_for_test_completion_ohos() + + except Exception as e: + logging.error(f"Error running test: {e}") + return { + "status": "ERROR", + "title": "", + "details": str(e), + "passCount": 0, + "failCount": 0, + "failingTests": [], + } + + def wait_for_test_completion_ohos(self, timeout: int = 30) -> Dict[str, Any]: + """OHOS test completion handling""" + try: + logging.info("OHOS test completion handling...") + + logging.info("Waiting for page to load and test to complete...") + for i in range(6): + time.sleep(5) + logging.info(f"Waiting... ({(i + 1) * 5}/{timeout}s)") + + try: + script_path = os.path.join(os.path.dirname(__file__), "ohos_test_parser.js") + with open(script_path, "r", encoding="utf-8") as f: + script = f.read() + + script_data = {"script": script, "args": []} + script_response = self.webdriver_request( + "POST", f"/session/{self.session_id}/execute/sync", script_data, timeout=2 + ) + result = script_response.get("value", {}) + + if result.get("status") in ["PASS", "FAIL"]: + return { + "status": result.get("status"), + "title": result.get("title", ""), + "details": result.get("bodyText", "")[:200] + "..." + if len(result.get("bodyText", "")) > 200 + else result.get("bodyText", ""), + "passCount": result.get("passCount", 0), + "failCount": result.get("failCount", 0), + "failingTests": result.get("failingTests", []), + } + else: + logging.info( + f"Test still running, status: {result.get('status')}, body preview: {result.get('bodyText', '')[:100]}..." + ) + except Exception as api_error: + logging.debug(f"API request failed: {api_error}") + + # If we get here, either test timed out or API is completely unresponsive + logging.warning("WebDriver API appears to be unresponsive - this is a known OHOS limitation") + + # Take screenshot for debugging + screenshot_path = f"test_output/servo_ohos_screenshot_{int(time.time())}.jpeg" + self.take_screenshot(screenshot_path) + + return { + "status": "INDETERMINATE", + "title": "OHOS WebDriver Limitation", + "details": ( + "Test was successfully loaded on OHOS device, but WebDriver API became " + "unresponsive. Please check the test result manually on the device screen," + "or refer to the screenshot at of Desktop at: " + screenshot_path + ), + "passCount": 0, + "failCount": 0, + "failingTests": [], + } + + except Exception as e: + logging.error(f"Error in OHOS test completion handling: {e}") + + # Take screenshot for debugging on error + screenshot_path = f"test_output/servo_ohos_error_screenshot_{int(time.time())}.jpeg" + self.take_screenshot(screenshot_path) + + return { + "status": "ERROR", + "title": "", + "details": str(e), + "passCount": 0, + "failCount": 0, + "failingTests": [], + } + + def take_screenshot(self, output_path: str) -> bool: + """Take a screenshot from OHOS device for debugging.""" + try: + output_dir = os.path.dirname(output_path) + os.makedirs(output_dir, exist_ok=True) + snapshot_cmd = ["hdc", "shell", "snapshot_display", "-f", "/data/local/tmp/servo.jpeg"] + result = subprocess.run(snapshot_cmd, capture_output=True, text=True, timeout=10) + + if "fail" in result.stdout.lower() or "error" in result.stdout.lower(): + logging.warning(f"Screenshot capture failed: {result.stdout.strip()}") + return False + + recv_cmd = ["hdc", "file", "recv", "/data/local/tmp/servo.jpeg", output_path] + result = subprocess.run(recv_cmd, capture_output=True, text=True, timeout=10) + + if "fail" in result.stdout.lower() or "error" in result.stdout.lower(): + logging.warning(f"Screenshot transfer failed: {result.stdout.strip()}") + return False + + logging.info(f"Screenshot saved to: {output_path}") + return True + + except Exception as e: + logging.warning(f"Failed to take screenshot: {e}") + return False + + def cleanup(self) -> None: + """Clean up resources.""" + if self.session_id: + try: + self.webdriver_request("DELETE", f"/session/{self.session_id}") + except Exception: + pass + self.session_id = None + + if self.wpt_server_process: + try: + self.wpt_server_process.terminate() + self.wpt_server_process.wait(timeout=5) + except Exception: + try: + self.wpt_server_process.kill() + except Exception: + pass + self.wpt_server_process = None + + +def main() -> int: + parser = argparse.ArgumentParser(description="Run a single WPT test on OHOS device") + parser.add_argument("--test", required=True, help="Path to WPT test (relative to tests/wpt/tests/)") + parser.add_argument("--webdriver-port", type=int, default=7000, help="WebDriver server port") + parser.add_argument("--wpt-server-port", type=int, default=8000, help="WPT server port") + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + log_level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig(level=log_level, format="%(asctime)s - %(levelname)s - %(message)s") + + controller = OHOSWebDriverController(args.webdriver_port, args.wpt_server_port) + + try: + logging.info("Killing any existing servo instances and starting fresh...") + + try: + subprocess.run(["hdc", "shell", "killall org.servo.servo"], capture_output=True, text=True, timeout=10) + logging.info("Killed existing servo processes") + except Exception as e: + logging.debug(f"killall command failed (may be expected): {e}") + + try: + subprocess.run( + ["hdc", "shell", "aa force-stop org.servo.servo"], capture_output=True, text=True, timeout=10 + ) + logging.info("Force stopped servo application") + except Exception as e: + logging.debug(f"force-stop command failed (may be expected): {e}") + + try: + subprocess.run( + ["hdc", "shell", "aa start -a EntryAbility -b org.servo.servo"], + capture_output=True, + text=True, + timeout=15, + ) + logging.info("Started servo application") + time.sleep(3) + except Exception as e: + logging.error(f"Failed to start servo application: {e}") + return 1 + + logging.info("Setting up test infrastructure...") + + if not controller.setup_hdc_forwarding(): + logging.error("Failed to set up HDC forwarding") + return 1 + + controller.setup_wpt_server_access() + + if not controller.start_wpt_server(): + logging.error("Failed to start WPT server") + return 1 + + logging.info(f"Running test: {args.test}") + result = controller.run_test(args.test) + + print("\nTest Results:") + print("=" * 50) + print(f"Status: {result['status']}") + print(f"Title: {result['title']}") + + if "passCount" in result and "failCount" in result: + total_tests = result["passCount"] + result["failCount"] + print(f"Total Tests: {total_tests}") + print(f"Passed: {result['passCount']}") + print(f"Failed: {result['failCount']}") + + if result["failCount"] > 0 and "failingTests" in result and result["failingTests"]: + print(f"\nFailing Tests ({len(result['failingTests'])} extracted):") + print("-" * 50) + actual_count = 0 + for i, failing_test in enumerate(result["failingTests"], 1): + if isinstance(failing_test, dict): + test_name = failing_test.get("name", "Unknown") + error_msg = failing_test.get("error", "No error message") + else: + test_name = str(failing_test) + error_msg = "No error message" + + actual_count += 1 + print(f"{actual_count}. Test: {test_name}") + print(f" Error: {error_msg}") + print() + + if actual_count >= result["failCount"]: + break + + return 0 if result["status"] == "PASS" else 1 + + except KeyboardInterrupt: + logging.info("Test interrupted by user") + return 1 + except Exception as e: + logging.error(f"Unexpected error: {e}") + return 1 + finally: + controller.cleanup() + + logging.info("Cleaning up servo instances...") + try: + subprocess.run(["hdc", "shell", "killall org.servo.servo"], capture_output=True, text=True, timeout=10) + logging.info("Killed servo processes") + except Exception as e: + logging.debug(f"killall command failed during cleanup: {e}") + + try: + subprocess.run( + ["hdc", "shell", "aa force-stop org.servo.servo"], capture_output=True, text=True, timeout=10 + ) + logging.info("Force stopped servo application") + except Exception as e: + logging.debug(f"force-stop command failed during cleanup: {e}") + + # This should never be reached + return 1 + + +if __name__ == "__main__": + sys.exit(main())