管理一组 Envoy 实例的动态配置,尤其是在需要强一致性状态的场景下,是一个棘手的挑战。例如,为一个自定义的鉴权 Filter 动态更新全局唯一的 Nonce 列表,或者为一个全局限流 Filter 同步集群范围内的令牌桶状态。使用 Redis 或类似方案会引入单点故障,或在主从切换时面临数据一致性的风险。我们需要的,是一个内嵌在配置服务中的、去中心化的、容错的一致性存储。
问题的核心归结为分布式共识。我们的初步构想是构建一个轻量级的 gRPC 服务,它既是 Envoy 的 xDS 服务器,也通过节点间通信维护一个高可用的状态机。这自然地引向了 Paxos 算法。尽管 Raft 在工程实现上更受欢迎,但从头实现一个经典的 Paxos 不仅能提供所需的容错能力,更能迫使我们深入理解分布式系统中最核心的协议。
技术选型上,Rust 是构建这类基础设施组件的不二之选。它的内存安全和并发模型能从根本上消除 целый класс 的运行时错误,而其高性能的异步生态(Tokio)和 gRPC 框架(Tonic)则为实现网络服务提供了坚实的基础。
整个系统由三个主要部分组成:
- Paxos 共识核心 (Rust): 一个实现了 Multi-Paxos 协议的库,负责在多个节点间同步状态。
- xDS 配置服务 (Rust/gRPC): 封装 Paxos 核心,对外提供 gRPC 接口。其中一个接口遵循 Envoy ECDS (Extension Config Discovery Service) 规范,用于向 Envoy 推送动态配置;另一个接口则用于接收来自管理端的配置更新请求。
- 管理界面 (PWA): 一个简单的 PWA 应用,允许运维人员查看当前集群状态并提交新的配置。
第一步:Rust 实现 Paxos 核心状态机
我们不追求实现一个完备的 Paxos 库,而是聚焦于核心的 Proposer 和 Acceptor 逻辑。我们将实现一个简化的 Multi-Paxos,其中一个节点被隐式地认为是 Leader,以简化提议流程,但在真实项目中,这需要一个完整的 Leader 选举机制。
我们的状态将是一个简单的 String
,代表要推送给 Envoy Filter 的配置。
首先,定义 Cargo.toml
的关键依赖:
[package]
name = "paxos-config-store"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
tonic = "0.10"
prost = "0.12"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
接下来是 Paxos 协议中的核心数据结构。我们将通过 gRPC 在节点间传递这些消息。
paxos.proto
定义:
syntax = "proto3";
package paxos;
// 提议,由 Proposer 发出
message PrepareRequest {
uint64 proposal_id = 1; // 提议编号
}
// 承诺,由 Acceptor 回复
message PromiseResponse {
bool promised = 1;
uint64 last_accepted_id = 2;
string last_accepted_value = 3;
}
// 接受请求,由 Proposer 在收到足够承诺后发出
message AcceptRequest {
uint64 proposal_id = 1;
string value = 2;
}
// 已接受,由 Acceptor 回复
message AcceptedResponse {
bool accepted = 1;
}
service PaxosNode {
rpc Prepare(PrepareRequest) returns (PromiseResponse);
rpc Accept(AcceptRequest) returns (AcceptedResponse);
}
现在,我们在 Rust 中实现 Acceptor
的逻辑。Acceptor
是 Paxos 中的“投票者”,它需要持久化一些状态。
// src/paxos/acceptor.rs
use std::sync::{Arc, Mutex};
use tonic::{Request, Response, Status};
use crate::paxos_proto::{
paxos_node_server::PaxosNode, PrepareRequest, PromiseResponse, AcceptRequest, AcceptedResponse,
};
// Acceptor 需要持久化的状态
#[derive(Debug, Clone, Default)]
pub struct AcceptorState {
// 承诺过的最高提议ID
promised_id: u64,
// 已接受的最高提议ID
accepted_id: u64,
// 已接受的提议值
accepted_value: String,
}
// Acceptor 服务实现
#[derive(Debug)]
pub struct PaxosAcceptor {
// 使用 Mutex 保护状态,因为 gRPC 服务是多线程的
state: Arc<Mutex<AcceptorState>>,
// 节点ID,用于日志
node_id: String,
}
impl PaxosAcceptor {
pub fn new(node_id: String) -> Self {
Self {
state: Arc::new(Mutex::new(AcceptorState::default())),
node_id,
}
}
}
#[tonic::async_trait]
impl PaxosNode for PaxosAcceptor {
// 处理 Prepare 请求
async fn prepare(
&self,
request: Request<PrepareRequest>,
) -> Result<Response<PromiseResponse>, Status> {
let proposal_id = request.into_inner().proposal_id;
tracing::info!("[{}] Received Prepare request with ID: {}", self.node_id, proposal_id);
let mut state = self.state.lock().unwrap();
if proposal_id > state.promised_id {
// 如果提议ID大于之前承诺过的ID,则更新承诺ID,并返回承诺
state.promised_id = proposal_id;
tracing::info!("[{}] Promising for ID: {}", self.node_id, proposal_id);
Ok(Response::new(PromiseResponse {
promised: true,
last_accepted_id: state.accepted_id,
last_accepted_value: state.accepted_value.clone(),
}))
} else {
// 否则,拒绝承诺
tracing::warn!("[{}] Rejecting Prepare for ID: {} (already promised {})", self.node_id, proposal_id, state.promised_id);
Ok(Response::new(PromiseResponse {
promised: false,
last_accepted_id: state.accepted_id,
last_accepted_value: state.accepted_value.clone(),
}))
}
}
// 处理 Accept 请求
async fn accept(
&self,
request: Request<AcceptRequest>,
) -> Result<Response<AcceptedResponse>, Status> {
let req = request.into_inner();
let proposal_id = req.proposal_id;
let value = req.value;
tracing::info!("[{}] Received Accept request for ID: {} with value: '{}'", self.node_id, proposal_id, value);
let mut state = self.state.lock().unwrap();
// 只有当提议ID大于等于已承诺的ID时,才接受
if proposal_id >= state.promised_id {
state.promised_id = proposal_id;
state.accepted_id = proposal_id;
state.accepted_value = value;
tracing::info!("[{}] Accepted value for ID: {}", self.node_id, proposal_id);
// 在真实项目中,这里应该将状态持久化到磁盘
// e.g., fs.write("paxos_state.json", serde_json::to_string(&*state).unwrap())
Ok(Response::new(AcceptedResponse { accepted: true }))
} else {
tracing::warn!("[{}] Rejecting Accept for ID: {} (already promised {})", self.node_id, proposal_id, state.promised_id);
Ok(Response::new(AcceptedResponse { accepted: false }))
}
}
}
Proposer
的逻辑则更为复杂,它负责发起提议并驱动整个共识流程。
// src/paxos/proposer.rs
use crate::paxos_proto::paxos_node_client::PaxosNodeClient;
use crate::paxos_proto::{PrepareRequest, AcceptRequest};
use tonic::transport::Channel;
use std::time::{SystemTime, UNIX_EPOCH};
pub struct Proposer {
// 集群中其他节点的客户端连接
peers: Vec<PaxosNodeClient<Channel>>,
// 集群总节点数
quorum_size: usize,
// 提议者自己的ID,用于生成唯一的提议ID
node_id: u64,
}
impl Proposer {
pub fn new(peers: Vec<PaxosNodeClient<Channel>>, node_id: u64) -> Self {
let total_nodes = peers.len() + 1;
Self {
peers,
quorum_size: total_nodes / 2 + 1,
node_id,
}
}
// 发起一次提议
pub async fn propose(&mut self, value: String) -> Result<String, String> {
// 1. Prepare 阶段
// 生成一个全局唯一的、单调递增的提议ID
// 简单实现:时间戳 + 节点ID
let proposal_id = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let unique_proposal_id = (proposal_id << 16) | self.node_id;
tracing::info!("Starting proposal {} with value '{}'", unique_proposal_id, value);
let mut promises = 0;
let mut highest_accepted_id = 0;
let mut value_to_propose = value.clone();
let mut prepare_futures = Vec::new();
for peer in &mut self.peers {
let request = tonic::Request::new(PrepareRequest {
proposal_id: unique_proposal_id,
});
// 并行发送 Prepare 请求
prepare_futures.push(peer.prepare(request));
}
// 加上自己本地的承诺
promises += 1; // Assuming self-promise is always successful for simplicity
let results = futures::future::join_all(prepare_futures).await;
for result in results {
match result {
Ok(response) => {
let promise = response.into_inner();
if promise.promised {
promises += 1;
// 如果 Acceptor 返回了之前接受过的值,我们需要采纳那个值
if promise.last_accepted_id > highest_accepted_id {
highest_accepted_id = promise.last_accepted_id;
value_to_propose = promise.last_accepted_value;
}
}
}
Err(e) => tracing::error!("Failed to send Prepare to a peer: {}", e),
}
}
// 2. Accept 阶段
if promises < self.quorum_size {
return Err(format!("Failed to get quorum in Prepare phase. Promises: {}", promises));
}
tracing::info!("Quorum reached ({} promises). Moving to Accept phase with value '{}'", promises, value_to_propose);
let mut accepts = 0;
let mut accept_futures = Vec::new();
for peer in &mut self.peers {
let request = tonic::Request::new(AcceptRequest {
proposal_id: unique_proposal_id,
value: value_to_propose.clone(),
});
accept_futures.push(peer.accept(request));
}
// 加上自己本地的接受
accepts += 1;
let results = futures::future::join_all(accept_futures).await;
for result in results {
match result {
Ok(response) => {
if response.into_inner().accepted {
accepts += 1;
}
}
Err(e) => tracing::error!("Failed to send Accept to a peer: {}", e),
}
}
if accepts >= self.quorum_size {
tracing::info!("Proposal {} succeeded with {} accepts!", unique_proposal_id, accepts);
Ok(value_to_propose) // 返回最终达成共识的值
} else {
Err(format!("Failed to get quorum in Accept phase. Accepts: {}", accepts))
}
}
}
这里的错误处理和重试逻辑都做了简化,一个生产级的实现需要处理网络分区、节点超时、以及更复杂的提议ID生成策略来避免冲突。
第二步:集成 xDS 和管理 API
现在我们将 Paxos 逻辑封装到一个统一的服务中。这个服务需要监听两个端口:一个用于节点间 Paxos 通信,另一个用于对外的 xDS 和管理 API。
graph TD subgraph "管理" PWA -->|HTTP/gRPC-Web| MgmtAPI[Management API] end subgraph "Paxos 集群 (3个节点)" Node1[Rust Service 1] Node2[Rust Service 2] Node3[Rust Service 3] Node1 <-->|Paxos gRPC| Node2 Node2 <-->|Paxos gRPC| Node3 Node3 <-->|Paxos gRPC| Node1 end subgraph "Envoy 代理" Envoy1[Envoy] Envoy2[Envoy] end MgmtAPI -- "Propose New Config" --> Node1 Node1 -- "ECDS over gRPC" --> Envoy1 Node2 -- "ECDS over gRPC" --> Envoy2 Node3 -- "ECDS over gRPC" --> Envoy1
我们将把最终达成共识的值存储在内存中,并通过 ECDS (Extension Config Discovery Service) API 推送给 Envoy。
// src/server.rs
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
use tonic::{transport::Server, Request, Response, Status};
// ... 导入 Paxos 相关模块和 protobuf 生成的代码
// 代表最终达成共识并推送给 Envoy 的配置
// 使用 watch channel 来通知所有等待的 xDS 连接
type ConsensusState = Arc<watch::Sender<String>>;
// 管理 API 的 gRPC 服务
struct ManagementService {
// 引用 Proposer,用于发起新的提议
proposer: Arc<Mutex<Proposer>>,
// 引用共识状态的发送端,用于更新值
state_tx: ConsensusState,
}
#[tonic::async_trait]
impl Management for ManagementService {
async fn update_config(&self, request: Request<UpdateConfigRequest>) -> Result<Response<UpdateConfigResponse>, Status> {
let new_value = request.into_inner().config_value;
let mut proposer = self.proposer.lock().unwrap();
match proposer.propose(new_value).await {
Ok(committed_value) => {
// 提议成功,更新共享状态
self.state_tx.send(committed_value.clone()).unwrap();
Ok(Response::new(UpdateConfigResponse {
success: true,
committed_value,
}))
}
Err(e) => {
Err(Status::internal(format!("Paxos proposal failed: {}", e)))
}
}
}
}
// xDS ECDS 服务
struct EcdsService {
// 订阅共识状态的变化
state_rx: watch::Receiver<String>,
}
#[tonic::async_trait]
impl ExtensionConfigDiscoveryService for EcdsService {
// ... 实现 stream_extension_configs 方法
// 当 state_rx 接收到新值时,构建 Any 类型的 protobuf 消息并发送给 Envoy
}
// 主函数,启动所有服务
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// ... 初始化日志、解析命令行参数获取节点ID和Peers地址
// 启动 Paxos Acceptor gRPC 服务
let acceptor_service = PaxosAcceptor::new(node_id.clone());
// ...
// 创建到其他节点的 gRPC 客户端连接
let mut peer_clients = Vec::new();
// ...
// 创建 Proposer
let proposer = Arc::new(Mutex::new(Proposer::new(peer_clients, node_id_u64)));
// 创建共享状态
let (state_tx, state_rx) = watch::channel("initial_default_config".to_string());
let shared_state_tx = Arc::new(state_tx);
// 启动管理和 xDS 服务
let mgmt_service = ManagementService {
proposer: proposer.clone(),
state_tx: shared_state_tx,
};
let ecds_service = EcdsService { state_rx };
// ... 在不同端口上启动两个 gRPC 服务器
Ok(())
}
第三步:构建与工具链
一个健壮的项目离不开可靠的构建和部署流程。我们使用 Docker 进行容器化。
Dockerfile
:
# --- Builder Stage ---
FROM rust:1.73 AS builder
WORKDIR /usr/src/app
COPY . .
# 使用 cargo-chef 进行依赖缓存,加速后续构建
RUN cargo install cargo-chef
RUN cargo chef prepare --recipe-path recipe.json
RUN cargo chef cook --release --recipe-path recipe.json
# 构建应用
RUN cargo build --release --bin paxos-config-store
# --- Runtime Stage ---
FROM debian:bullseye-slim
# 创建非 root 用户
RUN groupadd -r appuser && useradd -r -g appuser appuser
WORKDIR /app
# 从 builder stage 复制编译好的二进制文件
COPY /usr/src/app/target/release/paxos-config-store .
# 暴露 gRPC 端口
EXPOSE 50051 50052
USER appuser
CMD ["./paxos-config-store"]
这个多阶段 Dockerfile
首先在一个包含完整 Rust 工具链的镜像中编译项目,然后将最终的二进制文件复制到一个极小的 debian-slim
镜像中。这显著减小了最终镜像的体积,并减少了潜在的安全攻击面。
使用 docker-compose.yml
启动一个三节点集群:
version: '3.8'
services:
paxos-node1:
build: .
command: --node-id node1 --port 50051 --peers "paxos-node2:50051,paxos-node3:50051"
ports:
- "9091:9090" # Expose Mgmt/xDS port
paxos-node2:
build: .
command: --node-id node2 --port 50051 --peers "paxos-node1:50051,paxos-node3:50051"
paxos-node3:
build: .
command: --node-id node3 --port 50051 --peers "paxos-node1:50051,paxos-node2:50051"
envoy:
image: envoyproxy/envoy:v1.27.0
volumes:
- ./envoy.yaml:/etc/envoy/envoy.yaml
ports:
- "10000:10000"
command: /usr/local/bin/envoy -c /etc/envoy/envoy.yaml --service-cluster my-envoy-cluster
envoy.yaml
的关键部分是配置 config_discovery
,使其指向我们的 Rust 服务集群。
static_resources:
# ...
clusters:
- name: xds_cluster
type: STRICT_DNS
connect_timeout: 0.25s
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: xds_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: paxos-node1 # Docker DNS
port_value: 9090
dynamic_resources:
cds_config:
# ...
ads_config:
api_type: GRPC
transport_api_version: V3
grpc_services:
- envoy_grpc:
cluster_name: xds_cluster
这里为了简化,我们只指向一个节点,但在生产环境中会配置所有节点地址。
第四步:PWA 作为操作界面
PWA 的作用是提供一个轻量级的、无需安装的管理界面。它的核心功能是通过 gRPC-Web 调用我们暴露的管理 API。
// pwa/src/api.js
import { ManagementClient } from './generated/management_grpc_web_pb.js';
import { UpdateConfigRequest } from './generated/management_pb.js';
// gRPC-Web 客户端指向 Envoy 代理(或直接指向 Rust 服务,如果配置了 CORS)
const client = new ManagementClient('http://localhost:9091');
export function updateConfig(newValue) {
return new Promise((resolve, reject) => {
const request = new UpdateConfigRequest();
request.setConfigValue(newValue);
client.updateConfig(request, {}, (err, response) => {
if (err) {
console.error(`gRPC error: ${err.code}, ${err.message}`);
return reject(err);
}
if (response.getSuccess()) {
resolve(response.getCommittedValue());
} else {
reject(new Error('Update failed on server.'));
}
});
});
}
这个 PWA 可以使用 Service Worker 缓存静态资源,实现离线访问。当运维人员在界面上提交一个新的配置字符串时,updateConfig
函数会被调用,向集群中的一个节点发起提议。由于 Paxos 的保证,无论请求发送到哪个节点,最终集群都会对这个值达成共لي。成功后,所有连接到 xDS 服务的 Envoy 实例都会收到这个更新。
局限与展望
这个实现是一个概念验证,距离生产环境还有很长的路。
- Paxos 实现简化: 我们没有实现 Leader 选举(这在 Multi-Paxos 中至关重要以避免活锁和提高性能),也没有实现日志压缩和快照,长时间运行会导致日志无限增长。节点成员变更(增加或移除节点)也未被处理。
- 持久化: Acceptor 的状态仅在内存中,节点重启后会丢失所有状态。生产系统必须将
promised_id
,accepted_id
,accepted_value
持久化到磁盘。 - xDS 服务不完整: 一个完整的 xDS 服务需要处理 ACK/NACK、版本号、以及更复杂的资源发现流程。我们的实现仅仅是单向推送。
- 构建与部署: 虽然使用了 Docker,但没有涉及 Kubernetes 部署、健康检查、可观测性(Metrics, Tracing)等生产环境必备的元素。
尽管存在这些简化,这个过程展示了如何将一个核心的分布式算法(Paxos)用一种现代、安全的系统编程语言(Rust)实现,并将其集成到一个云原生生态(Envoy, gRPC)中,解决一个具体的、有价值的工程问题。下一步的迭代方向将是引入一个成熟的 Raft 库(如 async-raft
),或者在我们当前的 Paxos 实现上补全 Leader 选举和日志持久化,并构建更完善的 xDS 服务端逻辑。