构建基于 Ray 的分布式 Jetpack Compose UI 测试矩阵及其 Cypress E2E 自测实践


当团队的 Jetpack Compose UI 测试集超过500个案例时,CI/CD流水线中的单机执行就成了一个无法忽视的瓶颈。原本期望在30分钟内完成的验证过程,被拉长到数小时,这直接影响了迭代速度和发布频率。零敲碎打的优化已无济于事,问题的核心在于缺乏一个能够水平扩展的并行测试执行能力。

最初的构想是构建一个分布式的 Android 设备(或模拟器)集群,将庞大的测试套件分片后,并行下发到各个节点执行,最后统一汇总报告。这套系统的核心是一个稳定、高效、易于管理的任务调度与执行后端。

技术选型决策:为何是 Ray?

在调度后端的选型上,我们评估过几种方案。基于 Kubernetes Job 的方案虽然功能强大,但对于我们这个纯粹的测试执行场景而言,显得过于笨重,YAML 配置的复杂性和集群维护成本较高。另一个选项是使用 Celery 配合 Redis/RabbitMQ,这是一个成熟的任务队列方案,但其对于执行节点(Worker)的状态管理和资源感知能力相对较弱,难以优雅地处理“一个模拟器在特定时间内只能执行一个UI测试”这类独占性资源调度。

最终我们选择了 Ray。Ray 是一个为大规模 AI 和 Python 应用设计的开源分布式计算框架。它胜出的关键原因有几点:

  1. 轻量级与语言亲和性: 我们的工具链主要基于 Python,Ray 提供了原生的 Python API,几乎没有学习曲线。与 K8s 相比,ray.init() 就能启动一个集群,极为轻便。
  2. Actor 模型: Ray 的 Actor 模型是管理状态化资源的完美抽象。我们可以将每一个 Android 模拟器封装成一个 Ray Actor,Actor 内部维护着模拟器的状态(如 IDLE, BUSY, ERROR),并处理与该模拟器相关的所有操作(安装 APK、执行测试、抓取日志)。这比无状态的任务队列更能精确地控制资源。
  3. 资源感知调度: Ray 允许在创建 Actor 或提交任务时指定资源需求(如 num_cpus, num_gpus,或自定义资源 {"emulator": 1})。这使得 Ray 调度器可以智能地将测试任务分配给拥有空闲模拟器资源的节点,实现了资源的自动负载均衡。

我们的整体架构也因此变得清晰:

graph TD
    subgraph CI/CD Pipeline
        A[Git Push] --> B{Build APKs};
        B --> C[Trigger Test Orchestrator];
    end

    subgraph Test Orchestration System
        C --> D[Python Orchestrator Script];
        D -- ray.init() --> E(Ray Head Node);
        D -- Test Sharding --> E;
        E -- Distributes Tasks --> F{Ray Worker Node 1};
        E -- Distributes Tasks --> G{Ray Worker Node 2};
        E -- Distributes Tasks --> H[...];
    end
    
    subgraph Ray Worker Node
        F --> F1(EmulatorManager Actor);
        G --> G1(EmulatorManager Actor);
        F1 -- ADB Commands --> F2[Android Emulator];
        G1 -- ADB Commands --> G2[Android Emulator];
    end

    subgraph Control & Monitoring
        I(Web Dashboard) -- REST API --> D;
        J(Cypress E2E Tests) -- Tests --> I;
    end
    
    E -- Aggregates Results --> D;
    D --> K[Final Test Report];
    C --> I;

这个架构中,Cypress 的角色至关重要。测试平台本身也是一个软件产品,它的稳定性直接关系到整个移动开发团队的效率。我们为这个平台的 Web Dashboard 编写了一套完整的 Cypress E2E 测试,以确保平台自身的迭代不会引入破坏性变更。这是一种“测试先行”的延伸——保障测试基础设施的可靠性。

核心实现:EmulatorManager Actor

管理模拟器的 Ray Actor 是整个系统的基石。它的职责是封装所有与单个模拟器交互的底层细节,并向上层调度器暴露简洁的接口。在真实项目中,一个常见的错误是把过多逻辑放在主调度脚本里,而 Actor 应该是一个高内聚的、自包含的组件。

import ray
import subprocess
import logging
import os
import time
import uuid

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

@ray.remote(num_cpus=1)  # 每个 Actor 占用一个 CPU 核心
class EmulatorManager:
    """
    一个 Ray Actor,负责管理一个独立的 Android 模拟器实例。
    它封装了 ADB 命令、测试执行、状态管理和日志收集的逻辑。
    """
    def __init__(self, device_id: str, workspace_root: str):
        self.device_id = device_id
        self.state = "IDLE"  # 状态: IDLE, BUSY, ERROR
        self.current_test = None
        self.logger = logging.getLogger(f"EmulatorManager-{self.device_id}")
        self.workspace = os.path.join(workspace_root, self.device_id)
        
        # 为每个 actor 创建独立的工作空间,避免日志和结果文件冲突
        os.makedirs(self.workspace, exist_ok=True)
        self.logger.info(f"Actor initialized for device {self.device_id}. Workspace: {self.workspace}")

    def _run_adb_command(self, cmd: list, timeout: int = 120) -> tuple[bool, str, str]:
        """
        执行 ADB 命令的健壮性封装,包含超时和错误处理。
        """
        base_cmd = ["adb", "-s", self.device_id] + cmd
        try:
            self.logger.info(f"Executing: {' '.join(base_cmd)}")
            process = subprocess.run(
                base_cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                check=False # 我们手动检查返回码
            )
            if process.returncode != 0:
                error_msg = f"ADB command failed with exit code {process.returncode}. Stderr: {process.stderr.strip()}"
                self.logger.error(error_msg)
                return False, "", process.stderr.strip()
                
            return True, process.stdout.strip(), process.stderr.strip()
        except subprocess.TimeoutExpired:
            error_msg = f"ADB command timed out after {timeout}s: {' '.join(base_cmd)}"
            self.logger.error(error_msg)
            return False, "", error_msg
        except Exception as e:
            self.logger.exception(f"An unexpected error occurred while running ADB command.")
            return False, "", str(e)

    def get_status(self) -> dict:
        """返回当前 Actor 的状态"""
        return {"device_id": self.device_id, "state": self.state, "current_test": self.current_test}

    def prepare_device(self, app_apk_path: str, test_apk_path: str) -> bool:
        """
        准备设备环境,包括清理、安装 APK。
        这是实际项目中非常重要的一步,确保测试环境的纯净。
        """
        self.logger.info("Preparing device for test run...")
        
        # 卸载旧包,忽略失败
        self._run_adb_command(["uninstall", "com.example.myapp"], timeout=60)
        self._run_adb_command(["uninstall", "com.example.myapp.test"], timeout=60)

        # 安装 App APK
        success, _, stderr = self._run_adb_command(["install", "-r", "-g", app_apk_path], timeout=180)
        if not success:
            self.logger.error(f"Failed to install app APK: {stderr}")
            self.state = "ERROR"
            return False

        # 安装 Test APK
        success, _, stderr = self._run_adb_command(["install", "-r", "-g", test_apk_path], timeout=180)
        if not success:
            self.logger.error(f"Failed to install test APK: {stderr}")
            self.state = "ERROR"
            return False
            
        return True

    def run_test_class(self, test_class: str) -> dict:
        """
        执行单个测试类并返回结果。
        """
        if self.state != "IDLE":
            return {"status": "REJECTED", "reason": f"Device is not IDLE. Current state: {self.state}"}

        self.state = "BUSY"
        self.current_test = test_class
        start_time = time.time()
        
        # 日志文件名使用 UUID 确保唯一性
        logcat_file = os.path.join(self.workspace, f"logcat_{test_class.split('.')[-1]}_{uuid.uuid4()}.log")
        
        # 启动 logcat 进程,在后台捕获日志
        logcat_proc = subprocess.Popen(
            ["adb", "-s", self.device_id, "logcat", "-c"],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        logcat_proc.wait()
        logcat_proc = subprocess.Popen(
            ["adb", "-s", self.device_id, "logcat"],
            stdout=open(logcat_file, 'w'), stderr=subprocess.STDOUT
        )

        # 核心:执行 instrumentation 测试
        # -w: 等待执行完成
        # -e class com.example.TestClass: 指定要运行的测试类
        test_command = [
            "shell", "am", "instrument", "-w",
            "-e", "class", test_class,
            "com.example.myapp.test/androidx.test.runner.AndroidJUnitRunner"
        ]
        
        success, stdout, stderr = self._run_adb_command(test_command, timeout=600) # 10分钟超时

        # 停止 logcat 进程
        logcat_proc.terminate()
        try:
            logcat_proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            logcat_proc.kill()

        end_time = time.time()
        duration = end_time - start_time

        # 结果解析,生产环境需要更复杂的解析逻辑
        result_status = "PASSED"
        if not success or "FAILURES!!!" in stdout or "INSTRUMENTATION_CODE: -1" in stdout:
            result_status = "FAILED"
        
        self.state = "IDLE"
        self.current_test = None

        return {
            "device_id": self.device_id,
            "test_class": test_class,
            "status": result_status,
            "duration_seconds": round(duration, 2),
            "output": stdout,
            "errors": stderr,
            "logcat_path": logcat_file
        }

任务分发与结果聚合

主调度脚本负责将整个测试任务池分发给空闲的 Actor。这里的关键是使用 ray.wait() 来实现一个动态的、非阻塞的任务调度循环。它允许我们持续向下发新任务,只要有 Actor 完成了上一个任务并变为空闲状态。

import ray
import os
import time
from typing import List

# ... EmulatorManager Actor 定义 ...

def get_test_shards(test_plan_file: str) -> List[str]:
    """
    从文件中读取测试分片列表。
    在真实场景中,这可能是通过解析 Gradle 的输出来动态生成的。
    """
    with open(test_plan_file, 'r') as f:
        return [line.strip() for line in f if line.strip()]

def main():
    # 连接到已有的 Ray 集群,或在本地启动一个
    ray.init(address='auto', namespace="test_farm")

    # 发现可用的设备
    # 这里的设备发现逻辑可以更复杂,例如从一个配置文件或服务发现中获取
    available_devices = ["emulator-5554", "emulator-5556"] 
    workspace_root = "/tmp/android_test_workspace"
    
    app_apk_path = "/path/to/your/app.apk"
    test_apk_path = "/path/to/your/test.apk"
    test_plan_file = "test_shards.txt" # 每行一个测试类名

    # 创建 Actor 池
    actor_pool = [
        EmulatorManager.remote(device_id, workspace_root) for device_id in available_devices
    ]
    
    # 预先准备所有设备,安装 APK
    # 这一步并行执行,可以显著缩短整体准备时间
    prep_futures = [actor.prepare_device.remote(app_apk_path, test_apk_path) for actor in actor_pool]
    prep_results = ray.get(prep_futures)
    
    if not all(prep_results):
        print("FATAL: One or more devices failed to prepare. Aborting test run.")
        return

    test_classes = get_test_shards(test_plan_file)
    tasks_in_flight = {}
    results = []

    start_time = time.time()
    
    while test_classes or tasks_in_flight:
        # 寻找空闲的 Actor 来分配新任务
        for i, actor in enumerate(actor_pool):
            if actor not in [ray.get(f[1]) for f in tasks_in_flight.values()] and test_classes:
                test_class = test_classes.pop(0)
                future = actor.run_test_class.remote(test_class)
                tasks_in_flight[future] = (test_class, actor) # 记录 future 对应的任务和 actor
                print(f"Dispatched {test_class} to {ray.get(actor.get_status.remote())['device_id']}")

        if not tasks_in_flight:
            break

        # 非阻塞地等待任何一个正在运行的任务完成
        ready_futures, _ = ray.wait(list(tasks_in_flight.keys()), num_returns=1, timeout=300)
        
        if not ready_futures:
            print("Warning: No tasks completed in the last 5 minutes.")
            continue

        for future in ready_futures:
            result = ray.get(future)
            results.append(result)
            
            test_class, actor_handle = tasks_in_flight.pop(future)
            print(f"Completed: {test_class} on {result['device_id']} with status {result['status']}")
            
    total_time = time.time() - start_time
    print(f"\n--- Test Run Finished in {total_time:.2f} seconds ---")
    # ... 在这里处理和报告 `results` ...

    ray.shutdown()

if __name__ == "__main__":
    main()

这个调度逻辑的健壮性体现在 ray.wait() 的使用上。它避免了简单的循环查询 Actor 状态,而是由 Ray 的内部调度器高效地通知我们哪些任务已完成,从而可以立即为对应的 Actor 分配新任务,最大化了集群的利用率。

Cypress E2E:为测试平台保驾护航

我们的测试平台有一个基于 React 的 Web Dashboard,用于触发测试、实时查看进度和历史结果。这个 Dashboard 的可靠性至关重要。一个常见的错误是,工具链团队发布了一个新版本的 Dashboard,却意外地破坏了“开始测试”按钮的功能,导致所有移动团队当天无法运行回归测试。

为了杜绝此类问题,我们为 Dashboard 编写了 Cypress E2E 测试。这里的核心是使用 cy.intercept() 来 mock 后端 API,使得 Cypress 测试可以独立于真实的 Ray 集群运行,速度更快,也更稳定。

下面是一个典型的 Cypress 测试用例 (dashboard_flow.cy.js):

// cypress/e2e/dashboard_flow.cy.js

describe('Android Test Farm Dashboard E2E', () => {
    beforeEach(() => {
        // 在每个测试用例开始前,拦截关键的 API 请求
        // 模拟初始状态:有两个设备,都处于空闲状态
        cy.intercept('GET', '/api/v1/devices/status', {
            statusCode: 200,
            body: [
                { device_id: 'emulator-5554', state: 'IDLE', current_test: null },
                { device_id: 'emulator-5556', state: 'IDLE', current_test: null },
            ],
        }).as('getInitialStatus');

        // 模拟触发测试运行的 API
        cy.intercept('POST', '/api/v1/test-run', {
            statusCode: 202,
            body: { run_id: 'run-abc-123', message: 'Test run accepted' },
        }).as('startTestRun');

        cy.visit('/'); // 访问 Dashboard 首页
        cy.wait('@getInitialStatus');
    });

    it('should display initial device status and allow starting a new test run', () => {
        // 验证初始UI状态是否正确
        cy.get('[data-cy=device-card]').should('have.length', 2);
        cy.contains('[data-cy=device-status-emulator-5554]', 'IDLE').should('be.visible');
        cy.get('[data-cy=start-run-button]').should('not.be.disabled');

        // 模拟用户点击“开始测试”按钮
        cy.get('[data-cy=start-run-button]').click();

        // 验证 POST 请求是否已发送
        cy.wait('@startTestRun').its('request.body').should('deep.include', {
            branch: 'main', // 假设UI会发送一些参数
        });

        // 模拟测试进行中的状态更新
        cy.intercept('GET', '/api/v1/devices/status', {
            statusCode: 200,
            body: [
                { device_id: 'emulator-5554', state: 'BUSY', current_test: 'com.example.TestClass1' },
                { device_id: 'emulator-5556', state: 'IDLE', current_test: null },
            ],
        }).as('getBusyStatus');
        
        // 通常前端会有轮询,这里手动触发一次检查
        cy.wait('@getBusyStatus');
        
        // 验证UI是否更新为“BUSY”状态
        cy.contains('[data-cy=device-status-emulator-5554]', 'BUSY').should('be.visible');
        cy.contains('[data-cy=current-test-emulator-5554]', 'com.example.TestClass1').should('be.visible');
        
        // 模拟测试完成后的状态
         cy.intercept('GET', '/api/v1/devices/status', {
            statusCode: 200,
            body: [
                { device_id: 'emulator-5554', state: 'IDLE', current_test: null },
                { device_id: 'emulator-5556', state: 'IDLE', current_test: null },
            ],
        }).as('getFinalStatus');
    });
});

这段 Cypress 代码的核心价值在于,它不关心后端是 Ray 还是其他技术,它只关心 Dashboard 的行为是否符合 API 契约。这使得前端和后端的开发可以解耦,并且为 Dashboard 的每一次部署都提供了一个强大的质量门禁。

遗留问题与未来迭代

这套基于 Ray 的系统解决了我们最紧迫的并行化测试痛点,但它并非完美。在真实项目中,总会有下一阶段需要改进的地方。

  1. 动态模拟器供应: 目前,Ray Worker 节点上的模拟器是静态配置的。这意味着我们无法根据测试负载动态增减模拟器数量。未来的一个重要方向是集成 Docker 和 Kubernetes,将 Android 模拟器容器化。然后,可以开发一个 Kubernetes Operator 来管理这些模拟器 Pod,并让 Ray 集群动态地在这些 Pod 中运行测试。这将实现真正的弹性伸缩,但也会引入云原生环境下的网络和存储复杂性。

  2. 容错性与任务恢复: 当前的 EmulatorManager Actor 是有状态的。如果一个 Ray Worker 节点崩溃,该节点上正在运行的测试任务的状态就会丢失。虽然 Ray 提供了 Actor 故障恢复的机制,但我们的实现尚未充分利用。一个改进方向是将任务的关键状态(例如,哪个测试正在哪个设备上运行)持久化到外部存储(如 Redis),当一个 Actor 意外失败后,主调度器可以检测到,并尝试在另一个可用的 Actor 上重新调度该任务。

  3. 测试分片的智能化: 当前的测试分片是静态的,基于测试类的数量均分。但不同 UI 测试的执行时间差异巨大。这会导致“长尾效应”——大部分设备都已空闲,但整个测试运行仍在等待最后一个执行耗时最长任务的设备。下一步是收集每个测试类的历史执行时间数据,并基于这些数据实现一个动态的、基于时长的负载均衡分片算法,确保每个 Actor 的总工作负载尽可能接近。


  目录