使用 Rust 与 Paxos 实现 Envoy 的分布式动态配置存储


管理一组 Envoy 实例的动态配置,尤其是在需要强一致性状态的场景下,是一个棘手的挑战。例如,为一个自定义的鉴权 Filter 动态更新全局唯一的 Nonce 列表,或者为一个全局限流 Filter 同步集群范围内的令牌桶状态。使用 Redis 或类似方案会引入单点故障,或在主从切换时面临数据一致性的风险。我们需要的,是一个内嵌在配置服务中的、去中心化的、容错的一致性存储。

问题的核心归结为分布式共识。我们的初步构想是构建一个轻量级的 gRPC 服务,它既是 Envoy 的 xDS 服务器,也通过节点间通信维护一个高可用的状态机。这自然地引向了 Paxos 算法。尽管 Raft 在工程实现上更受欢迎,但从头实现一个经典的 Paxos 不仅能提供所需的容错能力,更能迫使我们深入理解分布式系统中最核心的协议。

技术选型上,Rust 是构建这类基础设施组件的不二之选。它的内存安全和并发模型能从根本上消除 целый класс 的运行时错误,而其高性能的异步生态(Tokio)和 gRPC 框架(Tonic)则为实现网络服务提供了坚实的基础。

整个系统由三个主要部分组成:

  1. Paxos 共识核心 (Rust): 一个实现了 Multi-Paxos 协议的库,负责在多个节点间同步状态。
  2. xDS 配置服务 (Rust/gRPC): 封装 Paxos 核心,对外提供 gRPC 接口。其中一个接口遵循 Envoy ECDS (Extension Config Discovery Service) 规范,用于向 Envoy 推送动态配置;另一个接口则用于接收来自管理端的配置更新请求。
  3. 管理界面 (PWA): 一个简单的 PWA 应用,允许运维人员查看当前集群状态并提交新的配置。

第一步:Rust 实现 Paxos 核心状态机

我们不追求实现一个完备的 Paxos 库,而是聚焦于核心的 Proposer 和 Acceptor 逻辑。我们将实现一个简化的 Multi-Paxos,其中一个节点被隐式地认为是 Leader,以简化提议流程,但在真实项目中,这需要一个完整的 Leader 选举机制。

我们的状态将是一个简单的 String,代表要推送给 Envoy Filter 的配置。

首先,定义 Cargo.toml 的关键依赖:

[package]
name = "paxos-config-store"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1", features = ["full"] }
tonic = "0.10"
prost = "0.12"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"

接下来是 Paxos 协议中的核心数据结构。我们将通过 gRPC 在节点间传递这些消息。

paxos.proto 定义:

syntax = "proto3";

package paxos;

// 提议,由 Proposer 发出
message PrepareRequest {
    uint64 proposal_id = 1; // 提议编号
}

// 承诺,由 Acceptor 回复
message PromiseResponse {
    bool promised = 1;
    uint64 last_accepted_id = 2;
    string last_accepted_value = 3;
}

// 接受请求,由 Proposer 在收到足够承诺后发出
message AcceptRequest {
    uint64 proposal_id = 1;
    string value = 2;
}

// 已接受,由 Acceptor 回复
message AcceptedResponse {
    bool accepted = 1;
}

service PaxosNode {
    rpc Prepare(PrepareRequest) returns (PromiseResponse);
    rpc Accept(AcceptRequest) returns (AcceptedResponse);
}

现在,我们在 Rust 中实现 Acceptor 的逻辑。Acceptor 是 Paxos 中的“投票者”,它需要持久化一些状态。

// src/paxos/acceptor.rs

use std::sync::{Arc, Mutex};
use tonic::{Request, Response, Status};
use crate::paxos_proto::{
    paxos_node_server::PaxosNode, PrepareRequest, PromiseResponse, AcceptRequest, AcceptedResponse,
};

// Acceptor 需要持久化的状态
#[derive(Debug, Clone, Default)]
pub struct AcceptorState {
    // 承诺过的最高提议ID
    promised_id: u64,
    // 已接受的最高提议ID
    accepted_id: u64,
    // 已接受的提议值
    accepted_value: String,
}

// Acceptor 服务实现
#[derive(Debug)]
pub struct PaxosAcceptor {
    // 使用 Mutex 保护状态,因为 gRPC 服务是多线程的
    state: Arc<Mutex<AcceptorState>>,
    // 节点ID,用于日志
    node_id: String,
}

impl PaxosAcceptor {
    pub fn new(node_id: String) -> Self {
        Self {
            state: Arc::new(Mutex::new(AcceptorState::default())),
            node_id,
        }
    }
}

#[tonic::async_trait]
impl PaxosNode for PaxosAcceptor {
    // 处理 Prepare 请求
    async fn prepare(
        &self,
        request: Request<PrepareRequest>,
    ) -> Result<Response<PromiseResponse>, Status> {
        let proposal_id = request.into_inner().proposal_id;
        tracing::info!("[{}] Received Prepare request with ID: {}", self.node_id, proposal_id);

        let mut state = self.state.lock().unwrap();

        if proposal_id > state.promised_id {
            // 如果提议ID大于之前承诺过的ID,则更新承诺ID,并返回承诺
            state.promised_id = proposal_id;
            tracing::info!("[{}] Promising for ID: {}", self.node_id, proposal_id);

            Ok(Response::new(PromiseResponse {
                promised: true,
                last_accepted_id: state.accepted_id,
                last_accepted_value: state.accepted_value.clone(),
            }))
        } else {
            // 否则,拒绝承诺
            tracing::warn!("[{}] Rejecting Prepare for ID: {} (already promised {})", self.node_id, proposal_id, state.promised_id);
            Ok(Response::new(PromiseResponse {
                promised: false,
                last_accepted_id: state.accepted_id,
                last_accepted_value: state.accepted_value.clone(),
            }))
        }
    }

    // 处理 Accept 请求
    async fn accept(
        &self,
        request: Request<AcceptRequest>,
    ) -> Result<Response<AcceptedResponse>, Status> {
        let req = request.into_inner();
        let proposal_id = req.proposal_id;
        let value = req.value;

        tracing::info!("[{}] Received Accept request for ID: {} with value: '{}'", self.node_id, proposal_id, value);

        let mut state = self.state.lock().unwrap();

        // 只有当提议ID大于等于已承诺的ID时,才接受
        if proposal_id >= state.promised_id {
            state.promised_id = proposal_id;
            state.accepted_id = proposal_id;
            state.accepted_value = value;
            tracing::info!("[{}] Accepted value for ID: {}", self.node_id, proposal_id);

            // 在真实项目中,这里应该将状态持久化到磁盘
            // e.g., fs.write("paxos_state.json", serde_json::to_string(&*state).unwrap())

            Ok(Response::new(AcceptedResponse { accepted: true }))
        } else {
            tracing::warn!("[{}] Rejecting Accept for ID: {} (already promised {})", self.node_id, proposal_id, state.promised_id);
            Ok(Response::new(AcceptedResponse { accepted: false }))
        }
    }
}

Proposer 的逻辑则更为复杂,它负责发起提议并驱动整个共识流程。

// src/paxos/proposer.rs

use crate::paxos_proto::paxos_node_client::PaxosNodeClient;
use crate::paxos_proto::{PrepareRequest, AcceptRequest};
use tonic::transport::Channel;
use std::time::{SystemTime, UNIX_EPOCH};

pub struct Proposer {
    // 集群中其他节点的客户端连接
    peers: Vec<PaxosNodeClient<Channel>>,
    // 集群总节点数
    quorum_size: usize,
    // 提议者自己的ID,用于生成唯一的提议ID
    node_id: u64,
}

impl Proposer {
    pub fn new(peers: Vec<PaxosNodeClient<Channel>>, node_id: u64) -> Self {
        let total_nodes = peers.len() + 1;
        Self {
            peers,
            quorum_size: total_nodes / 2 + 1,
            node_id,
        }
    }

    // 发起一次提议
    pub async fn propose(&mut self, value: String) -> Result<String, String> {
        // 1. Prepare 阶段
        // 生成一个全局唯一的、单调递增的提议ID
        // 简单实现:时间戳 + 节点ID
        let proposal_id = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        let unique_proposal_id = (proposal_id << 16) | self.node_id;

        tracing::info!("Starting proposal {} with value '{}'", unique_proposal_id, value);

        let mut promises = 0;
        let mut highest_accepted_id = 0;
        let mut value_to_propose = value.clone();

        let mut prepare_futures = Vec::new();
        for peer in &mut self.peers {
            let request = tonic::Request::new(PrepareRequest {
                proposal_id: unique_proposal_id,
            });
            // 并行发送 Prepare 请求
            prepare_futures.push(peer.prepare(request));
        }
        
        // 加上自己本地的承诺
        promises += 1; // Assuming self-promise is always successful for simplicity

        let results = futures::future::join_all(prepare_futures).await;

        for result in results {
            match result {
                Ok(response) => {
                    let promise = response.into_inner();
                    if promise.promised {
                        promises += 1;
                        // 如果 Acceptor 返回了之前接受过的值,我们需要采纳那个值
                        if promise.last_accepted_id > highest_accepted_id {
                            highest_accepted_id = promise.last_accepted_id;
                            value_to_propose = promise.last_accepted_value;
                        }
                    }
                }
                Err(e) => tracing::error!("Failed to send Prepare to a peer: {}", e),
            }
        }

        // 2. Accept 阶段
        if promises < self.quorum_size {
            return Err(format!("Failed to get quorum in Prepare phase. Promises: {}", promises));
        }

        tracing::info!("Quorum reached ({} promises). Moving to Accept phase with value '{}'", promises, value_to_propose);

        let mut accepts = 0;
        let mut accept_futures = Vec::new();
        for peer in &mut self.peers {
            let request = tonic::Request::new(AcceptRequest {
                proposal_id: unique_proposal_id,
                value: value_to_propose.clone(),
            });
            accept_futures.push(peer.accept(request));
        }

        // 加上自己本地的接受
        accepts += 1; 

        let results = futures::future::join_all(accept_futures).await;
        for result in results {
            match result {
                Ok(response) => {
                    if response.into_inner().accepted {
                        accepts += 1;
                    }
                }
                Err(e) => tracing::error!("Failed to send Accept to a peer: {}", e),
            }
        }

        if accepts >= self.quorum_size {
            tracing::info!("Proposal {} succeeded with {} accepts!", unique_proposal_id, accepts);
            Ok(value_to_propose) // 返回最终达成共识的值
        } else {
            Err(format!("Failed to get quorum in Accept phase. Accepts: {}", accepts))
        }
    }
}

这里的错误处理和重试逻辑都做了简化,一个生产级的实现需要处理网络分区、节点超时、以及更复杂的提议ID生成策略来避免冲突。

第二步:集成 xDS 和管理 API

现在我们将 Paxos 逻辑封装到一个统一的服务中。这个服务需要监听两个端口:一个用于节点间 Paxos 通信,另一个用于对外的 xDS 和管理 API。

graph TD
    subgraph "管理"
        PWA -->|HTTP/gRPC-Web| MgmtAPI[Management API]
    end

    subgraph "Paxos 集群 (3个节点)"
        Node1[Rust Service 1]
        Node2[Rust Service 2]
        Node3[Rust Service 3]
        Node1 <-->|Paxos gRPC| Node2
        Node2 <-->|Paxos gRPC| Node3
        Node3 <-->|Paxos gRPC| Node1
    end
    
    subgraph "Envoy 代理"
        Envoy1[Envoy]
        Envoy2[Envoy]
    end

    MgmtAPI -- "Propose New Config" --> Node1
    Node1 -- "ECDS over gRPC" --> Envoy1
    Node2 -- "ECDS over gRPC" --> Envoy2
    Node3 -- "ECDS over gRPC" --> Envoy1

我们将把最终达成共识的值存储在内存中,并通过 ECDS (Extension Config Discovery Service) API 推送给 Envoy。

// src/server.rs
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
use tonic::{transport::Server, Request, Response, Status};

// ... 导入 Paxos 相关模块和 protobuf 生成的代码

// 代表最终达成共识并推送给 Envoy 的配置
// 使用 watch channel 来通知所有等待的 xDS 连接
type ConsensusState = Arc<watch::Sender<String>>;

// 管理 API 的 gRPC 服务
struct ManagementService {
    // 引用 Proposer,用于发起新的提议
    proposer: Arc<Mutex<Proposer>>,
    // 引用共识状态的发送端,用于更新值
    state_tx: ConsensusState,
}

#[tonic::async_trait]
impl Management for ManagementService {
    async fn update_config(&self, request: Request<UpdateConfigRequest>) -> Result<Response<UpdateConfigResponse>, Status> {
        let new_value = request.into_inner().config_value;
        let mut proposer = self.proposer.lock().unwrap();

        match proposer.propose(new_value).await {
            Ok(committed_value) => {
                // 提议成功,更新共享状态
                self.state_tx.send(committed_value.clone()).unwrap();
                Ok(Response::new(UpdateConfigResponse {
                    success: true,
                    committed_value,
                }))
            }
            Err(e) => {
                Err(Status::internal(format!("Paxos proposal failed: {}", e)))
            }
        }
    }
}

// xDS ECDS 服务
struct EcdsService {
    // 订阅共识状态的变化
    state_rx: watch::Receiver<String>,
}

#[tonic::async_trait]
impl ExtensionConfigDiscoveryService for EcdsService {
    // ... 实现 stream_extension_configs 方法
    // 当 state_rx 接收到新值时,构建 Any 类型的 protobuf 消息并发送给 Envoy
}

// 主函数,启动所有服务
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // ... 初始化日志、解析命令行参数获取节点ID和Peers地址

    // 启动 Paxos Acceptor gRPC 服务
    let acceptor_service = PaxosAcceptor::new(node_id.clone());
    // ...

    // 创建到其他节点的 gRPC 客户端连接
    let mut peer_clients = Vec::new();
    // ...

    // 创建 Proposer
    let proposer = Arc::new(Mutex::new(Proposer::new(peer_clients, node_id_u64)));

    // 创建共享状态
    let (state_tx, state_rx) = watch::channel("initial_default_config".to_string());
    let shared_state_tx = Arc::new(state_tx);

    // 启动管理和 xDS 服务
    let mgmt_service = ManagementService {
        proposer: proposer.clone(),
        state_tx: shared_state_tx,
    };
    let ecds_service = EcdsService { state_rx };
    
    // ... 在不同端口上启动两个 gRPC 服务器
    
    Ok(())
}

第三步:构建与工具链

一个健壮的项目离不开可靠的构建和部署流程。我们使用 Docker 进行容器化。

Dockerfile:

# --- Builder Stage ---
FROM rust:1.73 AS builder

WORKDIR /usr/src/app
COPY . .

# 使用 cargo-chef 进行依赖缓存,加速后续构建
RUN cargo install cargo-chef
RUN cargo chef prepare --recipe-path recipe.json
RUN cargo chef cook --release --recipe-path recipe.json

# 构建应用
RUN cargo build --release --bin paxos-config-store

# --- Runtime Stage ---
FROM debian:bullseye-slim

# 创建非 root 用户
RUN groupadd -r appuser && useradd -r -g appuser appuser

WORKDIR /app

# 从 builder stage 复制编译好的二进制文件
COPY --from=builder /usr/src/app/target/release/paxos-config-store .

# 暴露 gRPC 端口
EXPOSE 50051 50052

USER appuser
CMD ["./paxos-config-store"]

这个多阶段 Dockerfile 首先在一个包含完整 Rust 工具链的镜像中编译项目,然后将最终的二进制文件复制到一个极小的 debian-slim 镜像中。这显著减小了最终镜像的体积,并减少了潜在的安全攻击面。

使用 docker-compose.yml 启动一个三节点集群:

version: '3.8'

services:
  paxos-node1:
    build: .
    command: --node-id node1 --port 50051 --peers "paxos-node2:50051,paxos-node3:50051"
    ports:
      - "9091:9090" # Expose Mgmt/xDS port
  
  paxos-node2:
    build: .
    command: --node-id node2 --port 50051 --peers "paxos-node1:50051,paxos-node3:50051"
  
  paxos-node3:
    build: .
    command: --node-id node3 --port 50051 --peers "paxos-node1:50051,paxos-node2:50051"

  envoy:
    image: envoyproxy/envoy:v1.27.0
    volumes:
      - ./envoy.yaml:/etc/envoy/envoy.yaml
    ports:
      - "10000:10000"
    command: /usr/local/bin/envoy -c /etc/envoy/envoy.yaml --service-cluster my-envoy-cluster

envoy.yaml 的关键部分是配置 config_discovery,使其指向我们的 Rust 服务集群。

static_resources:
  # ...
  clusters:
  - name: xds_cluster
    type: STRICT_DNS
    connect_timeout: 0.25s
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: xds_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: paxos-node1 # Docker DNS
                port_value: 9090

dynamic_resources:
  cds_config:
    # ...
  ads_config:
    api_type: GRPC
    transport_api_version: V3
    grpc_services:
      - envoy_grpc:
          cluster_name: xds_cluster

这里为了简化,我们只指向一个节点,但在生产环境中会配置所有节点地址。

第四步:PWA 作为操作界面

PWA 的作用是提供一个轻量级的、无需安装的管理界面。它的核心功能是通过 gRPC-Web 调用我们暴露的管理 API。

// pwa/src/api.js

import { ManagementClient } from './generated/management_grpc_web_pb.js';
import { UpdateConfigRequest } from './generated/management_pb.js';

// gRPC-Web 客户端指向 Envoy 代理(或直接指向 Rust 服务,如果配置了 CORS)
const client = new ManagementClient('http://localhost:9091');

export function updateConfig(newValue) {
  return new Promise((resolve, reject) => {
    const request = new UpdateConfigRequest();
    request.setConfigValue(newValue);

    client.updateConfig(request, {}, (err, response) => {
      if (err) {
        console.error(`gRPC error: ${err.code}, ${err.message}`);
        return reject(err);
      }
      
      if (response.getSuccess()) {
        resolve(response.getCommittedValue());
      } else {
        reject(new Error('Update failed on server.'));
      }
    });
  });
}

这个 PWA 可以使用 Service Worker 缓存静态资源,实现离线访问。当运维人员在界面上提交一个新的配置字符串时,updateConfig 函数会被调用,向集群中的一个节点发起提议。由于 Paxos 的保证,无论请求发送到哪个节点,最终集群都会对这个值达成共لي。成功后,所有连接到 xDS 服务的 Envoy 实例都会收到这个更新。

局限与展望

这个实现是一个概念验证,距离生产环境还有很长的路。

  1. Paxos 实现简化: 我们没有实现 Leader 选举(这在 Multi-Paxos 中至关重要以避免活锁和提高性能),也没有实现日志压缩和快照,长时间运行会导致日志无限增长。节点成员变更(增加或移除节点)也未被处理。
  2. 持久化: Acceptor 的状态仅在内存中,节点重启后会丢失所有状态。生产系统必须将 promised_id, accepted_id, accepted_value 持久化到磁盘。
  3. xDS 服务不完整: 一个完整的 xDS 服务需要处理 ACK/NACK、版本号、以及更复杂的资源发现流程。我们的实现仅仅是单向推送。
  4. 构建与部署: 虽然使用了 Docker,但没有涉及 Kubernetes 部署、健康检查、可观测性(Metrics, Tracing)等生产环境必备的元素。

尽管存在这些简化,这个过程展示了如何将一个核心的分布式算法(Paxos)用一种现代、安全的系统编程语言(Rust)实现,并将其集成到一个云原生生态(Envoy, gRPC)中,解决一个具体的、有价值的工程问题。下一步的迭代方向将是引入一个成熟的 Raft 库(如 async-raft),或者在我们当前的 Paxos 实现上补全 Leader 选举和日志持久化,并构建更完善的 xDS 服务端逻辑。


  目录