构建基于 Elixir 与 AWS Lambda 的事件溯源架构以驱动实时 React Native 应用


我们团队面临一个棘手的技术挑战:开发一款支持多人实时协作的移动应用。核心功能类似一个共享的白板,用户的每一个操作(绘制、移动、删除元素)都需要近乎实时地同步给所有在线的协作者。同时,应用必须支持离线操作,在网络恢复后能自动同步状态,且不能产生数据冲突。

在第一个 Sprint 的技术预研阶段,我们迅速排除了传统的基于 CRUD API 和轮询的方案。这种方法在处理高并发写入和状态一致性时会变得异常复杂,极易出现竞态条件,并且对客户端和服务器的资源消耗巨大。我们需要一个更稳健、更具扩展性的模型。

最终,我们决定采用事件溯源(Event Sourcing, ES)作为核心架构模式。在这种模式下,我们不存储应用当前的状态,而是将导致状态变更的每一个“事件”作为事实(source of truth)持久化下来。应用的状态可以通过重放这些事件流来随时重建。这个模型天然地解决了我们的核心痛点:

  1. 审计与追溯: 拥有完整的事件历史,可以轻松回溯到任意时间点的状态。
  2. 离线同步: 客户端在离线时可以将操作暂存为事件队列,在线后一次性提交。
  3. 状态重建与多视图: 可以基于同一事件流构建出多个不同的“读取模型”(Read Model),以满足不同的查询需求(CQRS 模式)。

技术栈的选择同样关键。我们需要一个能够处理高并发事件流、容错性强且运维成本低的后端,以及一个高效的跨平台移动端框架。我们的最终决策是:

  • 后端语言: Elixir。基于 BEAM 虚拟机,其轻量级进程和强大的并发模型(Actor Model)是处理海量独立事件的绝佳工具。其“let it crash”的哲学也为我们构建高容错系统提供了信心。
  • 运行环境: AWS Lambda。事件驱动的本质与 Serverless 完美契合。每个事件都可以触发一个独立的 Lambda 执行,实现了极致的弹性伸缩和成本效益。
  • 事件存储: Amazon DynamoDB。其单毫秒级延迟、强大的扩展性以及与 Lambda 的原生集成(DynamoDB Streams)使其成为事件存储的理想选择。
  • 移动端: React Native。跨平台能力可以显著加速我们在 Scrum 迭代中的交付速度。

整个项目在 Scrum 框架下推进。我们将架构的搭建和验证分解到多个 Sprint 中,每个 Sprint 都有明确的可交付增量,例如 “实现事件的原子性写入” 或 “构建第一个读取模型投影”。

架构核心:事件的写入与存储

系统的入口是一个 API Gateway,它接收来自 React Native 客户端的“命令”(Command)。这些命令代表用户的意图,例如 CreateElementMoveElement。API Gateway 将命令转发给我们的第一个核心 Lambda 函数:Command-Handler

这个函数负责验证命令、将其转换为一个或多个不可变的“事件”(Event),然后原子性地将事件写入 DynamoDB。在真实项目中,原子性写入至关重要,我们使用 DynamoDB 的 TransactWriteItems 操作来保证同一聚合(Aggregate,此处指白板上的单个元素)的多个事件要么全部成功,要么全部失败。

Command-Handler 的 Elixir 实现,使用了 AWS Elixir SDK。为了在 Lambda 中运行 Elixir,我们利用了社区提供的 Custom Runtime。

# lib/command_handler.ex
defmodule App.CommandHandler do
  use Lambda.Handler
  require Logger

  # 假设我们已经配置好了 AWS SDK 和 JSON 编解码库
  alias Aws.DynamoDB
  
  @table_name System.get_env("EVENTS_TABLE_NAME")

  @impl Lambda.Handler
  def handle(event, _context) do
    # 在生产环境中,这里的解析和验证会复杂得多
    # 需要处理不同类型的命令,并进行权限校验
    with {:ok, command} <- Jason.decode(event["body"]),
         {:ok, validated_command} <- validate_command(command),
         {:ok, events} <- command_to_events(validated_command) do

      persist_events(events)
    else
      {:error, reason} ->
        Logger.error("Failed to process command: #{inspect(reason)}")
        # 返回标准的 API Gateway 错误响应
        %{statusCode: 400, body: Jason.encode!(%{error: reason})}
    end
  end

  defp validate_command(command) do
    # 真实的验证逻辑:检查字段、权限、业务规则等
    # 这里为了简化,我们只检查 command_id 是否存在
    case Map.get(command, "command_id") do
      nil -> {:error, "Missing command_id"}
      _ -> {:ok, command}
    end
  end

  defp command_to_events(%{"type" => "CreateElement", "payload" => payload} = command) do
    # 将命令转换为事件。事件应该包含所有必要的信息来重建状态。
    event = %{
      event_id: UUID.uuid4(),
      aggregate_id: payload["element_id"], # 聚合根 ID,即元素 ID
      type: "ElementCreated",
      version: 1, # 对于新创建的聚合,版本号总是 1
      payload: payload,
      timestamp: DateTime.utc_now() |> DateTime.to_iso8601(),
      causation_id: command["command_id"] # 记录导致此事件的命令ID,用于追踪
    }
    {:ok, [event]}
  end
  # ... 其他命令到事件的转换逻辑 ...

  defp persist_events(events) do
    # 使用 TransactWriteItems 保证原子性
    # 这是事件溯源实现中最关键的部分之一
    requests = Enum.map(events, fn event ->
      %{
        put: %{
          table_name: @table_name,
          item: Jason.encode!(event),
          # 关键:乐观锁。确保我们是在最新的聚合版本上追加事件
          # 如果 aggregate_id 不存在(即新聚合),则写入成功
          # 如果已存在,其 version 必须是 event.version - 1
          condition_expression: "attribute_not_exists(aggregate_id) OR version = :expected_version",
          expression_attribute_values: %{
            ":expected_version" => %{n: to_string(event.version - 1)}
          }
        }
      }
    end)
    
    case DynamoDB.transact_write_items(requests) do
      {:ok, _response} ->
        Logger.info("Successfully persisted #{length(events)} events.")
        # 成功响应
        %{statusCode: 202, body: Jason.encode!(%{message: "Accepted"})}
      {:error, %{__type: "TransactionCanceledException"} = reason} ->
        # 这通常意味着版本冲突,需要客户端重试
        Logger.warning("Transaction canceled, likely a concurrency conflict: #{inspect(reason)}")
        %{statusCode: 409, body: Jason.encode!(%{error: "Conflict, please retry."})}
      {:error, reason} ->
        # 其他持久化错误,需要告警
        Logger.error("Failed to persist events: #{inspect(reason)}")
        %{statusCode: 500, body: Jason.encode!(%{error: "Internal server error"})}
    end
  end
end

这里的坑在于 condition_expression。这是实现乐观并发控制的核心。它确保了只有当事件的版本号能够正确地接续在现有事件流之后时,写入才会成功。如果两个用户同时操作同一个元素,只有一个会成功,另一个会收到 409 Conflict 错误,客户端需要基于最新的状态重试其操作。

状态投影:构建读取模型

事件被写入 DynamoDB 后,我们如何获得可供客户端查询的当前状态?直接从事件流中实时计算是不可行的,尤其当事件数量增长时。这里,我们利用 DynamoDB Streams 和第二个 Lambda 函数 Event-Projector 来解决。

events 表有新的写入时,DynamoDB Streams 会捕获这些变更,并异步触发 Event-Projector Lambda。这个函数唯一的职责就是读取新事件,更新一个或多个专门用于查询的“读取模型”(Read Model)。

我们将读取模型也存储在 DynamoDB 的一个单独的表中,例如 whiteboard_state。这张表的结构被优化为查询友好型,例如以白板 ID 为分区键,可以直接获取整个白板的当前所有元素状态。

graph TD
    subgraph "客户端 (React Native)"
        A[User Action] --> B{Command};
    end

    B --> C[API Gateway];
    C --> D[Lambda: Command-Handler];
    D -- 1. Validate & Convert --> E{Event};
    E -- 2. Persist with Optimistic Locking --> F[(DynamoDB: Event Store)];
    
    subgraph "异步投影流程"
        F -- 3. DynamoDB Stream --> G[Lambda: Event-Projector];
        G -- 4. Apply Event --> H[(DynamoDB: Read Model)];
    end

    subgraph "实时通知"
        G -- 5. Publish Update --> I[AWS IoT Core];
    end
    
    I -- 6. Push Update via WebSocket --> J[React Native Client];
    
    subgraph "查询流程"
        K[React Native Client] -- Query Current State --> L[API Gateway];
        L --> M[Lambda: Query-Handler];
        M -- Reads from --> H;
    end

Event-Projector 的 Elixir 代码片段:

# lib/event_projector.ex
defmodule App.EventProjector do
  use Lambda.Handler
  require Logger

  alias Aws.DynamoDB
  alias Aws.IoTDataPlane
  
  @state_table_name System.get_env("STATE_TABLE_NAME")
  @iot_topic_prefix "whiteboard/"

  @impl Lambda.Handler
  def handle(event, _context) do
    # DynamoDB Stream 的事件体结构比较复杂
    # 我们需要从 event["Records"] 中提取出新的事件数据
    Enum.each(event["Records"], fn record ->
      # 只处理 INSERT 事件
      if record["eventName"] == "INSERT" do
        # new_image 是一个 DynamoDB AttributeValue Map, 需要 unmarshall
        new_image = record["dynamodb"]["NewImage"]
        # 假设有一个工具函数来转换 DynamoDB 格式为 Elixir Map
        event_data = unmarshall(new_image)
        
        process_event(event_data)
      end
    end)
    
    {:ok, "Projection successful"}
  end

  defp process_event(%{"type" => "ElementCreated", "payload" => payload} = event) do
    element_id = event["aggregate_id"]
    whiteboard_id = payload["whiteboard_id"]

    # 读取模型:我们直接将元素状态写入 state 表
    # Key: PK=WHITEBOARD#{whiteboard_id}, SK=ELEMENT#{element_id}
    item = %{
      "PK" => whiteboard_id,
      "SK" => element_id,
      "type" => "element",
      "x" => payload["x"],
      "y" => payload["y"],
      "shape" => payload["shape"],
      "last_updated" => event["timestamp"]
    }

    update_read_model_and_notify(whiteboard_id, item)
  end

  defp process_event(%{"type" => "ElementMoved", "payload" => payload} = event) do
    # 对于更新操作,我们需要先读取当前状态
    # 但更高效的方式是直接更新特定字段
    element_id = event["aggregate_id"]
    whiteboard_id = payload["whiteboard_id"] # 事件中需要冗余一些关联信息

    update_expression = "SET x = :x, y = :y, last_updated = :ts"
    expression_attribute_values = %{
      ":x" => payload["new_x"],
      ":y" => payload["new_y"],
      ":ts" => event["timestamp"]
    }
    
    # 同样可以更新读取模型并通知
    # ... 实现省略 ...
  end
  # ... 其他事件处理逻辑 ...


  defp update_read_model_and_notify(whiteboard_id, item) do
    case DynamoDB.put_item(@state_table_name, item) do
      {:ok, _} ->
        Logger.info("Read model updated for whiteboard: #{whiteboard_id}")
        # 投影成功后,通过 AWS IoT Core (MQTT over WebSocket) 推送实时更新
        notify_clients(whiteboard_id, item)
      {:error, reason} ->
        # 关键:错误处理。如果投影失败,DynamoDB Stream 会自动重试。
        # 多次失败后,事件会进入我们配置的 DLQ (Dead Letter Queue),
        # 以便进行手动干预,避免数据丢失。
        Logger.error("Failed to update read model: #{inspect(reason)}")
        # 重新抛出异常,触发 Lambda 的重试机制
        raise "Projection failed for whiteboard: #{whiteboard_id}"
    end
  end

  defp notify_clients(whiteboard_id, payload) do
    topic = @iot_topic_prefix <> whiteboard_id
    case IoTDataPlane.publish(topic, Jason.encode!(payload)) do
      {:ok, _} -> :ok
      {:error, reason} -> 
        # 通知失败是次要问题,不应影响主流程。记录日志即可。
        Logger.warning("Failed to notify clients on topic #{topic}: #{inspect(reason)}")
    end
  end
end

这个投影过程是整个架构的另一个核心。它是异步且最终一致的。这意味着从写入事件到读取模型更新之间存在短暂的延迟。对于我们的协作应用,这种毫秒级的延迟是完全可以接受的。

React Native 客户端的实现思路

客户端的责任同样重大。

  1. 命令发送与重试: 用户操作被封装成命令,通过 HTTPS 请求发送给 API Gateway。我们实现了一个命令队列,当网络不可用时,命令被暂存在本地(如使用 AsyncStorage)。网络恢复后,队列中的命令被依次发送。每个命令都有一个唯一的 ID,以支持后端的幂等性处理。

  2. 状态管理与实时更新: 客户端使用状态管理库(如 Redux 或 Zustand)来维护本地的白板状态。它通过 WebSocket 连接到 AWS IoT Core,订阅对应白板 ID 的 Topic。当 Event-Projector 推送更新时,客户端接收到新的元素状态,并直接更新本地 store,触发 UI 的重新渲染。

  3. 乐观更新: 为了提升用户体验,当用户执行一个操作时,我们可以立即在本地 UI 上应用这个变更(乐观更新),同时将命令发送到后端。如果后端返回成功,一切照旧。如果返回冲突(409),我们需要回滚本地的乐观更新,并使用从 IoT Core 推送过来的最新状态来覆盖本地状态。

以下是一个简化的 React Native 端服务,用于处理与后端的交互:

// services/WhiteboardService.js
import { API } from 'aws-amplify';
import { PubSub } from 'aws-amplify';
import AsyncStorage from '@react-native-async-storage/async-storage';
import { v4 as uuidv4 } from 'uuid';

const COMMAND_QUEUE_KEY = 'command_queue';

class WhiteboardService {
  constructor(whiteboardId, onStateUpdate) {
    this.whiteboardId = whiteboardId;
    this.apiName = 'WhiteboardAPI'; // 在 Amplify 中配置的 API 名称
    this.subscription = null;
    this.onStateUpdate = onStateUpdate;
  }

  // 订阅实时更新
  subscribeToUpdates() {
    const topic = `whiteboard/${this.whiteboardId}`;
    this.subscription = PubSub.subscribe(topic).subscribe({
      next: data => {
        // 收到来自后端的投影更新
        console.log('Received real-time update:', data.value);
        this.onStateUpdate(data.value);
      },
      error: error => console.error(error),
      complete: () => console.log('Subscription done'),
    });
  }

  unsubscribe() {
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }

  // 发送命令,支持离线队列
  async sendCommand(type, payload) {
    const command = {
      command_id: uuidv4(),
      whiteboard_id: this.whiteboardId,
      type,
      payload,
      timestamp: new Date().toISOString(),
    };

    try {
      // 这里的 API.post 会调用 API Gateway -> Command-Handler Lambda
      const response = await API.post(this.apiName, '/commands', {
        body: command,
      });
      console.log('Command accepted:', response);
      this.processQueuedCommands(); // 发送成功后,尝试处理队列中的其他命令
      return { success: true };
    } catch (error) {
      console.error('Failed to send command:', error);
      // 网络错误或其他临时性问题,将命令加入队列
      if (error.response?.status !== 409 && error.response?.status !== 400) {
        await this.queueCommand(command);
      }
      return { success: false, error };
    }
  }

  async queueCommand(command) {
    const queue = JSON.parse(await AsyncStorage.getItem(COMMAND_QUEUE_KEY) || '[]');
    queue.push(command);
    await AsyncStorage.setItem(COMMAND_QUEUE_KEY, JSON.stringify(queue));
  }
  
  // 在应用启动或网络恢复时调用
  async processQueuedCommands() {
    let queue = JSON.parse(await AsyncStorage.getItem(COMMAND_QUEUE_KEY) || '[]');
    if (queue.length === 0) return;
    
    const commandToSend = queue.shift();
    await AsyncStorage.setItem(COMMAND_QUEUE_KEY, JSON.stringify(queue));
    
    // 重新发送
    await this.sendCommand(commandToSend.type, commandToSend.payload);
  }
}

export default WhiteboardService;

局限性与未来迭代方向

这套架构并非银弹。在 Sprint 的回顾会议中,我们也识别出了一些潜在的复杂性和需要持续关注的点。

首先,Elixir on Lambda 的冷启动问题是真实存在的。对于需要极低延迟的交互,首次调用的性能可能不理想。我们通过配置 Lambda 的 Provisioned Concurrency 在一定程度上缓解了这个问题,但这带来了额外的成本。一个潜在的优化是,对于写操作,或许可以考虑使用常驻的 Fargate 服务而非 Lambda,只将异步的、非关键路径的投影工作放在 Lambda 中。

其次,事件模型的设计和演进是一个长期挑战。一旦事件被写入,它们就是不可变的。如果业务逻辑变化导致需要新的事件结构,我们就必须处理版本兼容问题。通常采用的策略是版本化事件,并在投影逻辑中处理不同版本的事件,但这无疑增加了代码的复杂性。

最后,对于具有极长生命周期的聚合(例如一个使用了数年的白板),从头重放所有事件来重建状态会变得非常缓慢。虽然我们的读取模型避免了查询时的重放,但在调试、迁移或重建新的读取模型时,这个问题会暴露出来。未来的迭代计划中,我们已经加入了一个技术故事:实现“快照”(Snapshotting)机制。即定期将聚合的当前状态保存为一个快照,在重建时,只需从最新的快照开始,重放后续的少量事件即可。


  目录